先放出 transformer 的整体结构图,以便复习,接下来就一个模块一个模块的实现它。
在这里插入图片描述


1. Embedding

在这里插入图片描述

Embedding 部分主要由两部分组成,即 Input Embedding 和 Positional Encoding,位置编码记录了每一个词出现的位置。通过加入位置编码可以提高模型的准确率,因为同一个词出现在不同位置可能代表了不同意思,这直接影响了最终的结果,所以要考虑位置因素。

位置编码公式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def get_angles(pos, i, d):
return pos / np.power(10000, (2 * (i//2)) / np.float32(d))


def positional_encoding(position, d):
theta = get_angles(np.arange(position)[:, np.newaxis], np.arange(d)[np.newaxis, :], d)

theta[:, 0::2] = np.sin(theta[:, 0::2])
theta[:, 1::2] = np.cos(theta[:, 1::2])

return theta[np.newaxis, ...] # shape: [1, position, d]


class Embedding(nn.Module):
def __init__(self, cfg):
super(Embedding, self).__init__()
self.dim = cfg.hidden_dim
self.device = cfg.device
self.word_em = nn.Embedding(num_embeddings=cfg.vocab_size, embedding_dim=self.dim).to(self.device)
self.position_em = positional_encoding(cfg.max_len, self.dim)

def forward(self, input_ids):
seq_length = input_ids.size(1)

we = self.word_em(input_ids)
we *= torch.sqrt(self.dim).to(self.device)
pe = self.position_em[:, :seq_length, :]

return we + pe

其中 cfg 文件用来存一些超参数,device 是选择使用 CPU / GPU 的设备编号。


2. Masking

为一些标记添加遮罩,这里主要用于一下两种情况:

  1. 对填充标记 [PAD] 进行遮罩,确保模型不会将填充作为输入。
  2. 前瞻遮挡(look-ahead mask)用于遮挡一个序列中的后续部分,比如说要预测第三个词,那么前面的 [CLS] 标记以及第一第二个词被保留用于预测,而其它位置的词则应该被遮住,不能输入模型。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def create_padding_mask(seq):
seq.eq_(0)
# 添加额外的维度来将填充加到注意力对数(logits)。
return seq[:, np.newaxis, np.newaxis, :] # shape: [batch_size, 1, 1, seq_len]


def create_look_ahead_mask(seq_len):
return torch.triu(torch.ones(seq_len, seq_len), 1) # shape: [seq_len, seq_len]


def create_masks(src_ids, trg_ids):
en_padding_mask = create_padding_mask(src_ids)
de_padding_mask = create_padding_mask(src_ids)

look_ahead_mask = create_look_ahead_mask(trg_ids.shape[1])
de_trg_padding_mask = create_padding_mask(trg_ids)
combined_mask = torch.maximum(de_trg_padding_mask, look_ahead_mask)

return en_padding_mask, combined_mask, de_padding_mask

3. Attention

3.1 Scaled Dot-Product Attention (按比缩放的点积注意力)

在这里插入图片描述
点积注意力机制的公式为:

点积注意力被缩小了深度的平方根倍。这样做是因为对于较大的深度值,点积的大小会增大,从而推动 Softmax 函数往仅有很小的梯度的方向靠拢,这样可能会导致梯度消失。

例如,假设 Q 和 K 的均值为0,方差为1。它们的矩阵乘积将有均值为0,方差为 dk。因此,dk 的平方根被用于缩放(而非其他数值),因为,Q 和 K 的矩阵乘积的均值本应该为 0,方差本应该为1,这样会获得一个更平缓的 Softmax。

在 Softmax 之前要记得加入遮罩,遮罩乘以 -1e9 来添加一个无穷小的数,使得 Softmax 之后的输出在被遮住的部分接近于 0,这样就可以忽略该位置的信息。

3.2 Multi-Head Attention

在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class MultiHeadAttention(nn.Module):
def __init__(self, cfg):
super(MultiHeadAttention, self).__init__()
self.n_head = cfg.n_head
self.dim = cfg.hidden_dim
self.device = cfg.device

self.wq = nn.Linear(self.dim, self.dim).to(self.device)
self.wk = nn.Linear(self.dim, self.dim).to(self.device)
self.wv = nn.Linear(self.dim, self.dim).to(self.device)

self.softmax = nn.Softmax(dim=3)
self.f = nn.Linear(self.dim, self.dim).to(cfg.device)

def split(self, tensor):
a, b, c = tensor.size()
d = c // self.n_head
return tensor.view(a, b, self.n_head, d).permute(0, 2, 1, 3)

def concat(self, tensor):
a, b, c, d = tensor.size()
return tensor.view(a, c, b * d)

def attention(self, k, q, v, mask):
_, _, _, d = k.size()
kt = torch.transpose(k, 2, 3)
s = (q @ kt) / math.sqrt(d) # Scale 操作,防止之后 Softmax 时梯度消失

if mask is not None:
s += (mask * -1e9);

s = self.softmax(s)
v = s @ v
return v

def forward(self, v, k, q, mask):
k, q, v = self.wk(v), self.wq(k), self.wv(q)
k, q, v = self.split(k), self.split(q), self.split(v)
output = self.attention(k, q, v, mask)
output = self.concat(output)
return self.f(output)


4. Add & Norm

在这里插入图片描述

残差连接层有助于避免深度网络中的梯度消失问题。

1
2
3
4
5
6
7
8
9
10
11
class AddNorm(nn.Module):
def __init__(self, cfg):
super(AddNorm, self).__init__()
self.dim = cfg.hidden_dim

self.norm = nn.LayerNorm(self.dim).to(cfg.device)
self.dropout = nn.Dropout(cfg.drop_out) # 正则化,参数为正则化概率

def forward(self, input, x):
input = self.dropout(input)
return self.norm(input + x)

5. Point wise feed forward network (点式前馈网络)

在这里插入图片描述
点式前馈网络由两层全联接层组成,两层之间有一个 ReLU 激活函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
class FeedForward(nn.Module):
def __init__(self, cfg):
super(FeedForward, self).__init__()
self.dim = cfg.hidden_dim

self.f1 = nn.Linear(self.dim, self.dim).to(cfg.device)
self.relu = nn.ReLU().to(cfg.device)
self.f2 = nn.Linear(self.dim, self.dim).to(cfg.device)

def forward(self, x):
x = self.f1(x)
x = self.relu(x)
return self.f2(x)

6. Encoder and Decoder

6.1 Encoder Layer

在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class EncoderLayer(nn.Module):
def __init__(self, cfg):
super(Encoder, self).__init__()
self.attention = MultiHeadAttention(cfg)
self.fforward = FeedForward(cfg)

self.addnorm1 = AddNorm(cfg)
self.addnorm2 = AddNorm(cfg)

def forward(self, x, mask):
attn = self.attention(x, x, x, mask)
attn = self.addnorm1(attn, x)

ffw = self.fforward(attn)
ffw = self.addnorm2(ffw, attn)
return ffw

6.2 Encoder

编码器包括

  1. Embedding
  2. N 个编码器层
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Encoder(nn.Module):
def __init__(self, cfg):
super(Encoder, self).__init__()
self.num_layers = cfg.encoder_layer_count

self.embedding = Embedding(cfg)
self.layers = [EncoderLayer(cfg) for _ in range(self.num_layers)]
self.dropout = nn.Dropout(cfg.drop_out)

def forward(self, x, mask):
x = self.embedding(x)
x = self.dropout(x)
for i in range(self.num_layers):
x = self.layers[i](x, mask)
return x

6.3 Decoder Layer

在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class DecoderLayer(nn.Module):
def __init__(self, cfg):
super(Decoder, self).__init__()
self.attention1 = MultiHeadAttention(cfg)
self.attention2 = MultiHeadAttention(cfg)

self.fforward = FeedForward(cfg)

self.addnorm1 = AddNorm(cfg)
self.addnorm2 = AddNorm(cfg)
self.addnorm3 = AddNorm(cfg)

def forward(self, x, encoder_out, look_ahead_mask, padding_mask):
attn1 = self.attention1(x, x, x, look_ahead_mask)
ffw1 = self.addnorm1(attn1, x)

attn2 = self.attention2(encoder_out, encoder_out, ffw1, padding_mask)
ffw2 = self.addnorm2(attn2, ffw1)

output = self.fforward(ffw2)
output = self.addnorm3(output, ffw2)

return output

6.4 Decoder

解码器包括

  1. Embedding
  2. N 个解码器层
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Decoder(nn.Module):
def __init__(self, cfg):
super(Decoder, self).__init__()
self.dim = cfg.dim
self.num_layers = cfg.decoder_layer_count

self.embedding = Embedding(cfg)
self.layers = [Decoder(cfg) for _ in range(self.num_layers)]
self.dropout = nn.Dropout(cfg.drop_out)

def forward(self, x, encode_output, look_ahead_mask, padding_mask):
x = self.embedding(x)
x = self.dropout(x)
for i in range(self.num_layers):
x = self.layers[i](x, encode_output, look_ahead_mask, padding_mask)
return x

7. Transformer

1
2
3
4
5
6
7
8
9
10
11
class Transformer(nn.Module):
def __init__(self, cfg):
super(Transformer, self).__init__()
self.encoder = Encoder(cfg)
self.decoder = Decoder(cfg)
self.linear = nn.Linear(cfg.hidden_dim, cfg.vocab_size).to(cfg.device)

def forward(self, src_ids, trg_ids, en_padding_mask, look_ahead_mask, de_padding_mask):
en_output = self.encoder(src_ids, en_padding_mask)
de_output = self.decoder(trg_ids, en_output, look_ahead_mask, de_padding_mask)
return self.linear(de_output)

8. 优化器 与 损失函数

根据论文中的公式,将 Adam 优化器与自定义的学习速率调度程序(scheduler)配合使用。

warmup_steps 为 4000 时的学习率变化如图:

在这里插入图片描述
由于目标序列是 padding 过的,因此在计算损失函数时,应该使用 mask 遮去 [PAD] 的部分。

1
2
3
4
5
6
7
8
def loss_function(y, pred):
mask = torch.logical_not(torch.eq(y, 0))
loss = criterion(real, pred) # 损失函数

mask = mask.float()
loss *= mask

return torch.mean(loss)

9. Tokenizer

顺便学一下训练一个自己的 tokenizer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
tokenizer = Tokenizer(models.WordPiece(unk_token="[UNK]")) # 用 [UNK] 替换未知字符
tokenizer.normalizer = normalizers.Sequence( # 按一定规则标准化:NFD:统一 Unicode 编码
[normalizers.NFD(), normalizers.Lowercase()]
)

tokenizer.pre_tokenizer = pre_tokenizers.WhitespaceSplit() # 使用 空格 来分割
special_tokens = ["[UNK]", "[PAD]", "[CLS]", "[SEP]", "[MASK]"]
trainer = trainers.WordPieceTrainer(vocab_size=10000, special_tokens=special_tokens) # 分成 10000 类

tokenizer.train(['text.txt'], trainer=trainer) # 用于训练的文本
tokenizer.save('my_tokenizer.json')

# 使用
tokenizer = Tokenizer.from_file("my_tokenizer.json") # 调用自己的 tokenizer

s = "给他们来点小小的蜀国震撼"
tokens = tokenizer.encode(s)
print(tokens.ids, tokens.tokens)