跳到主要内容

网络编程

网络编程是 Python 的重要应用领域,它涵盖了从简单的 HTTP 请求到底层 Socket 通信的各种技术。本章将介绍 HTTP 编程、Socket 编程、WebSocket、API 开发和网络爬虫等内容。

HTTP 编程

requests 库

安装和基础使用

# Install the requests library:
# pip install requests

import requests

# Send a simple GET request
response = requests.get('https://httpbin.org/get')

# HTTP status code of the response
print(f"状态码: {response.status_code}") # 200 on success

# Response body decoded as text
print(f"响应内容: {response.text}")

# Response headers (case-insensitive mapping)
print(f"响应头: {response.headers}")

# Parse the response body as JSON
print(f"JSON数据: {response.json()}")

# Main attributes of the Response object
print(f"URL: {response.url}") # final URL (after redirects)
print(f"编码: {response.encoding}") # encoding used to decode .text
print(f"Cookies: {response.cookies}") # cookies the server set
print(f"耗时: {response.elapsed.total_seconds()}秒") # time taken by the request

GET 请求

import requests

# Plain GET request
response = requests.get('https://httpbin.org/get')
print(response.json())

# GET request with query-string parameters
query_params = {
    'name': 'Alice',
    'age': 25,
    'city': 'Beijing',
}
response = requests.get('https://httpbin.org/get', params=query_params)
print(f"请求URL: {response.url}")
# Effective URL: https://httpbin.org/get?name=Alice&age=25&city=Beijing

# GET request with custom headers
custom_headers = {
    'User-Agent': 'My App/1.0',
    'Accept': 'application/json',
    'Accept-Language': 'zh-CN',
}
response = requests.get(
    'https://httpbin.org/get',
    headers=custom_headers,
)
print(f"自定义请求头: {response.json()['headers']}")

# Timeout as a (connect, read) tuple: 5 s to connect, 10 s to read
try:
    response = requests.get(
        'https://httpbin.org/get',
        timeout=(5, 10),
    )
    print("请求成功")
except requests.exceptions.Timeout:
    print("请求超时")

# A single number applies the same timeout to connect and read
response = requests.get('https://httpbin.org/get', timeout=5)

# Download a binary file
response = requests.get('https://httpbin.org/image/png')
with open('image.png', 'wb') as f:
    f.write(response.content)

# Stream a large download chunk by chunk instead of buffering it all
response = requests.get(
    'https://httpbin.org/stream/20',
    stream=True,
)
with open('stream.txt', 'wb') as f:
    for chunk in response.iter_content(chunk_size=8192):
        if chunk:  # skip keep-alive chunks
            f.write(chunk)

POST 请求

import requests
import json

# Send form-encoded data (Content-Type: application/x-www-form-urlencoded)
data = {
    'username': 'alice',
    'password': '123456',
}
response = requests.post(
    'https://httpbin.org/post',
    data=data,
)
print(f"表单提交: {response.json()['form']}")

# Send JSON data
json_data = {
    'name': 'Bob',
    'age': 30,
    'hobbies': ['reading', 'swimming'],
}
response = requests.post(
    'https://httpbin.org/post',
    json=json_data,  # sets Content-Type: application/json automatically
)
print(f"JSON提交: {response.json()['json']}")

# Send JSON manually (equivalent to json=...)
response = requests.post(
    'https://httpbin.org/post',
    data=json.dumps(json_data),
    headers={'Content-Type': 'application/json'},
)

# Upload a file.
# FIX: open files in a `with` block so the handles are always closed —
# the original examples opened them inline and leaked the descriptors.
with open('test.txt', 'rb') as fh:
    response = requests.post(
        'https://httpbin.org/post',
        files={'file': fh},
    )

# Upload a file under an explicit filename
with open('test.txt', 'rb') as fh:
    response = requests.post(
        'https://httpbin.org/post',
        files={'file': ('custom_name.txt', fh)},
    )

# Upload several files under the same field name
with open('file1.txt', 'rb') as f1, open('file2.txt', 'rb') as f2:
    response = requests.post(
        'https://httpbin.org/post',
        files=[('files', f1), ('files', f2)],
    )

请求头和响应头

import requests

# Browser-like request headers
browser_headers = {
    'User-Agent': 'Mozilla/5.0',
    'Accept': 'text/html,application/xhtml+xml',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Accept-Encoding': 'gzip, deflate',
    'Connection': 'keep-alive',
    'Referer': 'https://www.google.com',
}
response = requests.get(
    'https://httpbin.org/headers',
    headers=browser_headers,
)

# Iterate over every response header
print("所有响应头:")
for name, value in response.headers.items():
    print(f" {name}: {value}")

# Access individual response headers
print(f"\nContent-Type: {response.headers['Content-Type']}")
print(f"Content-Length: {response.headers.get('Content-Length')}")

# Header lookup is case-insensitive
print(f"content-type: {response.headers['content-type']}")

# Headers commonly returned by servers
response = requests.get('https://www.example.com')
print(f"\nServer: {response.headers.get('Server')}")
print(f"Date: {response.headers.get('Date')}")

# Common request headers, for reference
"""
User-Agent: identifies the client software
Accept: acceptable response content types
Accept-Language: acceptable languages
Accept-Encoding: acceptable content encodings
Authorization: authentication credentials
Cookie: cookies stored by the client
Referer: the page the request originated from
Content-Type: media type of the request body
Content-Length: size of the request body
"""
import requests

