
Capstone — AI Agent System for Multi-Source Data Processing

You've learned how asyncio handles concurrency, how InterpreterPoolExecutor enables true parallelism, and how to combine them. But you've studied these patterns in isolation—single API calls, simple parallel tasks.

Now imagine building a real AI system. A production language model agent that needs to:

  1. Fetch context from multiple sources concurrently — Weather API, News API, knowledge base (all at the same time, not sequentially)
  2. Process each response in parallel — Simulate LLM inference on each result using multiple CPU cores
  3. Handle failures gracefully — If one source times out, continue with others
  4. Aggregate results — Combine partial results into a cohesive response
  5. Measure performance — Prove that hybrid execution is 2–3x faster than doing everything sequentially

That's what this capstone builds. You'll create a production-style AI agent system that integrates every concept from Lessons 1-5 into one working example. This isn't a learning exercise—it's a pattern you'll use in real applications.

By the end of this lesson, you'll have built a system that:

  • Fetches from 3+ sources concurrently using TaskGroup() (Lesson 2)
  • Processes results in parallel using InterpreterPoolExecutor (Lesson 4)
  • Applies timeout controls using asyncio.timeout() (Lesson 3)
  • Handles errors gracefully with try/except patterns (Lesson 3)
  • Demonstrates measurable performance improvement (Lesson 5)

Project Overview: The Multi-Source AI Agent

What You're Building

An AI Agent System that simulates how real AI applications work:

            ┌─────────────────────────────────────┐
            │ User Query: "What's happening now?" │
            └──────────────────┬──────────────────┘
         ┌─────────────────────┼─────────────────────┐
 ┌───────▼────────┐    ┌───────▼────────┐    ┌───────▼────────┐
 │ Weather API    │    │ News API       │    │ Knowledge Base │
 │ (2s delay)     │    │ (1.5s delay)   │    │ (3s delay)     │
 └───────┬────────┘    └───────┬────────┘    └───────┬────────┘
         └─────────────────────┼─────────────────────┘
                    ┌──────────▼──────────┐
                    │ TaskGroup           │  Concurrent fetching
                    │ (All 3 at once)     │  Total: ~3s (not 6.5s!)
                    └──────────┬──────────┘
         ┌─────────────────────┼─────────────────────┐
 ┌───────▼────────┐    ┌───────▼────────┐    ┌───────▼────────┐
 │ LLM Inference  │    │ LLM Inference  │    │ LLM Inference  │
 │ (2s)           │    │ (2s)           │    │ (2s)           │
 └───────┬────────┘    └───────┬────────┘    └───────┬────────┘
         └─────────────────────┼─────────────────────┘
                    ┌──────────▼──────────┐
                    │ InterpreterPool     │  Parallel processing
                    │ Executor (3 cores)  │  Total: ~2s (not 6s!)
                    └──────────┬──────────┘
                    ┌──────────▼──────────┐
                    │ Aggregate results,  │
                    │ return unified      │
                    │ response            │
                    └─────────────────────┘

TOTAL TIME: ~5 seconds (with timeouts)
Sequential would take: ~6.5 seconds just for fetching + 6 seconds for inference = 12.5 seconds!
Hybrid is ~2.5x faster (a ~60% time saving) through I/O concurrency + CPU parallelism

System Components

Concurrent Fetchers (TaskGroup handles these):

  • fetch_weather(city) — Simulates API call with 2s latency
  • fetch_news(query) — Simulates API call with 1.5s latency
  • query_knowledge_base(question) — Simulates API call with 3s latency

Parallel Processor (InterpreterPoolExecutor handles this):

  • simulate_llm_inference(text) — CPU-intensive text analysis (2s per result)

Main Agent (orchestrates everything):

  • ai_agent_query(user_question) — Coordinates concurrent fetch + parallel processing

Part 1: System Requirements and Architecture

Functional Requirements

Your system must:

  1. Fetch from 3 concurrent sources using TaskGroup() with structured concurrency
  2. Process each result with simulated LLM inference in parallel via InterpreterPoolExecutor
  3. Apply timeouts:
    • Individual API calls: 5 seconds each
    • Overall system: 15 seconds total
  4. Handle partial failures: If one source times out, continue with others (graceful degradation)
  5. Return aggregated results containing:
    • Successful results from all working sources
    • Error messages from failed sources
    • Total execution time
    • Count of parallel cores used
  6. Demonstrate performance: Complete 2–3x faster than sequential execution

Non-Functional Requirements

  • All code uses Python 3.14+ patterns (type hints, modern syntax)
  • Error handling is explicit (no silent failures)
  • Logging captures execution flow for debugging
  • System is resilient (handles timeouts and exceptions gracefully)

Architecture Principles

Use the Graduated Teaching Pattern from Lesson 5:

  1. Manual Foundation (Lesson 1): Understand sequential execution first
  2. Concurrent I/O (Lessons 2-3): Replace sequential with TaskGroup
  3. Parallel Processing (Lesson 4): Add CPU work in InterpreterPoolExecutor
  4. Integration (Lesson 5): Combine both patterns
  5. System Design (Lesson 6 - THIS LESSON): Real-world architecture with error handling and optimization

Your capstone applies this progression in one integrated system.


Part 2: Code Structure and Implementation Approach

Skeleton Code: Understanding the Pattern

Here's the structure of the system you'll implement (with AI guidance):

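A minimal sketch of that skeleton, assuming the simulated latencies from the diagram above (the function bodies are stand-ins; ai_agent_query() is deliberately left for you):

import asyncio
import time

async def fetch_weather(city: str) -> str:
    await asyncio.sleep(2.0)                      # simulated 2s API latency
    return f"Weather in {city}: 18C, clear"

async def fetch_news(query: str) -> str:
    await asyncio.sleep(1.5)                      # simulated 1.5s API latency
    return f"Top headlines about {query}"

async def query_knowledge_base(question: str) -> str:
    await asyncio.sleep(3.0)                      # simulated 3s lookup latency
    return f"KB context for: {question}"

def simulate_llm_inference(text: str) -> str:
    # CPU-bound stand-in for model inference: ~2s of busy work
    deadline = time.perf_counter() + 2.0
    steps = 0
    while time.perf_counter() < deadline:
        steps += 1
    return f"Analysis of {text[:30]!r} ({steps} steps)"

async def ai_agent_query(user_question: str) -> dict:
    """YOUR WORK: TaskGroup fetch + InterpreterPoolExecutor processing."""
    raise NotImplementedError

Note that simulate_llm_inference() burns CPU instead of sleeping; a sleep would be I/O-like and would hide the GIL problem the executor exists to solve.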

💬 AI Colearning Prompt

"Walk me through how the ai_agent_query() function coordinates TaskGroup and InterpreterPoolExecutor. Which part runs concurrently (TaskGroup)? Which part runs in parallel (InterpreterPoolExecutor)? Draw a timeline showing when each stage executes."


Part 3: Implementation Walkthrough

Step 1: Understand the Skeleton

Before implementing, study the provided skeleton code above. Notice:

  1. Three data sources with built-in timeouts
  2. One CPU-intensive function (simulate_llm_inference)
  3. Main orchestrator (ai_agent_query) — this is where you work
  4. Benchmark functions to prove your system is faster

Your job: Implement ai_agent_query() function using TaskGroup and InterpreterPoolExecutor.

🎓 Expert Insight

In AI-native development, you don't code from scratch. You understand a pattern, see a skeleton, and ask AI to help fill the gaps. The ai_agent_query() function is that gap—it's where all concepts combine. You're not memorizing how to use TaskGroup and InterpreterPoolExecutor in isolation. You're architecting a system where both work together, applying I/O concurrency and CPU parallelism where each fits best.

Step 2: Implement Concurrent Fetching with TaskGroup

The first task: fetch from all 3 sources at the same time using structured concurrency.

🚀 CoLearning Challenge

Ask your AI:

"I need to fetch from 3 async sources concurrently and handle timeouts gracefully. The fetchers are: fetch_weather(), fetch_news(), query_knowledge_base(). Show me how to use TaskGroup to fetch all 3 in parallel. If one times out, the system should continue with the others. Explain the TaskGroup pattern step-by-step."

Expected Output: AI shows code using async with asyncio.TaskGroup() as tg: with proper error handling.

Your Validation: Run the fetching code and verify:

  • All 3 sources start fetching (logs should show starting almost simultaneously)
  • Total fetch time is ~3 seconds (limited by longest source), not 6.5 seconds
  • TimeoutError from any source doesn't crash the system

Step 3: Implement Parallel Processing with InterpreterPoolExecutor

Once you have all results, process each with LLM inference in parallel.

✨ Teaching Tip

Use your AI to explore the bridge between async and parallel execution: "How do I use loop.run_in_executor() to call simulate_llm_inference() from async code? Why do I need an executor instead of just calling the function directly?"

Pattern you're building:

TaskGroup fetches 3 sources concurrently (~3s)
        ↓
Results returned (weather, news, KB answer)
        ↓
InterpreterPoolExecutor processes each (~2s per source, in parallel across cores)
        ↓
All inference results available
        ↓
Aggregate and return

Your implementation should (a sketch follows this list):

  1. Get the event loop: loop = asyncio.get_running_loop()
  2. Create the executor: executor = InterpreterPoolExecutor(max_workers=os.cpu_count())
  3. Submit each inference: await loop.run_in_executor(executor, simulate_llm_inference, data)
  4. Wait for all: results = await asyncio.gather(...)
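
A sketch of those four steps together, reusing the skeleton's simulate_llm_inference() (calling it directly would block the event loop for ~2s per result):

import asyncio
import os
from concurrent.futures import InterpreterPoolExecutor  # Python 3.14+

async def process_all(texts: list[str]) -> list[str]:
    loop = asyncio.get_running_loop()
    with InterpreterPoolExecutor(max_workers=os.cpu_count()) as executor:
        # Each submission runs in a separate interpreter with its own GIL,
        # so the three ~2s inferences overlap instead of queueing.
        futures = [
            loop.run_in_executor(executor, simulate_llm_inference, text)
            for text in texts
        ]
        return await asyncio.gather(*futures)

run_in_executor() wraps each submission in a future the event loop can await, so your coroutine stays responsive while the interpreters grind.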

Step 4: Add Error Handling and Timeouts

Real systems fail. Your agent must handle:

  • Individual API timeouts: Apply 5-second timeout to each fetch
  • Overall system timeout: Apply 15-second timeout to entire operation
  • Partial success: If 2 sources work and 1 times out, return the 2 results with error info for the 3rd
  • Logging: Log all operations for debugging

Pattern:

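A sketch of that pattern, assuming the skeleton's fetchers (the "Berlin" argument is a placeholder). Each fetch gets its own 5-second budget inside an overall 15-second budget, and failures come back as data instead of being raised:

async def fetch_one_safely(name: str, coro) -> tuple[str, str | None, str | None]:
    """Return (source, result, error) so one failure never crashes the batch."""
    try:
        async with asyncio.timeout(5):           # per-source budget
            return (name, await coro, None)
    except TimeoutError:
        return (name, None, "timed out after 5s")

async def fetch_all(question: str) -> list[tuple[str, str | None, str | None]]:
    async with asyncio.timeout(15):              # overall system budget
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(fetch_one_safely("weather", fetch_weather("Berlin"))),
                tg.create_task(fetch_one_safely("news", fetch_news(question))),
                tg.create_task(fetch_one_safely("kb", query_knowledge_base(question))),
            ]
    return [t.result() for t in tasks]

Because fetch_one_safely() catches TimeoutError itself, the TaskGroup never sees the failure and the other tasks keep running: graceful degradation instead of group-wide cancellation.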

Step 5: Aggregate and Return Results

Collect all results (successful and failed) into a comprehensive response:

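One possible shape for the response, assuming successes and errors were collected into dicts in the previous steps:

def aggregate(successes: dict[str, str], errors: dict[str, str],
              elapsed: float, workers: int) -> dict:
    # successes: source -> inference result; errors: source -> error message
    return {
        "results": successes,
        "errors": errors,
        "total_time_s": round(elapsed, 2),
        "parallel_workers": workers,
    }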


Part 4: Testing and Validation

Test 1: Verify Concurrent Fetching

What to check: All 3 sources start nearly simultaneously (not sequentially).

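A minimal timing check, assuming fetch_all() from Step 4. Concurrent fetching should finish in roughly the time of the slowest source (~3s), not the ~6.5s sum:

import asyncio
import time

async def test_concurrent_fetching():
    start = time.perf_counter()
    results = await fetch_all("What's happening now?")
    elapsed = time.perf_counter() - start
    print(f"Fetched {len(results)} sources in {elapsed:.1f}s")
    assert elapsed < 4.0, "fetches ran sequentially (expected ~3s, the slowest source)"

asyncio.run(test_concurrent_fetching())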

Test 2: Verify Parallel Processing

What to check: Multiple cores are used for inference (CPU spikes).

# Terminal 1: Run your agent
python ai_agent.py

# Terminal 2: Watch CPU usage
# For Linux:
top -o %CPU

# For macOS:
top -o cpu

# For Windows:
tasklist /V /FI "IMAGENAME eq python.exe"

# You should see:
# - CPU jumps to 200%+ (multiple cores active)
# - Not just 100% (single core)

Test 3: Handle Timeout Gracefully

What to check: System continues if one source times out.

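One way to simulate the failure, assuming the skeleton above: stretch one source past its 5-second budget and confirm the other two still return.

async def query_knowledge_base(question: str) -> str:
    await asyncio.sleep(10.0)   # deliberately exceed the 5s per-source timeout
    return "never reached"

# Swap this slow variant in for the original, then run the agent:
# response = asyncio.run(ai_agent_query("What's happening now?"))
# Expect: "kb" listed under response["errors"], weather and news under response["results"].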

Test 4: Run Benchmark

What to check: Hybrid execution is 2–3x faster than sequential.

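A minimal benchmark sketch, assuming ai_agent_query() plus the skeleton's fetch and inference functions ("Berlin" is again a placeholder):

import asyncio
import time

async def run_sequential(question: str) -> float:
    start = time.perf_counter()
    for coro in (fetch_weather("Berlin"), fetch_news(question),
                 query_knowledge_base(question)):
        simulate_llm_inference(await coro)        # fetch, then infer, one at a time
    return time.perf_counter() - start

async def run_benchmark(question: str) -> None:
    seq = await run_sequential(question)
    start = time.perf_counter()
    await ai_agent_query(question)
    hybrid = time.perf_counter() - start
    print(f"Sequential: {seq:.1f}s | Hybrid: {hybrid:.1f}s | Speedup: {seq / hybrid:.1f}x")

asyncio.run(run_benchmark("What's happening now?"))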


Part 5: Extension Challenges

Once your capstone works, push further:

Challenge 1: Add Caching

Modify ai_agent_query() to cache results by question. If the same question is asked within 60 seconds, return cached results instead of re-fetching.

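A sketch of a time-based cache, assuming ai_agent_query() from the main lesson (the 60-second TTL comes from the challenge statement):

import time

_cache: dict[str, tuple[float, dict]] = {}   # question -> (timestamp, response)
CACHE_TTL_S = 60

async def cached_agent_query(question: str) -> dict:
    now = time.monotonic()
    if question in _cache and now - _cache[question][0] < CACHE_TTL_S:
        return _cache[question][1]           # fresh enough: skip re-fetching
    response = await ai_agent_query(question)
    _cache[question] = (now, response)
    return response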

Challenge 2: Implement Retry Logic

If a source times out, retry up to 2 times before giving up.

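A retry sketch with a 5-second budget per attempt. It takes a factory rather than a coroutine, because a coroutine object can only be awaited once:

async def fetch_with_retry(name: str, make_coro, attempts: int = 3):
    """Try the fetch up to `attempts` times (1 initial try + 2 retries)."""
    for attempt in range(1, attempts + 1):
        try:
            async with asyncio.timeout(5):
                return await make_coro()
        except TimeoutError:
            if attempt == attempts:
                raise                          # give up after the final attempt
            await asyncio.sleep(0.5 * attempt)  # brief backoff before retrying

# Usage:
# result = await fetch_with_retry("news", lambda: fetch_news("AI"))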

Challenge 3: Add More Sources

Extend to 5+ sources. Verify that total fetch time stays near the slowest source rather than growing with the number of sources (see the sketch below).

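One way to make the fetch stage data-driven, reusing fetch_one_safely() from Step 4 and assuming every fetcher accepts the user question; fetch_stocks() and fetch_social_mentions() are hypothetical extras you'd write:

# fetch_stocks and fetch_social_mentions are hypothetical additional fetchers
FETCHERS = [fetch_news, query_knowledge_base, fetch_weather,
            fetch_stocks, fetch_social_mentions]

async def fetch_everything(question: str):
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_one_safely(fn.__name__, fn(question)))
                 for fn in FETCHERS]
    return [t.result() for t in tasks]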

Challenge 4: Simulate Real AI Processing

Replace simulate_llm_inference() with a more realistic simulation (see the sketch after this list) that:

  • Tokenizes input (split into words)
  • Computes embeddings (calculate vector for each token)
  • Ranks top results
  • Returns structured JSON
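
A sketch of that richer simulation; the "embeddings" here are toy trigonometric vectors, not a real model:

import json
import math

def simulate_llm_inference(text: str) -> str:
    tokens = text.lower().split()                       # 1. tokenize
    embeddings = [                                      # 2. toy embedding per token
        [math.sin(len(token) * i) for i in range(1, 9)]
        for token in tokens
    ]
    ranked = sorted(                                    # 3. rank tokens by vector norm
        zip(tokens, embeddings),
        key=lambda pair: math.hypot(*pair[1]),
        reverse=True,
    )
    return json.dumps({                                 # 4. structured JSON output
        "token_count": len(tokens),
        "top_tokens": [token for token, _ in ranked[:3]],
    })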

Part 6: Architecture Review with AI

Once your capstone is complete, validate it with your AI:

💬 AI Colearning Prompt

"I built an AI agent that fetches from 3 sources concurrently and processes each with LLM inference in parallel. Here's my code: [paste your ai_agent_query() function]. Can you:

  1. Explain how TaskGroup ensures structured concurrency (what happens if one fetch fails?)
  2. Explain how InterpreterPoolExecutor achieves true parallelism (why can't threading do this?)
  3. Identify any error handling gaps or improvements"

Expected Outcome: AI reviews your architecture and provides concrete feedback on structured concurrency, error recovery, and GIL implications.


Key Learnings Summary

This capstone demonstrates:

  1. TaskGroup provides structured concurrency — all tasks must complete before moving on, and failure in one cancels others (intentional design)
  2. InterpreterPoolExecutor provides true parallelism — multiple Python interpreters with separate GILs let CPU-bound work run on multiple cores
  3. Hybrid workloads combine both patterns:
    • Use TaskGroup for I/O-bound work (API calls)
    • Use InterpreterPoolExecutor for CPU-bound work (inference)
    • Overlap them: fetch while processing, process while fetching
  4. Error handling is critical — graceful degradation (continue with partial results) is more valuable than failing completely
  5. Timeouts prevent hang states — always apply timeouts to external operations
  6. Performance measurement proves the pattern works — measure sequential vs hybrid to quantify the benefit

Challenge 6: The Complete AI Agent Capstone (5-Part)

This is a 5-part bidirectional learning challenge where you complete, evaluate, and reflect on your production AI agent system.

Verification and Testing Phase

Your Challenge: Ensure your built system actually works as intended.

Verification Checklist:

  1. Run your complete AI agent system from Part 4 of the main lesson
  2. Measure actual execution time (should be ~5-6 seconds, versus the ~12.5 seconds sequential execution takes)
  3. Verify concurrent fetching: all 3 sources start within 100ms of each other
  4. Verify parallel processing: CPU work happens on multiple cores simultaneously
  5. Test error handling: simulate one source timing out; system should continue
  6. Document timing: {fetch_time, process_time, total_time, speedup_vs_sequential}

Expected Behavior:

  • Concurrent fetch: 3 seconds (longest single source, not sum)
  • Parallel process: 2 seconds (3 tasks on multiple cores)
  • Total: 5-6 seconds (stages overlap, not sequential)
  • Speedup: ~2–2.5x vs sequential (~12.5 seconds)

Deliverable: Create /tmp/capstone_verification.md documenting:

  • Actual measured timings
  • Proof of concurrency (execution timeline)
  • Proof of parallelism (core utilization)
  • Error scenarios tested
  • Performance improvement quantified

Performance Analysis Phase

Your Challenge: Identify bottlenecks without AI help first.

Analysis Tasks:

  1. Run with different number of fetch sources (2, 3, 4, 5) — how does total time scale?
  2. Run with different process times (1s, 2s, 3s) — what becomes the bottleneck?
  3. Add timing instrumentation: log exactly when each fetch starts/completes, when each process starts/completes
  4. Create a timeline visualization showing overlap (or lack thereof)
  5. Identify the critical path: which task/stage limits overall execution?

Expected Observations:

  • Adding more sources barely increases fetch time (concurrent fetches stay near the slowest source); the extra work lands on the processing stage
  • Processing time affects when results are ready
  • Critical path is: max(fetch_times) + max(process_times), not sum
  • Pipeline is NOT fully optimal—there's opportunity for improvement

Self-Validation:

  • Can you explain why adding a 4th source didn't increase time much?
  • What would happen if processing was much slower than fetching?
  • How would you fix a bottlenecked pipeline?

Learning Production Optimization Patterns

💬 AI Colearning Prompt: "I built an AI agent that fetches from 3 sources (2-3s each) and processes results (2s each). Total time is ~5 seconds. I added timing instrumentation and identified that fetch is the bottleneck. Teach me about optimization strategies: 1) Early timeout to fail fast, 2) Caching to avoid refetching, 3) Request prioritization (fetch fast sources first), 4) Adaptive batch sizing. Show me code examples for each. Which would help most in my system?"

AI's Role: Explain production optimization patterns, show code for each strategy, help you reason about tradeoffs (latency vs memory, throughput vs latency).

Interactive Moment: Ask a clarifying question:

"You showed me timeout for fast-fail. But what if a source is occasionally slow (97% of time 0.5s, but sometimes 5s)? How do I balance reliability (waiting for slow source) with latency (timing out)? What's the right timeout value?"

Expected Outcome: AI clarifies that optimization is contextual—you optimize for your constraints (latency SLO, error budget, resource limits). You learn to make principled engineering decisions, not just chase speedup.


System Architecture and Extension Phase

Setup: AI generates an optimized version of your system. Your job is to test it, find issues, and improve it based on production realities.

AI's Initial Code (ask for this):

"Show me an optimized version of the AI agent that: 1) Implements per-source timeout (fail fast on slow sources), 2) Uses asyncio.as_completed() to process results as they arrive (not wait for all), 3) Adds exponential backoff for retries. Explain why this is better than my current version."

Your Task:

  1. Run the optimized version. Measure timing.
  2. Compare to your original: is it faster? By how much?
  3. Identify potential issues:
    • Does early timeout help or hurt? (Might fail unnecessarily)
    • Does as_completed() improve UX or create race conditions?
    • Is retry logic robust or does it hang on certain failures?
  4. Teach AI:

"Your optimized version is 10% faster but sometimes times out sources that would have succeeded in 3.5 seconds. The timeout you chose (3s) is too aggressive for my use case. How do I determine the right timeout for each source? Should it be static or adaptive based on historical latency?"

Your Edge Case Discovery: Ask AI:

"What if I want to return 'best effort' results as soon as 2 out of 3 sources complete, instead of waiting for all 3? How would that change the architecture? What problems might that create?"

Expected Outcome: You discover that optimization has tradeoffs—faster timeouts mean more failures, early returns mean incomplete results. You learn to think about requirements first, then optimize.


Reflection and Synthesis Phase

Your Challenge: Synthesize everything you've learned about asyncio into principle-based thinking.

Reflection Tasks:

  1. Conceptual Mapping: Draw a diagram showing how Lessons 1-5 concepts connect:

    • Event loops (Lesson 1) enable TaskGroup (Lesson 2)
    • await/pause points (Lesson 1) create opportunities for timeouts (Lesson 3)
    • TaskGroup (Lesson 2) can't help CPU work, so InterpreterPoolExecutor (Lesson 4) needed
    • All together form the hybrid system (Lesson 5)
  2. Principles Documentation: Write 3-5 principles you've learned:

    • Example: "I/O-bound work benefits from concurrency (TaskGroup), CPU-bound work needs parallelism (InterpreterPoolExecutor)"
    • Example: "Timeouts are defensive—without them, one slow source hangs the whole system"
    • Example: "Error handling must distinguish transient (retry) vs permanent (fail fast) failures"
  3. Production Checklist: Create a checklist for building production asyncio systems:

    • Event loop management (asyncio.run at top level)
    • Structured concurrency (TaskGroup, not raw create_task)
    • Timeout controls (prevent infinite waits)
    • Error handling (distinguish error types)
    • Resource cleanup (executor shutdown, queue drains)
    • Performance measurement (baseline vs optimized)
  4. AI Conversation: Talk to AI about your system as if you were explaining it to a colleague:

"Our AI agent works like this: [describe your architecture]. The key insights are: [principles]. I'd optimize by [your optimization strategy]. What are the risks I haven't considered? What production issues might I hit?"

Expected Outcome: AI identifies edge cases you missed (concurrency bugs, resource exhaustion, corner cases). You learn from production experience vicariously.

Deliverable: Save to /tmp/capstone_reflection.md:

  • Concept map (text or ASCII diagram)
  • 5 Core principles you've learned
  • Production checklist
  • Summary of architecture and optimization choices
  • Identified risks and mitigation strategies

Chapter Synthesis: From Concepts to Production

You've now mastered:

  • Layer 1 (Foundations): Understanding event loops and coroutines
  • Layer 2 (Collaboration): TaskGroup, gather(), timeout patterns
  • Layer 3 (Intelligence): Hybrid I/O+CPU systems, bottleneck analysis
  • Layer 4 (Integration): Production-grade system with error handling, timeouts, optimization

You can now:

  • Build systems that scale to 1000+ concurrent I/O operations
  • Parallelize CPU work across cores despite the GIL
  • Handle real-world failures gracefully
  • Optimize based on bottleneck analysis
  • Explain architectural decisions to colleagues

Time Estimate: 50-60 minutes (10 min verification, 10 min discovery, 12 min coach interaction, 12 min optimization, 6-15 min reflection)

Key Takeaway: Asyncio isn't just syntax—it's a mindset. Event loops, structured concurrency, careful error handling, and measurement are how real systems are built. You've learned the architecture, not just the code.


Try With AI

How do you integrate async API calls, CPU-bound inference, timeouts, retries, and error handling in one AI agent system?

🔍 Explore Complete System Architecture:

"Show me the architecture for an AI agent that: fetches data from APIs (asyncio), processes with ML model (ProcessPoolExecutor), stores results (asyncio). Trace a request through all layers and explain concurrency at each stage."

🎯 Practice Production Patterns:

"Implement structured concurrency with TaskGroup for critical operations and gather() for optional enrichment. Add per-operation timeouts (2s API, 5s inference). Show how one failure doesn't crash the system."

🧪 Test Resilience Mechanisms:

"Create an agent with 3 API dependencies. One API is down, one is slow (3s), one is flaky (50% failure rate). Implement retries, circuit breakers, and fallback responses. Measure end-to-end latency under degraded conditions."

🚀 Apply to Multi-Agent Orchestration:

"Design a coordinator managing 5 agents concurrently. Each agent has Fetch → Infer → Store pipeline. Include backpressure (limit concurrent agents), error aggregation, and throughput monitoring. Explain bottleneck optimization strategy."