LLM 推理性能调优:从显存瓶颈到吞吐优化,大模型服务的工程化加速
LLM 推理性能调优:从显存瓶颈到吞吐优化,大模型服务的工程化加速
一、LLM 推理的性能瓶颈:显存墙与计算墙的双重制约
大模型推理的性能受两个物理约束制约。显存墙:模型权重必须加载到 GPU 显存中才能推理,7B 模型需要约 14GB 显存,70B 模型需要约 140GB 显存,单张 A100(80GB)无法承载。计算墙:自回归生成每个 token 都需要读取全部模型权重,计算密度低,GPU 的计算单元利用率不足。
更具体地,推理过程分为两个阶段。预填充(Prefill)阶段:处理输入 prompt 的所有 token,计算 KV Cache,这一步是计算密集型。解码(Decode)阶段:逐个生成输出 token,每步读取 KV Cache 和模型权重,这一步是显存带宽密集型。两个阶段的瓶颈不同,优化策略也不同。
实际生产中,推理服务的性能指标是吞吐量(tokens/s)和首 token 延迟(TTFT)。优化目标是在满足延迟 SLA 的前提下最大化吞吐量。
二、推理性能优化的技术栈
LLM 推理性能优化需要在模型层、引擎层和系统层三个层面协同进行。
flowchart TD A[LLM 推理性能优化] --> B[模型层优化] A --> C[引擎层优化] A --> D[系统层优化] B --> B1[量化: INT8/INT4 降低显存占用] B --> B2[蒸馏: 小模型替代大模型] B --> B3[剪枝: 移除冗余参数] C --> C1[KV Cache 优化: PagedAttention] C --> C2[连续批处理: Continuous Batching] C --> C3[前缀缓存: 共享 Prompt 的 KV Cache] C --> C4[投机解码: 小模型预测+大模型验证] D --> D1[张量并行: 模型切分到多 GPU] D --> D2[流水线并行: 层级切分] D --> D3[显存卸载: GPU↔CPU 数据搬运] style B fill:#e8f5e9 style C fill:#e1f5fe style D fill:#fff3e02.1 KV Cache 与 PagedAttention
# paged_attention.py — PagedAttention 的 KV Cache 管理 # 设计意图:将 KV Cache 按固定大小的 Block 管理,类似操作系统的虚拟内存, # 解决传统 KV Cache 预分配导致的显存浪费和碎片问题 from dataclasses import dataclass, field from typing import Optional import math @dataclass class KVBlock: """KV Cache 的一个 Block,固定大小""" block_id: int block_size: int = 16 # 每个 Block 存储 16 个 token 的 KV ref_count: int = 0 # 引用计数,支持共享(前缀缓存) is_free: bool = True @dataclass class KVBlockTable: """单个序列的 KV Block 映射表""" sequence_id: int blocks: list[int] = field(default_factory=list) # Block ID 列表 num_tokens: int = 0 # 当前已使用的 token 数 class PagedAttentionManager: def __init__(self, num_blocks: int, block_size: int = 16): self.block_size = block_size self.blocks: dict[int, KVBlock] = {} self.block_tables: dict[int, KVBlockTable] = {} self.free_blocks: list[int] = [] # 初始化所有 Block for i in range(num_blocks): self.blocks[i] = KVBlock(block_id=i, block_size=block_size) self.free_blocks.append(i) def allocate(self, sequence_id: int, num_tokens: int) -> list[int]: """为序列分配 KV Cache Block""" num_blocks_needed = math.ceil(num_tokens / self.block_size) if len(self.free_blocks) < num_blocks_needed: # 显存不足,尝试驱逐低优先级序列 self._evict_sequences(num_blocks_needed - len(self.free_blocks)) allocated = [] for _ in range(num_blocks_needed): if not self.free_blocks: raise RuntimeError("KV Cache 显存不足,无法分配新 Block") block_id = self.free_blocks.pop() block = self.blocks[block_id] block.is_free = False block.ref_count += 1 allocated.append(block_id) self.block_tables[sequence_id] = KVBlockTable( sequence_id=sequence_id, blocks=allocated, num_tokens=num_tokens, ) return allocated def append_tokens(self, sequence_id: int, num_new_tokens: int): """为已有序列追加 token,可能需要分配新 Block""" table = self.block_tables.get(sequence_id) if not table: return table.num_tokens += num_new_tokens needed_blocks = math.ceil(table.num_tokens / self.block_size) current_blocks = len(table.blocks) if needed_blocks > current_blocks: extra_needed = needed_blocks - current_blocks for _ in range(extra_needed): if not self.free_blocks: self._evict_sequences(1) block_id = self.free_blocks.pop() self.blocks[block_id].is_free = False self.blocks[block_id].ref_count += 1 table.blocks.append(block_id) def free(self, sequence_id: int): """释放序列的 KV Cache""" table = self.block_tables.pop(sequence_id, None) if not table: return for block_id in table.blocks: block = self.blocks[block_id] block.ref_count -= 1 if block.ref_count <= 0: block.is_free = True self.free_blocks.append(block_id) def _evict_sequences(self, num_blocks_needed: int): """驱逐低优先级序列释放 Block""" # 简化策略:按序列已生成 token 数排序,优先驱逐最长的 sorted_tables = sorted( self.block_tables.values(), key=lambda t: t.num_tokens, reverse=True, ) freed = 0 for table in sorted_tables: if freed >= num_blocks_needed: break freed += len(table.blocks) self.free(table.sequence_id)2.2 连续批处理
# continuous_batching.py — 连续批处理调度器 # 设计意图:不同于静态批处理等待所有序列完成才释放资源, # 连续批处理在序列完成后立即插入新请求,显著提升 GPU 利用率 import time from dataclasses import dataclass from typing import Optional @dataclass class InferenceRequest: request_id: str prompt_tokens: list[int] max_output_tokens: int generated_tokens: list[int] = None is_completed: bool = False arrival_time: float = 0.0 def __post_init__(self): if self.generated_tokens is None: self.generated_tokens = [] if self.arrival_time == 0.0: self.arrival_time = time.time() class ContinuousBatchScheduler: def __init__(self, max_batch_size: int = 32): self.max_batch_size = max_batch_size self.waiting_queue: list[InferenceRequest] = [] self.running_batch: list[InferenceRequest] = [] def add_request(self, request: InferenceRequest): """添加推理请求到等待队列""" self.waiting_queue.append(request) def schedule(self) -> list[InferenceRequest]: """调度下一批推理请求""" # 移除已完成的请求,释放批次槽位 self.running_batch = [ req for req in self.running_batch if not req.is_completed ] # 计算可用槽位 available_slots = self.max_batch_size - len(self.running_batch) # 从等待队列中取请求填充槽位 new_requests = [] while available_slots > 0 and self.waiting_queue: request = self.waiting_queue.pop(0) new_requests.append(request) available_slots -= 1 self.running_batch.extend(new_requests) return self.running_batch def mark_completed(self, request_id: str): """标记请求完成""" for req in self.running_batch: if req.request_id == request_id: req.is_completed = True break def get_stats(self) -> dict: return { "waiting": len(self.waiting_queue), "running": len(self.running_batch), "completed_in_batch": sum(1 for r in self.running_batch if r.is_completed), }三、量化与投机解码
3.1 量化策略选择
# quantization_config.py — 量化策略配置 # 设计意图:根据延迟要求和精度容忍度选择量化方案, # INT4 最大化吞吐但精度损失较大,INT8 是平衡选择 from dataclasses import dataclass from enum import Enum class QuantizationMethod(Enum): FP16 = "fp16" # 无量化,基线 INT8_WEIGHT = "int8_w" # 仅权重量化为 INT8 INT8_FULL = "int8_full" # 权重和激活都 INT8 INT4_GPTQ = "int4_gptq" # GPTQ 4-bit 量化 INT4_AWQ = "int4_awq" # AWQ 4-bit 量化 @dataclass class QuantizationConfig: method: QuantizationMethod group_size: int = 128 # 量化分组大小 desc_act: bool = False # GPTQ 的激活排序 vmapped_only: bool = False # 仅量化 V 投影 @staticmethod def recommend(model_size_b: float, latency_sla_ms: float) -> 'QuantizationConfig': """根据模型大小和延迟 SLA 推荐量化方案""" if model_size_b <= 7: # 小模型:INT8 足够,精度损失小 return QuantizationConfig(method=QuantizationMethod.INT8_WEIGHT) elif model_size_b <= 30: # 中等模型:INT8 或 INT4-AWQ if latency_sla_ms < 200: return QuantizationConfig(method=QuantizationMethod.INT4_AWQ) return QuantizationConfig(method=QuantizationMethod.INT8_WEIGHT) else: # 大模型:必须 INT4 才能在有限 GPU 上运行 return QuantizationConfig( method=QuantizationMethod.INT4_AWQ, group_size=128, )3.2 投机解码
# speculative_decoding.py — 投机解码实现 # 设计意图:用小模型快速生成候选 token,大模型并行验证, # 接受正确的 token,拒绝错误的 token,加速生成过程 from typing import Optional class SpeculativeDecoder: def __init__(self, draft_model, target_model, max_spec_tokens: int = 5): self.draft_model = draft_model # 小模型(草稿模型) self.target_model = target_model # 大模型(目标模型) self.max_spec_tokens = max_spec_tokens def generate(self, prompt_tokens: list[int], max_tokens: int) -> list[int]: generated = [] while len(generated) < max_tokens: # 步骤 1:草稿模型快速生成 K 个候选 token draft_tokens = self.draft_model.generate( prompt_tokens + generated, max_tokens=self.max_spec_tokens, ) # 步骤 2:目标模型并行验证 K 个 token # 一次前向传播同时计算 K+1 个位置的概率 target_probs = self.target_model.forward( prompt_tokens + generated + draft_tokens, ) # 步骤 3:逐个验证候选 token accepted = 0 for i, draft_token in enumerate(draft_tokens): target_prob = target_probs[len(generated) + i] draft_prob = self.draft_model.get_prob( prompt_tokens + generated + draft_tokens[:i], draft_token, ) # 接受条件:目标模型的概率 >= 草稿模型的概率 # 或按概率比例随机接受 acceptance_ratio = target_prob / max(draft_prob, 1e-10) if acceptance_ratio >= 1.0: # 确定接受 generated.append(draft_token) accepted += 1 else: # 按概率接受 import random if random.random() < acceptance_ratio: generated.append(draft_token) accepted += 1 else: # 拒绝:从目标模型的分布中采样一个 token corrected_token = self._sample_from_target(target_prob) generated.append(corrected_token) break # 如果所有候选都被接受,额外生成一个 token if accepted == len(draft_tokens): bonus_token = self._sample_from_target( target_probs[len(generated)] ) generated.append(bonus_token) return generated[:max_tokens] def _sample_from_target(self, probs) -> int: """从目标模型的概率分布中采样""" # 简化实现 return 0四、边界分析与架构权衡
量化精度损失:INT4 量化可能导致模型输出质量下降,尤其在数学推理和代码生成等精确性要求高的场景。AWQ 通过保护重要权重减少精度损失,但仍需在目标数据集上评测。建议对核心业务场景进行量化前后的对比评测。
PagedAttention 的实现复杂度:PagedAttention 需要修改注意力计算内核,使用 Block 索引替代连续内存访问。这需要编写自定义 CUDA 内核,开发和维护成本高。生产环境建议直接使用 vLLM 等已实现 PagedAttention 的推理引擎。
投机解码的加速比:投机解码的加速比取决于草稿模型与目标模型的一致性。如果草稿模型的候选 token 经常被拒绝,投机解码反而会增加延迟(因为验证步骤需要额外计算)。草稿模型的选择需要在速度和一致性之间权衡。
张量并行的通信开销:多 GPU 张量并行需要在每层计算后进行 AllReduce 同步,通信延迟随 GPU 数量增加而增加。超过 8 卡时,通信开销可能成为瓶颈。需要使用 NVLink 等高带宽互联技术降低通信延迟。
五、总结
LLM 推理性能优化需要在模型层、引擎层和系统层协同进行。模型量化降低显存占用和带宽需求,PagedAttention 消除 KV Cache 碎片,连续批处理提升 GPU 利用率,投机解码加速自回归生成。落地建议:优先使用 vLLM 等成熟推理引擎,已集成 PagedAttention 和连续批处理;7B 以下模型使用 INT8 量化,30B 以上模型使用 INT4-AWQ 量化;投机解码适用于草稿模型与目标模型一致性高的场景;多 GPU 部署优先使用张量并行,配合 NVLink 降低通信开销。
