Lesson 5: Choosing Your Concurrency Approach
You've now learned about CPython's architecture (Lesson 1), its performance evolution (Lesson 2), the traditional GIL (Lesson 3), and free-threaded Python (Lesson 4). You understand what changed and why it matters. But there's a critical gap in your knowledge: how do you actually choose which approach to use?
This lesson teaches the most important skill in concurrent programming: making the right decision. You'll learn a decision framework that accounts for workload type, scale requirements, and isolation needs. You'll discover that no concurrency is often faster than bad concurrency, and you'll build benchmarking skills to validate your choices.
By the end of this lesson, you'll be able to look at a problem and confidently decide: "This needs single-threaded optimization," or "This needs free-threaded Python," or "This needs multiprocessing," and prove it with benchmarks.
Section 1: The Decision Framework (The Heart of This Lesson)
The most important principle in concurrent programming is simple: workload type matters more than any other factor. The table below captures it.
| Workload Type | Nature | Best Approach | Why | Python 3.14 Notes |
|---|---|---|---|---|
| CPU-Bound (Sequential) | Compute-heavy, single task | Single-threaded | No concurrency overhead; fastest | Still the baseline—measure it first |
| CPU-Bound (Parallel, Shared State) | Compute-heavy, parallel, needs shared memory | Free-Threaded Python | True parallelism without process overhead; 5-10% single-threaded cost for 2-4x parallel gain | New in 3.14 — this is revolutionary |
| CPU-Bound (Parallel, Isolated) | Compute-heavy, parallel, processes are independent | Multiprocessing | Process isolation when needed; forkserver default safer than fork | Improved in 3.14 — safer defaults |
| I/O-Bound (Low Concurrency) | Network/file I/O, event-driven, 10-100 concurrent tasks | Asyncio | Single-threaded event loop; no context switching overhead | Improved in 3.14 — CLI debugging (ps, pstree) |
| Hybrid (CPU + I/O) | Both CPU reasoning and I/O operations | Free-Threaded + Asyncio | Threads for CPU work; asyncio for I/O within/across threads | New in 3.14 — official support for combination |
Read this table carefully. This single framework drives 80% of concurrency decisions. Everything in this lesson builds from it.
💬 AI Colearning Prompt (15 min mark)
Let's test your understanding:
"Walk me through 10 real-world scenarios. For each, classify as CPU-bound (sequential/parallel) or I/O-bound. Then I'll guess which approach you'd choose. Then you ask me to justify my choice. Here are the scenarios: (1) PDF text extraction for 1000 documents, (2) web scraper downloading 100 pages, (3) local file compression with zlib, (4) REST API handling 50 concurrent requests, (5) matrix multiplication for ML training, (6) database query processing 1M rows, (7) AI agent reasoning task, (8) real-time stock price updates from APIs, (9) batch image resizing, (10) multi-agent negotiation. For each, tell me: Is this CPU or I/O? What's the best approach?"
Expected outcome: You'll develop intuition for the decision framework. You'll see patterns: anything involving network/file I/O is I/O-bound; anything computing without waiting is CPU-bound.
Section 2: Single-Threaded Execution (The Baseline You Must Measure)
Here's a truth that surprises many developers: no concurrency is often the fastest option.
Why? Concurrency adds overhead:
- Creating threads/processes (memory, startup time)
- Synchronization (locks, context switches)
- IPC (inter-process communication, serialization)
For small workloads or sequential operations, this overhead exceeds any benefit.
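To make this overhead concrete, here is a minimal sketch (standard library only) that measures how long it takes just to start and join workers that do nothing. The absolute numbers vary by OS and Python build, but processes are typically far more expensive to create than threads:

```python
import time
import threading
import multiprocessing

def noop() -> None:
    """Do nothing: we only want to measure startup/teardown cost."""
    pass

def time_threads(count: int = 8) -> float:
    start = time.perf_counter()
    threads = [threading.Thread(target=noop) for _ in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

def time_processes(count: int = 8) -> float:
    start = time.perf_counter()
    procs = [multiprocessing.Process(target=noop) for _ in range(count)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return time.perf_counter() - start

if __name__ == "__main__":
    print(f"8 no-op threads:   {time_threads():.4f}s")
    print(f"8 no-op processes: {time_processes():.4f}s")  # Typically much slower to start
```

If your real task finishes in a few milliseconds, this startup cost alone can dwarf any parallel gain.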
When Single-Threaded is Best
- Sequential Tasks: Processing items one after another where order matters
- Single Unit of Work: One large task that can't be parallelized
- Small Workloads: Where total runtime is milliseconds (overhead dominates)
- Memory Constrained: Creating extra processes/threads isn't viable
The Baseline Rule
ALWAYS measure single-threaded performance first. This is your baseline. Any concurrency approach must beat this baseline, accounting for overhead.
```python
import time

def cpu_task(n: int) -> float:
    """CPU-bound task: calculate sum of square roots."""
    total = 0.0
    for i in range(n):
        total += (i ** 2) ** 0.5
    return total

def benchmark_single_threaded(n: int) -> float:
    """Single-threaded baseline."""
    start = time.perf_counter()
    cpu_task(n)
    elapsed = time.perf_counter() - start
    return elapsed

# Measure the baseline
if __name__ == "__main__":
    baseline = benchmark_single_threaded(15**6)
    print(f"Single-threaded baseline: {baseline:.3f}s")
    # Later: any parallel approach must beat this to justify its overhead
```
🎓 Instructor Commentary (20 min mark)
In AI-native development, you don't guess at concurrency. You measure the baseline first. Syntax and frameworks are cheap—understanding when overhead is justified is gold. Many developers add threading/multiprocessing and wonder why it's slower. The reason: they didn't measure the baseline.
Section 3: Free-Threaded Python (True Parallelism on Shared State)
When to use: CPU-bound work that needs parallelism WITH shared state
Free-threaded Python (introduced in Lesson 4) removes the GIL, enabling true parallel execution of Python threads. This is revolutionary because:
- No process isolation overhead
- No IPC/serialization
- Shared memory means data sharing is direct
- 5-10% single-threaded overhead for 2-4x parallel gains on 4 cores
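Before benchmarking, it's worth confirming which build you're actually running. A minimal check, assuming CPython 3.13+ where sysconfig exposes Py_GIL_DISABLED and the provisional sys._is_gil_enabled() exists:

```python
import sys
import sysconfig

if __name__ == "__main__":
    # Py_GIL_DISABLED is 1 only on free-threaded builds (e.g. python3.14t)
    print("Free-threaded build:", bool(sysconfig.get_config_var("Py_GIL_DISABLED")))
    # Available on 3.13+: reports whether the GIL is active in this process right now
    if hasattr(sys, "_is_gil_enabled"):
        print("GIL currently enabled:", sys._is_gil_enabled())
```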
Real Example: AI Multi-Agent System
Imagine a system where 4 AI agents collaborate on a problem, sharing reasoning state:
```python
import threading
import time

class Agent:
    """AI agent doing CPU-bound reasoning."""
    def __init__(self, name: str, iterations: int = 15**6):
        self.name = name
        self.iterations = iterations
        self.result: float | None = None

    def reason(self) -> None:
        """CPU-intensive reasoning task."""
        total = 0.0
        for i in range(self.iterations):
            total += (i ** 2) ** 0.5
        self.result = total

def benchmark_free_threaded(num_agents: int = 4) -> float:
    """Benchmark: agents reasoning in parallel threads."""
    agents = [Agent(f"Agent-{i}") for i in range(num_agents)]
    threads = [threading.Thread(target=agent.reason) for agent in agents]
    start = time.perf_counter()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    elapsed = time.perf_counter() - start
    return elapsed

# With free-threaded Python 3.14:
# Expected: ~3-4x speedup on a 4-core machine (not the ideal 4x, because of overhead)
if __name__ == "__main__":
    free_threaded_time = benchmark_free_threaded(4)
    print(f"Free-threaded (4 agents): {free_threaded_time:.3f}s")
```
The gain: On a 4-core machine with free-threaded Python, this runs in ~1/3 to 1/4 the time of single-threaded. The 5-10% overhead is more than compensated by true parallelism.
🚀 CoLearning Challenge (25 min mark)
Ask your AI Companion:
"Compare free-threaded Python to traditional threading with the GIL. Create a benchmark that shows: (1) traditional GIL threading stays around baseline time (no speedup), (2) free-threaded achieves 2-4x speedup. Explain why the GIL prevented the speedup and free-threading enables it."
Expected outcome: You'll understand viscerally why free-threading is revolutionary. You'll see that traditional threading doesn't actually parallelize CPU work.
Section 4: Multiprocessing (True Parallelism with Isolation)
When to use: CPU-bound work needing true parallelism AND process isolation
Multiprocessing creates separate Python processes, each with its own GIL. This:
- Achieves true parallelism (not blocked by GIL)
- Isolates processes (one crash doesn't kill all)
- Adds overhead (process creation, IPC, serialization)
Python 3.14 improves multiprocessing with:
- forkserver default: Safer than `fork` (avoids deadlocks caused by inherited threading state)
- Process.interrupt(): Graceful shutdown of worker processes
- Context inheritance: Better control over initialization
Real Example: Distributed Processing
```python
import multiprocessing
import time

def cpu_task(n: int) -> float:
    """Same CPU-bound task."""
    total = 0.0
    for i in range(n):
        total += (i ** 2) ** 0.5
    return total

def benchmark_multiprocessing(n: int, num_processes: int = 4) -> float:
    """Benchmark: 4 processes running the CPU task in parallel."""
    start = time.perf_counter()
    with multiprocessing.Pool(num_processes) as pool:
        # Each process gets its own copy of the work
        tasks = [n for _ in range(num_processes)]
        pool.map(cpu_task, tasks)
    elapsed = time.perf_counter() - start
    return elapsed

if __name__ == "__main__":
    mp_time = benchmark_multiprocessing(15**6, 4)
    print(f"Multiprocessing (4 processes): {mp_time:.3f}s")
    # Note: process creation overhead makes this slower for small workloads,
    # but it scales well for large, independent CPU tasks.
```
✨ Teaching Tip (35 min mark)
Use Claude Code to explore multiprocessing edge cases:
"Show me what happens if a multiprocessing worker crashes. How does the Pool handle it? What if I want to interrupt all workers gracefully? Show me Python 3.14's Process.interrupt() in action."
This teaches you the exception handling and lifecycle management that matters in production.
Section 5: Asyncio (I/O-Bound Concurrency Without Threads)
When to use: I/O-bound work with event-driven architecture (10-1000 concurrent tasks)
Asyncio uses a single thread with an event loop. When one task waits for I/O (network, file), the loop switches to another task. This:
- Handles 100s of concurrent connections
- No thread overhead (single thread)
- Scales to high concurrency
- Limited by CPU (you need threads/processes for CPU work alongside I/O)
This connects directly to Chapter 28 (Asyncio fundamentals). Lesson 5 adds Python 3.14 improvements.
Python 3.14 Asyncio Improvements: CLI Debugging
Python 3.14 adds production-grade debugging tools for asyncio:
```python
# Save as async_example.py
import asyncio

async def agent_task(name: str, duration: int) -> None:
    """Simulate an AI agent processing."""
    print(f"{name} starting...")
    await asyncio.sleep(duration)
    print(f"{name} finished")

async def main() -> None:
    """Run 3 agents concurrently."""
    tasks = [
        asyncio.create_task(agent_task("Agent-1", 5)),
        asyncio.create_task(agent_task("Agent-2", 3)),
        asyncio.create_task(agent_task("Agent-3", 7)),
    ]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())
```
Run this, then inspect with Python 3.14's asyncio CLI tools:
```bash
# In another terminal, get the process ID
ps aux | grep async_example.py

# Show all running tasks (flat list)
python -m asyncio ps <PID>

# Show task hierarchy (tree structure)
python -m asyncio pstree <PID>
```
These commands show you:
- What tasks are currently running
- How long each has been running
- Stack traces for debugging deadlocks
💬 AI Colearning Prompt (42 min mark)
"I'm debugging an asyncio program that seems stuck. One task finished, but two others never completed. How do I use
python -m asyncio psandpstreeto diagnose what's happening? Show me the commands and explain what the output tells me."
Expected outcome: You'll learn to diagnose asyncio deadlocks and hangs—critical production skills.
Section 6: Python 3.14 Asyncio Improvements (Deep Dive)
Python 3.14 makes asyncio production-ready with three major improvements:
1. Thread-Safe Event Loop
Previously, calling asyncio from threads was error-prone. Python 3.14 guarantees thread-safe event loop access:
```python
import asyncio
import threading

async def async_task() -> str:
    await asyncio.sleep(0.1)
    return "Result from async"

def call_async_from_thread(loop: asyncio.AbstractEventLoop) -> None:
    """Call async code from a regular thread."""
    future = asyncio.run_coroutine_threadsafe(async_task(), loop)
    result = future.result()  # Blocks this thread until the coroutine completes
    print(f"Async result: {result}")
    loop.call_soon_threadsafe(loop.stop)  # Stop the loop once we have the result

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=call_async_from_thread, args=(loop,))
    thread.start()
    loop.run_forever()  # Run the event loop in the main thread until the worker stops it
    thread.join()
    loop.close()
```
2. Faster Event Loop Performance
Python 3.14 improves event loop speed by ~10-20% through optimizations in task switching. This compounds for systems handling 100s of concurrent connections.
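If you want to see task-switching cost on your own interpreter, here is a rough micro-benchmark sketch (standard library only; absolute numbers vary widely across versions and machines):

```python
import asyncio
import time

async def switcher(switches: int) -> None:
    """Each await asyncio.sleep(0) yields control back to the event loop."""
    for _ in range(switches):
        await asyncio.sleep(0)

async def main() -> None:
    # 100 tasks x 10,000 switches = 1,000,000 context switches through the loop
    tasks = [asyncio.create_task(switcher(10_000)) for _ in range(100)]
    start = time.perf_counter()
    await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start
    print(f"1,000,000 task switches in {elapsed:.3f}s")

if __name__ == "__main__":
    asyncio.run(main())
```

Run it on 3.13 and 3.14 side by side to see the difference on your hardware.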
3. Better Error Reporting
Error messages now clearly indicate which task failed and why, making debugging significantly faster.
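Whatever the version, naming your tasks makes failures easier to attribute. Here is a minimal sketch using asyncio.TaskGroup (available since Python 3.11); it illustrates the technique rather than reproducing 3.14's exact error text:

```python
import asyncio

async def flaky(name: str) -> None:
    await asyncio.sleep(0.1)
    if name == "Agent-2":
        raise RuntimeError(f"{name} failed during reasoning")

async def main() -> None:
    try:
        async with asyncio.TaskGroup() as tg:
            for name in ("Agent-1", "Agent-2", "Agent-3"):
                tg.create_task(flaky(name), name=name)
    except* RuntimeError as eg:
        # The exception group carries one entry per failed task
        for exc in eg.exceptions:
            print(f"Task failed: {exc}")

if __name__ == "__main__":
    asyncio.run(main())
```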
🎓 Instructor Commentary (55 min mark)
Production systems live or die on debuggability. Python 3.14's asyncio CLI tools transform asyncio from "black box" to "transparent." You can now see exactly what's running, what's blocked, and why.
Section 7: Python 3.14 Multiprocessing Improvements
Python 3.14 improves multiprocessing with safer defaults and better control:
1. forkserver Default (Safer Initialization)
Multiprocessing on Unix/Linux supports three start methods:
- fork: Copies the entire parent process (fast, but unsafe if the parent uses threads)
- spawn: Creates a fresh child process (slower, but safe)
- forkserver (NEW DEFAULT): Starts a server process and forks children from its clean state (fast AND safe)
```python
import multiprocessing

def work() -> None:
    print("Running in a forkserver-started child process")

# Python 3.14 uses forkserver by default on Unix/Linux.
# Setting it explicitly is equivalent and makes the intent clear:
if __name__ == "__main__":
    ctx = multiprocessing.get_context('forkserver')
    process = ctx.Process(target=work)
    process.start()
    process.join()
```
2. Process.interrupt() for Graceful Shutdown
Previously, you had to use signals or terminate() (forceful). Python 3.14 adds Process.interrupt():
```python
import multiprocessing
import time

def worker() -> None:
    try:
        for i in range(100):
            print(f"Working... {i}")
            time.sleep(1)
    except KeyboardInterrupt:
        print("Worker interrupted gracefully")

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker)
    p.start()
    time.sleep(5)
    # Graceful interrupt (sends a signal and lets the worker clean up)
    p.interrupt()  # NEW in Python 3.14
    p.join(timeout=2)
    if p.is_alive():
        p.terminate()  # Forceful kill only if necessary
```
🚀 CoLearning Challenge (65 min mark)
Tell your AI Companion:
"Implement all 5 benchmarks (single-threaded, free-threaded, multiprocessing, asyncio, and hybrid free-threaded+asyncio) for a CPU task. Compare execution times on your machine. Which is fastest? Why? How does overhead change with workload size?"
Expected outcome: You'll build the benchmarking muscles needed to make production decisions. You'll see that overhead patterns vary with workload size.
Section 8: Benchmarking Methodology (Measure Before and After)
You can't choose the right approach without data. Here's the rigorous methodology:
Step 1: Define Your Task
```python
import time

def benchmark_task(n: int) -> float:
    """CPU-bound task: sum of square roots."""
    total = 0.0
    for i in range(n):
        total += (i ** 2) ** 0.5
    return total
```
Step 2: Measure Baseline (Single-Threaded)
```python
def measure_baseline(n: int, runs: int = 3) -> float:
    """Run multiple times; return the average."""
    times = []
    for _ in range(runs):
        start = time.perf_counter()
        benchmark_task(n)
        elapsed = time.perf_counter() - start
        times.append(elapsed)
    avg = sum(times) / len(times)
    min_time = min(times)
    max_time = max(times)
    print(f"Baseline: avg={avg:.3f}s, min={min_time:.3f}s, max={max_time:.3f}s")
    return avg
```
Step 3: Measure Each Approach
```python
from collections.abc import Callable

def measure_approach(approach_name: str, func: Callable[[int], object],
                     n: int, baseline: float, runs: int = 3) -> tuple[float, float]:
    """Measure any approach and compare it to the baseline."""
    times = []
    for _ in range(runs):
        start = time.perf_counter()
        func(n)
        elapsed = time.perf_counter() - start
        times.append(elapsed)
    avg = sum(times) / len(times)
    speedup = baseline / avg  # How much faster than the baseline?
    print(f"{approach_name}: avg={avg:.3f}s, speedup={speedup:.2f}x")
    return avg, speedup
```
Step 4: Interpret Results
| Speedup | Interpretation | Decision |
|---|---|---|
| < 1.0x | Slower than baseline | Don't use this approach |
| 1.0-1.2x | Overhead not justified | Stick with simpler approach |
| 1.2-2.0x | Worth considering; check overhead for your scale | Use if workload is large |
| > 2.0x | Clear winner; go with this | Use in production |
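If you want to bake these thresholds into your benchmark harness, a small helper like this works (the thresholds are the ones from the table above, not any library API):

```python
def interpret_speedup(speedup: float) -> str:
    """Map a measured speedup onto the decision table above."""
    if speedup < 1.0:
        return "Slower than baseline: don't use this approach"
    if speedup < 1.2:
        return "Overhead not justified: stick with the simpler approach"
    if speedup < 2.0:
        return "Worth considering: check overhead at your real workload size"
    return "Clear winner: use in production"

if __name__ == "__main__":
    for s in (0.8, 1.1, 1.6, 3.2):
        print(f"{s:.1f}x -> {interpret_speedup(s)}")
```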
💬 AI Colearning Prompt (58 min mark)
"Ask your AI: What factors besides execution time matter when choosing a concurrency approach? Consider: memory usage (how many processes/threads can I create?), complexity (how hard to debug?), latency (is 95th percentile important?), and maintainability (can my team understand this code?)."
Expected outcome: You'll see that the decision framework isn't just about speed—it's about holistic trade-offs.
Section 9: Real-World AI Workload Patterns
Let's connect this to AI systems (the book's focus):
Pattern 1: AI Inference (CPU-Bound, Parallel)
- Nature: Running inference on multiple inputs (batch processing)
- Best Approach: Free-threaded Python
- Why: CPU-intensive, needs parallelism, agents share model state
- Example: 4 agents analyzing documents in parallel
Pattern 2: Web API Backend (I/O-Bound, Event-Driven)
- Nature: Handling 50-500 concurrent API requests
- Best Approach: Asyncio
- Why: Network I/O dominates; event loop handles concurrency elegantly
- Example: REST API serving AI inference requests
Pattern 3: Data Pipeline (CPU + I/O Mixed)
- Nature: Fetch data (I/O), process (CPU), store (I/O)
- Best Approach: Free-threaded + Asyncio Hybrid
- Why: Threads handle CPU; asyncio handles I/O concurrency
- Example: Ingest data from APIs, process with agents, save results
Pattern 4: Distributed AI (Multi-Machine)
- Nature: Train/infer across multiple servers
- Best Approach: Multiprocessing (local) + Kubernetes (distributed)
- Why: Process isolation; distributed orchestration
- Example: Part 13 content (Dapr actors)
✨ Teaching Tip (72 min mark)
Ask Claude Code:
"Design a production AI system: 10 agents (5 do NLP inference, 5 fetch data from APIs). Which concurrency approach for each group? How do they interact? Draw an architecture diagram."
This teaches you to think in systems, not isolated components.
Section 10: Hybrid Approaches (Free-Threaded + Asyncio)
For complex systems with both CPU and I/O work, combine free-threading and asyncio:
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def io_task(url: str) -> str:
    """Simulate I/O-bound work (network fetch)."""
    # In real code, use aiohttp; here we simulate the latency
    await asyncio.sleep(1)
    return f"Data from {url}"

def cpu_task(data: str) -> str:
    """CPU-bound work run in the thread pool."""
    # Simulate CPU-intensive processing
    total = 0.0
    for i in range(10**6):
        total += (i ** 2) ** 0.5
    return f"Processed: {data}"

async def hybrid_workflow() -> None:
    """Combine I/O (asyncio) and CPU (threads) work."""
    # Create a thread pool for CPU work
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Fetch data concurrently (asyncio)
        urls = ["url1", "url2", "url3", "url4"]
        fetch_tasks = [io_task(url) for url in urls]
        data_results = await asyncio.gather(*fetch_tasks)

        # Process each result in the thread pool (CPU work)
        loop = asyncio.get_running_loop()
        process_tasks = [
            loop.run_in_executor(executor, cpu_task, data)
            for data in data_results
        ]
        final_results = await asyncio.gather(*process_tasks)

        for result in final_results:
            print(result)

if __name__ == "__main__":
    asyncio.run(hybrid_workflow())
```
This pattern scales to production:
- Asyncio handles 100s of concurrent I/O operations
- Threads handle CPU work without blocking I/O
- Free-threading (Python 3.14) ensures threads actually parallelize
Section 11: Decision Tree Summary
Use this decision tree when facing a new problem:
1. Is the workload I/O-bound? (network, file, database)
   - YES → Asyncio (unless you need CPU parallelism too)
   - NO → Continue to step 2
2. Is the workload CPU-bound and in need of parallelism?
   - YES → Continue to step 3
   - NO → Single-threaded (no overhead needed)
3. Do the parallel workers need to share memory/state?
   - YES → Free-Threaded Python (Python 3.14+)
   - NO → Continue to step 4
4. Do processes need isolation, or are they fully independent?
   - Isolation needed → Multiprocessing (with forkserver)
   - Independent → Multiprocessing (any start method)
5. Do you have both CPU and I/O work?
   - YES → Free-Threaded + Asyncio Hybrid
   - NO → You've already chosen in steps 1-4
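The same tree can be written as a small function you keep next to your benchmarks; the category names are this lesson's labels, not a library API:

```python
def choose_approach(
    io_bound: bool,
    cpu_parallel: bool,
    shared_state: bool,
    needs_isolation: bool,
    mixed_cpu_and_io: bool,
) -> str:
    """Encode the decision tree from Section 11 (hybrid case checked first)."""
    if mixed_cpu_and_io:
        return "Free-threaded + asyncio hybrid"
    if io_bound:
        return "Asyncio"
    if not cpu_parallel:
        return "Single-threaded"
    if shared_state:
        return "Free-threaded Python (3.14+)"
    if needs_isolation:
        return "Multiprocessing (forkserver)"
    return "Multiprocessing"

if __name__ == "__main__":
    # Example: batch NLP inference where agents share model state
    print(choose_approach(io_bound=False, cpu_parallel=True,
                          shared_state=True, needs_isolation=False,
                          mixed_cpu_and_io=False))
```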
🎓 Instructor Commentary (78 min mark)
Memorizing this tree won't help you. Understanding the WHY behind each decision makes you production-ready. Every choice reflects trade-offs: overhead vs speedup, isolation vs sharing, simplicity vs performance.
Try With AI
Prompt 1: Recall (Remember) - 3 minutes
"List the 5 concurrency approaches covered in this lesson and their primary use cases. Ask Claude Code to validate your list and add one insight you might have missed."
Expected outcome: You can recall the decision framework.
Example correct answer:
- Single-threaded: When no concurrency needed
- Free-threaded: CPU-parallel with shared state
- Multiprocessing: CPU-parallel with isolation
- Asyncio: I/O-bound event-driven
- Hybrid: Both CPU and I/O
Prompt 2: Explain (Understand) - 3 minutes
"Ask your AI: For each of the 5 approaches, explain the memory model (shared vs separate) and when it matters. Include Python 3.14 improvements for each."
Expected outcome: You understand architectural differences, not just names.
Example correct answer:
- Free-threaded: Shared memory within process; data sharing is direct; matters when agents collaborate
- Multiprocessing: Separate memory per process; data must be serialized; matters for isolation/crash safety
Prompt 3: Apply - Benchmark (5 minutes)
"Tell your AI: I have a web scraper that downloads 100 pages and extracts text (CPU-intensive). Should I use asyncio for downloads and free-threaded for processing? Implement a benchmark to prove which approach is fastest: (a) single-threaded sequential, (b) asyncio downloads + sync processing, (c) asyncio downloads + threaded processing. Show time for each."
Expected outcome: You'll see that combining asyncio (I/O) with threading (CPU) beats either alone.
Expected output: Benchmark comparing 3 approaches; asyncio+threaded should be fastest.
Prompt 4: Analyze - Production System (4 minutes)
"Ask your AI: Design a production AI system with these components: (1) 10 agents doing NLP (CPU-bound), (2) 5 agents fetching live data (I/O-bound), (3) coordinator that orchestrates them. Which concurrency approach for each component? Why? How would you deploy this on Kubernetes (preview: Part 11)? Connect to the decision framework from this lesson."
Expected outcome: You integrate all lesson concepts into a real-world design.
Expected answer should include:
- Agents doing NLP: Free-threaded Python
- Fetching live data: Asyncio
- Coordinator: Main thread orchestrating both
- Kubernetes: Each component as separate pod (Part 11 detail)
- Decision framework: Applied at each architectural choice
Next Steps
You now have the knowledge to make concurrency decisions confidently. Lesson 6 (Capstone) is where you'll build a full multi-agent system using these approaches, complete with a benchmarking dashboard. You'll see these concepts in action at production scale.
But first: practice the decision framework on your own problems. When you face a new workload, think through the decision tree. Measure baselines. Benchmark approaches. Let data guide your choice.