跳到主要内容

Flask

Flask 是一个轻量级的 Python Web 框架,被称为微框架。它提供了核心功能,同时保持简单和灵活,允许开发者根据需求选择各种扩展。

简介

Flask 特性

Flask 核心特性:

  • 轻量级:核心代码简洁,易于理解
  • 灵活性:开发者可以选择数据库、模板引擎等组件
  • 可扩展性:丰富的第三方扩展
  • 内置开发服务器和调试器
  • 集成单元测试支持
  • RESTful 请求分发
  • Jinja2 模板引擎
  • WebSocket 支持(需借助 Flask-SocketIO 等扩展,Flask 核心不内置)

适用场景:

  • 中小型 Web 应用
  • RESTful API 服务
  • 微服务架构
  • 快速原型开发

安装 Flask

# 创建虚拟环境
python -m venv venv

# 激活虚拟环境
# Windows
venv\Scripts\activate
# Linux/Mac
source venv/bin/activate

# 安装 Flask
pip install flask

# 安装常用扩展
pip install flask-sqlalchemy # ORM
pip install flask-migrate # 数据库迁移
pip install flask-login # 用户认证
pip install flask-wtf # 表单验证
pip install flask-cors # 跨域支持

# 查看 Flask 版本
python -c "import flask; print(flask.__version__)"

快速开始

第一个 Flask 应用

from flask import Flask

# 创建应用实例
app = Flask(__name__)

# 路由装饰器
@app.route('/')
def hello_world():
return 'Hello, World!'

# 运行应用
if __name__ == '__main__':
app.run(debug=True)

应用配置

from flask import Flask

app = Flask(__name__)

# 方式1:直接配置
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'your-secret-key'
app.config['SESSION_COOKIE_NAME'] = 'session'

# 方式2:从配置对象加载
class Config:
DEBUG = True
SECRET_KEY = 'your-secret-key'
DATABASE_URI = 'sqlite:///site.db'

app.config.from_object(Config)

# 方式3:从配置文件加载
# config.py
# DEBUG = True
# SECRET_KEY = 'your-secret-key'
app.config.from_pyfile('config.py')

# 方式4:从环境变量加载
import os
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'default-key')

# 查看所有配置
print(app.config)

路由

基本路由

from flask import Flask

app = Flask(__name__)

# 最简单的路由
@app.route('/')
def index():
return '首页'

# 多个路由指向同一函数
@app.route('/hello')
@app.route('/hi')
def greet():
return '你好!'

# 路由参数
@app.route('/user/<username>')
def show_user_profile(username):
return f'用户: {username}'

# 指定参数类型
@app.route('/post/<int:post_id>')
def show_post(post_id):
return f'文章ID: {post_id}'

# 多个参数
@app.route('/user/<username>/post/<int:post_id>')
def show_user_post(username, post_id):
return f'{username} 的文章 {post_id}'

# 支持的类型
@app.route('/path/<path:subpath>')
def show_subpath(subpath):
return f'子路径: {subpath}'

@app.route('/float/<float:amount>')
def show_amount(amount):
return f'金额: {amount}'

if __name__ == '__main__':
app.run()

HTTP 方法

from flask import Flask, request

app = Flask(__name__)

# 默认只允许 GET
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
return '登录请求'
else:
return '登录页面'

# 分别处理不同方法
@app.route('/api/data', methods=['GET'])
def get_data():
return {'data': '获取数据'}

@app.route('/api/data', methods=['POST'])
def create_data():
return {'message': '创建数据'}

@app.route('/api/data', methods=['PUT'])
def update_data():
return {'message': '更新数据'}

@app.route('/api/data', methods=['DELETE'])
def delete_data():
return {'message': '删除数据'}

路由构建

from flask import Flask, url_for

app = Flask(__name__)

@app.route('/')
def index():
return '首页'

@app.route('/user/<username>')
def profile(username):
return f'用户: {username}'

# 使用 url_for 反向生成 URL
with app.test_request_context():
print(url_for('index')) # /
print(url_for('profile', username='john')) # /user/john
print(url_for('profile', username='john', page=2)) # /user/john?page=2

# 在模板中使用
# <a href="{{ url_for('index') }}">首页</a>

请求处理

Request 对象

from flask import Flask, request

app = Flask(__name__)

@app.route('/request-info')
def request_info():
    """Show the commonly used attributes of the ``request`` proxy.

    Most values are read only for demonstration; the JSON response carries
    the method, full URL, ``name`` query parameter and client address.
    """
    # Request method
    method = request.method

    # Request URL pieces
    url = request.url
    base_url = request.base_url
    path = request.path

    # Request headers
    user_agent = request.headers.get('User-Agent')
    accept_language = request.headers.get('Accept-Language')

    # Query-string parameters, e.g. ?name=John&age=30
    name = request.args.get('name')
    age = request.args.get('age', type=int)
    all_args = request.args.to_dict()

    # POST form data
    form_name = request.form.get('name')
    all_form = request.form.to_dict()

    # JSON body. silent=True yields None instead of aborting the request
    # when the body is missing or is not JSON, which is the normal case for
    # a plain GET to this route.
    json_data = request.get_json(silent=True)
    # A JSON array or scalar has no .get(); only read keys from an object.
    json_name = json_data.get('name') if isinstance(json_data, dict) else None

    # Uploaded file
    file = request.files.get('file')

    # Cookies
    session_id = request.cookies.get('session_id')

    # Client IP address
    ip = request.remote_addr

    return {
        'method': method,
        'url': url,
        'name': name,
        'ip': ip
    }

