装饰器与生成器
装饰器(Decorator)和生成器(Generator)是 Python 的高级特性,它们可以帮助我们写出更优雅、更高效的代码。
装饰器
装饰器基础
什么是装饰器
装饰器 是一种修改函数或类行为的设计模式,它可以在不修改原函数代码的情况下,为函数添加额外的功能。装饰器本质上是一个接受函数作为参数并返回一个新函数的函数。
# 装饰器的基本概念
def my_decorator(func):
"""这是一个装饰器"""
def wrapper():
print("在函数执行前做一些操作")
func()
print("在函数执行后做一些操作")
return wrapper
@my_decorator
def say_hello():
print("Hello!")
# 调用函数
say_hello()
# 输出:
# 在函数执行前做一些操作
# Hello!
# 在函数执行后做一些操作
# @my_decorator 等价于:
# say_hello = my_decorator(say_hello)
装饰器的基本语法
# 定义装饰器
def decorator_name(func):
def wrapper(*args, **kwargs):
# 前置处理
result = func(*args, **kwargs)
# 后置处理
return result
return wrapper
# 使用装饰器
@decorator_name
def function():
pass
简单装饰器示例
# 计时装饰器 - 测量函数执行时间
import time
def timing_decorator(func):
"""计算函数执行时间的装饰器"""
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"{func.__name__} 执行时间: {end_time - start_time:.4f}秒")
return result
return wrapper
@timing_decorator
def slow_function():
"""模拟一个慢速函数"""
time.sleep(1)
return "完成"
slow_function() # slow_function 执行时间: 1.0012秒
# 日志装饰器
def log_decorator(func):
"""记录函数调用的装饰器"""
def wrapper(*args, **kwargs):
print(f"调用函数: {func.__name__}")
print(f"参数: args={args}, kwargs={kwargs}")
result = func(*args, **kwargs)
print(f"返回值: {result}")
return result
return wrapper
@log_decorator
def add(a, b):
return a + b
add(3, 5)
# 输出:
# 调用函数: add
# 参数: args=(3, 5), kwargs={}
# 返回值: 8
函数装饰器
保留原函数信息
使用装饰器后,函数的元数据(如 __name__、__doc__)会被替换为包装函数的信息。使用 functools.wraps 可以保留原函数的信息。
from functools import wraps
def my_decorator(func):
@wraps(func) # 保留原函数的元数据
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
@my_decorator
def greet(name):
"""问候函数"""
return f"Hello, {name}!"
print(greet.__name__) # greet (而不是 wrapper)
print(greet.__doc__) # 问候函数 (而不是 None)
装饰器带参数
有时装饰器本身需要接受参数,这需要创建一个装饰器工厂函数。
# 装饰器带参数
def repeat(times):
"""重复执行函数指定次数的装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
results = []
for _ in range(times):
result = func(*args, **kwargs)
results.append(result)
return results
return wrapper
return decorator
@repeat(3)
def say_hello(name):
return f"Hello, {name}!"
print(say_hello("Alice"))
# 输出: ['Hello, Alice!', 'Hello, Alice!', 'Hello, Alice!']
# 实际应用: 带重试功能的装饰器
import time
def retry(max_attempts, delay=1):
"""函数执行失败时重试的装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts - 1:
raise
print(f"尝试 {attempt + 1} 失败, {delay}秒后重试...")
time.sleep(delay)
return wrapper
return decorator
@retry(max_attempts=3, delay=2)
def unstable_function():
"""模拟一个不稳定的函数"""
import random
if random.random() < 0.7:
raise Exception("随机失败!")
return "成功!"
print(unstable_function())
装饰多个函数
# 批量装饰函数
def debug(func):
"""调试装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
print(f"[DEBUG] 调用 {func.__name__}")
result = func(*args, **kwargs)
print(f"[DEBUG] {func.__name__} 返回 {result}")
return result
return wrapper
# 装饰多个函数
@debug
def multiply(a, b):
return a * b
@debug
def divide(a, b):
return a / b
multiply(3, 4) # [DEBUG] 调用 multiply, [DEBUG] multiply 返回 12
divide(10, 2) # [DEBUG] 调用 divide, [DEBUG] divide 返回 5.0
类装饰器
类作为装饰器
使用类作为装饰器可以更好地管理状态。
class CountDecorator:
"""计数装饰器"""
def __init__(self, func):
self.func = func
self.count = 0
def __call__(self, *args, **kwargs):
self.count += 1
print(f"{self.func.__name__} 被调用了 {self.count} 次")
return self.func(*args, **kwargs)
@CountDecorator
def greet(name):
return f"Hello, {name}!"
greet("Alice") # greet 被调用了 1 次
greet("Bob") # greet 被调用了 2 次
greet("Charlie") # greet 被调用了 3 次
# 带参数的类装饰器
class Cached:
"""缓存装饰器"""
def __init__(self, func):
self.func = func
self.cache = {}
def __call__(self, *args):
if args not in self.cache:
self.cache[args] = self.func(*args)
print(f"计算并缓存: {args}")
else:
print(f"从缓存读取: {args}")
return self.cache[args]
@Cached
def fibonacci(n):
"""计算斐波那契数列(带缓存)"""
if n < 2:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
print(fibonacci(10))
print(fibonacci(10)) # 第二次从缓存读取
装饰类
装饰器不仅可以装饰函数,也可以装饰类。
def add_str_method(cls):
"""为类添加 __str__ 方法的装饰器"""
original_str = cls.__str__
def new_str(self):
return f"<{cls.__name__} instance>"
cls.__str__ = new_str
return cls
@add_str_method
class Person:
def __init__(self, name):
self.name = name
person = Person("Alice")
print(person) # <Person instance>
# 实际应用: 单例模式装饰器
def singleton(cls):
"""单例装饰器"""
instances = {}
@wraps(cls)
def get_instance(*args, **kwargs):
if cls not in instances:
instances[cls] = cls(*args, **kwargs)
return instances[cls]
return get_instance
@singleton
class Database:
def __init__(self):
print("初始化数据库连接")
db1 = Database() # 初始化数据库连接
db2 = Database() # 不会再次初始化
print(db1 is db2) # True
装饰器链
多个装饰器可以叠加使用,从下到上依次应用。
# 装饰器链的执行顺序
@decorator1
@decorator2
@decorator3
def function():
pass
# 等价于:
# function = decorator1(decorator2(decorator3(function)))
from functools import wraps
def bold(func):
@wraps(func)
def wrapper(*args, **kwargs):
return f"<b>{func(*args, **kwargs)}</b>"
return wrapper
def italic(func):
@wraps(func)
def wrapper(*args, **kwargs):
return f"<i>{func(*args, **kwargs)}</i>"
return wrapper
def underline(func):
@wraps(func)
def wrapper(*args, **kwargs):
return f"<u>{func(*args, **kwargs)}</u>"
return wrapper
@bold
@italic
@underline
def format_text(text):
return text
print(format_text("Hello"))
# 输出: <b><i><u>Hello</u></i></b>
# 实际应用: 组合多个功能
import time
from functools import wraps
def timer(func):
"""计时装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
end = time.time()
print(f"{func.__name__} 耗时: {end - start:.4f}秒")
return result
return wrapper
def logger(func):
"""日志装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
print(f"[LOG] 调用 {func.__name__}")
result = func(*args, **kwargs)
print(f"[LOG] {func.__name__} 完成")
return result
return wrapper
@timer
@logger
def process_data(data):
"""处理数据"""
time.sleep(0.1)
return data.upper()
print(process_data("hello"))
# 输出:
# [LOG] 调用 process_data
# [LOG] process_data 完成
# process_data 耗时: 0.1001秒
# HELLO
常用内置装饰器
@staticmethod
静态方法装饰器,不需要访问实例或类。
class Math:
@staticmethod
def add(a, b):
return a + b
@staticmethod
def multiply(a, b):
return a * b
# 不需要创建实例就可以调用
print(Math.add(5, 3)) # 8
print(Math.multiply(4, 3)) # 12
# 也可以通过实例调用
m = Math()
print(m.add(5, 3)) # 8
@classmethod
类方法装饰器,第一个参数是类本身(通常命名为 cls)。
class Person:
species = "人类"
def __init__(self, name):
self.name = name
@classmethod
def from_birth_year(cls, name, birth_year):
"""从出生年份创建实例"""
from datetime import datetime
current_year = datetime.now().year
age = current_year - birth_year
return cls(name, age) # 假设有 age 参数
@classmethod
def get_species(cls):
"""获取类属性"""
return cls.species
# 使用类方法创建实例
person = Person.from_birth_year("Alice", 1990)
print(person.name)
# 访问类属性
print(Person.get_species()) # 人类
# 工厂模式示例
class Animal:
def __init__(self, name):
self.name = name
def speak(self):
raise NotImplementedError("子类必须实现此方法")
class Dog(Animal):
def speak(self):
return f"{self.name} 说: 汪汪!"
class Cat(Animal):
def speak(self):
return f"{self.name} 说: 喵喵!"
@classmethod
def create(cls, name):
"""工厂方法"""
return cls(name)
# 使用工厂方法
cat = Cat.create("小白")
print(cat.speak()) # 小白 说: 喵喵!
@property
属性装饰器,将方法转换为属性访问。
class Person:
def __init__(self, first_name, last_name):
self.first_name = first_name
self.last_name = last_name
@property
def full_name(self):
"""获取全名"""
return f"{self.first_name} {self.last_name}"
@full_name.setter
def full_name(self, name):
"""设置全名"""
first, last = name.split()
self.first_name = first
self.last_name = last
@full_name.deleter
def full_name(self):
"""删除全名"""
self.first_name = ""
self.last_name = ""
person = Person("张", "三")
print(person.full_name) # 张 三
person.full_name = "李 四"
print(person.first_name) # 李
print(person.last_name) # 四
# 实际应用: 带验证的属性
class Temperature:
def __init__(self, celsius):
self._celsius = celsius
@property
def celsius(self):
"""获取摄氏度"""
return self._celsius
@celsius.setter
def celsius(self, value):
"""设置摄氏度(带验证)"""
if value < -273.15:
raise ValueError("温度不能低于绝对零度")
self._celsius = value
@property
def fahrenheit(self):
"""获取华氏度"""
return self._celsius * 9/5 + 32
@fahrenheit.setter
def fahrenheit(self, value):
"""设置华氏度"""
self._celsius = (value - 32) * 5/9
temp = Temperature(25)
print(temp.celsius) # 25
print(temp.fahrenheit) # 77.0
temp.fahrenheit = 100
print(temp.celsius) # 37.777...
@functools.lru_cache
缓存装饰器,缓存函数的结果以提高性能。
from functools import lru_cache
@lru_cache(maxsize=128)
def fibonacci(n):
"""计算斐波那契数列(带缓存)"""
if n < 2:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
# 第一次计算较慢
print(fibonacci(100))
# 第二次直接从缓存读取,非常快
print(fibonacci(100))
# 查看缓存信息
print(fibonacci.cache_info())
# CacheInfo(hits=1, misses=101, maxsize=128, currsize=102)
# 清除缓存
fibonacci.cache_clear()
# 实际应用: 缓存网络请求
import requests
from functools import lru_cache
@lru_cache(maxsize=100)
def get_user_data(user_id):
"""获取用户数据(带缓存)"""
print(f"请求用户 {user_id} 的数据...")
# response = requests.get(f"https://api.example.com/users/{user_id}")
# return response.json()
return {"id": user_id, "name": f"User{user_id}"}
# 第一次调用会发送请求
print(get_user_data(1))
# 第二次调用直接返回缓存
print(get_user_data(1))
装饰器应用场景
权限验证
from functools import wraps
def require_admin(func):
"""管理员权限验证装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
user = kwargs.get('user') or (args[0] if args else None)
if not user or not user.get('is_admin'):
raise PermissionError("需要管理员权限")
return func(*args, **kwargs)
return wrapper
def login_required(func):
"""登录验证装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
user = kwargs.get('user') or (args[0] if args else None)
if not user or not user.get('is_logged_in'):
raise PermissionError("请先登录")
return func(*args, **kwargs)
return wrapper
@login_required
@require_admin
def delete_user(user, user_id):
"""删除用户"""
return f"用户 {user_id} 已删除"
# 使用
admin = {"is_logged_in": True, "is_admin": True}
print(delete_user(admin, 123)) # 用户 123 已删除
normal_user = {"is_logged_in": True, "is_admin": False}
# print(delete_user(normal_user, 123)) # PermissionError: 需要管理员权限
性能监控
import time
from functools import wraps
def measure_performance(func):
"""性能监控装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
start_memory = get_memory_usage()
result = func(*args, **kwargs)
end_time = time.time()
end_memory = get_memory_usage()
print(f"{func.__name__} 性能统计:")
print(f" 执行时间: {end_time - start_time:.4f}秒")
print(f" 内存使用: {end_memory - start_memory:.2f}MB")
return result
return wrapper
def get_memory_usage():
"""获取当前进程内存使用量"""
import psutil
import os
process = psutil.Process(os.getpid())
return process.memory_info().rss / 1024 / 1024
@measure_performance
def process_large_data():
"""处理大数据"""
data = [i ** 2 for i in range(100000)]
return sum(data)
process_large_data()
异常处理
from functools import wraps
def handle_exceptions(*exceptions):
"""异常处理装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except exceptions as e:
print(f"错误: {e}")
return None
return wrapper
return decorator
@handle_exceptions(ValueError, TypeError, ZeroDivisionError)
def divide(a, b):
"""除法运算"""
return a / b
print(divide(10, 2)) # 5.0
print(divide(10, 0)) # 错误: division by zero, 输出: None
print(divide("10", "2")) # 错误: ...
输入验证
from functools import wraps
def validate_types(**type_map):
"""类型验证装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 获取函数参数
from inspect import signature
sig = signature(func)
bound_args = sig.bind(*args, **kwargs)
bound_args.apply_defaults()
# 验证参数类型
for param_name, param_value in bound_args.arguments.items():
if param_name in type_map:
expected_type = type_map[param_name]
if not isinstance(param_value, expected_type):
raise TypeError(
f"{param_name} 应该是 {expected_type.__name__} 类型"
)
return func(*args, **kwargs)
return wrapper
return decorator
@validate_types(name=str, age=int, email=str)
def create_user(name, age, email):
"""创建用户"""
return {"name": name, "age": age, "email": email}
print(create_user("Alice", 25, "alice@example.com"))
# print(create_user(123, 25, "alice@example.com")) # TypeError
生成器
生成器函数
什么是生成器
生成器 是一种特殊的迭代器,使用 yield 关键字按需生成值,而不是一次性生成所有值。生成器可以节省内存,提高性能。
# 普通函数
def generate_numbers(n):
"""生成前 n 个数的平方(普通函数)"""
result = []
for i in range(n):
result.append(i ** 2)
return result
# 使用列表
numbers = generate_numbers(1000000) # 占用大量内存
# 生成器函数
def generate_numbers_gen(n):
"""生成前 n 个数的平方(生成器)"""
for i in range(n):
yield i ** 2
# 使用生成器
numbers_gen = generate_numbers_gen(1000000) # 几乎不占用内存
print(next(numbers_gen)) # 0
print(next(numbers_gen)) # 1
基本语法
def simple_generator():
"""简单的生成器"""
yield 1
yield 2
yield 3
# 创建生成器
gen = simple_generator()
# 使用 next() 获取值
print(next(gen)) # 1
print(next(gen)) # 2
print(next(gen)) # 3
# print(next(gen)) # StopIteration
# 使用 for 循环
gen = simple_generator()
for value in gen:
print(value)
# 1
# 2
# 3
# 转换为列表
print(list(simple_generator())) # [1, 2, 3]
yield 语句
yield vs return
def with_return():
"""使用 return"""
return [1, 2, 3]
def with_yield():
"""使用 yield"""
yield 1
yield 2
yield 3
# return 一次性返回所有值
print(with_return()) # [1, 2, 3]
# yield 逐个返回值
gen = with_yield()
print(next(gen)) # 1
print(next(gen)) # 2
print(next(gen)) # 3
# 对比: 无限序列
def infinite_sequence():
"""生成无限序列(只有 yield 能做到)"""
num = 0
while True:
yield num
num += 1
gen = infinite_sequence()
print(next(gen)) # 0
print(next(gen)) # 1
print(next(gen)) # 2
# 可以无限继续...
yield 表达式
def echo_generator():
"""回显生成器"""
while True:
received = yield
yield f"回显: {received}"
gen = echo_generator()
next(gen) # 启动生成器
print(gen.send("Hello")) # 回显: Hello
next(gen) # 前进到下一个 yield
print(gen.send("World")) # 回显: World
# 实际应用: 生产者-消费者模式
def consumer():
"""消费者"""
print("等待数据...")
try:
while True:
data = yield
print(f"处理: {data}")
except GeneratorExit:
print("消费者关闭")
def producer(consumer_gen):
"""生产者"""
next(consumer_gen) # 启动消费者
for i in range(5):
consumer_gen.send(i)
consumer_gen.close()
# 使用
cons = consumer()
producer(cons)
# 输出:
# 等待数据...
# 处理: 0
# 处理: 1
# 处理: 2
# 处理: 3
# 处理: 4
# 消费者关闭
生成器表达式
基本语法
生成器表达式类似于列表推导式,但返回生成器而不是列表。
# 列表推导式
list_comp = [x ** 2 for x in range(10)]
print(list_comp) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# 生成器表达式
gen_expr = (x ** 2 for x in range(10))
print(list(gen_expr)) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
内存优势
import sys
# 列表推导式占用内存
list_comp = [x ** 2 for x in range(1000000)]
print(sys.getsizeof(list_comp)) # 约 8MB
# 生成器表达式几乎不占用内存
gen_expr = (x ** 2 for x in range(1000000))
print(sys.getsizeof(gen_expr)) # 约 200 bytes
作为函数参数
# 生成器表达式可以直接作为函数参数
sum_of_squares = sum(x ** 2 for x in range(100))
print(sum_of_squares) # 328350
max_value = max(x ** 2 for x in range(10))
print(max_value) # 81
# 读取大文件
# 不使用列表,直接生成器表达式
lines = (line.strip() for line in open('large_file.txt'))
longest = max(lines, key=len)
生成器进阶
yield from
yield from 用于在一个生成器中委托给另一个生成器或可迭代对象。
def sub_generator():
"""子生成器"""
yield 1
yield 2
def main_generator():
"""主生成器"""
yield from sub_generator() # 委托给子生成器
yield 3
yield 4
print(list(main_generator())) # [1, 2, 3, 4]
# 实际应用: 扁平化嵌套列表
def flatten(nested_list):
"""扁平化嵌套列表"""
for item in nested_list:
if isinstance(item, list):
yield from flatten(item) # 递归
else:
yield item
nested = [1, [2, [3, 4]], 5, [6, 7]]
print(list(flatten(nested))) # [1, 2, 3, 4, 5, 6, 7]
# 链式生成器
def generator1():
yield from range(3)
def generator2():
yield from range(3, 6)
def generator3():
yield from generator1()
yield from generator2()
print(list(generator3())) # [0, 1, 2, 3, 4, 5]
生成器的方法
def my_generator():
"""演示生成器方法的生成器"""
try:
yield 1
yield 2
yield 3
except ValueError:
print("捕获到 ValueError")
yield "从异常中恢复"
finally:
print("生成器清理")
# close() 方法 - 关闭生成器
gen = my_generator()
print(next(gen)) # 1
gen.close() # 关闭生成器
# print(next(gen)) # StopIteration
# throw() 方法 - 向生成器抛出异常
gen = my_generator()
print(next(gen)) # 1
print(next(gen)) # 2
print(gen.throw(ValueError, "测试异常")) # 捕获到 ValueError, 输出: 从异常中恢复
生成器应用场景
处理大文件
# 读取大文件(不会一次性加载到内存)
def read_large_file(file_path):
"""逐行读取大文件"""
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
yield line.strip()
# 使用
for line in read_large_file('large_file.txt'):
print(line)
# 处理 CSV 文件
def parse_csv(file_path):
"""解析 CSV 文件"""
with open(file_path, 'r', encoding='utf-8') as f:
headers = f.readline().strip().split(',')
for line in f:
values = line.strip().split(',')
yield dict(zip(headers, values))
for row in parse_csv('data.csv'):
print(row)
无限序列
def fibonacci():
"""斐波那契数列生成器"""
a, b = 0, 1
while True:
yield a
a, b = b, a + b
# 获取前 10 个斐波那契数
fib = fibonacci()
for _ in range(10):
print(next(fib))
# 0, 1, 1, 2, 3, 5, 8, 13, 21, 34
# 素数生成器
def primes():
"""素数生成器"""
yield 2
primes_so_far = [2]
num = 3
while True:
is_prime = True
for p in primes_so_far:
if p * p > num:
break
if num % p == 0:
is_prime = False
break
if is_prime:
primes_so_far.append(num)
yield num
num += 2
# 获取前 10 个素数
prime_gen = primes()
for _ in range(10):
print(next(prime_gen))
# 2, 3, 5, 7, 11, 13, 17, 19, 23, 29
数据管道
# 数据处理管道
def read_lines(file_path):
"""读取文件行"""
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
yield line.strip()
def filter_lines(lines, keyword):
"""过滤包含关键字的行"""
for line in lines:
if keyword in line:
yield line
def transform_lines(lines):
"""转换行"""
for line in lines:
yield line.upper()
def write_lines(lines, output_file):
"""写入文件"""
with open(output_file, 'w', encoding='utf-8') as f:
for line in lines:
f.write(line + '\n')
# 构建管道
lines = read_lines('input.txt')
filtered = filter_lines(lines, 'error')
transformed = transform_lines(filtered)
write_lines(transformed, 'output.txt')
批处理
def batch_generator(data, batch_size):
"""将数据分批"""
batch = []
for item in data:
batch.append(item)
if len(batch) >= batch_size:
yield batch
batch = []
if batch: # 最后一批
yield batch
# 使用
data = range(100)
for batch in batch_generator(data, 10):
print(f"处理批次: {batch}")
# 实际应用: 批量数据库插入
def batch_insert(records, table_name, batch_size=1000):
"""批量插入数据库"""
for batch in batch_generator(records, batch_size):
# INSERT INTO table_name VALUES (...), (...), ...
print(f"插入 {len(batch)} 条记录到 {table_name}")
records = range(5000)
batch_insert(records, "users", batch_size=1000)
惰性求值
def lazy_map(func, iterable):
"""惰性 map"""
for item in iterable:
yield func(item)
def lazy_filter(predicate, iterable):
"""惰性 filter"""
for item in iterable:
if predicate(item):
yield item
# 链式操作
result = lazy_map(
lambda x: x ** 2,
lazy_filter(
lambda x: x % 2 == 0,
range(100)
)
)
# 只在需要时才计算
for _ in range(5):
print(next(result))
# 0, 4, 16, 36, 64
迭代器
迭代协议
什么是迭代器
迭代器 是实现了迭代协议的对象,可以通过 iter() 和 next() 函数进行迭代。
# 迭代器协议
my_list = [1, 2, 3, 4, 5]
# 获取迭代器
my_iter = iter(my_list)
# 使用迭代器
print(next(my_iter)) # 1
print(next(my_iter)) # 2
print(next(my_iter)) # 3
# 迭代器只能前进,不能后退
# print(next(my_iter)) # 4
# print(next(my_iter)) # 5
# print(next(my_iter)) # StopIteration
# for 循环自动处理迭代协议
for item in my_list:
print(item)
可迭代对象 vs 迭代器
# 可迭代对象(Iterable): 可以被 iter() 调用的对象
# 迭代器(Iterator): 实现了 __iter__ 和 __next__ 的对象
from collections.abc import Iterable, Iterator
# 列表是可迭代的,但不是迭代器
my_list = [1, 2, 3]
print(isinstance(my_list, Iterable)) # True
print(isinstance(my_list, Iterator)) # False
# iter() 返回迭代器
my_iter = iter(my_list)
print(isinstance(my_iter, Iterable)) # True
print(isinstance(my_iter, Iterator)) # True
自定义迭代器
实现迭代协议
class Counter:
"""自定义计数器迭代器"""
def __init__(self, start, end):
self.start = start
self.end = end
def __iter__(self):
"""返回迭代器对象"""
return self
def __next__(self):
"""返回下一个值"""
if self.start > self.end:
raise StopIteration
current = self.start
self.start += 1
return current
# 使用
counter = Counter(1, 5)
for num in counter:
print(num)
# 1, 2, 3, 4, 5
无限迭代器
class InfiniteCounter:
"""无限计数器"""
def __init__(self, start=0):
self.current = start
def __iter__(self):
return self
def __next__(self):
current = self.current
self.current += 1
return current
# 使用
counter = InfiniteCounter(10)
for _ in range(5):
print(next(counter))
# 10, 11, 12, 13, 14
反向迭代器
class ReverseIterator:
"""反向迭代器"""
def __init__(self, data):
self.data = data
self.index = len(data) - 1
def __iter__(self):
return self
def __next__(self):
if self.index < 0:
raise StopIteration
value = self.data[self.index]
self.index -= 1
return value
# 使用
reverse_iter = ReverseIterator([1, 2, 3, 4, 5])
for item in reverse_iter:
print(item)
# 5, 4, 3, 2, 1
contextlib
上下文管理器
什么是上下文管理器
上下文管理器 是实现了 __enter__ 和 __exit__ 方法的对象,通常用于资源的获取和释放。
# 自定义上下文管理器
class FileManager:
"""文件管理器"""
def __init__(self, filename, mode):
self.filename = filename
self.mode = mode
self.file = None
def __enter__(self):
"""进入上下文"""
print("打开文件")
self.file = open(self.filename, self.mode)
return self.file
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出上下文"""
print("关闭文件")
if self.file:
self.file.close()
return False # 不抑制异常
# 使用
with FileManager('test.txt', 'w') as f:
f.write('Hello, World!')
# 输出:
# 打开文件
# 关闭文件
使用 contextmanager
from contextlib import contextmanager
@contextmanager
def file_manager(filename, mode):
"""文件管理器上下文"""
print("打开文件")
file = open(filename, mode)
try:
yield file
finally:
print("关闭文件")
file.close()
# 使用
with file_manager('test.txt', 'w') as f:
f.write('Hello, World!')
常用上下文管理器
@contextlib.suppress
from contextlib import suppress
# 抑制特定异常
with suppress(FileNotFoundError):
os.remove('non_existent_file.txt') # 不会抛出异常
# 等价于:
try:
os.remove('non_existent_file.txt')
except FileNotFoundError:
pass
@contextlib.redirect_stdout / redirect_stderr
from contextlib import redirect_stdout, redirect_stderr
# 重定向输出
with open('output.txt', 'w') as f:
with redirect_stdout(f):
print("这行内容会被写入文件")
print("这行也会被写入文件")
# 重定向错误输出
with open('errors.txt', 'w') as f:
with redirect_stderr(f):
print("错误信息", file=sys.stderr)
contextlib.ExitStack
from contextlib import ExitStack
# 管理多个上下文管理器
filenames = ['file1.txt', 'file2.txt', 'file3.txt']
with ExitStack() as stack:
files = [stack.enter_context(open(fname)) for fname in filenames]
# 处理多个文件
for f in files:
print(f.read())
# 所有文件都会被正确关闭
装饰器与生成器结合
缓存生成器结果
from functools import wraps
def cached_generator(func):
"""缓存生成器装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
cache = []
def generator():
for item in func(*args, **kwargs):
cache.append(item)
yield item
# 如果有缓存,先返回缓存
for item in cache:
yield item
# 否则生成新的值
for item in generator():
if item not in cache:
cache.append(item)
yield item
return wrapper
@cached_generator
def data_source():
"""数据源生成器"""
for i in range(5):
yield i
# 第一次调用
gen1 = data_source()
print(list(gen1)) # [0, 1, 2, 3, 4]
# 第二次调用(从缓存)
gen2 = data_source()
print(list(gen2)) # [0, 1, 2, 3, 4]
生成器性能监控
import time
from functools import wraps
def profile_generator(func):
"""生成器性能分析装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
count = 0
for item in func(*args, **kwargs):
count += 1
yield item
end_time = time.time()
print(f"{func.__name__} 生成了 {count} 个值, 耗时: {end_time - start_time:.4f}秒")
return wrapper
@profile_generator
def number_generator(n):
"""数字生成器"""
for i in range(n):
yield i
# 使用
for num in number_generator(1000000):
pass # 只是为了迭代
# 输出: number_generator 生成了 1000000 个值, 耗时: 0.xxxx秒
小结
本章节介绍了 Python 的装饰器、生成器与迭代器:
装饰器
- 装饰器基础: 装饰器概念, 基本语法, 简单装饰器示例
- 函数装饰器: 保留函数信息, 带参数装饰器, 装饰多个函数
- 类装饰器: 类作为装饰器, 装饰类
- 装饰器链: 多个装饰器叠加使用
- 内置装饰器: @staticmethod, @classmethod, @property, @lru_cache
- 应用场景: 权限验证, 性能监控, 异常处理, 输入验证
生成器
- 生成器函数: 生成器概念, 基本语法
- yield 语句: yield vs return, yield 表达式
- 生成器表达式: 基本语法, 内存优势, 作为函数参数
- 生成器进阶: yield from, 生成器方法
- 应用场景: 处理大文件, 无限序列, 数据管道, 批处理, 惰性求值
迭代器
- 迭代协议: 可迭代对象 vs 迭代器
- 自定义迭代器: 实现迭代协议, 无限迭代器, 反向迭代器
contextlib
- 上下文管理器: 自定义上下文管理器, 使用 contextmanager
- 常用管理器: suppress, redirect_stdout, ExitStack
装饰器和生成器是 Python 中强大的特性,掌握它们可以帮助你编写更优雅、更高效的代码。装饰器提供了优雅的方式来扩展功能,而生成器则提供了高效的内存管理和数据处理能力。