Asynchronous programming allows your program to handle multiple tasks concurrently without blocking.
import asyncio
import time
# Synchronous version (blocking)
def sync_task(name, duration):
    """Run a blocking task and return its result string.

    Prints a start line, blocks for ``duration`` seconds, prints a finish
    line, and returns ``"Result from <name>"``.
    """
    print(f"Starting {name}")
    # time.sleep blocks the calling thread, so nothing else runs meanwhile.
    time.sleep(duration)
    print(f"Finished {name}")
    outcome = f"Result from {name}"
    return outcome
# Asynchronous version (non-blocking)
async def async_task(name, duration):
    """Run a non-blocking task and return its result string.

    Prints a start line, suspends for ``duration`` seconds, prints a finish
    line, and returns ``"Result from <name>"``.
    """
    print(f"Starting {name}")
    # Awaiting asyncio.sleep suspends only this coroutine; the event loop is
    # free to run other tasks until the delay has passed.
    await asyncio.sleep(duration)
    print(f"Finished {name}")
    outcome = f"Result from {name}"
    return outcome
# Running async functions
async def main():
    """Compare awaiting coroutines one by one with running them concurrently.

    Prints the elapsed time of two ``async_task`` calls awaited in sequence,
    then the elapsed time and the results of three ``async_task`` calls run
    together through ``asyncio.gather``.
    """
    # Sequential execution: the second call does not start until the first
    # has finished.
    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments the way time.time() can.
    start_time = time.perf_counter()
    await async_task("Task 1", 2)
    await async_task("Task 2", 1)
    print(f"Sequential time: {time.perf_counter() - start_time:.2f}s")

    # Concurrent execution: gather schedules all three coroutines at once and
    # returns their results in argument order.
    start_time = time.perf_counter()
    results = await asyncio.gather(
        async_task("Task A", 2),
        async_task("Task B", 1),
        async_task("Task C", 1.5),
    )
    print(f"Concurrent time: {time.perf_counter() - start_time:.2f}s")
    print(f"Results: {results}")
# Run main() on a fresh event loop and block until it completes.
asyncio.run(main())
import asyncio
async def fetch_data(url, delay):
    """Pretend to download ``url``, taking ``delay`` seconds.

    No network access happens; the wait is an ``asyncio.sleep``. Returns the
    string ``"Data from <url>"``.
    """
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)
    payload = f"Data from {url}"
    return payload
async def process_data(data, delay=0.5):
    """Simulate processing ``data`` and return the processed string.

    Args:
        data: Value to process; it is interpolated into the output text.
        delay: Seconds of simulated work. Defaults to 0.5, the value that
            was previously hard-coded.

    Returns:
        The string ``"Processed: <data>"``.
    """
    print(f"Processing {data}...")
    await asyncio.sleep(delay)
    return f"Processed: {data}"
async def main():
    """Fetch three endpoints concurrently, then process each payload in turn.

    Returns the processed strings in the order the fetches were started.
    """
    endpoints = (
        ("api.example.com/users", 2),
        ("api.example.com/posts", 1),
        ("api.example.com/comments", 1.5),
    )
    # create_task schedules every fetch immediately, so they overlap.
    fetches = [
        asyncio.create_task(fetch_data(url, delay)) for url, delay in endpoints
    ]
    payloads = await asyncio.gather(*fetches)
    # Processing stays sequential: each payload is awaited before the next.
    return [await process_data(payload) for payload in payloads]
# Alternative: using TaskGroup (Python 3.11+)
async def main_with_taskgroup():
    """Fetch three endpoints concurrently inside an ``asyncio.TaskGroup``.

    Returns the fetched strings in the order the tasks were created.
    """
    endpoints = (
        ("api.example.com/users", 2),
        ("api.example.com/posts", 1),
        ("api.example.com/comments", 1.5),
    )
    async with asyncio.TaskGroup() as group:
        pending = [
            group.create_task(fetch_data(url, delay)) for url, delay in endpoints
        ]
    # The task group waits for its tasks on exit, so every result is ready.
    return [task.result() for task in pending]
