Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Torchrun api server #71

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions vllm/engine/async_llm_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,10 @@ def from_engine_args(
from vllm.executor.multiproc_gpu_executor import (
MultiprocessingGPUExecutorAsync)
executor_class = MultiprocessingGPUExecutorAsync
elif distributed_executor_backend == "torchrun":
from vllm.executor.torchrun_gpu_executor import (
TorchrunGPUExecutorAsync)
executor_class = TorchrunGPUExecutorAsync
else:
from vllm.executor.gpu_executor import GPUExecutorAsync
executor_class = GPUExecutorAsync
Expand Down
44 changes: 42 additions & 2 deletions vllm/entrypoints/openai/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import re
from contextlib import asynccontextmanager
from http import HTTPStatus
from typing import Optional, Set
import threading
from typing import List, Optional, Set

import fastapi
import requests
import uvicorn
from fastapi import Request
from fastapi.exceptions import RequestValidationError
Expand Down Expand Up @@ -39,6 +41,7 @@
logger = init_logger(__name__)

_running_tasks: Set[asyncio.Task] = set()
_aux_ports: List[int] = []


@asynccontextmanager
Expand Down Expand Up @@ -78,6 +81,23 @@ async def validation_exception_handler(_, exc):
return JSONResponse(err.model_dump(), status_code=HTTPStatus.BAD_REQUEST)


def send_aux_request(request, uri, port):
try:
res = requests.post(f"http://localhost:{port}/{uri}", json=request)
if res.status_code != 200:
print(f"Failed to reroute request to {port}")
exit(1)
except Exception as e:
print(f"Failed to reroute request to {port}: {e}")


def reroute(request, uri):
global _aux_ports
for port in _aux_ports:
threading.Thread(target=send_aux_request,
args=(request, uri, port)).start()


@app.get("/health")
async def health() -> Response:
"""Health check."""
Expand All @@ -100,6 +120,9 @@ async def show_version():
@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest,
raw_request: Request):
if envs.LOCAL_RANK == 0:
reroute(await raw_request.json(), raw_request.url.path[1:])

generator = await openai_serving_chat.create_chat_completion(
request, raw_request)
if isinstance(generator, ErrorResponse):
Expand All @@ -115,6 +138,9 @@ async def create_chat_completion(request: ChatCompletionRequest,

@app.post("/v1/completions")
async def create_completion(request: CompletionRequest, raw_request: Request):
if envs.LOCAL_RANK == 0:
reroute(await raw_request.json(), raw_request.url.path[1:])

generator = await openai_serving_completion.create_completion(
request, raw_request)
if isinstance(generator, ErrorResponse):
Expand All @@ -129,6 +155,9 @@ async def create_completion(request: CompletionRequest, raw_request: Request):

@app.post("/v1/embeddings")
async def create_embedding(request: EmbeddingRequest, raw_request: Request):
if envs.LOCAL_RANK == 0:
reroute(await raw_request.json(), raw_request.url.path[1:])

generator = await openai_serving_embedding.create_embedding(
request, raw_request)
if isinstance(generator, ErrorResponse):
Expand Down Expand Up @@ -210,9 +239,20 @@ async def authentication(request: Request, call_next):
openai_serving_embedding = OpenAIServingEmbedding(engine, model_config,
served_model_names)
app.root_path = args.root_path
port = args.port
if (engine.engine.parallel_config.distributed_executor_backend
== "torchrun" and args.tensor_parallel_size > 1):
if envs.LOCAL_RANK != 0:
port = envs.AUX_PORT_START + envs.LOCAL_RANK - 1
else:
_aux_ports = [
x for x in range(
envs.AUX_PORT_START, envs.AUX_PORT_START +
args.tensor_parallel_size - 1)
]
uvicorn.run(app,
host=args.host,
port=args.port,
port=port,
log_level=args.uvicorn_log_level,
timeout_keep_alive=TIMEOUT_KEEP_ALIVE,
ssl_keyfile=args.ssl_keyfile,
Expand Down
5 changes: 5 additions & 0 deletions vllm/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
VLLM_INSTALL_PUNICA_KERNELS: bool = False
CMAKE_BUILD_TYPE: Optional[str] = None
VERBOSE: bool = False
AUX_PORT_START: int = 7400

# The begin-* and end* here are used by the documentation generator
# to extract the used env vars.
Expand Down Expand Up @@ -85,6 +86,10 @@
"VERBOSE":
lambda: bool(int(os.getenv('VERBOSE', '0'))),

# The start of auxillary port range to use for torchrun server mode
"AUX_PORT_START":
lambda: int(os.getenv("AUX_PORT_START", "7400")),

# Root directory for VLLM configuration files
# Note that this not only affects how vllm finds its configuration files
# during runtime, but also affects how vllm installs its configuration
Expand Down
3 changes: 3 additions & 0 deletions vllm/worker/model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import torch
import torch.nn as nn

from vllm import envs
from vllm.attention import AttentionMetadata, get_attn_backend
from vllm.config import (CacheConfig, DeviceConfig, LoadConfig, LoRAConfig,
ModelConfig, ParallelConfig, SchedulerConfig,
Expand Down Expand Up @@ -702,6 +703,8 @@ def execute_model(
seq_group_metadata_list: Optional[List[SequenceGroupMetadata]],
kv_caches: List[torch.Tensor],
) -> Optional[SamplerOutput]:
if self.parallel_config.distributed_executor_backend == "torchrun":
torch.cuda.set_device(f"cuda:{envs.LOCAL_RANK}")
(input_tokens, input_positions, attn_metadata, sampling_metadata,
lora_requests, lora_mapping, multi_modal_input
) = self.prepare_input_tensors(seq_group_metadata_list)
Expand Down
3 changes: 3 additions & 0 deletions vllm/worker/worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""A GPU worker class."""
import gc
import os
import vllm.envs as envs
from typing import Any, Dict, List, Optional, Set, Tuple, Union

import torch
Expand Down Expand Up @@ -226,6 +227,8 @@ def execute_model(
self,
execute_model_req: Optional[ExecuteModelRequest] = None
) -> List[Union[SamplerOutput, PoolerOutput]]:
if self.parallel_config.distributed_executor_backend == "torchrun":
torch.cuda.set_device(f"cuda:{envs.LOCAL_RANK}")
if not self.is_driver_worker:
self._execute_model_non_driver()
return []
Expand Down
Loading