表单处理

from flask import Flask, request, render_template_string, redirect, url_for

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret-key'

# 简单的表单处理
@app.route('/form', methods=['GET', 'POST'])
def form():
if request.method == 'POST':
# 获取表单数据
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')

# 处理数据...
return f'接收到的数据: {username}, {email}'

# GET 请求显示表单
return '''
<form method="POST">
<input type="text" name="username" placeholder="用户名" required>
<input type="email" name="email" placeholder="邮箱" required>
<input type="password" name="password" placeholder="密码" required>
<button type="submit">提交</button>
</form>
'''

# 使用 WTForm 验证
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, SubmitField
from wtforms.validators import DataRequired, Email, Length

class LoginForm(FlaskForm):
username = StringField('用户名', validators=[DataRequired(), Length(min=4, max=20)])
email = StringField('邮箱', validators=[DataRequired(), Email()])
password = PasswordField('密码', validators=[DataRequired(), Length(min=6)])
submit = SubmitField('登录')

@app.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm()

if form.validate_on_submit():
username = form.username.data
email = form.email.data
password = form.password.data

# 验证逻辑...
return f'登录成功: {username}'

return render_template_string('''
<form method="POST">
{{ form.hidden_tag() }}
{{ form.username.label }} {{ form.username() }}<br>
{{ form.email.label }} {{ form.email() }}<br>
{{ form.password.label }} {{ form.password() }}<br>
{{ form.submit() }}
</form>
''', form=form)

文件上传

from flask import Flask, request, render_template_string
import os
from werkzeug.utils import secure_filename

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

# Extensions (lower-case, without the dot) that an upload may carry.
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'}


def allowed_file(filename):
    """Return True when *filename* ends in a whitelisted extension.

    Only the text after the last dot is inspected and the comparison is
    case-insensitive, so ``photo.PNG`` passes while ``archive.tar.gz`` and
    names without a dot are rejected.
    """
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
    """Render the upload form on GET and store a single file on POST.

    Every rejected upload answers with HTTP 400 and a short message; a
    successful upload reports the sanitised name it was stored under.
    """
    if request.method == 'POST':
        # The multipart body must contain the "file" part at all.
        if 'file' not in request.files:
            return '没有文件', 400

        file = request.files['file']

        # Browsers submit an empty filename when nothing was selected.
        if file.filename == '':
            return '未选择文件', 400

        # Reject explicitly instead of silently re-rendering the form.
        if not allowed_file(file.filename):
            return '不支持的文件类型', 400

        # secure_filename() can strip a name down to nothing; saving to the
        # bare upload directory would then fail, so treat that as invalid.
        filename = secure_filename(file.filename)
        if not filename:
            return '文件名无效', 400

        filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        file.save(filepath)

        return f'文件已保存: {filename}'

    return '''
<form method="POST" enctype="multipart/form-data">
<input type="file" name="file" required>
<button type="submit">上传</button>
</form>
'''

# 多文件上传
@app.route('/multi-upload', methods=['POST'])
def multi_upload():
files = request.files.getlist('files')
saved_files = []

for file in files:
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
saved_files.append(filename)

return f'已上传: {", ".join(saved_files)}'

响应

返回不同类型响应

from flask import Flask, render_template, jsonify, redirect, url_for
from flask import make_response

app = Flask(__name__)

# HTML 响应
@app.route('/html')
def html_response():
html_content = '<h1>Hello, World!</h1>'
return html_content

# JSON 响应
@app.route('/json')
def json_response():
data = {
'name': 'John',
'age': 30,
'city': 'New York'
}
return jsonify(data)

# 或使用字典
@app.route('/json2')
def json_response2():
return {
'message': 'Success',
'data': [1, 2, 3]
}

# 元组形式: (response, status)
@app.route('/not-found')
def not_found():
return 'Not Found', 404

# 元组形式: (response, status, headers)
@app.route('/custom')
def custom_response():
return 'Custom Response', 200, {'X-Custom-Header': 'value'}

# 使用 make_response
@app.route('/advanced')
def advanced_response():
response = make_response('Advanced Response')
response.status_code = 200
response.headers['X-Custom'] = 'Custom Header'
response.set_cookie('key', 'value')
return response

# 重定向
@app.route('/redirect')
def redirect_example():
return redirect(url_for('index'))

@app.route('/redirect-external')
def redirect_external():
return redirect('https://www.example.com')

# 返回模板
@app.route('/template')
def template_response():
return render_template('index.html', title='首页')

模板渲染

from flask import Flask, render_template

