一个简单的LLM实现

1. 数据处理

定义词汇表,即输入文字总量大小,进行具体词汇到词汇表的映射
下面定义词汇表大小是12,不同词汇到表中序号的映射

# 创建词汇表映射
vocab = {"<pad>": 0, "<unk>": 1, "<start>": 2, "<end>": 3,
"今天": 4, "天气": 5, "真好": 6,
"适合": 7, "散步": 8, ",": 9 }

# 示例数据准备

def prepare_data():
# 创建模拟数据(数字序列)
sequences = [
    [2, 4, 5, 6, 3],  # <start>今天天气真好<end>
    [2, 7, 8, 9, 3],  # <start>适合散步,<end>
    [2, 4, 8, 9, 3], # <start>今天散步,<end>
    [2, 4, 5, 9, 3] # <start>今天天气,<end>
]
return NumberDataset(sequences)

NumberDataset将输入数据进行输入和预测的拆分

class NumberDataset(Dataset):
def __init__(self, sequences):
    self.sequences = sequences

def __len__(self):
    return len(self.sequences)

def __getitem__(self, idx):
    seq = self.sequences[idx]
    # 输入是前n-1个token,目标是后n-1个token
    return torch.LongTensor(seq[:-1]),torch.LongTensor(seq[1:])

这些数据放入DataLoader中,能够通过遍历训练数据来获得训练数据集

dataloader = DataLoader(d, batch_size=2, shuffle=True)
for batch in dataloader:
    print("batch is %s" % batch)

其中batch是训练数据,每次按照batch_size来提取训练数据。shuffle表示提取的训练数据进行无序的处理。

2. 定义简单的LLM

2.1 词嵌入(embeding)

embeding过程是将输入的词,根据embeding的维度来进行扩展,具体为

# 词嵌入层
self.token_embedding = nn.Embedding(vocab_size, embed_dim)

利用下面的函数来测试这个功能

# 创建一个 Embedding 层
# 这里假设词汇表大小为 12,每个词的嵌入维度为 8
token_embedding = nn.Embedding(12, 8)

# 准备输入数据
# 假设我们有一个包含 3 个词索引的序列
input_data = torch.tensor([1, 0, 1])

# 使用嵌入层对输入数据进行嵌入操作
embedded_output = token_embedding(input_data)

# 打印嵌入层的权重矩阵
print("嵌入层的权重矩阵:")
print(token_embedding.weight)

# 打印输入数据
print("\n输入数据:")
print(input_data)

# 打印嵌入后的输出
print("\n嵌入后的输出:")
print(embedded_output)

# 检查嵌入后的输出形状
print("\n嵌入后输出的形状:")
print(embedded_output.shape)

embeding的权重矩阵是12 8, 输入的每个词会展开成为1 12矩阵,embeing后成为1 * 8
输入 3 个词索引的序列, 输出为 3 *8

2.2 位置编码

Transformer模型不同于传统的RNN模型,不会有时间步或位置状态。因此,Transformer需要一种方法来捕捉序列中各个位置之间的关系。位置编码可能就是用来解决这个问题的一种方式。
用来识别每个词在样本句子中的位置信息,例如上面输入的语句中需要定义最多输入多少个字,这里定义为5, 然后每个字的位置就能够用一个15的向量表示,再经过embeding处理变成18的特征矩阵,具体定义如下

self.position_embedding = nn.Embedding(max_seq_len, embed_dim)

2.3 Transormer层

tranformer.jpg
Transformer中主要进行多头注意的运算;多头的含义可以理解为从多个方面对输入语言进行理解,多头的数量必须是embeding后向量可整除;
在每个注意力模型又对Q/K/V向量进行运算,先将Q和K进行融合计算,然后再联合V进行计算

multi_attention.jpg
这个图中提供了从embeding --> QKV构建 --> 多头注意力的向量变换过程。embeding后输入的每个词都会变为embeding维的向量,然后这个向量维度扩大3倍,再根据multi-head数量将向量进行多个multi-head维度上维度构建

每个维度上Q和K进行乘积运算后得到点积注意力,然后将注意力权重到V,最终合并多个注意力头;最终完成线性变换

最终输出的维度是还是embeding维

class MultiHeadSelfAttention(nn.Module):
def __init__(self, embed_dim, num_heads):
    """
    Args:
        embed_dim: 输入特征的维度(d_model)
        num_heads: 注意力头的数量
    """
    super().__init__()
    assert embed_dim % num_heads == 0, "embed_dim必须能被num_heads整除"

    self.embed_dim = embed_dim
    self.num_heads = num_heads
    self.head_dim = embed_dim // num_heads

    # 线性变换生成Q、K、V
    self.qkv_proj = nn.Linear(embed_dim, 3 * embed_dim)
    # 最终的输出线性变换
    self.out_proj = nn.Linear(embed_dim, embed_dim)