# Cookies set by the server appear on the response object
response = requests.get('https://httpbin.org/cookies/set/name/value')
print(f"响应Cookies: {response.cookies}")

# Look up an individual cookie
print(f"name cookie: {response.cookies.get('name')}")

# Attach cookies to an outgoing request
outgoing_cookies = {
    'name': 'Alice',
    'age': '25',
}
response = requests.get(
    'https://httpbin.org/cookies',
    cookies=outgoing_cookies,
)
print(f"发送的Cookies: {response.json()['cookies']}")

# Manage cookies explicitly with a RequestsCookieJar
jar = requests.cookies.RequestsCookieJar()
jar.set('name', 'Bob', domain='httpbin.org')
jar.set('session', 'abc123', domain='httpbin.org')
response = requests.get(
    'https://httpbin.org/cookies',
    cookies=jar,
)

# Copy cookies out of a response into a jar
jar = requests.cookies.RequestsCookieJar()
response = requests.get('https://www.example.com')
jar.update(response.cookies)

# Persist cookies to disk.
# NOTE(review): pickle is only safe for files you created yourself —
# never unpickle data from an untrusted source.
import pickle

with open('cookies.pkl', 'wb') as f:
    pickle.dump(jar, f)

# Restore cookies from disk
with open('cookies.pkl', 'rb') as f:
    jar = pickle.load(f)

# Use the restored cookies
response = requests.get(
    'https://httpbin.org/cookies',
    cookies=jar,
)

Session 管理

import requests

# A Session carries cookies across requests automatically
session = requests.Session()

# First request: the server sets a cookie
session.get('https://httpbin.org/cookies/set/name/Alice')

# Subsequent requests send the stored cookies without extra work
response = session.get('https://httpbin.org/cookies')
print(f"Session Cookies: {response.json()['cookies']}")

# A Session also reuses the underlying TCP connection (keep-alive)
for i in range(5):
    response = session.get('https://httpbin.org/get')
    print(f"请求 {i+1}: 状态码 {response.status_code}")

# Default headers applied to every request on this session
session.headers.update({
    'User-Agent': 'MySession/1.0',
})

session.verify = False  # skip SSL certificate verification (not recommended)
# NOTE(review): requests does not honour a `timeout` attribute on Session —
# this assignment has no effect; pass timeout= on each request instead.
session.timeout = 10

# Proxies applied to every request on this session
session.proxies = {
    'http': 'http://proxy.example.com:8080',
    'https': 'https://proxy.example.com:8080',
}

# Drop all cookies held by the session
session.cookies.clear()

# Release the session's pooled connections
session.close()

# Or let a with-statement close it automatically
with requests.Session() as session:
    session.get('https://httpbin.org/cookies/set/name/Bob')
    response = session.get('https://httpbin.org/cookies')
    print(f"Cookies: {response.json()['cookies']}")

高级功能

异常处理

import requests
from requests.exceptions import (
    RequestException,
    Timeout,
    ConnectionError,
    HTTPError,
    TooManyRedirects
)

def safe_request(url, **kwargs):
    """Issue a GET request, mapping every failure mode to a log message.

    Returns the Response on success, or None when any request-related
    exception occurred.
    """
    try:
        resp = requests.get(url, **kwargs)
        # Turn 4xx/5xx status codes into HTTPError
        resp.raise_for_status()
        return resp
    except Timeout:
        print(f"请求超时: {url}")
    except ConnectionError:
        print(f"连接错误: {url}")
    except HTTPError as e:
        print(f"HTTP错误: {e.response.status_code}")
    except TooManyRedirects:
        print(f"重定向次数过多: {url}")
    except RequestException as e:  # base class: anything else requests raises
        print(f"请求异常: {e}")
    return None

# Example usage
response = safe_request(
    'https://httpbin.org/get',
    timeout=5,
)
if response:
    print(response.json())

# Automatic retries via urllib3's Retry policy mounted on an HTTPAdapter
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()

# Retry policy.
# FIX: the parameter is `allowed_methods` since urllib3 1.26; the old name
# `method_whitelist` was removed in urllib3 2.0 and raises TypeError there.
retry_strategy = Retry(
    total=3,                                     # at most 3 retries overall
    backoff_factor=1,                            # exponential back-off between attempts
    status_forcelist=[429, 500, 502, 503, 504],  # retry on these status codes
    allowed_methods=["HEAD", "GET", "OPTIONS"]   # only retry idempotent methods
)

adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)

# Every request through this session now retries automatically
response = session.get('https://httpbin.org/get')

代理设置

import requests

# Per-scheme proxy configuration
proxies = {
'http': 'http://proxy.example.com:8080',
'https': 'https://proxy.example.com:8080'
}

response = requests.get(
'https://httpbin.org/get',
proxies=proxies
)

# Proxy that requires basic authentication (user:pass embedded in the URL)
proxies = {
'http': 'http://user:pass@proxy.example.com:8080',
'https': 'https://user:pass@proxy.example.com:8080'
}

# SOCKS proxy support
# requires: pip install requests[socks]
proxies = {
'http': 'socks5://proxy.example.com:1080',
'https': 'socks5://proxy.example.com:1080'
}

# Explicitly disable any proxy for this request
response = requests.get(
'https://httpbin.org/get',
proxies={'http': None, 'https': None}
)

# Proxies can also be picked up from environment variables:
# export HTTP_PROXY=http://proxy.example.com:8080
# export HTTPS_PROXY=https://proxy.example.com:8080
response = requests.get('https://httpbin.org/get')

