1. 数据处理
定义词汇表,即输入文字总量大小,进行具体词汇到词汇表的映射
下面定义词汇表大小是12,不同词汇到表中序号的映射
# 创建词汇表映射
vocab = {"<pad>": 0, "<unk>": 1, "<start>": 2, "<end>": 3,
"今天": 4, "天气": 5, "真好": 6,
"适合": 7, "散步": 8, ",": 9 }
# 示例数据准备
def prepare_data():
# 创建模拟数据(数字序列)
sequences = [
[2, 4, 5, 6, 3], # <start>今天天气真好<end>
[2, 7, 8, 9, 3], # <start>适合散步,<end>
[2, 4, 8, 9, 3], # <start>今天散步,<end>
[2, 4, 5, 9, 3] # <start>今天天气,<end>
]
return NumberDataset(sequences)
NumberDataset将输入数据进行输入和预测的拆分
class NumberDataset(Dataset):
def __init__(self, sequences):
self.sequences = sequences
def __len__(self):
return len(self.sequences)
def __getitem__(self, idx):
seq = self.sequences[idx]
# 输入是前n-1个token,目标是后n-1个token
return torch.LongTensor(seq[:-1]),torch.LongTensor(seq[1:])
这些数据放入DataLoader中,能够通过遍历训练数据来获得训练数据集
dataloader = DataLoader(d, batch_size=2, shuffle=True)
for batch in dataloader:
print("batch is %s" % batch)
其中batch是训练数据,每次按照batch_size来提取训练数据。shuffle表示提取的训练数据进行无序的处理。
2. 定义简单的LLM
2.1 词嵌入(embeding)
embeding过程是将输入的词,根据embeding的维度来进行扩展,具体为
# 词嵌入层
self.token_embedding = nn.Embedding(vocab_size, embed_dim)
利用下面的函数来测试这个功能
# 创建一个 Embedding 层
# 这里假设词汇表大小为 12,每个词的嵌入维度为 8
token_embedding = nn.Embedding(12, 8)
# 准备输入数据
# 假设我们有一个包含 3 个词索引的序列
input_data = torch.tensor([1, 0, 1])
# 使用嵌入层对输入数据进行嵌入操作
embedded_output = token_embedding(input_data)
# 打印嵌入层的权重矩阵
print("嵌入层的权重矩阵:")
print(token_embedding.weight)
# 打印输入数据
print("\n输入数据:")
print(input_data)
# 打印嵌入后的输出
print("\n嵌入后的输出:")
print(embedded_output)
# 检查嵌入后的输出形状
print("\n嵌入后输出的形状:")
print(embedded_output.shape)
embeding的权重矩阵是12 8, 输入的每个词会展开成为1 12矩阵,embeing后成为1 * 8
输入 3 个词索引的序列, 输出为 3 *8
2.2 位置编码
Transformer模型不同于传统的RNN模型,不会有时间步或位置状态。因此,Transformer需要一种方法来捕捉序列中各个位置之间的关系。位置编码可能就是用来解决这个问题的一种方式。
用来识别每个词在样本句子中的位置信息,例如上面输入的语句中需要定义最多输入多少个字,这里定义为5, 然后每个字的位置就能够用一个15的向量表示,再经过embeding处理变成18的特征矩阵,具体定义如下
self.position_embedding = nn.Embedding(max_seq_len, embed_dim)
2.3 Transormer层
Transformer中主要进行多头注意的运算;多头的含义可以理解为从多个方面对输入语言进行理解,多头的数量必须是embeding后向量可整除;
在每个注意力模型又对Q/K/V向量进行运算,先将Q和K进行融合计算,然后再联合V进行计算
这个图中提供了从embeding --> QKV构建 --> 多头注意力的向量变换过程。embeding后输入的每个词都会变为embeding维的向量,然后这个向量维度扩大3倍,再根据multi-head数量将向量进行多个multi-head维度上维度构建
每个维度上Q和K进行乘积运算后得到点积注意力,然后将注意力权重到V,最终合并多个注意力头;最终完成线性变换
最终输出的维度是还是embeding维
class MultiHeadSelfAttention(nn.Module):
def __init__(self, embed_dim, num_heads):
"""
Args:
embed_dim: 输入特征的维度(d_model)
num_heads: 注意力头的数量
"""
super().__init__()
assert embed_dim % num_heads == 0, "embed_dim必须能被num_heads整除"
self.embed_dim = embed_dim
self.num_heads = num_heads
self.head_dim = embed_dim // num_heads
# 线性变换生成Q、K、V
self.qkv_proj = nn.Linear(embed_dim, 3 * embed_dim)
# 最终的输出线性变换
self.out_proj = nn.Linear(embed_dim, embed_dim)
def forward(self, x):
"""
Args:
x: 输入张量形状 [batch_size, seq_len, embed_dim]
Returns:
输出张量形状 [batch_size, seq_len, embed_dim]
"""
batch_size, seq_len, embed_dim = x.size()
# 生成Q、K、V (每个的形状都是 [batch_size, seq_len, embed_dim])
qkv = self.qkv_proj(x)
print("qkv is %s" % qkv)
# 分割Q、K、V并重塑形状以分离注意力头
# 重塑后形状: [batch_size, seq_len, num_heads, 3*head_dim]
qkv = qkv.reshape(batch_size, seq_len, self.num_heads, 3 * self.head_dim)
print("reshape qkv is %s" % qkv)
# 分解为Q、K、V (每个的形状 [batch_size, num_heads, seq_len, head_dim])
q, k, v = torch.chunk(qkv, 3, dim=-1)
q = q.permute(0, 2, 1, 3) # [batch_size, num_heads, seq_len, head_dim]
k = k.permute(0, 2, 1, 3)
v = v.permute(0, 2, 1, 3)
# 计算缩放点积注意力
scale = self.head_dim ** -0.5
scores = torch.matmul(q, k.transpose(-2, -1)) * scale # [batch_size, num_heads, seq_len, seq_len]
attn_weights = F.softmax(scores, dim=-1)
# 应用注意力权重到V
context = torch.matmul(attn_weights, v) # [batch_size, num_heads, seq_len, head_dim]
# 合并多个注意力头
context = context.permute(0, 2, 1, 3) # [batch_size, seq_len, num_heads, head_dim]
context = context.reshape(batch_size, seq_len, embed_dim) # 合并最后两个维度
# 最终线性变换
output = self.out_proj(context)
return output
使用如下测试程序可以看到,输入和输出的形状是一样,最终变换的目的是再多维空间中将特征分隔出来
if __name__ == "__main__":
# 超参数
embed_dim = 64 # 输入特征维度
num_heads = 4 # 注意力头数量
seq_len = 3 # 输入序列长度
batch_size = 1 # 批大小
# 创建模块实例
mhsa = MultiHeadSelfAttention(embed_dim, num_heads)
# 生成随机输入(模拟一个batch的输入序列)
x = torch.randn(batch_size, seq_len, embed_dim)
# 前向传播
output = mhsa(x)
print("输入形状:", x.shape)
print(x)
print("输出形状:", output.shape)
print(output)
输入形状: torch.Size([1, 3, 64])
输出形状: torch.Size([1, 3, 64])
以上是一个transformer blok的实现过程;
class TransformerBlock(nn.Module):
"""Transformer解码器块(简化版,没有交叉注意力)"""
def __init__(self, embed_dim, num_heads):
super().__init__()
# 自注意力层
self.self_attn = MultiHeadSelfAttention(embed_dim, num_heads)
# 前馈神经网络
self.ffn = nn.Sequential(
nn.Linear(embed_dim, 4 * embed_dim), # 扩展维度
nn.GELU(),
nn.Linear(4 * embed_dim, embed_dim) # 恢复维度
)
# 层归一化
self.norm1 = nn.LayerNorm(embed_dim)
self.norm2 = nn.LayerNorm(embed_dim)
def forward(self, x):
# 残差连接 + 层归一化
x = x + self.self_attn(self.norm1(x)) # 自注意力
x = x + self.ffn(self.norm2(x)) # 前馈网络
return x
将多个transformer block做组合叠加,即完成了编码器部分的内容
编码器总体上分为四步:
词嵌入层
- 将离散的token索引转换为连续向量表示
位置编码层
- 为序列中的每个位置生成位置信息
- 这里使用可学习的位置嵌入(与原始Transformer的固定编码不同)
Transformer层
- 核心结构:多头自注意力 + 前馈网络
- 残差连接和层归一化帮助梯度流动
- 多个堆叠层(通常6-100层)实现深层特征提取
线性输出层
- 将隐藏层输出映射回词表空间
- 通过softmax得到下一个token的概率分布
输出的结果是在字符表空间中每一个字符出现的概率
初始化代码如下:
class SimpleLLM(nn.Module):
def __init__(self, vocab_size, embed_dim, num_heads, num_layers, max_seq_len):
"""
超参数说明:
vocab_size: 词表大小
embed_dim: 词向量维度(d_model)
num_heads: 多头注意力的头数
num_layers: Transformer层数
max_seq_len: 最大序列长度
"""
super().__init__()
self.vocab_size = vocab_size
self.embed_dim = embed_dim
# 1. 词嵌入层
self.token_embedding = nn.Embedding(vocab_size, embed_dim)
# 2. 位置编码层
self.position_embedding = nn.Embedding(max_seq_len, embed_dim)
# 3. Transformer层(解码器结构)
self.transformer_layers = nn.ModuleList([
TransformerBlock(embed_dim, num_heads)
for _ in range(num_layers)
])
# 4. 最后的线性输出层
self.lm_head = nn.Linear(embed_dim, vocab_size)
相应的前向传播函数如下:
def forward(self, input_ids):
"""
输入参数:
input_ids: 输入token的索引序列 [batch_size, seq_len]
输出:
logits: 预测每个位置的下一个token概率 [batch_size, seq_len, vocab_size]
"""
batch_size, seq_len = input_ids.shape
# 生成位置编码的索引
positions = torch.arange(0, seq_len).expand(batch_size, seq_len)
# 词嵌入 + 位置编码
token_embeds = self.token_embedding(input_ids) # [B, S, D]
pos_embeds = self.position_embedding(positions) # [B, S, D]
x = token_embeds + pos_embeds
# 逐层通过Transformer块
for layer in self.transformer_layers:
x = layer(x)
# 输出投影到词表空间
logits = self.lm_head(x) # [B, S, V]
return logits
编码器的总体目标是输入一个词向量,来预测下一个的词的概率
4. 模型的训练
模型训练的思路是根据输入参数进行前向传播
前向传播结果与target比较计算lost
根据lost进行后向传播
# 训练函数
def train_model(model, dataloader, config):
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=config.lr)
for epoch in range(config.epochs):
total_loss = 0
for inputs, targets in dataloader:
optimizer.zero_grad()
# 前向传播
logits = model(inputs) # [B, S, V]
# 计算损失(只计算最后一个位置的预测)
loss = criterion(logits[:, -1, :], targets[:, -1])
# 反向传播
loss.backward()
optimizer.step()
total_loss += loss.item()
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch + 1}/{config.epochs}, Loss: {total_loss / len(dataloader):.4f}")
5. 模型的推理
推理过程分为Prefill和decode阶段,
- Prefill逐个对输入的词完成在词汇表中index查找,并将输入句子转换为连续向量,将连续向量输入到模型完成完整的前向传播,处理输入序列中素有token
- Decode根据前向传播计算出来的下一个token预测,放入到输入序列中,然后再次进行推理,直到满足停止条件
下面的代码是对输入句子完成到词汇表的index转换
# 实现一个简化的Tokenizer(示例用)
class SimpleTokenizer:
def __init__(self, vocab):
self.vocab = vocab # 假设vocab是字典{word: id}
self.inverse_vocab = {v: k for k, v in vocab.items()}
def encode(self, text):
# 简单按空格分割(实际应使用BPE等算法)
return [self.vocab.get(word, self.vocab["<unk>"]) for word in text.split()]
def decode(self, ids):
return " ".join([self.inverse_vocab.get(i, "<unk>") for i in ids])
下面的代码中进行下一个词的预测,整体过程包含了Prefill和Decode的过程
def generate_text(model, input_text, tokenizer, max_length=20, temperature=1.0):
"""
文本生成函数
Args:
model: 训练好的LLM模型
input_text: 输入的起始文本(字符串)
tokenizer: 文本到token的转换器(这里简化实现)
max_length: 生成文本的最大长度
temperature: 控制生成随机性的温度参数
Returns:
生成的完整文本(字符串)
"""
model.eval() # 切换到评估模式
generated_tokens = []
# 1. 将输入文本转换为token IDs
input_ids = tokenizer.encode(input_text)
if len(input_ids) == 0:
input_ids = [tokenizer.vocab["<start>"]] # 添加起始符
# 2. 自回归生成循环
for _ in range(max_length):
# 创建输入张量(保持序列长度不超过max_seq_len)
inputs = torch.tensor([input_ids[-model.max_seq_len:]], dtype=torch.long)
# 3. 前向传播获取预测logits
with torch.no_grad():
logits = model(inputs) # [1, seq_len, vocab_size]
# 4. 获取最后一个位置的预测结果
next_token_logits = logits[0, -1, :] / temperature
# 5. 通过softmax转换为概率分布
probs = F.softmax(next_token_logits, dim=-1)
# 6. 从分布中采样下一个token(这里使用top-k采样)
top_k = 5
top_probs, top_indices = torch.topk(probs, top_k)
next_token = top_indices[torch.multinomial(top_probs, 1)].item()
# 7. 终止条件判断(假设0是结束符)
if next_token == tokenizer.vocab["<end>"]:
break
# 8. 将新token添加到序列中
input_ids.append(next_token)
generated_tokens.append(next_token)
# 9. 将token IDs转换回文本
return tokenizer.decode(input_ids + generated_tokens)