The LLM Workflow

Understanding how an LLM gets from input to output is the key to using it effectively.

Workflow Overview

graph TD
    A[User input] --> B[Tokenization]
    B --> C[Embedding layer]
    C --> D[Positional encoding]
    D --> E[Transformer layers x N]
    E --> F[Output logits]
    F --> G[Softmax]
    G --> H[Sampling/decoding]
    H --> I[Output token]
    I --> J[Detokenization]
    J --> K[Final text]
    style A fill:#e1f5ff
    style K fill:#c8e6c9

Step-by-Step Walkthrough

Step 1: Tokenization

Convert the text into a sequence of token IDs.

import tiktoken

# Example text
text = "Hello, how are you?"

# Use the GPT-4 tokenizer
enc = tiktoken.encoding_for_model("gpt-4")

# Tokenize
token_ids = enc.encode(text)
tokens = [enc.decode_single_token_bytes(t).decode('utf-8', errors='ignore')
          for t in token_ids]

print(f"Original: {text}")
print(f"Token IDs: {token_ids}")
print(f"Tokens: {tokens}")

# Example output (the IDs shown are illustrative; exact values depend on the encoding):
# Original: Hello, how are you?
# Token IDs: [15496, 11, 1263, 389, 345, 30]
# Tokens: ['Hello', ',', ' how', ' are', ' you', '?']
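Tokenization is tokenizer-specific: the same string maps to different IDs, and often a different number of tokens, under different encodings. A minimal comparison sketch, assuming only that tiktoken is installed:

import tiktoken

text = "Hello, how are you?"
for name in ["gpt2", "cl100k_base"]:
    # Compare the GPT-2 encoding with GPT-4's cl100k_base encoding
    encoding = tiktoken.get_encoding(name)
    ids = encoding.encode(text)
    print(f"{name}: {len(ids)} tokens -> {ids}")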

Step 2: Embedding

Convert the token IDs into high-dimensional vectors.

import torch
import torch.nn as nn

vocab_size = 50000
d_model = 768  # GPT-2-small

embedding = nn.Embedding(vocab_size, d_model)

# Token IDs: [batch_size, seq_len]
token_ids = torch.tensor([[15496, 11, 1263, 389, 345, 30]])

# Embeddings: [batch_size, seq_len, d_model]
embeddings = embedding(token_ids)

print(f"Token ID shape: {token_ids.shape}")   # [1, 6]
print(f"Embedding shape: {embeddings.shape}") # [1, 6, 768]

Step 3: Positional Encoding

Add position information to the embeddings.

import numpy as np

def get_positional_encoding(seq_len, d_model):
    """Generate sinusoidal positional encodings."""
    position = np.arange(seq_len)[:, np.newaxis]
    div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))
    pe = np.zeros((seq_len, d_model))
    pe[:, 0::2] = np.sin(position * div_term)
    pe[:, 1::2] = np.cos(position * div_term)
    return torch.FloatTensor(pe)

# Add positional encodings
seq_len = token_ids.size(1)
pos_encoding = get_positional_encoding(seq_len, d_model)

# Broadcast addition
embeddings_with_pos = embeddings + pos_encoding.unsqueeze(0)

print(f"Positional encoding shape: {pos_encoding.shape}")  # [6, 768]

Step 4: Transformer Layers (Stack of Transformers)

The embeddings are processed by a stack of Transformer layers.

import torch
import torch.nn as nn

class TransformerLayer(nn.Module):
    """A single Transformer layer."""
    def __init__(self, d_model, num_heads, d_ff):
        super().__init__()
        # batch_first=True so inputs are [batch_size, seq_len, d_model]
        self.attention = nn.MultiheadAttention(d_model, num_heads, batch_first=True)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
        self.dropout = nn.Dropout(0.1)

    def forward(self, x, mask=None):
        # Self-attention with residual connection and layer norm
        attn_out, _ = self.attention(x, x, x, attn_mask=mask)
        x = self.norm1(x + self.dropout(attn_out))
        # Feed-forward network with residual connection and layer norm
        ffn_out = self.ffn(x)
        x = self.norm2(x + self.dropout(ffn_out))
        return x

class TransformerStack(nn.Module):
    """A stack of Transformer layers."""
    def __init__(self, d_model, num_heads, d_ff, num_layers):
        super().__init__()
        self.layers = nn.ModuleList([
            TransformerLayer(d_model, num_heads, d_ff)
            for _ in range(num_layers)
        ])

    def forward(self, x, mask=None):
        for layer in self.layers:
            x = layer(x, mask)
        return x

# Example
num_layers = 12  # GPT-2-small
num_heads = 12
d_ff = d_model * 4

stack = TransformerStack(d_model, num_heads, d_ff, num_layers)

# Input: [batch_size, seq_len, d_model]
transformer_output = stack(embeddings_with_pos)
print(f"Transformer output shape: {transformer_output.shape}")
# [1, 6, 768]
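One detail the example above leaves out: an autoregressive LM applies a causal mask so that each position attends only to itself and earlier positions. A minimal sketch using the `mask` parameter the layer already accepts (for nn.MultiheadAttention, a boolean mask entry of True means "do not attend"):

# Upper-triangular boolean mask: True above the diagonal blocks
# attention to future positions.
causal_mask = torch.triu(torch.ones(seq_len, seq_len, dtype=torch.bool), diagonal=1)
masked_output = stack(embeddings_with_pos, mask=causal_mask)
print(masked_output.shape)  # [1, 6, 768]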

Step 5: Output Projection

Map the hidden states back to the vocabulary.

# Output layer: project hidden states to vocabulary size
output_layer = nn.Linear(d_model, vocab_size)

# Logits: [batch_size, seq_len, vocab_size]
logits = output_layer(transformer_output)

print(f"Logits shape: {logits.shape}")
# [1, 6, 50000] - a prediction score for every token at every position
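A common refinement, used by GPT-2 among others, is weight tying: the output projection reuses the embedding matrix, saving vocab_size × d_model parameters. A sketch of the idea with the components above:

# Weight tying: reuse the embedding matrix as the output projection.
tied_output_layer = nn.Linear(d_model, vocab_size, bias=False)
tied_output_layer.weight = embedding.weight  # share the same parameter
tied_logits = tied_output_layer(transformer_output)
print(tied_logits.shape)  # [1, 6, 50000]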

Step 6: Softmax and Sampling

Convert the logits into a probability distribution and sample from it.

def sample_next_token(logits, temperature=1.0, top_k=None, top_p=0.9):
    """
    Sample the next token.
    Args:
        logits: [vocab_size]
        temperature: controls randomness
        top_k: keep only the top-k tokens
        top_p: nucleus sampling threshold
    Returns:
        the sampled token ID
    """
    # 1. Apply temperature
    logits = logits / temperature
    # 2. Top-K filtering
    if top_k is not None:
        top_k_logits, top_k_indices = torch.topk(logits, top_k)
        logits = torch.full_like(logits, -float('inf'))
        logits.scatter_(0, top_k_indices, top_k_logits)
    # 3. Nucleus (Top-P) sampling
    if top_p < 1.0:
        sorted_logits, sorted_indices = torch.sort(logits, descending=True)
        cumulative_probs = torch.cumsum(torch.softmax(sorted_logits, dim=-1), dim=-1)
        # Remove tokens whose cumulative probability exceeds top_p
        sorted_indices_to_remove = cumulative_probs > top_p
        # Shift right so the first token past the threshold is kept
        sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
        sorted_indices_to_remove[..., 0] = 0
        indices_to_remove = sorted_indices[sorted_indices_to_remove]
        logits[indices_to_remove] = -float('inf')
    # 4. Softmax
    probs = torch.softmax(logits, dim=-1)
    # 5. Sample
    next_token = torch.multinomial(probs, num_samples=1)
    return next_token.item()

# Example: sample the token for the last position
# (the stack above is untrained, so the result is random)
last_logits = logits[0, -1, :]  # [vocab_size]
next_token_id = sample_next_token(last_logits, temperature=0.8)
print(f"Sampled token ID: {next_token_id}")
print(f"Sampled token: {enc.decode([next_token_id])}")

The Complete Generation Loop

def generate_text(model, tokenizer, prompt, max_tokens=100,
                  temperature=0.7, top_k=50, top_p=0.9):
    """
    Generate text autoregressively.
    Args:
        model: the LLM
        tokenizer: the tokenizer
        prompt: prompt text
        max_tokens: maximum number of tokens to generate
        temperature: temperature parameter
        top_k: Top-K sampling
        top_p: nucleus sampling parameter
    Returns:
        the complete generated text
    """
    # 1. Tokenize the prompt
    input_ids = tokenizer.encode(prompt)
    generated = input_ids.copy()
    # 2. Generation loop
    for _ in range(max_tokens):
        # 2.1 Forward pass (recomputes the whole sequence every step;
        #     see the KV Cache section below for the optimized version)
        input_tensor = torch.tensor([generated])
        logits = model(input_tensor)
        # 2.2 Take the logits at the last position
        last_logits = logits[0, -1, :]
        # 2.3 Sample
        next_token = sample_next_token(
            last_logits,
            temperature=temperature,
            top_k=top_k,
            top_p=top_p
        )
        # 2.4 Append to the generated sequence
        generated.append(next_token)
        # 2.5 Stop at the end-of-text token
        if next_token == tokenizer.eot_token:
            break
    # 3. Decode
    generated_text = tokenizer.decode(generated)
    return generated_text

# Usage (assumes a trained `model` and `tokenizer`; a toy assembly follows below)
prompt = "Once upon a time"
generated = generate_text(model, tokenizer, prompt, max_tokens=50)
print(f"Prompt: {prompt}")
print(f"Generated: {generated[len(prompt):]}")

Full LLM Architecture

graph TB
    subgraph "Input processing"
        A[Text input] --> B[Tokenizer]
        B --> C[Token ID sequence]
    end
    subgraph "Embedding layer"
        C --> D[Token embedding]
        C --> E[Positional encoding]
        D --> F[Add]
        E --> F
    end
    subgraph "Transformer stack"
        F --> G[Layer 1]
        G --> H[Layer 2]
        H --> I[Layer N]
    end
    subgraph "Output processing"
        I --> J[Linear projection]
        J --> K[Softmax]
        K --> L[Sampling/decoding]
        L --> M[Output token]
        M --> N[Detokenization]
        N --> O[Final text]
    end
    style A fill:#e1f5ff
    style O fill:#c8e6c9

Inference Optimizations

1. KV Cache

Cache the attention keys and values so earlier tokens are not recomputed at every step.

# Standard inference (recomputes every step) - illustrative pseudocode
for step in range(max_tokens):
    # Recompute the entire sequence
    logits = model(input_ids[:, :step+1])
    # ...

# KV Cache inference (processes only the new token) - illustrative pseudocode;
# `past_kv` / `cache_kv` stand in for whatever caching interface a model exposes
for step in range(max_tokens):
    # Process only the newest token
    logits = model(input_ids[:, step:step+1], past_kv=past_kv)
    # Cache the keys and values
    past_kv = model.cache_kv()
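As a concrete, runnable version of the same idea (a minimal sketch assuming the transformers package and the public gpt2 checkpoint are available; greedy decoding for brevity), Hugging Face models return past_key_values that can be fed back in so each step only processes the newest token:

import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer

tok = GPT2Tokenizer.from_pretrained("gpt2")
model = GPT2LMHeadModel.from_pretrained("gpt2").eval()

input_ids = tok.encode("Once upon a time", return_tensors="pt")
past = None
with torch.no_grad():
    for _ in range(20):
        # With a cache, only the newest token needs to be fed in
        step_input = input_ids if past is None else input_ids[:, -1:]
        out = model(step_input, past_key_values=past, use_cache=True)
        past = out.past_key_values
        next_id = out.logits[:, -1, :].argmax(dim=-1, keepdim=True)
        input_ids = torch.cat([input_ids, next_id], dim=-1)
print(tok.decode(input_ids[0]))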

2. Batched Inference

Process multiple requests at once.

# Batched input - illustrative pseudocode; real implementations must pad
# the sequences to a common length (see the sketch below)
prompts = ["Hello", "Good morning", "How are you"]
input_ids_batch = [tokenizer.encode(p) for p in prompts]

# Batched inference
logits_batch = model(input_ids_batch)
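In practice, prompts of unequal length are padded to a common length, and an attention mask marks the real tokens. A sketch with the Hugging Face tokenizer, assuming the same gpt2 checkpoint as above (GPT-2 defines no pad token, so the EOS token is reused):

from transformers import GPT2LMHeadModel, GPT2Tokenizer

tok = GPT2Tokenizer.from_pretrained("gpt2")
tok.pad_token = tok.eos_token  # GPT-2 has no pad token by default
model = GPT2LMHeadModel.from_pretrained("gpt2").eval()

prompts = ["Hello", "Good morning", "How are you"]
batch = tok(prompts, return_tensors="pt", padding=True)

# attention_mask tells the model which positions are real vs. padding
out = model(input_ids=batch["input_ids"], attention_mask=batch["attention_mask"])
print(out.logits.shape)  # [3, max_len, vocab_size]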

Calling Real Models

With LangChain

from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage

# Initialize the model
llm = ChatOpenAI(
    model="gpt-4",
    temperature=0.7,
    max_tokens=500
)

# Generate text
messages = [HumanMessage(content="Write a short story")]
response = llm.invoke(messages)
print(response.content)

With a Local Model via Ollama

from langchain_community.llms import Ollama

# Local model
llm = Ollama(model="mistral")

# Generate
response = llm.invoke("Write a quicksort in Python")
print(response)

Key Takeaways

✅ An LLM generates text via token → embedding → Transformer → logits → probabilities → sampling
✅ Every step is a well-defined mathematical computation
✅ The sampling strategy controls the diversity and quality of the output
✅ Optimizations such as the KV cache speed up inference
✅ Temperature, Top-K, and Top-P control the generation style


Next up: learn Prompt Engineering to start putting this into practice 💬