Concurrent Programming
Concurrency is an important topic in Python: it lets a program make progress on multiple tasks at once, improving performance and responsiveness. This chapter covers three approaches to concurrency: multithreading, multiprocessing, and asynchronous programming.
Concurrency Basics
Concurrency vs Parallelism
# Concurrency: the ability to deal with multiple tasks at once
# - Tasks take turns executing within the same time period
# - Well suited to I/O-bound work
# - Achievable even on a single CPU core
# Parallelism: the ability to execute multiple tasks at once
# - Tasks truly run at the same instant
# - Well suited to CPU-bound work
# - Requires multiple CPU cores
# Illustration:
"""
Concurrency (interleaved on one core):
Task 1: ======      ======
Task 2:       ======      ======
Time →
Parallelism (simultaneous on multiple cores):
Task 1: =================
Task 2: =================
Time →
"""
CPU-Bound vs I/O-Bound
# CPU-bound task
def cpu_bound_task(n):
    """CPU-bound task - heavy computation"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# I/O-bound task
import time

def io_bound_task():
    """I/O-bound task - mostly waiting"""
    time.sleep(1)  # simulate waiting on I/O
    return "done"

# Choosing a concurrency model:
# - CPU-bound: use multiprocessing (sidesteps the GIL)
# - I/O-bound: use multithreading or async programming (cheaper on resources)
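To make the difference concrete, here is a minimal sketch that times the same I/O-bound task run sequentially and then on a thread pool. The fake_io helper and the 0.2-second delay are illustrative stand-ins, not part of any library API.

import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(_):
    """Stand-in for an I/O-bound call (network, disk, ...)."""
    time.sleep(0.2)

start = time.time()
for i in range(10):
    fake_io(i)  # 10 calls back to back: roughly 2 seconds
print(f"sequential: {time.time() - start:.2f}s")

start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    list(executor.map(fake_io, range(10)))  # the waits overlap: roughly 0.2 seconds
print(f"threaded:   {time.time() - start:.2f}s")

The threads spend almost all their time sleeping, so the GIL is not a bottleneck here; this is exactly the situation where multithreading pays off.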
Comparing Python's Concurrency Options
"""
Concurrency options compared:
1. Multithreading (threading):
   - Pros: lightweight, fast context switches, shared memory
   - Cons: limited by the GIL; poor fit for CPU-bound work
   - Use for: I/O-bound tasks
2. Multiprocessing (multiprocessing):
   - Pros: sidesteps the GIL, true parallelism, uses all cores
   - Cons: heavyweight, inter-process communication is complex, higher resource cost
   - Use for: CPU-bound tasks
3. Asynchronous programming (asyncio):
   - Pros: efficient I/O, low resource usage, single-threaded concurrency
   - Cons: more complex code; poor fit for CPU-bound work
   - Use for: highly concurrent I/O-bound tasks
"""
Multithreading
The threading Module
Creating Threads
import threading
import time

def worker(name, delay):
    """Function run by each worker thread"""
    print(f"Thread {name} started")
    time.sleep(delay)
    print(f"Thread {name} finished")

# Approach 1: create a Thread object
thread1 = threading.Thread(target=worker, args=("A", 2))
thread2 = threading.Thread(target=worker, args=("B", 1))

# Start the threads
thread1.start()
thread2.start()

# Wait for them to finish
thread1.join()
thread2.join()
print("Main thread finished")

# Approach 2: subclass Thread
class MyThread(threading.Thread):
    def __init__(self, name, delay):
        super().__init__()
        self.name = name  # Thread.name is a settable property
        self.delay = delay

    def run(self):
        """Code executed by the thread"""
        print(f"Thread {self.name} started")
        time.sleep(self.delay)
        print(f"Thread {self.name} finished")

# Use the custom thread class
thread3 = MyThread("C", 1.5)
thread3.start()
thread3.join()
Thread Attributes and Methods
import threading
import time

def show_thread_info():
    """Print information about the current thread"""
    print(f"Thread name: {threading.current_thread().name}")
    print(f"Thread ID: {threading.get_ident()}")
    print(f"Alive: {threading.current_thread().is_alive()}")
    print(f"Is main thread: {threading.current_thread() is threading.main_thread()}")

# Main-thread info
show_thread_info()
print("-" * 30)

# Child-thread info
def worker():
    show_thread_info()

thread = threading.Thread(target=worker, name="WorkerThread")
thread.start()
thread.join()

# Commonly used module-level helpers:
print(f"Active thread count: {threading.active_count()}")      # number of live threads
print(f"Main thread object: {threading.main_thread()}")        # the main Thread object
print(f"Current thread object: {threading.current_thread()}")  # the current Thread
Sharing Data Between Threads
import threading

# Global variables are shared across threads
counter = 0

def increment():
    """Increment the shared counter"""
    global counter
    for _ in range(100000):
        counter += 1

# Create several threads
threads = []
for i in range(5):
    thread = threading.Thread(target=increment)
    threads.append(thread)
    thread.start()

# Wait for all threads to finish
for thread in threads:
    thread.join()

print(f"Final counter value: {counter}")
# Note: the result may be less than 500000 because the counter is not protected by a lock
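Why counter += 1 is unsafe: it compiles to several bytecode operations (load, add, store), and a thread switch can land between them. A quick way to see this is the standard dis module; a small sketch (the exact opcode names vary between Python versions):

import dis

def increment_once():
    global counter
    counter += 1

dis.dis(increment_once)
# Typical output shows separate LOAD_GLOBAL, BINARY_OP (+=), and
# STORE_GLOBAL steps; if the scheduler switches threads between the
# load and the store, two threads can overwrite each other's update.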
The Thread Class in Detail
Daemon Threads
import threading
import time

def daemon_worker():
    """Daemon thread"""
    print("Daemon thread started")
    time.sleep(2)
    print("Daemon thread finished")

def normal_worker():
    """Regular thread"""
    print("Regular thread started")
    time.sleep(1)
    print("Regular thread finished")

# Daemon thread: killed automatically when the program exits
daemon_thread = threading.Thread(target=daemon_worker, daemon=True)
# Or equivalently:
# daemon_thread = threading.Thread(target=daemon_worker)
# daemon_thread.daemon = True

# Regular thread: the interpreter waits for it to finish
normal_thread = threading.Thread(target=normal_worker)

daemon_thread.start()
normal_thread.start()

# The main thread does not wait for the daemon thread
# normal_thread.join()  # wait only for the regular thread
print("Main thread finished")
Thread Names and Identifiers
import threading

def worker():
    print(f"Thread name: {threading.current_thread().name}")
    print(f"Thread identifier: {threading.current_thread().ident}")

# Give threads explicit names
thread1 = threading.Thread(target=worker, name="Worker-1")
thread2 = threading.Thread(target=worker, name="Worker-2")

thread1.start()
thread2.start()
thread1.join()
thread2.join()

# Unnamed threads get a default name
thread3 = threading.Thread(target=worker)
print(f"Default name: {thread3.name}")
thread3.start()
thread3.join()
Thread Synchronization
Lock
import threading

# Create a lock
lock = threading.Lock()
counter = 0

def safe_increment():
    """Thread-safe counter"""
    global counter
    # Acquire the lock
    lock.acquire()
    try:
        # Critical section
        current = counter
        counter = current + 1
    finally:
        # Release the lock
        lock.release()

# Use a with statement to acquire and release automatically
def safe_increment_with():
    """Thread-safe counter (using with)"""
    global counter
    with lock:
        current = counter
        counter = current + 1

# Create several threads
threads = []
for i in range(100):
    thread = threading.Thread(target=safe_increment_with)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"Final counter value: {counter}")  # 100
Reentrant Lock (RLock)
import threading

# Reentrant lock: the same thread may acquire it multiple times
rlock = threading.RLock()

def recursive_function(n):
    """Recursive function protected by a reentrant lock"""
    with rlock:
        print(f"Recursion depth: {n}")
        if n > 0:
            recursive_function(n - 1)

recursive_function(3)
# Output:
# Recursion depth: 3
# Recursion depth: 2
# Recursion depth: 1
# Recursion depth: 0

# A plain Lock would deadlock here
"""
lock = threading.Lock()

def deadlock_example():
    with lock:
        print("Lock acquired the first time")
        with lock:  # deadlock!
            print("Lock acquired the second time")

# deadlock_example()  # blocks forever
"""
Semaphore
import threading
import time

# Semaphore: limits how many threads may access a resource at once
semaphore = threading.Semaphore(3)  # at most 3 threads at a time

def worker(worker_id):
    """Worker thread"""
    print(f"Worker {worker_id} requesting the resource")
    with semaphore:
        print(f"Worker {worker_id} acquired the resource")
        time.sleep(1)
        print(f"Worker {worker_id} released the resource")

# Create 10 threads, but only 3 may hold the resource at a time
threads = []
for i in range(10):
    thread = threading.Thread(target=worker, args=(i,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()
Event
import threading
import time

# Event: a simple signaling mechanism between threads
event = threading.Event()

def waiter():
    """Thread that waits on the event"""
    print("Waiting for the event...")
    event.wait()  # block until the event is set
    print("Event triggered, continuing")

def setter():
    """Thread that sets the event"""
    time.sleep(2)
    print("Setting the event")
    event.set()  # trigger the event

# Create the threads
thread1 = threading.Thread(target=waiter)
thread2 = threading.Thread(target=setter)

thread1.start()
thread2.start()
thread1.join()
thread2.join()

# Other Event methods
print(f"Event set: {event.is_set()}")
event.clear()  # reset the event
print(f"After clear: {event.is_set()}")

# Waiting with a timeout
event.wait(timeout=1)  # wait up to 1 second; returns False on timeout
Condition
import threading
import time
import random

# Condition: for more complex thread coordination
condition = threading.Condition()
queue = []

def producer():
    """Producer"""
    for i in range(5):
        time.sleep(random.random())
        with condition:
            item = f"Item-{i}"
            queue.append(item)
            print(f"Produced: {item}")
            condition.notify()  # wake one waiting consumer

def consumer():
    """Consumer"""
    while True:
        with condition:
            while not queue:  # loop, not if: re-check after every wakeup
                print("Queue empty, waiting...")
                condition.wait()  # wait for a notification
            item = queue.pop(0)
            print(f"Consumed: {item}")
        if item == "Item-4":
            break

# Create producer and consumer
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
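Condition also provides wait_for(predicate), which wraps the wait-in-a-loop idiom shown above. A brief sketch of a one-item handoff using it:

import threading

condition = threading.Condition()
items = []

def consumer_with_wait_for():
    with condition:
        # Equivalent to: while not items: condition.wait()
        condition.wait_for(lambda: items)
        print(f"Consumed: {items.pop(0)}")

def producer_once():
    with condition:
        items.append("Item-0")
        condition.notify()

t1 = threading.Thread(target=consumer_with_wait_for)
t2 = threading.Thread(target=producer_once)
t1.start()
t2.start()
t1.join()
t2.join()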
Barrier
import threading
import time

# Barrier: wait until a fixed number of threads reach a point, then proceed together
barrier = threading.Barrier(3)  # wait for 3 threads

def worker(worker_id):
    """Worker thread"""
    print(f"Worker {worker_id} working")
    time.sleep(worker_id)
    print(f"Worker {worker_id} reached the barrier")
    barrier.wait()  # wait for the other threads
    print(f"Worker {worker_id} passed the barrier, continuing")

# Create 3 threads
threads = []
for i in range(3):
    thread = threading.Thread(target=worker, args=(i,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print("All threads finished")
Locking Mechanics
Deadlock and How to Avoid It
import threading
import time

# Deadlock example
lock1 = threading.Lock()
lock2 = threading.Lock()

def thread1_worker():
    """Thread 1"""
    with lock1:
        print("Thread 1: acquired lock 1")
        time.sleep(0.1)
        print("Thread 1: trying to acquire lock 2")
        with lock2:
            print("Thread 1: holding both locks")

def thread2_worker():
    """Thread 2"""
    with lock2:
        print("Thread 2: acquired lock 2")
        print("Thread 2: trying to acquire lock 1")
        with lock1:
            print("Thread 2: holding both locks")

# Running these together can deadlock
# thread1 = threading.Thread(target=thread1_worker)
# thread2 = threading.Thread(target=thread2_worker)
# thread1.start()
# thread2.start()

# Ways to avoid deadlock:
# Option 1: always acquire locks in a fixed order
def safe_thread1():
    with lock1:
        with lock2:  # always lock1 first, then lock2
            print("Safe thread 1")

def safe_thread2():
    with lock1:  # also lock1 first
        with lock2:
            print("Safe thread 2")

# Option 2: use timeouts, releasing what you hold if you cannot get the rest
def safe_thread_with_timeout():
    if lock1.acquire(timeout=1):
        try:
            if lock2.acquire(timeout=1):
                try:
                    print("Acquired both locks")
                finally:
                    lock2.release()
        finally:
            lock1.release()
The Performance Cost of Locks
import threading
import time

# Locks cost performance but buy correctness
lock = threading.Lock()
counter = 0

def increment_with_lock():
    global counter
    with lock:
        counter += 1

def increment_without_lock():
    global counter
    counter += 1  # unsafe, but faster

# Benchmark
def test_with_lock():
    start = time.time()
    threads = []
    for _ in range(1000):
        thread = threading.Thread(target=increment_with_lock)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    return time.time() - start

def test_without_lock():
    start = time.time()
    threads = []
    for _ in range(1000):
        thread = threading.Thread(target=increment_without_lock)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    return time.time() - start

print(f"With lock: {test_with_lock():.4f}s")
print(f"Without lock: {test_without_lock():.4f}s")
# Note: the unlocked version may produce a wrong count; also, most of the time
# in this benchmark goes to creating 1000 threads rather than to the lock itself
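A common way to cut lock overhead is to shrink the critical section: let each thread accumulate into a local variable and take the lock only once at the end. A minimal sketch:

import threading

lock = threading.Lock()
total = 0

def accumulate_locally(n):
    global total
    local_sum = 0
    for _ in range(n):
        local_sum += 1  # thread-local work needs no lock
    with lock:
        total += local_sum  # one short critical section per thread

threads = [threading.Thread(target=accumulate_locally, args=(100000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(total)  # 400000, with only 4 lock acquisitions in total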
Thread Pools
ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor
import time
import threading

def task(name, delay):
    """Task function"""
    print(f"{name} started (thread: {threading.current_thread().name})")
    time.sleep(delay)
    return f"{name} finished"

# Create a thread pool
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks
    future1 = executor.submit(task, "Task-1", 1)
    future2 = executor.submit(task, "Task-2", 2)
    future3 = executor.submit(task, "Task-3", 1.5)

    # Collect results
    print(future1.result())  # blocks until the task finishes
    print(future2.result())
    print(future3.result())

print("\nSubmitting a batch with map:")
# Submit a batch with map
with ThreadPoolExecutor(max_workers=3) as executor:
    tasks = ["A", "B", "C", "D", "E"]
    delays = [1, 2, 1.5, 1, 2]
    # map yields results in submission order
    results = executor.map(task, tasks, delays)
    for result in results:
        print(result)
as_completed and wait
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time

def task(name, delay):
    time.sleep(delay)
    return f"{name} ({delay}s)"

# as_completed yields futures in completion order
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {
        executor.submit(task, name, delay): name
        for name, delay in [("A", 2), ("B", 1), ("C", 1.5)]
    }
    for future in as_completed(futures):
        name = futures[future]
        try:
            result = future.result()
            print(f"Finished: {result}")
        except Exception as e:
            print(f"{name} raised: {e}")

print("\nWaiting for all tasks with wait:")
# wait blocks until the tasks complete
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(task, f"Task-{i}", i)
        for i in range(3, 0, -1)
    ]
    # Wait for every task to finish
    done, not_done = wait(futures)
    print(f"Done: {len(done)}")
    print(f"Not done: {len(not_done)}")
Thread Pool Best Practices
from concurrent.futures import ThreadPoolExecutor
import time

def cpu_task(n):
    """CPU-bound task"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

def io_task(seconds):
    """I/O-bound task"""
    time.sleep(seconds)
    return f"Waited {seconds} seconds"

# Picking a sensible thread count
# - I/O-bound work: the thread count can exceed the number of CPU cores
# - CPU-bound work: under the GIL, extra threads do not speed pure Python code up;
#   prefer a process pool (threads near the core count only help when the
#   workload releases the GIL, e.g. inside C extensions)
import os

cpu_count = os.cpu_count()
print(f"CPU cores: {cpu_count}")

# I/O-bound: more threads are fine
print("I/O-bound tasks:")
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(io_task, [1, 1, 1, 1, 1]))
    for result in results:
        print(result)

# CPU-bound: shown for comparison only; the GIL prevents these from running in parallel
print("\nCPU-bound tasks:")
with ThreadPoolExecutor(max_workers=cpu_count) as executor:
    results = list(executor.map(cpu_task, [10000] * 5))
    print(f"Results: {results[:2]}...")
Multiprocessing
The multiprocessing Module
Creating Processes
import multiprocessing
import time

def worker(name, delay):
    """Function run by each worker process"""
    print(f"Process {name} started (PID: {multiprocessing.current_process().pid})")
    time.sleep(delay)
    print(f"Process {name} finished")

if __name__ == "__main__":
    # The guard is required on Windows (and any spawn-based start method)
    # Approach 1: create a Process object
    process1 = multiprocessing.Process(target=worker, args=("A", 2))
    process2 = multiprocessing.Process(target=worker, args=("B", 1))

    # Start the processes
    process1.start()
    process2.start()

    # Wait for them to finish
    process1.join()
    process2.join()
    print("Main process finished")

# Approach 2: subclass Process
class MyProcess(multiprocessing.Process):
    def __init__(self, name, delay):
        super().__init__()
        self.name = name  # Process.name is a settable property
        self.delay = delay

    def run(self):
        """Code executed by the process"""
        print(f"Process {self.name} started")
        time.sleep(self.delay)
        print(f"Process {self.name} finished")

if __name__ == "__main__":
    process3 = MyProcess("C", 1.5)
    process3.start()
    process3.join()
Process Attributes and Methods
import multiprocessing

def show_process_info():
    """Print information about the current process"""
    current_process = multiprocessing.current_process()
    print(f"Process name: {current_process.name}")
    print(f"Process PID: {current_process.pid}")
    print(f"Alive: {current_process.is_alive()}")

def child_worker():
    show_process_info()
    # In a child, parent_process() returns a handle on the parent
    print(f"Parent PID: {multiprocessing.parent_process().pid}")

if __name__ == "__main__":
    # Main-process info
    print("=== Main process ===")
    show_process_info()
    print(f"Current PID: {multiprocessing.current_process().pid}")
    # In the main process, parent_process() returns None
    print(f"parent_process(): {multiprocessing.parent_process()}")

    print("\n=== Child process ===")
    child_process = multiprocessing.Process(target=child_worker)
    child_process.start()
    child_process.join()

    # Number of CPU cores
    print(f"\nCPU cores: {multiprocessing.cpu_count()}")
The Process Class in Detail
Daemon Processes
import multiprocessing
import time

def daemon_worker():
    """Daemon process"""
    print("Daemon process started")
    time.sleep(2)
    print("Daemon process finished")

def normal_worker():
    """Regular process"""
    print("Regular process started")
    time.sleep(1)
    print("Regular process finished")

if __name__ == "__main__":
    # Daemon process: terminated automatically when the main process exits
    daemon_process = multiprocessing.Process(target=daemon_worker, daemon=True)
    # Regular process: the main process waits for it
    normal_process = multiprocessing.Process(target=normal_worker)

    daemon_process.start()
    normal_process.start()

    # The main process does not wait for the daemon process
    normal_process.join()  # wait only for the regular process
    print("Main process finished")
Memory Isolation Between Processes
import multiprocessing

# Processes do not share memory
counter = 0

def increment():
    """Try to increment the global variable"""
    global counter
    for _ in range(100000):
        counter += 1
    print(f"counter in the child process: {counter}")

if __name__ == "__main__":
    counter = 0
    process = multiprocessing.Process(target=increment)
    process.start()
    process.join()
    print(f"counter in the main process: {counter}")
    # The parent's counter and the child's counter are separate variables
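Because memory is isolated, a child's return value is lost unless you send it back explicitly, for example through a Queue. A minimal sketch:

import multiprocessing

def compute(n, result_queue):
    # Ship the result back to the parent instead of mutating shared state
    result_queue.put(sum(i * i for i in range(n)))

if __name__ == "__main__":
    result_queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=compute, args=(1000, result_queue))
    process.start()
    print(f"Result from child: {result_queue.get()}")  # blocks until available
    process.join()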
Inter-Process Communication
Queue
import multiprocessing

def producer(queue):
    """Producer"""
    for i in range(5):
        item = f"Item-{i}"
        queue.put(item)
        print(f"Produced: {item}")
    queue.put(None)  # sentinel: signal the end

def consumer(queue):
    """Consumer"""
    while True:
        item = queue.get()
        if item is None:  # end signal received
            break
        print(f"Consumed: {item}")
    # Note: multiprocessing.Queue has no task_done(); that method belongs
    # to queue.Queue and multiprocessing.JoinableQueue

if __name__ == "__main__":
    # Create the queue
    queue = multiprocessing.Queue()

    # Create producer and consumer
    producer_process = multiprocessing.Process(target=producer, args=(queue,))
    consumer_process = multiprocessing.Process(target=consumer, args=(queue,))

    producer_process.start()
    consumer_process.start()
    producer_process.join()
    consumer_process.join()
    print("Producing and consuming finished")
Pipe
import multiprocessing

def sender(conn):
    """Sender"""
    messages = ["Hello", "World", "EOF"]
    for msg in messages:
        conn.send(msg)
        print(f"Sent: {msg}")
    conn.close()

def receiver(conn):
    """Receiver"""
    while True:
        try:
            msg = conn.recv()
            if msg == "EOF":
                break
            print(f"Received: {msg}")
        except EOFError:
            break
    conn.close()

if __name__ == "__main__":
    # Create a pipe (bidirectional by default)
    parent_conn, child_conn = multiprocessing.Pipe()

    # Create the processes
    sender_process = multiprocessing.Process(target=sender, args=(child_conn,))
    receiver_process = multiprocessing.Process(target=receiver, args=(parent_conn,))

    sender_process.start()
    receiver_process.start()
    sender_process.join()
    receiver_process.join()
    print("Communication finished")
Manager (Shared Objects)
import multiprocessing

def worker(shared_list, shared_dict, shared_value):
    """Worker process"""
    shared_list.append(f"Item-{multiprocessing.current_process().pid}")
    shared_dict[f"PID-{multiprocessing.current_process().pid}"] = "done"
    # Note: += on a managed Value is a read-modify-write and is not atomic;
    # for exact counts, guard it with a multiprocessing.Lock
    shared_value.value += 1

if __name__ == "__main__":
    # Create shared objects through a Manager
    with multiprocessing.Manager() as manager:
        shared_list = manager.list()
        shared_dict = manager.dict()
        shared_value = manager.Value('i', 0)

        # Create several processes
        processes = []
        for i in range(5):
            process = multiprocessing.Process(
                target=worker,
                args=(shared_list, shared_dict, shared_value)
            )
            processes.append(process)
            process.start()

        for process in processes:
            process.join()

        print(f"Shared list: {list(shared_list)}")
        print(f"Shared dict: {dict(shared_dict)}")
        print(f"Shared value: {shared_value.value}")
Shared Memory (Value and Array)
import multiprocessing

def increment_counter(counter):
    """Increment the counter"""
    with counter.get_lock():  # take the Value's built-in lock
        counter.value += 1

def add_to_array(arr, index, value):
    """Write a value into the array"""
    with arr.get_lock():
        arr[index] = value

if __name__ == "__main__":
    # Create shared memory
    counter = multiprocessing.Value('i', 0)      # 'i' means signed int
    array = multiprocessing.Array('i', [0] * 5)  # array of ints

    # Create many processes
    processes = []
    for i in range(100):
        process = multiprocessing.Process(target=increment_counter, args=(counter,))
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

    print(f"Final counter value: {counter.value}")

    # Use the array
    for i in range(5):
        process = multiprocessing.Process(
            target=add_to_array,
            args=(array, i, i * 10)
        )
        process.start()
        process.join()

    print(f"Array contents: {array[:]}")
Process Pools
Basic Pool Usage
import multiprocessing
import time

def task(name, delay):
    """Task function"""
    print(f"{name} started (PID: {multiprocessing.current_process().pid})")
    time.sleep(delay)
    return f"{name} finished"

if __name__ == "__main__":
    # Create a process pool
    with multiprocessing.Pool(processes=3) as pool:
        # Method 1: apply (synchronous)
        result1 = pool.apply(task, args=("Apply-1", 1))
        print(f"apply result: {result1}")

        print("\nMethod 2: apply_async (asynchronous)")
        result2 = pool.apply_async(task, args=("Async-1", 1))
        result3 = pool.apply_async(task, args=("Async-2", 2))
        print(f"async result 1: {result2.get()}")
        print(f"async result 2: {result3.get()}")

        print("\nMethod 3: starmap (batch, synchronous)")
        # map() takes a single iterable; for multi-argument tasks use starmap
        results = pool.starmap(task, [("Map-1", 1), ("Map-2", 1.5), ("Map-3", 1)])
        print(f"starmap results: {results}")

        print("\nMethod 4: starmap_async (batch, asynchronous)")
        async_result = pool.starmap_async(
            task,
            [("AMap-1", 1), ("AMap-2", 1.5), ("AMap-3", 1)]
        )
        print(f"starmap_async results: {async_result.get()}")
imap and imap_unordered
import multiprocessing
import time

def heavy_task(x):
    """Slow task"""
    time.sleep(0.5)
    return x ** 2

def multi_task(x, y):
    time.sleep(0.3)
    return x + y

if __name__ == "__main__":
    numbers = range(10)

    # imap: iterator version of map, yields results in input order
    print("Using imap:")
    with multiprocessing.Pool(processes=3) as pool:
        for result in pool.imap(heavy_task, numbers):
            print(f"Result: {result}")

    # imap_unordered: order not guaranteed, but results arrive as soon as ready
    print("\nUsing imap_unordered:")
    with multiprocessing.Pool(processes=3) as pool:
        for result in pool.imap_unordered(heavy_task, numbers):
            print(f"Result: {result}")

    # For multiple arguments use starmap; the standard library has no istarmap
    print("\nUsing starmap:")
    data = [(1, 2), (3, 4), (5, 6), (7, 8)]
    with multiprocessing.Pool(processes=2) as pool:
        for result in pool.starmap(multi_task, data):
            print(f"Result: {result}")
Process Pool Best Practices
import multiprocessing
import time

def cpu_task(n):
    """CPU-bound task"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

if __name__ == "__main__":
    # CPU-bound work: process count close to the CPU core count
    cpu_count = multiprocessing.cpu_count()
    print(f"CPU cores: {cpu_count}")

    # Compare different pool sizes
    for num_processes in [1, 2, cpu_count, cpu_count * 2]:
        start_time = time.time()
        with multiprocessing.Pool(processes=num_processes) as pool:
            results = pool.map(cpu_task, [100000] * 8)
        elapsed = time.time() - start_time
        print(f"{num_processes} processes took: {elapsed:.2f}s")

    # Best practices:
    # 1. CPU-bound: process count = CPU core count
    # 2. Processes are expensive to create; reuse them through a pool
    # 3. Keep task payloads moderate to avoid excessive communication overhead
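The concurrent.futures API offers the same Future-based interface for processes as for threads. A minimal sketch of the equivalent pool using ProcessPoolExecutor:

from concurrent.futures import ProcessPoolExecutor
import os

def cpu_task(n):
    return sum(i ** 2 for i in range(n))

if __name__ == "__main__":
    # Same submit/map/as_completed interface as ThreadPoolExecutor
    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        results = list(executor.map(cpu_task, [100000] * 8))
        print(results[:2])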
Asynchronous Programming
asyncio Basics
The Event Loop
import asyncio

# A basic coroutine run on the event loop
async def main():
    """Main coroutine"""
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# Run the coroutine
if __name__ == "__main__":
    # Python 3.7+
    asyncio.run(main())

    # Pre-3.7 style
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(main())
    # loop.close()

# Getting the running event loop
async def get_current_loop():
    """Fetch the currently running event loop"""
    loop = asyncio.get_running_loop()
    print(f"Current event loop: {loop}")
Coroutines
import asyncio

async def simple_coroutine():
    """A simple coroutine"""
    print("Coroutine started")
    await asyncio.sleep(1)
    print("Coroutine finished")
    return "return value"

async def main():
    # Create a coroutine object (nothing runs yet)
    coro = simple_coroutine()
    print(f"Coroutine object: {coro}")

    # Run the coroutine
    result = await coro
    print(f"Coroutine result: {result}")

# Run it
asyncio.run(main())

# Properties of coroutines:
# 1. Defined with async def
# 2. Use await to call other coroutines
# 3. Must be run inside an event loop
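A classic pitfall follows from point 1: calling a coroutine function only creates a coroutine object; without an await (or a task) nothing executes. A small sketch:

import asyncio

async def greet():
    print("Hello from the coroutine")

async def main():
    greet()        # creates a coroutine object and discards it: nothing prints,
                   # and Python warns "RuntimeWarning: coroutine ... was never awaited"
    await greet()  # this one actually runs

asyncio.run(main())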
Tasks
import asyncio

async def task(name, delay):
    """Task coroutine"""
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} finished")
    return f"{name} result"

async def main():
    # Create the tasks
    task1 = asyncio.create_task(task("Task-1", 2))
    task2 = asyncio.create_task(task("Task-2", 1))
    task3 = asyncio.create_task(task("Task-3", 1.5))

    print("Tasks created, waiting...")

    # Wait for the tasks to finish
    result1 = await task1
    result2 = await task2
    result3 = await task3

    print(f"Task 1: {result1}")
    print(f"Task 2: {result2}")
    print(f"Task 3: {result3}")

asyncio.run(main())

# Task vs Coroutine:
# - A Task is a coroutine that has been scheduled to run
# - Tasks are scheduled for execution as soon as they are created
# - Multiple Tasks run concurrently
async/await
Basic Syntax
import asyncio
import time

async def say_hello(name, delay):
    """Asynchronous function"""
    print(f"{name} started waiting")
    await asyncio.sleep(delay)  # asynchronous wait
    print(f"{name} says Hello!")
    return f"{name} finished"

async def main():
    # Sequential execution (total time = sum of delays)
    print("=== Sequential ===")
    start = time.time()
    result1 = await say_hello("Alice", 1)
    result2 = await say_hello("Bob", 1)
    result3 = await say_hello("Charlie", 1)
    print(f"Total: {time.time() - start:.2f}s")

    # Concurrent execution (total time = max of delays)
    print("\n=== Concurrent ===")
    start = time.time()
    results = await asyncio.gather(
        say_hello("Alice", 1),
        say_hello("Bob", 1),
        say_hello("Charlie", 1)
    )
    print(f"Total: {time.time() - start:.2f}s")
    print(f"Results: {results}")

asyncio.run(main())
Awaitable Objects
import asyncio

# await works with three kinds of objects:
# 1. Coroutines
# 2. Tasks
# 3. Futures

async def coroutine_example():
    """Coroutine example"""
    await asyncio.sleep(1)
    return "coroutine result"

async def task_example():
    """Task example"""
    task = asyncio.create_task(coroutine_example())
    result = await task  # wait for the task
    return result

async def future_example():
    """Future example"""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # Set the Future's result from another task
    async def set_future():
        await asyncio.sleep(1)
        future.set_result("future result")

    asyncio.create_task(set_future())
    result = await future  # wait for the Future
    return result

async def main():
    print("1. Coroutine:")
    result1 = await coroutine_example()
    print(result1)

    print("\n2. Task:")
    result2 = await task_example()
    print(result2)

    print("\n3. Future:")
    result3 = await future_example()
    print(result3)

asyncio.run(main())
The Event Loop
Event Loop Methods
import asyncio

async def callback_example():
    """Callback examples"""
    loop = asyncio.get_running_loop()

    # call_soon: invoke a callback as soon as possible
    def callback():
        print("Callback executed")
    loop.call_soon(callback)

    # call_later: invoke after a delay
    def delayed_callback():
        print("Delayed callback executed")
    loop.call_later(1, delayed_callback)

    # call_at: invoke at a specific loop time
    current_time = loop.time()
    def scheduled_callback():
        print("Scheduled callback executed")
    loop.call_at(current_time + 2, scheduled_callback)

    await asyncio.sleep(2.5)

asyncio.run(callback_example())

# Event loop lifecycle:
# 1. Create the event loop
# 2. Run coroutines/tasks
# 3. Process events
# 4. Close the event loop
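The call_* methods above are not thread-safe. To schedule work onto a running loop from another thread, use loop.call_soon_threadsafe; a minimal sketch:

import asyncio
import threading

async def main():
    loop = asyncio.get_running_loop()

    def from_worker_thread():
        # Hand a callback over to the event loop from a foreign thread
        loop.call_soon_threadsafe(print, "scheduled from another thread")

    threading.Thread(target=from_worker_thread).start()
    await asyncio.sleep(0.5)  # give the callback time to run

asyncio.run(main())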
Task Scheduling
import asyncio

async def delayed_task(name, delay):
    """Delayed task"""
    await asyncio.sleep(delay)
    print(f"{name} executed")

async def main():
    # Create several tasks
    task1 = asyncio.create_task(delayed_task("Task-1", 1))
    task2 = asyncio.create_task(delayed_task("Task-2", 2))
    task3 = asyncio.create_task(delayed_task("Task-3", 1.5))

    # Wait for all of them
    await asyncio.gather(task1, task2, task3)
    print("All tasks finished")

    # Controlling tasks with wait
    print("\nUsing wait:")
    tasks = [
        asyncio.create_task(delayed_task(f"Wait-{i}", i))
        for i in range(3, 0, -1)
    ]
    # Wait for every task to finish
    done, pending = await asyncio.wait(tasks)
    print(f"Done: {len(done)}, pending: {len(pending)}")

asyncio.run(main())
Coroutines
Advanced Coroutine Usage
import asyncio

async def producer(queue, n):
    """Producer coroutine"""
    for i in range(n):
        item = f"Item-{i}"
        await asyncio.sleep(0.5)
        await queue.put(item)
        print(f"Produced: {item}")
    await queue.put(None)  # end signal

async def consumer(queue, name):
    """Consumer coroutine"""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        await asyncio.sleep(1)
        print(f"{name} consumed: {item}")
        queue.task_done()

async def main():
    # Create the queue
    queue = asyncio.Queue()

    # Create the producer and consumers
    producer_task = asyncio.create_task(producer(queue, 5))
    consumer_tasks = [
        asyncio.create_task(consumer(queue, f"Consumer-{i}"))
        for i in range(2)
    ]

    # Wait for production and processing to finish
    await producer_task
    await queue.join()

    # Cancel the remaining consumers
    for task in consumer_tasks:
        task.cancel()

    # Wait for the cancellations to settle
    await asyncio.gather(*consumer_tasks, return_exceptions=True)
    print("Producing and consuming finished")

asyncio.run(main())
Coroutine Concurrency Patterns
import asyncio

async def fetch_data(id, delay):
    """Simulate fetching data"""
    await asyncio.sleep(delay)
    return f"Data-{id}"

# Pattern 1: gather (collect results)
async def pattern_gather():
    """gather pattern"""
    results = await asyncio.gather(
        fetch_data(1, 1),
        fetch_data(2, 1.5),
        fetch_data(3, 0.5)
    )
    print(f"gather results: {results}")

# Pattern 2: wait (wait for completion)
async def pattern_wait():
    """wait pattern"""
    tasks = [
        asyncio.create_task(fetch_data(i, i * 0.5))
        for i in range(1, 4)
    ]
    done, pending = await asyncio.wait(tasks)
    print("wait finished:")
    for task in done:
        print(f" - {task.result()}")

# Pattern 3: as_completed (in completion order)
async def pattern_as_completed():
    """as_completed pattern"""
    tasks = [
        asyncio.create_task(fetch_data(i, i * 0.5))
        for i in range(1, 4)
    ]
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"as_completed: {result}")

# Pattern 4: race (first one wins)
async def pattern_race():
    """race pattern - take the first result to arrive"""
    tasks = [
        asyncio.create_task(fetch_data(i, i * 0.5))
        for i in range(1, 4)
    ]
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
    # Cancel the remaining tasks
    for task in pending:
        task.cancel()
    result = list(done)[0].result()
    print(f"Race winner: {result}")

async def main():
    print("=== gather ===")
    await pattern_gather()
    print("\n=== wait ===")
    await pattern_wait()
    print("\n=== as_completed ===")
    await pattern_as_completed()
    print("\n=== race ===")
    await pattern_race()

asyncio.run(main())
Timeout Control
import asyncio

async def timeout_task(seconds):
    """Task that may exceed the timeout"""
    # On timeout this coroutine is cancelled at the await point
    # (CancelledError); the TimeoutError is raised in the caller
    await asyncio.sleep(seconds)
    return f"Finished (took {seconds}s)"

async def main():
    # Timeouts with wait_for
    print("=== wait_for timeout ===")
    try:
        result = await asyncio.wait_for(
            timeout_task(3),
            timeout=2.0
        )
        print(result)
    except asyncio.TimeoutError:
        print("Task timed out!")

    # asyncio.timeout (Python 3.11+, recommended)
    print("\n=== async with timeout ===")
    try:
        async with asyncio.timeout(2.0):
            result = await timeout_task(3)
            print(result)
    except TimeoutError:
        print("Task timed out!")

    # Inspecting the timeout scope afterwards
    # (TimeoutError must be caught outside the async with block:
    # inside it, the coroutine only sees a CancelledError)
    print("\n=== checking the timeout scope ===")
    try:
        async with asyncio.timeout(2.0) as timeout_scope:
            result = await timeout_task(3)
            print(result)
    except TimeoutError:
        print("Timed out, carrying on")
        # clean-up work after the timeout can go here
    # Check whether the scope expired
    if timeout_scope.expired():
        print("It really did time out")

asyncio.run(main())
Concurrency Safety
Thread Safety
Thread-Safe Data Structures
import threading
import queue

# queue.Queue is thread-safe
safe_queue = queue.Queue()

def producer():
    """Producer"""
    for i in range(5):
        safe_queue.put(f"Item-{i}")
        print(f"Produced: Item-{i}")

def consumer():
    """Consumer"""
    while True:
        try:
            item = safe_queue.get(timeout=1)
            print(f"Consumed: {item}")
            safe_queue.task_done()
        except queue.Empty:
            break

# Use the thread-safe queue
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()

# collections.deque is also thread-safe for append() and popleft()
from collections import deque
thread_safe_deque = deque()
Thread-Local Variables
import threading

# Thread-local storage
local_data = threading.local()

def worker():
    """Worker thread"""
    # Each thread gets its own copy
    local_data.value = threading.current_thread().name
    print(f"{threading.current_thread().name}: {local_data.value}")

threads = []
for i in range(3):
    thread = threading.Thread(target=worker)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

# local_data holds independent values in each thread
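A typical real-world use is caching one expensive, non-shareable resource per thread, such as a database connection. A sketch where make_connection is a hypothetical stand-in for whatever connection factory you actually use:

import threading

local = threading.local()

def make_connection():
    # Hypothetical stand-in for e.g. sqlite3.connect(...) or a client object
    return object()

def get_connection():
    """Return this thread's connection, creating it on first use."""
    if not hasattr(local, "connection"):
        local.connection = make_connection()
    return local.connection

def handle_request():
    conn = get_connection()  # same object within a thread, distinct across threads
    print(f"{threading.current_thread().name} uses connection {id(conn)}")

threads = [threading.Thread(target=handle_request) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()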
Process Safety
Inter-Process Synchronization
import multiprocessing

def worker(lock, value):
    """Worker process"""
    with lock:
        print(f"Process {multiprocessing.current_process().pid} holds the lock")
        value.value += 1

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    value = multiprocessing.Value('i', 0)

    processes = []
    for i in range(5):
        process = multiprocessing.Process(
            target=worker,
            args=(lock, value)
        )
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

    print(f"Final value: {value.value}")
Async Safety
Coroutine Synchronization Primitives
import asyncio

# asyncio.Event
async def event_example():
    """Event example"""
    event = asyncio.Event()

    async def waiter():
        print("Waiting for the event...")
        await event.wait()
        print("Event triggered!")

    async def setter():
        await asyncio.sleep(1)
        print("Setting the event")
        event.set()

    await asyncio.gather(waiter(), setter())

# asyncio.Lock
async def lock_example():
    """Lock example"""
    lock = asyncio.Lock()

    async def worker(worker_id):
        async with lock:
            print(f"Worker {worker_id} acquired the lock")
            await asyncio.sleep(1)
            print(f"Worker {worker_id} released the lock")

    await asyncio.gather(
        worker(1),
        worker(2),
        worker(3)
    )

# asyncio.Semaphore
async def semaphore_example():
    """Semaphore example"""
    semaphore = asyncio.Semaphore(2)  # at most 2 coroutines at a time

    async def worker(worker_id):
        async with semaphore:
            print(f"Worker {worker_id} acquired the semaphore")
            await asyncio.sleep(1)
            print(f"Worker {worker_id} released the semaphore")

    await asyncio.gather(*[
        worker(i) for i in range(5)
    ])

async def main():
    print("=== Event ===")
    await event_example()
    print("\n=== Lock ===")
    await lock_example()
    print("\n=== Semaphore ===")
    await semaphore_example()

asyncio.run(main())
The GIL in Detail
What Is the GIL
"""
GIL (Global Interpreter Lock)

What it is:
- The GIL is a global lock inside the Python interpreter
- At any moment, only one thread can execute Python bytecode
- It is a property of CPython; Jython and IronPython have no GIL

Why it exists:
1. It simplifies memory management (reference counting)
2. It protects the internal state of C extensions
3. It avoids a large class of concurrency bugs inside the interpreter

Consequences:
- CPU-bound code: multithreading cannot use multiple cores
- I/O-bound code: impact is small, because the GIL is released while waiting on I/O

Inspecting interpreter configuration:
import sys
print(sys.flags)  # shows interpreter configuration flags
"""
import threading
import time

# The GIL's effect on CPU-bound work
def cpu_task():
    """CPU-bound task"""
    total = 0
    for i in range(10000000):
        total += i ** 2
    return total

def test_single_thread():
    """Single-threaded baseline"""
    start = time.time()
    result = cpu_task()
    elapsed = time.time() - start
    return elapsed

def test_multi_thread():
    """Two threads"""
    start = time.time()
    threads = []
    for _ in range(2):
        thread = threading.Thread(target=cpu_task)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    elapsed = time.time() - start
    return elapsed

print(f"Single thread: {test_single_thread():.2f}s")
print(f"Two threads: {test_multi_thread():.2f}s")
# The threaded run takes roughly twice as long: the two tasks cannot overlap
# under the GIL, and context switching adds further overhead
Sidestepping the GIL
Using Multiple Processes
import multiprocessing
import time

def cpu_task():
    """CPU-bound task"""
    total = 0
    for i in range(10000000):
        total += i ** 2
    return total

def test_multi_process():
    """Two processes"""
    start = time.time()
    processes = []
    for _ in range(2):
        process = multiprocessing.Process(target=cpu_task)
        processes.append(process)
        process.start()
    for process in processes:
        process.join()
    elapsed = time.time() - start
    return elapsed

if __name__ == "__main__":
    print(f"Two processes: {test_multi_process():.2f}s")
    # Processes run truly in parallel and can use every core
Using C Extensions
# Some libraries release the GIL around heavy native code:
# - NumPy (array operations)
# - Pandas (data processing)
# - OpenCV (image processing)
import numpy as np
import threading
import time

def numpy_task():
    """Many NumPy operations release the GIL"""
    # Large matrix multiplication
    matrix = np.random.rand(1000, 1000)
    result = np.dot(matrix, matrix)
    return result

def test_numpy_threading():
    """NumPy across several threads"""
    start = time.time()
    threads = []
    for _ in range(4):
        thread = threading.Thread(target=numpy_task)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    elapsed = time.time() - start
    return elapsed

print(f"NumPy with 4 threads: {test_numpy_threading():.2f}s")
# Because NumPy releases the GIL during the multiplication, the threads can overlap
GIL Best Practices
"""
GIL best practices:

1. CPU-bound tasks:
   - Use multiple processes (multiprocessing)
   - Use task queues such as Joblib or Celery
   - Use libraries that release the GIL, such as NumPy and Pandas

2. I/O-bound tasks:
   - Use threads (threading)
   - Use asynchronous programming (asyncio)
   - Use a thread pool (ThreadPoolExecutor)

3. Mixed workloads:
   - Separate the CPU work from the I/O work
   - Run CPU-bound parts in a process pool
   - Run I/O-bound parts in a thread pool or coroutines

4. Reduce contention and switching:
   - Keep critical sections small
   - Use thread-local variables
   - Use thread-safe structures such as queues
"""
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive(n):
    """CPU-bound"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

async def io_intensive(name):
    """I/O-bound"""
    await asyncio.sleep(1)
    return f"{name} finished"

# Combining both
async def hybrid_approach():
    """Hybrid approach"""
    # Run the CPU task in a process pool
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as executor:
        cpu_result = await loop.run_in_executor(
            executor,
            cpu_intensive,
            100000
        )
        print(f"CPU task result: {cpu_result}")

    # Run the I/O tasks as coroutines
    io_results = await asyncio.gather(
        io_intensive("Task-1"),
        io_intensive("Task-2"),
        io_intensive("Task-3")
    )
    print(f"I/O task results: {io_results}")

# Run it
if __name__ == "__main__":
    asyncio.run(hybrid_approach())
Summary
This chapter covered concurrent programming in Python:

Concurrency Basics
- Concurrency vs parallelism: the conceptual difference and when each applies
- Task types: CPU-bound vs I/O-bound
- Concurrency models: the trade-offs between threads, processes, and async code

Multithreading
- The threading module: creating threads, thread attributes, sharing data between threads
- The Thread class: daemon threads, thread names and identifiers
- Thread synchronization: Lock, RLock, Semaphore, Event, Condition, Barrier
- Locking mechanics: deadlock and how to avoid it, the performance cost of locks
- Thread pools: ThreadPoolExecutor, as_completed, wait

Multiprocessing
- The multiprocessing module: creating processes, process attributes, memory isolation
- The Process class: daemon processes, inter-process memory isolation
- Inter-process communication: Queue, Pipe, Manager, shared memory
- Process pools: Pool, imap, starmap, best practices

Asynchronous Programming
- asyncio basics: the event loop, coroutines, tasks
- async/await: basic syntax, awaitable objects
- The event loop: loop methods, task scheduling
- Coroutines: advanced usage, concurrency patterns, timeout control

Concurrency Safety
- Thread safety: thread-safe data structures, thread-local variables
- Process safety: inter-process synchronization
- Async safety: coroutine synchronization primitives

The GIL in Detail
- What the GIL is: the global interpreter lock and its consequences
- Sidestepping the GIL: multiple processes, C extensions
- Best practices: pick the concurrency model that matches the task type

Mastering Python's concurrency tools can significantly improve a program's performance and responsiveness, but you must choose the model that fits the workload and stay alert to concurrency-safety issues.