app = Flask(__name__)

# 传递变量
@app.route('/user/<name>')
def user_profile(name):
return render_template('user.html', username=name, age=25)

# 传递列表和字典
@app.route('/data')
def data_view():
users = [
{'name': 'Alice', 'age': 25},
{'name': 'Bob', 'age': 30},
{'name': 'Charlie', 'age': 35}
]
settings = {'title': '用户列表', 'show_age': True}
return render_template('data.html', users=users, settings=settings)

模板

Jinja2 语法

<!-- templates/base.html -->
<!DOCTYPE html>
<html>
<head>
<title>{% block title %}{% endblock %}</title>
</head>
<body>
<nav>
<a href="{{ url_for('index') }}">首页</a>
<a href="{{ url_for('about') }}">关于</a>
</nav>

<main>
{% block content %}{% endblock %}
</main>
</body>
</html>

<!-- templates/index.html -->
{% extends "base.html" %}

{% block title %}首页 - 我的网站{% endblock %}

{% block content %}
<h1>欢迎来到首页</h1>

<!-- 变量 -->
<p>用户名: {{ username }}</p>

<!-- 条件语句 -->
{% if user.is_admin %}
<p>管理员用户</p>
{% elif user.is_moderator %}
<p>版主用户</p>
{% else %}
<p>普通用户</p>
{% endif %}

<!-- 循环 -->
<ul>
{% for item in items %}
<li>{{ item.name }} - {{ item.price }}</li>
{% endfor %}
</ul>

<!-- 过滤器 -->
<p>{{ text|upper }}</p> <!-- 转大写 -->
<p>{{ text|lower }}</p> <!-- 转小写 -->
<p>{{ text|capitalize }}</p> <!-- 首字母大写 -->
<p>{{ text|truncate(50) }}</p> <!-- 截断 -->
<p>{{ '%.2f'|format(price) }}</p> <!-- 保留两位小数 -->
<p>{{ date.strftime('%Y-%m-%d') }}</p> <!-- 日期格式化 -->

<!-- 自定义过滤器 -->
<p>{{ text|custom_filter }}</p>

<!-- 静态文件 -->
<link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}">
<img src="{{ url_for('static', filename='logo.png') }}">
{% endblock %}

模板继承

from flask import Flask, render_template

app = Flask(__name__)

# 基础模板
# templates/base.html
"""
<!DOCTYPE html>
<html>
<head>
{% block head %}
<title>{% block title %}{% endblock %}</title>
{% endblock %}
</head>
<body>
{% block header %}{% endblock %}

<main>
{% block content %}{% endblock %}
</main>

{% block footer %}{% endblock %}
</body>
</html>
"""

# 继承模板
# templates/index.html
"""
{% extends "base.html" %}

{% block title %}首页{% endblock %}

{% block header %}
<header>
<h1>欢迎</h1>
</header>
{% endblock %}

{% block content %}
<p>这是首页内容</p>
{% endblock %}
"""

@app.route('/')
def index():
return render_template('index.html')

自定义过滤器

from flask import Flask

app = Flask(__name__)

# 方式1:直接注册过滤器
@app.template_filter('reverse')
def reverse_filter(s):
return s[::-1]

@app.template_filter('format_price')
def format_price(price):
return f'¥{price:.2f}'

# 方式2:在模板中使用
"""
{{ text|reverse }}
{{ price|format_price }}
"""

# 方式3:全局函数
@app.template_global()
def get_current_year():
from datetime import datetime
return datetime.now().year

# 在模板中使用: {{ get_current_year() }}

# 方式4:上下文处理器
@app.context_processor
def inject_variables():
return {
'app_name': 'My App',
'version': '1.0.0'
}

# 在模板中可以直接使用: {{ app_name }}

数据库集成

Flask-SQLAlchemy 配置

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# 配置数据库
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///site.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True # 打印 SQL 语句

# 创建数据库实例
db = SQLAlchemy(app)

# 定义模型
class User(db.Model):
__tablename__ = 'users'

id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
password = db.Column(db.String(200), nullable=False)
created_at = db.Column(db.DateTime, server_default=db.func.now())

# 关系
posts = db.relationship('Post', backref='author', lazy=True)

def __repr__(self):
return f'<User {self.username}>'

class Post(db.Model):
__tablename__ = 'posts'

id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(200), nullable=False)
content = db.Column(db.Text, nullable=False)
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
created_at = db.Column(db.DateTime, server_default=db.func.now())

def __repr__(self):
return f'<Post {self.title}>'

# 创建数据库表
with app.app_context():
db.create_all()

CRUD 操作

from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///site.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)