SSL 证书验证

import requests
import urllib3

# Silence the InsecureRequestWarning emitted when verify=False is used
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Skip SSL certificate verification (NOT recommended in production)
response = requests.get(
'https://example.com',
verify=False
)

# Verify against a specific CA bundle instead of the system store
response = requests.get(
'https://example.com',
verify='/path/to/cacert.pem'
)

# Client certificate authentication (certificate + private key)
response = requests.get(
'https://example.com',
cert=('/path/to/client.cert', '/path/to/client.key')
)

# Session-wide SSL settings
session = requests.Session()
session.verify = False # disable verification for every request on this session
session.cert = '/path/to/client.cert' # client certificate for every request

Socket 编程

Socket 基础

Socket 概念

"""
Socket (套接字) 是网络编程的基础

Socket 类型:
- AF_INET: IPv4 地址族
- AF_INET6: IPv6 地址族
- AF_UNIX: 本地进程间通信

Socket 类型:
- SOCK_STREAM: TCP (面向连接,可靠)
- SOCK_DGRAM: UDP (无连接,不可靠)

Socket 工作流程:
1. 创建 socket
2. 绑定地址和端口 (bind)
3. 监听连接 (listen) - 服务器端
4. 接受连接 (accept) - 服务器端
5. 发送/接收数据 (send/recv)
6. 关闭 socket (close)
"""

import socket

# 创建 socket
# socket.AF_INET: IPv4
# socket.SOCK_STREAM: TCP
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

print(f"Socket家族: {sock.family}")
print(f"Socket类型: {sock.type}")
print(f"Socket协议: {sock.proto}")

# 关闭 socket
sock.close()

TCP 客户端

import socket

# Create a TCP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

server_host = 'localhost'
server_port = 8888

try:
    # Establish the connection
    client_socket.connect((server_host, server_port))
    print(f"已连接到服务器 {server_host}:{server_port}")

    # Send one message
    message = "Hello, Server!"
    client_socket.send(message.encode('utf-8'))
    print(f"已发送: {message}")

    # Wait for the reply (up to 1024 bytes)
    data = client_socket.recv(1024)
    print(f"收到回复: {data.decode('utf-8')}")

except ConnectionRefusedError:
    print("连接被拒绝,请确认服务器是否运行")
except Exception as e:
    print(f"发生错误: {e}")
finally:
    # Always release the socket
    client_socket.close()
    print("连接已关闭")

def tcp_client(host, port, message):
    """Send one message over TCP and return the decoded reply."""
    # The with-statement closes the socket automatically
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((host, port))
        sock.send(message.encode('utf-8'))
        reply = sock.recv(1024)
        return reply.decode('utf-8')

# Example:
# response = tcp_client('localhost', 8888, 'Hello!')

TCP 服务器

import socket
import threading

def handle_client(client_socket, client_address):
    """Serve one client: echo back the first message it sends."""
    try:
        print(f"新连接来自: {client_address}")

        payload = client_socket.recv(1024)
        if payload:
            message = payload.decode('utf-8')
            print(f"收到消息: {message}")

            # Reply with an echo of the message
            reply = f"Echo: {message}"
            client_socket.send(reply.encode('utf-8'))

    except Exception as e:
        print(f"处理客户端时出错: {e}")
    finally:
        client_socket.close()
        print(f"连接关闭: {client_address}")

def tcp_server(host='0.0.0.0', port=8888):
    """Run a threaded TCP echo server until interrupted."""
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Allow quick restarts on the same port (skip TIME_WAIT)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    try:
        server_socket.bind((host, port))
        print(f"服务器启动在 {host}:{port}")

        server_socket.listen(5)  # backlog of 5 pending connections
        print("等待客户端连接...")

        while True:
            conn, addr = server_socket.accept()

            # One worker thread per client
            worker = threading.Thread(
                target=handle_client,
                args=(conn, addr)
            )
            worker.start()

    except KeyboardInterrupt:
        print("\n服务器停止")
    except Exception as e:
        print(f"服务器错误: {e}")
    finally:
        server_socket.close()

# Start the server:
# tcp_server()

UDP Socket

UDP 客户端

import socket

def udp_client(host, port, message):
    """Send one UDP datagram and print the reply, if any arrives."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    try:
        # UDP is connectionless: just address each datagram
        sock.sendto(message.encode('utf-8'), (host, port))
        print(f"已发送到 {host}:{port}: {message}")

        # Block until a reply datagram arrives
        data, addr = sock.recvfrom(1024)
        print(f"收到来自 {addr} 的回复: {data.decode('utf-8')}")

    except Exception as e:
        print(f"发生错误: {e}")
    finally:
        sock.close()

# Example:
# udp_client('localhost', 9999, 'Hello UDP Server!')

UDP 服务器

import socket

def udp_server(host='0.0.0.0', port=9999):
    """Run a UDP echo server until interrupted."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    try:
        sock.bind((host, port))
        print(f"UDP 服务器启动在 {host}:{port}")

        while True:
            # Each recvfrom() returns one datagram plus its sender address
            data, addr = sock.recvfrom(1024)
            message = data.decode('utf-8')
            print(f"收到来自 {addr} 的消息: {message}")

            # Echo the message back to the sender
            reply = f"Echo: {message}"
            sock.sendto(reply.encode('utf-8'), addr)
            print(f"已回复 {addr}")

    except KeyboardInterrupt:
        print("\n服务器停止")
    except Exception as e:
        print(f"服务器错误: {e}")
    finally:
        sock.close()

