导入必要的库
import numpy as np import torch import matplotlib.pyplot as plt
定义模型类MyRNN,继承自torch.nn.Module
class MyRNN(torch.nn.Module):
def __init__(self, n_vocab, n_hid, n_layers=1):
super(MyRNN, self).__init__()
self.n_vocab = n_vocab # 词表大小
self.n_hid = n_hid # 隐藏单元个数
self.n_layers = n_layers # RNN层数
# 定义一个RNN层
self.rnn = torch.nn.RNN(self.n_vocab, self.n_hid, self.n_layers)
self.rnn = torch.nn.GRU(self.n_vocab, self.n_hid, self.n_layers) # 可以选择GRU作为RNN层
# 定义一个全连接层将隐藏状态映射到词表大小的空间中去
self.linear = torch.nn.Linear(self.n_hid, self.n_vocab)
def forward(self, X, state):
X = X.to(torch.float32) # 将输入转换为float32类型,方便计算梯度和更新参数
Y, state = self.rnn(X, state) # 输入X和初始状态state,输出Y和最后一个时间步的状态state
# 全连接层首先将Y的形状改为(时间步数*批量大小,隐藏单元数)
# 然后将其输入全连接层并得到输出(形状为(时间步数*批量大小,词表大小))
output = self.linear(Y.reshape((-1, Y.shape[-1])))
return output, state # 返回输出和最后一个时间步的状态
def begin_state(self, batch_size):
# 返回一个全零张量作为初始状态,形状为(n_layers, batch_size, n_hid)
return torch.zeros((self.n_layers, batch_size, self.n_hid))
定义获取随机批量数据的函数
def get_random_batch(batch_size,n_steps,n_tokens):
# 随机生成一个起始位置
start_pos = np.random.randint(0, n_tokens-n_steps)
# 生成n_steps个随机数
x_pos = np.arange(start_pos,start_pos+n_steps)
y_pos = x_pos + 1
# 生成batch_size个这样的随机序列作为一组训练数据
X_batch = [x_pos]*batch_size
Y_batch = [y_pos]*batch_size
return np.asarray(X_batch),np.asarray(Y_batch)
定义用于训练的数据迭代器函数
def train_data_iter(n_batch):
for ii in range(n_batch):
X_rand_batch, Y_rand_batch = get_random_batch(batch_size,n_steps, n_tokens)
one_hot_matrix = np.eye(len(idx2token)) # 构造词向量矩阵,将每个词转换成一个one-hot向量
X_enc = one_hot_matrix[corpus[X_rand_batch]]
Y_enc = one_hot_matrix[corpus[Y_rand_batch]]
yield torch.from_numpy(X_enc), torch.from_numpy(Y_enc) # 返回X和Y的one-hot表示
初始化超参数及模型实例、损失函数和优化器
batch_size = 10 # 批量大小 n_steps = 100 # 时间步数 n_hid = 500 # 隐藏单元个数
model = MyRNN(n_vocab, n_hid) # 实例化模型类 criteria = torch.nn.CrossEntropyLoss() # 定义损失函数为交叉熵损失 optimizer = torch.optim.Adam(model.parameters(), lr=0.001) # 定义优化器为Adam,并设置学习率为0.001
训练模型并记录损失和精度
train_loss_list = [] # 记录每个批次的训练损失 train_acc_list = [] # 记录每个批次的训练精度
model.train() # 将模型设为训练状态,启用dropout等方法
for batch_id, (X,Y) in enumerate(train_data_iter(1000)): # 进行1000个批次的训练
state = model.begin_state(batch_size) # 获取初始状态
Y_hat, state = model(X, state) # 前向计算得到预测值Y_hat
Y_target = Y.reshape((-1, Y.shape[-1])) # 将真实标签reshape成形状为(时间步数*批量大小,词表大小)
loss = criteria(Y_hat, Y_target) # 计算交叉熵损失
optimizer.zero_grad() # 梯度清零,防止梯度累加影响性能
loss.backward() # 反向传播求梯度
optimizer.step() # 更新参数
correct_batch = (Y_hat.argmax(1) == Y_target.argmax(1)).type(torch.float).sum().item() # 计算当前批次的精度
train_acc_batch = 100 * correct_batch / (n_steps * batch_size) # 计算当前批次的精度
train_loss_list.append(loss.item()) # 将当前批次的训练损失添加到列表中
train_acc_list.append(train_acc_batch) # 将当前批次的训练精度添加到列表中
if batch_id % 100 == 0:
print("batch_id:%d, loss:%f, acc:%f" % (batch_id, loss.item(), train_acc_batch)) # 每100个批次输出一下损失和精度
将训练后的困惑度计算并输出
train_loss = np.asarray(train_loss_list) train_acc = np.asarray(train_acc_list)
print(“训练后的困惑度:%f” % np.exp(train_loss[-1]))
绘制训练损失和精度曲线
plt.figure() plt.plot(np.arange(len(train_loss)), np.exp(train_loss)) plt.grid(True) plt.xlabel(“n_updates”) plt.ylabel(“perplexity”)
plt.figure() plt.plot(np.arange(len(train_acc)), train_acc) plt.grid(True) plt.xlabel(“n_updates”) plt.ylabel(“ACC”)