# Async context manager
class AsyncDatabaseConnection:
    """Toy async context manager that imitates a database connection.

    Nothing is actually connected; every step prints a message and awaits a
    short ``asyncio.sleep`` to stand in for I/O latency.
    """

    # Simulated latencies, in seconds.
    _CONNECT_DELAY = 0.1
    _CLOSE_DELAY = 0.1
    _QUERY_DELAY = 0.2

    async def __aenter__(self):
        """Simulate opening the connection and return this object."""
        print("Connecting to database...")
        await asyncio.sleep(self._CONNECT_DELAY)
        print("Connected!")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Simulate closing the connection.

        Returns ``None``, so an exception raised inside the ``async with``
        body is not suppressed.
        """
        print("Closing database connection...")
        await asyncio.sleep(self._CLOSE_DELAY)
        print("Connection closed!")

    async def query(self, sql):
        """Simulate running ``sql`` and return ``"Results for: <sql>"``."""
        print(f"Executing: {sql}")
        await asyncio.sleep(self._QUERY_DELAY)
        answer = f"Results for: {sql}"
        return answer
# Using async context manager
async def database_example():
    """Run two queries over one ``AsyncDatabaseConnection``.

    Returns a two-element list: the users query result, then the posts one.
    """
    statements = ("SELECT * FROM users", "SELECT * FROM posts")
    async with AsyncDatabaseConnection() as db:
        # Queries are awaited one after another on the same connection.
        rows = [await db.query(sql) for sql in statements]
    return rows
# Async iterator
class AsyncRange:
    """Asynchronous iterator over the integers ``start`` .. ``stop - 1``.

    The object is its own iterator and advances ``self.start`` as it goes, so
    one instance can be consumed only once.
    """

    def __init__(self, start, stop):
        self.start, self.stop = start, stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Return the next integer, or raise ``StopAsyncIteration`` at the end."""
        if self.start < self.stop:
            await asyncio.sleep(0.1)  # Simulate async work
            value = self.start
            self.start = value + 1
            return value
        raise StopAsyncIteration
# Using async iterator
async def async_iterator_example():
    """Print every number produced by ``AsyncRange(1, 5)``."""
    numbers = AsyncRange(1, 5)
    async for num in numbers:
        print(f"Got number: {num}")
# Async generator
async def async_fibonacci(n):
    """Asynchronously yield the first ``n`` Fibonacci numbers, starting at 0.

    Each value is preceded by a 0.1 s ``asyncio.sleep`` that simulates work.
    """
    previous, upcoming = 0, 1
    for _step in range(n):
        await asyncio.sleep(0.1)  # Simulate async work
        yield previous
        previous, upcoming = upcoming, previous + upcoming
async def async_generator_example():
    """Print the first ten values yielded by ``async_fibonacci``."""
    sequence = async_fibonacci(10)
    async for fib in sequence:
        print(f"Fibonacci: {fib}")
import asyncio
import random
async def unreliable_task(name, delay=1, failure_rate=0.3):
    """Simulate a task that fails at random.

    Args:
        name: Label used in the success and failure messages.
        delay: Seconds to wait before deciding the outcome. Defaults to 1,
            the value that was previously hard-coded.
        failure_rate: Probability of failure. Defaults to 0.3, the value
            that was previously hard-coded. ``0.0`` never fails and ``1.0``
            always fails, because ``random.random()`` is in ``[0.0, 1.0)``.

    Returns:
        The string ``"Success from <name>"``.

    Raises:
        RuntimeError: When the random draw falls below ``failure_rate``.
            This replaces the former bare ``Exception``; it is a subclass,
            so existing ``except Exception`` handlers still catch it.
    """
    await asyncio.sleep(delay)
    if random.random() < failure_rate:
        raise RuntimeError(f"Task {name} failed!")
    return f"Success from {name}"
