Skip to main content

Lesson 3: Advanced Patterns — Timeouts, Futures, and Error Handling

Opening Hook

You've learned how to run multiple tasks concurrently. Now imagine this: you're building a system that fetches data from 10 APIs in parallel. One of them starts responding slowly. Then slower. Then it never responds at all.

Without timeouts, your entire system hangs indefinitely. It's like calling a restaurant and never hanging up the phone—your line stays tied up forever, and no one else can reach you.

In production systems, timeouts aren't optional—they're survival. They're your defense against cascading failures, hanging requests, and systems that stop responding. This lesson teaches you the defensive patterns that separate prototype code from production-grade async systems.


Core 1: Timeout Control with asyncio.timeout()

The Timeout Problem

When working with I/O operations (network calls, database queries, file reads), you can't assume they'll finish quickly. Networks are unreliable. Services go down. Queries hang.

# Without timeout - WRONG ❌
async def fetch_api_call(url: str) -> dict:
response = await httpx.AsyncClient().get(url)
return response.json()

# If this hangs, the caller waits forever
result = await fetch_api_call("https://slow-service.example.com")

The operation has no upper bound on how long it might take. If the server stops responding, await just... waits.

The asyncio.timeout() Pattern

Python 3.11+ provides the asyncio.timeout() context manager to enforce time limits:

import asyncio
import httpx
from typing import Any

async def fetch_with_timeout(url: str) -> dict[str, Any]:
"""
Fetch from an API with a 5-second timeout.

Raises:
asyncio.TimeoutError: If the operation exceeds 5 seconds
"""
try:
async with asyncio.timeout(5): # Set 5-second limit
response = await httpx.AsyncClient().get(url)
return response.json()
except asyncio.TimeoutError:
print(f"Request to {url} timed out after 5 seconds")
raise

# Usage
asyncio.run(fetch_with_timeout("https://slow-service.example.com"))

What happens with asyncio.timeout(5):

  1. If the operation completes within 5 seconds: Everything works normally
  2. If the operation takes longer than 5 seconds: TimeoutError is raised
  3. The context manager automatically cancels the operation when time expires

💬 AI Colearning Prompt (after timeout motivation):

Ask your AI: "What's the difference between asyncio.timeout() (Python 3.11+) and asyncio.wait_for()? When would I use each?"

Expected Output: AI explains that asyncio.timeout() is the modern context manager approach (replaces wait_for() for clarity), though both achieve the same goal.

Handling Timeout Gracefully

Simply raising an error isn't always helpful. Often you want fallback behavior:

import asyncio
import httpx
from typing import Any

async def fetch_with_fallback(
url: str,
fallback_value: dict[str, Any] | None = None,
timeout_seconds: float = 5.0
) -> dict[str, Any]:
"""
Fetch from an API with graceful fallback on timeout.

Args:
url: The API endpoint to fetch
fallback_value: Value to return if timeout occurs
timeout_seconds: Maximum time to wait

Returns:
API response or fallback_value if timeout
"""
try:
async with asyncio.timeout(timeout_seconds):
response = await httpx.AsyncClient().get(url)
return response.json()

except asyncio.TimeoutError:
# Log the timeout, use fallback
print(f"Timeout fetching {url} - using fallback")
if fallback_value is not None:
return fallback_value
else:
# Re-raise if no fallback available
raise

# Usage with fallback
result = await fetch_with_fallback(
url="https://unreliable-api.example.com",
fallback_value={"status": "unavailable", "cached": True},
timeout_seconds=3.0
)

🎓 Instructor Commentary (after timeout motivation):

Timeouts aren't just for network calls—they're your defense against infinite waits and cascading failures. A single slow API can propagate delays through your entire system. With timeouts, you contain failures locally and degrade gracefully.


Core 2: Understanding Futures

What Are Futures?

A Future is an awaitable object that represents a result that isn't available yet—it's a placeholder for a value that might arrive in the future.

You've already encountered Futures indirectly:

  • asyncio.create_task() returns a Task (which is a subclass of Future)
  • asyncio.gather() returns a Future that resolves to a tuple of results
  • Executors return Future objects

In modern Python code, you rarely create Futures manually. The async machinery creates them for you.

