PyTorch Convolutional Neural Networks

Section 1: Overview

In this post, we use a convolutional neural network to classify the Fashion-MNIST dataset from the previous post once again.

Section 2: Classification

2.1 Data Preprocessing & Building the Dataset

Define the data preprocessing transforms:

  • ToTensor: converts image pixel values from 0~255 to 0~1 and turns each image into a tensor; after batching by the DataLoader the shape is [N, C, H, W]
  • Normalize: standardizes the image, mapping pixel values from 0~1 to -1~1

The dataset is split into training, validation, and test sets.

import torchvision
from torchvision.transforms import Compose, ToTensor, Normalize
from torch.utils.data import random_split,DataLoader
def load_data_fashion_mnist(batch_size, val_ratio=0.2):
    trans = Compose([ToTensor(),Normalize((0.5,), (0.5,))])
    total_dataset = torchvision.datasets.FashionMNIST(
        root="../data", train=True, transform=trans, download=True)
    test_dataset = torchvision.datasets.FashionMNIST(
        root="../data", train=False, transform=trans, download=True)
    
    total_size = len(total_dataset)
    val_size = int(total_size * val_ratio)
    train_size = total_size - val_size
    train_dataset, val_dataset = random_split(
        total_dataset, [train_size, val_size]
    )
    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True
    )
    val_loader = DataLoader(
        val_dataset, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True
    )
    test_loader = DataLoader(
        test_dataset, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True
    )
    
    return (train_loader,val_loader,test_loader)
batch_size = 256
train_loader, val_loader, test_loader = load_data_fashion_mnist(batch_size)
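
As a quick sanity check (not part of the original pipeline), one batch can be pulled from train_loader to confirm the [N, C, H, W] shape and the roughly -1~1 value range produced by Normalize:

# Hypothetical sanity check: inspect a single batch from the training loader
images, labels = next(iter(train_loader))
print(images.shape)                               # expected: torch.Size([256, 1, 28, 28])
print(images.min().item(), images.max().item())   # values roughly within [-1, 1]
print(labels.shape)                               # torch.Size([256])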

2.2 Building the Network

The convolutional network is built as follows:

  • First, a 3×3 convolution kernel with 1 pixel of padding; the output is 32×28×28, and after max pooling it becomes 32×14×14
  • Then another 3×3 convolution kernel with 1 pixel of padding; the output is 64×14×14, and after max pooling it becomes 64×7×7
  • After flattening, two fully connected layers and ReLU, the output is finally reduced to 10 classes (these sizes can be verified with the size formula sketched after this list)
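
As a small illustrative sketch (not from the original post), the sizes above follow from the usual output-size formula output = (input + 2*padding - kernel) / stride + 1, rounded down:

# Sketch: verify the spatial sizes listed above
def conv_out(size, kernel, padding=0, stride=1):
    return (size + 2 * padding - kernel) // stride + 1

h = conv_out(28, kernel=3, padding=1)   # 3x3 conv, padding 1 -> 28
h = conv_out(h, kernel=2, stride=2)     # 2x2 max pooling     -> 14
h = conv_out(h, kernel=3, padding=1)    # 3x3 conv, padding 1 -> 14
h = conv_out(h, kernel=2, stride=2)     # 2x2 max pooling     -> 7
print(h)  # 7, so the flattened feature size is 64 * 7 * 7 = 3136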

The model here is not built with nn.Sequential; instead it subclasses nn.Module, which is the more general way to define a model.

Some layers in the network are defined with nn, while others are defined with F (torch.nn.functional). The difference:

  • Layers defined with nn carry learnable parameters that are registered in model.parameters(), so their gradients are computed during backpropagation and their weights are updated. Stateful layers such as dropout are also switched on during training and turned off automatically during evaluation.
  • Operations defined with F are plain functions with no learnable parameters or internal state (gradients still flow through them), so there is nothing to register or update; for parameter-free operations such as ReLU and pooling, F is the more concise choice (see the short sketch after this list).
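
A minimal sketch (not from the original post) of the practical difference: nn.Dropout is a module whose behavior follows model.train()/model.eval(), whereas F.dropout is a plain function that must be told explicitly whether it is in training mode:

import torch
import torch.nn as nn
import torch.nn.functional as F

drop = nn.Dropout(0.5)
x = torch.ones(1, 4)

drop.train()
print(drop(x))   # about half the entries zeroed, the rest scaled by 2
drop.eval()
print(drop(x))   # identity in eval mode

print(F.dropout(x, p=0.5, training=True))    # the training flag must be passed by hand
print(F.dropout(x, p=0.5, training=False))   # identity
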
import torch
import torch.nn as nn
import torch.nn.functional as F

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
class FashionCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.fc1 = nn.Linear(64*7*7, 128)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

def init_weights(m):
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
        nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
        if m.bias is not None:
            nn.init.constant_(m.bias, 0)

model = FashionCNN()
model.apply(init_weights)
model.to(device)
FashionCNN(
  (conv1): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (fc1): Linear(in_features=3136, out_features=128, bias=True)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc2): Linear(in_features=128, out_features=10, bias=True)
)
sum(p.numel() for p in model.parameters())
421642
from torchviz import make_dot
x = torch.randn(1, 1, 28, 28).to(device, non_blocking=True)
y = model(x)
dot = make_dot(y, params=dict(model.named_parameters()))
dot

https://img.papergate.top:5000/i/2026/01/695a480605e4d.webp

2.3 Optimizer and Loss Function

The optimizer is Adam with a learning rate of 1e-3 and weight decay (L2 regularization) to prevent overfitting; the loss function is cross-entropy.

import torch.optim as optim
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
criterion = nn.CrossEntropyLoss()
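
A small hedged example of what nn.CrossEntropyLoss expects: raw, unnormalized logits of shape [N, 10] and integer class labels of shape [N]. Softmax is applied internally, which is why the model's forward returns the output of fc2 directly:

# Sketch: CrossEntropyLoss consumes raw logits and integer labels
logits = torch.randn(4, 10)             # made-up model outputs for 4 samples
labels = torch.tensor([0, 3, 9, 1])     # made-up ground-truth classes
print(criterion(logits, labels))        # a scalar loss; no explicit softmax needed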

2.4 Training Code

The maximum number of epochs is set to 50, with early stopping: training stops once the validation loss has not improved for 10 consecutive epochs.

max_epochs = 50
# early stopping
patience_counter = 0
patience = 10
best_loss = float('inf')

for epoch in range(max_epochs):
    model.train()
    train_samples = 0
    train_loss = 0
    train_acc = 0
    for x, y in train_loader:
        x, y = x.to(device, non_blocking=True), y.to(device, non_blocking=True)
        y_hat = model(x)
        loss = criterion(y_hat, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_samples += len(x)
        train_loss += loss.item() * len(x)
        pred = torch.argmax(y_hat, axis=1)
        train_acc += (pred == y).sum().item()
    train_loss = train_loss / train_samples
    train_acc = train_acc / train_samples
    model.eval()
    val_samples = 0
    val_loss = 0
    val_acc = 0
    with torch.no_grad():
        for x,y in val_loader:
            x, y = x.to(device, non_blocking=True), y.to(device, non_blocking=True)
            y_hat = model(x)
            loss = criterion(y_hat, y)
            val_samples += len(x)
            val_loss += loss.item() * len(x)
            pred = torch.argmax(y_hat, axis=1)
            val_acc += (pred == y).sum().item()
        val_loss = val_loss / val_samples
        val_acc = val_acc / val_samples
        if val_loss < best_loss:
            best_loss = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), 'best_model.pth')
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping at epoch {epoch + 1}")
                break
    if (epoch + 1) % 5 == 0:
        print(f"epoch {epoch + 1}: train_loss: {train_loss}, train_acc: {train_acc}, val_loss: {val_loss}, val_acc: {val_acc}")

# Evaluate on the test set; inference only, so disable gradient tracking
model.eval()
test_samples = 0
test_loss = 0
test_acc = 0
with torch.no_grad():
    for x, y in test_loader:
        x, y = x.to(device, non_blocking=True), y.to(device, non_blocking=True)
        y_hat = model(x)
        loss = criterion(y_hat, y)
        test_samples += len(x)
        test_loss += loss.item() * len(x)
        pred = torch.argmax(y_hat, axis=1)
        test_acc += (pred == y).sum().item()
test_loss = test_loss / test_samples
test_acc = test_acc / test_samples
print(f"epoch {epoch + 1}: train_loss: {train_loss}, train_acc: {train_acc}, val_loss: {val_loss}, val_acc: {val_acc}, test_loss: {test_loss}, test_acc: {test_acc}")
epoch 5: train_loss: 0.3323359337647756, train_acc: 0.882625, val_loss: 0.2947879132429759, val_acc: 0.8954166666666666
epoch 10: train_loss: 0.24026671481132508, train_acc: 0.9115833333333333, val_loss: 0.25261858995755515, val_acc: 0.9113333333333333
epoch 15: train_loss: 0.1967949519753456, train_acc: 0.9285625, val_loss: 0.23852672167619068, val_acc: 0.9178333333333333
epoch 20: train_loss: 0.16271487843990326, train_acc: 0.9406458333333333, val_loss: 0.22613940691947937, val_acc: 0.9230833333333334
epoch 25: train_loss: 0.1353553236722946, train_acc: 0.9498958333333334, val_loss: 0.23934940230846405, val_acc: 0.9220833333333334
Early stopping at epoch 30
epoch 30: train_loss: 0.11594114247957865, train_acc: 0.9563125, val_loss: 0.24821518286069233, val_acc: 0.9226666666666666, test_loss: 0.24525352787971497, test_acc: 0.923
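
Since early stopping keeps writing the best validation weights to best_model.pth, that checkpoint could be restored before the final test evaluation (the run above evaluates the weights from the last epoch instead); a hedged sketch:

# Optional: restore the best validation checkpoint before testing
model.load_state_dict(torch.load('best_model.pth', map_location=device))
model.eval()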

Section 3: LeNet-5

LeNet-5 was the first truly successful convolutional neural network. It was used for handwritten digit recognition and established the "convolution + pooling + fully connected" paradigm.

model = nn.Sequential(
    nn.Conv2d(1, 6, kernel_size=5, padding=2),
    nn.Tanh(),
    nn.AvgPool2d(kernel_size=2, stride=2),
    nn.Conv2d(6, 16, kernel_size=5),
    nn.Tanh(),
    nn.AvgPool2d(kernel_size=2, stride=2),
    nn.Flatten(),
    nn.Linear(16 * 5 * 5, 120),
    nn.Tanh(),
    nn.Linear(120, 84),
    nn.Tanh(),
    nn.Linear(84, 10))

x = torch.rand(1, 1, 28, 28)
for layer in model:
    if isinstance(layer,nn.Conv2d) or isinstance(layer,nn.Linear):
        print('-'*50)
    x = layer(x)
    print(layer.__class__.__name__,'output shape: \t',x.shape)
--------------------------------------------------
Conv2d output shape:     torch.Size([1, 6, 28, 28])
Tanh output shape:   torch.Size([1, 6, 28, 28])
AvgPool2d output shape:      torch.Size([1, 6, 14, 14])
--------------------------------------------------
Conv2d output shape:     torch.Size([1, 16, 10, 10])
Tanh output shape:   torch.Size([1, 16, 10, 10])
AvgPool2d output shape:      torch.Size([1, 16, 5, 5])
Flatten output shape:    torch.Size([1, 400])
--------------------------------------------------
Linear output shape:     torch.Size([1, 120])
Tanh output shape:   torch.Size([1, 120])
--------------------------------------------------
Linear output shape:     torch.Size([1, 84])
Tanh output shape:   torch.Size([1, 84])
--------------------------------------------------
Linear output shape:     torch.Size([1, 10])
sum(p.numel() for p in model.parameters())
61706

Section 4: AlexNet

AlexNet uses ReLU as the activation function and applies Dropout in the fully connected layers, which effectively suppresses overfitting. It also relies on data augmentation such as random cropping, horizontal flipping, and color jittering, and uses local response normalization (LRN). The simplified implementation below omits LRN and augmentation; an augmentation pipeline is sketched first.
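
A hedged sketch of such an augmentation pipeline with torchvision.transforms (the crop size and jitter strengths are illustrative choices, not the original paper's recipe):

from torchvision import transforms

# Illustrative training-time augmentation in the spirit of AlexNet
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),   # random crop, then resize to 224x224
    transforms.RandomHorizontalFlip(),   # random horizontal flip
    transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4),
    transforms.ToTensor(),
])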

model = nn.Sequential(
    # Conv1
    nn.Conv2d(3, 96, kernel_size=11, stride=4, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2),
    # Conv2
    nn.Conv2d(96, 256, kernel_size=5, padding=2),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2),
    # Conv3
    nn.Conv2d(256, 384, kernel_size=3, padding=1),
    nn.ReLU(),
    # Conv4
    nn.Conv2d(384, 384, kernel_size=3, padding=1),
    nn.ReLU(),
    # Conv5
    nn.Conv2d(384, 256, kernel_size=3, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2),
    nn.Flatten(),
    # FC6
    nn.Linear(6400, 4096),
    nn.ReLU(),
    nn.Dropout(p=0.5),
    # FC7
    nn.Linear(4096, 4096),
    nn.ReLU(),
    nn.Dropout(p=0.5),
    # FC8
    nn.Linear(4096, 1000))
x = torch.randn(1, 3, 224, 224)
for layer in model:
    if isinstance(layer,nn.Conv2d) or isinstance(layer,nn.Linear):
        print('-'*50)
    x=layer(x)
    print(layer.__class__.__name__,'output shape:\t',x.shape)
--------------------------------------------------
Conv2d output shape:     torch.Size([1, 96, 54, 54])
ReLU output shape:   torch.Size([1, 96, 54, 54])
MaxPool2d output shape:  torch.Size([1, 96, 26, 26])
--------------------------------------------------
Conv2d output shape:     torch.Size([1, 256, 26, 26])
ReLU output shape:   torch.Size([1, 256, 26, 26])
MaxPool2d output shape:  torch.Size([1, 256, 12, 12])
--------------------------------------------------
Conv2d output shape:     torch.Size([1, 384, 12, 12])
ReLU output shape:   torch.Size([1, 384, 12, 12])
--------------------------------------------------
Conv2d output shape:     torch.Size([1, 384, 12, 12])
ReLU output shape:   torch.Size([1, 384, 12, 12])
--------------------------------------------------
Conv2d output shape:     torch.Size([1, 256, 12, 12])
ReLU output shape:   torch.Size([1, 256, 12, 12])
MaxPool2d output shape:  torch.Size([1, 256, 5, 5])
Flatten output shape:    torch.Size([1, 6400])
--------------------------------------------------
Linear output shape:     torch.Size([1, 4096])
ReLU output shape:   torch.Size([1, 4096])
Dropout output shape:    torch.Size([1, 4096])
--------------------------------------------------
Linear output shape:     torch.Size([1, 4096])
ReLU output shape:   torch.Size([1, 4096])
Dropout output shape:    torch.Size([1, 4096])
--------------------------------------------------
Linear output shape:     torch.Size([1, 1000])
sum(p.numel() for p in model.parameters())
50844008

Section 5: VGG

VGG (Visual Geometry Group) is a classic convolutional neural network proposed by Oxford's VGG group in 2014, and it performed exceptionally well in the ImageNet competition. Its core idea is to stack many consecutive small 3×3 convolution kernels to build a deep CNN; a rough parameter comparison is sketched below.
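
One way to see the appeal of small kernels: two stacked 3×3 convolutions cover the same 5×5 receptive field as a single 5×5 convolution, but use fewer weights and add an extra nonlinearity. A rough count, assuming C input and C output channels and ignoring biases:

# Sketch: weights of stacked 3x3 convs vs. a single larger kernel (biases ignored)
C = 64
two_3x3   = 2 * (3 * 3 * C * C)   # two 3x3 layers:    73,728 weights, 5x5 receptive field
one_5x5   = 5 * 5 * C * C         # one 5x5 layer:    102,400 weights
three_3x3 = 3 * (3 * 3 * C * C)   # three 3x3 layers: 110,592 weights, 7x7 receptive field
one_7x7   = 7 * 7 * C * C         # one 7x7 layer:    200,704 weights
print(two_3x3, one_5x5, three_3x3, one_7x7)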

def vgg_block(num_convs, in_channels, out_channels):
    layers = []
    for _ in range(num_convs):
        layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1))
        layers.append(nn.ReLU())
        in_channels = out_channels
    layers.append(nn.MaxPool2d(kernel_size=2,stride=2))
    return nn.Sequential(*layers)

def vgg16():
    conv_arch = ((2, 64), (2, 128), (3, 256), (3, 512), (3, 512))
    conv_blks = []
    in_channels = 3
    for (num_convs, out_channels) in conv_arch:
        conv_blks.append(vgg_block(num_convs, in_channels, out_channels))
        in_channels = out_channels
    return nn.Sequential(
        *conv_blks,
        nn.Flatten(),
        nn.Linear(out_channels * 7 * 7, 4096),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(4096, 4096),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(4096, 1000))

model = vgg16()
x = torch.randn(size=(1, 3, 224, 224))
for block in model:
    x = block(x)
    print(block.__class__.__name__,'output shape:\t',x.shape)
Sequential output shape:     torch.Size([1, 64, 112, 112])
Sequential output shape:     torch.Size([1, 128, 56, 56])
Sequential output shape:     torch.Size([1, 256, 28, 28])
Sequential output shape:     torch.Size([1, 512, 14, 14])
Sequential output shape:     torch.Size([1, 512, 7, 7])
Flatten output shape:    torch.Size([1, 25088])
Linear output shape:     torch.Size([1, 4096])
ReLU output shape:   torch.Size([1, 4096])
Dropout output shape:    torch.Size([1, 4096])
Linear output shape:     torch.Size([1, 4096])
ReLU output shape:   torch.Size([1, 4096])
Dropout output shape:    torch.Size([1, 4096])
Linear output shape:     torch.Size([1, 1000])
sum(p.numel() for p in model.parameters())
138357544

Section 6: GoogLeNet

In GoogLeNet, the basic convolutional block is called an Inception block. An Inception block consists of four parallel paths.

  • The first three paths use convolution layers with window sizes of 1×1, 3×3, and 5×5 to extract information at different spatial scales.
  • The middle two paths first apply a 1×1 convolution to the input to reduce the number of channels and thus the model complexity.
  • The fourth path uses a 3×3 max pooling layer followed by a 1×1 convolution layer to change the number of channels.

All four paths use appropriate padding so that the input and output have the same height and width. Finally, the outputs of the four paths are concatenated along the channel dimension to form the output of the Inception block.

class Inception(nn.Module):
    def __init__(self, in_channels, c1, c2, c3, c4, **kwargs):
        super(Inception, self).__init__(**kwargs)
        # Path 1: a single 1x1 convolution layer
        self.p1_1 = nn.Conv2d(in_channels, c1, kernel_size=1)
        # Path 2: 1x1 convolution layer followed by a 3x3 convolution layer
        self.p2_1 = nn.Conv2d(in_channels, c2[0], kernel_size=1)
        self.p2_2 = nn.Conv2d(c2[0], c2[1], kernel_size=3, padding=1)
        # Path 3: 1x1 convolution layer followed by a 5x5 convolution layer
        self.p3_1 = nn.Conv2d(in_channels, c3[0], kernel_size=1)
        self.p3_2 = nn.Conv2d(c3[0], c3[1], kernel_size=5, padding=2)
        # Path 4: 3x3 max pooling layer followed by a 1x1 convolution layer
        self.p4_1 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
        self.p4_2 = nn.Conv2d(in_channels, c4, kernel_size=1)

    def forward(self, x):
        p1 = F.relu(self.p1_1(x))
        p2 = F.relu(self.p2_2(F.relu(self.p2_1(x))))
        p3 = F.relu(self.p3_2(F.relu(self.p3_1(x))))
        p4 = F.relu(self.p4_2(self.p4_1(x)))
        # Concatenate the outputs along the channel dimension
        return torch.cat((p1, p2, p3, p4), dim=1)
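
A quick hedged check that a single Inception block behaves as described: with the parameters later used in the third module, the four paths produce 64, 128, 32, and 32 channels, which concatenate to 256 while the spatial size is preserved:

# Sketch: check the output shape of one Inception block
blk = Inception(192, 64, (96, 128), (16, 32), 32)
x = torch.rand(1, 192, 28, 28)
print(blk(x).shape)   # expected: torch.Size([1, 256, 28, 28])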

The first module uses a 64-channel 7×7 convolution layer.

b1 = nn.Sequential(nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
                   nn.ReLU(),
                   nn.MaxPool2d(kernel_size=3, stride=2, padding=1))

The second module uses two convolution layers: the first is a 64-channel 1×1 convolution layer; the second is a 3×3 convolution layer that triples the number of channels. This corresponds to the second path in an Inception block.

b2 = nn.Sequential(nn.Conv2d(64, 64, kernel_size=1),
                   nn.ReLU(),
                   nn.Conv2d(64, 192, kernel_size=3, padding=1),
                   nn.ReLU(),
                   nn.MaxPool2d(kernel_size=3, stride=2, padding=1))

The third module chains two complete Inception blocks.

  • The first Inception block has 64+128+32+32=256 output channels, and the ratio of output channels among the four paths is 64:128:32:32 = 2:4:1:1. The second and third paths first reduce the number of input channels to 96/192 = 1/2 and 16/192 = 1/12, respectively, before the second convolution layer.
  • The second Inception block increases the number of output channels to 128+192+96+64=480, with a ratio of 128:192:96:64 = 4:6:3:2 among the four paths. The second and third paths first reduce the number of input channels to 128/256 = 1/2 and 32/256 = 1/8, respectively.

b3 = nn.Sequential(Inception(192, 64, (96, 128), (16, 32), 32),
                   Inception(256, 128, (128, 192), (32, 96), 64),
                   nn.MaxPool2d(kernel_size=3, stride=2, padding=1))

The fourth module is more complex. It chains five Inception blocks whose output channel counts are 192+208+48+64=512, 160+224+64+64=512, 128+256+64+64=512, 112+288+64+64=528, and 256+320+128+128=832.

The channel allocation among the paths is similar to that in the third module: the second path, which contains the 3×3 convolution, outputs the most channels, followed by the first path with only a 1×1 convolution, then the third path with the 5×5 convolution, and finally the fourth path with 3×3 max pooling. The second and third paths first shrink the number of channels proportionally, and these ratios differ slightly from one Inception block to the next.

b4 = nn.Sequential(Inception(480, 192, (96, 208), (16, 48), 64),
                   Inception(512, 160, (112, 224), (24, 64), 64),
                   Inception(512, 128, (128, 256), (24, 64), 64),
                   Inception(512, 112, (144, 288), (32, 64), 64),
                   Inception(528, 256, (160, 320), (32, 128), 128),
                   nn.MaxPool2d(kernel_size=3, stride=2, padding=1))

The fifth module contains two Inception blocks with 256+320+128+128=832 and 384+384+128+128=1024 output channels. The channel allocation within each path follows the same idea as in the third and fourth modules, only with different concrete values.

Note that the fifth module is immediately followed by the output layer. It uses a global average pooling layer to reduce the height and width of each channel to 1. Finally, the output is flattened into a two-dimensional array and fed into a fully connected layer whose number of outputs equals the number of label classes.

b5 = nn.Sequential(Inception(832, 256, (160, 320), (32, 128), 128),
                   Inception(832, 384, (192, 384), (48, 128), 128),
                   nn.AdaptiveAvgPool2d((1,1)),
                   nn.Flatten())

model = nn.Sequential(b1, b2, b3, b4, b5, nn.Linear(1024, 1000))
x = torch.rand(size=(1, 3, 224, 224))
for layer in model:
    x = layer(x)
    print(layer.__class__.__name__,'output shape:\t', x.shape)
Sequential output shape:     torch.Size([1, 64, 56, 56])
Sequential output shape:     torch.Size([1, 192, 28, 28])
Sequential output shape:     torch.Size([1, 480, 14, 14])
Sequential output shape:     torch.Size([1, 832, 7, 7])
Sequential output shape:     torch.Size([1, 1024])
Linear output shape:     torch.Size([1, 1000])
sum(p.numel() for p in model.parameters())
6998552

Section 7: Batch Normalization

When training a deep network, the distribution of the intermediate activations keeps shifting, sometimes drastically, which makes training harder and convergence slower. Batch normalization was introduced to address exactly this problem of unstable distributions.

Let $\mathbf{x} \in \mathcal{B}$ denote an input from a minibatch $\mathcal{B}$. Batch normalization $\mathrm{BN}$ is defined as

$$\mathrm{BN}(\mathbf{x}) = \boldsymbol{\gamma} \odot \frac{\mathbf{x} - \hat{\boldsymbol{\mu}}_\mathcal{B}}{\hat{\boldsymbol{\sigma}}_\mathcal{B}} + \boldsymbol{\beta}.$$

Here $\hat{\boldsymbol{\mu}}_\mathcal{B}$ is the sample mean of the minibatch $\mathcal{B}$ and $\hat{\boldsymbol{\sigma}}_\mathcal{B}$ is its sample standard deviation. After standardization, the resulting minibatch has zero mean and unit variance.

The scale parameter $\boldsymbol{\gamma}$ and the shift parameter $\boldsymbol{\beta}$ have the same shape as $\mathbf{x}$ and participate in backpropagation; they give the network the freedom to decide whether, and to what extent, to standardize.

def batch_norm(X, gamma, beta, moving_mean, moving_var, eps, momentum):
    # Use is_grad_enabled to tell whether we are in training or prediction mode
    if not torch.is_grad_enabled():
        # In prediction mode, use the moving-average mean and variance directly
        X_hat = (X - moving_mean) / torch.sqrt(moving_var + eps)
    else:
        assert len(X.shape) in (2, 4)
        if len(X.shape) == 2:
            # Fully connected layer: compute mean and variance over the feature dimension
            mean = X.mean(dim=0)
            var = ((X - mean) ** 2).mean(dim=0)
        else:
            # 2D convolution layer: compute mean and variance per channel (axis=1).
            # Keep X's shape so that broadcasting works later
            mean = X.mean(dim=(0, 2, 3), keepdim=True)
            var = ((X - mean) ** 2).mean(dim=(0, 2, 3), keepdim=True)
        # In training mode, standardize with the current batch mean and variance
        X_hat = (X - mean) / torch.sqrt(var + eps)
        # Update the moving averages of the mean and variance
        moving_mean = momentum * moving_mean + (1.0 - momentum) * mean
        moving_var = momentum * moving_var + (1.0 - momentum) * var
    Y = gamma * X_hat + beta  # scale and shift
    return Y, moving_mean.data, moving_var.data
class BatchNorm(nn.Module):
    # num_features: number of outputs of a fully connected layer or
    # number of output channels of a convolution layer.
    # num_dims: 2 for a fully connected layer, 4 for a convolution layer
    def __init__(self, num_features, num_dims):
        super().__init__()
        if num_dims == 2:
            shape = (1, num_features)
        else:
            shape = (1, num_features, 1, 1)
        # Scale and shift parameters involved in gradient computation and updates,
        # initialized to 1 and 0 respectively
        self.gamma = nn.Parameter(torch.ones(shape))
        self.beta = nn.Parameter(torch.zeros(shape))
        # Variables that are not model parameters, initialized to 0 and 1
        self.moving_mean = torch.zeros(shape)
        self.moving_var = torch.ones(shape)

    def forward(self, X):
        # If X is not on the same device, copy moving_mean and moving_var
        # to the device (e.g. GPU memory) where X lives
        if self.moving_mean.device != X.device:
            self.moving_mean = self.moving_mean.to(X.device)
            self.moving_var = self.moving_var.to(X.device)
        # Save the updated moving_mean and moving_var
        Y, self.moving_mean, self.moving_var = batch_norm(
            X, self.gamma, self.beta, self.moving_mean,
            self.moving_var, eps=1e-5, momentum=0.9)
        return Y
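
A hedged sanity check of the custom layer: in training mode the per-channel mean of the output should be close to 0 and its variance close to 1, because gamma and beta start at 1 and 0. In practice one would normally reach for the built-in nn.BatchNorm1d/nn.BatchNorm2d, which implement the same idea:

# Sketch: the custom BatchNorm standardizes each channel of a random input
bn = BatchNorm(3, num_dims=4)
X = torch.randn(8, 3, 5, 5) * 4 + 2              # deliberately non-standardized input
Y = bn(X)                                        # gradients enabled, so the training branch runs
print(Y.mean(dim=(0, 2, 3)))                     # approximately 0 per channel
print(Y.var(dim=(0, 2, 3), unbiased=False))      # approximately 1 per channel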

Section 8: ResNet

class Residual(nn.Module):
    def __init__(self, input_channels, num_channels, use_1x1conv=False, strides=1):
        super().__init__()
        self.conv1 = nn.Conv2d(input_channels, num_channels, kernel_size=3, padding=1, stride=strides)
        self.conv2 = nn.Conv2d(num_channels, num_channels, kernel_size=3, padding=1)
        if use_1x1conv:
            self.conv3 = nn.Conv2d(input_channels, num_channels, kernel_size=1, stride=strides)
        else:
            self.conv3 = None
        self.bn1 = nn.BatchNorm2d(num_channels)
        self.bn2 = nn.BatchNorm2d(num_channels)

    def forward(self, X):
        Y = F.relu(self.bn1(self.conv1(X)))
        Y = self.bn2(self.conv2(Y))
        if self.conv3:
            X = self.conv3(X)
        Y += X
        return F.relu(Y)
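
A hedged shape check of the residual block: with the default settings it preserves the input shape (so the addition Y += X works directly), and with use_1x1conv=True and strides=2 it changes the channel count while halving the height and width:

# Sketch: shape behavior of the Residual block
blk = Residual(3, 3)
x = torch.rand(4, 3, 6, 6)
print(blk(x).shape)   # torch.Size([4, 3, 6, 6])  -- same shape, identity shortcut works

blk = Residual(3, 6, use_1x1conv=True, strides=2)
print(blk(x).shape)   # torch.Size([4, 6, 3, 3])  -- more channels, H and W halved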

The first two layers of ResNet are the same as those of GoogLeNet described above: a 64-channel 7×7 convolution layer with stride 2, followed by a 3×3 max pooling layer with stride 2. The difference is that ResNet adds a batch normalization layer after each convolution layer.

b1 = nn.Sequential(nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
                   nn.BatchNorm2d(64),
                   nn.ReLU(),
                   nn.MaxPool2d(kernel_size=3, stride=2, padding=1))

Where GoogLeNet follows up with 4 modules made of Inception blocks, ResNet uses 4 modules made of residual blocks, each consisting of several residual blocks with the same number of output channels.

The first module keeps the same number of channels as its input. Since a max pooling layer with stride 2 has already been applied, there is no need to reduce the height and width further. In each subsequent module, the first residual block doubles the number of channels relative to the previous module and halves the height and width.

Below we implement this module. Note that the first module is treated specially.

def resnet_block(input_channels, num_channels, num_residuals,
                 first_block=False):
    blk = []
    for i in range(num_residuals):
        if i == 0 and not first_block:
            blk.append(Residual(input_channels, num_channels, use_1x1conv=True, strides=2))
        else:
            blk.append(Residual(num_channels, num_channels))
    return blk
b2 = nn.Sequential(*resnet_block(64, 64, 2, first_block=True))
b3 = nn.Sequential(*resnet_block(64, 128, 2))
b4 = nn.Sequential(*resnet_block(128, 256, 2))
b5 = nn.Sequential(*resnet_block(256, 512, 2))
model = nn.Sequential(b1, b2, b3, b4, b5,
                    nn.AdaptiveAvgPool2d((1,1)),
                    nn.Flatten(), nn.Linear(512, 1000))
x = torch.rand(size=(1, 3, 224, 224))
for layer in model:
    x = layer(x)
    print(layer.__class__.__name__,'output shape:\t', x.shape)
Sequential output shape:     torch.Size([1, 64, 56, 56])
Sequential output shape:     torch.Size([1, 64, 56, 56])
Sequential output shape:     torch.Size([1, 128, 28, 28])
Sequential output shape:     torch.Size([1, 256, 14, 14])
Sequential output shape:     torch.Size([1, 512, 7, 7])
AdaptiveAvgPool2d output shape:  torch.Size([1, 512, 1, 1])
Flatten output shape:    torch.Size([1, 512])
Linear output shape:     torch.Size([1, 1000])
sum(p.numel() for p in model.parameters())
11692520