Async Programming Skill
File Organization
- SKILL.md: Core principles, patterns, essential security (this file)
- references/security-examples.md: Race condition and resource safety examples
- references/advanced-patterns.md: Advanced async patterns and optimization
Validation Gates
Gate 0.1: Domain Expertise Validation
- Status: PASSED
- Expertise Areas: asyncio, Tokio, race conditions, resource management, concurrent safety
Gate 0.2: Vulnerability Research
- Status: PASSED (3+ issues for MEDIUM-RISK)
- Research Date: 2025-11-20
- Issues: CVE-2024-12254 (asyncio memory exhaustion), redis-py race condition (CVE-2023-28858, CVE-2023-28859)
Gate 0.11: File Organization Decision
- Decision: Split structure (MEDIUM-RISK, ~400 lines main + references)
1. Overview
Risk Level: MEDIUM
Justification: Async programming introduces race conditions, resource leaks, and timing-based vulnerabilities. Async code is rarely exposed to external attackers directly, but mistakes in it can cause data corruption, deadlocks, and security-sensitive races such as double-spending or TOCTOU (time-of-check to time-of-use).
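To make the TOCTOU risk concrete, here is a minimal sketch of a check-then-act race in asyncio and a lock-based fix. The `Account` class and both `withdraw_*` methods are hypothetical names used only for illustration, and `asyncio.sleep(0)` stands in for any real await (a database call, an HTTP request) between the check and the update.

```python
import asyncio


class Account:
    """Hypothetical account used only to illustrate a check-then-act race."""

    def __init__(self, balance: int) -> None:
        self.balance = balance
        self._lock = asyncio.Lock()

    async def withdraw_unsafe(self, amount: int) -> bool:
        # Time of check: the balance looks sufficient.
        if self.balance >= amount:
            # Any await between check and use lets other tasks run.
            await asyncio.sleep(0)
            # Time of use: the balance may already have been spent.
            self.balance -= amount
            return True
        return False

    async def withdraw_safe(self, amount: int) -> bool:
        # Holding the lock makes the check and the update one atomic step.
        async with self._lock:
            if self.balance >= amount:
                await asyncio.sleep(0)
                self.balance -= amount
                return True
            return False


async def demo() -> tuple[int, int]:
    unsafe = Account(100)
    await asyncio.gather(*[unsafe.withdraw_unsafe(100) for _ in range(2)])

    safe = Account(100)
    await asyncio.gather(*[safe.withdraw_safe(100) for _ in range(2)])
    return unsafe.balance, safe.balance
```

Running `asyncio.run(demo())` shows the unsafe account overdrawn (both withdrawals pass the check before either subtracts), while the locked version rejects the second withdrawal.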
You are an expert in asynchronous programming patterns for Python (asyncio) and Rust (Tokio). You write concurrent code that is free from race conditions, properly manages resources, and handles errors gracefully.
Core Expertise Areas
- Race condition identification and prevention
- Async resource management (connections, locks, files)
- Error handling in concurrent contexts
- Performance optimization for async workloads
- Graceful shutdown and cancellation
2. Core Principles
- TDD First: Write async tests before implementation using pytest-asyncio
- Performance Aware: Use asyncio.gather and semaphores, and avoid blocking calls inside coroutines
- Identify Race Conditions: Recognize shared state accessed across await points
- Protect Shared State: Use locks, atomic operations, or message passing
- Manage Resources: Ensure cleanup happens even on cancellation
- Handle Errors: Don't let one task's failure corrupt others
- Avoid Deadlocks: Use consistent lock ordering and timeouts on lock acquisition
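The deadlock principle can be sketched as follows. This is one possible approach, not the only one: `Wallet` and `transfer` are hypothetical names, locks are always taken in ascending `wallet_id` order, and each acquisition is wrapped in `asyncio.wait_for` so a stuck lock surfaces as a timeout rather than a hang.

```python
import asyncio


class Wallet:
    """Hypothetical resource guarded by its own lock."""

    def __init__(self, wallet_id: int, balance: int) -> None:
        self.wallet_id = wallet_id
        self.balance = balance
        self.lock = asyncio.Lock()


async def transfer(src: Wallet, dst: Wallet, amount: int, timeout: float = 1.0) -> None:
    # Consistent ordering: always lock the lower id first, so two opposite
    # transfers can never each hold the lock the other one needs.
    first, second = sorted((src, dst), key=lambda w: w.wallet_id)

    # Timeout on acquisition: raise TimeoutError instead of waiting forever.
    await asyncio.wait_for(first.lock.acquire(), timeout)
    try:
        await asyncio.wait_for(second.lock.acquire(), timeout)
        try:
            await asyncio.sleep(0)  # stand-in for real async work
            src.balance -= amount
            dst.balance += amount
        finally:
            second.lock.release()
    finally:
        first.lock.release()


async def demo() -> tuple[int, int]:
    a, b = Wallet(1, 100), Wallet(2, 100)
    # Opposite-direction transfers would risk deadlock without ordering.
    await asyncio.gather(transfer(a, b, 30), transfer(b, a, 10))
    return a.balance, b.balance
```

The sketch assumes `src` and `dst` are distinct wallets; a transfer from a wallet to itself would wait on its own lock until the timeout fires.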
Decision Framework
| Situation | Approach |
|-----------|----------|
| Shared mutable state | Use asyncio.Lock or RwLock |
| Database transaction | Use atomic operations, SELECT FOR UPDATE |
| Resource cleanup | Use async context managers |
| Task coordination | Use asyncio.Event, Queue, or Semaphore |
| Background tasks | Track tasks, handle cancellation |
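For the background-task row, a minimal sketch of tracking and cancelling tasks might look like this. `BackgroundTasks`, `spawn`, `shutdown`, and `worker` are hypothetical names; the pattern of holding a strong reference and discarding it in a done callback follows the guidance in the `asyncio.create_task` documentation.

```python
import asyncio


class BackgroundTasks:
    """Hypothetical helper that keeps strong references to spawned tasks."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()

    def spawn(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        # Keep a reference so the task is not garbage collected mid-flight.
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def shutdown(self) -> None:
        # Snapshot first: done callbacks mutate the set as tasks finish.
        pending = list(self._tasks)
        for task in pending:
            task.cancel()
        # return_exceptions=True collects CancelledError instead of raising.
        await asyncio.gather(*pending, return_exceptions=True)


async def worker(log: list[str]) -> None:
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        log.append("cleaned up")
        raise  # re-raise so the task is marked as cancelled


async def demo() -> list[str]:
    log: list[str] = []
    tasks = BackgroundTasks()
    tasks.spawn(worker(log))
    tasks.spawn(worker(log))
    await asyncio.sleep(0)  # let the workers start
    await tasks.shutdown()
    return log
```

Each worker gets a chance to run its cleanup branch before `shutdown()` returns, which is the property a graceful shutdown needs.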
3. Implementation Workflow (TDD)
Step 1: Write Failing Test First
```python
import asyncio

import pytest

# SafeCounter and managed_resource are implemented in Step 2; import them
# from the module under test.


@pytest.mark.asyncio
async def test_concurrent_counter_safety():
    """Test counter maintains consistency under concurrent access."""
    counter = SafeCounter()

    async def increment_many():
        for _ in range(100):
            await counter.increment()

    # 10 concurrent tasks x 100 increments each
    await asyncio.gather(*[increment_many() for _ in range(10)])
    assert await counter.get() == 1000


@pytest.mark.asyncio
async def test_resource_cleanup_on_cancellation():
    """Test resources are cleaned up even when task is cancelled."""
    cleanup_called = False

    async def task_with_resource():
        nonlocal cleanup_called
        try:
            async with managed_resource() as resource:
                await asyncio.sleep(10)
        finally:
            # Runs on cancellation too; a plain statement after the sleep
            # would be skipped when CancelledError is raised.
            cleanup_called = True

    task = asyncio.create_task(task_with_resource())
    await asyncio.sleep(0.1)  # let the task enter the context manager
    task.cancel()

    with pytest.raises(asyncio.CancelledError):
        await task
    assert cleanup_called
```
Step 2: Implement Minimum to Pass
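```python
import asyncio
from contextlib import asynccontextmanager


class SafeCounter:
    """Counter whose updates are serialized by an asyncio.Lock."""

    def __init__(self):
        self._value = 0
        self._lock = asyncio.Lock()

    async def increment(self) -> int:
        async with self._lock:
            self._value += 1
            return self._value

    async def get(self) -> int:
        async with self._lock:
            return self._value


@asynccontextmanager
async def managed_resource():
    # acquire_resource / release_resource are placeholders for the real
    # acquisition and release calls.
    resource = await acquire_resource()
    try:
        yield resource
    finally:
        # Runs on normal exit, on exceptions, and on cancellation
        await release_resource(resource)
```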
Step 3: Refactor Following Patterns
Apply performance patterns, add timeouts, improve error handling.
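As one possible shape for this refactoring step, the sketch below adds a timeout wrapper and isolates failures so one task's exception does not discard the others' results. `call_with_timeout`, `run_isolated`, and the three small coroutines are hypothetical helpers introduced only for this example.

```python
import asyncio


async def call_with_timeout(coro, timeout: float):
    """Return the coroutine's result, or None if it exceeds the timeout."""
    try:
        return await asyncio.wait_for(coro, timeout)
    except asyncio.TimeoutError:
        return None


async def run_isolated(coros) -> tuple[list, list[BaseException]]:
    """Run all coroutines; separate successes from failures."""
    outcomes = await asyncio.gather(*coros, return_exceptions=True)
    results = [o for o in outcomes if not isinstance(o, BaseException)]
    errors = [o for o in outcomes if isinstance(o, BaseException)]
    return results, errors


# Hypothetical operations used only to exercise the helpers above.
async def slow() -> str:
    await asyncio.sleep(1)
    return "slow"


async def ok() -> str:
    return "ok"


async def boom() -> str:
    raise ValueError("boom")


async def demo():
    timed_out = await call_with_timeout(slow(), timeout=0.01)
    results, errors = await run_isolated([ok(), boom(), ok()])
    return timed_out, results, [type(e).__name__ for e in errors]
```

Returning `None` on timeout is a simplification; real code may prefer to log the timeout or re-raise a domain-specific error.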
Step 4: Run Full Verification
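```bash
# Run all async tests
pytest tests/ -v --asyncio-mode=auto
# Run with asyncio debug mode enabled (reports slow callbacks and never-awaited coroutines)
PYTHONASYNCIODEBUG=1 pytest tests/ -v --asyncio-mode=auto
# Run tests in parallel (requires pytest-xdist)
pytest tests/ -v -n auto --asyncio-mode=auto
```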
4. Performance Patterns
Pattern 1: asyncio.gather for Concurrency
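```python
import asyncio

# fetch() is assumed to be an existing coroutine that downloads one URL.

# Sequential: each fetch waits for the previous one to finish
async def fetch_all_sequential(urls: list[str]) -> list[str]:
    results = []
    for url in urls:
        result = await fetch(url)
        results.append(result)
    return results

# Concurrent: all fetches run at once; results keep the input order
async def fetch_all_concurrent(urls: list[str]) -> list[str]:
    return await asyncio.gather(*[fetch(url) for url in urls])
```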
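To see the difference between the two functions above, the following sketch times both against a stub. The `fetch` here is an assumption that simulates about 50 ms of I/O with `asyncio.sleep`, and the example.com URLs are placeholders; no real network calls are made.

```python
import asyncio
import time


async def fetch(url: str) -> str:
    # Stand-in for real network I/O (assumption: each call takes ~50 ms).
    await asyncio.sleep(0.05)
    return f"body of {url}"


async def fetch_all_sequential(urls: list[str]) -> list[str]:
    return [await fetch(url) for url in urls]


async def fetch_all_concurrent(urls: list[str]) -> list[str]:
    # gather preserves input order in its result list.
    return list(await asyncio.gather(*(fetch(url) for url in urls)))


async def timed(coro) -> tuple[list[str], float]:
    start = time.perf_counter()
    result = await coro
    return result, time.perf_counter() - start


async def demo():
    urls = [f"https://example.com/{i}" for i in range(5)]
    seq, seq_time = await timed(fetch_all_sequential(urls))
    con, con_time = await timed(fetch_all_concurrent(urls))
    return seq == con, seq_time, con_time
```

With five URLs the sequential version should take roughly five times the per-call delay, while the concurrent version takes roughly one; both return the same results in the same order.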
Pattern 2: Semaphores for Rate Limiting
```python
import asyncio

# Unbounded: starts one request per URL at once and can overwhelm the server
async def fetch_many(urls: list[str]):
    return await asyncio.gather(*[fetch(url) for url in urls])

# Bounded: at most max_concurrent requests are in flight at any time
async def fetch_many_limited(urls: list[str], max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_limit(url: str):
        async with semaphore:
            return await fetch(url)

    return await asyncio.gather(*[fetch_with_limit(url) for url in urls])
```
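One way to check that the semaphore really caps concurrency is to count how many jobs are inside it at once. `measure_peak` is a hypothetical helper written for this check, with `asyncio.sleep` standing in for a real request.

```python
import asyncio


async def measure_peak(n_tasks: int, max_concurrent: int) -> int:
    """Return the highest number of tasks inside the semaphore at once."""
    semaphore = asyncio.Semaphore(max_concurrent)
    active = 0
    peak = 0

    async def limited_job() -> None:
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for a real request
            active -= 1

    await asyncio.gather(*(limited_job() for _ in range(n_tasks)))
    return peak
```

The counters need no lock here because there is no await between reading and writing them; only the sleep yields control to other tasks.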
Pattern 3: Task Groups (Python 3.11+)
```python
# Manual task tracking with create_task and gather
async def process_items_manual(items):
    tasks = []
    for item in items:
        task = asyncio.create_task(process(item))
        tasks.append(task)
    return await asyncio.gather(*tasks)
```