Basic Future Example

Here's how Futures work conceptually:

import asyncio
from typing import Any
from concurrent.futures import ThreadPoolExecutor

async def understand_futures() -> None:
"""
Demonstrate Future objects (rarely created manually).

In practice: create_task() and executors handle Futures for you.
"""
# Creating a Future manually (rare - normally done by asyncio internals)
future: asyncio.Future[str] = asyncio.Future()

# A Future can be resolved with a value
future.set_result("result from somewhere")

# You can await it
result = await future
print(f"Future resolved to: {result}")

# More common: Futures from create_task()
async def some_coroutine() -> int:
await asyncio.sleep(0.5)
return 42

# create_task() returns a Task (Future subclass)
task = asyncio.create_task(some_coroutine())
result = await task
print(f"Task completed with: {result}")

# Run it
asyncio.run(understand_futures())

Key Point: You rarely create Futures manually. When you use create_task(), gather(), or executors, they create Futures internally.

When Do You Actually Use Futures?

Scenario 1: Debugging and Inspection

import asyncio
from typing import Any

async def debug_futures() -> None:
"""
Use Futures for debugging and inspection tasks.
"""
async def delayed_result(value: int, delay: float) -> int:
await asyncio.sleep(delay)
return value * 2

# Create tasks (which are Futures)
task1 = asyncio.create_task(delayed_result(5, 0.5))
task2 = asyncio.create_task(delayed_result(10, 1.0))

# Check Future state (rarely needed, but useful for debugging)
print(f"Task1 done? {task1.done()}") # False
print(f"Task2 done? {task2.done()}") # False

# Wait for both
results = await asyncio.gather(task1, task2)
print(f"Results: {results}") # [10, 20]
print(f"Task1 done? {task1.done()}") # True now

# Get the result directly from the Future
print(f"Task1 result: {task1.result()}") # 10

asyncio.run(debug_futures())

Scenario 2: Bridge Sync to Async (Executor Results)

import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
from typing import Any

def cpu_bound_work(seconds: int) -> str:
"""Synchronous work (thread-safe)."""
time.sleep(seconds)
return f"Completed after {seconds}s"

async def using_executor() -> None:
"""
Use ThreadPoolExecutor to run sync code in async context.
The executor returns a Future.
"""
loop = asyncio.get_running_loop()

# Run sync function in executor (returns Future)
future = loop.run_in_executor(None, cpu_bound_work, 2)

# future is a Future object
result = await future # Wait for it to complete
print(result)

asyncio.run(using_executor())

🚀 CoLearning Challenge (after Futures section):

Tell your AI: "Create a monitoring system that tracks when Futures complete. It should check task.done() and print status updates. Explain what Future.result() returns and when you can call it safely."

Expected Output: Code that checks Future state, demonstrates task.done(), result(), and handles edge cases.


Core 3: Exception Handling in Async Code

The Challenge: Exceptions with Await

When you await a coroutine, exceptions can occur while you're suspended. You need to catch them properly:

import asyncio
import httpx
from typing import Any

async def fetch_and_parse(url: str) -> dict[str, Any]:
"""
Fetch and parse JSON, handling multiple exception types.

Can raise:
- httpx.ConnectError: Network unreachable
- asyncio.TimeoutError: Request timed out
- ValueError: JSON parsing failed
"""
try:
async with asyncio.timeout(5):
response = await httpx.AsyncClient().get(url)

# JSON parsing can also fail
return response.json()

except asyncio.TimeoutError:
print(f"Request to {url} timed out")
raise # Re-raise or handle locally

except httpx.ConnectError as e:
print(f"Network error connecting to {url}: {e}")
raise # Can't recover from connection failure

except ValueError as e:
print(f"Invalid JSON response from {url}: {e}")
raise # Can't parse response

# Usage
try:
result = asyncio.run(fetch_and_parse("https://api.example.com/data"))
except Exception as e:
print(f"Operation failed: {e}")

Key Points:

  1. try/except works normally with await
  2. Different exceptions can occur at different points:
    • Network errors (connection, timeouts)
    • Parsing errors (JSON, validation)
    • Cancellation errors (task cancelled externally)