def forward(self, x):
    """
    Args:
        x: 输入张量形状 [batch_size, seq_len, embed_dim]
    Returns:
        输出张量形状 [batch_size, seq_len, embed_dim]
    """
    batch_size, seq_len, embed_dim = x.size()

    # 生成Q、K、V (每个的形状都是 [batch_size, seq_len, embed_dim])
    qkv = self.qkv_proj(x)

    print("qkv is %s" % qkv)

    # 分割Q、K、V并重塑形状以分离注意力头
    # 重塑后形状: [batch_size, seq_len, num_heads, 3*head_dim]
    qkv = qkv.reshape(batch_size, seq_len, self.num_heads, 3 * self.head_dim)

    print("reshape qkv is %s" % qkv)

    # 分解为Q、K、V (每个的形状 [batch_size, num_heads, seq_len, head_dim])
    q, k, v = torch.chunk(qkv, 3, dim=-1)
    q = q.permute(0, 2, 1, 3)  # [batch_size, num_heads, seq_len, head_dim]
    k = k.permute(0, 2, 1, 3)
    v = v.permute(0, 2, 1, 3)

    # 计算缩放点积注意力
    scale = self.head_dim ** -0.5
    scores = torch.matmul(q, k.transpose(-2, -1)) * scale  # [batch_size, num_heads, seq_len, seq_len]
    attn_weights = F.softmax(scores, dim=-1)

    # 应用注意力权重到V
    context = torch.matmul(attn_weights, v)  # [batch_size, num_heads, seq_len, head_dim]

    # 合并多个注意力头
    context = context.permute(0, 2, 1, 3)  # [batch_size, seq_len, num_heads, head_dim]
    context = context.reshape(batch_size, seq_len, embed_dim)  # 合并最后两个维度

    # 最终线性变换
    output = self.out_proj(context)
    return output

使用如下测试程序可以看到,输入和输出的形状是一样,最终变换的目的是再多维空间中将特征分隔出来

if __name__ == "__main__":
# 超参数
embed_dim = 64  # 输入特征维度
num_heads = 4  # 注意力头数量
seq_len = 3  # 输入序列长度
batch_size = 1  # 批大小

# 创建模块实例
mhsa = MultiHeadSelfAttention(embed_dim, num_heads)

# 生成随机输入(模拟一个batch的输入序列)
x = torch.randn(batch_size, seq_len, embed_dim)


# 前向传播
output = mhsa(x)

print("输入形状:", x.shape)
print(x)
print("输出形状:", output.shape)
print(output)

输入形状: torch.Size([1, 3, 64])
输出形状: torch.Size([1, 3, 64])



以上是一个transformer blok的实现过程;

class TransformerBlock(nn.Module):
"""Transformer解码器块(简化版,没有交叉注意力)"""

def __init__(self, embed_dim, num_heads):
    super().__init__()
    # 自注意力层
    self.self_attn = MultiHeadSelfAttention(embed_dim, num_heads)
    # 前馈神经网络
    self.ffn = nn.Sequential(
        nn.Linear(embed_dim, 4 * embed_dim),  # 扩展维度
        nn.GELU(),
        nn.Linear(4 * embed_dim, embed_dim)  # 恢复维度
    )
    # 层归一化
    self.norm1 = nn.LayerNorm(embed_dim)
    self.norm2 = nn.LayerNorm(embed_dim)

def forward(self, x):
    # 残差连接 + 层归一化
    x = x + self.self_attn(self.norm1(x))  # 自注意力
    x = x + self.ffn(self.norm2(x))  # 前馈网络
    return x



将多个transformer block做组合叠加,即完成了编码器部分的内容

编码器总体上分为四步:

  • 词嵌入层

    • 将离散的token索引转换为连续向量表示
  • 位置编码层

    • 为序列中的每个位置生成位置信息
    • 这里使用可学习的位置嵌入(与原始Transformer的固定编码不同)
  • Transformer层

    • 核心结构:多头自注意力 + 前馈网络
    • 残差连接和层归一化帮助梯度流动
    • 多个堆叠层(通常6-100层)实现深层特征提取
  • 线性输出层

    • 将隐藏层输出映射回词表空间
    • 通过softmax得到下一个token的概率分布

输出的结果是在字符表空间中每一个字符出现的概率

初始化代码如下:

class SimpleLLM(nn.Module):
def __init__(self, vocab_size, embed_dim, num_heads, num_layers, max_seq_len):
    """
    超参数说明:
    vocab_size: 词表大小
    embed_dim: 词向量维度(d_model)
    num_heads: 多头注意力的头数
    num_layers: Transformer层数
    max_seq_len: 最大序列长度
    """
    super().__init__()
    self.vocab_size = vocab_size
    self.embed_dim = embed_dim

    # 1. 词嵌入层
    self.token_embedding = nn.Embedding(vocab_size, embed_dim)
    # 2. 位置编码层
    self.position_embedding = nn.Embedding(max_seq_len, embed_dim)

    # 3. Transformer层(解码器结构)
    self.transformer_layers = nn.ModuleList([
        TransformerBlock(embed_dim, num_heads)
        for _ in range(num_layers)
    ])

    # 4. 最后的线性输出层
    self.lm_head = nn.Linear(embed_dim, vocab_size)

