您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Model Center   Code  
会员   
   
 
     
   
 
 订阅
Transformer 一起动手编码学原理
 
作者:伯涵

   次浏览      
2023-9-18
 
编辑推荐:
本文主要介绍了如何动手写一个Transformer。希望对您的学习有所帮助 。
本文来自于微信公众号阿里开发者,由火龙果软件Alice编辑、推荐。

作为工程同学,学习Transformer中,不动手写一个,总感觉理解不扎实。纸上得来终觉浅,绝知此事要躬行,抽空多debug几遍!

注:不涉及算法解释,仅是从工程代码实现去加强理解。

一、准备知识

以预测二手房价格的模型,先来理解机器学习中的核心概念。

1.1、手撸版

1、准备训练数据

# 样本数num_examples = 1000# 特征数:房屋大小、新旧num_input = 2
# 线性回归模型,2个W参数值:2,-3.4true_w = [2, -3.4]#
1个b参数值
true_b = 4.2
# 随机生成样本:特征值features = torch.
randn(num_examples, num_input, dtype=torch.float32)
# print(features[0])
# 随机生成样本:y值labels = true_w[0] * features[:,
0] + true_w[1] * features[:, 1] + true_b
# print(labels[0])
# 随机生成一批数据,给y值再随机化下_temp = torch.tensor(np.
random.normal(0, 0.01, size=labels.size()),
dtype=torch.float32)
labels = labels + _temp# print(labels[0])

2、定义模型

# 模型参数 w:w1、w2,b:b,设了初始值w = torch.tensor
(np.random.normal(0, 0.01, (num_input, 1)), dtype=torch.float32)
b = torch.zeros(1, dtype=torch.float32)# 设置要求计算梯度
w.requires_grad_(requires_grad=True)

b.requires_grad_(requires_grad=True)

# 线性回归模型:定义 torch.mm(X, w) + b,mm是矩阵乘法
(m:multiply缩写)
def linreg(X, w, b): return torch.mm(X, w) + b
# 定义损失函数:y_pred=预测值、y=真实值,最
简单的 平方误差
def squared_loss(y_pred, y):
return (y_pred - y.view(y_pred.size())) ** 2 / 2

# 定义梯度下降算法:sgd# params是参数(data 是参数
值、grad 是梯度值),lr是学习率,batch是数量
#
为什么要除batch?因为这个梯度值,是在batch个数据上累加算
def sgd(params, lr, batch): for param in params:
# 注意这里更改param时用的param.data

param.data -= lr * param.grad / batch

3、训练模型

# 初始学习率lr = 0.03# 训练轮次epoch = 5# 网络:一层,
2个输入(2个参数 w1、w2),1个b参数
net = linreg#
损失函数
loss = squared_loss# 1个批次大小batch_size = 10
# 训练模型一共需要epoch个迭代周期for epoch in
range(epoch):
# 在每一个迭代周期中,所有
样本都要跑,从 0~1000,每个batch=10
for
X, y in data_iter(batch_size, features,
labels):
# ls是这个batch的损失求和
ls = loss(net(X, w, b), y).sum()

# 求梯度值
ls.backward()
# 更新参数
sgd([w, b], lr, batch_size)
# 不要忘了梯度清零,下一次要用
w.grad.data.zero_()
b.grad.data.zero_()
# 在计算这轮训练完后,打印 loss,可以
观察在不断变小
train_l = loss(net(features, w, b),
labels)
print('epoch %d, loss %f'
% (epoch + 1, train_l.mean().item()))

 

1.2、pytorch版

1、准备训练数据

同上

2、定义模型

# 定义模型:线性回归class LinearNet(nn.Module):    
def __init__(self, n_feature):

super(LinearNet, self).__init__()
# 线
性规划:n_feature=几个w,1=1个b
self
.linear = nn.Linear(n_feature, 1)

# forward 定义【前向传播】 def forward
(self, x)
:
y = self.linear(x) return y
# 实例化模型,初始化参数值net = LinearNet(num_input)
init.normal_(net[0].weight, mean=0, std=0.01)
init.constant_(net[0].bias, val=0)

# 定义损失函数loss = nn.MSELoss()
# 定义梯度下降算法(lr是学习率)optimizer
= optim.SGD(net.parameters(), lr=0.03)

 

3、训练模型