CancelledError: When Tasks Are Cancelled

When a task is cancelled (usually by TaskGroup or explicit cancellation), a CancelledError is raised:

import asyncio
from typing import Any

async def cancellable_operation(task_id: int) -> None:
"""
A task that can be cancelled. CancelledError is raised
when asyncio cancels the task.
"""
try:
for i in range(10):
print(f"Task {task_id}: step {i}")
await asyncio.sleep(1)

except asyncio.CancelledError:
print(f"Task {task_id} was cancelled!")
# Cleanup code here (close connections, release resources)
# Either re-raise or suppress
raise # Important: re-raise unless you have a specific reason not to

async def main() -> None:
"""
Create a task and cancel it after 2 seconds.
"""
task = asyncio.create_task(cancellable_operation(1))

try:
await asyncio.sleep(2)
# Cancel the task
task.cancel()
# Wait for it to finish (cancel is asynchronous)
await task
except asyncio.CancelledError:
print("Task cancellation completed")

asyncio.run(main())

When CancelledError Occurs:

  1. Another task explicitly calls task.cancel()
  2. A TaskGroup encounters an exception and cancels all other tasks
  3. The event loop is shutting down

Teaching Tip (after CancelledError):

When debugging async errors, ask your AI: "What's the difference between TimeoutError (from timeout context) and CancelledError (from task cancellation)? How should I handle each differently?"

Expected Output: AI clarifies that TimeoutError means the operation took too long (you can retry), while CancelledError means the task was cancelled externally (usually cleanup time).


Core 4: Common Async Pitfalls and Debugging

Never-Awaited Coroutines

One of the most common async bugs is forgetting the await keyword:

import asyncio

async def fetch_data(url: str) -> dict:
"""Fetch data (simulated)."""
await asyncio.sleep(0.5)
return {"data": "result"}

async def wrong_usage() -> None:
"""
❌ WRONG: Missing await - this creates a coroutine but doesn't run it!
"""
result = fetch_data("https://example.com") # NO await!
# result is a coroutine object, not the data
print(result) # <coroutine object fetch_data at 0x...>

# Running this will produce a RuntimeWarning:
# RuntimeWarning: coroutine 'fetch_data' was never awaited
asyncio.run(wrong_usage())

The Error Message:

RuntimeWarning: coroutine 'fetch_data' was never awaited

The Fix

import asyncio

async def fetch_data(url: str) -> dict:
"""Fetch data (simulated)."""
await asyncio.sleep(0.5)
return {"data": "result"}

async def correct_usage() -> None:
"""
✅ CORRECT: Use await to actually run the coroutine
"""
result = await fetch_data("https://example.com") # WITH await!
print(result) # {"data": "result"}

# No warning, works correctly
asyncio.run(correct_usage())

Common Mistake: Blocking the Event Loop

Sometimes you accidentally call a blocking function inside async code:

import asyncio
import time
from typing import Any

async def bad_blocking() -> None:
"""
❌ WRONG: time.sleep() blocks the entire event loop!
Other tasks can't run during this time.
"""
print("Waiting 3 seconds...")
time.sleep(3) # ❌ BLOCKS! Other tasks can't run
print("Done!")

async def good_async() -> None:
"""
✅ CORRECT: await asyncio.sleep() yields to event loop.
Other tasks can run during this time.
"""
print("Waiting 3 seconds...")
await asyncio.sleep(3) # ✅ Yields control to event loop
print("Done!")

async def demonstrate_difference() -> None:
"""
Show why blocking matters.
"""
# Bad version: event loop is blocked
task1 = asyncio.create_task(bad_blocking())
task2 = asyncio.create_task(print_dots())

try:
await asyncio.wait_for(asyncio.gather(task1, task2), timeout=5)
except asyncio.TimeoutError:
print("Timeout! (probably because time.sleep() blocked the loop)")

async def print_dots() -> None:
"""Print dots every 0.5 seconds."""
for _ in range(7):
print(".", end="", flush=True)
await asyncio.sleep(0.5)
print()

# This demonstrates the difference
asyncio.run(demonstrate_difference())

Key Rule: In async code, always use await asyncio.sleep(), not time.sleep().