# Start the server:
# udp_server()

UDP vs TCP

"""
TCP (Transmission Control Protocol):
- 面向连接的协议
- 可靠传输 (保证数据顺序和完整性)
- 有流量控制和拥塞控制
- 适合传输重要数据
- 速度相对较慢

UDP (User Datagram Protocol):
- 无连接协议
- 不可靠传输 (可能丢包、乱序)
- 没有流量控制和拥塞控制
- 适合实时应用 (视频、音频、游戏)
- 速度快

选择建议:
- 需要可靠性: 使用 TCP
- 需要速度、容忍丢包: 使用 UDP
- 文件传输: TCP
- 视频直播: UDP
- 在线游戏: UDP
"""

Socket 高级

非阻塞 Socket

import socket
import select

def non_blocking_client(host, port):
    """Connect, send and receive on a non-blocking socket driven by select()."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Non-blocking mode: calls return immediately instead of waiting
    sock.setblocking(False)

    try:
        # connect() on a non-blocking socket raises BlockingIOError while
        # the handshake is still in progress — that is expected.
        try:
            sock.connect((host, port))
        except BlockingIOError:
            pass

        # A completed connect makes the socket WRITABLE, so check the
        # writable list — index [1] of select()'s result.
        # FIX: the original read index [0] (the readable list), which was
        # passed in empty, so the socket could never appear in it.
        writable = select.select([], [sock], [], 5)[1]
        if sock in writable:
            print("连接成功")

            # Send a message
            message = "Hello!"
            sock.send(message.encode('utf-8'))

            # Wait up to 5 s for the reply to become readable
            readable = select.select([sock], [], [], 5)[0]
            if sock in readable:
                data = sock.recv(1024)
                print(f"收到: {data.decode('utf-8')}")

    except Exception as e:
        print(f"错误: {e}")
    finally:
        sock.close()

# Example:
# non_blocking_client('localhost', 8888)

Socket 超时设置

import socket

def timeout_client(host, port, timeout=5):
    """TCP client whose connect/send/recv calls all honour a timeout."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # The timeout applies to connect(), send() and recv() alike
    sock.settimeout(timeout)

    try:
        sock.connect((host, port))
        print("连接成功")

        # Send a message
        sock.send(b"Hello!")

        # recv() raises socket.timeout if nothing arrives in time
        data = sock.recv(1024)
        print(f"收到: {data.decode('utf-8')}")

    except socket.timeout:
        print("操作超时")
    except Exception as e:
        print(f"错误: {e}")
    finally:
        sock.close()

多路复用

import socket
import select

def multiplexing_server(host='0.0.0.0', port=8888):
    """Single-threaded server multiplexing all clients through select()."""
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((host, port))
    server_socket.listen(5)

    # Non-blocking so accept()/recv() never stall the event loop
    server_socket.setblocking(False)

    # Sockets we want select() to watch for readability
    inputs = [server_socket]

    print(f"服务器启动在 {host}:{port}")

    try:
        while inputs:
            # 1-second timeout keeps the loop responsive to Ctrl-C
            readable, _, exceptional = select.select(inputs, [], inputs, 1)

            for sock in readable:
                if sock is server_socket:
                    # The listening socket is readable: a new client arrived
                    conn, addr = sock.accept()
                    conn.setblocking(False)
                    inputs.append(conn)
                    print(f"新连接: {addr}")
                else:
                    # An existing client sent data (or closed the connection)
                    data = sock.recv(1024)
                    if data:
                        print(f"收到: {data.decode('utf-8')}")
                        sock.send(b"OK")
                    else:
                        # Empty read: the peer closed the connection
                        inputs.remove(sock)
                        sock.close()

            for sock in exceptional:
                inputs.remove(sock)
                sock.close()

    except KeyboardInterrupt:
        print("\n服务器停止")
    finally:
        server_socket.close()

# Start the server:
# multiplexing_server()

WebSocket

WebSocket 基础

"""
WebSocket 是一种全双工通信协议
- 建立在 TCP 之上
- 允许服务器主动推送消息
- 保持长连接,减少通信开销
- 适合实时应用

使用场景:
- 实时聊天
- 在线游戏
- 股票行情
- 协作编辑
"""

# 安装 websockets 库
# pip install websockets

import asyncio
import websockets

async def websocket_client(uri):
    """Connect to a WebSocket server, send one message, then keep listening."""
    async with websockets.connect(uri) as websocket:
        # Initial message
        await websocket.send("Hello, WebSocket!")

        # First reply
        reply = await websocket.recv()
        print(f"收到: {reply}")

        # Keep receiving; ping the server whenever a second passes quietly
        while True:
            try:
                message = await asyncio.wait_for(
                    websocket.recv(),
                    timeout=1.0
                )
                print(f"收到: {message}")
            except asyncio.TimeoutError:
                # No traffic for 1 s — send a heartbeat instead
                await websocket.ping()

# Run the client:
# asyncio.run(websocket_client('ws://localhost:8765'))

WebSocket 服务器

import asyncio
import websockets

async def echo_server(websocket, path):
    """Echo every incoming message back to the client that sent it."""
    print(f"新客户端连接: {websocket.remote_address}")

    try:
        # async-for yields each message until the client disconnects
        async for message in websocket:
            print(f"收到: {message}")
            await websocket.send(f"Echo: {message}")

    except websockets.exceptions.ConnectionClosed:
        print("客户端断开连接")

