Async Programming

Introduction to Async/Await

Asynchronous programming lets a single thread make progress on many tasks concurrently: while one task waits (for example, on a network response or a timer), the event loop hands control to another task that is ready to run, so the program as a whole is not blocked.

import asyncio
import time

# Synchronous version (blocking)
def sync_task(name, duration):
    print(f"Starting {name}")
    time.sleep(duration)  # Blocks the entire program
    print(f"Finished {name}")
    return f"Result from {name}"

# Asynchronous version (non-blocking)
async def async_task(name, duration):
    print(f"Starting {name}")
    await asyncio.sleep(duration)  # Yields control to other tasks
    print(f"Finished {name}")
    return f"Result from {name}"

# Running async functions
async def main():
    # Sequential execution
    start_time = time.time()
    result1 = await async_task("Task 1", 2)
    result2 = await async_task("Task 2", 1)
    print(f"Sequential time: {time.time() - start_time:.2f}s")
    
    # Concurrent execution
    start_time = time.time()
    results = await asyncio.gather(
        async_task("Task A", 2),
        async_task("Task B", 1),
        async_task("Task C", 1.5)
    )
    print(f"Concurrent time: {time.time() - start_time:.2f}s")
    print(f"Results: {results}")

# Run the async function
asyncio.run(main())
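
One detail worth calling out: calling an async function does not execute its body. It returns a coroutine object that only runs once it is awaited (or scheduled as a task). A minimal sketch using async_task from above (the demo coroutine name is just illustrative):

# Calling an async function only creates a coroutine object; nothing runs yet
async def demo():
    coro = async_task("Task D", 1)   # no output here -- the body has not started
    result = await coro              # the body actually runs at this await
    print(result)

asyncio.run(demo())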

Creating and Managing Tasks

import asyncio

async def fetch_data(url, delay):
    """Simulate fetching data from a URL"""
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)
    return f"Data from {url}"

async def process_data(data):
    """Simulate processing data"""
    print(f"Processing {data}...")
    await asyncio.sleep(0.5)
    return f"Processed: {data}"

async def main():
    # Create tasks
    task1 = asyncio.create_task(fetch_data("api.example.com/users", 2))
    task2 = asyncio.create_task(fetch_data("api.example.com/posts", 1))
    task3 = asyncio.create_task(fetch_data("api.example.com/comments", 1.5))
    
    # Wait for all tasks to complete
    results = await asyncio.gather(task1, task2, task3)
    
    # Process results
    processed_results = []
    for result in results:
        processed = await process_data(result)
        processed_results.append(processed)
    
    return processed_results

# Alternative: using TaskGroup (Python 3.11+)
async def main_with_taskgroup():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data("api.example.com/users", 2))
        task2 = tg.create_task(fetch_data("api.example.com/posts", 1))
        task3 = tg.create_task(fetch_data("api.example.com/comments", 1.5))
    
    # All tasks completed successfully
    results = [task1.result(), task2.result(), task3.result()]
    return results
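
A property of TaskGroup worth noting: if any task in the group raises, the remaining tasks are cancelled and the errors propagate out of the async with block as an ExceptionGroup (Python 3.11+). A small sketch of that behaviour, using an illustrative failing_fetch coroutine that is not part of the example above:

async def failing_fetch():
    await asyncio.sleep(0.5)
    raise RuntimeError("fetch failed")

async def taskgroup_error_example():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fetch_data("api.example.com/users", 2))
            tg.create_task(failing_fetch())  # raises; the other task gets cancelled
    except* RuntimeError as eg:
        print(f"One or more tasks failed: {eg.exceptions}")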

Async Context Managers and Iterators

import asyncio

# Async context manager
class AsyncDatabaseConnection:
    async def __aenter__(self):
        print("Connecting to database...")
        await asyncio.sleep(0.1)  # Simulate connection time
        print("Connected!")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")
        await asyncio.sleep(0.1)  # Simulate cleanup time
        print("Connection closed!")
    
    async def query(self, sql):
        print(f"Executing: {sql}")
        await asyncio.sleep(0.2)  # Simulate query time
        return f"Results for: {sql}"

# Using async context manager
async def database_example():
    async with AsyncDatabaseConnection() as db:
        result1 = await db.query("SELECT * FROM users")
        result2 = await db.query("SELECT * FROM posts")
        return [result1, result2]
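
If writing __aenter__ and __aexit__ by hand feels heavy, the standard library's contextlib.asynccontextmanager decorator offers a shorter route. A rough equivalent of the connection above (the database_connection name is just illustrative):

import contextlib

@contextlib.asynccontextmanager
async def database_connection():
    print("Connecting to database...")
    await asyncio.sleep(0.1)  # Simulate connection time
    try:
        yield "connection"  # value bound by "async with database_connection() as conn"
    finally:
        print("Closing database connection...")
        await asyncio.sleep(0.1)  # Simulate cleanup time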

# Async iterator
class AsyncRange:
    def __init__(self, start, stop):
        self.start = start
        self.stop = stop
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.start >= self.stop:
            raise StopAsyncIteration
        
        await asyncio.sleep(0.1)  # Simulate async work
        current = self.start
        self.start += 1
        return current

# Using async iterator
async def async_iterator_example():
    async for num in AsyncRange(1, 5):
        print(f"Got number: {num}")

# Async generator
async def async_fibonacci(n):
    a, b = 0, 1
    for _ in range(n):
        await asyncio.sleep(0.1)  # Simulate async work
        yield a
        a, b = b, a + b

async def async_generator_example():
    async for fib in async_fibonacci(10):
        print(f"Fibonacci: {fib}")

Error Handling in Async Code

import asyncio
import random

async def unreliable_task(name):
    """Task that might fail randomly"""
    await asyncio.sleep(1)
    if random.random() < 0.3:
        raise Exception(f"Task {name} failed!")
    return f"Success from {name}"

async def error_handling_example():
    tasks = []
    
    # Create multiple tasks
    for i in range(5):
        task = asyncio.create_task(unreliable_task(f"Task-{i}"))
        tasks.append(task)
    
    # Handle errors individually
    results = []
    for task in tasks:
        try:
            result = await task
            results.append(result)
        except Exception as e:
            print(f"Error: {e}")
            results.append(None)
    
    return results

# Using asyncio.gather with return_exceptions
async def gather_with_exceptions():
    tasks = [unreliable_task(f"Task-{i}") for i in range(5)]
    
    # return_exceptions=True prevents gather from raising on first exception
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

# Timeout handling
async def timeout_example():
    try:
        # Wait maximum 2 seconds
        result = await asyncio.wait_for(
            unreliable_task("slow-task"), 
            timeout=2.0
        )
        print(f"Result: {result}")
    except asyncio.TimeoutError:
        print("Task timed out!")
    except Exception as e:
        print(f"Task failed: {e}")

Async Synchronization Primitives

import asyncio

# Async Lock
async def shared_resource_example():
    lock = asyncio.Lock()
    shared_counter = 0
    
    async def increment_counter(name):
        nonlocal shared_counter
        for _ in range(5):
            async with lock:  # Only one coroutine can access at a time
                current = shared_counter
                await asyncio.sleep(0.1)  # Simulate work
                shared_counter = current + 1
                print(f"{name}: {shared_counter}")
    
    # Run multiple coroutines concurrently
    await asyncio.gather(
        increment_counter("Worker-1"),
        increment_counter("Worker-2"),
        increment_counter("Worker-3")
    )

# Async Semaphore (limit concurrent access)
async def semaphore_example():
    # Allow maximum 2 concurrent downloads
    semaphore = asyncio.Semaphore(2)
    
    async def download_file(url):
        async with semaphore:
            print(f"Starting download: {url}")
            await asyncio.sleep(2)  # Simulate download
            print(f"Finished download: {url}")
            return f"Content from {url}"
    
    urls = [f"file{i}.txt" for i in range(5)]
    tasks = [download_file(url) for url in urls]
    results = await asyncio.gather(*tasks)
    return results

# Async Event
async def event_example():
    event = asyncio.Event()
    
    async def waiter(name):
        print(f"{name} waiting for event...")
        await event.wait()
        print(f"{name} received event!")
    
    async def setter():
        await asyncio.sleep(2)
        print("Setting event...")
        event.set()
    
    # Start waiters and setter
    await asyncio.gather(
        waiter("Waiter-1"),
        waiter("Waiter-2"),
        waiter("Waiter-3"),
        setter()
    )

Real-World Async Patterns

import asyncio

# Producer-Consumer pattern with async queue
async def producer_consumer_example():
    queue = asyncio.Queue(maxsize=5)
    
    async def producer(name, items):
        for item in items:
            await queue.put(f"{name}-{item}")
            print(f"Produced: {name}-{item}")
            await asyncio.sleep(0.1)
    
    async def consumer(name):
        while True:
            item = await queue.get()
            if item is None:  # Sentinel value: no more work
                break
            print(f"{name} consumed: {item}")
            await asyncio.sleep(0.2)  # Simulate processing
    
    # Start consumers first so they drain the queue while producers fill it
    consumers = [
        asyncio.create_task(consumer("Consumer-1")),
        asyncio.create_task(consumer("Consumer-2")),
    ]
    
    # Wait for all producers to finish...
    await asyncio.gather(
        producer("Producer-1", range(5)),
        producer("Producer-2", range(5, 10)),
    )
    
    # ...then send one sentinel per consumer and wait for the queue to drain.
    # (Having each producer put its own sentinel is fragile: a fast producer's
    # sentinel could stop a consumer while the slow producer still has items.)
    for _ in consumers:
        await queue.put(None)
    await asyncio.gather(*consumers)

# Rate limiting with async
class AsyncRateLimiter:
    def __init__(self, max_calls, time_window):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []
        self.lock = asyncio.Lock()
    
    async def acquire(self):
        while True:
            async with self.lock:
                now = asyncio.get_running_loop().time()
                # Drop calls that have fallen outside the time window
                self.calls = [call_time for call_time in self.calls
                              if now - call_time < self.time_window]
                
                if len(self.calls) < self.max_calls:
                    self.calls.append(now)
                    return
                
                # The oldest recorded call determines when a slot frees up
                sleep_time = self.time_window - (now - self.calls[0])
            
            # Sleep *outside* the lock so other coroutines are not blocked, then
            # retry. (asyncio.Lock is not reentrant: re-acquiring it from inside
            # the "async with" block would deadlock.)
            await asyncio.sleep(sleep_time)
async def rate_limited_api_calls():
    limiter = AsyncRateLimiter(max_calls=3, time_window=5)
    
    async def api_call(i):
        await limiter.acquire()
        print(f"Making API call {i}")
        await asyncio.sleep(0.1)
        return f"Response {i}"
    
    tasks = [api_call(i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    return results