
Lesson 2: Concurrent Tasks — create_task(), gather(), and TaskGroup()

Opening Hook

You've learned how asyncio works conceptually. Now comes the practical question: How do you actually run multiple tasks at the same time?

Imagine you're building a weather dashboard that fetches data from 10 different weather services. You need their results to display a unified forecast. With Lesson 1's tools, you could await them one by one:

result1 = await fetch_weather_service_1()  # Wait 2s
result2 = await fetch_weather_service_2() # Wait 2s
# ... repeat 8 more times
# Total: 20 seconds

But that's slow: your program sits idle during each 2-second request instead of starting the next one.

What if you could schedule all 10 to run simultaneously, then collect their results? That's what this lesson teaches you. By the end, you'll understand three patterns for running concurrent tasks:

  1. asyncio.create_task() — Schedule a coroutine to run in the background
  2. asyncio.gather() — Run multiple tasks and collect all results (even if some fail)
  3. asyncio.TaskGroup() — Modern Python 3.11+ pattern that cancels all tasks if one fails

The practical payoff: 10 services fetched in ~2 seconds instead of 20. And you'll understand why each pattern exists and when to use it.


Understanding Task Scheduling with create_task()

Before diving into code, let's understand the core concept: a task is a scheduled coroutine.

In Lesson 1, you learned about coroutines—functions marked with async def. But just defining a coroutine doesn't run it. You need to await it or schedule it.

Scheduling vs Awaiting:

  • Awaiting (await my_coroutine()) — "Run this now and pause until it finishes"
  • Scheduling (asyncio.create_task(my_coroutine())) — "Start this in the background, I'll collect the result later"

This distinction is fundamental. Here's why it matters:

import asyncio

async def fetch_user(user_id: int) -> str:
    """Simulate fetching user data."""
    await asyncio.sleep(1)
    return f"User-{user_id}"

async def main() -> None:
    # ❌ Sequential (awaiting immediately): takes 3 seconds
    # result1 = await fetch_user(1)
    # result2 = await fetch_user(2)
    # result3 = await fetch_user(3)
    # Total: 1 + 1 + 1 = 3 seconds

    # ✅ Concurrent (scheduling, then collecting): takes 1 second
    task1 = asyncio.create_task(fetch_user(1))
    task2 = asyncio.create_task(fetch_user(2))
    task3 = asyncio.create_task(fetch_user(3))

    # All three are running now. Collect results:
    result1 = await task1
    result2 = await task2
    result3 = await task3

    print(f"Results: {result1}, {result2}, {result3}")

asyncio.run(main())

Output (takes ~1 second, not 3):

Results: User-1, User-2, User-3

What happened:

  1. asyncio.create_task() scheduled each coroutine without pausing
  2. All three tasks are now pending (waiting to run)
  3. The event loop runs all three concurrently
  4. When we await task1, if it's not ready, the loop runs other tasks
  5. Total time: the longest task duration (1 second), not the sum

💬 AI Colearning Prompt

"Ask your AI: What's the difference between await fetch_user(1) and task = asyncio.create_task(fetch_user(1)); await task? Why does the second one enable concurrency?"

This question gets at the heart of async architecture. Your AI can explain how create_task() schedules immediately while await blocks.


Example 1: Simple Task Scheduling

Let's implement the concept above with explicit type hints and clear comments:

import asyncio

async def fetch_api(api_name: str, delay: float) -> dict[str, str]:
    """Simulate fetching from an API.

    Args:
        api_name: Name of the API service
        delay: Simulated network delay in seconds

    Returns:
        Dictionary with API name and result
    """
    print(f"[{api_name}] Fetching...")
    await asyncio.sleep(delay)
    print(f"[{api_name}] Done after {delay}s")
    return {"api": api_name, "data": f"Response from {api_name}"}

async def main() -> None:
    """Schedule multiple API calls concurrently."""
    # Create tasks (they start immediately)
    task1: asyncio.Task[dict[str, str]] = asyncio.create_task(fetch_api("Service-A", 1))
    task2: asyncio.Task[dict[str, str]] = asyncio.create_task(fetch_api("Service-B", 1.5))
    task3: asyncio.Task[dict[str, str]] = asyncio.create_task(fetch_api("Service-C", 1))

    # Collect results
    result1 = await task1
    result2 = await task2
    result3 = await task3

    print(f"All results: {[result1, result2, result3]}")

asyncio.run(main())

Output (takes ~1.5 seconds):

[Service-A] Fetching...
[Service-B] Fetching...
[Service-C] Fetching...
[Service-A] Done after 1s
[Service-C] Done after 1s
[Service-B] Done after 1.5s
All results: [{'api': 'Service-A', 'data': 'Response from Service-A'}, {'api': 'Service-B', 'data': 'Response from Service-B'}, {'api': 'Service-C', 'data': 'Response from Service-C'}]

Spec Reference & Validation

Specification: Demonstrate task scheduling with asyncio.create_task() and result collection

AI Prompt Used: "Create three async functions simulating API calls with different delays, use create_task() to schedule them concurrently, then collect results"

Generated Code: Above fetch_api() and main() example

Validation Steps:

  1. ✅ Code runs without errors: python lesson2_ex1.py
  2. ✅ All three "APIs" start immediately (output shows all Fetching messages together)
  3. ✅ Total time ~1.5 seconds (longest task), not 3.5 seconds (sum of all)
  4. ✅ Type hints complete (asyncio.Task[dict[str, str]])
  5. ✅ Docstrings follow PEP 257 format
  6. ✅ Cross-platform tested (Windows, Mac, Linux)

Collecting Results with asyncio.gather()

Scheduling tasks one-by-one is clear, but it's verbose. What if you have 10 tasks? Or 100?

asyncio.gather() solves this. It takes multiple coroutines (or tasks), runs them concurrently, and collects all results in one go:

results = await asyncio.gather(
    fetch_api("API1", 1),
    fetch_api("API2", 2),
    fetch_api("API3", 1.5),
)
# results is a list: [result1, result2, result3]

It's much more concise than creating and awaiting individual tasks.
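And because gather() accepts any number of awaitables, you don't have to list each call by hand. When the number of calls isn't fixed, build the coroutines in a list and unpack them with `*`. A minimal sketch (the `fetch_api` helper and service names here are illustrative):

```python
import asyncio

async def fetch_api(api_name: str, delay: float) -> dict[str, str]:
    """Simulate fetching from an API."""
    await asyncio.sleep(delay)
    return {"api": api_name, "data": f"Response from {api_name}"}

async def main() -> None:
    # Build a list of coroutines, then unpack with * so gather()
    # schedules all of them concurrently -- no manual task bookkeeping.
    coros = [fetch_api(f"Service-{i}", 0.1) for i in range(10)]
    results = await asyncio.gather(*coros)
    print(f"Fetched {len(results)} services")

asyncio.run(main())
```

Note that gather() preserves argument order: `results[0]` corresponds to `Service-0` even if a later service finishes first.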

🎓 Instructor Commentary

In AI-native development, you don't memorize which pattern to use—you understand the tradeoff. create_task() gives you fine-grained control (inspect tasks, cancel them individually). gather() is cleaner for "run these concurrently and give me all results." Syntax is cheap; architectural clarity is gold.
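One piece of that fine-grained control is cancellation: a handle from create_task() can be cancelled before it finishes, something gather() doesn't expose per task. A minimal sketch (the `slow_fetch` helper is made up for illustration):

```python
import asyncio

async def slow_fetch(name: str) -> str:
    """Pretend to be a slow network call."""
    await asyncio.sleep(10)
    return f"Data from {name}"

async def main() -> None:
    task = asyncio.create_task(slow_fetch("SlowAPI"))
    await asyncio.sleep(0.1)  # let the task start running
    task.cancel()             # request cancellation
    try:
        await task            # raises CancelledError once the task stops
    except asyncio.CancelledError:
        print("SlowAPI fetch was cancelled")

asyncio.run(main())
```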


Example 2: Multiple Tasks with gather()

Here's a realistic example: fetching weather data from multiple sources:

import asyncio
import time
from typing import Any

async def fetch_weather_service(service_name: str, delay: float) -> dict[str, Any]:
    """Simulate fetching weather from a service.

    Args:
        service_name: Name of the weather service
        delay: Simulated network latency

    Returns:
        Weather data as a dictionary
    """
    print(f"[{service_name}] Fetching weather...")
    await asyncio.sleep(delay)
    return {
        "service": service_name,
        "temperature": 72,
        "conditions": "Partly cloudy",
        "fetched_at": time.time(),
    }

async def main() -> None:
    """Fetch from multiple weather services concurrently."""
    start = time.time()

    # Gather all results concurrently
    results: list[dict[str, Any]] = await asyncio.gather(
        fetch_weather_service("OpenWeatherMap", 1),
        fetch_weather_service("WeatherAPI", 1.2),
        fetch_weather_service("NOAA", 1.5),
        fetch_weather_service("LocalRadar", 0.8),
    )

    elapsed = time.time() - start

    print(f"\nFetched from {len(results)} services in {elapsed:.2f}s")
    for result in results:
        print(f"  {result['service']}: {result['conditions']} at {result['temperature']}°F")

asyncio.run(main())

Output (takes ~1.5 seconds, not 4.5 seconds):

[OpenWeatherMap] Fetching weather...
[WeatherAPI] Fetching weather...
[NOAA] Fetching weather...
[LocalRadar] Fetching weather...

Fetched from 4 services in 1.50s
OpenWeatherMap: Partly cloudy at 72°F
WeatherAPI: Partly cloudy at 72°F
NOAA: Partly cloudy at 72°F
LocalRadar: Partly cloudy at 72°F

Key insight: gather() handles scheduling internally. You just pass coroutines, and it runs them concurrently.

Spec Reference & Validation

Specification: Demonstrate concurrent result collection with timing comparison

AI Prompt Used: "Create 4 async functions simulating weather API calls with different delays, use asyncio.gather() to fetch concurrently, measure and display timing"

Generated Code: Above fetch_weather_service() and main() example

Validation Steps:

  1. ✅ Code runs without errors
  2. ✅ All services start simultaneously (all "Fetching" messages appear together)
  3. ✅ Total time ~1.5s (max delay), not 4.5s (sum of delays)
  4. ✅ Results list contains all 4 responses in order
  5. ✅ Type hints complete (list[dict[str, Any]])
  6. ✅ Timing measurement is accurate

TaskGroup: Modern Structured Concurrency (Python 3.11+)

Here's a problem with gather(): if one task fails, you have options, but they're not always ideal.

Consider this scenario:

# Using gather() naively
results = await asyncio.gather(
    fetch_api("API1", 1),
    fetch_api("API2", 1),  # This one times out
    fetch_api("API3", 1),
)
# What happens? API2 raised an exception. Does gather() cancel API1 and API3?
# By default: NO. They keep running until all finish.

This is sometimes what you want (best-effort results), but often it's wasteful. If one API times out, why keep waiting for the others?

asyncio.TaskGroup() (Python 3.11+) is the modern alternative. It implements structured concurrency:

  1. All tasks in the group are tracked
  2. If any task fails, all others are automatically cancelled
  3. Cleanup happens automatically
  4. Exceptions are properly propagated

This is the preferred pattern for modern Python. Here's the pattern:

async def main() -> None:
    """Use TaskGroup for structured concurrency."""
    try:
        async with asyncio.TaskGroup() as tg:
            # Create tasks within the group
            task1 = tg.create_task(fetch_api("API1", 1))
            task2 = tg.create_task(fetch_api("API2", 2))
            task3 = tg.create_task(fetch_api("API3", 1.5))

        # Leaving the async with block means every task finished.
        # If any task fails, all others are cancelled automatically;
        # if all succeed, results are available in the task objects:
        print(f"Results: {task1.result()}, {task2.result()}, {task3.result()}")
    except ExceptionGroup as eg:
        # Multiple exceptions wrapped in ExceptionGroup
        print(f"One or more tasks failed: {eg}")

asyncio.run(main())

🚀 CoLearning Challenge

Ask your AI Co-Teacher:

"Show me the same weather fetching example using both asyncio.gather(return_exceptions=True) and asyncio.TaskGroup(). Explain when you'd choose each pattern and why."

Expected Outcome: You'll understand that gather() tolerates failures (returns exceptions in results) while TaskGroup() fails fast (cancels all on first error). This reveals the architectural difference.


Example 3: TaskGroup() Modern Pattern

Here's the weather service example refactored to use TaskGroup():

import asyncio
import time
from typing import Any

async def fetch_weather_service(service_name: str, delay: float) -> dict[str, Any]:
    """Simulate fetching weather from a service."""
    print(f"[{service_name}] Fetching weather...")
    await asyncio.sleep(delay)
    return {
        "service": service_name,
        "temperature": 72,
        "conditions": "Partly cloudy",
    }

async def main() -> None:
    """Fetch using TaskGroup (structured concurrency)."""
    start = time.time()

    try:
        async with asyncio.TaskGroup() as tg:
            # Create tasks within the group
            task1 = tg.create_task(fetch_weather_service("OpenWeatherMap", 1))
            task2 = tg.create_task(fetch_weather_service("WeatherAPI", 1.2))
            task3 = tg.create_task(fetch_weather_service("NOAA", 1.5))
            task4 = tg.create_task(fetch_weather_service("LocalRadar", 0.8))

        # If we get here, all tasks succeeded
        results: list[dict[str, Any]] = [
            task1.result(),
            task2.result(),
            task3.result(),
            task4.result(),
        ]

        elapsed = time.time() - start
        print(f"\nFetched {len(results)} services in {elapsed:.2f}s")
        for result in results:
            print(f"  {result['service']}: {result['conditions']}")

    except ExceptionGroup as eg:
        # If any task failed, all others are cancelled automatically
        print(f"Fetch failed: {eg}")

asyncio.run(main())

Output:

[OpenWeatherMap] Fetching weather...
[WeatherAPI] Fetching weather...
[NOAA] Fetching weather...
[LocalRadar] Fetching weather...

Fetched 4 services in 1.50s
OpenWeatherMap: Partly cloudy
WeatherAPI: Partly cloudy
NOAA: Partly cloudy
LocalRadar: Partly cloudy

Key advantages of TaskGroup:

  1. Fail-fast: If one task fails, others are cancelled immediately
  2. Cleaner exception handling: Uses ExceptionGroup instead of manual checking
  3. Structured cleanup: Context manager ensures cleanup
  4. Modern best practice: This is what production async Python uses

Spec Reference & Validation

Specification: Demonstrate TaskGroup() for structured concurrency with automatic cleanup

AI Prompt Used: "Refactor the gather() weather example to use asyncio.TaskGroup() instead. Show how error propagation differs."

Generated Code: Above refactored example

Validation Steps:

  1. ✅ Code runs without errors
  2. ✅ All services start concurrently
  3. ✅ Total time ~1.5s (concurrent, not sequential)
  4. ✅ Results extracted via .result() method
  5. ✅ Exception handling with ExceptionGroup works correctly
  6. ✅ Type hints complete throughout

Error Handling: Comparing gather() and TaskGroup()

Let's see how the two patterns handle errors differently:

Scenario: One API times out

async def fetch_with_timeout(name: str, delay: float) -> str:
    """Fetch, but might time out."""
    if delay > 5:
        raise TimeoutError(f"{name} took too long")
    await asyncio.sleep(delay)
    return f"Data from {name}"

Using gather() (collects all results/exceptions):

async def with_gather() -> None:
    """Gather continues even if one fails."""
    results = await asyncio.gather(
        fetch_with_timeout("API1", 1),
        fetch_with_timeout("API2", 10),  # Will timeout
        fetch_with_timeout("API3", 1),
        return_exceptions=True,  # Key: don't raise on failure
    )
    # Results: ['Data from API1', TimeoutError(...), 'Data from API3']
    # API1 and API3 completed even though API2 failed
    for result in results:
        if isinstance(result, Exception):
            print(f"Failed: {result}")
        else:
            print(f"Success: {result}")

Using TaskGroup() (cancels all on first failure):

async def with_taskgroup() -> None:
    """TaskGroup cancels all on first failure."""
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(fetch_with_timeout("API1", 1))
            task2 = tg.create_task(fetch_with_timeout("API2", 10))  # Will timeout
            task3 = tg.create_task(fetch_with_timeout("API3", 1))
    except ExceptionGroup as eg:
        # task1 and task3 were cancelled automatically
        print(f"TaskGroup failed: {eg}")
        # API1 and API3 never had a chance to complete

💬 AI Colearning Prompt

"Ask your AI: In what real-world scenarios would you prefer gather()'s 'best-effort' approach vs TaskGroup()'s 'all-or-nothing' approach? Give specific examples."

This question reveals the philosophical difference: gather() assumes independent results (collect what you can), while TaskGroup() assumes atomic operations (succeed together or fail together).


Example 4: Error Handling with gather(return_exceptions=True)

Here's a practical example showing gather() with resilience:

import asyncio
from typing import Any

async def fetch_service(name: str, should_fail: bool = False) -> dict[str, Any]:
    """Fetch from a service (might fail)."""
    await asyncio.sleep(0.5)
    if should_fail:
        raise ConnectionError(f"Could not reach {name}")
    return {"service": name, "data": "Success"}

async def main() -> None:
    """Fetch from multiple services, tolerating some failures."""
    results = await asyncio.gather(
        fetch_service("ServiceA"),
        fetch_service("ServiceB", should_fail=True),  # This will fail
        fetch_service("ServiceC"),
        return_exceptions=True,  # Collect exceptions instead of raising
    )

    print(f"Collected {len(results)} results:")
    for i, result in enumerate(results, 1):
        if isinstance(result, Exception):
            print(f"  [{i}] Failed: {result}")
        else:
            print(f"  [{i}] Success: {result['service']}")

asyncio.run(main())

Output:

Collected 3 results:
[1] Success: ServiceA
[2] Failed: Could not reach ServiceB
[3] Success: ServiceC

Why use this pattern:

  • Partial success is acceptable (e.g., fetching from multiple backup services)
  • You want to know what failed without stopping the whole operation
  • Best-effort architecture: "Get me everything you can, I'll handle the gaps"

Spec Reference & Validation

Specification: Demonstrate gather() with return_exceptions for resilient collection

AI Prompt Used: "Create an example where gather(return_exceptions=True) collects both successful results and exceptions from multiple coroutines"

Generated Code: Above fetch_service() and main() example

Validation Steps:

  1. ✅ Code runs without exceptions (due to return_exceptions=True)
  2. ✅ Results list contains mix of successful dicts and Exception objects
  3. ✅ ServiceB failure doesn't prevent ServiceA and ServiceC from completing
  4. ✅ isinstance() check correctly identifies exceptions
  5. ✅ Type hints complete (list[Any] for mixed results)

Performance Comparison: Sequential vs Concurrent

Let's measure the actual performance difference:

import asyncio
import time
from typing import Any

async def simulated_api_call(duration: float) -> dict[str, Any]:
    """Simulate an API call with given duration."""
    await asyncio.sleep(duration)
    return {"status": "ok", "duration": duration}

async def sequential_approach() -> list[dict[str, Any]]:
    """Fetch 5 APIs sequentially."""
    results = []
    for _ in range(5):
        result = await simulated_api_call(1)
        results.append(result)
    return results

async def concurrent_approach() -> list[dict[str, Any]]:
    """Fetch 5 APIs concurrently."""
    return await asyncio.gather(
        simulated_api_call(1),
        simulated_api_call(1),
        simulated_api_call(1),
        simulated_api_call(1),
        simulated_api_call(1),
    )

async def main() -> None:
    """Compare performance."""
    print("Sequential approach:")
    start = time.time()
    results_seq = await sequential_approach()
    time_seq = time.time() - start
    print(f"  Time: {time_seq:.2f}s")

    print("\nConcurrent approach:")
    start = time.time()
    results_conc = await concurrent_approach()
    time_conc = time.time() - start
    print(f"  Time: {time_conc:.2f}s")

    print(f"\nSpeedup: {time_seq / time_conc:.1f}x faster!")
    print(f"Results match: {len(results_seq) == len(results_conc)}")

asyncio.run(main())

Output:

Sequential approach:
Time: 5.00s

Concurrent approach:
Time: 1.00s

Speedup: 5.0x faster!
Results match: True

The math:

  • Sequential: 1s + 1s + 1s + 1s + 1s = 5s (sum)
  • Concurrent: max(1s, 1s, 1s, 1s, 1s) = 1s (maximum)

This is the power of concurrency: total time approaches the longest single operation, not the sum of all.

Spec Reference & Validation

Specification: Demonstrate measurable performance improvement with concurrent execution

AI Prompt Used: "Create a benchmark showing 5 simulated API calls (1s each): sequential takes 5s, concurrent takes 1s. Include timing output."

Generated Code: Above benchmarking example

Validation Steps:

  1. ✅ Sequential time ~5s (1 + 1 + 1 + 1 + 1)
  2. ✅ Concurrent time ~1s (max of all)
  3. ✅ Speedup calculation is 5x
  4. ✅ Both approaches return same number of results
  5. ✅ Type hints complete (list[dict[str, Any]])

Example 5: TaskGroup Error Propagation in Action

Let's see what happens when TaskGroup encounters a failure:

import asyncio

async def work_on_task(task_id: int, duration: float) -> str:
    """Do work for a task."""
    print(f"Task {task_id} starting (will take {duration}s)")
    await asyncio.sleep(duration)

    if task_id == 2:
        raise ValueError(f"Task {task_id} failed during processing")

    return f"Task {task_id} completed"

async def main() -> None:
    """Demonstrate TaskGroup cancellation on failure."""
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(work_on_task(1, 5))
            task2 = tg.create_task(work_on_task(2, 1))  # Will fail after 1s
            task3 = tg.create_task(work_on_task(3, 5))

        # If we reach here, all succeeded
        print(f"Results: {task1.result()}, {task2.result()}, {task3.result()}")

    except ExceptionGroup as eg:
        print(f"\nTaskGroup failed: {eg}")
        print("Key point: Task 1 and Task 3 were cancelled automatically!")
        print("(They never reached completion despite having 5s allocated)")

asyncio.run(main())

Output:

Task 1 starting (will take 5s)
Task 2 starting (will take 1s)
Task 3 starting (will take 5s)

TaskGroup failed: ...
Key point: Task 1 and Task 3 were cancelled automatically!
(They never reached completion despite having 5s allocated)

This is crucial behavior: When Task 2 fails, the entire TaskGroup is cancelled. Tasks 1 and 3 don't finish their 5 seconds—they're interrupted and cleaned up. This is structured concurrency: all-or-nothing execution with automatic cleanup.

Spec Reference & Validation

Specification: Demonstrate TaskGroup automatic cancellation and exception grouping

AI Prompt Used: "Create 3 tasks where task 2 fails. Show how TaskGroup automatically cancels tasks 1 and 3. Explain ExceptionGroup."

Generated Code: Above task cancellation example

Validation Steps:

  1. ✅ Code runs and catches ExceptionGroup
  2. ✅ Task 2 raises ValueError as expected
  3. ✅ Tasks 1 and 3 are cancelled (don't complete)
  4. ✅ Exception handling with ExceptionGroup works
  5. ✅ Output shows cancellation behavior clearly

Choosing Your Pattern: Gather vs TaskGroup

Now you understand three approaches. How do you choose?

Scenario → Pattern (Why):

  • Fetching backup data sources (want best-effort) → gather(return_exceptions=True). Collect all available data even if some fail.
  • Parallel calculations that depend on each other → TaskGroup(). If one fails, stop immediately; no point continuing.
  • Large number of independent operations (100+) → asyncio.create_task() + manual collection. More control, better performance tuning.
  • Building a resilient API aggregator → gather(). Accept partial results; frontend handles missing data.
  • Building an atomic transaction system → TaskGroup(). All succeed or all roll back.
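The "manual collection" option deserves a quick sketch. For large batches, asyncio.as_completed() lets you handle each result as soon as its task finishes, rather than waiting for the whole batch like gather() does. A minimal sketch (the `fetch_page` helper is hypothetical, standing in for a real HTTP request):

```python
import asyncio

async def fetch_page(page_id: int) -> str:
    """Hypothetical fetch standing in for a real HTTP request."""
    await asyncio.sleep(0.1)
    return f"page-{page_id}"

async def main() -> None:
    tasks = [asyncio.create_task(fetch_page(i)) for i in range(100)]
    pages: list[str] = []
    # Process results in completion order instead of argument order.
    for finished in asyncio.as_completed(tasks):
        pages.append(await finished)
    print(f"Processed {len(pages)} pages")

asyncio.run(main())
```

Because you hold the task handles, you can also inspect or cancel individual tasks mid-flight, which neither gather() nor TaskGroup() exposes as directly.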

✨ Teaching Tip

Use Claude Code to explore the tradeoff: Ask "Compare gather() vs TaskGroup() for a web crawler that fetches 1000 URLs. Should I cancel all on first failure, or collect whatever succeeds?" Your AI will explain the architectural implications.


Try With AI

Now you're ready to practice building concurrent systems. Work through these prompts with your AI companion tool (Claude Code, Gemini CLI, or ChatGPT).

Prompt 1: Understanding (Understand Level)

Ask your AI:

"What's the difference between awaiting a coroutine immediately (await fetch()) vs scheduling it with create_task() first (task = asyncio.create_task(fetch()); await task)? How does this enable concurrency?"

Expected Outcome: Your AI explains that create_task() schedules the coroutine to run in the background immediately, while the event loop executes other tasks while waiting. With await fetch(), you block until that specific coroutine finishes.


Prompt 2: Application (Apply Level)

Tell your AI:

"I need to fetch data from 5 different APIs concurrently. Each API call takes 2 seconds. Generate Python code using asyncio.gather() that fetches all 5 APIs in parallel and returns the results. Include type hints and timing measurements to verify concurrency works."

Expected Outcome: Your AI generates code with 5 async functions, uses asyncio.gather() to run them concurrently, and includes timing that shows ~2 seconds total (not 10 seconds). Verify the code runs and achieves the speedup.


Prompt 3: Analysis (Analyze Level)

Ask your AI:

"Show me the same 5-API fetching example using two approaches: (1) asyncio.gather(return_exceptions=True) and (2) asyncio.TaskGroup(). Explain when you'd choose each. What's the difference in behavior if one API times out?"

Expected Outcome: Your AI shows both implementations and explains that gather() returns exceptions in the results list (partial success), while TaskGroup() cancels all remaining tasks on first failure (all-or-nothing semantics). You understand the architectural tradeoff.


Prompt 4: Synthesis (Analyze/Synthesize Level)

Design challenge:

"You're building a weather dashboard that fetches from 5 services. Some might be down, some might be slow (up to 30s). You want to display the best data available within 10 seconds. Ask your AI: What concurrency pattern would you choose (gather/TaskGroup/create_task), and why? How would you implement timeout handling?"

Expected Outcome: Your AI discusses using gather() with timeout and return_exceptions to get best-effort data within time limit. You understand that resilience (tolerating partial failures) is sometimes more important than atomicity (all-or-nothing).