async def error_handling_example():
    """Start five unreliable tasks and collect each outcome individually.

    Returns a list with one entry per task, in creation order: the task's
    result, or ``None`` where the task raised (the error is printed).
    """
    # All five tasks are scheduled up front so they run concurrently.
    pending = [
        asyncio.create_task(unreliable_task(f"Task-{i}")) for i in range(5)
    ]

    outcomes = []
    for job in pending:
        try:
            outcome = await job
        except Exception as e:
            print(f"Error: {e}")
            outcome = None
        outcomes.append(outcome)
    return outcomes
# Using asyncio.gather with return_exceptions
async def gather_with_exceptions():
    """Run five unreliable tasks with ``gather`` and report each outcome."""
    pending = [unreliable_task(f"Task-{i}") for i in range(5)]
    # With return_exceptions=True, a raised exception is placed in the result
    # list instead of propagating out of gather.
    results = await asyncio.gather(*pending, return_exceptions=True)
    for i, result in enumerate(results):
        verdict = "failed" if isinstance(result, Exception) else "succeeded"
        print(f"Task {i} {verdict}: {result}")
# Timeout handling
async def timeout_example():
    """Await one unreliable task under a two-second limit and print the outcome."""
    limit_seconds = 2.0
    try:
        result = await asyncio.wait_for(
            unreliable_task("slow-task"), timeout=limit_seconds
        )
        print(f"Result: {result}")
    except asyncio.TimeoutError:
        # wait_for raises this when the limit expires before the task ends.
        print("Task timed out!")
    except Exception as e:
        print(f"Task failed: {e}")
# Async Lock
async def shared_resource_example():
    """Have three workers increment one counter under an ``asyncio.Lock``.

    Each worker performs five read / sleep / write cycles and prints the
    counter after every write.
    """
    shared_counter = 0
    lock = asyncio.Lock()

    async def increment_counter(name):
        nonlocal shared_counter
        for _round in range(5):
            # The lock keeps the read and the write together even though the
            # coroutine yields to the event loop in between.
            async with lock:
                snapshot = shared_counter
                await asyncio.sleep(0.1)  # Simulate work
                shared_counter = snapshot + 1
                print(f"{name}: {shared_counter}")

    workers = ("Worker-1", "Worker-2", "Worker-3")
    await asyncio.gather(*(increment_counter(worker) for worker in workers))
# Async Semaphore (limit concurrent access)
async def semaphore_example(urls=None, max_concurrent=2, download_time=2):
    """Simulate downloads with a cap on how many run at the same time.

    Args:
        urls: Iterable of names to "download". Defaults to
            ``file0.txt`` .. ``file4.txt``, the previously hard-coded list.
        max_concurrent: Maximum number of simultaneous downloads. Defaults
            to 2, the previously hard-coded limit.
        download_time: Seconds each simulated download takes. Defaults to 2,
            the previously hard-coded value.

    Returns:
        A list of ``"Content from <url>"`` strings in the order of ``urls``.

    Raises:
        ValueError: If ``max_concurrent`` is below 1; a semaphore with no
            permits would make every download wait forever.
    """
    if max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1")
    if urls is None:
        urls = [f"file{i}.txt" for i in range(5)]

    semaphore = asyncio.Semaphore(max_concurrent)

    async def download_file(url):
        # At most max_concurrent coroutines are inside this block at once;
        # the others wait here until a permit is released.
        async with semaphore:
            print(f"Starting download: {url}")
            await asyncio.sleep(download_time)  # Simulate download
            print(f"Finished download: {url}")
            return f"Content from {url}"

    tasks = [download_file(url) for url in urls]
    results = await asyncio.gather(*tasks)
    return results
# Async Event
async def event_example():
    """Release three waiting coroutines with a single ``asyncio.Event``."""
    ready = asyncio.Event()

    async def setter():
        await asyncio.sleep(2)
        print("Setting event...")
        ready.set()

    async def waiter(name):
        print(f"{name} waiting for event...")
        # Suspends here until setter() calls ready.set().
        await ready.wait()
        print(f"{name} received event!")

    waiter_names = ("Waiter-1", "Waiter-2", "Waiter-3")
    # Waiters are listed first and the setter last, matching start order.
    await asyncio.gather(*(waiter(name) for name in waiter_names), setter())