async def websocket_server(host='localhost', port=8765):
    """Serve echo_server forever on the given host/port."""
    async with websockets.serve(echo_server, host, port):
        print(f"WebSocket 服务器启动在 {host}:{port}")
        await asyncio.Future()  # never completes: run until cancelled

# Start the server:
# asyncio.run(websocket_server())

广播消息

import asyncio
import websockets

# All currently connected clients
connected_clients = set()

async def broadcast_server(websocket, path):
    """Relay every incoming message to all other connected clients."""
    # Register the new client
    connected_clients.add(websocket)
    print(f"客户端连接: {websocket.remote_address}")

    try:
        async for message in websocket:
            print(f"收到: {message}")

            # FIX: iterate over a snapshot — awaiting send() yields control,
            # and another coroutine may add/remove clients meanwhile;
            # mutating the set during iteration raises RuntimeError.
            for client in list(connected_clients):
                if client != websocket:
                    await client.send(message)

    except websockets.exceptions.ConnectionClosed:
        print("客户端断开连接")
    finally:
        # discard() (unlike remove()) never raises if already gone
        connected_clients.discard(websocket)

async def broadcast_websocket_server(host='localhost', port=8765):
    """Serve broadcast_server forever on the given host/port."""
    async with websockets.serve(broadcast_server, host, port):
        print(f"广播服务器启动在 {host}:{port}")
        await asyncio.Future()

# Start the server:
# asyncio.run(broadcast_websocket_server())

API 开发

Flask 基础

安装和快速开始

# 安装 Flask
# pip install flask

from flask import Flask, jsonify, request

# Application object
app = Flask(__name__)

@app.route('/')
def hello():
    """Plain-text root endpoint."""
    return 'Hello, World!'

@app.route('/hello/<name>')
def hello_name(name):
    """Greet the name captured from the URL."""
    return f'Hello, {name}!'

@app.route('/api/data')
def get_data():
    """Return a fixed JSON payload."""
    payload = {
        'name': 'Alice',
        'age': 25,
        'city': 'Beijing',
    }
    return jsonify(payload)

@app.route('/api/user', methods=['GET', 'POST'])
def user():
    """Dispatch on the HTTP method of the request."""
    if request.method == 'POST':
        # JSON body of the POST request
        body = request.get_json()
        return jsonify({'message': 'User created', 'data': body})
    # Only GET and POST are routed here, so this is the GET branch
    return jsonify({'message': 'GET user'})

@app.route('/api/search')
def search():
    """Read query-string parameters with defaults and type coercion."""
    keyword = request.args.get('keyword', '')
    page = request.args.get('page', 1, type=int)
    return jsonify({
        'keyword': keyword,
        'page': page,
    })

if __name__ == '__main__':
    # debug=True enables the reloader and the interactive debugger
    app.run(debug=True, host='0.0.0.0', port=5000)

路由和请求处理

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/user/<username>')
def show_user_profile(username):
    """String URL variable."""
    return f'User: {username}'

@app.route('/post/<int:post_id>')
def show_post(post_id):
    """Integer URL variable (non-numeric values 404 automatically)."""
    return f'Post: {post_id}'

@app.route('/path/<path:subpath>')
def show_subpath(subpath):
    """Path variable: may contain slashes."""
    return f'Subpath: {subpath}'

@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render a login form on GET; accept the form on POST."""
    if request.method == 'POST':
        # Form fields (application/x-www-form-urlencoded)
        username = request.form['username']
        password = request.form['password']

        # For a JSON body use:
        # data = request.get_json()

        return jsonify({'status': 'success'})

    return '''
<form method="post">
<input type="text" name="username" placeholder="用户名">
<input type="password" name="password" placeholder="密码">
<button type="submit">登录</button>
</form>
'''

@app.route('/headers')
def headers():
    """Read a request header."""
    user_agent = request.headers.get('User-Agent')
    return f'Your browser: {user_agent}'

@app.route('/upload', methods=['POST'])
def upload_file():
    """Accept a multipart file upload and save it under /uploads.

    FIX: the save path and the success message now interpolate the actual
    filename — the original contained a garbled placeholder instead of
    the `{filename}` substitution.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400

    file = request.files['file']

    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400

    if file:
        # NOTE(review): in production, sanitize client-supplied names with
        # werkzeug.utils.secure_filename() before building a path from them.
        filename = file.filename
        file.save(f'/uploads/{filename}')
        return jsonify({'message': f'File {filename} uploaded'})

@app.route('/response')
def response():
    """Demonstrate returning a custom status code."""
    # Return HTML:
    # return '<h1>Hello World</h1>'

    # Return JSON:
    # return jsonify({'key': 'value'})

    # Return JSON plus an explicit status code
    return jsonify({'message': 'Not Found'}), 404

if __name__ == '__main__':
    app.run(debug=True)

错误处理和中间件

from flask import Flask, jsonify, request

app = Flask(__name__)

@app.errorhandler(404)
def not_found(error):
    """JSON body for 404 responses."""
    return jsonify({'error': 'Not found'}), 404

@app.errorhandler(500)
def internal_error(error):
    """JSON body for unhandled server errors."""
    return jsonify({'error': 'Internal server error'}), 500

class InvalidUsage(Exception):
    """Application-level error carrying an HTTP status and optional payload."""

    status_code = 400  # default when the caller does not override it

    def __init__(self, message, status_code=None, payload=None):
        Exception.__init__(self)
        self.message = message
        if status_code is not None:
            self.status_code = status_code
        self.payload = payload

    def to_dict(self):
        """Serializable representation: payload entries plus the message."""
        body = dict(self.payload or ())
        body['message'] = self.message
        return body

