Lesson 6: Capstone — AI Agent System for Multi-Source Data Processing
Opening Hook: Real AI Systems Are Hybrid
You've learned how asyncio handles concurrency, how InterpreterPoolExecutor enables true parallelism, and how to combine them. But you've studied these patterns in isolation—single API calls, simple parallel tasks.
Now imagine building a real AI system. A production language model agent that needs to:
- Fetch context from multiple sources concurrently — Weather API, News API, knowledge base (all at the same time, not sequentially)
- Process each response in parallel — Simulate LLM inference on each result using multiple CPU cores
- Handle failures gracefully — If one source times out, continue with others
- Aggregate results — Combine partial results into a cohesive response
- Measure performance — Prove that hybrid execution is 2-3x faster than doing everything sequentially
That's what this capstone builds. You'll create a production-style AI agent system that integrates every concept from Lessons 1-5 into one working example. This isn't just a learning exercise—it's a pattern you'll use in real applications.
By the end of this lesson, you'll have built a system that:
- Fetches from 3+ sources concurrently using TaskGroup() (Lesson 2)
- Processes results in parallel using InterpreterPoolExecutor (Lesson 4)
- Applies timeout controls using asyncio.timeout() (Lesson 3)
- Handles errors gracefully with try/except patterns (Lesson 3)
- Demonstrates measurable performance improvement (Lesson 5)
Project Overview: The Multi-Source AI Agent
What You're Building
An AI Agent System that simulates how real AI applications work:
┌─────────────────────────────────────────────────────────────┐
│ User Query: "What's happening now?" │
└────────┬──────────────────────────────────────────────────┬──┘
│ │
┌────▼─────┐ ┌──────────────┐ ┌──────────────────┐ │
│ Weather │ │ News │ │ Knowledge Base │ │
│ API │ │ API │ │ Query │ │
│(2s delay)│ │ (1.5s delay) │ │ (3s delay) │ │
└────┬─────┘ └──────┬───────┘ └────────┬─────────┘ │
│ │ │ │
└────────────────┼────────────────────┘ │
│ │
┌─────▼─────────┐ │
│ TaskGroup │ Concurrent Fetching │
│ (All 3 at │ Total: ~3s │
│ once) │ (not 6.5s!) │
└─────┬─────────┘ │
│ │
┌─────────────┼────────────────┐ │
│ │ │ │
┌───▼─┐ ┌──▼──┐ ┌───▼──┐ │
│LLM │ │LLM │ │LLM │ │
│Infer│ │Infer│ │Infer │ │
│(2s) │ │(2s) │ │(2s) │ │
└───┬─┘ └──┬──┘ └───┬──┘ │
│ │ │ │
└────────┬───┴───────────────┘ │
│ │
┌────────▼──────────┐ │
│ InterpreterPool │ Parallel Processing │
│ Executor (3 cores │ Total: ~2s │
│ in parallel) │ (not 6s!) │
└────────┬──────────┘ │
│ │
┌────────────▼─────────────┐ │
│ Aggregate Results │ │
│ Return unified response│ │
└──────────────────────────┘ │
│ │
└─────────────────────────────────────┘
TOTAL TIME: ~5 seconds (with timeouts)
Sequential would take: ~6.5 seconds just for fetching + 6 seconds for inference = 12.5 seconds!
Hybrid cuts ~12.5 seconds down to ~5 seconds: roughly a 2.5x speedup (about 60% less time) from combining I/O concurrency with CPU parallelism
System Components
Concurrent Fetchers (TaskGroup handles these):
- fetch_weather(city) — Simulates API call with 2s latency
- fetch_news(query) — Simulates API call with 1.5s latency
- query_knowledge_base(question) — Simulates API call with 3s latency
Parallel Processor (InterpreterPoolExecutor handles this):
simulate_llm_inference(text)— CPU-intensive text analysis (2s per result)
Main Agent (orchestrates everything):
ai_agent_query(user_question)— Coordinates concurrent fetch + parallel processing
Part 1: System Requirements and Architecture
Functional Requirements
Your system must:
- Fetch from 3 concurrent sources using TaskGroup() with structured concurrency
- Process each result with simulated LLM inference in parallel via InterpreterPoolExecutor
- Apply timeouts:
- Individual API calls: 5 seconds each
- Overall system: 15 seconds total
- Handle partial failures: If one source times out, continue with others (graceful degradation)
- Return aggregated results containing:
- Successful results from all working sources
- Error messages from failed sources
- Total execution time
- Count of parallel cores used
- Demonstrate performance: Complete 2-3x faster than sequential execution
Non-Functional Requirements
- All code uses Python 3.14+ patterns (type hints, modern syntax)
- Error handling is explicit (no silent failures)
- Logging captures execution flow for debugging
- System is resilient (handles timeouts and exceptions gracefully)
Architecture Principles
Use the Graduated Teaching Pattern from Lesson 5:
- Manual Foundation (Lesson 1): Understand sequential execution first
- Concurrent I/O (Lessons 2-3): Replace sequential with TaskGroup
- Parallel Processing (Lesson 4): Add CPU work in InterpreterPoolExecutor
- Integration (Lesson 5): Combine both patterns
- System Design (Lesson 6 - THIS LESSON): Real-world architecture with error handling and optimization
Your capstone applies this progression in one integrated system.
Part 2: Code Structure and Implementation Approach
Skeleton Code: Understanding the Pattern
Here's the structure of the system students implement (with AI guidance):
# File: ai_agent.py
import asyncio
import logging
import os
import time
from concurrent.futures import Executor, InterpreterPoolExecutor
from typing import Any

# Configure logging for visibility
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ============================================================================
# PART 1: Simulated Data Sources (Students implement these first)
# ============================================================================
async def fetch_weather(city: str, *, timeout: float = 5.0) -> dict[str, Any]:
    """
    Fetch weather data for a city.

    Simulates API call with 2-second latency.

    Args:
        city: City name to fetch weather for
        timeout: Maximum wait time in seconds

    Returns:
        Dictionary with weather data or error message
    """
    try:
        async with asyncio.timeout(timeout):
            logger.info(f"Fetching weather for {city}...")
            await asyncio.sleep(2)  # Simulate 2s API latency
            return {
                "source": "weather",
                "city": city,
                "temp": 72,
                "condition": "sunny",
                "humidity": 45
            }
    except asyncio.TimeoutError:
        logger.warning(f"Weather API timeout for {city}")
        return {"source": "weather", "error": "timeout"}
async def fetch_news(query: str, *, timeout: float = 5.0) -> list[dict[str, Any]]:
    """
    Fetch news articles matching query.

    Simulates API call with 1.5-second latency.

    Args:
        query: Search query for news
        timeout: Maximum wait time in seconds

    Returns:
        List of news articles or error dict
    """
    try:
        async with asyncio.timeout(timeout):
            logger.info(f"Fetching news for '{query}'...")
            await asyncio.sleep(1.5)  # Simulate 1.5s API latency
            return [
                {
                    "title": f"Article 1 about {query}",
                    "source": "news",
                    "snippet": "Breaking updates...",
                    "url": "https://news.example.com/1"
                },
                {
                    "title": f"Article 2 about {query}",
                    "source": "news",
                    "snippet": "Latest developments...",
                    "url": "https://news.example.com/2"
                }
            ]
    except asyncio.TimeoutError:
        logger.warning(f"News API timeout for '{query}'")
        return [{"source": "news", "error": "timeout"}]
async def query_knowledge_base(
    question: str,
    *,
    timeout: float = 5.0
) -> str:
    """
    Query knowledge base for information.

    Simulates knowledge base lookup with 3-second latency.

    Args:
        question: Question to search knowledge base for
        timeout: Maximum wait time in seconds

    Returns:
        Knowledge base result or error message
    """
    try:
        async with asyncio.timeout(timeout):
            logger.info(f"Querying knowledge base for '{question}'...")
            await asyncio.sleep(3)  # Simulate 3s knowledge base latency
            return (
                f"Knowledge base result for '{question}': "
                f"This is contextual information from our knowledge base "
                f"containing domain-specific expertise."
            )
    except asyncio.TimeoutError:
        logger.warning(f"Knowledge base query timeout for '{question}'")
        return "Knowledge base: timeout"
# ============================================================================
# PART 2: CPU-Intensive Processing (Runs in InterpreterPoolExecutor)
# ============================================================================

def simulate_llm_inference(data: str) -> str:
    """
    Simulate LLM inference on input data.

    This is CPU-intensive processing that runs in a separate interpreter
    via InterpreterPoolExecutor. Each call simulates ~2 seconds of inference.

    Args:
        data: Input text for the LLM to process

    Returns:
        Simulated LLM analysis result
    """
    start_time = time.time()
    # Simulate CPU-intensive work
    # (In real application, this would be tokenization + forward pass)
    while time.time() - start_time < 2:
        # CPU-bound work that actually uses multiple cores with InterpreterPoolExecutor
        _ = sum(i**2 for i in range(100000))
    # Return simulated inference result
    return f"[LLM Analysis] Processed: {data[:80]}... Result: Summary of key insights."
# ============================================================================
# PART 3: Main Agent Orchestration (Students implement this - THE KEY PART)
# ============================================================================

async def ai_agent_query(
    user_question: str,
    executor: Executor | None = None
) -> dict[str, Any]:
    """
    Main AI agent function that orchestrates concurrent fetching and parallel processing.

    This is the capstone implementation that integrates:
    1. TaskGroup for concurrent API fetching
    2. InterpreterPoolExecutor for parallel inference
    3. Error handling and timeouts
    4. Result aggregation

    **Students implement this function with AI guidance using this pattern:**
    1. Create executor if not provided
    2. Start overall system timeout (15 seconds)
    3. Use TaskGroup to fetch from all 3 sources concurrently
    4. For each successful result, submit to executor for LLM inference
    5. Wait for all inference tasks to complete
    6. Aggregate and return results

    Args:
        user_question: Question from user to process
        executor: Optional executor for CPU-bound work (created if not provided)

    Returns:
        Dictionary with aggregated results, errors, and metadata
    """
    # Student implementation starts here...
    #
    # Pseudocode (students fill in real implementation):
    #
    # start_time = time.perf_counter()
    # loop = asyncio.get_running_loop()
    #
    # if executor is None:
    #     executor = InterpreterPoolExecutor(max_workers=os.cpu_count())
    #
    # async with asyncio.timeout(15):  # Overall system timeout
    #     async with asyncio.TaskGroup() as tg:
    #         # Fetch from all 3 sources concurrently
    #         weather_task = tg.create_task(fetch_weather("New York"))
    #         news_task = tg.create_task(fetch_news(user_question))
    #         kb_task = tg.create_task(query_knowledge_base(user_question))
    #
    #     # The TaskGroup block has exited, so all results are available.
    #     # Process each result with LLM inference in parallel.
    #     inference_tasks = [
    #         loop.run_in_executor(executor, simulate_llm_inference, str(result))
    #         for result in [weather_task.result(), news_task.result(), kb_task.result()]
    #     ]
    #
    #     inference_results = await asyncio.gather(*inference_tasks, return_exceptions=True)
    #
    #     # Aggregate results...
    #
    pass  # Placeholder for student implementation
# ============================================================================
# PART 4: Performance Benchmarking
# ============================================================================

async def benchmark_sequential(user_question: str) -> tuple[dict[str, Any], float]:
    """
    Baseline: Sequential execution (fetch one at a time, process one at a time).

    This shows why concurrency and parallelism matter.
    """
    start = time.perf_counter()

    # Sequential fetching
    weather = await fetch_weather("New York")
    news = await fetch_news(user_question)
    kb = await query_knowledge_base(user_question)

    # Sequential processing
    executor = InterpreterPoolExecutor(max_workers=1)  # Only 1 worker = sequential
    loop = asyncio.get_running_loop()
    result1 = await loop.run_in_executor(executor, simulate_llm_inference, str(weather))
    result2 = await loop.run_in_executor(executor, simulate_llm_inference, str(news))
    result3 = await loop.run_in_executor(executor, simulate_llm_inference, str(kb))

    elapsed = time.perf_counter() - start
    executor.shutdown()
    return {
        "approach": "Sequential",
        "results": [result1, result2, result3],
        "errors": []
    }, elapsed
async def benchmark_hybrid(user_question: str) -> tuple[dict[str, Any], float]:
    """
    Optimized: Hybrid execution (concurrent fetching + parallel processing).

    This is what students build.
    """
    start = time.perf_counter()
    result = await ai_agent_query(user_question)
    elapsed = time.perf_counter() - start
    return result, elapsed
async def run_benchmarks(user_question: str = "What's happening right now?") -> None:
    """
    Compare sequential vs hybrid execution.

    Students run this to validate that their hybrid system is faster.
    """
    print("\n" + "=" * 70)
    print("ASYNCIO CAPSTONE: PERFORMANCE BENCHMARK")
    print("=" * 70)
    print(f"Question: {user_question}\n")

    # Run sequential baseline
    print("Running SEQUENTIAL baseline (fetch one at a time, process one at a time)...")
    seq_result, seq_time = await benchmark_sequential(user_question)
    print(f"Sequential execution: {seq_time:.2f} seconds")

    # Run hybrid optimized
    print("\nRunning HYBRID execution (concurrent fetch + parallel processing)...")
    hybrid_result, hybrid_time = await benchmark_hybrid(user_question)
    print(f"Hybrid execution: {hybrid_time:.2f} seconds")

    # Calculate improvement
    speedup = seq_time / hybrid_time
    improvement_pct = (1 - hybrid_time / seq_time) * 100
    print("\n" + "-" * 70)
    print(f"Speedup: {speedup:.2f}x")
    print(f"Time saved: {seq_time - hybrid_time:.2f} seconds ({improvement_pct:.1f}%)")
    print("-" * 70)

    # Validation
    if speedup >= 1.4:  # Target: 40%+ improvement
        print("✓ PERFORMANCE TARGET MET: Hybrid is 40%+ faster than sequential")
    else:
        print("⚠ Performance below target. Review executor configuration and task distribution.")
    print("=" * 70 + "\n")
# ============================================================================
# ENTRY POINT
# ============================================================================

if __name__ == "__main__":
    # Run the benchmark
    asyncio.run(run_benchmarks("What is AI-native development?"))
💬 AI Colearning Prompt
"Walk me through how the
ai_agent_query()function coordinates TaskGroup and InterpreterPoolExecutor. Which part runs concurrently (TaskGroup)? Which part runs in parallel (InterpreterPoolExecutor)? Draw a timeline showing when each stage executes."
Part 3: Implementation Walkthrough
Step 1: Understand the Skeleton
Before implementing, study the provided skeleton code above. Notice:
- Three data sources with built-in timeouts
- One CPU-intensive function (simulate_llm_inference)
- Main orchestrator (ai_agent_query) — this is where you work
- Benchmark functions to prove your system is faster
Your job: Implement the ai_agent_query() function using TaskGroup and InterpreterPoolExecutor.
🎓 Instructor Commentary
In AI-native development, you don't code from scratch. You understand a pattern, see a skeleton, and ask AI to help fill the gaps. The ai_agent_query() function is that gap—it's where all the concepts combine. You're not memorizing how to use TaskGroup and InterpreterPoolExecutor in isolation. You're architecting a system where both work together, applying I/O concurrency and CPU parallelism where each fits best.
Step 2: Implement Concurrent Fetching with TaskGroup
The first task: fetch from all 3 sources at the same time using structured concurrency.
🚀 CoLearning Challenge
Ask your AI:
"I need to fetch from 3 async sources concurrently and handle timeouts gracefully. The fetchers are: fetch_weather(), fetch_news(), query_knowledge_base(). Show me how to use TaskGroup to fetch all 3 in parallel. If one times out, the system should continue with the others. Explain the TaskGroup pattern step-by-step."
Expected Output: AI shows code using async with asyncio.TaskGroup() as tg: with proper error handling.
Your Validation: Run the fetching code and verify:
- All 3 sources start fetching (logs should show starting almost simultaneously)
- Total fetch time is ~3 seconds (limited by longest source), not 6.5 seconds
- TimeoutError from any source doesn't crash the system
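For reference, here is a minimal sketch of what this fetch stage can look like. It assumes the fetchers and imports from the skeleton in ai_agent.py, and the helper name fetch_all_sources is hypothetical; your AI may propose a different but equivalent structure.
async def fetch_all_sources(user_question: str) -> dict[str, Any]:
    """Hypothetical helper: fetch all three sources concurrently."""
    async with asyncio.TaskGroup() as tg:
        weather_task = tg.create_task(fetch_weather("New York"))
        news_task = tg.create_task(fetch_news(user_question))
        kb_task = tg.create_task(query_knowledge_base(user_question))

    # The TaskGroup block only exits once every task has finished,
    # so calling .result() here is safe. Because each fetcher catches
    # its own TimeoutError and returns an error value, a slow source
    # does not cancel its siblings.
    return {
        "weather": weather_task.result(),
        "news": news_task.result(),
        "knowledge_base": kb_task.result(),
    }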
Step 3: Implement Parallel Processing with InterpreterPoolExecutor
Once you have all results, process each with LLM inference in parallel.
✨ Teaching Tip
Use your AI to explore the bridge between async and parallel execution: "How do I use loop.run_in_executor() to call simulate_llm_inference() from async code? Why do I need an executor instead of just calling the function directly?"
Pattern you're building:
TaskGroup fetches 3 sources concurrently (~3s)
↓
Results returned (weather, news, KB answer)
↓
InterpreterPoolExecutor processes each (~2s per source, in parallel across cores)
↓
All inference results available
↓
Aggregate and return
Your implementation should (see the sketch after this list):
- Get the event loop: loop = asyncio.get_running_loop()
- Create the executor: executor = InterpreterPoolExecutor(max_workers=os.cpu_count())
- Submit each inference: await loop.run_in_executor(executor, simulate_llm_inference, data)
- Wait for all of them: results = await asyncio.gather(...)
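Here is a minimal sketch of that processing stage, assuming Python 3.14's InterpreterPoolExecutor and the simulate_llm_inference function from the skeleton; the helper name process_results is hypothetical. In your real ai_agent_query() you may prefer to create the executor once and reuse it rather than building a new pool per call.
import os
from concurrent.futures import InterpreterPoolExecutor

async def process_results(raw_results: list[str]) -> list[str | BaseException]:
    """Hypothetical helper: run LLM inference on each result in parallel."""
    loop = asyncio.get_running_loop()
    with InterpreterPoolExecutor(max_workers=os.cpu_count()) as executor:
        inference_tasks = [
            loop.run_in_executor(executor, simulate_llm_inference, text)
            for text in raw_results
        ]
        # return_exceptions=True keeps one failed inference from
        # discarding the others.
        return await asyncio.gather(*inference_tasks, return_exceptions=True)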
Step 4: Add Error Handling and Timeouts
Real systems fail. Your agent must handle:
- Individual API timeouts: Apply 5-second timeout to each fetch
- Overall system timeout: Apply 15-second timeout to entire operation
- Partial success: If 2 sources work and 1 times out, return the 2 results with error info for the 3rd
- Logging: Log all operations for debugging
Pattern:
try:
    async with asyncio.timeout(15):  # Overall system timeout
        ...  # Fetching and processing
        # If anything inside takes >15s, TimeoutError is raised
        # as the async with block exits
except asyncio.TimeoutError:
    ...  # Return whatever you have collected so far
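Putting the pieces together, here is one hedged sketch of how the overall budget and partial results can interact. It reuses the hypothetical helpers sketched in Steps 2 and 3 (fetch_all_sources, process_results); your ai_agent_query() may inline this logic instead.
async def query_with_budget(user_question: str) -> dict[str, Any]:
    """Hypothetical wrapper showing where the overall TimeoutError is caught."""
    partial: dict[str, Any] = {"query": user_question, "errors": []}
    try:
        async with asyncio.timeout(15):  # overall 15-second system budget
            partial["sources"] = await fetch_all_sources(user_question)
            partial["inferences"] = await process_results(
                [str(value) for value in partial["sources"].values()]
            )
    except TimeoutError:  # asyncio.TimeoutError is the same class since 3.11
        # Return whatever was collected before the budget expired
        partial["errors"].append("overall 15-second budget exceeded")
    return partial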
Step 5: Aggregate and Return Results
Collect all results (successful and failed) into a comprehensive response:
result = {
    "query": user_question,
    "success": True,  # or False if anything failed
    "sources": {
        "weather": {...},  # successful result or error info
        "news": {...},
        "knowledge_base": {...}
    },
    "inferences": [
        "LLM analysis of weather...",
        "LLM analysis of news...",
        "LLM analysis of KB result..."
    ],
    "errors": [],  # any errors encountered
    "execution_time": total_time,
    "cores_used": os.cpu_count()
}
Part 4: Testing and Validation
Test 1: Verify Concurrent Fetching
What to check: All 3 sources start nearly simultaneously (not sequentially).
# Run with logging enabled
asyncio.run(ai_agent_query("Test query"))
# Expected logs (notice timestamps are close):
# INFO - Fetching weather for New York...
# INFO - Fetching news for 'Test query'...
# INFO - Querying knowledge base for 'Test query'...
# (All start within milliseconds of each other)
# Expected total fetch time: ~3 seconds (longest source)
# NOT 6.5 seconds (sum of all)
Test 2: Verify Parallel Processing
What to check: Multiple cores are used for inference (CPU spikes).
# Terminal 1: Run your agent
python ai_agent.py
# Terminal 2: Watch CPU usage
# For Linux/Mac:
top -o %CPU
# For Windows:
tasklist /V /FI "IMAGENAME eq python.exe"
# You should see:
# - CPU jumps to 200%+ (multiple cores active)
# - Not just 100% (single core)
Test 3: Handle Timeout Gracefully
What to check: System continues if one source times out.
# Modify fetch_weather to always exceed its timeout:
async def fetch_weather(...):
    ...
    await asyncio.sleep(6)  # Exceeds the 5s timeout

# Expected result:
# {"source": "weather", "error": "timeout"}
#
# The system still returns results from news and knowledge_base.
# The fetch stage is capped at ~5 seconds by the timeout, and the run
# completes instead of crashing or hanging.
Test 4: Run Benchmark
What to check: Hybrid execution is 2-3x faster than sequential.
asyncio.run(run_benchmarks())
# Expected output:
# Sequential execution: ~12.5 seconds (6.5 fetch + 6 inference)
# Hybrid execution: ~5 seconds (3 concurrent fetch + 2 parallel inference)
# Speedup: 2.5x
Part 5: Extension Challenges
Once your capstone works, push further:
Challenge 1: Add Caching
Modify ai_agent_query() to cache results by question. If the same question is asked within 60 seconds, return cached results instead of re-fetching.
# Ask your AI: "How do I implement a simple in-memory cache that expires results after 60 seconds? Should I use a dict with timestamps?"
Challenge 2: Implement Retry Logic
If a source times out, retry up to 2 times before giving up.
# Ask your AI: "How do I retry a timed-out request? Should the retry timeout be shorter or longer?"
Challenge 3: Add More Sources
Extend to 5+ sources. Verify that total fetch time stays close to the slowest source rather than growing with each source you add.
# Ask your AI: "I want to add 2 more async data sources. How should I structure TaskGroup to handle 5 sources?"
Challenge 4: Simulate Real AI Processing
Replace simulate_llm_inference() with a more realistic simulation (a sketch follows this list) that:
- Tokenizes input (split into words)
- Computes embeddings (calculate vector for each token)
- Ranks top results
- Returns structured JSON
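In that spirit, here is a hedged sketch of a richer (still simulated) pipeline: the numbers and ranking rules are made up for illustration, and simulate_llm_inference_v2 is a hypothetical name.
import json

def simulate_llm_inference_v2(data: str) -> str:
    """Hypothetical richer simulation: tokenize, embed, rank, return JSON."""
    tokens = data.lower().split()                      # 1. tokenize into words
    embeddings = {                                     # 2. toy "embeddings" per token
        token: [(hash(token + str(i)) % 1000) / 1000 for i in range(8)]
        for token in set(tokens)
    }
    ranked = sorted(                                   # 3. rank top tokens
        embeddings, key=lambda token: sum(embeddings[token]), reverse=True
    )[:5]
    return json.dumps({                                # 4. structured JSON result
        "token_count": len(tokens),
        "top_tokens": ranked,
        "summary": f"Processed {len(tokens)} tokens from input.",
    })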
Part 6: Architecture Review with AI
Once your capstone is complete, validate it with your AI:
💬 AI Colearning Prompt
"I built an AI agent that fetches from 3 sources concurrently and processes each with LLM inference in parallel. Here's my code: [paste your ai_agent_query() function]. Can you:
- Explain how TaskGroup ensures structured concurrency (what happens if one fetch fails?)
- Explain how InterpreterPoolExecutor achieves true parallelism (why can't threading do this?)
- Identify any error handling gaps or improvements"
Expected Outcome: AI reviews your architecture and provides concrete feedback on structured concurrency, error recovery, and GIL implications.
Key Learnings Summary
This capstone demonstrates:
- TaskGroup provides structured concurrency — all tasks must complete before moving on, and failure in one cancels others (intentional design)
- InterpreterPoolExecutor provides true parallelism — multiple Python interpreters with separate GILs let CPU-bound work run on multiple cores
- Hybrid workloads combine both patterns:
- Use TaskGroup for I/O-bound work (API calls)
- Use InterpreterPoolExecutor for CPU-bound work (inference)
- Overlap them: fetch while processing, process while fetching
- Error handling is critical — graceful degradation (continue with partial results) is more valuable than failing completely
- Timeouts prevent hang states — always apply timeouts to external operations
- Performance measurement proves the pattern works — measure sequential vs hybrid to quantify the benefit
Try With AI
Your capstone is now complete and tested. Use these prompts to deepen your understanding and explore extensions:
1. Understand Level: System Execution Flow
Ask your AI:
"Walk me through the execution timeline of the AI agent system. Specifically: (1) When does TaskGroup start fetching from all 3 sources? (2) When does InterpreterPoolExecutor start processing? (3) Can they overlap? (4) What's the critical path—the bottleneck that limits overall execution time?"
Expected Output: AI explains that TaskGroup fetches all 3 sources concurrently (bottleneck: the 3-second longest source), then InterpreterPoolExecutor processes all 3 in parallel (bottleneck: 2 seconds per result, ~2 seconds total if you have enough cores). Total time is ~5 seconds: the ~3-second fetch stage plus the ~2-second parallel inference stage, instead of the ~12.5 seconds a fully sequential run would take.
What This Proves: You understand system architecture and critical path analysis.
2. Apply Level: Handle Partial Failures
Tell your AI:
"My current implementation might crash if 2 sources timeout before returning results. I want graceful degradation—the system should continue with whatever results arrived and return them plus error messages. Show me how to catch TimeoutError and continue processing only the successful results. Then explain why this is better than failing completely."
Expected Output: AI shows how to use gather(return_exceptions=True) or try/except blocks to isolate failures, then filter results to process only successful ones.
What This Proves: You can handle real-world unreliability (not everything works perfectly).
3. Analyze Level: Performance Optimization
Ask your AI:
"My hybrid system runs in 5 seconds, but I want it faster. Looking at my code, what's the bottleneck? Is it the TaskGroup (fetching), the InterpreterPoolExecutor (processing), or something else? How could I optimize each stage?"
Expected Output: AI analyzes your specific implementation and identifies bottlenecks (e.g., "Your longest fetch takes 3 seconds, and you have 3 cores for inference. The bottleneck is the longest fetch. To optimize: fetch from faster sources first, implement timeouts to fail fast, or add caching").
What This Proves: You can reason about system performance and make optimization decisions.
4. Synthesize Level: Extend to Real AI Workload
Challenge your AI:
"Now extend this to a real language model API (like Claude's API or GPT-4). What changes? Should I cache results? Should I implement backoff/retry for rate limits? How does the architecture change when using real async libraries like httpx instead of simulated sleeps?"
Expected Output: AI guides you through real API integration (httpx, API key management, rate limiting, retry strategies) while maintaining the TaskGroup + InterpreterPoolExecutor pattern.
What This Proves: You can apply the pattern to production systems.
Congratulations! You've built a production-style AI agent system that integrates all asyncio concepts from this chapter. You understand:
- How event loops manage concurrency
- When to use TaskGroup for structured I/O concurrency
- How InterpreterPoolExecutor solves the GIL for parallelism
- How to combine both for optimal hybrid workloads
- How to handle errors gracefully in real systems
- How to measure performance and prove improvements
You're ready to build production AI systems with Python 3.14+ asyncio patterns.