diff --git a/llm/server/dockerfiles/Dockerfile_serving_cuda118_cudnn8 b/llm/server/dockerfiles/Dockerfile_serving_cuda118_cudnn8
index c701765e9829..f6f762e63726 100644
--- a/llm/server/dockerfiles/Dockerfile_serving_cuda118_cudnn8
+++ b/llm/server/dockerfiles/Dockerfile_serving_cuda118_cudnn8
@@ -1,31 +1,28 @@
 FROM registry.baidubce.com/paddlepaddle/fastdeploy:llm-base-gcc12.3-cuda11.8-cudnn8-nccl2.15.5
 WORKDIR /opt/output/
-COPY ./server/ /opt/output/Serving/
-
 ENV LD_LIBRARY_PATH="/usr/local/cuda-11.8/compat/:$LD_LIBRARY_PATH"

-RUN pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
-RUN python3 -m pip install --pre paddlepaddle-gpu -i https://www.paddlepaddle.org.cn/packages/nightly/cu118/ \
+RUN python3 -m pip install --pre paddlepaddle-gpu -i https://www.paddlepaddle.org.cn/packages/nightly/cu123/ \
     && python3 -m pip install paddlenlp==3.0.0b0 \
-    && python3 -m pip install --no-cache-dir sentencepiece pycryptodome tritonclient[all]==2.41.1
+    && python3 -m pip install --no-cache-dir sentencepiece pycryptodome tritonclient[all]==2.41.1 \
+    && python3 -m pip install --no-cache-dir --force-reinstall https://paddlepaddle-inference-banchmark.bj.bcebos.com/paddlenlp_ops-0.0.0-py3-none-any.whl \
+    && apt-get clean && rm -rf /var/lib/apt/lists/*

-RUN git clone https://gitee.com/paddlepaddle/PaddleNLP.git && cd PaddleNLP/csrc \
-    && python3 setup_cuda.py build && python3 setup_cuda.py install --user \
-    && cp -r /opt/output/PaddleNLP/paddlenlp /usr/local/lib/python3.10/dist-packages/ \
-    && cp -r /root/.local/lib/python3.10/site-packages/* /usr/local/lib/python3.10/dist-packages/ \
-    && rm -rf /opt/output/PaddleNLP
+RUN mkdir -p /opt/source/ && cd /opt/source/ \
+    && git clone https://github.com/PaddlePaddle/Paddle.git \
+    && git clone https://github.com/PaddlePaddle/PaddleNLP.git \
+    && cp -r /opt/source/PaddleNLP/paddlenlp /usr/local/lib/python3.10/dist-packages/ \
+    && python3 -m pip install --no-cache-dir -r PaddleNLP/requirements.txt \
+    && python3 -m pip install --no-cache-dir -r PaddleNLP/llm/server/server/requirements.txt

-RUN python3 -m pip install -r /opt/output/Serving/requirements.txt && rm /opt/output/Serving/requirements.txt
-RUN mv Serving/server /usr/local/lib/python3.10/dist-packages/
 RUN mkdir -p /opt/output/Serving/llm_model/model/1 \
-    && mv /opt/output/Serving/config/config.pbtxt /opt/output/Serving/llm_model/model/ \
-    && rm -rf /opt/output/Serving/config/
-RUN echo "from server.triton_server import TritonPythonModel" >>/opt/output/Serving/llm_model/model/1/model.py
+    && cp /opt/source/PaddleNLP/llm/server/server/config/config.pbtxt /opt/output/Serving/llm_model/model/ \
+    && cp /opt/source/PaddleNLP/llm/server/server/scripts/start_server.sh /opt/output/Serving/ \
+    && cp /opt/source/PaddleNLP/llm/server/server/scripts/stop_server.sh /opt/output/Serving/

-RUN cd /opt/output/Serving/ \
-    && cp scripts/start_server.sh . && cp scripts/stop_server.sh . \
-    && rm -rf scripts
+ENV PYTHONPATH="/opt/source/PaddleNLP/llm/server/server"
+RUN echo "from server.triton_server import TritonPythonModel" >>/opt/output/Serving/llm_model/model/1/model.py

 ENV http_proxy=""
 ENV https_proxy=""
diff --git a/llm/server/dockerfiles/Dockerfile_serving_cuda123_cudnn9 b/llm/server/dockerfiles/Dockerfile_serving_cuda123_cudnn9
index 4b0d1f002d98..ffe2517d3f0c 100644
--- a/llm/server/dockerfiles/Dockerfile_serving_cuda123_cudnn9
+++ b/llm/server/dockerfiles/Dockerfile_serving_cuda123_cudnn9
@@ -1,31 +1,28 @@
 FROM registry.baidubce.com/paddlepaddle/fastdeploy:llm-base-gcc12.3-cuda12.3-cudnn9-nccl2.15.5
 WORKDIR /opt/output/
-COPY ./server/ /opt/output/Serving/
-
 ENV LD_LIBRARY_PATH="/usr/local/cuda-12.3/compat/:$LD_LIBRARY_PATH"

-RUN pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
 RUN python3 -m pip install --pre paddlepaddle-gpu -i https://www.paddlepaddle.org.cn/packages/nightly/cu123/ \
     && python3 -m pip install paddlenlp==3.0.0b0 \
-    && python3 -m pip install --no-cache-dir sentencepiece pycryptodome tritonclient[all]==2.41.1
+    && python3 -m pip install --no-cache-dir sentencepiece pycryptodome tritonclient[all]==2.41.1 \
+    && python3 -m pip install --no-cache-dir --force-reinstall https://paddlepaddle-inference-banchmark.bj.bcebos.com/paddlenlp_ops-0.0.0-py3-none-any.whl \
+    && apt-get clean && rm -rf /var/lib/apt/lists/*

-RUN git clone https://gitee.com/paddlepaddle/PaddleNLP.git && cd PaddleNLP/csrc \
-    && python3 setup_cuda.py build && python3 setup_cuda.py install --user \
-    && cp -r /opt/output/PaddleNLP/paddlenlp /usr/local/lib/python3.10/dist-packages/ \
-    && cp -r /root/.local/lib/python3.10/site-packages/* /usr/local/lib/python3.10/dist-packages/ \
-    && rm -rf /opt/output/PaddleNLP
+RUN mkdir -p /opt/source/ && cd /opt/source/ \
+    && git clone https://github.com/PaddlePaddle/Paddle.git \
+    && git clone https://github.com/PaddlePaddle/PaddleNLP.git \
+    && cp -r /opt/source/PaddleNLP/paddlenlp /usr/local/lib/python3.10/dist-packages/ \
+    && python3 -m pip install --no-cache-dir -r PaddleNLP/requirements.txt \
+    && python3 -m pip install --no-cache-dir -r PaddleNLP/llm/server/server/requirements.txt

-RUN python3 -m pip install -r /opt/output/Serving/requirements.txt && rm /opt/output/Serving/requirements.txt
-RUN mv Serving/server /usr/local/lib/python3.10/dist-packages/
 RUN mkdir -p /opt/output/Serving/llm_model/model/1 \
-    && mv /opt/output/Serving/config/config.pbtxt /opt/output/Serving/llm_model/model/ \
-    && rm -rf /opt/output/Serving/config/
-RUN echo "from server.triton_server import TritonPythonModel" >>/opt/output/Serving/llm_model/model/1/model.py
+    && cp /opt/source/PaddleNLP/llm/server/server/config/config.pbtxt /opt/output/Serving/llm_model/model/ \
+    && cp /opt/source/PaddleNLP/llm/server/server/scripts/start_server.sh /opt/output/Serving/ \
+    && cp /opt/source/PaddleNLP/llm/server/server/scripts/stop_server.sh /opt/output/Serving/

-RUN cd /opt/output/Serving/ \
-    && cp scripts/start_server.sh . && cp scripts/stop_server.sh . \
-    && rm -rf scripts
+ENV PYTHONPATH="/opt/source/PaddleNLP/llm/server/server"
+RUN echo "from server.triton_server import TritonPythonModel" >>/opt/output/Serving/llm_model/model/1/model.py

 ENV http_proxy=""
 ENV https_proxy=""
diff --git a/llm/server/server/server/data/processor.py b/llm/server/server/server/data/processor.py
index 423fe6b61408..b27ae1bf2ca0 100644
--- a/llm/server/server/server/data/processor.py
+++ b/llm/server/server/server/data/processor.py
@@ -143,6 +143,9 @@ def process_request(self, request, max_seq_len=None):
             request["eos_token_ids"] = []
         request["eos_token_ids"].extend(get_eos_token_id(self.tokenizer, self.config.generation_config))

+        if "stop_seqs" not in request or (isinstance(request["stop_seqs"], (list, tuple)) and len(request["stop_seqs"]) == 0):
+            self.update_stop_seq(request)
+
         if "input_ids" not in request or \
             (isinstance(request["input_ids"], (list, tuple)) and len(request["input_ids"]) == 0):
             if "text" in request:
@@ -282,7 +285,7 @@ def _load_tokenizer(self):
         """
         if self.config.use_hf_tokenizer:
             from transformers import AutoTokenizer
-            return AutoTokenizer.from_pretrained(self.config.model_dir, use_fast=False, vocab_file=os.path.join(self.config.model_dir, "sentencepiece.bpe.model"))
+            return AutoTokenizer.from_pretrained(self.config.model_dir, use_fast=False)
         else:
             from paddlenlp.transformers import AutoTokenizer
             return AutoTokenizer.from_pretrained(self.config.model_dir)
@@ -334,3 +337,43 @@ def get_pad_id(self):
         if isinstance(self.tokenizer, (LlamaTokenizer, Llama3Tokenizer)) and not self.tokenizer.pad_token_id:
             return self.tokenizer.eos_token
         return self.tokenizer.pad_token_id
+
+    def pad_batch_data(self, insts, pad_id=0, return_seq_len=False, return_array=True, pad_style="right"):
+        """Pad the instances to the max sequence length in batch."""
+        if len(insts) == 0:
+            padded_insts = np.array([[]], dtype=np.int64) if return_array else [[]]
+            if return_seq_len:
+                seq_len = np.array([], dtype=np.int64) if return_array else []
+                return padded_insts, seq_len
+            return padded_insts
+
+        max_len = max(map(len, insts))
+        if pad_style == "left":
+            padded_insts = [[pad_id] * (max_len - len(inst)) + list(inst) for inst in insts]
+        else:
+            padded_insts = [list(inst) + [pad_id] * (max_len - len(inst)) for inst in insts]
+        if return_array:
+            padded_insts = np.array(padded_insts, dtype=np.int64).reshape([-1, max_len])
+
+        if return_seq_len:
+            seq_len = [len(inst) for inst in insts]
+            if return_array:
+                seq_len = np.array(seq_len, dtype=np.int64).reshape(-1, 1)
+            return padded_insts, seq_len
+        return padded_insts
+
+    def update_stop_seq(self, request):
+        """
+        Update stop sequences from request.
+        """
+        stop_seqs = []
+        for seq in request.get("stop_sequences", []):
+            if seq != self.tokenizer.eos_token_id:
+                stop_seqs.append(self.tokenizer.convert_tokens_to_ids(self.tokenizer.tokenize(seq)))
+        request["stop_seqs"], request["stop_seqs_len"] = self.pad_batch_data(
+            stop_seqs,
+            pad_id=-1,
+            return_seq_len=True,
+            return_array=False
+        )
+        data_processor_logger.debug(f"processed request: {request['stop_seqs'], request['stop_seqs_len']}")
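Reviewer sketch: the stop-sequence plumbing above is easiest to see in isolation. The snippet below mirrors `pad_batch_data`/`update_stop_seq` outside the server; `toy_tokenize` and the sample strings are invented stand-ins for the real tokenizer, so only the padding shape is meaningful here.

```python
# Standalone mirror of DataProcessor.pad_batch_data / update_stop_seq (illustration only).
def pad_batch_data(insts, pad_id=0, return_seq_len=False):
    if not insts:
        return ([[]], []) if return_seq_len else [[]]
    max_len = max(len(inst) for inst in insts)
    padded = [list(inst) + [pad_id] * (max_len - len(inst)) for inst in insts]
    return (padded, [len(inst) for inst in insts]) if return_seq_len else padded

def toy_tokenize(text):
    # stand-in for tokenizer.convert_tokens_to_ids(tokenizer.tokenize(text))
    return [ord(c) for c in text]

request = {"text": "...", "stop_sequences": ["</s>", "Observation:"]}
stop_seqs = [toy_tokenize(s) for s in request["stop_sequences"]]
request["stop_seqs"], request["stop_seqs_len"] = pad_batch_data(stop_seqs, pad_id=-1, return_seq_len=True)
print(request["stop_seqs_len"])  # [4, 12] -> true lengths per stop sequence
print(request["stop_seqs"])      # shorter rows right-padded with -1
```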
+ """ + stop_seqs = [] + for seq in request.get("stop_sequences", []): + if seq != self.tokenizer.eos_token_id: + stop_seqs.append(self.tokenizer.convert_tokens_to_ids(self.tokenizer.tokenize(seq))) + request["stop_seqs"], request["stop_seqs_len"] = self.pad_batch_data( + stop_seqs, + pad_id=-1, + return_seq_len=True, + return_array=False + ) + data_processor_logger.debug(f"processed request: {request['stop_seqs'], request['stop_seqs_len']}") diff --git a/llm/server/server/server/engine/config.py b/llm/server/server/server/engine/config.py index 6f0e1964e21f..3b9a88f0c94b 100644 --- a/llm/server/server/server/engine/config.py +++ b/llm/server/server/server/engine/config.py @@ -19,6 +19,7 @@ from paddlenlp.generation import GenerationConfig from server.utils import model_server_logger +from dataclasses import dataclass class Config: @@ -203,6 +204,27 @@ def get_model_config(self): model_config_json = json.load(open(self.model_config_path, 'r', encoding='utf-8')) return model_config_json + def get_speculate_config(self): + """ + get speculate_decoding related config + + Returns: + SpeculateConfig: the speculate related config + """ + speculate_config = SpeculateConfig() + model_cfg = self.get_model_config() + if model_cfg.get("speculate_method", "None") != "None": + speculate_config.speculate_method = str(model_cfg["speculate_method"]) + speculate_config.speculate_max_draft_token_num = model_cfg[ + "speculate_max_draft_token_num"] + speculate_config.speculate_max_ngram_size = model_cfg[ + "speculate_max_ngram_size"] + + if speculate_config.speculate_method not in ["None", "inference_with_reference"]: + model_server_logger.error(f"Unsupport speculate method: {speculate_config.speculate_method}") + + return speculate_config + def read_from_config(self): """ reset model config from json file @@ -234,3 +256,10 @@ def get_unique_name(self, name): def __str__(self) -> str: return json.dumps(self.__dict__, indent=4) + + +@dataclass +class SpeculateConfig: + speculate_method: str = "None" + speculate_max_draft_token_num: int = 1 + speculate_max_ngram_size: int = 1 \ No newline at end of file diff --git a/llm/server/server/server/engine/infer.py b/llm/server/server/server/engine/infer.py index 63e87e425058..2641e889949d 100644 --- a/llm/server/server/server/engine/infer.py +++ b/llm/server/server/server/engine/infer.py @@ -29,6 +29,7 @@ from paddlenlp_ops import step_paddle from server.data.processor import DataProcessor from server.engine.config import Config +from paddlenlp.experimental.transformers import InferenceWithReferenceProposer from server.utils import get_logger from task_queue_manager import TaskQueueManager @@ -46,12 +47,19 @@ def __init__(self, args): self.config = Config() self.model_cfg = self.config.get_model_config() + self.speculate_config = self.config.get_speculate_config() + self.is_speculate_decoding = self.speculate_config.speculate_method != "None" self.format_print_configuration() self.args.num_layers = self.get_value(self.model_cfg, ["num_hidden_layers", "num_layers"]) self.args.num_attention_heads = self.get_value(self.model_cfg, ["num_attention_heads", "n_head"]) self.args.hidden_size = self.model_cfg["hidden_size"] + self.reduce_dialogue_repetition = int(os.environ.get("REDUCE_DIALOGUE_REPETITION", 0)) + + self.max_stop_seqs_num = int(os.getenv("MAX_STOP_SEQS_NUM", 5)) + self.stop_seqs_max_len = int(os.getenv("STOP_SEQS_MAX_LEN", 8)) + self.nranks = dist.get_world_size() self.init_dist_env() self.rank = fleet.worker_index() @@ -62,6 +70,17 @@ def __init__(self, args): 
diff --git a/llm/server/server/server/engine/infer.py b/llm/server/server/server/engine/infer.py
index 63e87e425058..2641e889949d 100644
--- a/llm/server/server/server/engine/infer.py
+++ b/llm/server/server/server/engine/infer.py
@@ -29,6 +29,7 @@
 from paddlenlp_ops import step_paddle
 from server.data.processor import DataProcessor
 from server.engine.config import Config
+from paddlenlp.experimental.transformers import InferenceWithReferenceProposer
 from server.utils import get_logger
 from task_queue_manager import TaskQueueManager

@@ -46,12 +47,19 @@ def __init__(self, args):
         self.config = Config()
         self.model_cfg = self.config.get_model_config()
+        self.speculate_config = self.config.get_speculate_config()
+        self.is_speculate_decoding = self.speculate_config.speculate_method != "None"
         self.format_print_configuration()

         self.args.num_layers = self.get_value(self.model_cfg, ["num_hidden_layers", "num_layers"])
         self.args.num_attention_heads = self.get_value(self.model_cfg, ["num_attention_heads", "n_head"])
         self.args.hidden_size = self.model_cfg["hidden_size"]

+        self.reduce_dialogue_repetition = int(os.environ.get("REDUCE_DIALOGUE_REPETITION", 0))
+
+        self.max_stop_seqs_num = int(os.getenv("MAX_STOP_SEQS_NUM", 5))
+        self.stop_seqs_max_len = int(os.getenv("STOP_SEQS_MAX_LEN", 8))
+
         self.nranks = dist.get_world_size()
         self.init_dist_env()
         self.rank = fleet.worker_index()
@@ -62,6 +70,17 @@ def __init__(self, args):
         self.cache_kvs = {}
         self.init_inputs()

+        if self.is_speculate_decoding:
+            logger.info(f'Using speculate decoding, method: {self.speculate_config.speculate_method}.')
+            if self.speculate_config.speculate_method == "inference_with_reference":
+                self.proposer = InferenceWithReferenceProposer(
+                    self.speculate_config.speculate_max_draft_token_num,
+                    self.speculate_config.speculate_max_ngram_size,
+                    self.args.max_batch_size,
+                    self.args.max_seq_len)
+        else:
+            self.proposer = None
+
         self.infer_queue = TaskQueueManager(rank=self.rank, mp_num=self.nranks, port=self.config.infer_port)

         model_rank_path = os.path.join(self.args.model_dir, f"rank_{self.rank}")
@@ -246,6 +265,31 @@ def init_inputs(self):
         self.share_inputs['free_list_len'] = paddle.full(
             shape=[1], fill_value=self.free_list_len, dtype="int32")

+        self.share_inputs['stop_seqs_len'] = paddle.full(shape=[self.max_stop_seqs_num,],
+                                                         fill_value=0,
+                                                         dtype="int32")
+        self.share_inputs['stop_seqs'] = paddle.full(shape=[self.max_stop_seqs_num, self.stop_seqs_max_len],
+                                                     fill_value=-1,
+                                                     dtype="int64")
+
+        if self.reduce_dialogue_repetition:
+            self.share_inputs["first_token_ids"] = paddle.full(
+                shape=[self.args.max_batch_size, 1], fill_value=-1, dtype="int64")
+            self.share_inputs["ori_seq_lens_encoder"] = paddle.full(
+                shape=[self.args.max_batch_size, 1], fill_value=0, dtype="int32")
+        # speculate decoding input
+        if self.is_speculate_decoding:
+            self.share_inputs["accept_tokens"] = paddle.full(
+                shape=[self.args.max_batch_size, self.speculate_config.speculate_max_draft_token_num + 1], fill_value=0, dtype="int64"
+            )
+            self.share_inputs["accept_num"] = paddle.full(shape=[self.args.max_batch_size], fill_value=0, dtype="int32")
+            self.share_inputs["draft_tokens"] = paddle.full(
+                shape=[self.args.max_batch_size, self.speculate_config.speculate_max_draft_token_num + 1], fill_value=0, dtype="int64"
+            )
+            self.share_inputs["actual_draft_token_num"] = paddle.full(
+                shape=[self.args.max_batch_size], fill_value=self.speculate_config.speculate_max_draft_token_num, dtype="int32"
+            )
+
     def dy_input_preprocess(self, tasks):
         """
         dynamic insertion
@@ -279,6 +323,10 @@
             self.share_inputs['max_length'][idx:idx + 1] = max_dec_len
             self.share_inputs['stop_flags'][idx:idx + 1] = False

+            if self.reduce_dialogue_repetition:
+                self.share_inputs['first_token_ids'][idx:idx + 1] = self.share_inputs['input_ids'][idx:idx + 1, :1]
+                self.share_inputs["ori_seq_lens_encoder"][idx:idx + 1] = length
+
             if "infer_seed" in task:
                 self.share_inputs['infer_seed'][idx:idx + 1] = task['infer_seed']

@@ -288,10 +336,29 @@
                 self.share_inputs["block_tables"][idx:idx + 1, :encoder_block_num] = np.array(
                     task['block_tables'], dtype="int32")

+            if "stop_seqs_len" in task:
+                stop_seqs_num = len(task["stop_seqs_len"])
+                for i in range(stop_seqs_num, self.max_stop_seqs_num):
+                    task["stop_seqs_len"].append(0)
+                self.share_inputs['stop_seqs_len'][:] = np.array(
+                    task["stop_seqs_len"], dtype="int32")
+                self.share_inputs['stop_seqs'][:stop_seqs_num, :len(task['stop_seqs'][0])] = np.array(
+                    task["stop_seqs"], dtype="int64")
+
+            if self.is_speculate_decoding:
+                self.share_inputs["draft_tokens"][idx:idx + 1] = np.zeros([self.speculate_config.speculate_max_draft_token_num + 1])
+                self.share_inputs["actual_draft_token_num"][idx:idx + 1] = np.array([self.speculate_config.speculate_max_draft_token_num])
+
     def step_cuda(self, seq_lens_this_time):
         """
         step cuda
         """
+        # whether speculate decoding
+        if self.is_speculate_decoding:
+            speculate_step_token_num = self.speculate_config.speculate_max_draft_token_num + 1
+        else:
+            speculate_step_token_num = 0
+
         step_paddle(self.share_inputs['stop_flags'], seq_lens_this_time,
                     self.share_inputs['step_seq_lens_encoder'],
                     self.share_inputs['seq_lens_encoder'],
@@ -304,7 +371,8 @@ def step_cuda(self, seq_lens_this_time):
                     self.share_inputs['free_list'], self.share_inputs['free_list_len'],
                     self.share_inputs['input_ids'], self.share_inputs['pre_ids'],
                     self.share_inputs['step_idx'], self.share_inputs['next_tokens'],
-                    self.args.block_size, self.args.enc_dec_block_num, self.args.first_token_id)
+                    self.args.block_size, self.args.enc_dec_block_num, self.args.first_token_id,
+                    speculate_step_token_num)

     def initialize_engine_ready_check_flag(self):
         """
@@ -429,6 +497,13 @@ def run(self):
                     time.sleep(0.001)
                     continue

+            if self.proposer is not None:
+                self.proposer.run(
+                    self.share_inputs,
+                    real_batch_size=seq_lens_this_time.shape[0],
+                    seq_lens_this_time=seq_lens_this_time,
+                )
+
             self.infer_engine.predictor.run()
             self.share_inputs['infer_seed'].add_(infer_seed_increment)
             self.share_inputs['infer_seed'][:] %= self.MAX_INFER_SEED
@@ -474,6 +549,11 @@ def _init_predictor(self):
         config.switch_ir_optim(False)
         config.enable_use_gpu(100, device_id)

+        pir_flag = int(os.environ.get("FLAGS_enable_pir_api", 0))
+        if pir_flag == 1:
+            config.enable_new_executor()
+            config.enable_new_ir()
+
         # distributed config
         if self.mp_degree > 1:
             trainer_endpoints = fleet.worker_endpoints()
@@ -528,7 +608,7 @@ def parse_args():
     """
     parse args from command line
     """
-    parser = argparse.ArgumentParser("Deploy LLM Inference")
+    parser = argparse.ArgumentParser("FastDeploy LLM Inference")
     parser.add_argument('-m',
                         '--model_dir',
                         type=str,
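Reviewer sketch of the control-flow change in `run()` and `step_cuda()`: when speculative decoding is enabled, the proposer fills `draft_tokens` before the target model verifies them, and `step_paddle` now receives how many token slots a step may consume. The classes below are toy stand-ins for illustration, not the engine or the Paddle ops.

```python
# Toy stand-ins showing the new ordering (proposer before predictor) and the extra
# speculate_step_token_num argument; nothing here touches the real inference engine.
class ToyProposer:
    def run(self, share_inputs, real_batch_size, seq_lens_this_time):
        share_inputs["draft_tokens"] = [[101, 102]] * real_batch_size  # pretend n-gram matches

class ToyPredictor:
    def run(self, share_inputs):
        share_inputs["accept_num"] = [2] * len(share_inputs.get("draft_tokens", []))

def speculate_step_token_num(is_speculate_decoding, max_draft_token_num):
    # mirrors the value passed to step_paddle: draft tokens + 1 verified token, else 0
    return max_draft_token_num + 1 if is_speculate_decoding else 0

share_inputs, seq_lens_this_time = {}, [7, 3]
proposer, predictor = ToyProposer(), ToyPredictor()
if proposer is not None:  # same guard as the patched run() loop
    proposer.run(share_inputs, real_batch_size=len(seq_lens_this_time),
                 seq_lens_this_time=seq_lens_this_time)
predictor.run(share_inputs)
print(share_inputs, speculate_step_token_num(True, max_draft_token_num=5))  # ..., 6
```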
diff --git a/llm/server/server/server/engine/token_processor.py b/llm/server/server/server/engine/token_processor.py
index 507a3d43bdf9..1213a9384b77 100644
--- a/llm/server/server/server/engine/token_processor.py
+++ b/llm/server/server/server/engine/token_processor.py
@@ -20,8 +20,9 @@
 from datetime import datetime

 import numpy as np
-from paddlenlp_ops import get_output
+from paddlenlp_ops import get_output, speculate_get_output
 from server.utils import datetime_diff, model_server_logger, monitor_logger
+from paddlenlp.utils.env import MAX_DRAFT_TOKENS, SPECULATE_MAX_BSZ


 class TokenProcessor(object):
@@ -37,7 +38,12 @@ def __init__(self, cfg):
         self.all_tokens = [[] for _ in range(self.cfg.max_batch_size)]

         self.tokens_counter = Counter()
-        self.output_tokens = paddle.full(shape=[self.cfg.max_batch_size + 2, 1], fill_value=2, dtype="int64")
+
+        self.is_speculate_decoding = self.cfg.get_speculate_config().speculate_method != "None"
+        if self.is_speculate_decoding:
+            self.output_tokens = paddle.full(shape=[SPECULATE_MAX_BSZ * MAX_DRAFT_TOKENS + SPECULATE_MAX_BSZ + 2, 1], fill_value=2, dtype="int64")
+        else:
+            self.output_tokens = paddle.full(shape=[self.cfg.max_batch_size + 2, 1], fill_value=2, dtype="int64")
         self.worker = None

         self.record_time_interval = int(os.getenv("RECORD_TIME_INTERVAL", "600"))
@@ -77,10 +83,14 @@ def process_sampling_results(self):
             try:
                 rank_id = 0
                 is_blocking = True
-                get_output(self.output_tokens, rank_id, is_blocking)
+                if self.is_speculate_decoding:
+                    speculate_get_output(self.output_tokens, rank_id, is_blocking)
+                else:
+                    get_output(self.output_tokens, rank_id, is_blocking)
                 if self.output_tokens[0, 0] == -2:
                     continue

+                self._process_batch_output()
             except Exception as e:
                 model_server_logger.info("while get input_data error: {0} {1}".format(e,
                     str(traceback.format_exc())))
@@ -101,14 +111,14 @@ def postprocess(self, batch_result, exist_finished_task=False):
             with open(result_file, "a") as f:
                 f.write("{}\n".format(result))

-    def _get_single_result(self, i, task_id, token_id, task):
+    def _get_single_result(self, i, task_id, token_ids, task):
         """
         processing single results

         Args:
             i (int): batch index
             task_id (str): task id
-            token_id (int): token id
+            token_ids (list): token id
             task (dict): task information

         Returns:
@@ -121,7 +131,7 @@
         result = {
             "req_id": task_id,
             "is_end": 0,
-            "token_ids": [token_id],
+            "token_ids": token_ids,
            "send_idx": self.tokens_counter[task_id],
            "inference_time_cost": inference_time_cost,
            "infer_seed": task["infer_seed"],
@@ -137,26 +147,31 @@
                 result[key] = str(task[key])

         # fill some extra information
-        if token_id in task["eos_token_ids"]:
-            result["is_end"] = 1
-            result["token_ids"] = []
-            result["tokens_all_num"] = len(self.all_tokens[i]) + 1
-            result["tokens_all_ids"] = self.all_tokens[i]
-
-            info_dict = {}
-            info_dict["req_id"] = task["req_id"]
-            info_dict["input_token_num"] = len(task["input_ids"])
-            info_dict["output_token_num"] = len(self.all_tokens[i])
-            if hasattr(task, "preprocess_start_time") and hasattr(task, "preprocess_end_time"):
-                info_dict["preprocess_cost_time"] = datetime_diff(task["preprocess_start_time"],
-                                                                  task["preprocess_end_time"])
-            if hasattr(task, "preprocess_end_time") and hasattr(task, "schedule_start_time"):
-                info_dict["cache_waiting_cost_time"] = datetime_diff(task["preprocess_end_time"],
-                                                                     task["schedule_start_time"])
-            info_dict["inference_time_cost"] = task["inference_time_cost"]
-            info_dict["version"] = "4.6"
-            info_dict["timestamp"] = time.time()
-            monitor_logger.info(f"{info_dict}")
+        result["token_ids"] = []
+        for token_id in token_ids:
+            if token_id in task["eos_token_ids"]:
+                result["is_end"] = 1
+                result["token_ids"] = []
+                result["tokens_all_num"] = len(self.all_tokens[i]) + 1
+                result["tokens_all_ids"] = self.all_tokens[i]
+
+                info_dict = {}
+                info_dict["req_id"] = task["req_id"]
+                info_dict["input_token_num"] = len(task["input_ids"])
+                info_dict["output_token_num"] = len(self.all_tokens[i])
+                if hasattr(task, "preprocess_start_time") and hasattr(task, "preprocess_end_time"):
+                    info_dict["preprocess_cost_time"] = datetime_diff(task["preprocess_start_time"],
+                                                                      task["preprocess_end_time"])
+                if hasattr(task, "preprocess_end_time") and hasattr(task, "schedule_start_time"):
+                    info_dict["cache_waiting_cost_time"] = datetime_diff(task["preprocess_end_time"],
+                                                                         task["schedule_start_time"])
+                info_dict["inference_time_cost"] = task["inference_time_cost"]
+                info_dict["version"] = "OpenSource"
+                info_dict["timestamp"] = time.time()
+                monitor_logger.info(f"{info_dict}")
+                break
+            else:
+                result["token_ids"].append(token_id)

         return result
@@ -177,7 +192,10 @@ def _process_batch_output(self):
         """
         tokens = self.output_tokens.numpy()
         batch = self.output_tokens[1, 0]
-        tokens = tokens[2:batch + 2]
+        if not self.is_speculate_decoding:
+            tokens = tokens[2:batch + 2]
+        else:
+            accept_num = tokens[2:batch + 2]

         batch_result = list()
         exist_finished_task = False
@@ -185,25 +203,31 @@
             if self.resource_manager.stop_flags[i]:
                 continue

-            token_id = int(tokens[i, 0])
-            if token_id < 0:
+            if not self.is_speculate_decoding:
+                token_ids = [int(tokens[i, 0])]
+            else:
+                token_ids = tokens[2 + SPECULATE_MAX_BSZ + i * MAX_DRAFT_TOKENS: 2 + SPECULATE_MAX_BSZ + i * MAX_DRAFT_TOKENS + accept_num[i, 0], 0].tolist()
+
+            if any(token_id < 0 for token_id in token_ids):
                 continue

             task = self.resource_manager.tasks_list[i]
             task_id = task["req_id"]
-            result = self._get_single_result(i, task_id, token_id, task)
-
-            self.tokens_counter[task_id] += 1
-            if token_id not in task["eos_token_ids"]:
-                self.all_tokens[i].append(token_id)
-
-            self.number_of_output_tokens += 1
-            if token_id in task["eos_token_ids"]:
-                self._recycle_resources(task_id, i, task)
-                model_server_logger.info("req_id: {0} finished".format(task_id))
-                model_server_logger.info(f"{self.resource_manager.info()}")
-                exist_finished_task = True
+            result = self._get_single_result(i, task_id, token_ids, task)
+
+            for token_id in token_ids:
+                self.tokens_counter[task_id] += 1
+                if token_id not in task["eos_token_ids"]:
+                    self.all_tokens[i].append(token_id)
+
+                self.number_of_output_tokens += 1
+                if token_id in task["eos_token_ids"]:
+                    self._recycle_resources(task_id, i, task)
+                    model_server_logger.info("req_id: {0} finished".format(task_id))
+                    model_server_logger.info(f"{self.resource_manager.info()}")
+                    exist_finished_task = True
+                    break

             batch_result.append(result)

         self.postprocess(batch_result, exist_finished_task)
@@ -228,7 +252,10 @@ def process_sampling_results(self):
         while self._is_running:
             try:
                 rank_id = 0
-                get_output(self.output_tokens, rank_id, self._is_blocking)
+                if self.is_speculate_decoding:
+                    speculate_get_output(self.output_tokens, rank_id, self._is_blocking)
+                else:
+                    get_output(self.output_tokens, rank_id, self._is_blocking)
                 if self.output_tokens[0, 0] == -2:
                     continue
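Reviewer sketch of the `speculate_get_output` buffer layout that `_process_batch_output` indexes: two header slots, then one accept count per query slot, then `MAX_DRAFT_TOKENS` token slots per query. The constants and the data below are invented stand-ins for the `paddlenlp.utils.env` values; only the indexing arithmetic mirrors the patch.

```python
import numpy as np

SPECULATE_MAX_BSZ = 4   # stand-in; the real value comes from paddlenlp.utils.env
MAX_DRAFT_TOKENS = 6    # stand-in; the real value comes from paddlenlp.utils.env

# layout: [flag, batch, accept_num x SPECULATE_MAX_BSZ, MAX_DRAFT_TOKENS token slots per query]
tokens = np.full([SPECULATE_MAX_BSZ * MAX_DRAFT_TOKENS + SPECULATE_MAX_BSZ + 2, 1], 2, dtype=np.int64)
batch = 2
tokens[1, 0] = batch
tokens[2, 0], tokens[3, 0] = 3, 1                                            # accept_num for query 0 and 1
tokens[2 + SPECULATE_MAX_BSZ: 2 + SPECULATE_MAX_BSZ + 3, 0] = [11, 12, 13]   # query 0 accepted tokens
tokens[2 + SPECULATE_MAX_BSZ + MAX_DRAFT_TOKENS, 0] = 21                     # query 1 accepted token

accept_num = tokens[2:batch + 2]
for i in range(batch):
    start = 2 + SPECULATE_MAX_BSZ + i * MAX_DRAFT_TOKENS
    token_ids = tokens[start: start + accept_num[i, 0], 0].tolist()
    print(i, token_ids)  # -> 0 [11, 12, 13] and 1 [21]
```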
diff --git a/llm/server/server/server/http_server/api.py b/llm/server/server/server/http_server/api.py
index df9c066284f4..2e01ae039dba 100644
--- a/llm/server/server/server/http_server/api.py
+++ b/llm/server/server/server/http_server/api.py
@@ -31,6 +31,7 @@ class Req(BaseModel):
     req_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
     input_ids: Optional[List[int]] = None
     text: Optional[str] = None
+    stop_sequences: Optional[List] = None
     messages: Optional[List] = None
     max_dec_len: Optional[int] = None
     seq_len: Optional[int] = None
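Reviewer note: the new `stop_sequences` field rides along with an ordinary generation request. A hypothetical client payload is sketched below; the host, port, and URL path are placeholders for whatever the deployment exposes, and only the field names visible in this diff are taken from the code.

```python
import json
import uuid
import requests  # third-party HTTP client, assumed available

payload = {
    "req_id": str(uuid.uuid4()),
    "text": "Write a haiku about the sea.",
    "max_dec_len": 128,
    "stop_sequences": ["\n\n", "Observation:"],  # consumed by DataProcessor.update_stop_seq
}
# The endpoint below is a placeholder, not taken from this diff; adjust to your deployment.
resp = requests.post("http://127.0.0.1:9965/v1/chat/completions",
                     data=json.dumps(payload), timeout=60)
print(resp.status_code, resp.text)
```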
["req_id"]] + batch_result[i]["token_scores"] + del self.score_buffer[batch_result[i]["req_id"]] + def postprocess(self, batch_result, exist_finished_task=False): """ single postprocess for triton """ try: + self._cache_special_tokens(batch_result) self.cached_generated_tokens.put(batch_result) except Exception as e: model_server_logger.info( @@ -168,7 +192,7 @@ def initialize(self, args): base_config = Config() self.cfg = TritonConfig(base_config) - self.cfg.print(file="log/deploy_init.info") + self.cfg.print(file="log/fastdeploy_init.info") # init engine self.token_processor = TritonTokenProcessor(self.cfg, self)