@app.errorhandler(InvalidUsage)
def handle_invalid_usage(error):
    """Convert an InvalidUsage exception into a JSON response."""
    response = jsonify(error.to_dict())
    response.status_code = error.status_code
    return response

@app.route('/api/<int:value>')
def api_endpoint(value):
    """Raise InvalidUsage for negative values."""
    if value < 0:
        raise InvalidUsage('Value must be positive', status_code=400)
    return jsonify({'value': value})

@app.before_request
def before_request():
    """Runs before every request — a natural place for authentication."""
    print(f'Before request: {request.method} {request.path}')
    # e.g.:
    # token = request.headers.get('Authorization')
    # if not token:
    #     return jsonify({'error': 'Unauthorized'}), 401

@app.after_request
def after_request(response):
    """Runs after every request; may modify the outgoing response."""
    print(f'After request: {response.status_code}')
    # Attach an extra response header
    response.headers['X-Custom-Header'] = 'Flask'
    return response

@app.teardown_request
def teardown_request(exception):
    """Runs once the request context is torn down, even on errors."""
    print(f'Teardown request: {exception}')

if __name__ == '__main__':
    app.run(debug=True)

FastAPI 基础

安装和快速开始

# 安装 FastAPI
# pip install fastapi uvicorn

from fastapi import FastAPI
from fastapi.responses import JSONResponse

# Application object
app = FastAPI()

@app.get('/')
def read_root():
    """Root endpoint."""
    return {'Hello': 'World'}

@app.get('/items/{item_id}')
def read_item(item_id: int):
    """Path parameter converted and validated as an int."""
    return {'item_id': item_id}

@app.get('/users/{user_id}')
def read_user(user_id: int, q: str = None):
    """Typed path parameter plus an optional query parameter."""
    return {'user_id': user_id, 'q': q}

@app.get('/items/')
def read_items(skip: int = 0, limit: int = 10):
    """Pagination-style query parameters with defaults."""
    return {'skip': skip, 'limit': limit}

@app.get('/users/{user_id}/items/{item_id}')
def read_user_item(
    user_id: int,
    item_id: str,
    q: str = None,
    short: bool = False,
):
    """Two path parameters combined with optional query parameters."""
    item = {'item_id': item_id, 'owner_id': user_id}
    if q:
        item.update({'q': q})
    if not short:
        item.update({'description': 'This is an amazing item'})
    return item

from pydantic import BaseModel

class Item(BaseModel):
    """Request-body schema for item creation."""
    name: str
    price: float
    is_offer: bool = None

@app.post('/items/')
def create_item(item: Item):
    """JSON request body validated through the pydantic model."""
    return {
        'item_name': item.name,
        'item_price': item.price,
    }

# Run the server:
# uvicorn main:app --reload

请求体和验证

# FIX: `Body` is used by create_multiple_items below but was missing
# from the fastapi import, causing NameError at import time.
from fastapi import Body, FastAPI, HTTPException
from pydantic import BaseModel, Field

app = FastAPI()

class Item(BaseModel):
    """Item schema; Field(gt=0) rejects non-positive prices."""
    name: str
    description: str = None
    price: float = Field(..., gt=0, description="价格必须大于0")
    tax: float = None
    tags: list = []

class User(BaseModel):
    """Minimal user schema."""
    username: str
    full_name: str = None

@app.put('/items/{item_id}')
def update_item(
    item_id: int,
    item: Item,
    user: User,
    q: str = None
):
    """Request bodies + path parameter + optional query parameter."""
    results = {'item_id': item_id, 'item': item, 'user': user}
    if q:
        results.update({'q': q})
    return results

@app.post('/items/')
async def create_item(item: Item):
    """Pydantic validates automatically; this adds an explicit check."""
    # NOTE(review): Field(gt=0) already rejects negative prices before
    # this handler runs, so this branch is defensive only.
    if item.price < 0:
        raise HTTPException(
            status_code=400,
            detail="价格必须为正数"
        )

    return {
        'name': item.name,
        'price': item.price,
        'description': item.description
    }

@app.post('/items/multiple/')
async def create_multiple_items(
    item: Item,
    user: User,
    importance: int = Body(..., gt=0, le=10)
):
    """Several body parameters; `importance` is a bare value in the body."""
    results = {
        'item': item,
        'user': user,
        'importance': importance
    }
    return results

class ItemIn(BaseModel):
    """Schema accepted from the client."""
    name: str
    description: str
    price: float

class ItemOut(BaseModel):
    """Schema returned to the client (response_model filters to this)."""
    name: str
    price: float
    tax: float = 10.5

@app.post('/items/embed/', response_model=ItemOut)
async def create_item_embed(item: ItemIn):
    """Compute the tax server-side before returning the item."""
    item_dict = item.dict()  # NOTE: .model_dump() on pydantic v2
    if 'tax' not in item_dict:
        item_dict.update({'tax': item.price * 0.1})

    return item_dict

异常处理

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse

app = FastAPI()

@app.get('/items/{item_id}')
def read_item(item_id: int):
    """Raise a 404 with a custom header when the item does not exist."""
    if item_id == 0:
        raise HTTPException(
            status_code=404,
            detail='Item not found',
            headers={'X-Error': 'There goes my error'},
        )
    return {'item_id': item_id}

class UnicornException(Exception):
    """Domain exception handled by a dedicated handler below."""

    def __init__(self, name: str):
        self.name = name