# Create: 创建
@app.route('/user/create', methods=['POST'])
def create_user():
    """Create a user from a JSON body holding ``username`` and ``email``.

    Responds 400 when the body is not a JSON object or a field is missing,
    409 when the username or email is already taken, and otherwise returns
    the new row's id.
    """
    # silent=True returns None for a missing or malformed body so it can be
    # reported below instead of crashing on ``None.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': '请求体必须为 JSON'}), 400

    username = data.get('username')
    email = data.get('email')
    if not username or not email:
        return jsonify({'error': '缺少必需字段: username, email'}), 400

    # Both columns are unique; check first so a duplicate becomes a clean
    # 409 rather than a database error surfacing as HTTP 500.
    duplicate = User.query.filter(
        (User.username == username) | (User.email == email)
    ).first()
    if duplicate is not None:
        return jsonify({'error': '用户名或邮箱已存在'}), 409

    # Option 1: pass the fields explicitly.
    user = User(username=username, email=email)
    db.session.add(user)
    db.session.commit()

    # Option 2 (alternative): build the model straight from the payload
    # with ``User(**data)``, then add and commit in the same way.

    return jsonify({'message': '用户创建成功', 'id': user.id})

# Read: 查询
@app.route('/user/<int:user_id>')
def get_user(user_id):
    """Return one user as JSON, or a 404 payload when the id is unknown."""
    # Primary-key lookup through the session; this replaces the legacy
    # ``Query.get()`` call.
    user = db.session.get(User, user_id)

    if user is None:
        return {'error': '用户不存在'}, 404

    return {
        'id': user.id,
        'username': user.username,
        'email': user.email
    }

@app.route('/users')
def get_users():
# 查询所有
users = User.query.all()

# 条件查询
# users = User.query.filter_by(username='john').all()

# 排序
# users = User.query.order_by(User.username.desc()).all()

# 限制
# users = User.query.limit(10).all()

return [{
'id': user.id,
'username': user.username,
'email': user.email
} for user in users]

# Update: 更新
@app.route('/user/<int:user_id>', methods=['PUT'])
def update_user(user_id):
    """Partially update a user from a JSON object.

    Only the keys present in the body (``username``, ``email``) are
    changed. Responds 404 for an unknown id and 400 when the body is not a
    JSON object.
    """
    # Primary-key lookup through the session (replaces legacy Query.get()).
    user = db.session.get(User, user_id)

    if user is None:
        return {'error': '用户不存在'}, 404

    # silent=True gives None for a missing or malformed body; without the
    # type check below ``'username' in data`` would raise on None.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return {'error': '请求体必须为 JSON'}, 400

    if 'username' in data:
        user.username = data['username']
    if 'email' in data:
        user.email = data['email']

    db.session.commit()

    return {'message': '用户更新成功'}

# Delete: 删除
@app.route('/user/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    """Delete one user; responds 404 when the id is unknown."""
    # Primary-key lookup through the session (replaces legacy Query.get()).
    user = db.session.get(User, user_id)

    if user is None:
        return {'error': '用户不存在'}, 404

    db.session.delete(user)
    db.session.commit()

    return {'message': '用户删除成功'}

# 复杂查询
@app.route('/search')
def search_users():
keyword = request.args.get('q', '')

# 模糊查询
users = User.query.filter(User.username.like(f'%{keyword}%')).all()

# 或查询
# from sqlalchemy import or_
# users = User.query.filter(
# or_(
# User.username.like(f'%{keyword}%'),
# User.email.like(f'%{keyword}%')
# )
# ).all()

return [{
'id': user.id,
'username': user.username
} for user in users]

数据库迁移

# 使用 Flask-Migrate 管理数据库迁移

# 安装
# pip install flask-migrate

# 配置
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///site.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)
migrate = Migrate(app, db)

# 定义模型
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True)

# 初始化迁移
# 在命令行执行:
# flask db init

# 创建迁移
# flask db migrate -m "Initial migration"

# 应用迁移
# flask db upgrade

# 回滚
# flask db downgrade

用户认证

Flask-Login

from flask import Flask, render_template, redirect, url_for, request, flash
from flask_login import LoginManager, UserMixin, login_user, logout_user
from flask_login import login_required, current_user
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///site.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)
login_manager = LoginManager(app)
login_manager.login_view = 'login'

# 用户模型
class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
password_hash = db.Column(db.String(200), nullable=False)

def set_password(self, password):
self.password_hash = generate_password_hash(password)

def check_password(self, password):
return check_password_hash(self.password_hash, password)

@login_manager.user_loader
def load_user(user_id):
    """Reload the logged-in user from the id stored in the session.

    Returns None when the stored id is not an integer or no such row
    exists.
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        # A tampered or stale session value must not raise here.
        return None
    # Primary-key lookup through the session (replaces legacy Query.get()).
    return db.session.get(User, primary_key)

# 注册
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form on GET and create the account on POST.

    Incomplete submissions and already-used usernames or emails flash a
    message and redirect back to the form; success redirects to the login
    page.
    """
    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email')
        password = request.form.get('password')

        # All three columns are NOT NULL, and hashing None would fail, so
        # refuse incomplete submissions up front.
        if not (username and email and password):
            flash('请填写所有字段')
            return redirect(url_for('register'))

        # Check whether the username is taken.
        if User.query.filter_by(username=username).first():
            flash('用户名已存在')
            return redirect(url_for('register'))

        # The email column is unique as well; without this check a
        # duplicate would surface as a database error on commit.
        if User.query.filter_by(email=email).first():
            flash('邮箱已被注册')
            return redirect(url_for('register'))

        # Create the user; only the password hash is stored.
        user = User(username=username, email=email)
        user.set_password(password)
        db.session.add(user)
        db.session.commit()

        flash('注册成功')
        return redirect(url_for('login'))

    return render_template('register.html')