num_epochs = 3for epoch in range(1, num_epochs + 1):    for X,
y in data_iter:
output = net(X) # 算损
失值
l_sum = loss(output, y.view(-1, 1))
# 更新梯度
l_sum.backward() # 更新w和b
optimizer.step()
# 梯度清零
optimizer.zero_grad()
print('epoch %d,
loss: %f'
% (epoch, l_sum.item()))

batch 读取数据

# 读取数据 by batch & 随机batch_size = 10
data_set
= Data.TensorDataset(features, labels)
data_iter = Data.DataLoader(data_set, batch_size,
shuffle=True)

 

在用torch框架,进一步要理解:

a、nn.Module 和 forward() 函数:表示神经网络中的一个层,覆写 init 和 forward 方法(提问:实现2层网络怎么做?)

b、debug 观察下数据变化,和 手撸版 对比着去理解

二、手写Transformer

上面这是一个"逻辑"示意图。印象中,第1次看时,以为 "inputs & outputs" 是并行计算、但上面又有依赖,很糊涂。

这不是一个技术架构图,上图有很多概念,attention、multi-head等。先尝试转换为面向对象的类图,如下:

2.1、例子:翻译

# 定义词典source_vocab = {'E': 0, '我': 1, '吃': 2, '肉'
: 3}
target_vocab = {'E': 0, 'I': 1, 'eat': 2, 'meat': 3, 'S': 4}
# 样本数据encoder_input = torch.LongTensor([[1, 2, 3,
0]]).to(device) # 我 吃 肉 E, E代表结束词
decoder_input
= torch.LongTensor([[4, 1, 2, 3]]).to(device) # S I eat meat,
S代表开始词, 并右移一位,用于并行训练
target =
torch.LongTensor([[1, 2, 3, 0]]).to(device)
# I eat meat E, 翻译目标

2.2、定义模型

按照上面类图,我们来一点点实现

1、Attention

class ScaledDotProductAttention(nn.Module): 
def __init__(self):

super(ScaledDotProductAttention, self).__init__()

def forward(self, Q, K, V, attn_mask):
# Q、K、V,此时是已经乘过 W(q)、W(k)、
W(v) 矩阵
# 如下图,但不用一个个算,矩阵
乘法一次搞定
scores = torch.matmul
(Q, K.transpose(-1, -2)) / np.sqrt(d_k)

# 遮盖区的值设为近0,表示E结尾 or decoder 自我
顺序遮盖,注意力丢弃
scores.masked_fill_
(attn_mask, -1e9)
# softmax后(遮盖区变为0)
attn = nn.Softmax(dim=-1)(scores)
# 乘积意义:给V带上了注意力信息。prob就是下图z
(矩阵计算不用在v1+v2)。
prob = torch.matmul(attn, V)
return prob

 

2、MultiHeadAttention

通用变量定义

d_model = 6  # embedding size# d_model = 3  # 
embedding size
d_ff = 12 # feedforward nerual
network dimension
d_k = d_v = 3 # dimension
of k(same as q) and v
n_heads = 2 #
number of heads in multihead attention
#
n_heads = 1 # number of heads in multihead
attention【注:为debug更简单,可以先改为1个head】
p_drop = 0.1 # propability of dropoutdevice = "cpu"

 

注1:按惯性会想,会有多个head、串行循环计算,不是,多个head是一个张量输入

注2:FF 全连接、残差连接、归一化,35、38 行业代码,pytorch框架带来的简化

class MultiHeadAttention(nn.Module):    def __init__
(self)
:
super(MultiHeadAttention, self)
.__init__()
self.n_heads = n_heads
self
.W_Q = nn.Linear(d_model, d_k * n_heads,
bias=False)
self.W_K = nn.Linear
(d_model, d_k * n_heads, bias=False)

self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
self.fc = nn.Linear(d_v * n_heads,
d_model, bias=False) # ff 全连接

self
.layer_norm = nn.LayerNorm(d_model) #
normal 归一化

def forward(self, input_Q, input_K, input_V,
attn_mask)
:
# input_Q:1*4*6,每批1句
* 每句4个词 * 每词6长度编码
# residual 先
临时保存下:原始值,后面做残差连接加法

residual, batch = input_Q, input_Q.size(0)

# 乘上 W 矩阵。注:W 就是要训练的参数
# 注意:维度从2维变成3维,增加 head 维度,
也是一次性并行计算
Q = self.W_Q(input_Q)
# 乘以 W(6*6) 变为 1*4*6