@app.exception_handler(UnicornException)
async def unicorn_exception_handler(request: Request, exc: UnicornException):
    """Map UnicornException to an HTTP 418 JSON response."""
    return JSONResponse(
        status_code=418,
        content={'message': f'Oops! {exc.name} did something.'},
    )

@app.get('/unicorns/{name}')
def read_unicorn(name: str):
    """Trigger the custom exception for the name 'yolo'."""
    if name == 'yolo':
        raise UnicornException(name=name)
    return {'unicorn_name': name}

@app.exception_handler(ValueError)
async def value_error_handler(request: Request, exc: ValueError):
    """Catch any ValueError escaping a route and answer 400."""
    return JSONResponse(
        status_code=400,
        content={'error': str(exc)},
    )

@app.exception_handler(HTTPException)
async def custom_http_exception_handler(request: Request, exc: HTTPException):
    """Override FastAPI's default HTTPException formatting."""
    return JSONResponse(
        status_code=exc.status_code,
        content={
            'custom_error': True,
            'detail': exc.detail,
        },
    )

网络爬虫

requests + BeautifulSoup

基础爬虫

# 安装依赖
# pip install requests beautifulsoup4 lxml

import requests
from bs4 import BeautifulSoup

# Fetch the page
url = 'https://example.com'
response = requests.get(url)
response.encoding = 'utf-8'  # force the text decoding

# Parse the HTML with the lxml parser
soup = BeautifulSoup(response.text, 'lxml')

# Page title
title = soup.title.string
print(f"网页标题: {title}")

# Look up an element by id
element = soup.find(id='content')

# Look up elements by CSS class
elements = soup.find_all(class_='article')

# Look up elements by tag name
links = soup.find_all('a')
for link in links:
    href = link.get('href')
    text = link.string
    print(f"链接: {href} - {text}")

# CSS selectors
elements = soup.select('div.container > ul li')
for element in elements:
    print(element.text)

# Text content plus attributes in one pass
for link in soup.select('a'):
    href = link.get('href')
    text = link.get_text(strip=True)
    print(f"{text}: {href}")

实战示例

import requests
from bs4 import BeautifulSoup
import time

def scrape_news():
    """Print title, link and summary for every item on the news index."""
    url = 'https://news.example.com'

    # Pretend to be a regular browser
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }

    response = requests.get(url, headers=headers)
    soup = BeautifulSoup(response.text, 'lxml')

    # One .news-item element per article
    for item in soup.select('.news-item'):
        title = item.select_one('.title').get_text(strip=True)
        link = item.select_one('a')['href']
        summary = item.select_one('.summary').get_text(strip=True)

        print(f"标题: {title}")
        print(f"链接: {link}")
        print(f"摘要: {summary}")
        print("-" * 50)

# Reuse one Session (keep-alive) across many pages
def scrape_with_session(urls):
    """Fetch each URL on a shared Session and print its <title>."""
    session = requests.Session()
    session.headers.update({
        'User-Agent': 'My Bot/1.0'
    })

    for url in urls:
        try:
            response = session.get(url, timeout=10)
            soup = BeautifulSoup(response.text, 'lxml')

            # Page title
            title = soup.title.string
            print(f"页面标题: {title}")

            # Be polite: pause between requests
            time.sleep(1)

        except Exception as e:
            print(f"爬取 {url} 失败: {e}")

    session.close()

# Walk a paginated listing
def scrape_pages(base_url, pages):
    """Fetch pages 1..pages of base_url and print each item's text."""
    for page in range(1, pages + 1):
        url = f"{base_url}?page={page}"
        print(f"爬取第 {page} 页: {url}")

        response = requests.get(url)
        soup = BeautifulSoup(response.text, 'lxml')

        # Items on the current page
        for entry in soup.select('.item'):
            title = entry.get_text(strip=True)
            print(f" - {title}")

        time.sleep(1)  # polite delay between pages

# Authenticate first, then fetch a protected page on the same session
def login_and_scrape():
    """Log in (echoing the CSRF token) and print a protected page."""
    session = requests.Session()

    # The login form embeds a CSRF token we must send back
    login_page = session.get('https://example.com/login')
    soup = BeautifulSoup(login_page.text, 'lxml')
    csrf_token = soup.find('input', {'name': 'csrf_token'})['value']

    # Submit the login form
    credentials = {
        'username': 'your_username',
        'password': 'your_password',
        'csrf_token': csrf_token
    }
    session.post('https://example.com/login', data=credentials)

    # The session now carries the authentication cookies
    response = session.get('https://example.com/protected')
    print(response.text)

Scrapy 框架

Scrapy 基础

# 安装 Scrapy
# pip install scrapy

# 创建项目
# scrapy startproject myproject

# 定义 Item
import scrapy

class ArticleItem(scrapy.Item):
    """Container for one scraped article."""
    title = scrapy.Field()
    link = scrapy.Field()
    summary = scrapy.Field()

class ArticleSpider(scrapy.Spider):
    """Crawls the article listing, following pagination links."""

    name = 'articles'
    allowed_domains = ['example.com']
    start_urls = [
        'https://example.com/articles'
    ]

    def parse(self, response):
        """Yield one ArticleItem per listing entry, then the next page."""
        for article in response.css('.article'):
            item = ArticleItem()
            item['title'] = article.css('.title::text').get()
            item['link'] = article.css('a::attr(href)').get()
            item['summary'] = article.css('.summary::text').get()
            yield item

        # Follow pagination until there is no "next" link left
        next_page = response.css('a.next::attr(href)').get()
        if next_page is not None:
            yield response.follow(next_page, callback=self.parse)

