support tensor parallel

cheunglei
2025-06-15 01:31:24 +08:00
parent b6136383c9
commit 53b3ef2e32
9 changed files with 102 additions and 31 deletions

View File

@@ -9,6 +9,7 @@ class Config:
max_num_seqs: int = 512
max_model_len: int = 4096
gpu_memory_utilization: float = 0.9
tensor_parallel_size: int = 1
enforce_eager: bool = False
hf_config: AutoConfig | None = None
eos: int = -1
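
The only configuration change is the new tensor_parallel_size field. A minimal usage sketch (the LLM entry point and model path here are illustrative, not part of this diff):

from nanovllm import LLM, SamplingParams

# Shard the model over 2 GPUs; every other field keeps its default.
llm = LLM("Qwen/Qwen3-0.6B", tensor_parallel_size=2, enforce_eager=True)
outputs = llm.generate(["Hello"], SamplingParams(temperature=0.6, max_tokens=32))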

View File

@@ -1,6 +1,8 @@
import atexit
from time import perf_counter
from tqdm.auto import tqdm
from transformers import AutoConfig, AutoTokenizer
import torch.multiprocessing as mp
from nanovllm.config import Config
from nanovllm.sampling_params import SamplingParams
@@ -19,10 +21,24 @@ class LLMEngine:
Sequence.block_size = config.kvcache_block_size
config.hf_config = AutoConfig.from_pretrained(config.model)
config.max_model_len = min(config.max_model_len, config.hf_config.max_position_embeddings)
self.ps = []
self.events = []
for i in range(1, config.tensor_parallel_size):
event = mp.Event()
process = mp.Process(target=ModelRunner, args=(config, i, event))
process.start()
self.ps.append(process)
self.events.append(event)
self.model_runner = ModelRunner(config, 0, self.events)
self.tokenizer = AutoTokenizer.from_pretrained(config.model, use_fast=True)
config.eos = self.tokenizer.eos_token_id
self.model_runner = ModelRunner(config)
self.scheduler = Scheduler(config)
atexit.register(self.exit)
def exit(self):
self.model_runner.call("exit")
for p in self.ps:
p.join()
def add_request(self, prompt: str | list[int], sampling_params: SamplingParams):
if isinstance(prompt, str):
@@ -32,7 +48,7 @@ class LLMEngine:
def step(self):
seqs, is_prefill = self.scheduler.schedule()
token_ids = self.model_runner.run(seqs, is_prefill)
token_ids = self.model_runner.call("run", seqs, is_prefill)
self.scheduler.postprocess(seqs, token_ids)
outputs = [(seq.seq_id, seq.completion_token_ids) for seq in seqs if seq.is_finished]
num_tokens = sum(len(seq) for seq in seqs) if is_prefill else -len(seqs)
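
The engine now spawns one extra process per additional tensor-parallel rank; rank 0 stays in the engine process and drives the workers through ModelRunner.call, while ranks 1..N-1 sit in ModelRunner.loop until they receive an "exit" message. A self-contained sketch of just the process/event plumbing (the worker function stands in for ModelRunner):

import torch.multiprocessing as mp

def worker(rank, event):
    # Stand-in for ModelRunner on rank > 0: block until rank 0 signals.
    event.wait()
    print(f"rank {rank} received a call")

if __name__ == "__main__":
    world_size = 2
    events, procs = [], []
    for rank in range(1, world_size):
        ev = mp.Event()
        p = mp.Process(target=worker, args=(rank, ev))
        p.start()
        events.append(ev)
        procs.append(p)
    for ev in events:       # rank 0 "broadcasts" a call by setting every event
        ev.set()
    for p in procs:
        p.join()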

View File

@@ -1,4 +1,8 @@
import pickle
import torch
import torch.distributed as dist
from multiprocessing.synchronize import Event
from multiprocessing.shared_memory import SharedMemory
from nanovllm.config import Config
from nanovllm.engine.sequence import Sequence
@@ -11,12 +15,17 @@ from nanovllm.utils.loader import load_model
class ModelRunner:
def __init__(self, config: Config):
def __init__(self, config: Config, rank: int, event: Event | list[Event]):
self.config = config
hf_config = config.hf_config
self.block_size = config.kvcache_block_size
self.enforce_eager = config.enforce_eager
self.world_size = config.tensor_parallel_size
self.rank = rank
self.event = event
dist.init_process_group("nccl", "tcp://localhost:2333", world_size=self.world_size, rank=rank)
torch.cuda.set_device(rank)
default_dtype = torch.get_default_dtype()
torch.set_default_dtype(hf_config.torch_dtype)
torch.set_default_device("cuda")
@@ -29,14 +38,66 @@ class ModelRunner:
torch.set_default_device("cpu")
torch.set_default_dtype(default_dtype)
if self.world_size > 1:
if rank == 0:
self.shm = SharedMemory(name="nanovllm", create=True, size=2**20)
dist.barrier()
else:
dist.barrier()
self.shm = SharedMemory(name="nanovllm")
self.loop()
def exit(self):
if self.world_size > 1:
self.shm.close()
if self.rank == 0:
self.shm.unlink()
# dist.destroy_process_group()
def loop(self):
while True:
method_name, args = self.read_shm()
method = getattr(self, method_name, None)
assert callable(method)
method(*args)
if method_name == "exit":
break
def read_shm(self):
assert self.world_size > 1 and self.rank
self.event.wait()
n = int.from_bytes(self.shm.buf[0:4], "little")
method_name, *args = pickle.loads(self.shm.buf[4:n+4])
self.event.clear()
return method_name, args
def write_shm(self, method_name, *args):
assert self.world_size > 1 and not self.rank
data = pickle.dumps([method_name, *args])
n = len(data)
assert n + 4 <= self.shm.size
self.shm.buf[0:4] = n.to_bytes(4, "little")
self.shm.buf[4:n+4] = data
for event in self.event:
event.set()
def call(self, method_name, *args):
assert self.rank == 0
if self.world_size > 1:
self.write_shm(method_name, *args)
method = getattr(self, method_name, None)
assert callable(method)
return method(*args)
def allocate_kv_cache(self, gpu_memory_utilization):
config = self.config
hf_config = config.hf_config
total, used, _ = get_gpu_memory()
free = total * gpu_memory_utilization - used
block_bytes = 2 * hf_config.num_hidden_layers * self.block_size * hf_config.num_key_value_heads * hf_config.head_dim * hf_config.torch_dtype.itemsize
num_kv_heads = hf_config.num_key_value_heads // dist.get_world_size()
block_bytes = 2 * hf_config.num_hidden_layers * self.block_size * num_kv_heads * hf_config.head_dim * hf_config.torch_dtype.itemsize
config.num_kvcache_blocks = int(free) // block_bytes
self.kv_cache = torch.zeros(2, hf_config.num_hidden_layers, config.num_kvcache_blocks, self.block_size, hf_config.num_key_value_heads, hf_config.head_dim)
self.kv_cache = torch.zeros(2, hf_config.num_hidden_layers, config.num_kvcache_blocks, self.block_size, num_kv_heads, hf_config.head_dim)
layer_id = 0
for module in self.model.modules():
if hasattr(module, "k_cache") and hasattr(module, "v_cache"):
@@ -148,7 +209,7 @@ class ModelRunner:
input_ids, positions = self.prepare_prefill(seqs) if is_prefill else self.prepare_decode(seqs)
temperatures = self.prepare_sample(seqs)
logits = self.run_model(input_ids, positions, is_prefill)
token_ids = self.sampler(logits, temperatures).tolist()
token_ids = self.sampler(logits, temperatures).tolist() if self.rank == 0 else None
reset_context()
return token_ids
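
Rank 0 broadcasts method calls to the other ranks through a 1 MiB SharedMemory block: write_shm pickles [method_name, *args], stores a 4-byte little-endian length header followed by the payload, and sets every worker's event; read_shm waits on the event, decodes the same buffer, clears the event, and the worker dispatches the call locally via getattr. A self-contained sketch of just the framing, without CUDA or NCCL:

import pickle
from multiprocessing.shared_memory import SharedMemory

def write_msg(shm, method_name, *args):
    data = pickle.dumps([method_name, *args])
    assert len(data) + 4 <= shm.size
    shm.buf[0:4] = len(data).to_bytes(4, "little")   # length header
    shm.buf[4:4 + len(data)] = data                  # pickled payload

def read_msg(shm):
    n = int.from_bytes(shm.buf[0:4], "little")
    method_name, *args = pickle.loads(shm.buf[4:4 + n])
    return method_name, args

shm = SharedMemory(create=True, size=2**20)
write_msg(shm, "run", [1, 2, 3], True)
print(read_msg(shm))    # ('run', [[1, 2, 3], True])
shm.close()
shm.unlink()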

View File

@@ -14,8 +14,6 @@ class Scheduler:
self.block_manager = BlockManager(config.num_kvcache_blocks, config.kvcache_block_size)
self.waiting: deque[Sequence] = deque()
self.running: deque[Sequence] = deque()
self.num_finished = 0
self.num_tokens = 0
def is_finished(self):
return not self.waiting and not self.running
@@ -67,11 +65,9 @@ class Scheduler:
self.waiting.appendleft(seq)
def postprocess(self, seqs: list[Sequence], token_ids: list[int]) -> list[bool]:
self.num_tokens += len(token_ids)
for seq, token_id in zip(seqs, token_ids):
seq.append_token(token_id)
if (not seq.ignore_eos and token_id == self.eos) or seq.num_completion_tokens == seq.max_tokens:
seq.status = SequenceStatus.FINISHED
self.block_manager.deallocate(seq)
self.running.remove(seq)
self.num_finished += 1

View File

@@ -75,7 +75,7 @@ class Sequence:
self.num_tokens += 1
def __getstate__(self):
state = super().__getstate__()
state = vars(self).copy()
if self.num_completion_tokens:
state.pop("token_ids")
return state
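
Sequences are now pickled across the process boundary on every step, so __getstate__ trims the payload: once a sequence has completion tokens (decode phase), the full token_ids list is dropped because the other ranks only need the most recent token. A toy illustration of the saving (the real class tracks more fields):

import pickle

class Seq:
    def __init__(self, token_ids):
        self.token_ids = token_ids
        self.last_token = token_ids[-1]
        self.num_completion_tokens = 0

    def __getstate__(self):
        state = vars(self).copy()
        if self.num_completion_tokens:   # decode phase: peers only need last_token
            state.pop("token_ids")
        return state

prefill, decode = Seq(list(range(1000))), Seq(list(range(1000)))
decode.num_completion_tokens = 5
print(len(pickle.dumps(prefill)), len(pickle.dumps(decode)))  # decode payload is far smaller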

View File

@@ -14,8 +14,8 @@ class VocabParallelEmbedding(nn.Module):
embedding_dim: int,
):
super().__init__()
self.tp_rank = 0 # get_tensor_model_parallel_rank()
self.tp_size = 1 # get_tensor_model_parallel_world_size()
self.tp_rank = dist.get_rank()
self.tp_size = dist.get_world_size()
assert num_embeddings % self.tp_size == 0
self.num_embeddings = num_embeddings
self.num_embeddings_per_partition = self.num_embeddings // self.tp_size
@@ -39,7 +39,7 @@ class VocabParallelEmbedding(nn.Module):
x = mask * (x - self.vocab_start_idx)
y = F.embedding(x, self.weight)
if self.tp_size > 1:
y = mask * y
y = mask.unsqueeze(1) * y
dist.all_reduce(y)
return y
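
Each rank owns a contiguous slice of the vocabulary: token ids outside the local slice are masked to index 0, embedded, zeroed back out with the (now broadcast-correct) mask.unsqueeze(1), and the partial results are summed across ranks with an all-reduce. A single-process sketch of the masking arithmetic, torch only:

import torch
import torch.nn.functional as F

vocab_size, dim, tp_size, tp_rank = 8, 4, 2, 0
part = vocab_size // tp_size
weight = torch.randn(part, dim)            # this rank's slice of the embedding table

x = torch.tensor([1, 5, 3])                # some ids fall outside this rank's slice
start, end = tp_rank * part, (tp_rank + 1) * part
mask = (x >= start) & (x < end)
x_local = mask * (x - start)               # out-of-range ids collapse to row 0
y = F.embedding(x_local, weight)
y = mask.unsqueeze(1) * y                  # zero the rows this rank does not own
# across tp_size ranks, dist.all_reduce(y) sums the partials into the full result
print(y.shape)                             # torch.Size([3, 4])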
@@ -65,8 +65,8 @@ class ParallelLMHead(VocabParallelEmbedding):
last_indices = context.cu_seqlens_q[1:] - 1
x = x[last_indices].contiguous()
logits = F.linear(x, self.weight, self.bias)
# if self.tp_size > 1:
# all_logits = [torch.empty_like(logits) for _ in range(self.tp_size)]
# dist.gather(logits, all_logits, 0)
# logits = torch.cat(all_logits, -1)
return logits if self.tp_rank == 0 else None
if self.tp_size > 1:
all_logits = [torch.empty_like(logits) for _ in range(self.tp_size)] if self.tp_rank == 0 else None
dist.gather(logits, all_logits, 0)
logits = torch.cat(all_logits, -1) if self.tp_rank == 0 else None
return logits
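
The LM head keeps its vocab shard local: every rank computes logits for its slice, dist.gather collects all shards on rank 0 (only the destination rank supplies a gather_list) and concatenates them along the vocab dimension, and the other ranks hand back None, which is why ModelRunner.run only samples on rank 0. A sketch of that collective, assuming an already-initialized process group:

import torch
import torch.distributed as dist

def gather_logits(local_logits, tp_rank, tp_size):
    if tp_size == 1:
        return local_logits
    bucket = [torch.empty_like(local_logits) for _ in range(tp_size)] if tp_rank == 0 else None
    dist.gather(local_logits, bucket, dst=0)   # non-dst ranks pass gather_list=None
    return torch.cat(bucket, dim=-1) if tp_rank == 0 else None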

View File

@@ -21,8 +21,8 @@ class LinearBase(nn.Module):
self.input_size = input_size
self.output_size = output_size
self.tp_dim = tp_dim
self.tp_rank = 0 # get_tensor_model_parallel_rank()
self.tp_size = 1 # get_tensor_model_parallel_world_size()
self.tp_rank = dist.get_rank()
self.tp_size = dist.get_world_size()
def forward(self, x: torch.Tensor) -> torch.Tensor:
raise NotImplementedError
@@ -65,7 +65,6 @@ class ColumnParallelLinear(LinearBase):
self.input_size_per_partition = input_size
self.output_size_per_partition = divide(output_size, self.tp_size)
self.output_partition_sizes = [self.output_size_per_partition]
# If QKV or MergedColumn, use output size of each partition.
if hasattr(self, "output_sizes"):
self.output_partition_sizes = [
divide(output_size, self.tp_size)
@@ -101,8 +100,6 @@ class MergedColumnParallelLinear(ColumnParallelLinear):
bias: bool = False,
):
self.output_sizes = output_sizes
tp_size = 1 # get_tensor_model_parallel_world_size()
assert all(output_size % tp_size == 0 for output_size in output_sizes)
super().__init__(input_size, sum(output_sizes), bias=bias)
def weight_loader(self, param: nn.Parameter, loaded_weight: torch.Tensor, loaded_shard_id: int):
@@ -110,7 +107,7 @@ class MergedColumnParallelLinear(ColumnParallelLinear):
shard_offset = sum(self.output_sizes[:loaded_shard_id]) // self.tp_size
shard_size = self.output_sizes[loaded_shard_id] // self.tp_size
param_data = param_data.narrow(self.tp_dim, shard_offset, shard_size)
# loaded_weight = loaded_weight.narrow(self.tp_dim, self.tp_rank * shard_size, shard_size)
loaded_weight = loaded_weight.chunk(self.tp_size, self.tp_dim)[self.tp_rank]
assert param_data.size() == loaded_weight.size()
param_data.copy_(loaded_weight)
@@ -131,8 +128,7 @@ class QKVParallelLinear(ColumnParallelLinear):
if total_num_kv_heads is None:
total_num_kv_heads = total_num_heads
self.total_num_kv_heads = total_num_kv_heads
# Divide the weight matrix along the last dimension.
tp_size = 1 # get_tensor_model_parallel_world_size()
tp_size = dist.get_world_size()
self.num_heads = divide(self.total_num_heads, tp_size)
self.num_kv_heads = divide(self.total_num_kv_heads, tp_size)
input_size = self.hidden_size
@@ -158,7 +154,7 @@ class QKVParallelLinear(ColumnParallelLinear):
shard_size = self.num_kv_heads * self.head_size
shard_offset = self.num_heads * self.head_size + self.num_kv_heads * self.head_size
param_data = param_data.narrow(self.tp_dim, shard_offset, shard_size)
# loaded_weight = loaded_weight.narrow(self.tp_dim, self.tp_rank * shard_size, shard_size)
loaded_weight = loaded_weight.chunk(self.tp_size, self.tp_dim)[self.tp_rank]
assert param_data.size() == loaded_weight.size()
param_data.copy_(loaded_weight)
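
Weight loading for the sharded linear layers now uses chunk(tp_size, dim)[tp_rank] instead of a hand-computed narrow: the full checkpoint tensor is split into tp_size equal slices along the parallel dimension and only the local slice is copied. Both forms select the same slice when the dimension divides evenly; a quick check:

import torch

full = torch.arange(24).reshape(6, 4)      # stand-in checkpoint weight, sharded on dim 0
tp_size, tp_rank = 2, 1
shard = full.size(0) // tp_size

a = full.narrow(0, tp_rank * shard, shard)
b = full.chunk(tp_size, dim=0)[tp_rank]
print(torch.equal(a, b))                   # True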

View File

@@ -1,5 +1,6 @@
import torch
from torch import nn
import torch.distributed as dist
from transformers import Qwen3Config
from nanovllm.layers.activation import SiluAndMul
@@ -26,7 +27,7 @@ class Qwen3Attention(nn.Module):
) -> None:
super().__init__()
self.hidden_size = hidden_size
tp_size = 1 # get_tensor_model_parallel_world_size()
tp_size = dist.get_world_size()
self.total_num_heads = num_heads
assert self.total_num_heads % tp_size == 0
self.num_heads = self.total_num_heads // tp_size
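
Attention follows the same partitioning: each rank keeps total_num_heads // tp_size query heads (and the matching share of KV heads), which is also why allocate_kv_cache above divides num_key_value_heads by the world size. For example, a model with 16 query heads and 8 KV heads on tensor_parallel_size=2 runs 8 query heads and 4 KV heads per GPU.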