Q = Q.view(batch, -1, n_heads, d_k).
transpose(1, 2) # 切开为2个Head 变为 1*2*4*3
1批 2个Head 4词 3编码
K = self.
W_K(input_K).view(batch, -1, n_heads, d_k).
transpose(1, 2)
V = self.W_V(input_V).view(batch,
-1, n_heads, d_v).transpose(1, 2)

# 1*2*4*4,2个Head的4*4,最后一
列为true
# 因为最后一列是 E 结束符
attn_mask = attn_mask.unsqueeze(1).repeat
(1, n_heads, 1, 1)

# 返回1*2*4*3,2个头,4*3为
带上关注关系的4词
prob = ScaledDotProductAttention
()(Q, K, V, attn_mask)

# 把2头重新拼接起来,变为 1*4*6
prob = prob.transpose(1, 2).
contiguous()
prob = prob.view(batch, -
1
, n_heads * d_v).contiguous()

# 全连接层:对多头注意力的输出进行
线性变换,从而更好地提取信息

output = self.fc(prob)

# 残差连接 & 归一化 res =
self.layer_norm(residual + output) # return 1*4*6
return res

3、Encoder

在 attention 概念中,有很关键的 "遮盖" 概念,先不细究,你debug一遍会更理解

def get_attn_pad_mask(seq_q, seq_k):  # 本质是结尾E做注意
力遮盖,返回 1*4*4,最后一列为True
batch, len_q = seq_q.size()
# 1, 4
batch, len_k = seq_k.size() # 1, 4
pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)
# 为0则为true,变为f,f,f,true,意思是把0这个
结尾标志为true
return pad_attn_mask.
expand(batch, len_q, len_k) # 扩展为1*4*4,最
后一列为true,表示抹掉结尾对应的注意力


def get_attn_subsequent_mask(seq): # decoder的自我
顺序注意力遮盖,右上三角形区为true的遮盖

attn_shape = [seq.size(0), seq.size(1),
seq.size(1)]
subsequent_mask = np.triu(np.ones(attn_shape),
k=1)
subsequent_mask = torch.from_numpy
(subsequent_mask)
return subsequent_mask

 

class Encoder(nn.Module):    def __init__(self
)
:
super(Encoder, self).__init__()
self.source_embedding = nn.Embedding
(len(source_vocab), d_model)

self
.attention = MultiHeadAttention()

def forward(self, encoder_input):
# input 1 * 4,1句话4个单词
#
1 * 4 * 6,将每个单词的整数字编码
扩展到6个浮点数编码

embedded = self.
source_embedding(encoder_input)

# 1 * 4 * 4 矩阵,最后一列为true,
表示忽略结尾词的注意力机制

mask = get_attn_pad_mask(encoder_input,
encoder_input)
# 1*4*6,带上关注
力的4个词矩阵
encoder_output = self
.attention(embedded, embedded, embedded, mask)
return encoder_output

 

4、Decoder

class Decoder(nn.Module):
def __init__(self): super
(Decoder, self).__init__()
self.target_
embedding =
nn.Embedding(len(target_vocab),
d_model)
self.attention =
MultiHeadAttention()

# 三入参形状分别为 1*4, 1*4, 1*4*6,前
两者未被embedding,注意后面这个是 encoder_output

def forward(self, decoder_input,
encoder_input, encoder_output)
:

# 编码为1*4*6
decoder_embedded =
self.target_embedding(decoder_input)

# 1*4*4 全为false,表示没有结尾词
decoder_self_attn_mask = get_attn_pad
_mask(decoder_input, decoder_input)

# 1*4*4 右上三角区为1,其余为0

decoder_subsequent_mask = get_attn_subsequent
_mask
(decoder_input)
# 1*4*4 右上三
角区为true,其余为false

decoder_self_mask = torch.gt
(decoder_self_attn_mask + decoder_subsequent_mask, 0)

# 1*4*6 带上注意力的4词矩阵【注:decoder里面,第1个attention】
decoder_output = self.
attention(decoder_embedded, decoder_
embedded, decoder_embedded, decoder_self_mask)

# 1*4*4 最后一列为true,表示E结尾词
decoder_encoder_attn_mask = get_attn_pad_mask
(decoder_input, encoder_input)
#
输入均为 1*4*6,Q表示"S I eat meat"、K表示"我吃肉E"
、V表示 "我吃肉E"
#【注:decoder里面,第2个attention】
decoder_output = self.attention
(decoder_output, encoder_output, encoder_
output, decoder_encoder_attn_mask)
return decoder_output

 