# Run the spider:
# scrapy crawl articles

Scrapy 高级

import scrapy
from scrapy.http import Request
# FIX: DropItem is raised in DataPipeline below but was never imported.
from scrapy.exceptions import CloseSpider, DropItem

class AdvancedSpider(scrapy.Spider):
    """Spider demonstrating custom start requests and a page limit."""

    name = 'advanced'

    def __init__(self, max_pages=5, *args, **kwargs):
        super(AdvancedSpider, self).__init__(*args, **kwargs)
        self.max_pages = int(max_pages)  # stop after this many pages
        self.page_count = 0

    def start_requests(self):
        """Emit the initial requests instead of relying on start_urls."""
        urls = [
            'https://example.com/page1',
            'https://example.com/page2'
        ]

        for url in urls:
            yield Request(url, callback=self.parse, meta={'proxy': None})

    def parse(self, response):
        """Yield listing data and follow every link to its detail page."""
        self.page_count += 1

        if self.page_count > self.max_pages:
            raise CloseSpider('Reached maximum page limit')

        # Listing entries on this page
        for entry in response.css('.item'):
            yield {
                'title': entry.css('.title::text').get(),
                'url': entry.css('a::attr(href)').get(),
            }

        # FIX: css() yields Selector objects; urljoin() needs strings,
        # so extract the values with .getall() before joining.
        for href in response.css('a::attr(href)').getall():
            url = response.urljoin(href)
            yield Request(url, callback=self.parse_detail)

    def parse_detail(self, response):
        """Extract the body text of a detail page."""
        yield {
            'detail_url': response.url,
            'content': response.css('.content::text').getall()
        }

class ProxyMiddleware:
    """Downloader middleware routing every request through one proxy."""

    def __init__(self, proxy_url):
        self.proxy_url = proxy_url

    @classmethod
    def from_crawler(cls, crawler):
        # The proxy address comes from the PROXY_URL setting
        return cls(
            proxy_url=crawler.settings.get('PROXY_URL')
        )

    def process_request(self, request, spider):
        """Attach the proxy to the outgoing request."""
        request.meta['proxy'] = self.proxy_url

    def process_exception(self, request, exception, spider):
        """Log which proxy a failed request was using."""
        if 'proxy' in request.meta:
            proxy = request.meta['proxy']
            spider.logger.error(f'Proxy error: {proxy}')

class DataPipeline:
    """Item pipeline: clean, validate, collect, and dump to JSON on close."""

    def __init__(self):
        self.items = []

    def process_item(self, item, spider):
        """Normalize the title and reject items without one."""
        # Data cleaning
        item['title'] = item['title'].strip()

        # Data validation
        if not item['title']:
            raise DropItem(f'Missing title in {item}')

        # Collect for the final dump
        self.items.append(item)

        return item

    def close_spider(self, spider):
        """Write everything collected to output.json when the spider ends."""
        import json

        with open('output.json', 'w') as f:
            json.dump(self.items, f)

# Example settings.py configuration for the project above
"""
BOT_NAME = 'myproject'

SPIDER_MODULES = ['myproject.spiders']
NEWSPIDER_MODULE = 'myproject.spiders'

# Obey robots.txt
ROBOTSTXT_OBEY = True

# Delay between downloads (seconds)
DOWNLOAD_DELAY = 1

# Number of concurrent requests
CONCURRENT_REQUESTS = 16

# User-Agent
USER_AGENT = 'My Bot (+http://www.yourdomain.com)'

# Enable the downloader middleware
DOWNLOADER_MIDDLEWARES = {
'myproject.middlewares.ProxyMiddleware': 400,
}

# Enable the item pipeline
ITEM_PIPELINES = {
'myproject.pipelines.DataPipeline': 300,
}
"""

小结

本章节介绍了 Python 的网络编程:

HTTP 编程

  • requests 库: 安装、基础使用、GET/POST 请求
  • 请求头和响应头: 设置请求头、读取响应头
  • Cookie 处理: Cookie 的读取、设置和管理
  • Session 管理: 使用 Session 保持连接、自动管理 cookies
  • 高级功能: 异常处理、代理设置、SSL 证书验证

Socket 编程

  • Socket 基础: Socket 概念、TCP/UDP 区别
  • TCP Socket: TCP 客户端和服务器实现
  • UDP Socket: UDP 客户端和服务器实现
  • Socket 高级: 非阻塞 Socket、超时设置、多路复用

WebSocket

  • WebSocket 基础: WebSocket 概念和使用场景
  • WebSocket 客户端: 使用 websockets 库连接服务器
  • WebSocket 服务器: 实现 Echo 服务器和广播服务器

API 开发

  • Flask 基础: 路由、请求处理、返回 JSON
  • Flask 高级: 错误处理、中间件、请求钩子
  • FastAPI 基础: 路径参数、查询参数、请求体验证
  • FastAPI 高级: 异常处理、自定义异常处理器

网络爬虫

  • requests + BeautifulSoup: 基础爬虫、实战示例
  • Scrapy 框架: Spider 编写、Item 定义、中间件、Pipeline

掌握 Python 网络编程可以帮助你构建各种网络应用,从简单的 HTTP 请求到复杂的分布式系统。合理选择合适的网络编程技术和框架,可以大大提高开发效率和程序性能。