基于GRU模型的带编-解码器和注意力机制的英译法任务实现

模型整体结构示意

1.基础准备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 用于正则表达式
import re
# 用于构建网络结构和函数的torch工具包
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
# torch中预定义的优化方法工具包
import torch.optim as optim
import time
# 用于随机生成数据
import random
import matplotlib.pyplot as plt

# 设备选择, 我们可以选择在cuda或者cpu上运行你的代码
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 起始标志
SOS_token = 0
# 结束标志
EOS_token = 1
# 最大句子长度不能超过10个 (包含标点)
MAX_LENGTH = 10
# 数据文件路径
data_path = './data/eng-fra-v2.txt'

# 文本清洗工具函数
def normalizeString(s):
"""字符串规范化函数, 参数s代表传入的字符串"""
s = s.lower().strip()
# 在.!?前加一个空格 这里的\1表示第一个分组 正则中的\num
s = re.sub(r"([.!?])", r" \1", s)
# s = re.sub(r"([.!?])", r" ", s)
# 使用正则表达式将字符串中 不是 大小写字母和正常标点的都替换成空格
s = re.sub(r"[^a-zA-Z.!?]+", r" ", s)
return s

2.数据预处理

构建数据获取源对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def my_getdata():

# 1 按行读文件 open().read().strip().split(\n)
my_lines = open(data_path, encoding='utf-8').read().strip().split('\n')
print('my_lines--->', len(my_lines))

# 2 按行清洗文本 构建语言对 my_pairs
my_pairs = [[normalizeString(s) for s in l.split('\t')] for l in my_lines]
print('len(pairs)--->', len(my_pairs))

# 打印前4条数据
print(my_pairs[:4])

# 打印第8000条的英文 法文数据
print('my_pairs[8000][0]--->', my_pairs[8000][0])
print('my_pairs[8000][1]--->', my_pairs[8000][1])

# 3 遍历语言对 构建英语单词字典 法语单词字典
# 3-1 english_word2index english_word_n french_word2index french_word_n
english_word2index = {"SOS": 0, "EOS": 1}
english_word_n = 2

french_word2index = {"SOS": 0, "EOS": 1}
french_word_n = 2

# 遍历语言对 获取英语单词字典 法语单词字典
for pair in my_pairs:
for word in pair[0].split(' '):
if word not in english_word2index:
english_word2index[word] = english_word_n
english_word_n += 1

for word in pair[1].split(' '):
if word not in french_word2index:
french_word2index[word] = french_word_n
french_word_n += 1

# 3-2 english_index2word french_index2word
english_index2word = {v:k for k, v in english_word2index.items()}
french_index2word = {v:k for k, v in french_word2index.items()}

print('len(english_word2index)-->', len(english_word2index))
print('len(french_word2index)-->', len(french_word2index))
print('english_word_n--->', english_word_n, 'french_word_n-->', french_word_n)

return english_word2index, english_index2word, english_word_n, french_word2index, french_index2word, french_word_n, my_pairs
english_word2index, english_index2word, english_word_n, french_word2index, french_index2word, french_word_n, my_pairs=my_getdata()

构建数据处理迭代对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class MyPairsDataset(Dataset):
def __init__(self, my_pairs):
# 样本x
self.my_pairs = my_pairs

# 样本条目数
self.sample_len = len(my_pairs)

# 获取样本条数
def __len__(self):
return self.sample_len

# 获取第几条 样本数据
def __getitem__(self, index):

# 对index异常值进行修正 [0, self.sample_len-1]
index = min(max(index, 0), self.sample_len-1)

# 按索引获取 数据样本 x y
x = self.my_pairs[index][0]
y = self.my_pairs[index][1]

# 样本x 文本数值化
x = [english_word2index[word] for word in x.split(' ')]
x.append(EOS_token)
tensor_x = torch.tensor(x, dtype=torch.long, device=device)

