日期
心得
昇思MindSpore 应用学习-ResNet50图像分类学习 (AI 代码解析)
图像分类是最基础的计算机视觉应用,属于有监督学习类别,如给定一张图像(猫、狗、飞机、汽车等等),判断图像所属的类别。本章将介绍使用ResNet50网络对CIFAR-10数据集进行分类。
ResNet网络介绍
ResNet50网络是2015年由微软实验室的何恺明提出,获得ILSVRC2015图像分类竞赛第一名。在ResNet网络提出之前,传统的卷积神经网络都是将一系列的卷积层和池化层堆叠得到的,但当网络堆叠到一定深度时,就会出现退化问题。下图是在CIFAR-10数据集上使用56层网络与20层网络训练误差和测试误差图,由图中数据可以看出,56层网络比20层网络训练误差和测试误差更大,随着网络的加深,其误差并没有如预想的一样减小。
ResNet网络提出了残差网络结构(Residual Network)来减轻退化问题,使用ResNet网络可以实现搭建较深的网络结构(突破1000层)。论文中使用ResNet网络在CIFAR-10数据集上的训练误差与测试误差图如下图所示,图中虚线表示训练误差,实线表示测试误差。由图中数据可以看出,ResNet网络层数越深,其训练误差和测试误差越小。
了解ResNet网络更多详细内容,参见ResNet论文。
数据集准备与加载
CIFAR-10数据集共有60000张32*32的彩色图像,分为10个类别,每类有6000张图,数据集一共有50000张训练图片和10000张评估图片。首先,如下示例使用download
接口下载并解压,目前仅支持解析二进制版本的CIFAR-10文件(CIFAR-10 binary version)。
from download import download # 从download模块导入download函数
url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/cifar-10-binary.tar.gz"
# 定义一个字符串变量url,表示需要下载的文件的URL地址
download(url, "./datasets-cifar10-bin", kind="tar.gz", replace=True)
# 调用download函数,下载指定URL的文件到本地目录"./datasets-cifar10-bin"
# 参数解析:
# 1. url: 要下载文件的URL
# 2. "./datasets-cifar10-bin": 本地保存下载文件的目录
# 3. kind="tar.gz": 指定下载文件的类型为.tar.gz,表示下载后需要进行解压
# 4. replace=True: 如果本地目录已经存在同名文件,是否替换(True表示替换)
- 导入模块:
from download import download
在代码开始部分,从download
模块中导入了download
函数,这个函数负责下载文件。
- 定义URL变量:
url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/cifar-10-binary.tar.gz"
这一行代码定义了一个字符串变量url
,表示要下载的文件的URL地址。这里的文件是CIFAR-10数据集的tar.gz压缩包。
- 调用download函数:
download(url, "./datasets-cifar10-bin", kind="tar.gz", replace=True)
这行代码调用了download
函数,并传入了四个参数:
url
:待下载文件的URL地址。"./datasets-cifar10-bin"
:下载文件保存的本地目录路径。kind="tar.gz"
:指定下载文件的类型为tar.gz
,指示函数在下载完成后进行解压。replace=True
:如果本地已经存在同名文件,是否替换它。True
表示替换,False
表示不替换。download(url, path, kind, replace)
:download
函数是一个常用的文件下载函数。其具体功能是从指定的URL下载文件,并将其保存到指定的本地目录。该函数还支持对下载的文件进行解压缩(取决于kind
参数的值),以及在本地存在同名文件时选择是否替换。url
:字符串类型,表示要下载文件的URL。path
:字符串类型,表示下载文件的保存路径。kind
:字符串类型,表示下载文件的类型,如tar.gz
,zip
等,函数会根据类型进行相应的解压缩操作。replace
:布尔类型,表示是否替换本地已有的同名文件。
‘./datasets-cifar10-bin’
下载后的数据集目录结构如下:
datasets-cifar10-bin/cifar-10-batches-bin
├── batches.meta.text
├── data_batch_1.bin
├── data_batch_2.bin
├── data_batch_3.bin
├── data_batch_4.bin
├── data_batch_5.bin
├── readme.html
└── test_batch.bin
然后,使用mindspore.dataset.Cifar10Dataset
接口来加载数据集,并进行相关图像增强操作。
import mindspore as ms # 导入MindSpore框架
import mindspore.dataset as ds # 导入MindSpore的数据集模块
import mindspore.dataset.vision as vision # 导入MindSpore的视觉处理模块
import mindspore.dataset.transforms as transforms # 导入MindSpore的数据变换模块
from mindspore import dtype as mstype # 导入MindSpore的数据类型模块
data_dir = "./datasets-cifar10-bin/cifar-10-batches-bin" # 数据集根目录
batch_size = 256 # 批量大小
image_size = 32 # 训练图像空间大小
workers = 4 # 并行线程个数
num_classes = 10 # 分类数量
def create_dataset_cifar10(dataset_dir, usage, resize, batch_size, workers):
"""
创建CIFAR-10数据集
参数:
dataset_dir: str, 数据集路径
usage: str, 数据集用途("train"或"test")
resize: int, 图像调整大小
batch_size: int, 批量大小
workers: int, 并行线程个数
返回:
data_set: MindSpore数据集对象
"""
# 加载CIFAR-10数据集
data_set = ds.Cifar10Dataset(dataset_dir=dataset_dir,
usage=usage,
num_parallel_workers=workers,
shuffle=True)
trans = [] # 图像变换操作列表
if usage == "train":
# 如果是训练集,添加随机裁剪和随机水平翻转
trans += [
vision.RandomCrop((32, 32), (4, 4, 4, 4)),
vision.RandomHorizontalFlip(prob=0.5)
]
# 通用图像变换操作
trans += [
vision.Resize(resize),
vision.Rescale(1.0 / 255.0, 0.0),
vision.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010]),
vision.HWC2CHW()
]
target_trans = transforms.TypeCast(mstype.int32) # 标签变换操作
# 对图像数据进行变换
data_set = data_set.map(operations=trans,
input_columns='image',
num_parallel_workers=workers)
# 对标签数据进行类型转换
data_set = data_set.map(operations=target_trans,
input_columns='label',
num_parallel_workers=workers)
# 批量操作
data_set = data_set.batch(batch_size)
return data_set
# 获取处理后的训练与测试数据集
dataset_train = create_dataset_cifar10(dataset_dir=data_dir,
usage="train",
resize=image_size,
batch_size=batch_size,
workers=workers)
step_size_train = dataset_train.get_dataset_size()
dataset_val = create_dataset_cifar10(dataset_dir=data_dir,
usage="test",
resize=image_size,
batch_size=batch_size,
workers=workers)
step_size_val = dataset_val.get_dataset_size()
- 导入模块:
import mindspore as ms
import mindspore.dataset as ds
import mindspore.dataset.vision as vision
import mindspore.dataset.transforms as transforms
from mindspore import dtype as mstype
导入了MindSpore框架及其相关的数据集、视觉处理和数据变换模块。
- 定义数据集路径和参数:
data_dir = "./datasets-cifar10-bin/cifar-10-batches-bin" # 数据集根目录
batch_size = 256 # 批量大小
image_size = 32 # 训练图像空间大小
workers = 4 # 并行线程个数
num_classes = 10 # 分类数量
定义了数据集路径和一些参数,包括批量大小、图像尺寸、并行线程个数和分类数量。
- 创建数据集函数:
def create_dataset_cifar10(dataset_dir, usage, resize, batch_size, workers):
"""
创建CIFAR-10数据集
参数:
dataset_dir: str, 数据集路径
usage: str, 数据集用途("train"或"test")
resize: int, 图像调整大小
batch_size: int, 批量大小
workers: int, 并行线程个数
返回:
data_set: MindSpore数据集对象
"""
定义了一个函数create_dataset_cifar10
,用于创建CIFAR-10数据集,函数参数包括数据集路径、用途(训练或测试)、图像调整大小、批量大小和并行线程个数。
- 加载和变换数据:
data_set = ds.Cifar10Dataset(dataset_dir=dataset_dir,
usage=usage,
num_parallel_workers=workers,
shuffle=True)
加载CIFAR-10数据集,并根据用途添加相应的图像变换操作。
- 图像变换操作:
trans = []
if usage == "train":
trans += [
vision.RandomCrop((32, 32), (4, 4, 4, 4)),
vision.RandomHorizontalFlip(prob=0.5)
]
trans += [
vision.Resize(resize),
vision.Rescale(1.0 / 255.0, 0.0),
vision.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010]),
vision.HWC2CHW()
]
target_trans = transforms.TypeCast(mstype.int32)
根据用途(训练或测试)定义图像变换操作,包括随机裁剪、随机水平翻转、调整大小、归一化和通道变换。
- 应用图像和标签变换:
data_set = data_set.map(operations=trans,
input_columns='image',
num_parallel_workers=workers)
data_set = data_set.map(operations=target_trans,
input_columns='label',
num_parallel_workers=workers)
对图像数据应用定义的变换操作,对标签数据进行类型转换。
- 批量操作:
data_set = data_set.batch(batch_size)
将数据集按批量大小分批处理。
- 获取训练和测试数据集:
dataset_train = create_dataset_cifar10(dataset_dir=data_dir,
usage="train",
resize=image_size,
batch_size=batch_size,
workers=workers)
step_size_train = dataset_train.get_dataset_size()
dataset_val = create_dataset_cifar10(dataset_dir=data_dir,
usage="test",
resize=image_size,
batch_size=batch_size,
workers=workers)
step_size_val = dataset_val.get_dataset_size()
调用create_dataset_cifar10
函数,获取处理后的训练和测试数据集,并获取数据集大小。
对CIFAR-10训练数据集进行可视化。
import matplotlib.pyplot as plt # 导入matplotlib库用于绘图
import numpy as np # 导入numpy库用于数值计算
data_iter = next(dataset_train.create_dict_iterator()) # 从训练数据集中获取下一个批次的数据
images = data_iter["image"].asnumpy() # 将图像数据转换为numpy数组
labels = data_iter["label"].asnumpy() # 将标签数据转换为numpy数组
print(f"Image shape: {images.shape}, Label shape: {labels.shape}") # 打印图像和标签的形状
# 训练数据集中,前六张图片所对应的标签
print(f"Labels: {labels[:6]}") # 打印前六张图片的标签
classes = [] # 存储类别名称的列表
with open(data_dir + "/batches.meta.txt", "r") as f: # 打开类别元数据文件
for line in f:
line = line.rstrip() # 去除行尾的空白字符
if line:
classes.append(line) # 将类别名称添加到列表中
# 训练数据集的前六张图片
plt.figure() # 创建一个新的图形
for i in range(6):
plt.subplot(2, 3, i + 1) # 创建子图
image_trans = np.transpose(images[i], (1, 2, 0)) # 将图像从CHW格式转换为HWC格式
mean = np.array([0.4914, 0.4822, 0.4465]) # 图像的均值
std = np.array([0.2023, 0.1994, 0.2010]) # 图像的标准差
image_trans = std * image_trans + mean # 反标准化图像
image_trans = np.clip(image_trans, 0, 1) # 将像素值限制在0到1之间
plt.title(f"{classes[labels[i]]}") # 设置子图标题为对应的类别名称
plt.imshow(image_trans) # 显示图像
plt.axis("off") # 关闭坐标轴
plt.show() # 显示图形
- 导入库:
import matplotlib.pyplot as plt
import numpy as np
导入matplotlib.pyplot
用于绘图,导入numpy
用于数值计算。
- 获取数据:
data_iter = next(dataset_train.create_dict_iterator())
从训练数据集中获取下一个批次的数据。
- 转换数据格式:
images = data_iter["image"].asnumpy()
labels = data_iter["label"].asnumpy()
将图像和标签数据转换为numpy数组。
- 打印数据形状和标签:
print(f"Image shape: {images.shape}, Label shape: {labels.shape}")
print(f"Labels: {labels[:6]}")
打印图像和标签的形状,以及前六张图片的标签。
- 读取类别名称:
classes = []
with open(data_dir + "/batches.meta.txt", "r") as f:
for line in f:
line = line.rstrip()
if line:
classes.append(line)
从类别元数据文件中读取类别名称并存储在列表中。
- 绘制图像:
plt.figure()
for i in range(6):
plt.subplot(2, 3, i + 1)
image_trans = np.transpose(images[i], (1, 2, 0))
mean = np.array([0.4914, 0.4822, 0.4465])
std = np.array([0.2023, 0.1994, 0.2010])
image_trans = std * image_trans + mean
image_trans = np.clip(image_trans, 0, 1)
plt.title(f"{classes[labels[i]]}")
plt.imshow(image_trans)
plt.axis("off")
plt.show()
创建一个图形,并在其中绘制前六张图片。每张图片经过反标准化处理后显示,并设置对应的类别名称作为标题。最后显示图形。
构建网络
残差网络结构(Residual Network)是ResNet网络的主要亮点,ResNet使用残差网络结构后可有效地减轻退化问题,实现更深的网络结构设计,提高网络的训练精度。本节首先讲述如何构建残差网络结构,然后通过堆叠残差网络来构建ResNet50网络。
构建残差网络结构
残差网络结构图如下图所示,残差网络由两个分支构成:一个主分支,一个shortcuts(图中弧线表示)。主分支通过堆叠一系列的卷积操作得到,shortcuts从输入直接到输出,主分支输出的特征矩阵F(x)𝐹(𝑥)加上shortcuts输出的特征矩阵x𝑥得到F(x)+x𝐹(𝑥)+𝑥,通过Relu激活函数后即为残差网络最后的输出。
残差网络结构主要由两种,一种是Building Block,适用于较浅的ResNet网络,如ResNet18和ResNet34;另一种是Bottleneck,适用于层数较深的ResNet网络,如ResNet50、ResNet101和ResNet152。
Building Block
Building Block结构图如下图所示,主分支有两层卷积网络结构:
- 主分支第一层网络以输入channel为64为例,首先通过一个3×33×3的卷积层,然后通过Batch Normalization层,最后通过Relu激活函数层,输出channel为64;
- 主分支第二层网络的输入channel为64,首先通过一个3×33×3的卷积层,然后通过Batch Normalization层,输出channel为64。
最后将主分支输出的特征矩阵与shortcuts输出的特征矩阵相加,通过Relu激活函数即为Building Block最后的输出。
主分支与shortcuts输出的特征矩阵相加时,需要保证主分支与shortcuts输出的特征矩阵shape相同。如果主分支与shortcuts输出的特征矩阵shape不相同,如输出channel是输入channel的一倍时,shortcuts上需要使用数量与输出channel相等,大小为1×11×1的卷积核进行卷积操作;若输出的图像较输入图像缩小一倍,则要设置shortcuts中卷积操作中的stride
为2,主分支第一层卷积操作的stride
也需设置为2。
如下代码定义ResidualBlockBase
类实现Building Block结构。
from typing import Type, Union, List, Optional # 导入类型提示相关模块
import mindspore.nn as nn # 导入MindSpore的神经网络模块
from mindspore.common.initializer import Normal # 导入MindSpore的初始化器
# 初始化卷积层与BatchNorm的参数
weight_init = Normal(mean=0, sigma=0.02) # 卷积层权重初始化
gamma_init = Normal(mean=1, sigma=0.02) # BatchNorm层的gamma参数初始化
class ResidualBlockBase(nn.Cell): # 定义残差块基础类,继承自MindSpore的nn.Cell
expansion: int = 1 # 最后一个卷积核数量与第一个卷积核数量相等
def __init__(self, in_channel: int, out_channel: int,
stride: int = 1, norm: Optional[nn.Cell] = None,
down_sample: Optional[nn.Cell] = None) -> None:
"""
初始化残差块基础类
参数:
in_channel: int, 输入通道数
out_channel: int, 输出通道数
stride: int, 步幅大小
norm: Optional[nn.Cell], 可选的归一化层
down_sample: Optional[nn.Cell], 可选的下采样层
"""
super(ResidualBlockBase, self).__init__()
if not norm:
self.norm = nn.BatchNorm2d(out_channel) # 如果没有指定归一化层,使用BatchNorm2d
else:
self.norm = norm
self.conv1 = nn.Conv2d(in_channel, out_channel,
kernel_size=3, stride=stride,
weight_init=weight_init) # 第一个卷积层
self.conv2 = nn.Conv2d(out_channel, out_channel,
kernel_size=3, weight_init=weight_init) # 第二个卷积层
self.relu = nn.ReLU() # ReLU激活函数
self.down_sample = down_sample # 下采样层
def construct(self, x):
"""
前向传播函数
参数:
x: 输入张量
返回:
out: 输出张量
"""
identity = x # shortcuts分支
out = self.conv1(x) # 主分支第一层:3*3卷积层
out = self.norm(out)
out = self.relu(out)
out = self.conv2(out) # 主分支第二层:3*3卷积层
out = self.norm(out)
if self.down_sample is not None:
identity = self.down_sample(x) # 下采样操作
out += identity # 输出为主分支与shortcuts之和
out = self.relu(out)
return out
- 导入模块:
from typing import Type, Union, List, Optional
import mindspore.nn as nn
from mindspore.common.initializer import Normal
导入了必要的模块和函数,其中包括类型提示相关模块、MindSpore的神经网络模块以及初始化器。
- 初始化参数:
weight_init = Normal(mean=0, sigma=0.02)
gamma_init = Normal(mean=1, sigma=0.02)
定义了卷积层权重初始化和BatchNorm层的gamma参数初始化。
- 定义残差块基础类:
class ResidualBlockBase(nn.Cell):
expansion: int = 1
def __init__(self, in_channel: int, out_channel: int,
stride: int = 1, norm: Optional[nn.Cell] = None,
down_sample: Optional[nn.Cell] = None) -> None:
super(ResidualBlockBase, self).__init__()
if not norm:
self.norm = nn.BatchNorm2d(out_channel)
else:
self.norm = norm
self.conv1 = nn.Conv2d(in_channel, out_channel,
kernel_size=3, stride=stride,
weight_init=weight_init)
self.conv2 = nn.Conv2d(out_channel, out_channel,
kernel_size=3, weight_init=weight_init)
self.relu = nn.ReLU()
self.down_sample = down_sample
定义了一个残差块基础类,包含初始化函数和前向传播函数。
- 前向传播函数:
def construct(self, x):
identity = x
out = self.conv1(x)
out = self.norm(out)
out = self.relu(out)
out = self.conv2(out)
out = self.norm(out)
if self.down_sample is not None:
identity = self.down_sample(x)
out += identity
out = self.relu(out)
return out
定义了前向传播函数construct
,该函数包括主分支和shortcuts分支,最后将主分支输出与shortcuts输出相加并通过ReLU激活函数。
Bottleneck
Bottleneck结构图如下图所示,在输入相同的情况下Bottleneck结构相对Building Block结构的参数数量更少,更适合层数较深的网络,ResNet50使用的残差结构就是Bottleneck。该结构的主分支有三层卷积结构,分别为1×11×1的卷积层、3×33×3卷积层和1×11×1的卷积层,其中1×11×1的卷积层分别起降维和升维的作用。
- 主分支第一层网络以输入channel为256为例,首先通过数量为64,大小为1×11×1的卷积核进行降维,然后通过Batch Normalization层,最后通过Relu激活函数层,其输出channel为64;
- 主分支第二层网络通过数量为64,大小为3×33×3的卷积核提取特征,然后通过Batch Normalization层,最后通过Relu激活函数层,其输出channel为64;
- 主分支第三层通过数量为256,大小1×11×1的卷积核进行升维,然后通过Batch Normalization层,其输出channel为256。
最后将主分支输出的特征矩阵与shortcuts输出的特征矩阵相加,通过Relu激活函数即为Bottleneck最后的输出。
主分支与shortcuts输出的特征矩阵相加时,需要保证主分支与shortcuts输出的特征矩阵shape相同。如果主分支与shortcuts输出的特征矩阵shape不相同,如输出channel是输入channel的一倍时,shortcuts上需要使用数量与输出channel相等,大小为1×11×1的卷积核进行卷积操作;若输出的图像较输入图像缩小一倍,则要设置shortcuts中卷积操作中的stride
为2,主分支第二层卷积操作的stride
也需设置为2。
如下代码定义ResidualBlock
类实现Bottleneck结构。
class ResidualBlock(nn.Cell):
expansion = 4 # 最后一个卷积核的数量是第一个卷积核数量的4倍
def __init__(self, in_channel: int, out_channel: int,
stride: int = 1, down_sample: Optional[nn.Cell] = None) -> None:
"""
初始化残差块类
参数:
in_channel: int, 输入通道数
out_channel: int, 输出通道数
stride: int, 步幅大小
down_sample: Optional[nn.Cell], 可选的下采样层
"""
super(ResidualBlock, self).__init__()
# 主分支的卷积层和归一化层
self.conv1 = nn.Conv2d(in_channel, out_channel,
kernel_size=1, weight_init=weight_init) # 1*1 卷积层
self.norm1 = nn.BatchNorm2d(out_channel) # 归一化层
self.conv2 = nn.Conv2d(out_channel, out_channel,
kernel_size=3, stride=stride,
weight_init=weight_init) # 3*3 卷积层
self.norm2 = nn.BatchNorm2d(out_channel) # 归一化层
self.conv3 = nn.Conv2d(out_channel, out_channel * self.expansion,
kernel_size=1, weight_init=weight_init) # 1*1 卷积层
self.norm3 = nn.BatchNorm2d(out_channel * self.expansion) # 归一化层
self.relu = nn.ReLU() # ReLU激活函数
self.down_sample = down_sample # 下采样层
def construct(self, x):
"""
前向传播函数
参数:
x: 输入张量
返回:
out: 输出张量
"""
identity = x # shortcuts分支
# 主分支第一层:1*1卷积层
out = self.conv1(x)
out = self.norm1(out)
out = self.relu(out)
# 主分支第二层:3*3卷积层
out = self.conv2(out)
out = self.norm2(out)
out = self.relu(out)
# 主分支第三层:1*1卷积层
out = self.conv3(out)
out = self.norm3(out)
if self.down_sample is not None:
identity = self.down_sample(x) # 对shortcuts分支进行下采样
out += identity # 输出为主分支与shortcuts之和
out = self.relu(out) # 通过ReLU激活函数
return out # 返回输出张量
- 定义残差块类:
class ResidualBlock(nn.Cell):
expansion = 4 # 最后一个卷积核的数量是第一个卷积核数量的4倍
def __init__(self, in_channel: int, out_channel: int,
stride: int = 1, down_sample: Optional[nn.Cell] = None) -> None:
super(ResidualBlock, self).__init__()
...
定义一个残差块类ResidualBlock
,继承自MindSpore的nn.Cell
。该类的expansion
属性设置为4,表示输出通道数是输入通道数的4倍。
- 初始化函数:
def __init__(self, in_channel: int, out_channel: int,
stride: int = 1, down_sample: Optional[nn.Cell] = None) -> None:
super(ResidualBlock, self).__init__()
# 主分支的卷积层和归一化层
self.conv1 = nn.Conv2d(in_channel, out_channel,
kernel_size=1, weight_init=weight_init)
self.norm1 = nn.BatchNorm2d(out_channel)
self.conv2 = nn.Conv2d(out_channel, out_channel,
kernel_size=3, stride=stride,
weight_init=weight_init)
self.norm2 = nn.BatchNorm2d(out_channel)
self.conv3 = nn.Conv2d(out_channel, out_channel * self.expansion,
kernel_size=1, weight_init=weight_init)
self.norm3 = nn.BatchNorm2d(out_channel * self.expansion)
self.relu = nn.ReLU()
self.down_sample = down_sample
初始化函数定义了主分支的卷积层和归一化层,分别是一个1x1卷积层、一个3x3卷积层和一个1x1卷积层,并且使用ReLU激活函数。down_sample
参数用于处理短连接分支。
- 前向传播函数:
def construct(self, x):
identity = x # shortcuts分支
# 主分支第一层:1*1卷积层
out = self.conv1(x)
out = self.norm1(out)
out = self.relu(out)
# 主分支第二层:3*3卷积层
out = self.conv2(out)
out = self.norm2(out)
out = self.relu(out)
# 主分支第三层:1*1卷积层
out = self.conv3(out)
out = self.norm3(out)
if self.down_sample is not None:
identity = self.down_sample(x) # 对shortcuts分支进行下采样
out += identity # 输出为主分支与shortcuts之和
out = self.relu(out) # 通过ReLU激活函数
return out # 返回输出张量
前向传播函数construct
包括主分支和shortcuts分支。主分支依次经过三个卷积层和归一化层,再通过ReLU激活函数。如果down_sample
不为空,则对shortcuts分支进行下采样。最后将主分支和shortcuts分支相加,并通过ReLU激活函数,返回最终输出。
构建ResNet50网络
ResNet网络层结构如下图所示,以输入彩色图像224×224为例,首先通过数量64,卷积核大小为7×7,stride为2的卷积层conv1,该层输出图片大小为112×112,输出channel为64;然后通过一个3×3的最大下采样池化层,该层输出图片大小为56×56,输出channel为64;再堆叠4个残差网络块(conv2_x、conv3_x、conv4_x和conv5_x),此时输出图片大小为7×7,输出channel为2048;最后通过一个平均池化层、全连接层和softmax,得到分类概率。
对于每个残差网络块,以ResNet50网络中的conv2_x为例,其由3个Bottleneck结构堆叠而成,每个Bottleneck输入的channel为64,输出channel为256。
如下示例定义make_layer
实现残差块的构建,其参数如下所示:
last_out_channel
:上一个残差网络输出的通道数。block
:残差网络的类别,分别为ResidualBlockBase
和ResidualBlock
。channel
:残差网络输入的通道数。block_nums
:残差网络块堆叠的个数。stride
:卷积移动的步幅。
def make_layer(last_out_channel, block: Type[Union[ResidualBlockBase, ResidualBlock]],
channel: int, block_nums: int, stride: int = 1):
"""
构建残差层函数
参数:
last_out_channel: int, 上一层的输出通道数
block: Type[Union[ResidualBlockBase, ResidualBlock]], 残差块类型,可以是ResidualBlockBase或ResidualBlock
channel: int, 当前层的通道数
block_nums: int, 残差块的数量
stride: int, 步幅大小
返回:
nn.SequentialCell, 包含多个残差块的顺序层
"""
down_sample = None # shortcuts分支
if stride != 1 or last_out_channel != channel * block.expansion:
down_sample = nn.SequentialCell([
nn.Conv2d(last_out_channel, channel * block.expansion,
kernel_size=1, stride=stride, weight_init=weight_init),
nn.BatchNorm2d(channel * block.expansion, gamma_init=gamma_init)
])
# 如果步幅不等于1或者输入输出通道数不匹配,通过1x1卷积和BatchNorm实现下采样
layers = []
layers.append(block(last_out_channel, channel, stride=stride, down_sample=down_sample))
# 添加第一个残差块,并进行下采样(如果需要)
in_channel = channel * block.expansion
# 更新输入通道数为扩展后的通道数
for _ in range(1, block_nums):
layers.append(block(in_channel, channel))
# 堆叠剩余的残差块
return nn.SequentialCell(layers)
# 返回包含多个残差块的顺序层
- 函数定义:
def make_layer(last_out_channel, block: Type[Union[ResidualBlockBase, ResidualBlock]],
channel: int, block_nums: int, stride: int = 1):
定义了一个函数make_layer
,用于构建包含多个残差块的层。参数包括上一层的输出通道数、残差块类型(可以是ResidualBlockBase
或ResidualBlock
)、当前层的通道数、残差块的数量和步幅大小。
- 初始化
down_sample
:
down_sample = None # shortcuts分支
if stride != 1或 last_out_channel != channel * block.expansion:
down_sample = nn.SequentialCell([
nn.Conv2d(last_out_channel, channel * block.expansion,
kernel_size=1, stride=stride, weight_init=weight_init),
nn.BatchNorm2d(channel * block.expansion, gamma_init=gamma_init)
])
初始化down_sample
为None
。如果步幅不等于1或者输入通道数和输出通道数不匹配,通过1x1卷积和BatchNorm实现下采样,确保输入和输出尺寸匹配。
- 构建残差块层:
layers = []
layers.append(block(last_out_channel, channel, stride=stride, down_sample=down_sample))
创建一个列表layers
用于存储残差块。首先添加一个残差块,并进行下采样(如果需要)。
- 更新输入通道数:
in_channel = channel * block.expansion
更新输入通道数为扩展后的通道数。
- 堆叠残差块:
for _ in range(1, block_nums):
layers.append(block(in_channel, channel))
使用循环堆叠剩余数量的残差块,构建完整的残差层。
- 返回顺序层:
return nn.SequentialCell(layers)
返回一个包含多个残差块的顺序层nn.SequentialCell
。这个顺序层可以作为一个整体应用于输入张量上,实现残差网络的多层结构。
ResNet50网络共有5个卷积结构,一个平均池化层,一个全连接层,以CIFAR-10数据集为例:
- conv1:输入图片大小为32×3232×32,输入channel为3。首先经过一个卷积核数量为64,卷积核大小为7×77×7,stride为2的卷积层;然后通过一个Batch Normalization层;最后通过Reul激活函数。该层输出feature map大小为16×1616×16,输出channel为64。
- conv2_x:输入feature map大小为16×1616×16,输入channel为64。首先经过一个卷积核大小为3×33×3,stride为2的最大下采样池化操作;然后堆叠3个[1×1,64;3×3,64;1×1,256][1×1,64;3×3,64;1×1,256]结构的Bottleneck。该层输出feature map大小为8×88×8,输出channel为256。
- conv3_x:输入feature map大小为8×88×8,输入channel为256。该层堆叠4个[1×1,128;3×3,128;1×1,512]结构的Bottleneck。该层输出feature map大小为4×44×4,输出channel为512。
- conv4_x:输入feature map大小为4×44×4,输入channel为512。该层堆叠6个[1×1,256;3×3,256;1×1,1024]结构的Bottleneck。该层输出feature map大小为2×22×2,输出channel为1024。
- conv5_x:输入feature map大小为2×22×2,输入channel为1024。该层堆叠3个[1×1,512;3×3,512;1×1,2048]结构的Bottleneck。该层输出feature map大小为1×11×1,输出channel为2048。
- average pool & fc:输入channel为2048,输出channel为分类的类别数。
如下示例代码实现ResNet50模型的构建,通过用调函数resnet50
即可构建ResNet50模型,函数resnet50
参数如下:
num_classes
:分类的类别数,默认类别数为1000。pretrained
:下载对应的训练模型,并加载预训练模型中的参数到网络中。
from mindspore import load_checkpoint, load_param_into_net
class ResNet(nn.Cell):
def __init__(self, block: Type[Union[ResidualBlockBase, ResidualBlock]],
layer_nums: List[int], num_classes: int, input_channel: int) -> None:
"""
初始化ResNet模型
参数:
block: Type[Union[ResidualBlockBase, ResidualBlock]], 残差块类型
layer_nums: List[int], 每个残差层的残差块数量
num_classes: int, 分类数
input_channel: int, 输入通道数
"""
super(ResNet, self).__init__()
self.relu = nn.ReLU()
# 第一个卷积层,输入channel为3(彩色图像),输出channel为64
self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, weight_init=weight_init)
self.norm = nn.BatchNorm2d(64)
# 最大池化层,缩小图片的尺寸
self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same')
# 各个残差网络结构块定义
self.layer1 = make_layer(64, block, 64, layer_nums[0])
self.layer2 = make_layer(64 * block.expansion, block, 128, layer_nums[1], stride=2)
self.layer3 = make_layer(128 * block.expansion, block, 256, layer_nums[2], stride=2)
self.layer4 = make_layer(256 * block.expansion, block, 512, layer_nums[3], stride=2)
# 平均池化层
self.avg_pool = nn.AvgPool2d()
# flattern层
self.flatten = nn.Flatten()
# 全连接层
self.fc = nn.Dense(in_channels=input_channel, out_channels=num_classes)
def construct(self, x):
"""
前向传播函数
参数:
x: 输入张量
返回:
x: 输出张量
"""
x = self.conv1(x)
x = self.norm(x)
x = self.relu(x)
x = self.max_pool(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avg_pool(x)
x = self.flatten(x)
x = self.fc(x)
return x
- 导入模块:
from mindspore import load_checkpoint, load_param_into_net
导入MindSpore的检查点加载和参数加载功能。
- 定义ResNet类:
class ResNet(nn.Cell):
def __init__(self, block: Type[Union[ResidualBlockBase, ResidualBlock]],
layer_nums: List[int], num_classes: int, input_channel: int) -> None:
super(ResNet, self).__init__()
...
定义ResNet类,继承自MindSpore的nn.Cell
。初始化函数接受残差块类型、每个残差层的残差块数量、分类数和输入通道数作为参数。
- 初始化网络层:
self.relu = nn.ReLU()
self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, weight_init=weight_init)
self.norm = nn.BatchNorm2d(64)
self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same')
self.layer1 = make_layer(64, block, 64, layer_nums[0])
self.layer2 = make_layer(64 * block.expansion, block, 128, layer_nums[1], stride=2)
self.layer3 = make_layer(128 * block.expansion, block, 256, layer_nums[2], stride=2)
self.layer4 = make_layer(256 * block.expansion, block, 512, layer_nums[3], stride=2)
self.avg_pool = nn.AvgPool2d()
self.flatten = nn.Flatten()
self.fc = nn.Dense(in_channels=input_channel, out_channels=num_classes)
初始化网络的各个层,包括卷积层、归一化层、池化层、残差层、平均池化层、展平层和全连接层。
- 前向传播函数:
def construct(self, x):
x = self.conv1(x)
x = self.norm(x)
x = self.relu(x)
x = self.max_pool(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avg_pool(x)
x = self.flatten(x)
x = self.fc(x)
return x
定义前向传播函数construct
,依次通过各个层,最终输出分类结果。
def _resnet(model_url: str, block: Type[Union[ResidualBlockBase, ResidualBlock]],
layers: List[int], num_classes: int, pretrained: bool, pretrained_ckpt: str,
input_channel: int):
"""
构建ResNet模型,并根据需要加载预训练模型
参数:
model_url: str, 预训练模型的下载地址
block: Type[Union[ResidualBlockBase, ResidualBlock]], 残差块类型
layers: List[int], 每个残差层的残差块数量
num_classes: int, 分类数
pretrained: bool, 是否加载预训练模型
pretrained_ckpt: str, 预训练模型的本地存储路径
input_channel: int, 输入通道数
返回:
model: ResNet模型
"""
model = ResNet(block, layers, num_classes, input_channel)
if pretrained:
# 加载预训练模型
download(url=model_url, path=pretrained_ckpt, replace=True)
param_dict = load_checkpoint(pretrained_ckpt)
load_param_into_net(model, param_dict)
return model
def resnet50(num_classes: int = 1000, pretrained: bool = False):
"""ResNet50模型
参数:
num_classes: int, 分类数,默认为1000
pretrained: bool, 是否加载预训练模型,默认为False
返回:
ResNet50模型
"""
resnet50_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/models/application/resnet50_224_new.ckpt"
resnet50_ckpt = "./LoadPretrainedModel/resnet50_224_new.ckpt"
return _resnet(resnet50_url, ResidualBlock, [3, 4, 6, 3], num_classes,
pretrained, resnet50_ckpt, 2048)
- 定义
_resnet
函数:
def _resnet(model_url: str, block: Type[Union[ResidualBlockBase, ResidualBlock]],
layers: List[int], num_classes: int, pretrained: bool, pretrained_ckpt: str,
input_channel: int):
定义一个内部函数 _resnet
,用于构建 ResNet 模型,并根据需要加载预训练模型。参数包括预训练模型的下载地址、残差块类型、每个残差层的残差块数量、分类数、是否加载预训练模型、预训练模型的本地存储路径和输入通道数。
- 初始化 ResNet 模型:
model = ResNet(block, layers, num_classes, input_channel)
使用给定的参数初始化 ResNet 模型。
- 加载预训练模型:
if pretrained:
# 加载预训练模型
download(url=model_url, path=pretrained_ckpt, replace=True)
param_dict = load_checkpoint(pretrained_ckpt)
load_param_into_net(model, param_dict)
如果 pretrained
参数为 True
,则下载预训练模型,并加载到当前模型中。
- 返回模型:
return model
返回构建好的 ResNet 模型。
- 定义
resnet50
函数:
def resnet50(num_classes: int = 1000, pretrained: bool = False):
"""ResNet50模型
参数:
num_classes: int, 分类数,默认为1000
pretrained: bool, 是否加载预训练模型,默认为False
返回:
ResNet50模型
"""
resnet50_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/models/application/resnet50_224_new.ckpt"
resnet50_ckpt = "./LoadPretrainedModel/resnet50_224_new.ckpt"
return _resnet(resnet50_url, ResidualBlock, [3, 4, 6, 3], num_classes,
pretrained, resnet50_ckpt, 2048)
定义 resnet50
函数,用于构建 ResNet-50 模型。默认分类数为 1000,是否加载预训练模型默认为 False
。构建 ResNet-50 模型时,调用 _resnet
函数,并传入 ResNet-50 相应的参数,包括预训练模型的下载地址和本地存储路径。
模型训练与评估
本节使用ResNet50预训练模型进行微调。调用resnet50
构造ResNet50模型,并设置pretrained
参数为True,将会自动下载ResNet50预训练模型,并加载预训练模型中的参数到网络中。然后定义优化器和损失函数,逐个epoch打印训练的损失值和评估精度,并保存评估精度最高的ckpt文件(resnet50-best.ckpt)到当前路径的./BestCheckPoint下。
由于预训练模型全连接层(fc)的输出大小(对应参数num_classes
)为1000, 为了成功加载预训练权重,我们将模型的全连接输出大小设置为默认的1000。CIFAR10数据集共有10个分类,在使用该数据集进行训练时,需要将加载好预训练权重的模型全连接层输出大小重置为10。
此处我们展示了5个epochs的训练过程,如果想要达到理想的训练效果,建议训练80个epochs。
# 定义ResNet50网络
network = resnet50(pretrained=True)
# 全连接层输入层的大小
in_channel = network.fc.in_channels
fc = nn.Dense(in_channels=in_channel, out_channels=10)
# 重置全连接层
network.fc = fc
- 定义ResNet50网络:
network = resnet50(pretrained=True)
调用 resnet50
函数,并设置 pretrained=True
,以加载预训练的 ResNet-50 模型。
- 获取全连接层输入层的大小:
in_channel = network.fc.in_channels
获取当前 ResNet-50 模型中全连接层(fc
)的输入通道数(in_channels
)。
- 定义新的全连接层:
fc = nn.Dense(in_channels=in_channel, out_channels=10)
创建一个新的全连接层(nn.Dense
),输入通道数为 in_channel
,输出通道数为 10(假设分类数为 10)。
- 重置全连接层:
network.fc = fc
将 ResNet-50 模型中的全连接层替换为新定义的全连接层。
通过这些步骤,你可以加载预训练的 ResNet-50 模型,并根据需要调整其全连接层以适应新的分类任务。
# 设置学习率
num_epochs = 5
lr = nn.cosine_decay_lr(min_lr=0.00001, max_lr=0.001, total_step=step_size_train * num_epochs,
step_per_epoch=step_size_train, decay_epoch=num_epochs)
# 定义优化器和损失函数
opt = nn.Momentum(params=network.trainable_params(), learning_rate=lr, momentum=0.9)
loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
def forward_fn(inputs, targets):
logits = network(inputs)
loss = loss_fn(logits, targets)
return loss
grad_fn = ms.value_and_grad(forward_fn, None, opt.parameters)
def train_step(inputs, targets):
loss, grads = grad_fn(inputs, targets)
opt(grads)
return loss
- 设置学习率:
num_epochs = 5
lr = nn.cosine_decay_lr(min_lr=0.00001, max_lr=0.001, total_step=step_size_train * num_epochs,
step_per_epoch=step_size_train, decay_epoch=num_epochs)
使用余弦退火学习率调度器(nn.cosine_decay_lr
)来设置学习率。min_lr
和 max_lr
分别是学习率的最小值和最大值,total_step
是总的训练步数,step_per_epoch
是每个 epoch 的步数,decay_epoch
是退火的 epoch 数。
- 定义优化器和损失函数:
opt = nn.Momentum(params=network.trainable_params(), learning_rate=lr, momentum=0.9)
loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
定义优化器(nn.Momentum
)和损失函数(nn.SoftmaxCrossEntropyWithLogits
)。优化器使用动量法,学习率为 lr
,动量参数为 0.9。损失函数为稀疏的 softmax 交叉熵损失,结果取平均值。
- 定义前向传播函数:
def forward_fn(inputs, targets):
logits = network(inputs)
loss = loss_fn(logits, targets)
return loss
定义前向传播函数 forward_fn
,输入为数据和标签,输出为损失值。
- 定义梯度计算函数:
grad_fn = ms.value_and_grad(forward_fn, None, opt.parameters)
使用 MindSpore 的 value_and_grad
函数来定义梯度计算函数 grad_fn
,该函数会计算前向传播的损失值和对应的梯度。
- 定义训练步骤函数:
def train_step(inputs, targets):
loss, grads = grad_fn(inputs, targets)
opt(grads)
return loss
定义训练步骤函数 train_step
,输入为数据和标签,输出为损失值。该函数会计算损失和梯度,并使用优化器更新模型参数。
通过这些步骤,你可以设置学习率调度器、定义优化器和损失函数,并实现模型的训练步骤。
import os
# 创建迭代器
data_loader_train = dataset_train.create_tuple_iterator(num_epochs=num_epochs)
data_loader_val = dataset_val.create_tuple_iterator(num_epochs=num_epochs)
# 最佳模型存储路径
best_acc = 0
best_ckpt_dir = "./BestCheckpoint"
best_ckpt_path = "./BestCheckpoint/resnet50-best.ckpt"
if not os.path.exists(best_ckpt_dir):
os.mkdir(best_ckpt_dir)
- 创建迭代器:
data_loader_train = dataset_train.create_tuple_iterator(num_epochs=num_epochs)
data_loader_val = dataset_val.create_tuple_iterator(num_epochs=num_epochs)
使用 create_tuple_iterator
方法为训练集和验证集创建迭代器,num_epochs
参数指定迭代器在训练过程中迭代的 epoch 数。
- 最佳模型存储路径:
best_acc = 0
best_ckpt_dir = "./BestCheckpoint"
best_ckpt_path = "./BestCheckpoint/resnet50-best.ckpt"
定义变量 best_acc
用于记录最佳准确率,best_ckpt_dir
和 best_ckpt_path
用于指定最佳模型的存储目录和路径。
- 检查并创建目录:
if not os.path.exists(best_ckpt_dir):
os.mkdir(best_ckpt_dir)
检查 best_ckpt_dir
目录是否存在,如果不存在则使用 os.mkdir
方法创建该目录。
通过这些步骤,你可以为训练集和验证集创建迭代器,并设置最佳模型的存储路径和目录。
import mindspore.ops as ops
def train(data_loader, epoch):
"""模型训练"""
losses = []
network.set_train(True)
for i, (images, labels) in enumerate(data_loader):
loss = train_step(images, labels)
if i % 100 == 0 or i == step_size_train - 1:
print('Epoch: [%3d/%3d], Steps: [%3d/%3d], Train Loss: [%5.3f]' %
(epoch + 1, num_epochs, i + 1, step_size_train, loss))
losses.append(loss)
return sum(losses) / len(losses)
def evaluate(data_loader):
"""模型验证"""
network.set_train(False)
correct_num = 0.0 # 预测正确个数
total_num = 0.0 # 预测总数
for images, labels in data_loader:
logits = network(images)
pred = logits.argmax(axis=1) # 预测结果
correct = ops.equal(pred, labels).reshape((-1,))
correct_num += correct.sum().asnumpy()
total_num += correct.shape[0]
acc = correct_num / total_num # 准确率
return acc
- 导入必要的库:
import mindspore.ops as ops
导入 MindSpore 的操作函数库 ops
,用于模型的验证过程。
- **训练函数 **
train
:
def train(data_loader, epoch):
"""模型训练"""
losses = []
network.set_train(True)
for i, (images, labels) in enumerate(data_loader):
loss = train_step(images, labels)
if i % 100 == 0 or i == step_size_train - 1:
print('Epoch: [%3d/%3d], Steps: [%3d/%3d], Train Loss: [%5.3f]' %
(epoch + 1, num_epochs, i + 1, step_size_train, loss))
losses.append(loss)
return sum(losses) / len(losses)
- 模型设置为训练模式:
network.set_train(True)
设置模型为训练模式。
- 训练数据迭代:
for i, (images, labels) in enumerate(data_loader):
loss = train_step(images, labels)
if i % 100 == 0 or i == step_size_train - 1:
print('Epoch: [%3d/%3d], Steps: [%3d/%3d], Train Loss: [%5.3f]' %
(epoch + 1, num_epochs, i + 1, step_size_train, loss))
losses.append(loss)
使用训练数据迭代器 data_loader
进行迭代训练,每完成100个步骤或最后一个步骤时打印当前的训练损失。
- 返回平均损失:
return sum(losses) / len(losses)
计算并返回本 epoch 的平均损失。
- **验证函数 **
evaluate
:
def evaluate(data_loader):
"""模型验证"""
network.set_train(False)
correct_num = 0.0
total_num = 0.0
for images, labels in data_loader:
logits = network(images)
pred = logits.argmax(axis=1)
correct = ops.equal(pred, labels).reshape((-1,))
correct_num += correct.sum().asnumpy()
total_num += correct.shape[0]
acc = correct_num / total_num
return acc
- 模型设置为评估模式:
network.set_train(False)
设置模型为评估模式。
- 验证数据迭代:
correct_num = 0.0
total_num = 0.0
for images, labels in data_loader:
logits = network(images)
pred = logits.argmax(axis=1)
correct = ops.equal(pred, labels).reshape((-1,))
correct_num += correct.sum().asnumpy()
total_num += correct.shape[0]
使用验证数据迭代器 data_loader
进行迭代验证。计算预测结果 pred
和真实标签 labels
是否相等,对应正确预测的个数累加到 correct_num
,累加总的样本数到 total_num
。
- 计算并返回准确率:
acc = correct_num / total_num
return acc
计算并返回准确率 acc
。
通过这些步骤,你可以实现模型的训练和验证过程。训练函数 train
用于模型的训练,每个 epoch 打印部分训练损失。验证函数 evaluate
用于模型的评估,返回评估准确率。
# 开始循环训练
print("Start Training Loop ...")
for epoch in range(num_epochs):
curr_loss = train(data_loader_train, epoch)
curr_acc = evaluate(data_loader_val)
print("-" * 50)
print("Epoch: [%3d/%3d], Average Train Loss: [%5.3f], Accuracy: [%5.3f]" % (
epoch + 1, num_epochs, curr_loss, curr_acc
))
print("-" * 50)
# 保存当前预测准确率最高的模型
if curr_acc > best_acc:
best_acc = curr_acc
ms.save_checkpoint(network, best_ckpt_path)
print("=" * 80)
print(f"End of validation the best Accuracy is: {best_acc: 5.3f}, "
f"save the best ckpt file in {best_ckpt_path}", flush=True)
- 开始训练循环:
print("Start Training Loop ...")
打印开始训练循环的提示信息。
- 训练和验证循环:
for epoch in range(num_epochs):
curr_loss = train(data_loader_train, epoch)
curr_acc = evaluate(data_loader_val)
print("-" * 50)
print("Epoch: [%3d/%3d], Average Train Loss: [%5.3f], Accuracy: [%5.3f]" % (
epoch + 1, num_epochs, curr_loss, curr_acc
))
print("-" * 50)
- 使用
range(num_epochs)
创建一个循环,迭代指定的 epoch 数。 - 每个 epoch 调用
train
函数进行训练,并调用evaluate
函数进行验证。 - 打印当前 epoch 的平均训练损失和验证准确率。
- 保存最佳模型:
if curr_acc > best_acc:
best_acc = curr_acc
ms.save_checkpoint(network, best_ckpt_path)
- 如果当前 epoch 的验证准确率
curr_acc
高于之前记录的最佳准确率best_acc
,更新best_acc
。 - 使用
ms.save_checkpoint
函数保存当前的模型参数到best_ckpt_path
。
- 结束训练循环:
print("=" * 80)
print(f"End of validation the best Accuracy is: {best_acc: 5.3f}, "
f"save the best ckpt file in {best_ckpt_path}", flush=True)
- 打印提示信息,显示所有 epoch 训练完成后的最佳准确率
best_acc
,以及保存的最佳模型文件路径best_ckpt_path
。 flush=True
确保立即刷新输出缓冲区,将信息打印到控制台。
通过这些步骤,你可以实现模型的训练和验证循环。每个 epoch 完成后,打印训练损失和验证准确率,并保存最佳准确率的模型文件。
可视化模型预测
定义visualize_model
函数,使用上述验证精度最高的模型对CIFAR-10测试数据集进行预测,并将预测结果可视化。若预测字体颜色为蓝色表示为预测正确,预测字体颜色为红色则表示预测错误。
由上面的结果可知,5个epochs下模型在验证数据集的预测准确率在70%左右,即一般情况下,6张图片中会有2张预测失败。如果想要达到理想的训练效果,建议训练80个epochs。
import matplotlib.pyplot as plt
import mindspore as ms
import numpy as np
from resnet import resnet50 # 确保已定义或导入resnet50模型
def visualize_model(best_ckpt_path, dataset_val):
num_class = 10 # 对10类图像进行分类
# 初始化模型
net = resnet50(num_class)
# 加载模型参数
param_dict = ms.load_checkpoint(best_ckpt_path)
ms.load_param_into_net(net, param_dict)
# 加载验证集的数据进行验证
data = next(dataset_val.create_dict_iterator())
images = data["image"]
labels = data["label"]
# 预测图像类别
output = net(images)
pred = np.argmax(output.asnumpy(), axis=1)
# 图像分类标签
classes = []
with open(data_dir + "/batches.meta.txt", "r") as f:
for line in f:
line = line.rstrip()
if line:
classes.append(line)
# 显示图像及图像的预测值
plt.figure()
for i in range(6):
plt.subplot(2, 3, i + 1)
# 若预测正确,显示为蓝色;若预测错误,显示为红色
color = 'blue' if pred[i] == labels.asnumpy()[i] else 'red'
plt.title('predict:{}'.format(classes[pred[i]]), color=color)
picture_show = np.transpose(images.asnumpy()[i], (1, 2, 0))
mean = np.array([0.4914, 0.4822, 0.4465])
std = np.array([0.2023, 0.1994, 0.2010])
picture_show = std * picture_show + mean
picture_show = np.clip(picture_show, 0, 1)
plt.imshow(picture_show)
plt.axis('off')
plt.show()
# 使用测试数据集进行验证
visualize_model(best_ckpt_path=best_ckpt_path, dataset_val=dataset_val)
- 导入必要的库:
import matplotlib.pyplot as plt
import mindspore as ms
import numpy as np
from resnet import resnet50 # 确保已定义或导入resnet50模型
导入 matplotlib.pyplot
用于绘图,mindspore
用于加载和使用模型,numpy
进行数值计算,resnet50
模型用于图像分类。
- **定义模型可视化函数 **
visualize_model
:
def visualize_model(best_ckpt_path, dataset_val):
num_class = 10 # 对10类图像进行分类
net = resnet50(num_class)
param_dict = ms.load_checkpoint(best_ckpt_path)
ms.load_param_into_net(net, param_dict)
- 定义函数
visualize_model
,参数包括最佳模型路径best_ckpt_path
和验证数据集dataset_val
。 - 初始化
resnet50
模型,并加载指定路径的模型参数。
- 加载验证集数据:
data = next(dataset_val.create_dict_iterator())
images = data["image"]
labels = data["label"]
使用验证数据集的迭代器获取数据,包括图像和标签。
- 预测图像类别:
output = net(images)
pred = np.argmax(output.asnumpy(), axis=1)
使用模型对图像进行预测,得到输出后,使用 argmax
获取预测的类别。
- 读取图像分类标签:
classes = []
with open(data_dir + "/batches.meta.txt", "r") as f:
for line in f:
line = line.rstrip()
if line:
classes.append(line)
从文件中读取分类标签,存储在 classes
列表中。
- 显示图像及其预测结果:
plt.figure()
for i in range(6):
plt.subplot(2, 3, i + 1)
color = 'blue' if pred[i] == labels.asnumpy()[i] else 'red'
plt.title('predict:{}'.format(classes[pred[i]]), color=color)
picture_show = np.transpose(images.asnumpy()[i], (1, 2, 0))
mean = np.array([0.4914, 0.4822, 0.4465])
std = np.array([0.2023, 0.1994, 0.2010])
picture_show = std * picture_show + mean
picture_show = np.clip(picture_show, 0, 1)
plt.imshow(picture_show)
plt.axis('off')
plt.show()
- 创建一个图形窗口,循环显示6张图像。
- 如果预测正确,标题颜色为蓝色;如果预测错误,标题颜色为红色。
- 对图像进行标准化还原并显示。
- 调用可视化函数:
visualize_model(best_ckpt_path=best_ckpt_path, dataset_val=dataset_val)
调用 visualize_model
函数,使用测试数据集进行验证。
通过这些步骤,你可以加载最佳模型参数,并可视化模型在验证集上的预测结果。每张图像将显示其预测类别及其实际类别的颜色标注。
整体代码
#!/usr/bin/env python
# coding: utf-8
# # ResNet50图像分类
# [![下载Notebook](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image14539567018573547155.png)](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/r2.3/tutorials/application/zh_cn/cv/mindspore_resnet50.ipynb) [![下载样例代码](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image10706878759923307322.png)](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/r2.3/tutorials/application/zh_cn/cv/mindspore_resnet50.py) [![查看源文件](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image4568957979974431533.png)](https://gitee.com/mindspore/docs/blob/r2.3/tutorials/application/source_zh_cn/cv/resnet50.ipynb)
# 图像分类是最基础的计算机视觉应用,属于有监督学习类别,如给定一张图像(猫、狗、飞机、汽车等等),判断图像所属的类别。本章将介绍使用ResNet50网络对CIFAR-10数据集进行分类。
# ## ResNet网络介绍
# ResNet50网络是2015年由微软实验室的何恺明提出,获得ILSVRC2015图像分类竞赛第一名。在ResNet网络提出之前,传统的卷积神经网络都是将一系列的卷积层和池化层堆叠得到的,但当网络堆叠到一定深度时,就会出现退化问题。下图是在CIFAR-10数据集上使用56层网络与20层网络训练误差和测试误差图,由图中数据可以看出,56层网络比20层网络训练误差和测试误差更大,随着网络的加深,其误差并没有如预想的一样减小。
# ![resnet-1](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image13235098901558598171.png)
# ResNet网络提出了残差网络结构(Residual Network)来减轻退化问题,使用ResNet网络可以实现搭建较深的网络结构(突破1000层)。论文中使用ResNet网络在CIFAR-10数据集上的训练误差与测试误差图如下图所示,图中虚线表示训练误差,实线表示测试误差。由图中数据可以看出,ResNet网络层数越深,其训练误差和测试误差越小。
# ![resnet-4](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image3983657854719650342.png)
# > 了解ResNet网络更多详细内容,参见[ResNet论文](https://arxiv.org/pdf/1512.03385.pdf)。
# ## 数据集准备与加载
# [CIFAR-10数据集](http://www.cs.toronto.edu/~kriz/cifar.html)共有60000张32*32的彩色图像,分为10个类别,每类有6000张图,数据集一共有50000张训练图片和10000张评估图片。首先,如下示例使用`download`接口下载并解压,目前仅支持解析二进制版本的CIFAR-10文件(CIFAR-10 binary version)。
# In[1]:
from download import download
url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/cifar-10-binary.tar.gz"
download(url, "./datasets-cifar10-bin", kind="tar.gz", replace=True)
# 下载后的数据集目录结构如下:
#
# ```text
# datasets-cifar10-bin/cifar-10-batches-bin
# ├── batches.meta.text
# ├── data_batch_1.bin
# ├── data_batch_2.bin
# ├── data_batch_3.bin
# ├── data_batch_4.bin
# ├── data_batch_5.bin
# ├── readme.html
# └── test_batch.bin
#
# ```
#
# 然后,使用`mindspore.dataset.Cifar10Dataset`接口来加载数据集,并进行相关图像增强操作。
# In[2]:
import mindspore as ms
import mindspore.dataset as ds
import mindspore.dataset.vision as vision
import mindspore.dataset.transforms as transforms
from mindspore import dtype as mstype
data_dir = "./datasets-cifar10-bin/cifar-10-batches-bin" # 数据集根目录
batch_size = 256 # 批量大小
image_size = 32 # 训练图像空间大小
workers = 4 # 并行线程个数
num_classes = 10 # 分类数量
def create_dataset_cifar10(dataset_dir, usage, resize, batch_size, workers):
data_set = ds.Cifar10Dataset(dataset_dir=dataset_dir,
usage=usage,
num_parallel_workers=workers,
shuffle=True)
trans = []
if usage == "train":
trans += [
vision.RandomCrop((32, 32), (4, 4, 4, 4)),
vision.RandomHorizontalFlip(prob=0.5)
]
trans += [
vision.Resize(resize),
vision.Rescale(1.0 / 255.0, 0.0),
vision.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010]),
vision.HWC2CHW()
]
target_trans = transforms.TypeCast(mstype.int32)
# 数据映射操作
data_set = data_set.map(operations=trans,
input_columns='image',
num_parallel_workers=workers)
data_set = data_set.map(operations=target_trans,
input_columns='label',
num_parallel_workers=workers)
# 批量操作
data_set = data_set.batch(batch_size)
return data_set
# 获取处理后的训练与测试数据集
dataset_train = create_dataset_cifar10(dataset_dir=data_dir,
usage="train",
resize=image_size,
batch_size=batch_size,
workers=workers)
step_size_train = dataset_train.get_dataset_size()
dataset_val = create_dataset_cifar10(dataset_dir=data_dir,
usage="test",
resize=image_size,
batch_size=batch_size,
workers=workers)
step_size_val = dataset_val.get_dataset_size()
# 对CIFAR-10训练数据集进行可视化。
# In[3]:
import matplotlib.pyplot as plt
import numpy as np
data_iter = next(dataset_train.create_dict_iterator())
images = data_iter["image"].asnumpy()
labels = data_iter["label"].asnumpy()
print(f"Image shape: {images.shape}, Label shape: {labels.shape}")
# 训练数据集中,前六张图片所对应的标签
print(f"Labels: {labels[:6]}")
classes = []
with open(data_dir + "/batches.meta.txt", "r") as f:
for line in f:
line = line.rstrip()
if line:
classes.append(line)
# 训练数据集的前六张图片
plt.figure()
for i in range(6):
plt.subplot(2, 3, i + 1)
image_trans = np.transpose(images[i], (1, 2, 0))
mean = np.array([0.4914, 0.4822, 0.4465])
std = np.array([0.2023, 0.1994, 0.2010])
image_trans = std * image_trans + mean
image_trans = np.clip(image_trans, 0, 1)
plt.title(f"{classes[labels[i]]}")
plt.imshow(image_trans)
plt.axis("off")
plt.show()
# ## 构建网络
#
# 残差网络结构(Residual Network)是ResNet网络的主要亮点,ResNet使用残差网络结构后可有效地减轻退化问题,实现更深的网络结构设计,提高网络的训练精度。本节首先讲述如何构建残差网络结构,然后通过堆叠残差网络来构建ResNet50网络。
#
# ### 构建残差网络结构
#
# 残差网络结构图如下图所示,残差网络由两个分支构成:一个主分支,一个shortcuts(图中弧线表示)。主分支通过堆叠一系列的卷积操作得到,shortcuts从输入直接到输出,主分支输出的特征矩阵$F(x)$加上shortcuts输出的特征矩阵$x$得到$F(x)+x$,通过Relu激活函数后即为残差网络最后的输出。
#
# ![residual](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image7215529231224791732.png)
#
# 残差网络结构主要由两种,一种是Building Block,适用于较浅的ResNet网络,如ResNet18和ResNet34;另一种是Bottleneck,适用于层数较深的ResNet网络,如ResNet50、ResNet101和ResNet152。
#
# #### Building Block
#
# Building Block结构图如下图所示,主分支有两层卷积网络结构:
#
# + 主分支第一层网络以输入channel为64为例,首先通过一个$3\times3$的卷积层,然后通过Batch Normalization层,最后通过Relu激活函数层,输出channel为64;
# + 主分支第二层网络的输入channel为64,首先通过一个$3\times3$的卷积层,然后通过Batch Normalization层,输出channel为64。
#
# 最后将主分支输出的特征矩阵与shortcuts输出的特征矩阵相加,通过Relu激活函数即为Building Block最后的输出。
#
# ![building-block-5](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image1825877107845952074.png)
#
# 主分支与shortcuts输出的特征矩阵相加时,需要保证主分支与shortcuts输出的特征矩阵shape相同。如果主分支与shortcuts输出的特征矩阵shape不相同,如输出channel是输入channel的一倍时,shortcuts上需要使用数量与输出channel相等,大小为$1\times1$的卷积核进行卷积操作;若输出的图像较输入图像缩小一倍,则要设置shortcuts中卷积操作中的`stride`为2,主分支第一层卷积操作的`stride`也需设置为2。
#
# 如下代码定义`ResidualBlockBase`类实现Building Block结构。
# In[4]:
from typing import Type, Union, List, Optional
import mindspore.nn as nn
from mindspore.common.initializer import Normal
# 初始化卷积层与BatchNorm的参数
weight_init = Normal(mean=0, sigma=0.02)
gamma_init = Normal(mean=1, sigma=0.02)
class ResidualBlockBase(nn.Cell):
expansion: int = 1 # 最后一个卷积核数量与第一个卷积核数量相等
def __init__(self, in_channel: int, out_channel: int,
stride: int = 1, norm: Optional[nn.Cell] = None,
down_sample: Optional[nn.Cell] = None) -> None:
super(ResidualBlockBase, self).__init__()
if not norm:
self.norm = nn.BatchNorm2d(out_channel)
else:
self.norm = norm
self.conv1 = nn.Conv2d(in_channel, out_channel,
kernel_size=3, stride=stride,
weight_init=weight_init)
self.conv2 = nn.Conv2d(in_channel, out_channel,
kernel_size=3, weight_init=weight_init)
self.relu = nn.ReLU()
self.down_sample = down_sample
def construct(self, x):
"""ResidualBlockBase construct."""
identity = x # shortcuts分支
out = self.conv1(x) # 主分支第一层:3*3卷积层
out = self.norm(out)
out = self.relu(out)
out = self.conv2(out) # 主分支第二层:3*3卷积层
out = self.norm(out)
if self.down_sample is not None:
identity = self.down_sample(x)
out += identity # 输出为主分支与shortcuts之和
out = self.relu(out)
return out
# #### Bottleneck
#
# Bottleneck结构图如下图所示,在输入相同的情况下Bottleneck结构相对Building Block结构的参数数量更少,更适合层数较深的网络,ResNet50使用的残差结构就是Bottleneck。该结构的主分支有三层卷积结构,分别为$1\times1$的卷积层、$3\times3$卷积层和$1\times1$的卷积层,其中$1\times1$的卷积层分别起降维和升维的作用。
#
# + 主分支第一层网络以输入channel为256为例,首先通过数量为64,大小为$1\times1$的卷积核进行降维,然后通过Batch Normalization层,最后通过Relu激活函数层,其输出channel为64;
# + 主分支第二层网络通过数量为64,大小为$3\times3$的卷积核提取特征,然后通过Batch Normalization层,最后通过Relu激活函数层,其输出channel为64;
# + 主分支第三层通过数量为256,大小$1\times1$的卷积核进行升维,然后通过Batch Normalization层,其输出channel为256。
#
# 最后将主分支输出的特征矩阵与shortcuts输出的特征矩阵相加,通过Relu激活函数即为Bottleneck最后的输出。
#
# ![building-block-6](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image18106536310912139455.png)
#
# 主分支与shortcuts输出的特征矩阵相加时,需要保证主分支与shortcuts输出的特征矩阵shape相同。如果主分支与shortcuts输出的特征矩阵shape不相同,如输出channel是输入channel的一倍时,shortcuts上需要使用数量与输出channel相等,大小为$1\times1$的卷积核进行卷积操作;若输出的图像较输入图像缩小一倍,则要设置shortcuts中卷积操作中的`stride`为2,主分支第二层卷积操作的`stride`也需设置为2。
#
# 如下代码定义`ResidualBlock`类实现Bottleneck结构。
# In[5]:
class ResidualBlock(nn.Cell):
expansion = 4 # 最后一个卷积核的数量是第一个卷积核数量的4倍
def __init__(self, in_channel: int, out_channel: int,
stride: int = 1, down_sample: Optional[nn.Cell] = None) -> None:
super(ResidualBlock, self).__init__()
self.conv1 = nn.Conv2d(in_channel, out_channel,
kernel_size=1, weight_init=weight_init)
self.norm1 = nn.BatchNorm2d(out_channel)
self.conv2 = nn.Conv2d(out_channel, out_channel,
kernel_size=3, stride=stride,
weight_init=weight_init)
self.norm2 = nn.BatchNorm2d(out_channel)
self.conv3 = nn.Conv2d(out_channel, out_channel * self.expansion,
kernel_size=1, weight_init=weight_init)
self.norm3 = nn.BatchNorm2d(out_channel * self.expansion)
self.relu = nn.ReLU()
self.down_sample = down_sample
def construct(self, x):
identity = x # shortscuts分支
out = self.conv1(x) # 主分支第一层:1*1卷积层
out = self.norm1(out)
out = self.relu(out)
out = self.conv2(out) # 主分支第二层:3*3卷积层
out = self.norm2(out)
out = self.relu(out)
out = self.conv3(out) # 主分支第三层:1*1卷积层
out = self.norm3(out)
if self.down_sample is not None:
identity = self.down_sample(x)
out += identity # 输出为主分支与shortcuts之和
out = self.relu(out)
return out
# #### 构建ResNet50网络
#
# ResNet网络层结构如下图所示,以输入彩色图像$224\times224$为例,首先通过数量64,卷积核大小为$7\times7$,stride为2的卷积层conv1,该层输出图片大小为$112\times112$,输出channel为64;然后通过一个$3\times3$的最大下采样池化层,该层输出图片大小为$56\times56$,输出channel为64;再堆叠4个残差网络块(conv2_x、conv3_x、conv4_x和conv5_x),此时输出图片大小为$7\times7$,输出channel为2048;最后通过一个平均池化层、全连接层和softmax,得到分类概率。
#
# ![resnet-layer](https://qingyun-test.oss-cn-hangzhou.aliyuncs.com/images/2024/07/20/image12829022581112085.png)
#
# 对于每个```
## 构建ResNet50网络
## 在代码中定义了ResNet的架构,主要包括一个卷积层、四个残差块、一个池化层和一个全连接层
def make_layer(last_out_channel, block: Type[Union[ResidualBlockBase, ResidualBlock]],
channel: int, block_nums: int, stride: int = 1):
down_sample = None # shortcuts分支
if stride != 1或last_out_channel != channel * block.expansion:
down_sample = nn.SequentialCell([
nn.Conv2d(last_out_channel, channel * block.expansion,
kernel_size=1, stride=stride, weight_init=weight_init),
nn.BatchNorm2d(channel * block.expansion, gamma_init=gamma_init)
])
layers = [] # 存储残差块层
layers.append(block(last_out_channel, channel, stride=stride, down_sample=down_sample))
in_channel = channel * block.expansion
# 堆叠残差网络
for _ in range(1, block_nums):
layers.append(block(in_channel, channel))
return nn.SequentialCell(layers)
## 定义ResNet类
class ResNet(nn.Cell):
def __init__(self, block: Type[Union[ResidualBlockBase, ResidualBlock]],
layer_nums: List[int], num_classes: int, input_channel: int) -> None:
super(ResNet, self).__init__()
self.relu = nn.ReLU() # 激活函数
self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, weight_init=weight_init) # 第一个卷积层,输入通道为3(彩色图像),输出通道为64
self.norm = nn.BatchNorm2d(64) # 批标准化层
self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same') # 最大池化层
# 定义各个残差块
self.layer1 = make_layer(64, block, 64, layer_nums[0])
self.layer2 = make_layer(64 * block.expansion, block, 128, layer_nums[1], stride=2)
self.layer3 = make_layer(128 * block.expansion, block, 256, layer_nums[2], stride=2)
self.layer4 = make_layer(256 * block.expansion, block, 512, layer_nums[3], stride=2)
self.avg_pool = nn.AvgPool2d() # 平均池化层
self.flatten = nn.Flatten() # 平坦层
self.fc = nn.Dense(in_channels=input_channel, out_channels=num_classes) # 全连接层
def construct(self, x):
x = self.conv1(x)
x = self.norm(x)
x = self.relu(x)
x = self.max_pool(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avg_pool(x)
x = self.flatten(x)
x = self.fc(x)
return x
def _resnet(model_url: str, block: Type[Union[ResidualBlockBase, ResidualBlock]],
layers: List[int], num_classes: int, pretrained: bool, pretrained_ckpt: str,
input_channel: int):
model = ResNet(block, layers, num_classes, input_channel)
if pretrained:
download(url=model_url, path=pretrained_ckpt, replace=True)
param_dict = load_checkpoint(pretrained_ckpt)
load_param_into_net(model, param_dict)
return model
def resnet50(num_classes: int = 1000, pretrained: bool = False):
resnet50_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/models/application/resnet50_224_new.ckpt"
resnet50_ckpt = "./LoadPretrainedModel/resnet50_224_new.ckpt"
return _resnet(resnet50_url, ResidualBlock, [3, 4, 6, 3], num_classes, pretrained, resnet50_ckpt, 2048)
## 模型训练与评估
## 定义了调用ResNet50网络的函数,并加载预训练模型进行微调
network = resnet50(pretrained=True)
# 全连接层的输入大小
in_channel = network.fc.in_channels
fc = nn.Dense(in_channels=in_channel, out_channels=10) # 重新定义全连接层
network.fc = fc
## 设置学习率
num_epochs = 5
lr = nn.cosine_decay_lr(min_lr=0.00001, max_lr=0.001, total_step=step_size_train * num_epochs,
step_per_epoch=step_size_train, decay_epoch=num_epochs)
## 定义优化器和损失函数
opt = nn.Momentum(params=network.trainable_params(), learning_rate=lr, momentum=0.9)
loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
def forward_fn(inputs, targets):
logits = network(inputs)
loss = loss_fn(logits, targets)
return loss
grad_fn = ms.value_and_grad(forward_fn, None, opt.parameters)
def train_step(inputs, targets):
loss, grads = grad_fn(inputs, targets)
opt(grads)
return loss
## 通过数据迭代器进行训练和验证
data_loader_train = dataset_train.create_tuple_iterator(num_epochs=num_epochs)
data_loader_val = dataset_val.create_tuple_iterator(num_epochs=num_epochs)
best_acc = 0
best_ckpt_dir = "./BestCheckpoint"
best_ckpt_path = "./BestCheckpoint/resnet50-best.ckpt"
if not os.path.exists(best_ckpt_dir):
os.mkdir(best_ckpt_dir)
import mindspore.ops as ops
## 训练函数
def train(data_loader, epoch):
losses = []
network.set_train(True)
for i, (images, labels) in enumerate(data_loader):
loss = train_step(images, labels)
if i % 100 == 0 or i == step_size_train - 1:
print('Epoch: [%3d/%3d], Steps: [%3d/%3d], Train Loss: [%5.3f]' %
(epoch + 1, num_epochs, i + 1, step_size_train, loss))
losses.append(loss)
return sum(losses) / len(losses)
## 验证函数
def evaluate(data_loader):
network.set_train(False)
correct_num = 0.0 # 预测正确个数
total_num = 0.0 # 预测总数
for images, labels in data_loader:
logits = network(images)
pred = logits.argmax(axis=1) # 预测结果
correct = ops.equal(pred, labels).reshape((-1, ))
correct_num += correct.sum().asnumpy()
total_num += correct.shape[0]
acc = correct_num / total_num # 准确率
return acc
## 开始循环训练
print("Start Training Loop ...")
for epoch in range(num_epochs):
curr_loss = train(data_loader_train, epoch)
curr_acc = evaluate(data_loader_val)
print("-" * 50)
print("Epoch: [%3d/%3d], Average Train Loss: [%5.3f], Accuracy: [%5.3f]" % (
epoch+1, num_epochs, curr_loss, curr_acc
))
print("-" * 50)
## 保存当前预测准确率最高的模型
if curr_acc > best_acc:
best_acc = curr_acc
ms.save_checkpoint(network, best_ckpt_path)
print("=" * 80)
print(f"End of validation the best Accuracy is: {best_acc: 5.3f}, "
f"save the best ckpt file in {best_ckpt_path}", flush=True)
## 模型预测可视化
## 定义了一个函数,用来加载最佳模型并对验证集进行预测,并将预测结果可视化显示
import matplotlib.pyplot as plt
def visualize_model(best_ckpt_path, dataset_val):
num_class = 10 # 对10类图像进行分类
net = resnet50(num_class)
## 加载模型参数
param_dict = ms.load_checkpoint(best_ckpt_path)
ms.load_param_into_net(net, param_dict)
## 加载验证集的数据进行验证
data = next(dataset_val.create_dict_iterator())
images = data["image"]
labels = data["label"]
## 预测图像类别
output = net(data['image'])
pred = np.argmax(output.asnumpy(), axis=1)
## 图像分类标签
classes = []
with open(data_dir + "/batches.meta.txt", "r") as f:
for line in f:
line = line.rstrip()
if line:
classes.append(line)
## 显示图像及图像的预测值
plt.figure()
for i in range(6):
plt.subplot(2, 3, i + 1)
## 若预测正确,显示为蓝色;若预测错误,显示为红色
color = 'blue' if pred[i] == labels.asnumpy()[i] else 'red'
plt.title('predict:{}'.format(classes[pred[i]]), color=color)
picture_show = np.transpose(images.asnumpy()[i], (1, 2, 0))
mean = np.array([0.4914, 0.4822, 0.4465])
std = np.array([0.2023, 0.1994, 0.2010])
picture_show = std * picture_show + mean
picture_show = np.clip(picture_show, 0, 1)
plt.imshow(picture_show)
plt.axis('off')
plt.show()
## 使用测试数据集进行验证
visualize_model(best_ckpt_path=best_ckpt_path, dataset_val=dataset_val)
解析
- 函数定义与初始化:
- 函数
make_layer
用于创建一个堆叠了多个残差块的网络层,接受参数包括上一个残差块的输出通道数、残差块类型、当前残差块的输入通道数、残差块的数量以及卷积步幅。 - 类
ResNet
定义了 ResNet 网络结构,包括初始化各层和前向传播逻辑。
- 函数
- 加载预训练模型:
- 函数
_resnet
用于加载预训练模型。 - 函数
resnet50
用于创建一个 ResNet50 模型,默认分类数为1000,并提供选择是否加载预训练模型的选项。
- 函数
- 训练与评估:
- 设置了学习率、优化器和损失函数,并定义了训练步骤
train_step
和验证步骤evaluate
。 train
函数用于在数据加载器中进行模型训练,evaluate
函数用于验证模型在验证集上的准确率。- 循环训练并保存最佳模型参数。
- 设置了学习率、优化器和损失函数,并定义了训练步骤
- 模型预测可视化:
- 函数
visualize_model
用于加载最佳模型并对验证集进行预测,将预测结果可视化显示。预测正确的显示为蓝色,预测错误的显示为红色。
- 函数
通过这些步骤,你可以实现对 CIFAR-10 数据集的训练,并使用 ResNet50 模型进行图像分类和预测可视化。预测结果显示在图形窗口中,能够直观地看到模型的预测效果以及错误预测。