
Concurrent Programming

Concurrency is an important topic in Python: it lets a program make progress on multiple tasks at once, improving both throughput and responsiveness. This chapter covers the three main approaches: multithreading, multiprocessing, and asynchronous programming.

Concurrency Fundamentals

Concurrency vs. Parallelism

# Concurrency: the ability to deal with multiple tasks at once
# - Tasks take turns executing within the same time window
# - Well suited to I/O-bound work
# - Achievable even on a single CPU core

# Parallelism: the ability to execute multiple tasks at once
# - Tasks literally run at the same instant
# - Well suited to CPU-bound work
# - Requires multiple CPU cores

# Illustration:
"""
Concurrency (interleaved on one core):
Task 1: ======      ======
Task 2:       ======      ======
Time →

Parallelism (truly simultaneous):
Task 1: ========================
Task 2: ========================
Time →
"""

CPU-Bound vs. I/O-Bound Tasks

# CPU-bound task
def cpu_bound_task(n):
    """CPU-bound task: lots of computation"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# I/O-bound task
import time

def io_bound_task():
    """I/O-bound task: mostly waiting"""
    time.sleep(1)  # simulate waiting on I/O
    return "Done"

# Choosing a concurrency model:
# - CPU-bound: use multiple processes (sidesteps the GIL)
# - I/O-bound: use threads or async programming (cheaper on resources)

Comparing Python's Concurrency Options

"""
并发方式对比:

1. 多线程 (threading):
- 优点: 轻量级,切换快,共享内存
- 缺点: 受 GIL 限制,不适合 CPU 密集型
- 适用: I/O 密集型任务

2. 多进程 (multiprocessing):
- 优点: 绕过 GIL,真正并行,充分利用多核
- 缺点: 重量级,进程间通信复杂,资源消耗大
- 适用: CPU 密集型任务

3. 异步编程 (asyncio):
- 优点: 高效 I/O,资源占用少,单线程并发
- 缺点: 代码复杂,不适合 CPU 密集型
- 适用: 高并发 I/O 密集型任务
"""

Multithreading

The threading Module

Creating Threads

import threading
import time

def worker(name, delay):
    """Worker thread function"""
    print(f"Thread {name} starting")
    time.sleep(delay)
    print(f"Thread {name} finished")

# Method 1: create Thread objects
thread1 = threading.Thread(target=worker, args=("A", 2))
thread2 = threading.Thread(target=worker, args=("B", 1))

# Start the threads
thread1.start()
thread2.start()

# Wait for the threads to finish
thread1.join()
thread2.join()

print("Main thread finished")

# Method 2: subclass Thread
class MyThread(threading.Thread):
    def __init__(self, name, delay):
        super().__init__()
        self.name = name
        self.delay = delay

    def run(self):
        """Method executed by the thread"""
        print(f"Thread {self.name} starting")
        time.sleep(self.delay)
        print(f"Thread {self.name} finished")

# Use the custom thread class
thread3 = MyThread("C", 1.5)
thread3.start()
thread3.join()

Thread Attributes and Methods

import threading

def show_thread_info():
    """Print information about the current thread"""
    print(f"Thread name: {threading.current_thread().name}")
    print(f"Thread ID: {threading.get_ident()}")
    print(f"Alive: {threading.current_thread().is_alive()}")
    print(f"Is main thread: {threading.current_thread() is threading.main_thread()}")

# Main thread info
show_thread_info()
print("-" * 30)

# Child thread info
def worker():
    show_thread_info()

thread = threading.Thread(target=worker, name="WorkerThread")
thread.start()
thread.join()

# Commonly used module-level helpers:
print(f"Active thread count: {threading.active_count()}")
print(f"Main thread object: {threading.main_thread()}")
print(f"Current thread object: {threading.current_thread()}")

Sharing Data Between Threads

import threading

# Global variables are shared between threads
counter = 0

def increment():
    """Increment the counter"""
    global counter
    for _ in range(100000):
        counter += 1

# Create several threads
threads = []
for i in range(5):
    thread = threading.Thread(target=increment)
    threads.append(thread)
    thread.start()

# Wait for all threads to finish
for thread in threads:
    thread.join()

print(f"Final counter value: {counter}")
# Note: the result may be less than 500000, because the unprotected
# read-modify-write is a race condition
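
To see why the unlocked counter += 1 is unsafe, disassemble it: the statement compiles to separate load, add, and store instructions, and the interpreter may switch threads between any two of them, so one thread's store can overwrite another's. A quick look with the dis module:

import dis

counter = 0

def increment_once():
    global counter
    counter += 1

# The += expands into separate load / add / store bytecode instructions;
# a thread switch between them can lose an update.
dis.dis(increment_once)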

The Thread Class in Detail

Daemon Threads

import threading
import time

def daemon_worker():
    """Daemon thread"""
    print("Daemon thread starting")
    time.sleep(2)
    print("Daemon thread finished")

def normal_worker():
    """Regular thread"""
    print("Regular thread starting")
    time.sleep(1)
    print("Regular thread finished")

# Daemon thread: exits automatically when the main thread ends
daemon_thread = threading.Thread(target=daemon_worker, daemon=True)
# Or equivalently:
# daemon_thread = threading.Thread(target=daemon_worker)
# daemon_thread.daemon = True

# Regular thread: the main thread waits for it to finish
normal_thread = threading.Thread(target=normal_worker)

daemon_thread.start()
normal_thread.start()

# The main thread does not wait for the daemon thread
# normal_thread.join()  # wait only for the regular thread

print("Main thread finished")

Thread Names and Identifiers

import threading

def worker():
    print(f"Thread name: {threading.current_thread().name}")
    print(f"Thread identifier: {threading.current_thread().ident}")

# Give threads explicit names
thread1 = threading.Thread(target=worker, name="Worker-1")
thread2 = threading.Thread(target=worker, name="Worker-2")

thread1.start()
thread2.start()

thread1.join()
thread2.join()

# Unnamed threads get a default name
thread3 = threading.Thread(target=worker)
print(f"Default name: {thread3.name}")
thread3.start()
thread3.join()

Thread Synchronization

Locks (Lock)

import threading

# Create a lock
lock = threading.Lock()
counter = 0

def safe_increment():
    """Thread-safe counter"""
    global counter
    # Acquire the lock
    lock.acquire()
    try:
        # Critical section
        current = counter
        counter = current + 1
    finally:
        # Release the lock
        lock.release()

# Use a with statement to acquire and release automatically
def safe_increment_with():
    """Thread-safe counter (using with)"""
    global counter
    with lock:
        current = counter
        counter = current + 1

# Create several threads
threads = []
for i in range(100):
    thread = threading.Thread(target=safe_increment_with)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"Final counter value: {counter}")  # 100

Reentrant Locks (RLock)

import threading

# Reentrant lock: the same thread may acquire it multiple times
rlock = threading.RLock()

def recursive_function(n):
    """A recursive function using a reentrant lock"""
    with rlock:
        print(f"Recursion depth: {n}")
        if n > 0:
            recursive_function(n - 1)

recursive_function(3)
# Output:
# Recursion depth: 3
# Recursion depth: 2
# Recursion depth: 1
# Recursion depth: 0

# A plain Lock would deadlock here
"""
lock = threading.Lock()

def deadlock_example():
    with lock:
        print("Acquired the lock once")
        with lock:  # deadlock!
            print("Acquired the lock twice")

# deadlock_example()  # blocks forever
"""

Semaphores (Semaphore)

import threading
import time

# Semaphore: limits how many threads may use a resource at once
semaphore = threading.Semaphore(3)  # at most 3 threads at a time

def worker(worker_id):
    """Worker thread"""
    print(f"Worker {worker_id} waiting for the resource")
    with semaphore:
        print(f"Worker {worker_id} acquired the resource")
        time.sleep(1)
        print(f"Worker {worker_id} released the resource")

# Start 10 threads, but only 3 can hold the resource at once
threads = []
for i in range(10):
    thread = threading.Thread(target=worker, args=(i,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

Events (Event)

import threading
import time

# Event: a simple signalling mechanism between threads
event = threading.Event()

def waiter():
    """Thread that waits for the event"""
    print("Waiting for the event...")
    event.wait()  # blocks until the event is set
    print("Event fired, continuing")

def setter():
    """Thread that sets the event"""
    time.sleep(2)
    print("Setting the event")
    event.set()  # fire the event

# Create the threads
thread1 = threading.Thread(target=waiter)
thread2 = threading.Thread(target=setter)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

# Other Event methods
print(f"Event set: {event.is_set()}")

event.clear()  # reset the event
print(f"After clear: {event.is_set()}")

# Waiting with a timeout
event.wait(timeout=1)  # returns after at most 1 second

Condition Variables (Condition)

import threading
import time
import random

# Condition: for more elaborate thread coordination
condition = threading.Condition()
queue = []

def producer():
    """Producer"""
    for i in range(5):
        time.sleep(random.random())
        with condition:
            item = f"Item-{i}"
            queue.append(item)
            print(f"Produced: {item}")
            condition.notify()  # wake one waiting consumer

def consumer():
    """Consumer"""
    while True:
        with condition:
            while not queue:  # re-check the predicate after every wakeup
                print("Queue empty, waiting...")
                condition.wait()  # wait for a notification

            item = queue.pop(0)
            print(f"Consumed: {item}")

            if item == "Item-4":
                break

# Create the producer and consumer
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

Barriers (Barrier)

import threading
import time

# Barrier: wait until a fixed number of threads reach a point, then all proceed
barrier = threading.Barrier(3)  # wait for 3 threads

def worker(worker_id):
    """Worker thread"""
    print(f"Worker {worker_id} starting work")
    time.sleep(worker_id)
    print(f"Worker {worker_id} reached the barrier")
    barrier.wait()  # wait for the other threads
    print(f"Worker {worker_id} passed the barrier, continuing")

# Start 3 threads
threads = []
for i in range(3):
    thread = threading.Thread(target=worker, args=(i,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print("All threads finished")

Locking Mechanics

Deadlock and How to Avoid It

import threading
import time

# Deadlock example
lock1 = threading.Lock()
lock2 = threading.Lock()

def thread1_worker():
    """Thread 1"""
    with lock1:
        print("Thread 1: acquired lock1")
        time.sleep(0.1)
        print("Thread 1: trying to acquire lock2")
        with lock2:
            print("Thread 1: holds both locks")

def thread2_worker():
    """Thread 2"""
    with lock2:
        print("Thread 2: acquired lock2")
        print("Thread 2: trying to acquire lock1")
        with lock1:
            print("Thread 2: holds both locks")

# Running these together can deadlock
# thread1 = threading.Thread(target=thread1_worker)
# thread2 = threading.Thread(target=thread2_worker)
# thread1.start()
# thread2.start()

# Ways to avoid deadlock:

# Option 1: always acquire locks in a fixed order
def safe_thread1():
    with lock1:
        with lock2:  # always lock1 first, then lock2
            print("Safe thread 1")

def safe_thread2():
    with lock1:  # also takes lock1 first
        with lock2:
            print("Safe thread 2")

# Option 2: use timeouts (track each lock separately,
# so a partial acquisition is released correctly)
def safe_thread_with_timeout():
    got1 = got2 = False
    try:
        got1 = lock1.acquire(timeout=1)
        if got1:
            got2 = lock2.acquire(timeout=1)
            if got2:
                print("Acquired both locks")
    finally:
        if got2:
            lock2.release()
        if got1:
            lock1.release()

The Performance Cost of Locks

import threading
import time

# Locks cost performance but buy correctness
lock = threading.Lock()
counter = 0

def increment_with_lock():
    global counter
    with lock:
        counter += 1

def increment_without_lock():
    global counter
    counter += 1  # unsafe, but faster

# Rough comparison (each thread increments only once, so thread
# creation dominates; this illustrates the idea, not a precise benchmark)
def test_with_lock():
    start = time.time()
    threads = []
    for _ in range(1000):
        thread = threading.Thread(target=increment_with_lock)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    return time.time() - start

def test_without_lock():
    start = time.time()
    threads = []
    for _ in range(1000):
        thread = threading.Thread(target=increment_without_lock)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    return time.time() - start

print(f"With lock: {test_with_lock():.4f}s")
print(f"Without lock: {test_without_lock():.4f}s")
# Note: the lock-free version may produce a wrong count; it is merely faster

Thread Pools

ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor
import time
import threading

def task(name, delay):
    """Task function"""
    print(f"{name} starting (thread: {threading.current_thread().name})")
    time.sleep(delay)
    return f"{name} done"

# Create a thread pool
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks
    future1 = executor.submit(task, "Task-1", 1)
    future2 = executor.submit(task, "Task-2", 2)
    future3 = executor.submit(task, "Task-3", 1.5)

    # Collect results
    print(future1.result())  # blocks until the task finishes
    print(future2.result())
    print(future3.result())

print("\nSubmitting a batch with map:")

# Submit a batch with map
with ThreadPoolExecutor(max_workers=3) as executor:
    tasks = ["A", "B", "C", "D", "E"]
    delays = [1, 2, 1.5, 1, 2]

    # map yields results in submission order
    results = executor.map(task, tasks, delays)

    for result in results:
        print(result)

as_completed and wait

from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time

def task(name, delay):
    time.sleep(delay)
    return f"{name} ({delay}s)"

# as_completed yields futures in completion order
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {
        executor.submit(task, name, delay): name
        for name, delay in [("A", 2), ("B", 1), ("C", 1.5)]
    }

    for future in as_completed(futures):
        name = futures[future]
        try:
            result = future.result()
            print(f"Done: {result}")
        except Exception as e:
            print(f"{name} raised: {e}")

print("\nWaiting for all tasks with wait:")

# wait blocks until all tasks complete
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(task, f"Task-{i}", i)
        for i in range(3, 0, -1)
    ]

    # Wait for every task to finish
    completed, not_completed = wait(futures)

    print(f"Completed: {len(completed)}")
    print(f"Not completed: {len(not_completed)}")

Thread Pool Best Practices

from concurrent.futures import ThreadPoolExecutor
import time

def cpu_task(n):
    """CPU-bound task"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

def io_task(seconds):
    """I/O-bound task"""
    time.sleep(seconds)
    return f"Waited {seconds} seconds"

# Choosing the number of workers:
# - I/O-bound: the worker count can comfortably exceed the CPU core count
# - CPU-bound: threads gain nothing under the GIL; prefer processes
#   (the CPU demo below only illustrates the pool API)

import os
cpu_count = os.cpu_count()
print(f"CPU cores: {cpu_count}")

# I/O-bound: more workers are fine
print("I/O-bound tasks:")
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(io_task, [1, 1, 1, 1, 1]))
    for result in results:
        print(result)

# CPU-bound: shown for completeness; the GIL serializes these threads
print("\nCPU-bound tasks:")
with ThreadPoolExecutor(max_workers=cpu_count) as executor:
    results = list(executor.map(cpu_task, [10000] * 5))
    print(f"Results: {results[:2]}...")

Multiprocessing

The multiprocessing Module

Creating Processes

import multiprocessing
import time

def worker(name, delay):
    """Worker process function"""
    print(f"Process {name} starting (PID: {multiprocessing.current_process().pid})")
    time.sleep(delay)
    print(f"Process {name} finished")

# Method 2: subclass Process (defined at module level so it can be pickled)
class MyProcess(multiprocessing.Process):
    def __init__(self, name, delay):
        super().__init__()
        self.name = name
        self.delay = delay

    def run(self):
        """Method executed in the child process"""
        print(f"Process {self.name} starting")
        time.sleep(self.delay)
        print(f"Process {self.name} finished")

if __name__ == "__main__":
    # On Windows (and any "spawn" start method), process creation
    # must be guarded by if __name__ == "__main__"

    # Method 1: create Process objects
    process1 = multiprocessing.Process(target=worker, args=("A", 2))
    process2 = multiprocessing.Process(target=worker, args=("B", 1))

    # Start the processes
    process1.start()
    process2.start()

    # Wait for the processes to finish
    process1.join()
    process2.join()

    print("Main process finished")

    # Use the Process subclass
    process3 = MyProcess("C", 1.5)
    process3.start()
    process3.join()

Process Attributes and Methods

import multiprocessing

def show_process_info():
    """Print information about the current process"""
    current_process = multiprocessing.current_process()
    print(f"Process name: {current_process.name}")
    print(f"Process PID: {current_process.pid}")
    print(f"Alive: {current_process.is_alive()}")

def child_worker():
    show_process_info()

if __name__ == "__main__":
    # Main process info
    print("=== Main process ===")
    show_process_info()
    print(f"Current PID: {multiprocessing.current_process().pid}")
    # parent_process() returns None in the main process
    print(f"Parent process: {multiprocessing.parent_process()}")

    print("\n=== Child process ===")

    child_process = multiprocessing.Process(target=child_worker)
    child_process.start()
    child_process.join()

    # Number of CPU cores
    print(f"\nCPU cores: {multiprocessing.cpu_count()}")

The Process Class in Detail

Daemon Processes

import multiprocessing
import time

def daemon_worker():
    """Daemon process"""
    print("Daemon process starting")
    time.sleep(2)
    print("Daemon process finished")

def normal_worker():
    """Regular process"""
    print("Regular process starting")
    time.sleep(1)
    print("Regular process finished")

if __name__ == "__main__":
    # Daemon process: terminated automatically when the main process exits
    daemon_process = multiprocessing.Process(target=daemon_worker, daemon=True)

    # Regular process: the main process waits for it
    normal_process = multiprocessing.Process(target=normal_worker)

    daemon_process.start()
    normal_process.start()

    # The main process does not wait for the daemon process
    normal_process.join()  # wait only for the regular process

    print("Main process finished")

Data Isolation Between Processes

import multiprocessing

# Processes do not share memory
counter = 0

def increment():
    """Try to modify the global"""
    global counter
    for _ in range(100000):
        counter += 1
    print(f"counter in the child process: {counter}")

if __name__ == "__main__":
    counter = 0
    process = multiprocessing.Process(target=increment)
    process.start()
    process.join()

    print(f"counter in the main process: {counter}")
    # The main and child processes each have their own copy of counter

Inter-Process Communication

Queue

import multiprocessing

def producer(queue):
    """Producer"""
    for i in range(5):
        item = f"Item-{i}"
        queue.put(item)
        print(f"Produced: {item}")
    queue.put(None)  # sentinel: signal the end

def consumer(queue):
    """Consumer"""
    while True:
        item = queue.get()
        if item is None:  # end signal received
            break
        print(f"Consumed: {item}")

if __name__ == "__main__":
    # Create the queue
    queue = multiprocessing.Queue()

    # Create the producer and consumer
    producer_process = multiprocessing.Process(target=producer, args=(queue,))
    consumer_process = multiprocessing.Process(target=consumer, args=(queue,))

    producer_process.start()
    consumer_process.start()

    producer_process.join()
    consumer_process.join()

    print("Producing and consuming done")

Pipe

import multiprocessing

def sender(conn):
    """Sender"""
    messages = ["Hello", "World", "EOF"]
    for msg in messages:
        conn.send(msg)
        print(f"Sent: {msg}")
    conn.close()

def receiver(conn):
    """Receiver"""
    while True:
        try:
            msg = conn.recv()
            if msg == "EOF":
                break
            print(f"Received: {msg}")
        except EOFError:
            break
    conn.close()

if __name__ == "__main__":
    # Create a (bidirectional) pipe
    parent_conn, child_conn = multiprocessing.Pipe()

    # Create the processes
    sender_process = multiprocessing.Process(target=sender, args=(child_conn,))
    receiver_process = multiprocessing.Process(target=receiver, args=(parent_conn,))

    sender_process.start()
    receiver_process.start()

    sender_process.join()
    receiver_process.join()

    print("Communication done")

Manager (Shared Objects)

import multiprocessing

def worker(shared_list, shared_dict, shared_value, lock):
    """Worker process"""
    pid = multiprocessing.current_process().pid
    shared_list.append(f"Item-{pid}")
    shared_dict[f"PID-{pid}"] = "done"
    with lock:  # += on a managed Value is not atomic
        shared_value.value += 1

if __name__ == "__main__":
    # Create shared objects through a Manager
    with multiprocessing.Manager() as manager:
        shared_list = manager.list()
        shared_dict = manager.dict()
        shared_value = manager.Value('i', 0)
        lock = manager.Lock()

        # Create several processes
        processes = []
        for i in range(5):
            process = multiprocessing.Process(
                target=worker,
                args=(shared_list, shared_dict, shared_value, lock)
            )
            processes.append(process)
            process.start()

        for process in processes:
            process.join()

        print(f"Shared list: {list(shared_list)}")
        print(f"Shared dict: {dict(shared_dict)}")
        print(f"Shared value: {shared_value.value}")

Shared Memory (Value and Array)

import multiprocessing

def increment_counter(counter):
    """Increment the counter"""
    with counter.get_lock():  # take the built-in lock
        counter.value += 1

def add_to_array(arr, index, value):
    """Write a value into the array"""
    with arr.get_lock():
        arr[index] = value

if __name__ == "__main__":
    # Create shared memory
    counter = multiprocessing.Value('i', 0)      # 'i' means signed int
    array = multiprocessing.Array('i', [0] * 5)  # array of ints

    # Create many processes
    processes = []
    for i in range(100):
        process = multiprocessing.Process(target=increment_counter, args=(counter,))
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

    print(f"Final counter value: {counter.value}")

    # Use the array
    for i in range(5):
        process = multiprocessing.Process(
            target=add_to_array,
            args=(array, i, i * 10)
        )
        process.start()
        process.join()

    print(f"Array contents: {array[:]}")

Process Pools

Basic Pool Usage

import multiprocessing
import time

def task(name, delay):
    """Task function"""
    print(f"{name} starting (PID: {multiprocessing.current_process().pid})")
    time.sleep(delay)
    return f"{name} done"

if __name__ == "__main__":
    # Create a process pool
    with multiprocessing.Pool(processes=3) as pool:
        # Method 1: apply (synchronous)
        result1 = pool.apply(task, args=("Apply-1", 1))
        print(f"apply result: {result1}")

        print("\nMethod 2: apply_async (asynchronous)")
        result2 = pool.apply_async(task, args=("Async-1", 1))
        result3 = pool.apply_async(task, args=("Async-2", 2))

        print(f"Async result 1: {result2.get()}")
        print(f"Async result 2: {result3.get()}")

        print("\nMethod 3: starmap (batched, synchronous)")
        # map() takes a single iterable; for multiple arguments use starmap
        results = pool.starmap(task, [("Map-1", 1), ("Map-2", 1.5), ("Map-3", 1)])
        print(f"starmap results: {results}")

        print("\nMethod 4: starmap_async (batched, asynchronous)")
        async_result = pool.starmap_async(
            task,
            [("AMap-1", 1), ("AMap-2", 1.5), ("AMap-3", 1)]
        )
        print(f"Async starmap results: {async_result.get()}")

imap and starmap

import multiprocessing
import time

def heavy_task(x):
    """Slow task"""
    time.sleep(0.5)
    return x ** 2

def multi_task(x, y):
    """Task with two arguments (module level so it can be pickled)"""
    time.sleep(0.3)
    return x + y

if __name__ == "__main__":
    numbers = range(10)

    # imap: lazy iterator, yields results in input order
    print("Using imap:")
    with multiprocessing.Pool(processes=3) as pool:
        for result in pool.imap(heavy_task, numbers):
            print(f"Result: {result}")

    # imap_unordered: order not guaranteed, but results arrive sooner
    print("\nUsing imap_unordered:")
    with multiprocessing.Pool(processes=3) as pool:
        for result in pool.imap_unordered(heavy_task, numbers):
            print(f"Result: {result}")

    # starmap: unpacks argument tuples (the stdlib Pool has no istarmap)
    print("\nUsing starmap:")
    data = [(1, 2), (3, 4), (5, 6), (7, 8)]
    with multiprocessing.Pool(processes=2) as pool:
        for result in pool.starmap(multi_task, data):
            print(f"Result: {result}")

Process Pool Best Practices

import multiprocessing
import time

def cpu_task(n):
    """CPU-bound task"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

if __name__ == "__main__":
    # CPU-bound work: process count should roughly match the core count
    cpu_count = multiprocessing.cpu_count()
    print(f"CPU cores: {cpu_count}")

    # Compare different pool sizes
    for num_processes in [1, 2, cpu_count, cpu_count * 2]:
        start_time = time.time()

        with multiprocessing.Pool(processes=num_processes) as pool:
            results = pool.map(cpu_task, [100000] * 8)

        elapsed = time.time() - start_time
        print(f"{num_processes} processes took: {elapsed:.2f}s")

    # Best practices:
    # 1. CPU-bound: process count = CPU core count
    # 2. Processes are expensive to create; reuse them via a pool
    # 3. Keep per-task payloads moderate to limit communication
    #    overhead (see the chunksize sketch below)
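
One concrete lever for point 3: map(), imap(), and friends accept a chunksize argument that batches several items into one inter-process round trip. For many small tasks a larger chunk can help noticeably; a rough sketch (exact timings will vary by machine):

import multiprocessing
import time

def tiny_task(x):
    return x * x

if __name__ == "__main__":
    data = range(100000)
    with multiprocessing.Pool() as pool:
        start = time.time()
        pool.map(tiny_task, data, chunksize=1)
        print(f"chunksize=1:    {time.time() - start:.2f}s")

        start = time.time()
        pool.map(tiny_task, data, chunksize=1000)
        print(f"chunksize=1000: {time.time() - start:.2f}s")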

Asynchronous Programming

asyncio Basics

The Event Loop

import asyncio

# A basic coroutine driven by the event loop
async def main():
    """Main coroutine"""
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# Run the coroutine
if __name__ == "__main__":
    # Python 3.7+
    asyncio.run(main())

    # Pre-3.7 style
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(main())
    # loop.close()

# Getting the running event loop
async def get_current_loop():
    """Fetch the currently running event loop"""
    loop = asyncio.get_running_loop()
    print(f"Current event loop: {loop}")

Coroutines

import asyncio

async def simple_coroutine():
    """A simple coroutine"""
    print("Coroutine starting")
    await asyncio.sleep(1)
    print("Coroutine finished")
    return "return value"

async def main():
    # Create a coroutine object
    coro = simple_coroutine()
    print(f"Coroutine object: {coro}")

    # Run the coroutine
    result = await coro
    print(f"Coroutine result: {result}")

# Run
asyncio.run(main())

# Key properties of coroutines:
# 1. Defined with async def
# 2. Use await to call other coroutines
# 3. Must run inside an event loop

Tasks (Task)

import asyncio

async def task(name, delay):
    """Task coroutine"""
    print(f"{name} starting")
    await asyncio.sleep(delay)
    print(f"{name} finished")
    return f"{name} result"

async def main():
    # Create the tasks
    task1 = asyncio.create_task(task("Task-1", 2))
    task2 = asyncio.create_task(task("Task-2", 1))
    task3 = asyncio.create_task(task("Task-3", 1.5))

    print("Tasks created, waiting...")

    # Wait for the tasks
    result1 = await task1
    result2 = await task2
    result3 = await task3

    print(f"Task 1: {result1}")
    print(f"Task 2: {result2}")
    print(f"Task 3: {result3}")

asyncio.run(main())

# Task vs. coroutine:
# - A Task is a coroutine that has been scheduled to run
# - Tasks are scheduled for execution as soon as they are created
# - Multiple Tasks run concurrently
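
A small demonstration of that difference: a bare coroutine object does nothing until awaited, while create_task() schedules the coroutine on the event loop right away. A sketch:

import asyncio

async def side_effect():
    print("running")

async def demo():
    coro = side_effect()                        # nothing runs yet: coroutines are lazy
    task = asyncio.create_task(side_effect())  # scheduled immediately
    await asyncio.sleep(0)                      # yield one loop tick: the task runs now
    await coro                                  # the bare coroutine only runs here
    await task

asyncio.run(demo())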

async/await

Basic Syntax

import asyncio
import time

async def say_hello(name, delay):
    """Async function"""
    print(f"{name} starts waiting")
    await asyncio.sleep(delay)  # asynchronous wait
    print(f"{name} says hello!")
    return f"{name} done"

async def main():
    # Sequential execution (total time = sum of delays)
    print("=== Sequential ===")
    start = time.time()
    result1 = await say_hello("Alice", 1)
    result2 = await say_hello("Bob", 1)
    result3 = await say_hello("Charlie", 1)
    print(f"Total time: {time.time() - start:.2f}s")

    # Concurrent execution (total time = max of delays)
    print("\n=== Concurrent ===")
    start = time.time()
    results = await asyncio.gather(
        say_hello("Alice", 1),
        say_hello("Bob", 1),
        say_hello("Charlie", 1)
    )
    print(f"Total time: {time.time() - start:.2f}s")
    print(f"Results: {results}")

asyncio.run(main())

Awaitable Objects

import asyncio

# await accepts three kinds of objects:
# 1. Coroutines
# 2. Tasks
# 3. Futures

async def coroutine_example():
    """Coroutine example"""
    await asyncio.sleep(1)
    return "coroutine result"

async def task_example():
    """Task example"""
    task = asyncio.create_task(coroutine_example())
    result = await task  # wait for the task
    return result

async def future_example():
    """Future example"""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # Set the Future's result from another task
    async def set_future():
        await asyncio.sleep(1)
        future.set_result("future result")

    asyncio.create_task(set_future())
    result = await future  # wait for the Future
    return result

async def main():
    print("1. Coroutine:")
    result1 = await coroutine_example()
    print(result1)

    print("\n2. Task:")
    result2 = await task_example()
    print(result2)

    print("\n3. Future:")
    result3 = await future_example()
    print(result3)

asyncio.run(main())

Event Loops in Depth

Event Loop Methods

import asyncio

async def callback_example():
    """Callback examples"""
    loop = asyncio.get_running_loop()

    # call_soon: invoke a callback as soon as possible
    def callback():
        print("Callback ran")

    loop.call_soon(callback)

    # call_later: invoke after a delay
    def delayed_callback():
        print("Delayed callback ran")

    loop.call_later(1, delayed_callback)

    # call_at: invoke at an absolute loop time
    current_time = loop.time()
    def scheduled_callback():
        print("Scheduled callback ran")

    loop.call_at(current_time + 2, scheduled_callback)

    await asyncio.sleep(2.5)

asyncio.run(callback_example())

# Event loop lifecycle:
# 1. Create the loop
# 2. Run coroutines/tasks
# 3. Process events
# 4. Close the loop
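
For reference, the manual lifecycle that asyncio.run() wraps looks roughly like this; prefer asyncio.run() in new code (a sketch):

import asyncio

async def main():
    await asyncio.sleep(0.1)
    return "done"

loop = asyncio.new_event_loop()  # 1. create the loop
try:
    # 2./3. run the coroutine and process events until it completes
    result = loop.run_until_complete(main())
    print(result)
finally:
    loop.close()                 # 4. close the loop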

Task Scheduling

import asyncio

async def delayed_task(name, delay):
    """Delayed task"""
    await asyncio.sleep(delay)
    print(f"{name} ran")

async def main():
    # Create several tasks
    task1 = asyncio.create_task(delayed_task("Task-1", 1))
    task2 = asyncio.create_task(delayed_task("Task-2", 2))
    task3 = asyncio.create_task(delayed_task("Task-3", 1.5))

    # Wait for all of them
    await asyncio.gather(task1, task2, task3)

    print("All tasks done")

    # Controlling tasks with wait
    print("\nUsing wait:")
    tasks = [
        asyncio.create_task(delayed_task(f"Wait-{i}", i))
        for i in range(3, 0, -1)
    ]

    # Wait for all tasks to finish
    done, pending = await asyncio.wait(tasks)
    print(f"Done: {len(done)}, pending: {len(pending)}")

asyncio.run(main())

Working with Coroutines

Advanced Coroutine Usage

import asyncio

async def producer(queue, n):
    """Producer coroutine"""
    for i in range(n):
        item = f"Item-{i}"
        await asyncio.sleep(0.5)
        await queue.put(item)
        print(f"Produced: {item}")
    await queue.put(None)  # end signal

async def consumer(queue, name):
    """Consumer coroutine"""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        await asyncio.sleep(1)
        print(f"{name} consumed: {item}")
        queue.task_done()

async def main():
    # Create the queue
    queue = asyncio.Queue()

    # Create the producer and consumers
    producer_task = asyncio.create_task(producer(queue, 5))
    consumer_tasks = [
        asyncio.create_task(consumer(queue, f"Consumer-{i}"))
        for i in range(2)
    ]

    # Wait for completion
    await producer_task
    await queue.join()

    # Cancel the consumers
    for task in consumer_tasks:
        task.cancel()

    # Wait for the cancellations to finish
    await asyncio.gather(*consumer_tasks, return_exceptions=True)

    print("Producing and consuming done")

asyncio.run(main())

Coroutine Concurrency Patterns

import asyncio

async def fetch_data(id, delay):
    """Simulate fetching data"""
    await asyncio.sleep(delay)
    return f"Data-{id}"

# Pattern 1: gather (collect results)
async def pattern_gather():
    """gather pattern"""
    results = await asyncio.gather(
        fetch_data(1, 1),
        fetch_data(2, 1.5),
        fetch_data(3, 0.5)
    )
    print(f"gather results: {results}")

# Pattern 2: wait (wait for completion)
async def pattern_wait():
    """wait pattern"""
    tasks = [
        asyncio.create_task(fetch_data(i, i * 0.5))
        for i in range(1, 4)
    ]

    done, pending = await asyncio.wait(tasks)

    print("wait finished:")
    for task in done:
        print(f" - {task.result()}")

# Pattern 3: as_completed (in completion order)
async def pattern_as_completed():
    """as_completed pattern"""
    tasks = [
        asyncio.create_task(fetch_data(i, i * 0.5))
        for i in range(1, 4)
    ]

    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"as_completed: {result}")

# Pattern 4: race (first one wins)
async def pattern_race():
    """race pattern: take the first finished result"""
    tasks = [
        asyncio.create_task(fetch_data(i, i * 0.5))
        for i in range(1, 4)
    ]

    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    # Cancel the remaining tasks and let the cancellations settle
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    result = list(done)[0].result()
    print(f"race winner: {result}")

async def main():
    print("=== gather ===")
    await pattern_gather()

    print("\n=== wait ===")
    await pattern_wait()

    print("\n=== as_completed ===")
    await pattern_as_completed()

    print("\n=== race ===")
    await pattern_race()

asyncio.run(main())

Timeout Control

import asyncio

async def timeout_task(seconds):
    """A task that may exceed the caller's timeout"""
    await asyncio.sleep(seconds)
    return f"Finished (took {seconds}s)"

async def main():
    # Set a timeout with wait_for
    print("=== Timeouts with wait_for ===")
    try:
        result = await asyncio.wait_for(
            timeout_task(3),
            timeout=2.0
        )
        print(result)
    except asyncio.TimeoutError:
        print("Task timed out!")

    # asyncio.timeout (Python 3.11+, recommended)
    print("\n=== async with asyncio.timeout ===")
    try:
        async with asyncio.timeout(2.0):
            result = await timeout_task(3)
            print(result)
    except TimeoutError:
        print("Task timed out!")

    # Inspecting the timeout scope afterwards
    # Note: the TimeoutError is raised when the async with block exits,
    # so it must be caught outside the block, not inside it
    print("\n=== Checking whether the timeout fired ===")
    try:
        async with asyncio.timeout(2.0) as timeout_scope:
            result = await timeout_task(3)
            print(result)
    except TimeoutError:
        print("Timed out, continuing")
        # cleanup after the timeout can go here

    if timeout_scope.expired():
        print("It really did time out")

asyncio.run(main())

Concurrency Safety

Thread Safety

Thread-Safe Data Structures

import threading
import queue

# queue.Queue is thread-safe
safe_queue = queue.Queue()

def producer():
    """Producer"""
    for i in range(5):
        safe_queue.put(f"Item-{i}")
        print(f"Produced: Item-{i}")

def consumer():
    """Consumer"""
    while True:
        try:
            item = safe_queue.get(timeout=1)
            print(f"Consumed: {item}")
            safe_queue.task_done()
        except queue.Empty:
            break

# Use the thread-safe queue
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

# collections.deque is also thread-safe for append() and popleft()
from collections import deque

thread_safe_deque = deque()
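
For instance, append() and popleft() on a deque are individually atomic, so two threads can hand items through it without an explicit lock. A sketch (for blocking semantics, queue.Queue remains the better tool):

from collections import deque
import threading

buffer = deque()

def producer():
    for i in range(5):
        buffer.append(i)  # atomic, no lock needed

def consumer(results):
    while len(results) < 5:
        try:
            results.append(buffer.popleft())  # also atomic
        except IndexError:
            pass  # buffer momentarily empty; busy-retry (fine for a demo)

results = []
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=(results,))
t1.start(); t2.start()
t1.join(); t2.join()
print(results)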

Thread-Local Variables

import threading

# Thread-local storage
local_data = threading.local()

def worker():
    """Worker thread"""
    # Each thread gets its own copy
    local_data.value = threading.current_thread().name
    print(f"{threading.current_thread().name}: {local_data.value}")

threads = []
for i in range(3):
    thread = threading.Thread(target=worker)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

# local_data holds independent values in each thread
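
A common practical use is keeping one expensive resource per thread, created lazily on first use. A minimal sketch, where make_connection() is a hypothetical stand-in for something like opening a database connection:

import threading

local_data = threading.local()

def make_connection():
    """Hypothetical placeholder for an expensive per-thread resource"""
    return object()

def get_connection():
    # Each thread sees (and caches) its own "conn" attribute
    if not hasattr(local_data, "conn"):
        local_data.conn = make_connection()
    return local_data.conn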

Process Safety

Inter-Process Synchronization

import multiprocessing

def worker(lock, value):
    """Worker process"""
    with lock:
        print(f"Process {multiprocessing.current_process().pid} holds the lock")
        value.value += 1

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    value = multiprocessing.Value('i', 0)

    processes = []
    for i in range(5):
        process = multiprocessing.Process(
            target=worker,
            args=(lock, value)
        )
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

    print(f"Final value: {value.value}")

Async Safety

Coroutine Synchronization Primitives

import asyncio

# asyncio.Event
async def event_example():
    """Event example"""
    event = asyncio.Event()

    async def waiter():
        print("Waiting for the event...")
        await event.wait()
        print("Event fired!")

    async def setter():
        await asyncio.sleep(1)
        print("Setting the event")
        event.set()

    await asyncio.gather(waiter(), setter())

# asyncio.Lock
async def lock_example():
    """Lock example"""
    lock = asyncio.Lock()

    async def worker(worker_id):
        async with lock:
            print(f"Worker {worker_id} acquired the lock")
            await asyncio.sleep(1)
            print(f"Worker {worker_id} released the lock")

    await asyncio.gather(
        worker(1),
        worker(2),
        worker(3)
    )

# asyncio.Semaphore
async def semaphore_example():
    """Semaphore example"""
    semaphore = asyncio.Semaphore(2)  # at most 2 coroutines

    async def worker(worker_id):
        async with semaphore:
            print(f"Worker {worker_id} acquired the semaphore")
            await asyncio.sleep(1)
            print(f"Worker {worker_id} released the semaphore")

    await asyncio.gather(*[
        worker(i) for i in range(5)
    ])

async def main():
    print("=== Event ===")
    await event_example()

    print("\n=== Lock ===")
    await lock_example()

    print("\n=== Semaphore ===")
    await semaphore_example()

asyncio.run(main())

The GIL in Detail

What Is the GIL

"""
GIL (Global Interpreter Lock) 全局解释器锁

什么是 GIL:
- GIL 是 Python 解释器的全局锁
- 同一时刻只有一个线程能执行 Python 字节码
- 影响 CPython 实现,不影响 Jython, IronPython

为什么需要 GIL:
1. 简化内存管理 (引用计数)
2. 保护 C 扩展的内部状态
3. 避免复杂的并发问题

GIL 的影响:
- CPU 密集型: 多线程不能利用多核
- I/O 密集型: 影响较小,因为 I/O 时会释放 GIL

查看 GIL 状态:
import sys
print(sys.flags) # 可以查看 Python 配置
"""

import threading
import time

# The GIL's effect on CPU-bound work
def cpu_task():
    """CPU-bound task"""
    total = 0
    for i in range(10000000):
        total += i ** 2
    return total

def test_single_thread():
    """Single-threaded test"""
    start = time.time()
    result = cpu_task()
    elapsed = time.time() - start
    return elapsed

def test_multi_thread():
    """Multithreaded test"""
    start = time.time()
    threads = []
    for _ in range(2):
        thread = threading.Thread(target=cpu_task)
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    elapsed = time.time() - start
    return elapsed

print(f"Single thread: {test_single_thread():.2f}s")
print(f"Two threads: {test_multi_thread():.2f}s")
# The threaded version may even be slower, due to the GIL plus context-switch overhead

Working Around the GIL

Using Multiple Processes

import multiprocessing
import time

def cpu_task():
    """CPU-bound task"""
    total = 0
    for i in range(10000000):
        total += i ** 2
    return total

def test_multi_process():
    """Multi-process test"""
    start = time.time()
    processes = []
    for _ in range(2):
        process = multiprocessing.Process(target=cpu_task)
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

    elapsed = time.time() - start
    return elapsed

if __name__ == "__main__":
    print(f"Two processes: {test_multi_process():.2f}s")
    # Processes run truly in parallel and use all cores

Using C Extensions

# Some libraries release the GIL during heavy work:
# - NumPy (array operations)
# - Pandas (data processing)
# - OpenCV (image processing)

import numpy as np
import threading
import time

def numpy_task():
    """NumPy operations release the GIL"""
    # A large matrix multiplication
    matrix = np.random.rand(1000, 1000)
    result = np.dot(matrix, matrix)
    return result

def test_numpy_threading():
    """Benchmark NumPy across threads"""
    start = time.time()
    threads = []
    for _ in range(4):
        thread = threading.Thread(target=numpy_task)
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    elapsed = time.time() - start
    return elapsed

print(f"NumPy with threads: {test_numpy_threading():.2f}s")
# Because NumPy releases the GIL, the threads genuinely overlap

GIL Best Practices

"""
GIL 最佳实践:

1. CPU 密集型任务:
- 使用多进程 (multiprocessing)
- 使用 Joblib, Celery 等任务队列
- 使用 NumPy, Pandas 等释放 GIL 的库

2. I/O 密集型任务:
- 使用多线程 (threading)
- 使用异步编程 (asyncio)
- 使用线程池 (ThreadPoolExecutor)

3. 混合任务:
- 分离 CPU 和 I/O 任务
- 使用进程池处理 CPU 密集型
- 使用线程池或协程处理 I/O 密集型

4. 避免频繁的线程切换:
- 减少锁的粒度
- 使用线程局部变量
- 使用队列等线程安全的数据结构
"""

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive(n):
    """CPU-bound"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

async def io_intensive(name):
    """I/O-bound"""
    await asyncio.sleep(1)
    return f"{name} done"

# Combining both
async def hybrid_approach():
    """Hybrid approach"""
    # Run the CPU-bound work in a process pool
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as executor:
        cpu_result = await loop.run_in_executor(
            executor,
            cpu_intensive,
            100000
        )
        print(f"CPU task result: {cpu_result}")

    # Run the I/O-bound work as coroutines
    io_results = await asyncio.gather(
        io_intensive("Task-1"),
        io_intensive("Task-2"),
        io_intensive("Task-3")
    )
    print(f"I/O task results: {io_results}")

# Run (guarded, since the process pool may spawn workers by re-importing this module)
if __name__ == "__main__":
    asyncio.run(hybrid_approach())

Summary

This chapter covered concurrent programming in Python:

Concurrency Fundamentals

  • Concurrency vs. parallelism: the conceptual difference and where each applies
  • Task types: CPU-bound vs. I/O-bound
  • Concurrency options: the trade-offs between threads, processes, and async, and how to choose

Multithreading

  • The threading module: creating threads, thread attributes, sharing data between threads
  • The Thread class: daemon threads, thread names and identifiers
  • Thread synchronization: Lock, RLock, Semaphore, Event, Condition, Barrier
  • Locking mechanics: deadlock and how to avoid it, the performance cost of locks
  • Thread pools: ThreadPoolExecutor, as_completed, wait

Multiprocessing

  • The multiprocessing module: creating processes, process attributes, data isolation
  • The Process class: daemon processes, per-process memory
  • Inter-process communication: Queue, Pipe, Manager, shared memory
  • Process pools: Pool, imap, starmap, best practices

Asynchronous Programming

  • asyncio basics: the event loop, coroutines, tasks
  • async/await: basic syntax, awaitable objects
  • Event loops: loop methods, task scheduling
  • Working with coroutines: advanced usage, concurrency patterns, timeout control

Concurrency Safety

  • Thread safety: thread-safe data structures, thread-local variables
  • Process safety: inter-process synchronization
  • Async safety: coroutine synchronization primitives

The GIL in Detail

  • What the GIL is: the global interpreter lock, its purpose and effects
  • Working around it: multiple processes, C extensions
  • Best practices: pick the concurrency model that matches the task type

Mastering concurrency can significantly improve a program's performance and responsiveness, but you have to pick the model that fits the workload and stay alert to concurrency-safety issues.