Concurrent Tasks — create_task(), gather(), and TaskGroup()
You've learned how asyncio works conceptually. Now comes the practical question: How do you actually run multiple tasks at the same time?
Imagine you're building a weather dashboard that fetches data from 10 different weather services. You need their results to display a unified forecast. With Lesson 1's tools, you could await them one by one:
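The interactive code block is not embedded in this text. A minimal sketch of the sequential version, with `asyncio.sleep` standing in for network calls (service names are illustrative, and the per-call delay is scaled down so the sketch runs quickly):

```python
import asyncio
import time

async def fetch_forecast(service: str) -> str:
    # Simulate one weather-service call (scaled down from ~2s)
    await asyncio.sleep(0.2)
    return f"{service}: 72°F"

async def main() -> list[str]:
    start = time.perf_counter()
    results: list[str] = []
    # Await each service one by one: each call must finish
    # before the next one even starts
    for i in range(10):
        results.append(await fetch_forecast(f"Service-{i}"))
    print(f"Fetched 10 forecasts in {time.perf_counter() - start:.1f}s")
    return results

if __name__ == "__main__":
    asyncio.run(main())
```

With real 2-second calls, this loop takes ~20 seconds: the sum of all the delays.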
But that's slow: each request must finish before the next one starts, so the delays add up.
What if you could schedule all 10 to run simultaneously, then collect their results? That's what this lesson teaches you. By the end, you'll understand three patterns for running concurrent tasks:
- `asyncio.create_task()` — Schedule a coroutine to run in the background
- `asyncio.gather()` — Run multiple tasks and collect all results (even if some fail)
- `asyncio.TaskGroup()` — Modern Python 3.11+ pattern that cancels all tasks if one fails
The practical payoff: 10 services fetched in ~2 seconds instead of 20. And you'll understand why each pattern exists and when to use it.
Understanding Task Scheduling with create_task()
Before diving into code, let's understand the core concept: a task is a scheduled coroutine.
In Lesson 1, you learned about coroutines—functions marked with async def. But just defining a coroutine doesn't run it. You need to await it or schedule it.
Scheduling vs Awaiting:
- Awaiting (`await my_coroutine()`) — "Run this now and pause until it finishes"
- Scheduling (`asyncio.create_task(my_coroutine())`) — "Start this in the background, I'll collect the result later"
This distinction is fundamental. Here's why it matters:
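The embedded example is not preserved here; a minimal sketch that matches the output and timing described below (the `fetch_user` name and 1-second delay are taken from the surrounding text):

```python
import asyncio

async def fetch_user(user_id: int) -> str:
    await asyncio.sleep(1)  # simulate a 1-second lookup
    return f"User-{user_id}"

async def main() -> list[str]:
    # Scheduling: all three coroutines start running immediately
    task1 = asyncio.create_task(fetch_user(1))
    task2 = asyncio.create_task(fetch_user(2))
    task3 = asyncio.create_task(fetch_user(3))
    # Awaiting: collect results; the three 1-second sleeps overlap
    results = [await task1, await task2, await task3]
    print(f"Results: {', '.join(results)}")
    return results

if __name__ == "__main__":
    asyncio.run(main())
```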
Output (takes ~1 second, not 3):
Results: User-1, User-2, User-3
What happened:
- `asyncio.create_task()` scheduled each coroutine without pausing
- All three tasks are now pending (waiting to run)
- The event loop runs all three concurrently
- When we `await task1`, if it's not ready, the loop runs other tasks
- Total time: the longest task duration (1 second), not the sum
💬 AI Colearning Prompt
"Ask your AI: What's the difference between `await fetch_user(1)` and `task = asyncio.create_task(fetch_user(1)); await task`? Why does the second one enable concurrency?"
This question gets at the heart of async architecture. Your AI can explain how create_task() schedules immediately while await blocks.
Example 1: Simple Task Scheduling
Let's implement the concept above with explicit type hints and clear comments:
Output (takes ~1.5 seconds):
[Service-A] Fetching...
[Service-B] Fetching...
[Service-C] Fetching...
[Service-A] Done after 1s
[Service-C] Done after 1s
[Service-B] Done after 1.5s
All results: [{'api': 'Service-A', 'data': '...'}, {'api': 'Service-B', 'data': '...'}, {'api': 'Service-C', 'data': '...'}]
Spec Reference & Validation
Specification: Demonstrate task scheduling with asyncio.create_task() and result collection
AI Prompt Used: "Create three async functions simulating API calls with different delays, use create_task() to schedule them concurrently, then collect results"
Generated Code: Above fetch_api() and main() example
Validation Steps:
- ✅ Code runs without errors: `python lesson2_ex1.py`
- ✅ All three "APIs" start immediately (output shows all Fetching messages together)
- ✅ Total time ~1.5 seconds (longest task), not 3.5 seconds (sum of all)
- ✅ Type hints complete (`asyncio.Task[dict[str, str]]`)
- ✅ Docstrings follow PEP 257 format
- ✅ Cross-platform tested (Windows, Mac, Linux)
Collecting Results with asyncio.gather()
Scheduling tasks one-by-one is clear, but it's verbose. What if you have 10 tasks? Or 100?
asyncio.gather() solves this. It takes multiple coroutines (or tasks), runs them concurrently, and collects all results in one go:
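The inline example did not survive; a minimal sketch of the same three calls collapsed into one `gather()` call (names and delays carried over from the previous example):

```python
import asyncio

async def fetch_api(name: str, delay: float) -> dict[str, str]:
    await asyncio.sleep(delay)
    return {"api": name, "data": "..."}

async def main() -> list[dict[str, str]]:
    # gather() schedules every coroutine and awaits them all at once;
    # results come back in the order the coroutines were passed
    results = await asyncio.gather(
        fetch_api("Service-A", 1.0),
        fetch_api("Service-B", 1.5),
        fetch_api("Service-C", 1.0),
    )
    return list(results)

if __name__ == "__main__":
    print(asyncio.run(main()))
```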
It's much more concise than creating and awaiting individual tasks.
🎓 Expert Insight
In AI-native development, you don't memorize which pattern to use—you understand the tradeoff.
`create_task()` gives you fine-grained control (inspect tasks, cancel them individually). `gather()` is cleaner for "run these concurrently and give me all results." Syntax is cheap; architectural clarity is gold.
Example 2: Multiple Tasks with gather()
Here's a realistic example: fetching weather data from multiple sources:
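The code block is missing from this text; a sketch reconstructed from the output transcript below (service names come from the transcript; the individual delays are illustrative, with 1.5s as the longest):

```python
import asyncio
import time
from typing import Any

async def fetch_weather_service(name: str, delay: float) -> dict[str, Any]:
    print(f"[{name}] Fetching weather...")
    await asyncio.sleep(delay)
    return {"service": name, "forecast": "Partly cloudy", "temp_f": 72}

async def main() -> list[dict[str, Any]]:
    start = time.perf_counter()
    # All four requests run concurrently; total time tracks the slowest one
    results: list[dict[str, Any]] = await asyncio.gather(
        fetch_weather_service("OpenWeatherMap", 1.0),
        fetch_weather_service("WeatherAPI", 1.5),
        fetch_weather_service("NOAA", 0.5),
        fetch_weather_service("LocalRadar", 1.2),
    )
    print(f"Fetched from {len(results)} services in {time.perf_counter() - start:.2f}s")
    for r in results:
        print(f"{r['service']}: {r['forecast']} at {r['temp_f']}°F")
    return results

if __name__ == "__main__":
    asyncio.run(main())
```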
Output (takes ~1.5 seconds, not 4.5 seconds):
[OpenWeatherMap] Fetching weather...
[WeatherAPI] Fetching weather...
[NOAA] Fetching weather...
[LocalRadar] Fetching weather...
Fetched from 4 services in 1.50s
OpenWeatherMap: Partly cloudy at 72°F
WeatherAPI: Partly cloudy at 72°F
NOAA: Partly cloudy at 72°F
LocalRadar: Partly cloudy at 72°F
Key insight: gather() handles scheduling internally. You just pass coroutines, and it runs them concurrently.
Spec Reference & Validation
Specification: Demonstrate concurrent result collection with timing comparison
AI Prompt Used: "Create 4 async functions simulating weather API calls with different delays, use asyncio.gather() to fetch concurrently, measure and display timing"
Generated Code: Above fetch_weather_service() and main() example
Validation Steps:
- ✅ Code runs without errors
- ✅ All services start simultaneously (all "Fetching" messages appear together)
- ✅ Total time ~1.5s (max delay), not 4.5s (sum of delays)
- ✅ Results list contains all 4 responses in order
- ✅ Type hints complete (`list[dict[str, Any]]`)
- ✅ Timing measurement is accurate
TaskGroup: Modern Structured Concurrency (Python 3.11+)
Here's a problem with gather(): when one task fails, the others keep running to completion, so you wait for everything even though part of the operation has already failed.
Consider this scenario:
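The scenario's code block is missing; a sketch of the behavior being described: one service fails quickly, yet `gather(return_exceptions=True)` still waits for the slowest one (names and delays are illustrative):

```python
import asyncio

async def fetch(name: str, delay: float, fail: bool = False) -> str:
    await asyncio.sleep(delay)
    if fail:
        raise TimeoutError(f"{name} timed out")
    return f"{name} ok"

async def main() -> list[object]:
    # return_exceptions=True: gather waits for EVERY task, even after
    # one has already failed, and returns the exception as a result
    results = await asyncio.gather(
        fetch("fast-api", 0.2),
        fetch("flaky-api", 0.3, fail=True),
        fetch("slow-api", 2.0),  # still waited on after flaky-api fails
        return_exceptions=True,
    )
    return list(results)

if __name__ == "__main__":
    print(asyncio.run(main()))
```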
This is sometimes what you want (best-effort results), but often it's wasteful. If one API times out, why keep waiting for the others?
asyncio.TaskGroup() (Python 3.11+) is the modern alternative. It implements structured concurrency:
- All tasks in the group are tracked
- If any task fails, all others are automatically cancelled
- Cleanup happens automatically
- Exceptions are properly propagated
This is the preferred pattern for modern Python. Here's the pattern:
🚀 CoLearning Challenge
Ask your AI Co-Teacher:
"Show me the same weather fetching example using both `asyncio.gather(return_exceptions=True)` and `asyncio.TaskGroup()`. Explain when you'd choose each pattern and why."
Expected Outcome: You'll understand that gather() tolerates failures (returns exceptions in results) while TaskGroup() fails fast (cancels all on first error). This reveals the architectural difference.
Example 3: TaskGroup() Modern Pattern
Here's the weather service example refactored to use TaskGroup():
Output:
[OpenWeatherMap] Fetching weather...
[WeatherAPI] Fetching weather...
[NOAA] Fetching weather...
[LocalRadar] Fetching weather...
Fetched 4 services in 1.50s
OpenWeatherMap: Partly cloudy
WeatherAPI: Partly cloudy
NOAA: Partly cloudy
LocalRadar: Partly cloudy
Key advantages of TaskGroup:
- Fail-fast: If one task fails, others are cancelled immediately
- Cleaner exception handling: Uses `ExceptionGroup` instead of manual checking
- Structured cleanup: Context manager ensures cleanup
- Modern best practice: This is what production async Python uses
Spec Reference & Validation
Specification: Demonstrate TaskGroup() for structured concurrency with automatic cleanup
AI Prompt Used: "Refactor the gather() weather example to use asyncio.TaskGroup() instead. Show how error propagation differs."
Generated Code: Above refactored example
Validation Steps:
- ✅ Code runs without errors
- ✅ All services start concurrently
- ✅ Total time ~1.5s (concurrent, not sequential)
- ✅ Results extracted via `.result()` method
- ✅ Exception handling with `ExceptionGroup` works correctly
- ✅ Type hints complete throughout
Error Handling: Comparing gather() and TaskGroup()
Let's see how the two patterns handle errors differently:
Scenario: One API times out
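The scenario's setup code is missing; a sketch of the shared helper both comparisons below can use, where one hypothetical service always fails:

```python
import asyncio

async def fetch_service(name: str, delay: float) -> str:
    """One simulated API; 'BrokenAPI' raises instead of returning."""
    await asyncio.sleep(delay)
    if name == "BrokenAPI":
        raise TimeoutError(f"{name} did not respond")
    return f"{name}: ok"
```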
Using gather() (collects all results/exceptions):
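A sketch of the gather() side of the comparison: the failure is returned as a value, and the other services still complete (service names are illustrative):

```python
import asyncio

async def fetch_service(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    if name == "BrokenAPI":
        raise TimeoutError(f"{name} did not respond")
    return f"{name}: ok"

async def main() -> list[object]:
    # gather keeps going: the failure comes back as an item in the list
    return list(await asyncio.gather(
        fetch_service("QuickAPI", 0.2),
        fetch_service("BrokenAPI", 0.3),
        fetch_service("SlowAPI", 0.5),
        return_exceptions=True,
    ))

if __name__ == "__main__":
    for item in asyncio.run(main()):
        print(item)
```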
Using TaskGroup() (cancels all on first failure):
💬 AI Colearning Prompt
"Ask your AI: In what real-world scenarios would you prefer gather()'s 'best-effort' approach vs TaskGroup()'s 'all-or-nothing' approach? Give specific examples."
This question reveals the philosophical difference: gather() assumes independent results (collect what you can), while TaskGroup() assumes atomic operations (succeed together or fail together).
Example 4: Error Handling with gather(return_exceptions=True)
Here's a practical example showing gather() with resilience:
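The example's code block is missing; a sketch reconstructed from the output transcript below (service names and the failure message follow the transcript; delays are illustrative):

```python
import asyncio
from typing import Any

async def fetch_service(name: str, delay: float, fail: bool = False) -> dict[str, str]:
    await asyncio.sleep(delay)
    if fail:
        raise ConnectionError(f"Could not reach {name}")
    return {"service": name}

async def main() -> list[Any]:
    # Successes come back as dicts, failures as exception objects
    results: list[Any] = list(await asyncio.gather(
        fetch_service("ServiceA", 0.5),
        fetch_service("ServiceB", 0.3, fail=True),
        fetch_service("ServiceC", 0.7),
        return_exceptions=True,
    ))
    print(f"Collected {len(results)} results:")
    for i, result in enumerate(results, start=1):
        if isinstance(result, Exception):
            print(f"  [{i}] Failed: {result}")
        else:
            print(f"  [{i}] Success: {result['service']}")
    return results

if __name__ == "__main__":
    asyncio.run(main())
```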
Output:
Collected 3 results:
[1] Success: ServiceA
[2] Failed: Could not reach ServiceB
[3] Success: ServiceC
Why use this pattern:
- Partial success is acceptable (e.g., fetching from multiple backup services)
- You want to know what failed without stopping the whole operation
- Best-effort architecture: "Get me everything you can, I'll handle the gaps"
Spec Reference & Validation
Specification: Demonstrate gather() with return_exceptions for resilient collection
AI Prompt Used: "Create an example where gather(return_exceptions=True) collects both successful results and exceptions from multiple coroutines"
Generated Code: Above fetch_service() and main() example
Validation Steps:
- ✅ Code runs without exceptions (due to return_exceptions=True)
- ✅ Results list contains mix of successful dicts and Exception objects
- ✅ ServiceB failure doesn't prevent ServiceA and ServiceC from completing
- ✅ `isinstance()` check correctly identifies exceptions
- ✅ Type hints complete (`list[Any]` for mixed results)
Performance Comparison: Sequential vs Concurrent
Let's measure the actual performance difference:
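The benchmark's code block is missing; a sketch that reproduces the measurement described in the output below (five simulated 1-second calls, timed sequentially and then concurrently):

```python
import asyncio
import time
from typing import Any

async def fetch(call_id: int) -> dict[str, Any]:
    await asyncio.sleep(1)  # each simulated API call takes 1 second
    return {"id": call_id}

async def sequential() -> list[dict[str, Any]]:
    return [await fetch(i) for i in range(5)]

async def concurrent() -> list[dict[str, Any]]:
    return list(await asyncio.gather(*(fetch(i) for i in range(5))))

async def main() -> tuple[float, float]:
    start = time.perf_counter()
    seq = await sequential()
    seq_time = time.perf_counter() - start

    start = time.perf_counter()
    conc = await concurrent()
    conc_time = time.perf_counter() - start

    print(f"Sequential approach:\n  Time: {seq_time:.2f}s")
    print(f"Concurrent approach:\n  Time: {conc_time:.2f}s")
    print(f"Speedup: {seq_time / conc_time:.1f}x faster!")
    print(f"Results match: {seq == conc}")
    return seq_time, conc_time

if __name__ == "__main__":
    asyncio.run(main())
```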
Output:
Sequential approach:
Time: 5.00s
Concurrent approach:
Time: 1.00s
Speedup: 5.0x faster!
Results match: True
The math:
- Sequential: 1s + 1s + 1s + 1s + 1s = 5s (sum)
- Concurrent: max(1s, 1s, 1s, 1s, 1s) = 1s (maximum)
This is the power of concurrency: total time approaches the longest single operation, not the sum of all.
Spec Reference & Validation
Specification: Demonstrate measurable performance improvement with concurrent execution
AI Prompt Used: "Create a benchmark showing 5 simulated API calls (1s each): sequential takes 5s, concurrent takes 1s. Include timing output."
Generated Code: Above benchmarking example
Validation Steps:
- ✅ Sequential time ~5s (1 + 1 + 1 + 1 + 1)
- ✅ Concurrent time ~1s (max of all)
- ✅ Speedup calculation is 5x
- ✅ Both approaches return same number of results
- ✅ Type hints complete (`list[dict[str, Any]]`)
Example 5: TaskGroup Error Propagation in Action
Let's see what happens when TaskGroup encounters a failure:
Output:
Task 1 starting (will take 5s)
Task 2 starting (will take 1s)
Task 3 starting (will take 5s)
TaskGroup failed: ...
Key point: Task 1 and Task 3 were cancelled automatically!
(They never reached completion despite having 5s allocated)
This is crucial behavior: When Task 2 fails, the entire TaskGroup is cancelled. Tasks 1 and 3 don't finish their 5 seconds—they're interrupted and cleaned up. This is structured concurrency: all-or-nothing execution with automatic cleanup.
Spec Reference & Validation
Specification: Demonstrate TaskGroup automatic cancellation and exception grouping
AI Prompt Used: "Create 3 tasks where task 2 fails. Show how TaskGroup automatically cancels tasks 1 and 3. Explain ExceptionGroup."
Generated Code: Above task cancellation example
Validation Steps:
- ✅ Code runs and catches ExceptionGroup
- ✅ Task 2 raises ValueError as expected
- ✅ Tasks 1 and 3 are cancelled (don't complete)
- ✅ Exception handling with ExceptionGroup works
- ✅ Output shows cancellation behavior clearly
Choosing Your Pattern: Gather vs TaskGroup
Now you understand three approaches. How do you choose?
| Scenario | Pattern | Why |
|---|---|---|
| Fetching backup data sources (want best-effort) | gather(return_exceptions=True) | Collect all available data even if some fail |
| Parallel calculations that depend on each other | TaskGroup() | If one fails, stop immediately; no point continuing |
| Large number of independent operations (100+) | asyncio.create_task() + manual collection | More control, better performance tuning |
| Building a resilient API aggregator | gather() | Accept partial results; frontend handles missing data |
| Building an atomic transaction system | TaskGroup() | All succeed or all roll back |
✨ Teaching Tip
Use Claude Code to explore the tradeoff: Ask "Compare gather() vs TaskGroup() for a web crawler that fetches 1000 URLs. Should I cancel all on first failure, or collect whatever succeeds?" Your AI will explain the architectural implications.
Challenge 2: The Concurrent Tasks Builder
This challenge helps you master task scheduling patterns through hands-on exploration and AI collaboration.
Initial Exploration
Your Challenge: Explore the difference between sequential and concurrent patterns without AI.
Deliverable: Create /tmp/task_patterns.py containing:
- A sequential version: `await fetch_api("A", 1); await fetch_api("B", 1)` — measure time (should be ~2s)
- A concurrent version using `asyncio.create_task()` — schedule both tasks, then await them — measure time (should be ~1s)
- A third version using `asyncio.gather()` — same as concurrent but more concise
Expected Observation:
- Sequential: ~2 seconds (tasks run one after another)
- Concurrent: ~1 second (tasks run simultaneously)
- Gather: ~1 second (but cleaner syntax)
Self-Validation:
- Can you explain why concurrent is faster? (Tasks overlap in execution time)
- What happens if you schedule 10 tasks instead of 2? (Still ~1s, not ~10s)
- How would error handling differ in sequential vs concurrent? (Sequential: first error stops all; concurrent: might continue)
Understanding Task Scheduling Patterns
💬 AI Colearning Prompt: "I want to run 10 database queries concurrently, each taking 1 second. I tried wrapping them in `async def` and using `await` inside a for loop, but it still takes 10 seconds. Teach me the difference between 'awaiting immediately' and 'scheduling first.' Show me code using both `asyncio.gather()` and `asyncio.create_task()`. Which should I use when?"
What You'll Learn: The conceptual difference (immediate await blocks; scheduling creates pending tasks), both patterns, and when each applies.
Clarifying Question: Ask AI to deepen your understanding:
"You showed me `asyncio.gather(*[task() for task in ...])`. Explain exactly what gather() does—does it start the tasks? When do they actually start running?"
Expected Outcome: AI clarifies that gather() accepts coroutines (or existing tasks), wraps bare coroutines in tasks internally, then awaits all their results. You understand gather() as a convenience wrapper.
Exploring Error Handling Patterns
Activity: Work with AI to understand how different task coordination approaches handle failures.
First, ask AI to generate example code with TaskGroup:
Your Task:
- Run this code. What happens? (Should fail with exception from API-2, cancels API-3)
- Compare to the same code using `asyncio.gather(return_exceptions=True)` — what's the difference?
- Teach AI: "TaskGroup cancelled API-3 even though it was running. But I want API-1 and API-3 to complete even if API-2 fails. Show me gather() with return_exceptions=True and explain why it's different. When would you choose TaskGroup (all-or-nothing) vs gather (best-effort)?"
Your Edge Case Discovery: Ask AI:
"What happens if I set a timeout on gather()? Like `asyncio.wait_for(asyncio.gather(...), timeout=2)`. How does this interact with return_exceptions?"
Expected Outcome: You discover the difference between fail-fast (TaskGroup) and resilient (gather) patterns, and learn when each applies in production.
Building a Production Task Coordinator
Capstone Activity: Build a real-world task coordinator.
Specification:
- Fetch from 8 external services with varied latency (use asyncio.sleep):
- 3 services: 0.5s each
- 3 services: 1.5s each
- 2 services: 2.5s each
- 2 services fail randomly (raise exception)
- Use `asyncio.gather(return_exceptions=True)` for resilience
- Include timeout: if total fetch takes >5 seconds, cancel remaining tasks
- Return: (successful_results, failed_services, timed_out_flag)
- Type hints throughout
Deliverable: Save to /tmp/task_coordinator.py
Testing Your Work:
python /tmp/task_coordinator.py
# Expected output:
# Successfully fetched: 6 services
# Failed services: 2 (handled gracefully)
# Total time: ~2.5 seconds (longest service)
# Timeout triggered: False (or True if >5s)
Validation Checklist:
- Code runs without raising exceptions
- All 8 services attempted (gather runs all)
- Failed services caught by return_exceptions=True
- Total time ~2.5s (longest latency, not sum)
- Timeout mechanism works (can adjust latency to test)
- Type hints complete
- Follows production pattern (asyncio.run at top, gather for coordination)
Time Estimate: 30-35 minutes (5 min discover, 8 min teach/learn, 8 min edge cases, 9-14 min build artifact)
Key Takeaway: You've mastered three task coordination patterns and understand when to choose resilience (gather) over atomicity (TaskGroup).
Try With AI
Why does TaskGroup cancel all tasks when one fails, but gather() continues with return_exceptions=True?
🔍 Explore Task Coordination:
"Compare asyncio.TaskGroup vs asyncio.gather() for 5 concurrent tasks. Show what happens when task 3 fails in each approach. Explain when atomicity (TaskGroup) matters vs resilience (gather)."
🎯 Practice Structured Concurrency:
"Implement a service health checker using TaskGroup that tests 10 endpoints. Show how automatic cancellation protects against partial failures. What happens to tasks 5-10 when task 2 fails?"
🧪 Test Exception Handling:
"Create 8 async tasks where 2 fail randomly. Use gather(return_exceptions=True) to continue despite failures. Show how to identify which tasks succeeded vs failed and handle errors gracefully."
🚀 Apply to Multi-Agent Systems:
"Design an agent coordinator that launches 6 agents concurrently. Some agents must all succeed (use TaskGroup), others can fail independently (use gather). Explain your coordination strategy."