# 样本y 文本数值化
y = [french_word2index[word] for word in y.split(' ')]
y.append(EOS_token)
tensor_y = torch.tensor(y, dtype=torch.long, device=device)
# 注意 tensor_x tensor_y都是一维数组,通过DataLoader拿出数据是二维数据
# print('tensor_y.shape===>', tensor_y.shape, tensor_y)

# 返回结果
return tensor_x, tensor_y

3.基于GRU的编码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class EncoderRNN(nn.Module):
def __init__(self, input_size, hidden_size):

# input_size 编码器 词嵌入层单词数 eg:2803
# hidden_size 编码器 词嵌入层每个单词的特征数 eg 256
super(EncoderRNN, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size

# 实例化nn.Embedding层
self.embedding = nn.Embedding(input_size, hidden_size)

# 实例化nn.GRU层 注意参数batch_first=True
self.gru = nn.GRU(hidden_size, hidden_size, batch_first=True)

def forward(self, input, hidden):

# 数据经过词嵌入层 数据形状 [1,6] --> [1,6,256]
output = self.embedding(input)

# 数据经过gru层 数据形状 gru([1,6,256],[1,1,256]) --> [1,6,256] [1,1,256]
output, hidden = self.gru(output, hidden)
return output, hidden
# output 提供了输入序列的详细表示,可用于注意力机制。
# hidden 是整个输入序列的压缩表示,用于初始化解码器。
def inithidden(self):
# 将隐层张量初始化成为1x1xself.hidden_size大小的张量
return torch.zeros(1, 1, self.hidden_size, device=device)

4 构建基于GRU和Attention的解码器

使用软注意力机制的类点积缩放模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class AttnDecoderRNN(nn.Module):
def __init__(self, output_size, hidden_size, dropout_p=0.1, max_length=MAX_LENGTH):

# output_size 编码器 词嵌入层单词数 eg:4345
# hidden_size 编码器 词嵌入层每个单词的特征数 eg 256
# dropout_p 置零比率,默认0.1,
# max_length 最大长度10
super(AttnDecoderRNN, self).__init__()
self.output_size = output_size
self.hidden_size = hidden_size
self.dropout_p = dropout_p
self.max_length = max_length

# 定义nn.Embedding层 nn.Embedding(4345,256)
self.embedding = nn.Embedding(self.output_size, self.hidden_size)

# 定义线性层1:求q的注意力权重分布
self.attn = nn.Linear(self.hidden_size * 2, self.max_length)

# 定义线性层2:q+注意力结果表示融合后,在按照指定维度输出
self.attn_combine = nn.Linear(self.hidden_size * 2, self.hidden_size)

# 定义dropout层
self.dropout = nn.Dropout(self.dropout_p)

# 定义gru层
self.gru = nn.GRU(self.hidden_size, self.hidden_size, batch_first=True)

# 定义out层 解码器按照类别进行输出(256,4345)
self.out = nn.Linear(self.hidden_size, self.output_size)

# 实例化softomax层 数值归一化 以便分类
self.softmax = nn.LogSoftmax(dim=-1)

def forward(self, input, hidden, encoder_outputs):
# input代表q [1,1] 二维数据 hidden代表k [1,1,256] encoder_outputs代表v [10,256]

# 数据经过词嵌入层
# 数据形状 [1,1] --> [1,1,256]
embedded = self.embedding(input)

# 使用dropout进行随机丢弃,防止过拟合
embedded = self.dropout(embedded)

# 1 求查询张量q的注意力权重分布, attn_weights[1,10]
attn_weights = F.softmax(
self.attn(torch.cat((embedded[0], hidden[0]), 1)), dim=1)

# 2 求查询张量q的注意力结果表示 bmm运算, attn_applied[1,1,256]
# [1,1,10],[1,10,256] ---> [1,1,256]
attn_applied = torch.bmm(attn_weights.unsqueeze(0), encoder_outputs.unsqueeze(0))

# 3 q 与 attn_applied 融合,再按照指定维度输出 output[1,1,256]
output = torch.cat((embedded[0], attn_applied[0]), 1)
output = self.attn_combine(output).unsqueeze(0)

# 查询张量q的注意力结果表示 使用relu激活
output = F.relu(output)

# 查询张量经过gru、softmax进行分类结果输出
# 数据形状[1,1,256],[1,1,256] --> [1,1,256], [1,1,256]
output, hidden = self.gru(output, hidden)
# 数据形状[1,1,256]->[1,256]->[1,4345]
output = self.softmax(self.out(output[0]))

# 返回解码器分类output[1,4345],最后隐层张量hidden[1,1,256] 注意力权重张量attn_weights[1,10]
return output, hidden, attn_weights

def inithidden(self):
# 将隐层张量初始化成为1x1xself.hidden_size大小的张量
return torch.zeros(1, 1, self.hidden_size, device=device)

5 模型构建与训练

1
2
3
4
5
6
7
8
# 基础模型训练参数
mylr = 2e-4
epochs = 2
# 设置teacher_forcing比率为0.5
teacher_forcing_ratio = 0.5

print_interval_num = 1000
plot_interval_num = 100

训练迭代器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# from torch.utils.tensorboard import SummaryWriter
def Train_Iters(x, y, my_encoderrnn, my_attndecoderrnn, myadam_encode, myadam_decode, mycrossentropyloss):
# # 初始化SummaryWriter
# writer = SummaryWriter()

# 1 编码 encode_output, encode_hidden = my_encoderrnn(x, encode_hidden)
encode_hidden = my_encoderrnn.inithidden().to(device)
x=x.to(device)
# 模型可视化
# writer.add_graph(my_encoderrnn, (x, encode_hidden))
encode_output, encode_hidden = my_encoderrnn(x, encode_hidden) # 一次性送数据
# [1,6],[1,1,256] --> [1,6,256],[1,1,256]

# 2 解码参数准备和解码
# 解码参数1 encode_output_c [10,256]
encode_output_c = torch.zeros(MAX_LENGTH, my_encoderrnn.hidden_size, device=device)
for idx in range(x.shape[1]):
encode_output_c[idx] = encode_output[0, idx]

# 解码参数2
decode_hidden = encode_hidden

# 解码参数3
input_y = torch.tensor([[SOS_token]], device=device)

myloss = 0.0
y_len = y.shape[1]

# ### 张量可视化
# writer.add_graph(my_attndecoderrnn, (input_y, decode_hidden, encode_output_c))

use_teacher_forcing = True if random.random() < teacher_forcing_ratio else False
if use_teacher_forcing:
for idx in range(y_len):
# 数据形状数据形状 [1,1],[1,1,256],[10,256] ---> [1,4345],[1,1,256],[1,10]
output_y, decode_hidden, attn_weight = my_attndecoderrnn(input_y, decode_hidden, encode_output_c)
target_y = y[0][idx].view(1)
myloss = myloss + mycrossentropyloss(output_y, target_y)
input_y = y[0][idx].view(1, -1)
else:
for idx in range(y_len):
# 数据形状数据形状 [1,1],[1,1,256],[10,256] ---> [1,4345],[1,1,256],[1,10]
output_y, decode_hidden, attn_weight = my_attndecoderrnn(input_y, decode_hidden, encode_output_c)
target_y = y[0][idx].view(1)
myloss = myloss + mycrossentropyloss(output_y, target_y)

topv, topi = output_y.topk(1)
if topi.squeeze().item() == EOS_token:
break
input_y = topi.detach()

# 梯度清零
myadam_encode.zero_grad()
myadam_decode.zero_grad()

# 反向传播
myloss.backward()

# 梯度更新
myadam_encode.step()
myadam_decode.step()

# 返回 损失列表myloss.item()/y_len
return myloss.item() / y_len

训练模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

def Train_seq2seq():

# 初始化SummaryWriter
writer = SummaryWriter()

# 实例化 mypairsdataset对象 实例化 mydataloader
mypairsdataset = MyPairsDataset(my_pairs)
mydataloader = DataLoader(dataset=mypairsdataset, batch_size=1, shuffle=True)

# 实例化编码器 my_encoderrnn 实例化解码器 my_attndecoderrnn
my_encoderrnn = EncoderRNN(2803, 256).to(device)
my_attndecoderrnn = AttnDecoderRNN(output_size=4345, hidden_size=256, dropout_p=0.1, max_length=10).to(device)



# 实例化编码器优化器 myadam_encode 实例化解码器优化器 myadam_decode
myadam_encode = optim.Adam(my_encoderrnn.parameters(), lr=mylr)
myadam_decode = optim.Adam(my_attndecoderrnn.parameters(), lr=mylr)

# 实例化损失函数 mycrossentropyloss = nn.NLLLoss()
mycrossentropyloss = nn.NLLLoss()

# 定义模型训练的参数
plot_loss_list = []

# 外层for循环 控制轮数 for epoch_idx in range(1, 1+epochs):
for epoch_idx in range(1, 1+epochs):

print_loss_total, plot_loss_total = 0.0, 0.0
starttime = time.time()

# 内层for循环 控制迭代次数
for item, (x, y) in enumerate(mydataloader, start=1):
x=x.to(device)
y=y.to(device)

# 调用内部训练函数
myloss = Train_Iters(x, y, my_encoderrnn, my_attndecoderrnn, myadam_encode, myadam_decode, mycrossentropyloss)
# break
print_loss_total += myloss
plot_loss_total += myloss

# 计算打印屏幕间隔损失-每隔1000次
if item % print_interval_num ==0 :
print_loss_avg = print_loss_total / print_interval_num
# 将总损失归0
print_loss_total = 0
# 打印日志,日志内容分别是:训练耗时,当前迭代步,当前进度百分比,当前平均损失
print('轮次%d 损失%.6f 时间:%d' % (epoch_idx, print_loss_avg, time.time() - starttime))

# 计算画图间隔损失-每隔100次
if item % plot_interval_num == 0:
# 通过总损失除以间隔得到平均损失
plot_loss_avg = plot_loss_total / plot_interval_num
# 将平均损失添加plot_loss_list列表中
plot_loss_list.append(plot_loss_avg)
writer.add_scalar('Train_loss', plot_loss_avg, epoch_idx * len(mydataloader) + item)
# 总损失归0
plot_loss_total = 0
# 每个轮次保存模型
torch.save(my_encoderrnn.state_dict(), './my_encoderrnn_%d.pth' % epoch_idx)
torch.save(my_attndecoderrnn.state_dict(), './my_attndecoderrnn_%d.pth' % epoch_idx)

# 所有轮次训练完毕 画损失图
# plt.figure()
# plt.plot(plot_loss_list)
# plt.savefig('./s2sq_loss.png')
# plt.show()

# return plot_loss_list
Train_seq2seq()

6 模型评估

构建模型评估函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 模型评估代码与模型预测代码类似,需要注意使用with torch.no_grad()
# 模型预测时,第一个时间步使用SOS_token作为输入 后续时间步采用预测值作为输入,也就是自回归机制
def Seq2Seq_Evaluate(x, my_encoderrnn, my_attndecoderrnn):
with torch.no_grad():
# 1 编码:一次性的送数据
encode_hidden = my_encoderrnn.inithidden()
encode_output, encode_hidden = my_encoderrnn(x, encode_hidden)

# 2 解码参数准备
# 解码参数1 固定长度中间语义张量c
encoder_outputs_c = torch.zeros(MAX_LENGTH, my_encoderrnn.hidden_size, device=device)
x_len = x.shape[1]
for idx in range(x_len):
encoder_outputs_c[idx] = encode_output[0, idx]

# 解码参数2 最后1个隐藏层的输出 作为 解码器的第1个时间步隐藏层输入
decode_hidden = encode_hidden

# 解码参数3 解码器第一个时间步起始符
input_y = torch.tensor([[SOS_token]], device=device)

# 3 自回归方式解码
# 初始化预测的词汇列表
decoded_words = []
# 初始化attention张量
decoder_attentions = torch.zeros(MAX_LENGTH, MAX_LENGTH)
for idx in range(MAX_LENGTH): # note:MAX_LENGTH=10
output_y, decode_hidden, attn_weights = my_attndecoderrnn(input_y, decode_hidden, encoder_outputs_c)
# 预测值作为为下一次时间步的输入值
topv, topi = output_y.topk(1)
decoder_attentions[idx] = attn_weights

# 如果输出值是终止符,则循环停止
if topi.squeeze().item() == EOS_token:
decoded_words.append('<EOS>')
break
else:
decoded_words.append(french_index2word[topi.item()])

# 将本次预测的索引赋值给 input_y,进行下一个时间步预测
input_y = topi.detach()

# 返回结果decoded_words, 注意力张量权重分布表(把没有用到的部分切掉)
return decoded_words, decoder_attentions[:idx + 1]

加载模型并评估

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# 加载模型
PATH1 = './gpumodel/my_encoderrnn.pth'
PATH2 = './gpumodel/my_attndecoderrnn.pth'
def dm_test_Seq2Seq_Evaluate():
# 实例化dataset对象
mypairsdataset = MyPairsDataset(my_pairs)
# 实例化dataloader
mydataloader = DataLoader(dataset=mypairsdataset, batch_size=1, shuffle=True)

# 实例化模型
input_size = english_word_n
hidden_size = 256 # 观察结果数据 可使用8
my_encoderrnn = EncoderRNN(input_size, hidden_size)
# my_encoderrnn.load_state_dict(torch.load(PATH1))
my_encoderrnn.load_state_dict(torch.load(PATH1, map_location=lambda storage, loc: storage), False)
print('my_encoderrnn模型结构--->', my_encoderrnn)

# 实例化模型
input_size = french_word_n
hidden_size = 256 # 观察结果数据 可使用8
my_attndecoderrnn = AttnDecoderRNN(input_size, hidden_size)
# my_attndecoderrnn.load_state_dict(torch.load(PATH2))
my_attndecoderrnn.load_state_dict(torch.load(PATH2, map_location=lambda storage, loc: storage), False)
print('my_decoderrnn模型结构--->', my_attndecoderrnn)

my_samplepairs = [
['i m impressed with your french .', 'je suis impressionne par votre francais .'],
['i m more than a friend .', 'je suis plus qu une amie .'],
['she is beautiful like her mother .', 'elle est belle comme sa mere .']
]
print('my_samplepairs--->', len(my_samplepairs))

for index, pair in enumerate(my_samplepairs):
x = pair[0]
y = pair[1]

# 样本x 文本数值化
tmpx = [english_word2index[word] for word in x.split(' ')]
tmpx.append(EOS_token)
tensor_x = torch.tensor(tmpx, dtype=torch.long, device=device).view(1, -1)

# 模型预测
decoded_words, attentions = Seq2Seq_Evaluate(tensor_x, my_encoderrnn, my_attndecoderrnn)
# print('decoded_words->', decoded_words)
output_sentence = ' '.join(decoded_words)

print('\n')
print('>', x)
print('=', y)
print('<', output_sentence)


基于GRU模型的带编-解码器和注意力机制的英译法任务实现
https://linxkon.github.io/基于GRU模型的构建带编-解码器和注意力机制的英译法任务实现.html
作者
linxkon
发布于
2023年1月12日
许可协议