
PyTorch

PyTorch is an open-source deep learning framework developed by Facebook (now Meta). It provides flexible dynamic computation graphs and strong GPU acceleration, and is one of the most popular deep learning frameworks in both research and industry.

Introduction

PyTorch Features

"""
PyTorch 核心特性:
- 动态计算图: 灵活的图结构,便于调试
- Tensor计算: 类似NumPy的张量操作,支持GPU加速
- 自动求导: torch.autograd自动微分
- 丰富API: 神经网络层、优化器、损失函数
- 模型部署: 支持移动端和Web端部署
- 社区活跃: 大量预训练模型和工具
- Python风格: 直观的Python接口

适用场景:
- 计算机视觉: 图像分类、目标检测、图像分割
- 自然语言处理: 文本分类、机器翻译、问答系统
- 强化学习: 游戏、机器人控制
- 生成模型: GAN、VAE、扩散模型
- 时序预测: 时间序列、语音识别
"""

Installing PyTorch

# Create a virtual environment
python -m venv venv

# Activate on Windows
venv\Scripts\activate

# Activate on Linux/macOS
source venv/bin/activate

# CPU build
pip install torch torchvision torchaudio

# GPU build (CUDA 11.8)
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

# GPU build (CUDA 12.1)
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

# Check the installed version
python -c "import torch; print(torch.__version__)"

# Verify that the GPU is available
python -c "import torch; print(torch.cuda.is_available())"

# Install common companion packages
pip install numpy matplotlib pandas scikit-learn
pip install pillow  # image handling
pip install tqdm    # progress bars

Quick Start

Tensor Basics

import torch

# Create a tensor
x = torch.tensor([1, 2, 3, 4])
print(x)

# Device
print(f'device: {x.device}')

# Shape
print(f'shape: {x.shape}')

# Data type
print(f'dtype: {x.dtype}')

# Create a tensor on the GPU
if torch.cuda.is_available():
    x_gpu = torch.tensor([1, 2, 3], device='cuda')
    # or
    x_gpu = torch.tensor([1, 2, 3]).cuda()

# Converting between tensors and NumPy arrays
import numpy as np
a = np.array([1, 2, 3])
t = torch.from_numpy(a)  # NumPy -> tensor (shares memory)
n = t.numpy()            # tensor -> NumPy

Tensor Operations

Creating Tensors

import torch

# From a Python list
x = torch.tensor([1, 2, 3, 4])

# With an explicit dtype
x = torch.tensor([1, 2, 3], dtype=torch.float32)

# All zeros
x = torch.zeros(3, 4)
x = torch.zeros_like(x)  # zeros with the same shape as x

# All ones
x = torch.ones(3, 4)
x = torch.ones_like(x)

# Identity matrix
x = torch.eye(3)

# Random tensors
x = torch.randn(3, 4)             # standard normal distribution
x = torch.rand(3, 4)              # uniform distribution on [0, 1)
x = torch.randint(0, 10, (3, 4))  # random integers

# Ranges
x = torch.arange(0, 10, 1)    # [0, 1, 2, ..., 9]
x = torch.linspace(0, 10, 5)  # [0.0, 2.5, 5.0, 7.5, 10.0]

# Uninitialized tensor
x = torch.empty(3, 4)

Indexing and Slicing

import torch

x = torch.arange(12).reshape(3, 4)

# Indexing
print(x[0, 0])  # element at row 0, column 0
print(x[0])     # first row
print(x[:, 0])  # first column

# Slicing
print(x[0:2, :])  # first two rows
print(x[:, 1:3])  # columns 1 and 2

# Fancy indexing
indices = torch.tensor([0, 2])
print(x[:, indices])  # columns 0 and 2

# Boolean masking
mask = x > 5
print(x[mask])

# The same condition written inline
print(x[x > 5])

Tensor Arithmetic

import torch

x = torch.tensor([1, 2, 3, 4], dtype=torch.float32)
y = torch.tensor([5, 6, 7, 8], dtype=torch.float32)

# Arithmetic
print(x + y)  # addition
print(x - y)  # subtraction
print(x * y)  # element-wise multiplication
print(x / y)  # division
print(x @ y)               # dot product for 1-D tensors (matrix product for 2-D)
print(torch.matmul(x, y))  # same as @

# Scalar operations
print(x + 10)
print(x * 2)

# Broadcasting
x = torch.ones(3, 1)
y = torch.ones(1, 3)
print(x + y)  # (3,1) + (1,3) -> (3,3)

# Math functions
print(torch.sqrt(x))
print(torch.exp(x))
print(torch.log(x))
print(torch.abs(x))

# Reductions
x = torch.randn(3, 4)
print(x.sum())       # sum of all elements
print(x.sum(dim=0))  # column sums
print(x.sum(dim=1))  # row sums
print(x.mean())      # mean
print(x.std())       # standard deviation
print(x.max())       # maximum
print(x.argmax())    # index of the maximum

# Shape manipulation
x = torch.arange(12)
x = x.reshape(3, 4)    # reshape
x = x.view(3, 4)       # reshape (shares memory)
x = x.unsqueeze(0)     # add a dimension -> (1, 3, 4)
x = x.squeeze()        # drop size-1 dimensions -> (3, 4)
x = x.transpose(0, 1)  # transpose -> (4, 3)
x = torch.randn(2, 3, 4)
x = x.permute(1, 0, 2)  # reorder dimensions: (2, 3, 4) -> (3, 2, 4)

# Concatenation and stacking
x1 = torch.randn(2, 3)
x2 = torch.randn(2, 3)
x = torch.cat([x1, x2], dim=0)    # concatenate along rows
x = torch.cat([x1, x2], dim=1)    # concatenate along columns
x = torch.stack([x1, x2], dim=0)  # stack along a new dimension

# Flattening
x = torch.randn(2, 3)
y = x.flatten()  # flatten
y = x.view(-1)   # flatten (size inferred)

Autograd

import torch

# A tensor that requires gradients
x = torch.tensor([2.0, 3.0], requires_grad=True)

# Define a computation
y = x ** 2 + 2 * x + 1

# Backpropagate
y_sum = y.sum()
y_sum.backward()

# Inspect the gradient
print(x.grad)  # dy/dx = 2*x + 2 = [6, 8]

# Reset the gradient
x.grad.zero_()

# A slightly larger computation
x = torch.ones(2, 2, requires_grad=True)
y = x + 2
z = y * y * 3
out = z.mean()
out.backward()
print(x.grad)

# Disable gradient tracking
with torch.no_grad():
    # computations inside this block are not tracked
    y = x * 2

# or use detach()
y = x.detach()  # a tensor detached from the graph, with no gradient tracking

Neural Network Basics

nn.Module

import torch
import torch.nn as nn

# Define a network
class SimpleNet(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(SimpleNet, self).__init__()
        # layers
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        # forward pass
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        return out

# Instantiate the model
model = SimpleNet(input_size=784, hidden_size=128, num_classes=10)

# Print the architecture
print(model)

# Inspect the parameters
for name, param in model.named_parameters():
    print(f'{name}: {param.shape}')

# Forward pass
x = torch.randn(64, 784)  # batch_size=64
output = model(x)
print(output.shape)  # [64, 10]

Loss Functions

import torch
import torch.nn as nn

# Classification losses
# Cross-entropy loss (applies softmax internally)
criterion = nn.CrossEntropyLoss()
# logits: [batch_size, num_classes]
# target: [batch_size] (class indices)
output = torch.randn(3, 5)        # 3 samples, 5 classes
target = torch.tensor([1, 2, 0])  # ground-truth classes
loss = criterion(output, target)

# Binary cross-entropy (applies sigmoid internally)
criterion = nn.BCEWithLogitsLoss()
# logits: [batch_size, 1] or [batch_size]
# target: [batch_size] (0 or 1)

# Regression losses
criterion = nn.MSELoss()  # mean squared error
criterion = nn.L1Loss()   # mean absolute error

# Usage
pred = model(x)
loss = criterion(pred, target)

Backpropagation

import torch.optim as optim

# Define an optimizer
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

# A single training step
# 1. Forward pass
output = model(inputs)

# 2. Compute the loss
loss = criterion(output, target)

# 3. Clear existing gradients
optimizer.zero_grad()

# 4. Backward pass
loss.backward()

# 5. Update the parameters
optimizer.step()

# Full training loop
for epoch in range(num_epochs):
    for batch_x, batch_y in dataloader:
        # forward pass
        output = model(batch_x)
        loss = criterion(output, batch_y)

        # backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

Optimizers

import torch.optim as optim

# SGD
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

# Adam
optimizer = optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999))

# RMSprop
optimizer = optim.RMSprop(model.parameters(), lr=0.001, alpha=0.99)

# Adagrad
optimizer = optim.Adagrad(model.parameters(), lr=0.01)

# Learning rate scheduling
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)       # multiply the LR by 0.1 every 10 epochs
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5)  # reduce the LR when the validation loss stops improving

# In the training loop
for epoch in range(num_epochs):
    # train
    train(model, dataloader)
    # validate
    val_loss = validate(model, val_loader)
    # update the learning rate (ReduceLROnPlateau takes the metric; StepLR.step() takes no argument)
    scheduler.step(val_loss)

Building Models

Linear Layers

import torch.nn as nn

# Fully connected layer
fc = nn.Linear(in_features=100, out_features=50)

# Example
model = nn.Sequential(
    nn.Linear(784, 256),
    nn.ReLU(),
    nn.Linear(256, 128),
    nn.ReLU(),
    nn.Linear(128, 10)
)

Convolutional Layers

import torch.nn as nn

# 2D convolution
conv = nn.Conv2d(
    in_channels=3,    # input channels (RGB = 3)
    out_channels=64,  # output channels (number of filters)
    kernel_size=3,    # kernel size
    stride=1,         # stride
    padding=1         # padding
)

# Example
model = nn.Sequential(
    # Conv1
    nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=2, stride=2),  # 28x28 -> 14x14

    # Conv2
    nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=2, stride=2),  # 14x14 -> 7x7

    # Flatten
    nn.Flatten(),

    # FC
    nn.Linear(64 * 7 * 7, 128),
    nn.ReLU(),
    nn.Linear(128, 10)
)

# Transposed convolution (upsampling)
deconv = nn.ConvTranspose2d(
    in_channels=64,
    out_channels=32,
    kernel_size=2,
    stride=2
)

Recurrent Layers

import torch.nn as nn

# RNN
rnn = nn.RNN(
    input_size=100,      # input feature dimension
    hidden_size=128,     # hidden state size
    num_layers=2,        # number of stacked layers
    batch_first=True,    # input shape is (batch, seq, feature)
    bidirectional=False  # bidirectional or not
)

# LSTM
lstm = nn.LSTM(
    input_size=100,
    hidden_size=128,
    num_layers=2,
    batch_first=True,
    bidirectional=False,
    dropout=0.5
)

# GRU
gru = nn.GRU(
    input_size=100,
    hidden_size=128,
    num_layers=2,
    batch_first=True,
    bidirectional=False
)

# Example
class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 2)

    def forward(self, x):
        # x: [batch_size, seq_len]
        embed = self.embedding(x)  # [batch_size, seq_len, embed_size]
        lstm_out, (h_n, c_n) = self.lstm(embed)
        out = self.fc(lstm_out[:, -1, :])  # last time step
        return out

Activation Functions

import torch.nn as nn

# ReLU
relu = nn.ReLU()

# LeakyReLU
leaky_relu = nn.LeakyReLU(negative_slope=0.01)

# Sigmoid
sigmoid = nn.Sigmoid()

# Tanh
tanh = nn.Tanh()

# Softmax
softmax = nn.Softmax(dim=1)

# GELU (common in Transformers)
gelu = nn.GELU()

# Inside a model
model = nn.Sequential(
    nn.Linear(100, 50),
    nn.ReLU(),
    nn.Linear(50, 10)
)

Pooling Layers

import torch.nn as nn

# Max pooling
maxpool = nn.MaxPool2d(kernel_size=2, stride=2)

# Average pooling
avgpool = nn.AvgPool2d(kernel_size=2, stride=2)

# Adaptive average pooling (fixed output size)
adapool = nn.AdaptiveAvgPool2d((1, 1))  # any input size -> 1x1

# Global average pooling
class GlobalAvgPool(nn.Module):
    def forward(self, x):
        return x.mean(dim=[2, 3])  # [batch, channel, h, w] -> [batch, channel]

Batch Normalization

import torch.nn as nn

# 1D batch norm (for fully connected layers)
bn1d = nn.BatchNorm1d(num_features=100)

# 2D batch norm (for CNNs)
bn2d = nn.BatchNorm2d(num_features=64)

# Layer normalization
ln = nn.LayerNorm(normalized_shape=100)

# In a CNN
model = nn.Sequential(
    nn.Conv2d(3, 64, kernel_size=3, padding=1),
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.MaxPool2d(2)
)

Dropout

import torch.nn as nn

# Dropout
dropout = nn.Dropout(p=0.5)  # drop probability

# Dropout2d (for CNNs)
dropout2d = nn.Dropout2d(p=0.5)

# Inside a model
model = nn.Sequential(
    nn.Linear(100, 50),
    nn.ReLU(),
    nn.Dropout(0.5),  # active during training, disabled during evaluation
    nn.Linear(50, 10)
)

# Remember to switch modes during training and evaluation (see the sketch below)
model.train()  # training mode
model.eval()   # evaluation mode
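
A quick sketch of the difference the mode switch makes: in training mode nn.Dropout zeroes elements at random and rescales the survivors, while in evaluation mode it is the identity (the tensor below is only illustrative).

import torch
import torch.nn as nn

drop = nn.Dropout(p=0.5)
x = torch.ones(1, 8)

drop.train()
print(drop(x))  # roughly half the entries zeroed, the rest scaled by 1/(1-p) = 2

drop.eval()
print(drop(x))  # identical to x: dropout is a no-op in eval mode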

Data Loading

Dataset

from torch.utils.data import Dataset

# Custom Dataset
class CustomDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        x = self.data[idx]
        y = self.labels[idx]
        return x, y

# Usage
dataset = CustomDataset(data, labels)
sample = dataset[0]  # first sample
x, y = sample
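
A self-contained sanity check of the Dataset above, using random tensors as stand-in data (the shapes are purely illustrative):

import torch

data = torch.randn(100, 3)            # 100 samples, 3 features each
labels = torch.randint(0, 2, (100,))  # binary labels
dataset = CustomDataset(data, labels)
print(len(dataset))  # 100
x, y = dataset[0]
print(x.shape, y)    # torch.Size([3]) and a scalar label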

DataLoader

from torch.utils.data import DataLoader

# Create a DataLoader
dataloader = DataLoader(
    dataset=dataset,
    batch_size=32,    # batch size
    shuffle=True,     # shuffle the data every epoch
    num_workers=4,    # number of worker processes
    pin_memory=True,  # pinned memory (faster host-to-GPU transfer)
    drop_last=True    # drop the last incomplete batch
)

# Iterate over batches
for batch_x, batch_y in dataloader:
    # batch_x: [batch_size, ...]
    # batch_y: [batch_size, ...]
    output = model(batch_x)
    loss = criterion(output, batch_y)

Data Transforms

from torchvision import transforms

# Common transforms
transform = transforms.Compose([
    transforms.Resize(256),      # resize
    transforms.CenterCrop(224),  # center crop
    transforms.ToTensor(),       # convert to a tensor
    transforms.Normalize(        # normalize
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])

# Data augmentation
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),  # random crop and resize
    transforms.RandomHorizontalFlip(),  # random horizontal flip
    transforms.RandomRotation(15),      # random rotation
    transforms.ColorJitter(             # color jitter
        brightness=0.2,
        contrast=0.2,
        saturation=0.2
    ),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])

# Usage
from torchvision.datasets import ImageFolder
dataset = ImageFolder('data/train', transform=transform)

Training Workflow

Full Training Loop

import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm

# Model
model = SimpleNet(input_size=784, hidden_size=128, num_classes=10)

# Loss function
criterion = nn.CrossEntropyLoss()

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Learning rate scheduler
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

# Training loop
num_epochs = 50
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

for epoch in range(num_epochs):
    # Training phase
    model.train()
    train_loss = 0
    train_correct = 0
    train_total = 0

    pbar = tqdm(dataloader, desc=f'Epoch {epoch+1}/{num_epochs}')
    for batch_x, batch_y in pbar:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)

        # forward pass
        output = model(batch_x)
        loss = criterion(output, batch_y)

        # backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # statistics
        train_loss += loss.item()
        _, predicted = output.max(1)
        train_total += batch_y.size(0)
        train_correct += predicted.eq(batch_y).sum().item()

        # update the progress bar
        pbar.set_postfix({
            'loss': f'{loss.item():.4f}',
            'acc': f'{100.*train_correct/train_total:.2f}%'
        })

    # average loss and accuracy
    train_loss = train_loss / len(dataloader)
    train_acc = 100. * train_correct / train_total

    print(f'Epoch {epoch+1}: Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')

    # Validation phase
    model.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for batch_x, batch_y in val_loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)

            output = model(batch_x)
            loss = criterion(output, batch_y)

            val_loss += loss.item()
            _, predicted = output.max(1)
            val_total += batch_y.size(0)
            val_correct += predicted.eq(batch_y).sum().item()

    val_loss = val_loss / len(val_loader)
    val_acc = 100. * val_correct / val_total

    print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')

    # step the scheduler
    scheduler.step()

Saving and Loading Models

# Save the whole model
torch.save(model, 'model.pth')

# Save only the parameters (recommended)
torch.save(model.state_dict(), 'model_state_dict.pth')

# Save a checkpoint (model, optimizer state, etc.)
torch.save({
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': loss,
}, 'checkpoint.pth')

# Load the whole model
# (recent PyTorch releases default torch.load to weights_only=True; loading a fully
#  pickled model may require torch.load('model.pth', weights_only=False))
model = torch.load('model.pth')

# Load the parameters
model = SimpleNet(input_size=784, hidden_size=128, num_classes=10)
model.load_state_dict(torch.load('model_state_dict.pth'))
model.eval()

# Load a checkpoint
checkpoint = torch.load('checkpoint.pth')
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epoch = checkpoint['epoch']
loss = checkpoint['loss']
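
When a checkpoint saved on a GPU machine is loaded on a CPU-only machine (or onto a different device), map_location controls where the tensors are placed. A minimal sketch, reusing the file names from above:

import torch

# load a GPU-trained state dict onto the CPU
state_dict = torch.load('model_state_dict.pth', map_location='cpu')
model.load_state_dict(state_dict)

# or map directly onto whichever device is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.load_state_dict(torch.load('model_state_dict.pth', map_location=device))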

CNN

Convolutional Neural Network

import torch
import torch.nn as nn

class CNN(nn.Module):
    def __init__(self, num_classes=10):
        super(CNN, self).__init__()
        self.features = nn.Sequential(
            # Block 1
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 32x32 -> 16x16

            # Block 2
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 16x16 -> 8x8

            # Block 3
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 8x8 -> 4x4
        )

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256 * 4 * 4, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

# Instantiate the model (expects 3x32x32 inputs, e.g. CIFAR-10)
model = CNN(num_classes=10)

Image Classification Example

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms

# Data transforms
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Load the data
trainset = datasets.CIFAR10(root='./data', train=True,
                            download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=32,
                                          shuffle=True, num_workers=2)

testset = datasets.CIFAR10(root='./data', train=False,
                           download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=32,
                                         shuffle=False, num_workers=2)

# Training
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = CNN(num_classes=10).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

for epoch in range(10):
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(trainloader, 0):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if i % 100 == 99:
            print(f'[{epoch+1}, {i+1}] loss: {running_loss/100:.3f}')
            running_loss = 0.0

print('Finished Training')

RNN

LSTM Model

import torch.nn as nn

class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, num_classes):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(
            embed_size,
            hidden_size,
            num_layers,
            batch_first=True,
            bidirectional=True
        )
        self.fc = nn.Linear(hidden_size * 2, num_classes)  # bidirectional LSTM doubles the feature size

    def forward(self, x):
        # x: [batch_size, seq_len]
        embed = self.embedding(x)  # [batch_size, seq_len, embed_size]
        lstm_out, (h_n, c_n) = self.lstm(embed)
        # use the output of the last time step
        out = self.fc(lstm_out[:, -1, :])
        return out

# Usage
model = LSTMModel(
    vocab_size=10000,
    embed_size=128,
    hidden_size=256,
    num_layers=2,
    num_classes=2
)

GRU Model

class GRUModel(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, num_classes):
        super(GRUModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.gru = nn.GRU(
            embed_size,
            hidden_size,
            num_layers,
            batch_first=True,
            bidirectional=True
        )
        self.fc = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, x):
        embed = self.embedding(x)
        gru_out, h_n = self.gru(embed)
        out = self.fc(gru_out[:, -1, :])
        return out

Transformer

Self-Attention

import torch
import torch.nn as nn
import torch.nn.functional as F

class SelfAttention(nn.Module):
    def __init__(self, embed_size, heads):
        super(SelfAttention, self).__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads

        assert (self.head_dim * heads == embed_size), "Embed size needs to be divisible by heads"

        self.values = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.keys = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.queries = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.fc_out = nn.Linear(heads * self.head_dim, embed_size)

    def forward(self, values, keys, query, mask):
        N = query.shape[0]
        value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]

        # Split into multiple heads
        values = values.reshape(N, value_len, self.heads, self.head_dim)
        keys = keys.reshape(N, key_len, self.heads, self.head_dim)
        queries = query.reshape(N, query_len, self.heads, self.head_dim)

        values = self.values(values)
        keys = self.keys(keys)
        queries = self.queries(queries)

        # QK^T
        energy = torch.einsum("nqhd,nkhd->nhqk", [queries, keys])

        if mask is not None:
            energy = energy.masked_fill(mask == 0, float("-1e20"))

        # scale by sqrt(d_k), where d_k is the per-head dimension
        attention = torch.softmax(energy / (self.head_dim ** 0.5), dim=3)

        # Attention * V
        out = torch.einsum("nhql,nlhd->nqhd", [attention, values])

        out = out.reshape(N, query_len, self.heads * self.head_dim)
        out = self.fc_out(out)
        return out

Transformer Block

class TransformerBlock(nn.Module):
    def __init__(self, embed_size, heads, dropout, forward_expansion):
        super(TransformerBlock, self).__init__()
        self.attention = SelfAttention(embed_size, heads)
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)

        self.feed_forward = nn.Sequential(
            nn.Linear(embed_size, forward_expansion * embed_size),
            nn.ReLU(),
            nn.Linear(forward_expansion * embed_size, embed_size)
        )

        self.dropout = nn.Dropout(dropout)

    def forward(self, value, key, query, mask):
        attention = self.attention(value, key, query, mask)
        x = self.dropout(self.norm1(attention + query))
        forward = self.feed_forward(x)
        out = self.dropout(self.norm2(forward + x))
        return out

Full Transformer

# Encoder and Decoder (stacks of TransformerBlock plus token embeddings and
# positional encodings) are assumed to be defined elsewhere; only the top-level
# wiring is shown here.
class Transformer(nn.Module):
    def __init__(
        self,
        src_vocab_size,
        trg_vocab_size,
        embed_size=512,
        num_layers=6,
        forward_expansion=4,
        heads=8,
        dropout=0,
        device="cuda",
        max_length=100
    ):
        super(Transformer, self).__init__()
        self.encoder = Encoder(
            src_vocab_size,
            embed_size,
            num_layers,
            heads,
            device,
            forward_expansion,
            dropout,
            max_length
        )

        self.decoder = Decoder(
            trg_vocab_size,
            embed_size,
            num_layers,
            heads,
            forward_expansion,
            dropout,
            device,
            max_length
        )

        self.device = device
        self.fc_out = nn.Linear(embed_size, trg_vocab_size)

    def forward(self, src, trg, src_mask, trg_mask):
        enc_src = self.encoder(src, src_mask)
        out = self.decoder(trg, enc_src, src_mask, trg_mask)
        out = self.fc_out(out)
        return out
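
Hand-rolling attention is instructive, but PyTorch also ships ready-made building blocks such as nn.TransformerEncoderLayer and nn.TransformerEncoder. A minimal sketch of an encoder stack built from them (the dimensions are only illustrative):

import torch
import torch.nn as nn

# one encoder layer = multi-head self-attention + feed-forward network
encoder_layer = nn.TransformerEncoderLayer(
    d_model=512, nhead=8, dim_feedforward=2048, dropout=0.1, batch_first=True
)
encoder = nn.TransformerEncoder(encoder_layer, num_layers=6)

x = torch.randn(32, 100, 512)  # [batch, seq_len, embed_dim]
out = encoder(x)               # output has the same shape as the input
print(out.shape)               # torch.Size([32, 100, 512])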

GPU Acceleration

CUDA Basics

import torch

# Is CUDA available?
print(torch.cuda.is_available())

# Number of GPUs
print(torch.cuda.device_count())

# Index of the current GPU
print(torch.cuda.current_device())

# GPU name
print(torch.cuda.get_device_name(0))

# Move a model to the GPU
model = model.cuda()
# or
device = torch.device('cuda:0')
model = model.to(device)

# Move a tensor to the GPU
x = x.cuda()
# or
x = x.to(device)

# Release cached GPU memory
torch.cuda.empty_cache()

Mixed Precision Training

from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()

for inputs, labels in dataloader:
    inputs, labels = inputs.cuda(), labels.cuda()

    optimizer.zero_grad()

    with autocast():  # automatic mixed precision
        outputs = model(inputs)
        loss = criterion(outputs, labels)

    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()
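
Recent PyTorch releases expose the same functionality through torch.autocast (and the torch.amp namespace); a small sketch of the newer spelling, assuming a CUDA device is available:

import torch
import torch.nn as nn

model = nn.Linear(10, 2).cuda()
x = torch.randn(4, 10, device='cuda')

# matmul-heavy ops run in float16 inside the autocast region
with torch.autocast(device_type='cuda', dtype=torch.float16):
    out = model(x)
print(out.dtype)  # torch.float16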

Transfer Learning

import torchvision.models as models

# Load a pretrained model
# (newer torchvision versions prefer models.resnet18(weights=models.ResNet18_Weights.DEFAULT))
model = models.resnet18(pretrained=True)

# Freeze the parameters
for param in model.parameters():
    param.requires_grad = False

# Replace the final fully connected layer
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, num_classes)

# Train only the new classification head
optimizer = optim.SGD(model.fc.parameters(), lr=0.001, momentum=0.9)

# Or fine-tune the whole model
for param in model.parameters():
    param.requires_grad = True

optimizer = optim.SGD(model.parameters(), lr=0.0001, momentum=0.9)

Practical Examples

Image Classification

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models

# Data preparation
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

train_dataset = datasets.ImageFolder('data/train', transform=transform)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)

# Transfer learning
model = models.resnet18(pretrained=True)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 10)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Training
model.train()
for epoch in range(10):
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

Text Classification

import torch
import torch.nn as nn

class TextClassifier(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_classes):
        super(TextClassifier, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        embed = self.embedding(x)
        lstm_out, (h_n, _) = self.lstm(embed)
        out = self.fc(lstm_out[:, -1, :])
        return out

# Usage
model = TextClassifier(vocab_size=10000, embed_size=128, hidden_size=256, num_classes=2)

Best Practices

Training Tips

# 1. Gradient clipping (to prevent exploding gradients);
#    call it after loss.backward() and before optimizer.step()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

# 2. Learning rate scheduling (e.g. cosine annealing)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)

# 3. Early stopping
best_val_loss = float('inf')
patience = 5
counter = 0

for epoch in range(num_epochs):
    train_loss = train_one_epoch(model, dataloader)
    val_loss = validate(model, val_loader)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_model.pth')
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print('Early stopping')
            break

# 4. Model ensembling (inference only; the prediction loop could sit inside torch.no_grad())
models = [model1, model2, model3]
for model in models:
    model.eval()

predictions = []
for model in models:
    pred = model(x)
    predictions.append(pred)

ensemble_pred = torch.mean(torch.stack(predictions), dim=0)

Code Organization

# 1. Keep model definitions separate
# models.py
class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        # ...

    def forward(self, x):
        ...

# 2. Keep datasets separate
# dataset.py
class MyDataset(Dataset):
    def __init__(self):
        ...

    def __getitem__(self, idx):
        ...

# 3. Keep the training logic separate
# train.py
def train_one_epoch(model, dataloader, optimizer, criterion):
    model.train()
    for batch_x, batch_y in dataloader:
        ...
    return avg_loss

# 4. Keep configuration in one place
# config.py
class Config:
    batch_size = 32
    learning_rate = 0.001
    num_epochs = 100
Summary

PyTorch is one of the mainstream frameworks for deep learning:

Core Concepts

  • Tensor: the basic data structure, similar to a NumPy array but with GPU support
  • Autograd: the automatic differentiation system
  • nn.Module: the base class for neural network modules
  • Optimizers: the parameter update algorithms

Main Features

  1. Tensor operations: creation, indexing, arithmetic, reshaping
  2. Neural networks: layers, activation functions, loss functions
  3. Data loading: Dataset, DataLoader, transforms
  4. Training workflow: forward pass, backward pass, parameter updates
  5. Model architectures: CNN, RNN, Transformer
  6. GPU acceleration: CUDA, mixed precision training
  7. Transfer learning: fine-tuning pretrained models

Best Practices

  1. Load data efficiently with DataLoader
  2. Use learning rate schedules sensibly
  3. Use a validation set to guard against overfitting
  4. Save model checkpoints regularly
  5. Speed up training with mixed precision
  6. Keep the code base well organized

PyTorch is flexible and intuitive, works well for both research and production, and is one of the top choices for deep learning.