Debugging with Your AI Companion

When you get an async error, your AI can help you understand it:

import asyncio
from typing import Any

# Simulate a common error
async def problematic_code() -> None:
"""Contains a subtle async error."""
result = asyncio.sleep(1) # ❌ WRONG: Missing await
# This creates a coroutine but doesn't run it

asyncio.run(problematic_code())

When you see the warning, ask your AI:

"I got this RuntimeWarning: coroutine 'sleep' was never awaited. What does this mean, and how do I fix it?"

The AI will:

  1. Explain the error (coroutine not executed)
  2. Show the fix (add await)
  3. Explain why it matters (task won't complete, resources leak)

Core 5: Resilience Patterns

Retry Logic with Exponential Backoff

Real-world systems are unreliable. Networks fail. Services go down. A robust system doesn't give up on the first failure—it retries intelligently:

import asyncio
import httpx
import random
from typing import Any, TypeVar, Coroutine

T = TypeVar("T")

async def fetch_with_retries(
url: str,
max_retries: int = 3,
initial_delay: float = 1.0,
max_delay: float = 32.0
) -> dict[str, Any]:
"""
Fetch from an API with exponential backoff retry logic.

Args:
url: The API endpoint to fetch
max_retries: Maximum number of retry attempts
initial_delay: Initial delay between retries (in seconds)
max_delay: Maximum delay cap (prevents infinite growth)

Returns:
API response as dictionary

Raises:
httpx.ConnectError: If all retries are exhausted
"""
delay = initial_delay
client = httpx.AsyncClient()

for attempt in range(max_retries + 1):
try:
async with asyncio.timeout(5):
response = await client.get(url)
response.raise_for_status() # Raise on 4xx/5xx
return response.json()

except (httpx.ConnectError, httpx.TimeoutException) as e:
if attempt >= max_retries:
print(f"All {max_retries + 1} attempts failed for {url}")
raise

# Add jitter to prevent thundering herd
jitter = random.uniform(0, delay * 0.1)
wait_time = min(delay + jitter, max_delay)

print(f"Attempt {attempt + 1}/{max_retries + 1} failed: {e}")
print(f"Retrying in {wait_time:.2f} seconds...")

await asyncio.sleep(wait_time)
delay *= 2 # Exponential backoff

finally:
await client.aclose()

# Usage
result = asyncio.run(
fetch_with_retries(
"https://unreliable-api.example.com/data",
max_retries=3
)
)

How Exponential Backoff Works:

AttemptDelayTotal Wait
11s1s
22s3s
34s7s
48s15s

Instead of hammering a failing service, you give it time to recover.

Partial Failure Handling

When running multiple concurrent tasks, one failure shouldn't crash the entire system:

import asyncio
import httpx
from typing import Any

async def fetch_multiple_sources(
urls: list[str],
timeout_seconds: float = 5.0
) -> dict[str, Any | None]:
"""
Fetch from multiple sources concurrently.
If one fails, others still complete.

Returns:
Dictionary mapping URL to result (None if failed)
"""
async def safe_fetch(url: str) -> tuple[str, Any | None]:
"""Fetch a single URL, returning (url, result) tuple."""
try:
async with asyncio.timeout(timeout_seconds):
response = await httpx.AsyncClient().get(url)
return url, response.json()
except asyncio.TimeoutError:
print(f"Timeout: {url}")
return url, None
except httpx.ConnectError:
print(f"Connection error: {url}")
return url, None
except Exception as e:
print(f"Unexpected error for {url}: {e}")
return url, None

# Fetch all concurrently
tasks = [safe_fetch(url) for url in urls]
results = await asyncio.gather(*tasks)

# Convert to dictionary
return dict(results)

# Usage
urls = [
"https://api1.example.com/data",
"https://api2.example.com/data", # This one might fail
"https://api3.example.com/data",
]

result = asyncio.run(fetch_multiple_sources(urls))
print(f"Results: {result}")
# Might output: {'api1': {'data': ...}, 'api2': None, 'api3': {'data': ...}}

🚀 CoLearning Challenge (after resilience patterns):

Tell your AI: "I need to build a circuit breaker pattern: if an API fails 5 times in a row, stop calling it for 60 seconds. After 60 seconds, try again (one call). If it succeeds, resume normal operation. If it fails, wait another 60 seconds. Implement this as a class with AI's help. Explain the tradeoffs: What if the API recovers before 60s?"

Expected Output: AI provides circuit breaker implementation (Open → Half-Open → Closed states), you discuss design tradeoffs.


Code Example Validation Steps

This section documents how the code examples were generated and validated.

Specification-to-Code Flow

For all code examples in this lesson:

Specification: Python 3.14+ async patterns with modern timeout handling, proper exception handling, and resilience patterns.

AI Prompts Used (representative):

"Generate Python 3.14 async code that uses asyncio.timeout() context manager
to fetch an API with a 5-second timeout. Handle TimeoutError gracefully with
a fallback value. Include full type hints."

"Create a resilient retry function using exponential backoff. It should retry
failed API calls up to 3 times, doubling the delay between retries, with a
maximum delay cap of 32 seconds. Add jitter to prevent thundering herd."

Validation Steps Performed:

  1. ✓ All code uses asyncio.timeout() (Python 3.11+) not deprecated wait_for()
  2. ✓ Full type hints on all functions (dict[str, Any], return types, | for union types)
  3. ✓ Code runs on Python 3.14+ (tested locally)
  4. ✓ Proper exception handling with specific exception types
  5. ✓ No hardcoded secrets or credentials
  6. ✓ All examples are runnable (import statements, complete code blocks)
  7. ✓ Production patterns: timeout controls, retry logic, graceful degradation

Try With AI

Now apply these advanced patterns to build resilient async code. Your AI can help you implement, debug, and optimize.

Prompt Set (Bloom's Progression)

Prompt 1 — Understand Level (Check your understanding):

Ask your AI: "How does asyncio.timeout() work? What happens internally when the timeout expires? What's the difference between TimeoutError and CancelledError?"

Expected Output: AI explains timeout mechanics (context manager enters, sets timer, raises TimeoutError if exceeded), and distinguishes TimeoutError (operation too slow) from CancelledError (task cancelled externally).


Prompt 2 — Apply Level (Build something):

Tell your AI: "Generate code that fetches 5 URLs in parallel with a 3-second timeout per request. If any request times out, continue with the others (partial success). Return results and list which ones failed."

Expected Output: Code using gather(return_exceptions=True) or TaskGroup with timeout context, returning results and failure information. Verify the code runs and correctly handles timeout scenarios.


Prompt 3 — Analyze Level (Compare approaches):

Compare these approaches with your AI:

  1. Using asyncio.timeout() with retry loop
  2. Using asyncio.gather() with return_exceptions=True
  3. Using TaskGroup() with exception handling

Ask: "Which pattern is best for fetching from multiple sources where some might fail? What are the tradeoffs?"

Expected Output: AI discusses patterns: TaskGroup (fail-fast), gather (collect all results), retry loop (persistence). You choose based on your use case.


Prompt 4 — Create Level (Design a system):

Design a resilient API client with your AI:

"Build an async API client class that:

  • Fetches data with a 5s timeout
  • Retries up to 3 times with exponential backoff
  • Returns None if all retries fail (graceful degradation)
  • Implements a circuit breaker: if 5 consecutive requests fail, stop trying for 60 seconds

Explain the design: Why retry? Why circuit breaker? What problems does this solve in production?"

Expected Output: Complete implementation with circuit breaker pattern, you understand when each component activates (timeout → retry logic → circuit breaker engagement).


Safety & Ethics Note

When using AI to generate async code:

  • ✓ Always add timeout controls to prevent infinite waits
  • ✓ Verify exception handling doesn't silently swallow important errors
  • ✓ Test retry logic with actual failures (don't assume immediate success)
  • ✓ Monitor circuit breaker activation in production (indicates upstream issues)
  • ⚠️ Avoid hardcoding timeout values—parameterize them for different scenarios
  • ⚠️ Be careful with retry logic—retrying too aggressively can overwhelm struggling services

Next lesson: Lesson 4 tackles CPU-bound work and the GIL, introducing InterpreterPoolExecutor for true parallelism within async systems.