相应的前向传播函数如下:

    def forward(self, input_ids):
    """
    输入参数:
    input_ids: 输入token的索引序列 [batch_size, seq_len]
    输出:
    logits: 预测每个位置的下一个token概率 [batch_size, seq_len, vocab_size]
    """
    batch_size, seq_len = input_ids.shape

    # 生成位置编码的索引
    positions = torch.arange(0, seq_len).expand(batch_size, seq_len)

    # 词嵌入 + 位置编码
    token_embeds = self.token_embedding(input_ids)  # [B, S, D]
    pos_embeds = self.position_embedding(positions)  # [B, S, D]
    x = token_embeds + pos_embeds

    # 逐层通过Transformer块
    for layer in self.transformer_layers:
        x = layer(x)

    # 输出投影到词表空间
    logits = self.lm_head(x)  # [B, S, V]
    return logits

编码器的总体目标是输入一个词向量,来预测下一个的词的概率

4. 模型的训练

模型训练的思路是根据输入参数进行前向传播
前向传播结果与target比较计算lost
根据lost进行后向传播

# 训练函数
def train_model(model, dataloader, config):
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=config.lr)

for epoch in range(config.epochs):
    total_loss = 0
    for inputs, targets in dataloader:
        optimizer.zero_grad()

        # 前向传播
        logits = model(inputs)  # [B, S, V]

        # 计算损失(只计算最后一个位置的预测)
        loss = criterion(logits[:, -1, :], targets[:, -1])

        # 反向传播
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    if (epoch + 1) % 10 == 0:
        print(f"Epoch {epoch + 1}/{config.epochs}, Loss: {total_loss / len(dataloader):.4f}")

5. 模型的推理

推理过程分为Prefill和decode阶段,

  • Prefill逐个对输入的词完成在词汇表中index查找,并将输入句子转换为连续向量,将连续向量输入到模型完成完整的前向传播,处理输入序列中素有token
  • Decode根据前向传播计算出来的下一个token预测,放入到输入序列中,然后再次进行推理,直到满足停止条件

下面的代码是对输入句子完成到词汇表的index转换

# 实现一个简化的Tokenizer(示例用)

class SimpleTokenizer:

def __init__(self, vocab):
    self.vocab = vocab  # 假设vocab是字典{word: id}
    self.inverse_vocab = {v: k for k, v in vocab.items()}

def encode(self, text):
    # 简单按空格分割(实际应使用BPE等算法)
    return [self.vocab.get(word, self.vocab["<unk>"]) for word in text.split()]

def decode(self, ids):
    return " ".join([self.inverse_vocab.get(i, "<unk>") for i in ids])

下面的代码中进行下一个词的预测,整体过程包含了Prefill和Decode的过程

def generate_text(model, input_text, tokenizer, max_length=20, temperature=1.0):
"""
文本生成函数
Args:
    model: 训练好的LLM模型
    input_text: 输入的起始文本(字符串)
    tokenizer: 文本到token的转换器(这里简化实现)
    max_length: 生成文本的最大长度
    temperature: 控制生成随机性的温度参数
Returns:
    生成的完整文本(字符串)
"""
model.eval()  # 切换到评估模式
generated_tokens = []

# 1. 将输入文本转换为token IDs
input_ids = tokenizer.encode(input_text)
if len(input_ids) == 0:
    input_ids = [tokenizer.vocab["<start>"]]  # 添加起始符

# 2. 自回归生成循环
for _ in range(max_length):
    # 创建输入张量(保持序列长度不超过max_seq_len)
    inputs = torch.tensor([input_ids[-model.max_seq_len:]], dtype=torch.long)

    # 3. 前向传播获取预测logits
    with torch.no_grad():
        logits = model(inputs)  # [1, seq_len, vocab_size]

    # 4. 获取最后一个位置的预测结果
    next_token_logits = logits[0, -1, :] / temperature

    # 5. 通过softmax转换为概率分布
    probs = F.softmax(next_token_logits, dim=-1)

    # 6. 从分布中采样下一个token(这里使用top-k采样)
    top_k = 5
    top_probs, top_indices = torch.topk(probs, top_k)
    next_token = top_indices[torch.multinomial(top_probs, 1)].item()

    # 7. 终止条件判断(假设0是结束符)
    if next_token == tokenizer.vocab["<end>"]:
        break

    # 8. 将新token添加到序列中
    input_ids.append(next_token)
    generated_tokens.append(next_token)

# 9. 将token IDs转换回文本
return tokenizer.decode(input_ids + generated_tokens)

decode.jpg

本文为作者phdgang发布,未经允许禁止转载!
下一篇
评论
暂无评论 >_<
加入评论