# 登录
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form on GET and authenticate on POST.

    After a successful login the user is sent to the ``next`` query
    parameter when it is a same-site relative path, otherwise to the index
    page. Failed attempts flash an error and re-render the form.
    """
    from urllib.parse import urlsplit

    if request.method == 'POST':
        username = request.form.get('username')
        # Default to '' so a missing field fails the check instead of
        # passing None into the hash comparison.
        password = request.form.get('password', '')

        user = User.query.filter_by(username=username).first()

        if user and user.check_password(password):
            login_user(user)
            flash('登录成功')

            # "next" is attacker-controllable. Follow it only when it is a
            # plain absolute path on this site: no scheme, no host, and no
            # backslash that a browser could read as "//host".
            next_page = request.args.get('next')
            if next_page:
                target = urlsplit(next_page)
                is_local = (
                    not target.scheme
                    and not target.netloc
                    and next_page.startswith('/')
                    and '\\' not in next_page
                )
                if not is_local:
                    next_page = None

            return redirect(next_page or url_for('index'))

        flash('用户名或密码错误')

    return render_template('login.html')

# 登出
@app.route('/logout')
@login_required
def logout():
logout_user()
flash('已登出')
return redirect(url_for('index'))

# 受保护的路由
@app.route('/protected')
@login_required
def protected():
return f'你好, {current_user.username}!'

# 检查角色
@app.route('/admin')
@login_required
def admin():
    """Admin-only page; non-admin users are redirected to the index."""
    # The User model in this example defines no ``is_admin`` column, so a
    # plain attribute read would raise. A missing flag means "not an admin".
    if not getattr(current_user, 'is_admin', False):
        flash('需要管理员权限')
        return redirect(url_for('index'))
    return '管理员页面'

Session 管理

from flask import Flask, session, redirect, url_for

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'

# 设置 Session
@app.route('/set-session')
def set_session():
session['username'] = 'john'
session['user_id'] = 123
session.permanent = True # 持久化 Session
return 'Session 已设置'

# 获取 Session
@app.route('/get-session')
def get_session():
username = session.get('username')
user_id = session.get('user_id')
return f'用户: {username}, ID: {user_id}'

# 删除 Session
@app.route('/delete-session')
def delete_session():
session.pop('username', None)
session.pop('user_id', None)
return 'Session 已删除'

# 清除所有 Session
@app.route('/clear-session')
def clear_session():
session.clear()
return '所有 Session 已清除'

# Flash 消息
from flask import flash

@app.route('/flash-message')
def flash_message():
flash('这是一条消息')
flash('这是警告', 'warning')
flash('这是错误', 'error')
return redirect(url_for('index'))

# 在模板中显示 Flash 消息
"""
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
<div class="alert alert-{{ category }}">{{ message }}</div>
{% endfor %}
{% endif %}
{% endwith %}
"""

RESTful API

API 设计

from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///api.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)
migrate = Migrate(app, db)

class Task(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(200), nullable=False)
description = db.Column(db.Text)
completed = db.Column(db.Boolean, default=False)
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))

def to_dict(self):
return {
'id': self.id,
'title': self.title,
'description': self.description,
'completed': self.completed
}

# API: 获取所有任务
@app.route('/api/tasks', methods=['GET'])
def get_tasks():
tasks = Task.query.all()
return jsonify([task.to_dict() for task in tasks])

# API: 获取单个任务
@app.route('/api/tasks/<int:task_id>', methods=['GET'])
def get_task(task_id):
task = Task.query.get_or_404(task_id)
return jsonify(task.to_dict())

# API: 创建任务
@app.route('/api/tasks', methods=['POST'])
def create_task():
data = request.get_json()

if not data or not data.get('title'):
return jsonify({'error': '标题不能为空'}), 400

task = Task(
title=data['title'],
description=data.get('description', '')
)

db.session.add(task)
db.session.commit()

return jsonify(task.to_dict()), 201

# API: 更新任务
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
def update_task(task_id):
    """Partially update a task from a JSON object and return the new state.

    Keys absent from the body keep their current value. Responds 404 for an
    unknown id and 400 when the body is not a JSON object.
    """
    task = Task.query.get_or_404(task_id)

    # silent=True gives None for a missing or malformed body; the type
    # check keeps ``data.get`` from being called on None or a JSON array.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': '请求体必须为 JSON'}), 400

    task.title = data.get('title', task.title)
    task.description = data.get('description', task.description)
    task.completed = data.get('completed', task.completed)

    db.session.commit()

    return jsonify(task.to_dict())

# API: 删除任务
@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
def delete_task(task_id):
task = Task.query.get_or_404(task_id)
db.session.delete(task)
db.session.commit()

return '', 204

# API: 分页
@app.route('/api/tasks/page', methods=['GET'])
def get_tasks_page():
    """Return one page of tasks plus paging metadata.

    Query parameters: ``page`` (default 1) and ``per_page`` (default 10,
    capped at 100).
    """
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)

    pagination = Task.query.paginate(
        page=page,
        per_page=per_page,
        # Cap the page size so a client cannot ask for the whole table in
        # one response.
        max_per_page=100,
        # Out-of-range pages give an empty list instead of a 404.
        error_out=False
    )

    return jsonify({
        'tasks': [task.to_dict() for task in pagination.items],
        'total': pagination.total,
        'pages': pagination.pages,
        # Report the page actually served, not the raw request value.
        'current_page': pagination.page
    })

请求验证

from flask import Flask, request, jsonify
from functools import wraps

app = Flask(__name__)

# 验证装饰器
def validate_json(*fields):
    """Build a view decorator that requires a JSON object with *fields*.

    The wrapped view runs only when the request body is a non-empty JSON
    object containing every named key; otherwise the decorator answers
    HTTP 400 with an ``error`` message.
    """
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            # silent=True returns None for a missing or malformed body so
            # the error below is produced here, in JSON.
            data = request.get_json(silent=True)

            # Empty bodies and non-object JSON (arrays, scalars) cannot
            # hold the named fields.
            if not data or not isinstance(data, dict):
                return jsonify({'error': '请求体必须为 JSON'}), 400

            # Collect every missing field so the client sees them at once.
            missing_fields = [field for field in fields if field not in data]
            if missing_fields:
                return jsonify({
                    'error': f'缺少必需字段: {", ".join(missing_fields)}'
                }), 400

            return f(*args, **kwargs)
        return wrapper
    return decorator

# API Key 验证
def require_api_key(f):
    """View decorator that requires a valid ``X-API-Key`` request header.

    Answers HTTP 401 when the header is absent or does not match the
    expected key; otherwise the wrapped view runs unchanged.
    """
    import hmac

    # Placeholder secret from the example. Compared as bytes because
    # hmac.compare_digest() only accepts ASCII-only str values.
    expected_key = 'your-secret-api-key'.encode('utf-8')

    @wraps(f)
    def wrapper(*args, **kwargs):
        api_key = request.headers.get('X-API-Key')

        # Constant-time comparison: a plain != can leak, through timing,
        # how much of a guessed key is correct.
        if not api_key or not hmac.compare_digest(
            api_key.encode('utf-8'), expected_key
        ):
            return jsonify({'error': '无效的 API Key'}), 401

        return f(*args, **kwargs)
    return wrapper

# 使用验证装饰器
# Decorators wrap bottom-up, so the one listed first runs first. The API
# key check sits outermost so unauthenticated callers are rejected before
# any body validation happens.
@app.route('/api/users', methods=['POST'])
@require_api_key
@validate_json('username', 'email')
def create_user():
    """Create a user; the decorators have already checked key and body."""
    data = request.get_json()
    # Processing logic goes here...
    return jsonify({'message': '用户创建成功'})

# 使用 Marshmallow 验证
from marshmallow import Schema, fields, validate, ValidationError

class UserSchema(Schema):
username = fields.Str(required=True, validate=validate.Length(min=4, max=20))
email = fields.Email(required=True)
age = fields.Int(validate=validate.Range(min=18, max=120))

@app.route('/api/users/validate', methods=['POST'])
def create_user_validated():
try:
data = UserSchema().load(request.get_json())
# 处理验证通过的数据
return jsonify(data)
except ValidationError as err:
return jsonify(err.messages), 400

错误处理

from flask import Flask, jsonify
from werkzeug.exceptions import HTTPException

app = Flask(__name__)

# 自定义异常
class InvalidUsage(Exception):
    """Application error that carries an HTTP status and a JSON payload.

    Args:
        message: Human-readable description, returned under ``"message"``.
        status_code: Optional HTTP status overriding the class default 400.
        payload: Optional mapping (or iterable of pairs) of extra fields to
            include in the serialised error.
    """

    # Default HTTP status when the caller does not supply one.
    status_code = 400

    def __init__(self, message, status_code=None, payload=None):
        # Pass the message to Exception so str(exc), tracebacks and log
        # output show it instead of an empty string.
        super().__init__(message)
        self.message = message
        if status_code is not None:
            self.status_code = status_code
        self.payload = payload

    def to_dict(self):
        """Return the payload fields plus ``"message"`` as a new dict."""
        rv = dict(self.payload or ())
        rv['message'] = self.message
        return rv

# 异常处理器
@app.errorhandler(InvalidUsage)
def handle_invalid_usage(error):
response = jsonify(error.to_dict())
response.status_code = error.status_code
return response

# 404 处理
@app.errorhandler(404)
def not_found(error):
return jsonify({'error': '资源未找到'}), 404

# 500 处理
@app.errorhandler(500)
def internal_error(error):
return jsonify({'error': '服务器内部错误'}), 500

# 所有 HTTP 异常
@app.errorhandler(HTTPException)
def handle_http_exception(error):
return jsonify({
'error': error.name,
'description': error.description
}), error.code

# 使用自定义异常
@app.route('/api/test')
def test():
raise InvalidUsage('这是一个错误消息', status_code=410)

# API 标准响应格式
def api_response(data=None, message='Success', code=200):
return jsonify({
'code': code,
'message': message,
'data': data
}), code

@app.route('/api/data')
def api_data():
return api_response(
data={'name': 'John', 'age': 30},
message='获取数据成功'
)

项目结构

推荐的项目结构

myproject/
├── app/
│   ├── __init__.py          # 应用工厂
│   ├── models/              # 数据模型
│   │   ├── __init__.py
│   │   ├── user.py
│   │   └── post.py
│   ├── routes/              # 路由
│   │   ├── __init__.py
│   │   ├── auth.py
│   │   ├── main.py
│   │   └── api.py
│   ├── templates/           # 模板
│   │   ├── base.html
│   │   ├── index.html
│   │   └── auth/
│   │       ├── login.html
│   │       └── register.html
│   ├── static/              # 静态文件
│   │   ├── css/
│   │   ├── js/
│   │   └── images/
│   └── forms/               # 表单
│       ├── __init__.py
│       └── auth.py
├── migrations/              # 数据库迁移
├── tests/                   # 测试
│   ├── __init__.py
│   ├── test_auth.py
│   └── test_api.py
├── config.py                # 配置文件
├── requirements.txt         # 依赖
└── run.py                   # 运行文件

应用工厂模式

# config.py
import os

class Config:
    """Settings shared by every environment.

    ``SECRET_KEY`` and the database URI come from the environment when set
    to a non-empty value and otherwise fall back to development defaults.
    """

    SECRET_KEY = os.getenv('SECRET_KEY') or 'dev-key'
    SQLALCHEMY_DATABASE_URI = os.getenv('DATABASE_URI') or 'sqlite:///site.db'
    SQLALCHEMY_TRACK_MODIFICATIONS = False


class DevelopmentConfig(Config):
    """Local development: debug mode on."""

    DEBUG = True


class ProductionConfig(Config):
    """Production: debug mode off."""

    DEBUG = False


class TestingConfig(Config):
    """Test runs: testing mode on, with a separate SQLite file."""

    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///test.db'


# Maps the name passed to the application factory to its settings class.
config = {
    'development': DevelopmentConfig,
    'production': ProductionConfig,
    'testing': TestingConfig
}

# app/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager

db = SQLAlchemy()
login_manager = LoginManager()

def create_app(config_name='development'):
    """Application factory: build and configure a Flask app.

    Args:
        config_name: Key into the ``config`` registry from ``config.py``
            ('development', 'production' or 'testing').

    Returns:
        The configured Flask application with extensions initialised and
        the auth, main and api blueprints registered.
    """
    # The registry lives in the project-level config.py. This module never
    # imported it, so the lookup below would raise NameError without this.
    from config import config

    app = Flask(__name__)
    app.config.from_object(config[config_name])

    # Bind the module-level extension objects to this app instance.
    db.init_app(app)
    login_manager.init_app(app)
    login_manager.login_view = 'auth.login'

    # Blueprints are imported inside the factory, after the extensions
    # above exist.
    from app.routes.auth import auth_bp
    from app.routes.main import main_bp
    from app.routes.api import api_bp

    app.register_blueprint(auth_bp, url_prefix='/auth')
    app.register_blueprint(main_bp)
    app.register_blueprint(api_bp, url_prefix='/api')

    # Create the database tables.
    with app.app_context():
        db.create_all()

    return app

# app/routes/auth.py
from flask import Blueprint, render_template, redirect, url_for
from flask_login import login_required, current_user

auth_bp = Blueprint('auth', __name__)

@auth_bp.route('/login')
def login():
return '登录页面'

@auth_bp.route('/logout')
@login_required
def logout():
return '登出'

# app/routes/main.py
from flask import Blueprint

main_bp = Blueprint('main', __name__)

@main_bp.route('/')
def index():
return '首页'

# app/routes/api.py
from flask import Blueprint

api_bp = Blueprint('api', __name__)

@api_bp.route('/data')
def get_data():
return {'data': []}

# run.py
from app import create_app

app = create_app('development')

if __name__ == '__main__':
app.run()

部署

使用 Gunicorn

# 安装 Gunicorn
pip install gunicorn

# 运行应用
gunicorn -w 4 -b 0.0.0.0:8000 "app:create_app()"

# 配置选项
# -w: 工作进程数 (通常为 CPU 核心数 * 2 + 1)
# -b: 绑定地址
# --timeout: 超时时间
# --access-logfile: 访问日志
# --error-logfile: 错误日志

gunicorn -w 4 -b 0.0.0.0:8000 \
--timeout 120 \
--access-logfile - \
--error-logfile - \
"app:create_app()"

使用 Nginx 反向代理

# /etc/nginx/sites-available/myproject

server {
listen 80;
server_name example.com;

location / {
proxy_pass http://127.0.0.1:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}

location /static {
alias /path/to/your/app/static;
}

location /media {
alias /path/to/your/app/media;
}
}

Docker 部署

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 暴露端口
EXPOSE 8000

# 运行应用
CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:8000", "app:create_app()"]

# docker-compose.yml
version: '3.8'

services:
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      - FLASK_ENV=production
      - DATABASE_URI=postgresql://user:pass@db:5432/mydb
    depends_on:
      - db

  db:
    image: postgres:13
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=mydb
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:

最佳实践

安全建议

from flask import Flask, session
from flask_talisman import Talisman
from flask_cors import CORS

app = Flask(__name__)

# 1. 使用 HTTPS 强制
Talisman(app, force_https=True)

# 2. 设置安全的 Cookie
app.config.update(
SESSION_COOKIE_SECURE=True, # 仅 HTTPS
SESSION_COOKIE_HTTPONLY=True, # 防止 JavaScript 访问
SESSION_COOKIE_SAMESITE='Lax' # 防止 CSRF
)

# 3. CORS 配置
CORS(app, resources={
r"/api/*": {
"origins": ["https://example.com"],
"methods": ["GET", "POST"],
"allow_headers": ["Content-Type"]
}
})

# 4. 密码加密
from werkzeug.security import generate_password_hash, check_password_hash

def hash_password(password):
    """Return a salted PBKDF2-SHA256 hash string for *password*."""
    return generate_password_hash(password, method='pbkdf2:sha256')

def verify_password(hashed_password, password):
    """Return True when *password* matches the stored *hashed_password*."""
    return check_password_hash(hashed_password, password)

# 5. 防止 SQL 注入 (使用 ORM)
# 好的做法
user = User.query.filter_by(username=username).first()

# 不好的做法
# user = db.execute(f"SELECT * FROM users WHERE username = '{username}'")

# 6. 输入验证
from wtforms import StringField, validators

class SafeForm(FlaskForm):
username = StringField('用户名', [
validators.Length(min=4, max=20),
validators.Regexp('^[a-zA-Z0-9_]+$')
])

性能优化

from flask import Flask
from flask_caching import Cache
from functools import wraps

app = Flask(__name__)

# 1. 使用缓存
cache = Cache(app, config={
'CACHE_TYPE': 'redis',
'CACHE_REDIS_URL': 'redis://localhost:6379/0'
})

@app.route('/api/data')
@cache.cached(timeout=60) # 缓存 60 秒
def get_data():
# 耗时操作
return {'data': []}

# 2. 数据库查询优化
# 好的做法:使用 join 减少查询次数
users = User.query.options(
db.joinedload(User.posts)
).all()

# 不好的做法:N+1 查询
users = User.query.all()
for user in users:
posts = user.posts # 每次都查询数据库

# 3. 使用分页
@app.route('/users')
def list_users():
page = request.args.get('page', 1, type=int)
pagination = User.query.paginate(
page=page,
per_page=20
)
return render_template('users.html', users=pagination.items)

# 4. 压缩响应
from flask_compress import Compress

Compress(app)

# 5. 异步任务
from celery import Celery

celery = Celery(app.name, broker='redis://localhost:6379/0')

@celery.task
def send_email(to, subject):
# 发送邮件
pass

@app.route('/register')
def register():
send_email.delay('user@example.com', '欢迎')
return '注册成功'

测试

import pytest
from app import create_app, db
from app.models import User

@pytest.fixture
def app():
app = create_app('testing')
with app.app_context():
db.create_all()
yield app
db.session.remove()
db.drop_all()

@pytest.fixture
def client(app):
return app.test_client()

def test_index(client):
response = client.get('/')
assert response.status_code == 200

def test_register(client):
response = client.post('/register', data={
'username': 'testuser',
'email': 'test@example.com',
'password': 'password123'
})
assert response.status_code == 302 # 重定向

user = User.query.filter_by(username='testuser').first()
assert user is not None

总结

Flask 是一个灵活强大的 Web 框架:

核心概念

  • 轻量级:核心简洁,按需扩展
  • 路由系统:灵活的 URL 路由和参数处理
  • 模板引擎:Jinja2 提供强大的模板功能
  • 请求处理:完善的请求和响应处理
  • 数据库集成:SQLAlchemy 提供 ORM 支持
  • 用户认证:Flask-Login 管理用户会话
  • RESTful API:轻松构建 API 服务

关键特性

  1. 应用工厂模式组织代码
  2. 蓝图(Blueprint)模块化路由
  3. 丰富的第三方扩展
  4. 完善的测试支持
  5. 多种部署方案

最佳实践

  1. 使用虚拟环境隔离依赖
  2. 应用工厂模式创建应用
  3. 蓝图组织路由和视图
  4. ORM 操作数据库防止 SQL 注入
  5. 使用表单验证库处理输入
  6. 实现完善的错误处理
  7. 编写单元测试保证质量
  8. 生产环境使用 Gunicorn + Nginx

Flask 适合快速开发中小型应用和 API,是学习 Web 开发的优秀框架。