diff --git a/nanovllm/config.py b/nanovllm/config.py index ce9446b..5d9dcc0 100644 --- a/nanovllm/config.py +++ b/nanovllm/config.py @@ -9,6 +9,7 @@ class Config: max_num_seqs: int = 512 max_model_len: int = 4096 gpu_memory_utilization: float = 0.9 + tensor_parallel_size: int = 1 enforce_eager: bool = False hf_config: AutoConfig | None = None eos: int = -1 @@ -17,4 +18,4 @@ class Config: def __post_init__(self): assert self.model - assert self.kvcache_block_size % 256 == 0 \ No newline at end of file + assert self.kvcache_block_size % 256 == 0 diff --git a/nanovllm/engine/llm_engine.py b/nanovllm/engine/llm_engine.py index 07a53c2..c29ef65 100644 --- a/nanovllm/engine/llm_engine.py +++ b/nanovllm/engine/llm_engine.py @@ -1,6 +1,8 @@ +import atexit from time import perf_counter from tqdm.auto import tqdm from transformers import AutoConfig, AutoTokenizer +import torch.multiprocessing as mp from nanovllm.config import Config from nanovllm.sampling_params import SamplingParams @@ -19,10 +21,24 @@ class LLMEngine: Sequence.block_size = config.kvcache_block_size config.hf_config = AutoConfig.from_pretrained(config.model) config.max_model_len = min(config.max_model_len, config.hf_config.max_position_embeddings) + self.ps = [] + self.events = [] + for i in range(1, config.tensor_parallel_size): + event = mp.Event() + process = mp.Process(target=ModelRunner, args=(config, i, event)) + process.start() + self.ps.append(process) + self.events.append(event) + self.model_runner = ModelRunner(config, 0, self.events) self.tokenizer = AutoTokenizer.from_pretrained(config.model, use_fast=True) config.eos = self.tokenizer.eos_token_id - self.model_runner = ModelRunner(config) self.scheduler = Scheduler(config) + atexit.register(self.exit) + + def exit(self): + self.model_runner.call("exit") + for p in self.ps: + p.join() def add_request(self, prompt: str | list[int], sampling_params: SamplingParams): if isinstance(prompt, str): @@ -32,7 +48,7 @@ class LLMEngine: def step(self): seqs, is_prefill = self.scheduler.schedule() - token_ids = self.model_runner.run(seqs, is_prefill) + token_ids = self.model_runner.call("run", seqs, is_prefill) self.scheduler.postprocess(seqs, token_ids) outputs = [(seq.seq_id, seq.completion_token_ids) for seq in seqs if seq.is_finished] num_tokens = sum(len(seq) for seq in seqs) if is_prefill else -len(seqs) diff --git a/nanovllm/engine/model_runner.py b/nanovllm/engine/model_runner.py index 0978e32..33f429d 100644 --- a/nanovllm/engine/model_runner.py +++ b/nanovllm/engine/model_runner.py @@ -1,4 +1,8 @@ +import pickle import torch +import torch.distributed as dist +from multiprocess.synchronize import Event +from multiprocess.shared_memory import SharedMemory from nanovllm.config import Config from nanovllm.engine.sequence import Sequence @@ -11,12 +15,17 @@ from nanovllm.utils.loader import load_model class ModelRunner: - def __init__(self, config: Config): + def __init__(self, config: Config, rank: int, event: Event | list[Event]): self.config = config hf_config = config.hf_config self.block_size = config.kvcache_block_size self.enforce_eager = config.enforce_eager + self.world_size = config.tensor_parallel_size + self.rank = rank + self.event = event + dist.init_process_group("nccl", "tcp://localhost:2333", world_size=self.world_size, rank=rank) + torch.cuda.set_device(rank) default_dtype = torch.get_default_dtype() torch.set_default_dtype(hf_config.torch_dtype) torch.set_default_device("cuda") @@ -29,14 +38,66 @@ class ModelRunner: torch.set_default_device("cpu") torch.set_default_dtype(default_dtype) + if self.world_size > 1: + if rank == 0: + self.shm = SharedMemory(name="nanovllm", create=True, size=2**20) + dist.barrier() + else: + dist.barrier() + self.shm = SharedMemory(name="nanovllm") + self.loop() + + def exit(self): + if self.world_size > 1: + self.shm.close() + if self.rank == 0: + self.shm.unlink() + # dist.destroy_process_group() + + def loop(self): + while True: + method_name, args = self.read_shm() + method = getattr(self, method_name, None) + assert callable(method) + method(*args) + if method_name == "exit": + break + + def read_shm(self): + assert self.world_size > 1 and self.rank + self.event.wait() + n = int.from_bytes(self.shm.buf[0:4], "little") + method_name, *args = pickle.loads(self.shm.buf[4:n+4]) + self.event.clear() + return method_name, args + + def write_shm(self, method_name, *args): + assert self.world_size > 1 and not self.rank + data = pickle.dumps([method_name, *args]) + n = len(data) + assert n + 4 <= self.shm.size + self.shm.buf[0:4] = n.to_bytes(4, "little") + self.shm.buf[4:n+4] = data + for event in self.event: + event.set() + + def call(self, method_name, *args): + assert self.rank == 0 + if self.world_size > 1: + self.write_shm(method_name, *args) + method = getattr(self, method_name, None) + assert callable(method) + return method(*args) + def allocate_kv_cache(self, gpu_memory_utilization): config = self.config hf_config = config.hf_config total, used, _ = get_gpu_memory() free = total * gpu_memory_utilization - used - block_bytes = 2 * hf_config.num_hidden_layers * self.block_size * hf_config.num_key_value_heads * hf_config.head_dim * hf_config.torch_dtype.itemsize + num_kv_heads = hf_config.num_key_value_heads // dist.get_world_size() + block_bytes = 2 * hf_config.num_hidden_layers * self.block_size * num_kv_heads * hf_config.head_dim * hf_config.torch_dtype.itemsize config.num_kvcache_blocks = int(free) // block_bytes - self.kv_cache = torch.zeros(2, hf_config.num_hidden_layers, config.num_kvcache_blocks, self.block_size, hf_config.num_key_value_heads, hf_config.head_dim) + self.kv_cache = torch.zeros(2, hf_config.num_hidden_layers, config.num_kvcache_blocks, self.block_size, num_kv_heads, hf_config.head_dim) layer_id = 0 for module in self.model.modules(): if hasattr(module, "k_cache") and hasattr(module, "v_cache"): @@ -148,7 +209,7 @@ class ModelRunner: input_ids, positions = self.prepare_prefill(seqs) if is_prefill else self.prepare_decode(seqs) temperatures = self.prepare_sample(seqs) logits = self.run_model(input_ids, positions, is_prefill) - token_ids = self.sampler(logits, temperatures).tolist() + token_ids = self.sampler(logits, temperatures).tolist() if self.rank == 0 else None reset_context() return token_ids diff --git a/nanovllm/engine/scheduler.py b/nanovllm/engine/scheduler.py index 6d1eb53..c5735fe 100644 --- a/nanovllm/engine/scheduler.py +++ b/nanovllm/engine/scheduler.py @@ -14,8 +14,6 @@ class Scheduler: self.block_manager = BlockManager(config.num_kvcache_blocks, config.kvcache_block_size) self.waiting: deque[Sequence] = deque() self.running: deque[Sequence] = deque() - self.num_finished = 0 - self.num_tokens = 0 def is_finished(self): return not self.waiting and not self.running @@ -67,11 +65,9 @@ class Scheduler: self.waiting.appendleft(seq) def postprocess(self, seqs: list[Sequence], token_ids: list[int]) -> list[bool]: - self.num_tokens += len(token_ids) for seq, token_id in zip(seqs, token_ids): seq.append_token(token_id) if (not seq.ignore_eos and token_id == self.eos) or seq.num_completion_tokens == seq.max_tokens: seq.status = SequenceStatus.FINISHED self.block_manager.deallocate(seq) self.running.remove(seq) - self.num_finished += 1 diff --git a/nanovllm/engine/sequence.py b/nanovllm/engine/sequence.py index 2216a86..00866b4 100644 --- a/nanovllm/engine/sequence.py +++ b/nanovllm/engine/sequence.py @@ -75,7 +75,7 @@ class Sequence: self.num_tokens += 1 def __getstate__(self): - state = super().__getstate__() + state = vars(self).copy() if self.num_completion_tokens: state.pop("token_ids") return state diff --git a/nanovllm/layers/embed_head.py b/nanovllm/layers/embed_head.py index 72b6e01..1ab7043 100644 --- a/nanovllm/layers/embed_head.py +++ b/nanovllm/layers/embed_head.py @@ -14,8 +14,8 @@ class VocabParallelEmbedding(nn.Module): embedding_dim: int, ): super().__init__() - self.tp_rank = 0 # get_tensor_model_parallel_rank() - self.tp_size = 1 # get_tensor_model_parallel_world_size() + self.tp_rank = dist.get_rank() + self.tp_size = dist.get_world_size() assert num_embeddings % self.tp_size == 0 self.num_embeddings = num_embeddings self.num_embeddings_per_partition = self.num_embeddings // self.tp_size @@ -39,7 +39,7 @@ class VocabParallelEmbedding(nn.Module): x = mask * (x - self.vocab_start_idx) y = F.embedding(x, self.weight) if self.tp_size > 1: - y = mask * y + y = mask.unsqueeze(1) * y dist.all_reduce(y) return y @@ -65,8 +65,8 @@ class ParallelLMHead(VocabParallelEmbedding): last_indices = context.cu_seqlens_q[1:] - 1 x = x[last_indices].contiguous() logits = F.linear(x, self.weight, self.bias) - # if self.tp_size > 1: - # all_logits = [torch.empty_like(logits) for _ in range(self.tp_size)] - # dist.gather(logits, all_logits, 0) - # logits = torch.cat(all_logits, -1) - return logits if self.tp_rank == 0 else None \ No newline at end of file + if self.tp_size > 1: + all_logits = [torch.empty_like(logits) for _ in range(self.tp_size)] if self.tp_rank == 0 else None + dist.gather(logits, all_logits, 0) + logits = torch.cat(all_logits, -1) if self.tp_rank == 0 else None + return logits \ No newline at end of file diff --git a/nanovllm/layers/linear.py b/nanovllm/layers/linear.py index d5133f1..0ae6eed 100755 --- a/nanovllm/layers/linear.py +++ b/nanovllm/layers/linear.py @@ -21,8 +21,8 @@ class LinearBase(nn.Module): self.input_size = input_size self.output_size = output_size self.tp_dim = tp_dim - self.tp_rank = 0 # get_tensor_model_parallel_rank() - self.tp_size = 1 # get_tensor_model_parallel_world_size() + self.tp_rank = dist.get_rank() + self.tp_size = dist.get_world_size() def forward(self, x: torch.Tensor) -> torch.Tensor: raise NotImplementedError @@ -65,7 +65,6 @@ class ColumnParallelLinear(LinearBase): self.input_size_per_partition = input_size self.output_size_per_partition = divide(output_size, self.tp_size) self.output_partition_sizes = [self.output_size_per_partition] - # If QKV or MergedColumn, use output size of each partition. if hasattr(self, "output_sizes"): self.output_partition_sizes = [ divide(output_size, self.tp_size) @@ -101,8 +100,6 @@ class MergedColumnParallelLinear(ColumnParallelLinear): bias: bool = False, ): self.output_sizes = output_sizes - tp_size = 1 # get_tensor_model_parallel_world_size() - assert all(output_size % tp_size == 0 for output_size in output_sizes) super().__init__(input_size, sum(output_sizes), bias=bias) def weight_loader(self, param: nn.Parameter, loaded_weight: torch.Tensor, loaded_shard_id: int): @@ -110,7 +107,7 @@ class MergedColumnParallelLinear(ColumnParallelLinear): shard_offset = sum(self.output_sizes[:loaded_shard_id]) // self.tp_size shard_size = self.output_sizes[loaded_shard_id] // self.tp_size param_data = param_data.narrow(self.tp_dim, shard_offset, shard_size) - # loaded_weight = loaded_weight.narrow(self.tp_dim, self.tp_rank * shard_size, shard_size) + loaded_weight = loaded_weight.chunk(self.tp_size, self.tp_dim)[self.tp_rank] assert param_data.size() == loaded_weight.size() param_data.copy_(loaded_weight) @@ -131,8 +128,7 @@ class QKVParallelLinear(ColumnParallelLinear): if total_num_kv_heads is None: total_num_kv_heads = total_num_heads self.total_num_kv_heads = total_num_kv_heads - # Divide the weight matrix along the last dimension. - tp_size = 1 # get_tensor_model_parallel_world_size() + tp_size = dist.get_world_size() self.num_heads = divide(self.total_num_heads, tp_size) self.num_kv_heads = divide(self.total_num_kv_heads, tp_size) input_size = self.hidden_size @@ -158,7 +154,7 @@ class QKVParallelLinear(ColumnParallelLinear): shard_size = self.num_kv_heads * self.head_size shard_offset = self.num_heads * self.head_size + self.num_kv_heads * self.head_size param_data = param_data.narrow(self.tp_dim, shard_offset, shard_size) - # loaded_weight = loaded_weight.narrow(self.tp_dim, self.tp_rank * shard_size, shard_size) + loaded_weight = loaded_weight.chunk(self.tp_size, self.tp_dim)[self.tp_rank] assert param_data.size() == loaded_weight.size() param_data.copy_(loaded_weight) diff --git a/nanovllm/llm.py b/nanovllm/llm.py index 1c3efe9..4f51a44 100644 --- a/nanovllm/llm.py +++ b/nanovllm/llm.py @@ -2,4 +2,4 @@ from nanovllm.engine.llm_engine import LLMEngine class LLM(LLMEngine): - pass \ No newline at end of file + pass diff --git a/nanovllm/models/qwen3.py b/nanovllm/models/qwen3.py index f65475f..c306be5 100755 --- a/nanovllm/models/qwen3.py +++ b/nanovllm/models/qwen3.py @@ -1,5 +1,6 @@ import torch from torch import nn +import torch.distributed as dist from transformers import Qwen3Config from nanovllm.layers.activation import SiluAndMul @@ -26,7 +27,7 @@ class Qwen3Attention(nn.Module): ) -> None: super().__init__() self.hidden_size = hidden_size - tp_size = 1 # get_tensor_model_parallel_world_size() + tp_size = dist.get_world_size() self.total_num_heads = num_heads assert self.total_num_heads % tp_size == 0 self.num_heads = self.total_num_heads // tp_size