# Producer-Consumer pattern with async queue
async def producer_consumer_example():
    """Run two producers and two consumers over a bounded ``asyncio.Queue``.

    Shutdown is coordinated here rather than by the producers: once every
    producer has finished, exactly one ``None`` sentinel is enqueued per
    consumer. Previously each producer enqueued its own sentinel and the
    consumers stopped at the first one they saw, so items from a slower
    producer could be dropped, that producer could block forever on the full
    queue, and leftover sentinels stayed in the queue.
    """
    queue = asyncio.Queue(maxsize=5)

    async def producer(name, items):
        for item in items:
            # put() waits while the queue is full (maxsize=5).
            await queue.put(f"{name}-{item}")
            print(f"Produced: {name}-{item}")
            await asyncio.sleep(0.1)

    async def consumer(name):
        while True:
            item = await queue.get()
            try:
                if item is None:
                    # Sentinel: no more work will arrive for this consumer.
                    return
                print(f"{name} consumed: {item}")
                await asyncio.sleep(0.2)  # Simulate processing
            finally:
                # Balance every get(), sentinels included.
                queue.task_done()

    # Start producers first, then consumers, as the original gather did.
    producers = [
        asyncio.create_task(producer("Producer-1", range(5))),
        asyncio.create_task(producer("Producer-2", range(5, 10))),
    ]
    consumers = [
        asyncio.create_task(consumer("Consumer-1")),
        asyncio.create_task(consumer("Consumer-2")),
    ]

    # Only signal shutdown after *all* producers are done.
    await asyncio.gather(*producers)
    for _ in consumers:
        await queue.put(None)
    await asyncio.gather(*consumers)
# Rate limiting with async
class AsyncRateLimiter:
    """Sliding-window rate limiter for coroutines.

    At most ``max_calls`` successful ``acquire()`` calls are admitted within
    any ``time_window`` seconds, measured on the event loop's clock.
    """

    def __init__(self, max_calls, time_window):
        """Create a limiter.

        Args:
            max_calls: Maximum number of calls admitted per window; must be
                at least 1.
            time_window: Length of the window in seconds.

        Raises:
            ValueError: If ``max_calls`` is below 1 (no call could ever be
                admitted).
        """
        if max_calls < 1:
            raise ValueError("max_calls must be at least 1")
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []  # loop timestamps of admitted calls, oldest first
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Wait until a call is allowed, then record it.

        The wait happens in a loop while holding the lock. The previous
        version called ``self.acquire()`` recursively while still holding
        the non-reentrant ``asyncio.Lock``, which deadlocked as soon as the
        limit was reached.
        """
        loop = asyncio.get_running_loop()
        async with self.lock:
            while True:
                now = loop.time()
                # Drop timestamps that have left the window.
                self.calls = [
                    call_time
                    for call_time in self.calls
                    if now - call_time < self.time_window
                ]
                if len(self.calls) < self.max_calls:
                    self.calls.append(now)
                    return
                # Sleep until the oldest call expires, then re-check. Holding
                # the lock keeps later callers queued behind this one.
                sleep_time = self.time_window - (now - self.calls[0])
                await asyncio.sleep(sleep_time)
async def rate_limited_api_calls():
    """Issue ten simulated API calls through a limiter of 3 calls per 5 s.

    Returns the ``"Response <i>"`` strings in call order.
    """
    limiter = AsyncRateLimiter(max_calls=3, time_window=5)

    async def api_call(i):
        # Wait for the limiter before doing the simulated request.
        await limiter.acquire()
        print(f"Making API call {i}")
        await asyncio.sleep(0.1)
        return f"Response {i}"

    responses = await asyncio.gather(*map(api_call, range(10)))
    return responses
Don't use `time.sleep()` in async functions - use `await asyncio.sleep()` instead, because `time.sleep()` blocks the whole event loop.
Don't forget `await` when calling async functions; without it you only get a coroutine object and the function body never runs.