Personal deep learning code templates: model training framework, data processing, plotting, and more (to be updated)
Basic model training framework
import random
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import transforms, datasets
from torch.utils.data import Dataset, DataLoader
1 Setting hyperparameters
# Fix random seeds for reproducibility
def random_seeds(seed=3407):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)

random_seeds()
batch_size = 32
num_workers = 2
learning_rate = 0.01
momentum = 0.9
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_epochs = 100
display_iter = 50
snapshot_iter = 20
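For stricter run-to-run reproducibility, cuDNN's nondeterministic algorithm selection can also be disabled; a minimal extension of the seeding above, at some cost in speed:

# Optional: force deterministic cuDNN behavior (slower but reproducible)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False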
2 Data processing
2.1 Splitting the data
The CSV file (trainLabels.csv) stores the label for each sample.
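For reference, trainLabels.csv is assumed to have one header row followed by name,label pairs, roughly like this (values illustrative):

id,label
1,frog
2,truck
3,automobile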
import collections
import math
import os
import shutil

def read_csv_labels(filename):
    """Read labels from a CSV file and return them as a dict."""
    with open(filename, 'r') as f:
        lines = f.readlines()[1:]  # skip the header line (column names)
    tokens = [line.rstrip().split(',') for line in lines]
    return dict(((name, label) for name, label in tokens))
def copyfile(filename, target_dir):
    """Copy a file into the target directory."""
    os.makedirs(target_dir, exist_ok=True)
    shutil.copy(filename, target_dir)
def reorg_train_valid(data_dir, labels, valid_ratio):
    """Split the validation set out of the original training set."""
    # Number of samples in the class with the fewest training examples
    n = collections.Counter(labels.values()).most_common()[-1][1]
    # Number of validation samples per class
    n_valid_per_label = max(1, math.floor(n * valid_ratio))
    label_count = {}
    for train_file in os.listdir(os.path.join(data_dir, "train")):
        label = labels[train_file.split('.')[0]]
        filename = os.path.join(data_dir, "train", train_file)
        copyfile(filename, os.path.join(data_dir, "train_valid_test", "train_valid", label))
        if label not in label_count or label_count[label] < n_valid_per_label:
            copyfile(filename, os.path.join(data_dir, "train_valid_test", "valid", label))
            label_count[label] = label_count.get(label, 0) + 1
        else:
            copyfile(filename, os.path.join(data_dir, "train_valid_test", "train", label))
    return n_valid_per_label
def reorg_test(data_dir):
    """Organize the test set for easier reading during prediction."""
    for test_file in os.listdir(os.path.join(data_dir, "test")):
        copyfile(os.path.join(data_dir, "test", test_file),
                 os.path.join(data_dir, "train_valid_test", "test", "unknown"))
def reorg_csv_data(data_dir, labels_csv_name, valid_ratio):
    labels = read_csv_labels(os.path.join(data_dir, labels_csv_name))
    reorg_train_valid(data_dir, labels, valid_ratio)
    reorg_test(data_dir)

if __name__ == '__main__':
    data_dir = "../data/cifar-10/"
    labels_csv_name = "trainLabels.csv"
    reorg_csv_data(data_dir, labels_csv_name, 0.1)
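After reorganization, each split can be read directly with datasets.ImageFolder; the resulting layout (class names illustrative) is roughly:

data/cifar-10/train_valid_test/
├── train/<label>/...          # training split
├── valid/<label>/...          # validation split
├── train_valid/<label>/...    # train + valid combined (for retraining on all labeled data)
└── test/unknown/...           # unlabeled test images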
2.2 Custom datasets (Dataset)
Every custom dataset inherits from the Dataset class and implements the following three methods:
- __init__: receives external arguments such as file paths and loads the dataset
- __getitem__: reads the element at a given index, applies any transforms, and returns a single sample together with its label
- __len__: returns the size of the dataset
Example (the CSV file here maps image names to labels, and the index column is named image_index):
import os
import pandas as pd
from PIL import Image

class ImageDataset(Dataset):
    def __init__(self, data_dir, info_csv, image_list, transform=None):
        """
        :param data_dir: path to the image directory
        :param info_csv: path to the CSV file mapping image indexes to labels
        :param image_list: path to the txt file listing image names of the training/validation set
        :param transform: optional transform to be applied on a sample
        """
        self.data_dir = data_dir
        with open(image_list) as f:
            self.image_file = f.readlines()
        self.label_info = pd.read_csv(info_csv)
        self.transform = transform

    def __getitem__(self, index):
        """
        :param index: the index of the item
        :return: the image and its label
        """
        image_name = self.image_file[index].strip('\n')
        raw_label = self.label_info.loc[self.label_info['image_index'] == image_name]
        label = raw_label.iloc[:, 0]  # label of the matching row (assumes the label is the first column)
        image_name = os.path.join(self.data_dir, image_name)
        image = Image.open(image_name).convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image, label

    def __len__(self):
        return len(self.image_file)
2.3 Data loaders (DataLoader)
A DataLoader combines a dataset with a sampler and can load data with multiple worker processes. During training it splits the data into batches; each iteration yields a tuple of tensors, the inputs and the targets.
DataLoader(dataset, batch_size=1, drop_last=False, shuffle=None, num_workers=0)
- dataset: the wrapped dataset; each element is a (sample, label) pair.
- batch_size: number of samples drawn per iteration.
- drop_last: when the dataset size is not divisible by batch_size, True drops the last incomplete batch, while False keeps it as a smaller batch.
- shuffle: whether batches are returned in random order; defaults to no shuffling. (Shuffle during training to improve accuracy; validation and testing do not need it.)
- num_workers: number of worker processes.
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_data, batch_size=batch_size, shuffle=False)
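Iterating over a loader then yields one (inputs, targets) tuple per batch; a quick sanity check, assuming CIFAR-10-sized images and the batch_size from above:

inputs, targets = next(iter(train_loader))
print(inputs.shape)   # e.g. torch.Size([32, 3, 32, 32])
print(targets.shape)  # e.g. torch.Size([32])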
2.4 Loading and displaying the dataset
# Define transforms
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# Training set
train_data = datasets.ImageFolder(root="[train data]", transform=transform)
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)

# Validation set
valid_data = datasets.ImageFolder(root="[valid data]", transform=transform)
valid_loader = DataLoader(valid_data, batch_size=batch_size, shuffle=False)

# Test set
test_data = datasets.ImageFolder(root="[test data]", transform=transform)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)
# Display images
def imshow(img):
    img = img / 2 + 0.5  # unnormalize
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))  # move the channel dim to the end
    plt.show()

classes = train_data.classes  # class names inferred by ImageFolder
data_iter = iter(train_loader)
images, labels = next(data_iter)
imshow(torchvision.utils.make_grid(images))
print("".join("%5s" % classes[labels[j]] for j in range(4)))  # labels of the first 4 images
3 Building the model
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        # ...

    def forward(self, x):
        # ...
        return x

# Define the model and optionally load pretrained weights
model = Model().to(device)
# model.load_state_dict(torch.load("model.pth"))
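As a concrete filling-in of the skeleton, a small CNN for 3×32×32 inputs and 10 classes might look like this (the architecture is illustrative, not part of the template):

class SmallCNN(nn.Module):
    def __init__(self, num_classes=10):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1), nn.ReLU(),
            nn.MaxPool2d(2),  # -> 32 x 16 x 16
            nn.Conv2d(32, 64, kernel_size=3, padding=1), nn.ReLU(),
            nn.MaxPool2d(2),  # -> 64 x 8 x 8
        )
        self.classifier = nn.Linear(64 * 8 * 8, num_classes)

    def forward(self, x):
        x = self.features(x)
        x = torch.flatten(x, start_dim=1)  # flatten all dims except batch
        return self.classifier(x)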
4 Loss function and optimizer
# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
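A learning-rate scheduler pairs naturally with SGD here; a minimal sketch using StepLR (the decay period and factor are assumptions):

# Decay the learning rate by 10x every 30 epochs (values illustrative)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)
# Call scheduler.step() once per epoch, after the optimizer updates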
5 Training and testing
5.1 Training function
# Training
def train(model, optimizer, criterion, train_loader, device):
    model.train()
    total_loss = 0
    # Iterate over the training set
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        # Forward pass, compute loss, backward pass, update parameters
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)  # l(y_hat, y)
        loss.backward()
        optimizer.step()
        # Accumulate the loss
        total_loss += loss.item()
    # Average loss over batches
    avg_loss = total_loss / len(train_loader)
    return avg_loss
5.2 Evaluation function
# Validation or testing
def test(model, criterion, dataloader, device):
    model.eval()
    total_loss = 0
    total_correct = 0
    with torch.no_grad():
        # Iterate over the evaluation set
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            # Forward pass and loss
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            # Accumulate the loss
            total_loss += loss.item()
            # Count correct predictions
            _, predicted = torch.max(outputs, dim=1)
            total_correct += (predicted == targets).sum().item()
    # Average loss over batches
    avg_loss = total_loss / len(dataloader)
    # Accuracy over all samples
    accuracy = total_correct / len(dataloader.dataset)
    return avg_loss, accuracy
5.3 Training loop
# Train and validate
for epoch in range(num_epochs):
    print("Training epoch:", epoch + 1)
    train_loss = train(model, optimizer, criterion, train_loader, device)
    valid_loss, valid_accuracy = test(model, criterion, valid_loader, device)
    # Report training and validation losses
    print(f"Epoch: {epoch + 1}, Train Loss: {train_loss:.5f}, " +
          f"Validation Loss: {valid_loss:.5f}, Validation Accuracy: {valid_accuracy:.5f}")
    # Save a snapshot of the weights
    if (epoch + 1) % snapshot_iter == 0:
        torch.save(model.state_dict(), f"model_{epoch + 1}.pth")

# Test
test_loss, test_accuracy = test(model, criterion, test_loader, device)
print(f"Test Loss: {test_loss:.5f}, Test Accuracy: {test_accuracy:.5f}")
6 Plotting
import matplotlib.pyplot as plt
import numpy as np
import torch
def set_axes(axes, xlabel, ylabel, xscale, yscale, xlim, ylim, legend):
    """Configure the matplotlib axes."""
    axes.set_xlabel(xlabel)
    axes.set_ylabel(ylabel)
    axes.set_xscale(xscale)
    axes.set_yscale(yscale)
    axes.set_xlim(xlim)
    axes.set_ylim(ylim)
    if legend:
        axes.legend(legend)
    axes.grid()
def plot(X, Y, xlabel=None, ylabel=None, legend=None,
         xlim=None, ylim=None, xscale='linear', yscale='linear',
         fmts=('-', 'm--', 'g-.', 'r:'), figsize=(3.5, 2.5), axes=None):
    """Plot one or more curves."""
    if legend is None:
        legend = []
    plt.figure(figsize=figsize)
    axes = axes if axes else plt.gca()

    def has_one_axis(X):
        # True if X is a 1D array or a flat list of scalars
        return (hasattr(X, "ndim") and X.ndim == 1
                or isinstance(X, list) and not hasattr(X[0], "__len__"))

    if has_one_axis(X):
        X = [X]
    if has_one_axis(Y):
        Y = [Y]
    if len(X) != len(Y):
        X = X * len(Y)
    axes.cla()
    for x, y, fmt in zip(X, Y, fmts):
        if len(x):
            axes.plot(x, y, fmt)
        else:
            axes.plot(y, fmt)
    set_axes(axes, xlabel, ylabel, xscale, yscale, xlim, ylim, legend)
    plt.show()
def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5):
    """Display a grid of images."""
    figsize = (num_cols * scale, num_rows * scale)
    _, axes = plt.subplots(num_rows, num_cols, figsize=figsize)
    axes = axes.flatten()
    for i, (ax, img) in enumerate(zip(axes, imgs)):
        if torch.is_tensor(img):
            # Image tensor
            ax.imshow(img.numpy())
        else:
            # PIL image
            ax.imshow(img)
        ax.axes.get_xaxis().set_visible(False)
        ax.axes.get_yaxis().set_visible(False)
        if titles:
            ax.set_title(titles[i])
    return axes
if __name__ == '__main__':
    x = np.arange(1, 10, 0.1)
    y = [np.sin(x), np.cos(x)]
    plot(x, y, xlabel='x', ylabel='f(x)')
7 Miscellaneous
7.1 Timer
import time
import numpy as np
class Timer:
    """Record elapsed times over repeated runs."""
    def __init__(self):
        self.tik = None
        self.times = []
        self.start()

    def start(self):
        """Start the timer."""
        self.tik = time.time()

    def stop(self):
        """Stop the timer and record the elapsed time."""
        self.times.append(time.time() - self.tik)
        return self.times[-1]

    def avg(self):
        """Return the average time."""
        return sum(self.times) / len(self.times)

    def sum(self):
        """Return the total time."""
        return sum(self.times)

    def cumsum(self):
        """Return the accumulated times as a list."""
        return np.array(self.times).cumsum().tolist()
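Typical usage, e.g. timing each training epoch (a sketch built on the training function from section 5):

timer = Timer()
for epoch in range(num_epochs):
    timer.start()
    train(model, optimizer, criterion, train_loader, device)
    print(f"Epoch {epoch + 1} took {timer.stop():.2f}s")
print(f"Average epoch time: {timer.avg():.2f}s, total: {timer.sum():.2f}s")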
7.2 Reading YAML files
In a YAML file, parameters are organized hierarchically, as in the following example:
device: 'cpu'
data:
  train_path: 'data/train'
  test_path: 'data/test'
  num: 1000
import yaml

def read_yaml(path):
    """Read parameters from a YAML file and return them as a dict."""
    with open(path, 'r', encoding='utf-8') as f:
        arg_dict = yaml.safe_load(f)  # parse the YAML content into a dict
    return arg_dict

path = 'config.yaml'
arg_dict = read_yaml(path)
device = arg_dict['device']
train_path = arg_dict['data']['train_path']