5、Transformer

class Transformer(nn.Module):    def __init__(self):    
super(Transformer, self).__init__()

self
.encoder = Encoder()
self.decoder =
Decoder()
self.fc = nn.Linear
(d_model, len(target_vocab), bias=False)

def forward(self, encoder_input, decoder_input)
:
# 入 1*4,出 1*4*6,作用:"我吃肉E",并带上三词间的关注力信息
encoder_output = self.encoder(encoder_input)

# 入 1*4, 1*4, 1*4*6=encoder_output

decoder_output = self.decoder(decoder_input,
encoder_input, encoder_output)
# 预测
出4个词,每个词对应到词典中5个词的概率,如下

# tensor([[[ 0.0755, -0.2646, 0.1279, -0.3735, -0.2351],
[-1.2789, 0.6237, -0.6452, 1.1632, 0.6479]]]
decoder_logits = self.fc(decoder_output)
res = decoder_logits.view(-1,
decoder_logits.size(-1))
return res

 

2.3、训练模型

model = Transformer().to(device)criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-1)
for epoch in range(10): # 输出4*5,代表预测出4个词
,每个词对应到词典中5个词的概率
output = model
(oencder_input, decoder_input)
# 和目标词
I eat meat E做差异计算
loss =
criterion(output, target.view(-1))

print('Epoch:', '%04d' % (epoch + 1),
'loss ='
, '{:.6f}'.format(loss))
# 这个3
个操作:清零梯度、算法梯度、更新参数
optimizer.zero_grad
()
loss.backward() optimizer.step()

 

2.4、使用模型

# 预测目标是5个单词target_len = len(target_vocab)  #
1*4*6 输入"我吃肉E",先算【自注意力】
encoder_output =
model.encoder(encoder_input)
# 1*5 全是0,
表示EEEEE
decoder_input = torch.zeros(1,
target_len).type_as(encoder_input.data)
#
表示S开始字符
next_symbol = 4
# 5个单词逐个预测【注意:是一个个追加词,
不断往后预测的】
for i in range(target_len):
# 譬如i=0第一轮,decoder输入为SEEEE,
第二轮为S I EEE,把预测 I 给拼上去,
继续循环
decoder_input[0][i] = next_symbol
# decoder 输出 decoder_output =
model.decoder(decoder_input, encoder_input,
encoder_output)
# 负责将解码器的输出映射到目
标词汇表,每个元素表示对应目标词汇的分数

# 取出最大的五个词的下标,譬如[1, 3, 3, 3, 3]
表示 i,meat,meat,meat,meat
logits =
model.fc(decoder_output).squeeze(0)

prob = logits.max(dim=1, keepdim=False)[1]
next_symbol = prob.data[i].item() # 只取当前i
for k, v in target_vocab.items():
if v == next_symbol:
print('第', i, '轮:', k) break
if next_symbol == 0: # 遇到结尾了,那就完成翻译 break

 

 

   
次浏览       
相关文章

编译原理--C语言C文件和头文件的关系
用 C 语言开发一门编程语言 — 抽象语法树
C语言 | 嵌入式C语言编程规范
详解C语言数组越界及其避免方法
 
相关文档

C语言-指针讲解
详解C语言中的回调函数
ARM下C语言编程解析
设计模式的C语言实现
相关课程

C++高级编程
C++并行编程与操作
C++ 11,14,17,20新特性
C/C++开发基础

最新活动计划
SysML和EA系统设计与建模 1-16[北京]
企业架构师(业务、应用、技术) 1-23[北京]
大语言模型(LLM)Fine Tune 2-22[在线]
MBSE(基于模型的系统工程)2-27[北京]
OpenGauss数据库调优实践 3-11[北京]
UAF架构体系与实践 3-25[北京]
 
 
最新文章
编译原理--C语言C文件和头文件的关系
用 C 语言开发一门编程语言 — 抽象语法树
C语言 | 嵌入式C语言编程规范
详解C语言数组越界及其避免方法
最新课程
C++高级编程
C++并行编程与操作
C++ 11,14,17,20新特性
C/C++开发基础
成功案例
某航天科工单位 C++新特性与开发进阶
北京 C#高级开发技术
四方电气集团 嵌入式高级C语言编程
北大方正 C语言单元测试实践