Flask
Flask 是一个轻量级的 Python Web 框架,被称为微框架。它提供了核心功能,同时保持简单和灵活,允许开发者根据需求选择各种扩展。
简介
Flask 特性
Flask 核心特性:
- 轻量级:核心代码简洁,易于理解
- 灵活性:开发者可以选择数据库、模板引擎等组件
- 可扩展性:丰富的第三方扩展
- 内置开发服务器和调试器
- 集成单元测试支持
- RESTful 请求分发
- Jinja2 模板引擎
- WebSocket 支持
适用场景:
- 中小型 Web 应用
- RESTful API 服务
- 微服务架构
- 快速原型开发
安装 Flask
# 创建虚拟环境
python -m venv venv
# 激活虚拟环境
# Windows
venv\Scripts\activate
# Linux/Mac
source venv/bin/activate
# 安装 Flask
pip install flask
# 安装常用扩展
pip install flask-sqlalchemy # ORM
pip install flask-migrate # 数据库迁移
pip install flask-login # 用户认证
pip install flask-wtf # 表单验证
pip install flask-cors # 跨域支持
# 查看 Flask 版本
python -c "import flask; print(flask.__version__)"
快速开始
第一个 Flask 应用
from flask import Flask
# 创建应用实例
app = Flask(__name__)
# 路由装饰器
@app.route('/')
def hello_world():
return 'Hello, World!'
# 运行应用
if __name__ == '__main__':
app.run(debug=True)
应用配置
from flask import Flask
app = Flask(__name__)
# 方式1:直接配置
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'your-secret-key'
app.config['SESSION_COOKIE_NAME'] = 'session'
# 方式2:从配置对象加载
class Config:
DEBUG = True
SECRET_KEY = 'your-secret-key'
DATABASE_URI = 'sqlite:///site.db'
app.config.from_object(Config)
# 方式3:从配置文件加载
# config.py
# DEBUG = True
# SECRET_KEY = 'your-secret-key'
app.config.from_pyfile('config.py')
# 方式4:从环境变量加载
import os
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'default-key')
# 查看所有配置
print(app.config)
路由
基本路由
from flask import Flask
app = Flask(__name__)
# 最简单的路由
@app.route('/')
def index():
return '首页'
# 多个路由指向同一函数
@app.route('/hello')
@app.route('/hi')
def greet():
return '你好!'
# 路由参数
@app.route('/user/<username>')
def show_user_profile(username):
return f'用户: {username}'
# 指定参数类型
@app.route('/post/<int:post_id>')
def show_post(post_id):
return f'文章ID: {post_id}'
# 多个参数
@app.route('/user/<username>/post/<int:post_id>')
def show_user_post(username, post_id):
return f'{username} 的文章 {post_id}'
# 支持的类型
@app.route('/path/<path:subpath>')
def show_subpath(subpath):
return f'子路径: {subpath}'
@app.route('/float/<float:amount>')
def show_amount(amount):
return f'金额: {amount}'
if __name__ == '__main__':
app.run()
HTTP 方法
from flask import Flask, request
app = Flask(__name__)
# 默认只允许 GET
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
return '登录请求'
else:
return '登录页面'
# 分别处理不同方法
@app.route('/api/data', methods=['GET'])
def get_data():
return {'data': '获取数据'}
@app.route('/api/data', methods=['POST'])
def create_data():
return {'message': '创建数据'}
@app.route('/api/data', methods=['PUT'])
def update_data():
return {'message': '更新数据'}
@app.route('/api/data', methods=['DELETE'])
def delete_data():
return {'message': '删除数据'}
路由构建
from flask import Flask, url_for
app = Flask(__name__)
@app.route('/')
def index():
return '首页'
@app.route('/user/<username>')
def profile(username):
return f'用户: {username}'
# 使用 url_for 反向生成 URL
with app.test_request_context():
print(url_for('index')) # /
print(url_for('profile', username='john')) # /user/john
print(url_for('profile', username='john', page=2)) # /user/john?page=2
# 在模板中使用
# <a href="{{ url_for('index') }}">首页</a>
请求处理
Request 对象
from flask import Flask, request
app = Flask(__name__)
@app.route('/request-info')
def request_info():
# 请求方法
method = request.method
# 请求 URL
url = request.url
base_url = request.base_url
path = request.path
# 请求头
user_agent = request.headers.get('User-Agent')
accept_language = request.headers.get('Accept-Language')
# 请求参数
# GET 参数: ?name=John&age=30
name = request.args.get('name')
age = request.args.get('age', type=int)
all_args = request.args.to_dict()
# POST 数据
# 表单数据
form_name = request.form.get('name')
all_form = request.form.to_dict()
# JSON 数据
json_data = request.get_json()
json_name = json_data.get('name') if json_data else None
# 文件
file = request.files.get('file')
# Cookies
session_id = request.cookies.get('session_id')
# IP 地址
ip = request.remote_addr
return {
'method': method,
'url': url,
'name': name,
'ip': ip
}
表单处理
from flask import Flask, request, render_template_string, redirect, url_for
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret-key'
# 简单的表单处理
@app.route('/form', methods=['GET', 'POST'])
def form():
if request.method == 'POST':
# 获取表单数据
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
# 处理数据...
return f'接收到的数据: {username}, {email}'
# GET 请求显示表单
return '''
<form method="POST">
<input type="text" name="username" placeholder="用户名" required>
<input type="email" name="email" placeholder="邮箱" required>
<input type="password" name="password" placeholder="密码" required>
<button type="submit">提交</button>
</form>
'''
# 使用 WTForm 验证
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, SubmitField
from wtforms.validators import DataRequired, Email, Length
class LoginForm(FlaskForm):
username = StringField('用户名', validators=[DataRequired(), Length(min=4, max=20)])
email = StringField('邮箱', validators=[DataRequired(), Email()])
password = PasswordField('密码', validators=[DataRequired(), Length(min=6)])
submit = SubmitField('登录')
@app.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm()
if form.validate_on_submit():
username = form.username.data
email = form.email.data
password = form.password.data
# 验证逻辑...
return f'登录成功: {username}'
return render_template_string('''
<form method="POST">
{{ form.hidden_tag() }}
{{ form.username.label }} {{ form.username() }}<br>
{{ form.email.label }} {{ form.email() }}<br>
{{ form.password.label }} {{ form.password() }}<br>
{{ form.submit() }}
</form>
''', form=form)
文件上传
from flask import Flask, request, render_template_string
import os
from werkzeug.utils import secure_filename
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# 允许的文件类型
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'}
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
if request.method == 'POST':
# 检查是否有文件
if 'file' not in request.files:
return '没有文件'
file = request.files['file']
# 检查文件名
if file.filename == '':
return '未选择文件'
# 验证文件类型
if file and allowed_file(file.filename):
# 安全的文件名
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
return f'文件已保存: {filename}'
return '''
<form method="POST" enctype="multipart/form-data">
<input type="file" name="file" required>
<button type="submit">上传</button>
</form>
'''
# 多文件上传
@app.route('/multi-upload', methods=['POST'])
def multi_upload():
files = request.files.getlist('files')
saved_files = []
for file in files:
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
saved_files.append(filename)
return f'已上传: {", ".join(saved_files)}'
响应
返回不同类型响应
from flask import Flask, render_template, jsonify, redirect, url_for
from flask import make_response
app = Flask(__name__)
# HTML 响应
@app.route('/html')
def html_response():
html_content = '<h1>Hello, World!</h1>'
return html_content
# JSON 响应
@app.route('/json')
def json_response():
data = {
'name': 'John',
'age': 30,
'city': 'New York'
}
return jsonify(data)
# 或使用字典
@app.route('/json2')
def json_response2():
return {
'message': 'Success',
'data': [1, 2, 3]
}
# 元组形式: (response, status)
@app.route('/not-found')
def not_found():
return 'Not Found', 404
# 元组形式: (response, status, headers)
@app.route('/custom')
def custom_response():
return 'Custom Response', 200, {'X-Custom-Header': 'value'}
# 使用 make_response
@app.route('/advanced')
def advanced_response():
response = make_response('Advanced Response')
response.status_code = 200
response.headers['X-Custom'] = 'Custom Header'
response.set_cookie('key', 'value')
return response
# 重定向
@app.route('/redirect')
def redirect_example():
return redirect(url_for('index'))
@app.route('/redirect-external')
def redirect_external():
return redirect('https://www.example.com')
# 返回模板
@app.route('/template')
def template_response():
return render_template('index.html', title='首页')
模板渲染
from flask import Flask, render_template
app = Flask(__name__)
# 传递变量
@app.route('/user/<name>')
def user_profile(name):
return render_template('user.html', username=name, age=25)
# 传递列表和字典
@app.route('/data')
def data_view():
users = [
{'name': 'Alice', 'age': 25},
{'name': 'Bob', 'age': 30},
{'name': 'Charlie', 'age': 35}
]
settings = {'title': '用户列表', 'show_age': True}
return render_template('data.html', users=users, settings=settings)
模板
Jinja2 语法
<!-- templates/base.html -->
<!DOCTYPE html>
<html>
<head>
<title>{% block title %}{% endblock %}</title>
</head>
<body>
<nav>
<a href="{{ url_for('index') }}">首页</a>
<a href="{{ url_for('about') }}">关于</a>
</nav>
<main>
{% block content %}{% endblock %}
</main>
</body>
</html>
<!-- templates/index.html -->
{% extends "base.html" %}
{% block title %}首页 - 我的网站{% endblock %}
{% block content %}
<h1>欢迎来到首页</h1>
<!-- 变量 -->
<p>用户名: {{ username }}</p>
<!-- 条件语句 -->
{% if user.is_admin %}
<p>管理员用户</p>
{% elif user.is_moderator %}
<p>版主用户</p>
{% else %}
<p>普通用户</p>
{% endif %}
<!-- 循环 -->
<ul>
{% for item in items %}
<li>{{ item.name }} - {{ item.price }}</li>
{% endfor %}
</ul>
<!-- 过滤器 -->
<p>{{ text|upper }}</p> <!-- 转大写 -->
<p>{{ text|lower }}</p> <!-- 转小写 -->
<p>{{ text|capitalize }}</p> <!-- 首字母大写 -->
<p="{{ text|truncate(50) }}</p> <!-- 截断 -->
<p>{{ price|float(2) }}</p> <!-- 保留两位小数 -->
<p="{{ date|strftime('%Y-%m-%d') }}</p> <!-- 日期格式化 -->
<!-- 自定义过滤器 -->
<p>{{ text|custom_filter }}</p>
<!-- 静态文件 -->
<link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}">
<img src="{{ url_for('static', filename='logo.png') }}">
{% endblock %}
模板继承
from flask import Flask, render_template
app = Flask(__name__)
# 基础模板
# templates/base.html
"""
<!DOCTYPE html>
<html>
<head>
{% block head %}
<title>{% block title %}{% endblock %}</title>
{% endblock %}
</head>
<body>
{% block header %}{% endblock %}
<main>
{% block content %}{% endblock %}
</main>
{% block footer %}{% endblock %}
</body>
</html>
"""
# 继承模板
# templates/index.html
"""
{% extends "base.html" %}
{% block title %}首页{% endblock %}
{% block header %}
<header>
<h1>欢迎</h1>
</header>
{% endblock %}
{% block content %}
<p>这是首页内容</p>
{% endblock %}
"""
@app.route('/')
def index():
return render_template('index.html')
自定义过滤器
from flask import Flask
app = Flask(__name__)
# 方式1:直接注册过滤器
@app.template_filter('reverse')
def reverse_filter(s):
return s[::-1]
@app.template_filter('format_price')
def format_price(price):
return f'¥{price:.2f}'
# 方式2:在模板中使用
"""
{{ text|reverse }}
{{ price|format_price }}
"""
# 方式3:全局函数
@app.template_global()
def get_current_year():
from datetime import datetime
return datetime.now().year
# 在模板中使用: {{ get_current_year() }}
# 方式4:上下文处理器
@app.context_processor
def inject_variables():
return {
'app_name': 'My App',
'version': '1.0.0'
}
# 在模板中可以直接使用: {{ app_name }}
数据库集成
Flask-SQLAlchemy 配置
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 配置数据库
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///site.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True # 打印 SQL 语句
# 创建数据库实例
db = SQLAlchemy(app)
# 定义模型
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
password = db.Column(db.String(200), nullable=False)
created_at = db.Column(db.DateTime, server_default=db.func.now())
# 关系
posts = db.relationship('Post', backref='author', lazy=True)
def __repr__(self):
return f'<User {self.username}>'
class Post(db.Model):
__tablename__ = 'posts'
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(200), nullable=False)
content = db.Column(db.Text, nullable=False)
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
created_at = db.Column(db.DateTime, server_default=db.func.now())
def __repr__(self):
return f'<Post {self.title}>'
# 创建数据库表
with app.app_context():
db.create_all()
CRUD 操作
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///site.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
# Create: 创建
@app.route('/user/create', methods=['POST'])
def create_user():
data = request.get_json()
username = data.get('username')
email = data.get('email')
# 方式1
user = User(username=username, email=email)
db.session.add(user)
db.session.commit()
# 方式2
# new_user = User(**data)
# db.session.add(new_user)
# db.session.commit()
return jsonify({'message': '用户创建成功', 'id': user.id})
# Read: 查询
@app.route('/user/<int:user_id>')
def get_user(user_id):
# 通过主键查询
user = User.query.get(user_id)
if not user:
return {'error': '用户不存在'}, 404
return {
'id': user.id,
'username': user.username,
'email': user.email
}
@app.route('/users')
def get_users():
# 查询所有
users = User.query.all()
# 条件查询
# users = User.query.filter_by(username='john').all()
# 排序
# users = User.query.order_by(User.username.desc()).all()
# 限制
# users = User.query.limit(10).all()
return [{
'id': user.id,
'username': user.username,
'email': user.email
} for user in users]
# Update: 更新
@app.route('/user/<int:user_id>', methods=['PUT'])
def update_user(user_id):
user = User.query.get(user_id)
if not user:
return {'error': '用户不存在'}, 404
data = request.get_json()
# 方式1
if 'username' in data:
user.username = data['username']
if 'email' in data:
user.email = data['email']
db.session.commit()
return {'message': '用户更新成功'}
# Delete: 删除
@app.route('/user/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
user = User.query.get(user_id)
if not user:
return {'error': '用户不存在'}, 404
db.session.delete(user)
db.session.commit()
return {'message': '用户删除成功'}
# 复杂查询
@app.route('/search')
def search_users():
keyword = request.args.get('q', '')
# 模糊查询
users = User.query.filter(User.username.like(f'%{keyword}%')).all()
# 或查询
# from sqlalchemy import or_
# users = User.query.filter(
# or_(
# User.username.like(f'%{keyword}%'),
# User.email.like(f'%{keyword}%')
# )
# ).all()
return [{
'id': user.id,
'username': user.username
} for user in users]
数据库迁移
# 使用 Flask-Migrate 管理数据库迁移
# 安装
# pip install flask-migrate
# 配置
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///site.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
migrate = Migrate(app, db)
# 定义模型
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True)
# 初始化迁移
# 在命令行执行:
# flask db init
# 创建迁移
# flask db migrate -m "Initial migration"
# 应用迁移
# flask db upgrade
# 回滚
# flask db downgrade
用户认证
Flask-Login
from flask import Flask, render_template, redirect, url_for, request, flash
from flask_login import LoginManager, UserMixin, login_user, logout_user
from flask_login import login_required, current_user
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///site.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
login_manager = LoginManager(app)
login_manager.login_view = 'login'
# 用户模型
class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
password_hash = db.Column(db.String(200), nullable=False)
def set_password(self, password):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
# 注册
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
# 检查用户是否存在
if User.query.filter_by(username=username).first():
flash('用户名已存在')
return redirect(url_for('register'))
# 创建用户
user = User(username=username, email=email)
user.set_password(password)
db.session.add(user)
db.session.commit()
flash('注册成功')
return redirect(url_for('login'))
return render_template('register.html')
# 登录
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = User.query.filter_by(username=username).first()
if user and user.check_password(password):
login_user(user)
flash('登录成功')
next_page = request.args.get('next')
return redirect(next_page) if next_page else redirect(url_for('index'))
else:
flash('用户名或密码错误')
return render_template('login.html')
# 登出
@app.route('/logout')
@login_required
def logout():
logout_user()
flash('已登出')
return redirect(url_for('index'))
# 受保护的路由
@app.route('/protected')
@login_required
def protected():
return f'你好, {current_user.username}!'
# 检查角色
@app.route('/admin')
@login_required
def admin():
if not current_user.is_admin:
flash('需要管理员权限')
return redirect(url_for('index'))
return '管理员页面'
Session 管理
from flask import Flask, session, redirect, url_for
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
# 设置 Session
@app.route('/set-session')
def set_session():
session['username'] = 'john'
session['user_id'] = 123
session.permanent = True # 持久化 Session
return 'Session 已设置'
# 获取 Session
@app.route('/get-session')
def get_session():
username = session.get('username')
user_id = session.get('user_id')
return f'用户: {username}, ID: {user_id}'
# 删除 Session
@app.route('/delete-session')
def delete_session():
session.pop('username', None)
session.pop('user_id', None)
return 'Session 已删除'
# 清除所有 Session
@app.route('/clear-session')
def clear_session():
session.clear()
return '所有 Session 已清除'
# Flash 消息
from flask import flash
@app.route('/flash-message')
def flash_message():
flash('这是一条消息')
flash('这是警告', 'warning')
flash('这是错误', 'error')
return redirect(url_for('index'))
# 在模板中显示 Flash 消息
"""
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
<div class="alert alert-{{ category }}">{{ message }}</div>
{% endfor %}
{% endif %}
{% endwith %}
"""
RESTful API
API 设计
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///api.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
migrate = Migrate(app, db)
class Task(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(200), nullable=False)
description = db.Column(db.Text)
completed = db.Column(db.Boolean, default=False)
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
def to_dict(self):
return {
'id': self.id,
'title': self.title,
'description': self.description,
'completed': self.completed
}
# API: 获取所有任务
@app.route('/api/tasks', methods=['GET'])
def get_tasks():
tasks = Task.query.all()
return jsonify([task.to_dict() for task in tasks])
# API: 获取单个任务
@app.route('/api/tasks/<int:task_id>', methods=['GET'])
def get_task(task_id):
task = Task.query.get_or_404(task_id)
return jsonify(task.to_dict())
# API: 创建任务
@app.route('/api/tasks', methods=['POST'])
def create_task():
data = request.get_json()
if not data or not data.get('title'):
return jsonify({'error': '标题不能为空'}), 400
task = Task(
title=data['title'],
description=data.get('description', '')
)
db.session.add(task)
db.session.commit()
return jsonify(task.to_dict()), 201
# API: 更新任务
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
def update_task(task_id):
task = Task.query.get_or_404(task_id)
data = request.get_json()
task.title = data.get('title', task.title)
task.description = data.get('description', task.description)
task.completed = data.get('completed', task.completed)
db.session.commit()
return jsonify(task.to_dict())
# API: 删除任务
@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
def delete_task(task_id):
task = Task.query.get_or_404(task_id)
db.session.delete(task)
db.session.commit()
return '', 204
# API: 分页
@app.route('/api/tasks/page', methods=['GET'])
def get_tasks_page():
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
pagination = Task.query.paginate(
page=page,
per_page=per_page,
error_out=False
)
return jsonify({
'tasks': [task.to_dict() for task in pagination.items],
'total': pagination.total,
'pages': pagination.pages,
'current_page': page
})
请求验证
from flask import Flask, request, jsonify
from functools import wraps
app = Flask(__name__)
# 验证装饰器
def validate_json(*fields):
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
data = request.get_json()
if not data:
return jsonify({'error': '请求体必须为 JSON'}), 400
# 检查必需字段
missing_fields = [field for field in fields if field not in data]
if missing_fields:
return jsonify({
'error': f'缺少必需字段: {", ".join(missing_fields)}'
}), 400
return f(*args, **kwargs)
return wrapper
return decorator
# API Key 验证
def require_api_key(f):
@wraps(f)
def wrapper(*args, **kwargs):
api_key = request.headers.get('X-API-Key')
if not api_key or api_key != 'your-secret-api-key':
return jsonify({'error': '无效的 API Key'}), 401
return f(*args, **kwargs)
return wrapper
# 使用验证装饰器
@app.route('/api/users', methods=['POST'])
@validate_json('username', 'email')
@require_api_key
def create_user():
data = request.get_json()
# 处理逻辑...
return jsonify({'message': '用户创建成功'})
# 使用 Marshmallow 验证
from marshmallow import Schema, fields, validate, ValidationError
class UserSchema(Schema):
username = fields.Str(required=True, validate=validate.Length(min=4, max=20))
email = fields.Email(required=True)
age = fields.Int(validate=validate.Range(min=18, max=120))
@app.route('/api/users/validate', methods=['POST'])
def create_user_validated():
try:
data = UserSchema().load(request.get_json())
# 处理验证通过的数据
return jsonify(data)
except ValidationError as err:
return jsonify(err.messages), 400
错误处理
from flask import Flask, jsonify
from werkzeug.exceptions import HTTPException
app = Flask(__name__)
# 自定义异常
class InvalidUsage(Exception):
status_code = 400
def __init__(self, message, status_code=None, payload=None):
super().__init__()
self.message = message
if status_code is not None:
self.status_code = status_code
self.payload = payload
def to_dict(self):
rv = dict(self.payload or ())
rv['message'] = self.message
return rv
# 异常处理器
@app.errorhandler(InvalidUsage)
def handle_invalid_usage(error):
response = jsonify(error.to_dict())
response.status_code = error.status_code
return response
# 404 处理
@app.errorhandler(404)
def not_found(error):
return jsonify({'error': '资源未找到'}), 404
# 500 处理
@app.errorhandler(500)
def internal_error(error):
return jsonify({'error': '服务器内部错误'}), 500
# 所有 HTTP 异常
@app.errorhandler(HTTPException)
def handle_http_exception(error):
return jsonify({
'error': error.name,
'description': error.description
}), error.code
# 使用自定义异常
@app.route('/api/test')
def test():
raise InvalidUsage('这是一个错误消息', status_code=410)
# API 标准响应格式
def api_response(data=None, message='Success', code=200):
return jsonify({
'code': code,
'message': message,
'data': data
}), code
@app.route('/api/data')
def api_data():
return api_response(
data={'name': 'John', 'age': 30},
message='获取数据成功'
)
项目结构
推荐的项目结构
myproject/
├── app/
│ ├── __init__.py # 应用工厂
│ ├── models/ # 数据模型
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── post.py
│ ├── routes/ # 路由
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ └── api.py
│ ├── templates/ # 模板
│ │ ├── base.html
│ │ ├── index.html
│ │ └── auth/
│ │ ├── login.html
│ │ └── register.html
│ ├── static/ # 静态文件
│ │ ├── css/
│ │ ├── js/
│ │ └── images/
│ └── forms/ # 表单
│ ├── __init__.py
│ └── auth.py
├── migrations/ # 数据库迁移
├── tests/ # 测试
│ ├── __init__.py
│ ├── test_auth.py
│ └── test_api.py
├── config.py # 配置文件
├── requirements.txt # 依赖
└── run.py # 运行文件
应用工厂模式
# config.py
import os
class Config:
SECRET_KEY = os.getenv('SECRET_KEY') or 'dev-key'
SQLALCHEMY_DATABASE_URI = os.getenv('DATABASE_URI') or 'sqlite:///site.db'
SQLALCHEMY_TRACK_MODIFICATIONS = False
class DevelopmentConfig(Config):
DEBUG = True
class ProductionConfig(Config):
DEBUG = False
class TestingConfig(Config):
TESTING = True
SQLALCHEMY_DATABASE_URI = 'sqlite:///test.db'
config = {
'development': DevelopmentConfig,
'production': ProductionConfig,
'testing': TestingConfig
}
# app/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
db = SQLAlchemy()
login_manager = LoginManager()
def create_app(config_name='development'):
app = Flask(__name__)
app.config.from_object(config[config_name])
# 初始化扩展
db.init_app(app)
login_manager.init_app(app)
login_manager.login_view = 'auth.login'
# 注册蓝图
from app.routes.auth import auth_bp
from app.routes.main import main_bp
from app.routes.api import api_bp
app.register_blueprint(auth_bp, url_prefix='/auth')
app.register_blueprint(main_bp)
app.register_blueprint(api_bp, url_prefix='/api')
# 创建数据库表
with app.app_context():
db.create_all()
return app
# app/routes/auth.py
from flask import Blueprint, render_template, redirect, url_for
from flask_login import login_required, current_user
auth_bp = Blueprint('auth', __name__)
@auth_bp.route('/login')
def login():
return '登录页面'
@auth_bp.route('/logout')
@login_required
def logout():
return '登出'
# app/routes/main.py
from flask import Blueprint
main_bp = Blueprint('main', __name__)
@main_bp.route('/')
def index():
return '首页'
# app/routes/api.py
from flask import Blueprint
api_bp = Blueprint('api', __name__)
@api_bp.route('/data')
def get_data():
return {'data': []}
# run.py
from app import create_app
app = create_app('development')
if __name__ == '__main__':
app.run()
部署
使用 Gunicorn
# 安装 Gunicorn
pip install gunicorn
# 运行应用
gunicorn -w 4 -b 0.0.0.0:8000 "app:create_app()"
# 配置选项
# -w: 工作进程数 (通常为 CPU 核心数 * 2 + 1)
# -b: 绑定地址
# --timeout: 超时时间
# --access-logfile: 访问日志
# --error-logfile: 错误日志
gunicorn -w 4 -b 0.0.0.0:8000 \
--timeout 120 \
--access-logfile - \
--error-logfile - \
"app:create_app()"
使用 Nginx 反向代理
# /etc/nginx/sites-available/myproject
server {
listen 80;
server_name example.com;
location / {
proxy_pass http://127.0.0.1:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
location /static {
alias /path/to/your/app/static;
}
location /media {
alias /path/to/your/app/media;
}
}
Docker 部署
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 暴露端口
EXPOSE 8000
# 运行应用
CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:8000", "app:create_app()"]
# docker-compose.yml
version: '3.8'
services:
web:
build: .
ports:
- "8000:8000"
environment:
- FLASK_ENV=production
- DATABASE_URI=postgresql://user:pass@db:5432/mydb
depends_on:
- db
db:
image: postgres:13
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
- POSTGRES_DB=mydb
volumes:
- postgres_data:/var/lib/postgresql/data
volumes:
postgres_data:
最佳实践
安全建议
from flask import Flask, session
from flask_talisman import Talisman
from flask_cors import CORS
app = Flask(__name__)
# 1. 使用 HTTPS 强制
Talisman(app, force_https=True)
# 2. 设置安全的 Cookie
app.config.update(
SESSION_COOKIE_SECURE=True, # 仅 HTTPS
SESSION_COOKIE_HTTPONLY=True, # 防止 JavaScript 访问
SESSION_COOKIE_SAMESITE='Lax' # 防止 CSRF
)
# 3. CORS 配置
CORS(app, resources={
r"/api/*": {
"origins": ["https://example.com"],
"methods": ["GET", "POST"],
"allow_headers": ["Content-Type"]
}
})
# 4. 密码加密
from werkzeug.security import generate_password_hash, check_password_hash
def hash_password(password):
return generate_password_hash(password, method='pbkdf2:sha256')
def verify_password hashed_password, password):
return check_password_hash(hashed_password, password)
# 5. 防止 SQL 注入 (使用 ORM)
# 好的做法
user = User.query.filter_by(username=username).first()
# 不好的做法
# user = db.execute(f"SELECT * FROM users WHERE username = '{username}'")
# 6. 输入验证
from wtforms import StringField, validators
class SafeForm(FlaskForm):
username = StringField('用户名', [
validators.Length(min=4, max=20),
validators.Regexp('^[a-zA-Z0-9_]+$')
])
性能优化
from flask import Flask
from flask_caching import Cache
from functools import wraps
app = Flask(__name__)
# 1. 使用缓存
cache = Cache(app, config={
'CACHE_TYPE': 'redis',
'CACHE_REDIS_URL': 'redis://localhost:6379/0'
})
@app.route('/api/data')
@cache.cached(timeout=60) # 缓存 60 秒
def get_data():
# 耗时操作
return {'data': []}
# 2. 数据库查询优化
# 好的做法:使用 join 减少查询次数
users = User.query.options(
db.joinedload(User.posts)
).all()
# 不好的做法:N+1 查询
users = User.query.all()
for user in users:
posts = user.posts # 每次都查询数据库
# 3. 使用分页
@app.route('/users')
def list_users():
page = request.args.get('page', 1, type=int)
pagination = User.query.paginate(
page=page,
per_page=20
)
return render_template('users.html', users=pagination.items)
# 4. 压缩响应
from flask_compress import Compress
Compress(app)
# 5. 异步任务
from celery import Celery
celery = Celery(app.name, broker='redis://localhost:6379/0')
@celery.task
def send_email(to, subject):
# 发送邮件
pass
@app.route('/register')
def register():
send_email.delay('user@example.com', '欢迎')
return '注册成功'
测试
import pytest
from app import create_app, db
from app.models import User
@pytest.fixture
def app():
app = create_app('testing')
with app.app_context():
db.create_all()
yield app
db.session.remove()
db.drop_all()
@pytest.fixture
def client(app):
return app.test_client()
def test_index(client):
response = client.get('/')
assert response.status_code == 200
def test_register(client):
response = client.post('/register', data={
'username': 'testuser',
'email': 'test@example.com',
'password': 'password123'
})
assert response.status_code == 302 # 重定向
user = User.query.filter_by(username='testuser').first()
assert user is not None
总结
Flask 是一个灵活强大的 Web 框架:
核心概念
- 轻量级:核心简洁,按需扩展
- 路由系统:灵活的 URL 路由和参数处理
- 模板引擎:Jinja2 提供强大的模板功能
- 请求处理:完善的请求和响应处理
- 数据库集成:SQLAlchemy 提供 ORM 支持
- 用户认证:Flask-Login 管理用户会话
- RESTful API:轻松构建 API 服务
关键特性
- 应用工厂模式组织代码
- 蓝图(Blueprint)模块化路由
- 丰富的第三方扩展
- 完善的测试支持
- 多种部署方案
最佳实践
- 使用虚拟环境隔离依赖
- 应用工厂模式创建应用
- 蓝图组织路由和视图
- ORM 操作数据库防止 SQL 注入
- 使用表单验证库处理输入
- 实现完善的错误处理
- 编写单元测试保证质量
- 生产环境使用 Gunicorn + Nginx
Flask 适合快速开发中小型应用和 API,是学习 Web 开发的优秀框架。