From f2d4861ba5eeb562b4897d98843d5cbac65ab59e Mon Sep 17 00:00:00 2001 From: qinyiqun Date: Thu, 18 Jun 2026 09:22:18 +0000 Subject: [PATCH] Add MoE inference and expert parallel support - add reusable MoE router, dispatcher, runner, and expert abstractions - enable Qwen3 MoE fused inference with TP-local expert parallel routing - add graph-safe MoE workspace handling and EP backend selection through engine config - preserve legacy MoE path for existing DeepSeek V2 code --- csrc/config/model_config.cpp | 4 + csrc/config/model_config.hpp | 14 ++ csrc/engine/compiler/paged_compiler.cpp | 11 +- csrc/engine/compiler/paged_compiler.hpp | 1 + .../compiler/static_batching_compiler.cpp | 10 +- .../compiler/static_batching_compiler.hpp | 1 + csrc/engine/infer_engine.cpp | 20 +- csrc/engine/infer_engine.hpp | 2 +- csrc/engine/rank_worker.cpp | 8 +- csrc/engine/rank_worker.hpp | 3 +- .../text_decoder_layer.hpp | 17 +- csrc/layers/common_modules.hpp | 2 - csrc/layers/moe/common/moe_types.hpp | 93 ++++++++ csrc/layers/moe/common/topk_output.hpp | 13 ++ .../layers/moe/dispatcher/base_dispatcher.hpp | 27 +++ .../moe/dispatcher/dispatcher_factory.cpp | 28 +++ .../moe/dispatcher/dispatcher_factory.hpp | 14 ++ .../moe/dispatcher/standard_dispatcher.cpp | 42 ++++ .../moe/dispatcher/standard_dispatcher.hpp | 26 +++ .../allgather_reduce_scatter_dispatcher.cpp | 162 ++++++++++++++ .../allgather_reduce_scatter_dispatcher.hpp | 27 +++ csrc/layers/moe/ep/base_ep_dispatcher.cpp | 71 ++++++ csrc/layers/moe/ep/base_ep_dispatcher.hpp | 30 +++ csrc/layers/moe/ep/deepep_dispatcher.cpp | 47 ++++ csrc/layers/moe/ep/deepep_dispatcher.hpp | 30 +++ csrc/layers/moe/ep/ep_config.cpp | 131 +++++++++++ csrc/layers/moe/ep/ep_config.hpp | 52 +++++ .../moe/ep/local_allreduce_dispatcher.cpp | 54 +++++ .../moe/ep/local_allreduce_dispatcher.hpp | 22 ++ csrc/layers/moe/experts/fused_moe_experts.cpp | 78 +++++++ csrc/layers/moe/experts/fused_moe_experts.hpp | 30 +++ csrc/layers/moe/fused_moe.cpp | 49 ++++ csrc/layers/moe/fused_moe.hpp | 30 +++ csrc/layers/{mlp => moe/legacy}/moe_mlp.cpp | 8 +- csrc/layers/{mlp => moe/legacy}/moe_mlp.hpp | 8 +- csrc/layers/moe/router/topk_router.cpp | 131 +++++++++++ csrc/layers/moe/router/topk_router.hpp | 42 ++++ csrc/layers/moe/runner/base_runner.hpp | 16 ++ .../moe/runner/cuda_fused_moe_runner.cpp | 211 ++++++++++++++++++ .../moe/runner/cuda_fused_moe_runner.hpp | 42 ++++ csrc/layers/moe/sparse_moe_block.cpp | 34 +++ csrc/layers/moe/sparse_moe_block.hpp | 29 +++ csrc/models/deepseek_v2/deepseek_v2_moe.hpp | 3 +- csrc/models/qwen3/qwen3_attention.cpp | 8 +- csrc/models/qwen3_moe/qwen3_moe_experts.cpp | 59 ----- csrc/models/qwen3_moe/qwen3_moe_experts.hpp | 25 --- .../qwen3_moe/qwen3_moe_sparse_moe_block.cpp | 20 +- .../qwen3_moe/qwen3_moe_sparse_moe_block.hpp | 18 +- .../qwen3_moe/qwen3_moe_topk_router.cpp | 36 --- .../qwen3_moe/qwen3_moe_topk_router.hpp | 23 -- .../qwen3_next/qwen3_next_attention.cpp | 8 +- csrc/pybind11/engine/engine.hpp | 2 +- examples/bench.py | 91 ++++++++ python/infinilm/base_config.py | 15 ++ python/infinilm/config/engine_config.py | 4 + python/infinilm/infer_engine.py | 7 +- python/infinilm/llm/llm.py | 8 + .../infinilm/llm/model_runner/model_runner.py | 2 + python/infinilm/modeling_utils.py | 47 ++-- python/infinilm/server/inference_server.py | 100 +++++++++ 60 files changed, 1929 insertions(+), 217 deletions(-) create mode 100644 csrc/layers/moe/common/moe_types.hpp create mode 100644 csrc/layers/moe/common/topk_output.hpp create mode 100644 csrc/layers/moe/dispatcher/base_dispatcher.hpp create mode 100644 csrc/layers/moe/dispatcher/dispatcher_factory.cpp create mode 100644 csrc/layers/moe/dispatcher/dispatcher_factory.hpp create mode 100644 csrc/layers/moe/dispatcher/standard_dispatcher.cpp create mode 100644 csrc/layers/moe/dispatcher/standard_dispatcher.hpp create mode 100644 csrc/layers/moe/ep/allgather_reduce_scatter_dispatcher.cpp create mode 100644 csrc/layers/moe/ep/allgather_reduce_scatter_dispatcher.hpp create mode 100644 csrc/layers/moe/ep/base_ep_dispatcher.cpp create mode 100644 csrc/layers/moe/ep/base_ep_dispatcher.hpp create mode 100644 csrc/layers/moe/ep/deepep_dispatcher.cpp create mode 100644 csrc/layers/moe/ep/deepep_dispatcher.hpp create mode 100644 csrc/layers/moe/ep/ep_config.cpp create mode 100644 csrc/layers/moe/ep/ep_config.hpp create mode 100644 csrc/layers/moe/ep/local_allreduce_dispatcher.cpp create mode 100644 csrc/layers/moe/ep/local_allreduce_dispatcher.hpp create mode 100644 csrc/layers/moe/experts/fused_moe_experts.cpp create mode 100644 csrc/layers/moe/experts/fused_moe_experts.hpp create mode 100644 csrc/layers/moe/fused_moe.cpp create mode 100644 csrc/layers/moe/fused_moe.hpp rename csrc/layers/{mlp => moe/legacy}/moe_mlp.cpp (92%) rename csrc/layers/{mlp => moe/legacy}/moe_mlp.hpp (86%) create mode 100644 csrc/layers/moe/router/topk_router.cpp create mode 100644 csrc/layers/moe/router/topk_router.hpp create mode 100644 csrc/layers/moe/runner/base_runner.hpp create mode 100644 csrc/layers/moe/runner/cuda_fused_moe_runner.cpp create mode 100644 csrc/layers/moe/runner/cuda_fused_moe_runner.hpp create mode 100644 csrc/layers/moe/sparse_moe_block.cpp create mode 100644 csrc/layers/moe/sparse_moe_block.hpp delete mode 100644 csrc/models/qwen3_moe/qwen3_moe_experts.cpp delete mode 100644 csrc/models/qwen3_moe/qwen3_moe_experts.hpp delete mode 100644 csrc/models/qwen3_moe/qwen3_moe_topk_router.cpp delete mode 100644 csrc/models/qwen3_moe/qwen3_moe_topk_router.hpp diff --git a/csrc/config/model_config.cpp b/csrc/config/model_config.cpp index f0095558e..03839f8b4 100644 --- a/csrc/config/model_config.cpp +++ b/csrc/config/model_config.cpp @@ -38,6 +38,10 @@ infinicore::DataType ModelConfig::get_dtype() const { return parse_dtype(dtype_str); } +bool ModelConfig::contains_non_null(const std::string &key) const { + return config_json.contains(key) && !config_json.at(key).is_null(); +} + size_t ModelConfig::get_rotary_dim() const { size_t head_dim = get_head_dim(); double partial_rotary_factor = get_or("partial_rotary_factor", 1.0); diff --git a/csrc/config/model_config.hpp b/csrc/config/model_config.hpp index 7f694dbbc..51322cfb2 100644 --- a/csrc/config/model_config.hpp +++ b/csrc/config/model_config.hpp @@ -48,6 +48,20 @@ class ModelConfig { return default_value; } } + + bool contains_non_null(const std::string &key) const; + + template + T get_or_alias(const std::string &key, const std::string &alias, const T &default_value) const { + if (!key.empty() && config_json.contains(key) && !config_json.at(key).is_null()) { + return config_json.at(key).get(); + } + if (!alias.empty() && config_json.contains(alias) && !config_json.at(alias).is_null()) { + return config_json.at(alias).get(); + } + return default_value; + } + size_t get_kv_dim() const { return get("hidden_size") * get("num_key_value_heads") / get("num_attention_heads"); } diff --git a/csrc/engine/compiler/paged_compiler.cpp b/csrc/engine/compiler/paged_compiler.cpp index 054fd023e..994196475 100644 --- a/csrc/engine/compiler/paged_compiler.cpp +++ b/csrc/engine/compiler/paged_compiler.cpp @@ -80,12 +80,17 @@ void PagedCompiler::compile() { auto input = make_decode_input(b); barrier_->wait(); + (void)model_->forward(input); + infinicore::context::syncStream(); // Capture must not start with stale Marlin locks from previous // warmup/capture attempts. This reset is intentionally outside // graph capture; the current implementation still pays a memset // before every graph replay in get_compiled(). model_->reset_runtime_state(); infinicore::context::syncStream(); + barrier_->wait(); + + barrier_->wait(); infinicore::context::startGraphRecording(); auto output = model_->forward(input); auto graph = infinicore::context::stopGraphRecording(); @@ -93,8 +98,10 @@ void PagedCompiler::compile() { auto shared_output = std::shared_ptr( new InfinilmModel::Output{infinicore::graph::GraphTensor(output.logits)}); + auto replay_output = std::shared_ptr( + new InfinilmModel::Output{shared_output->logits->resume_from_blob_()}); - compiled_map_decode_[b] = CompiledResult{std::move(input), std::make_tuple(graph, shared_output)}; + compiled_map_decode_[b] = CompiledResult{std::move(input), std::make_tuple(graph, shared_output), replay_output}; } } } @@ -141,7 +148,7 @@ PagedCompiler::Compiled PagedCompiler::get_compiled(const InfinilmModel::Input & model_->reset_runtime_state(); auto graph = std::get<0>(result->second.compiled); - auto shared_output = std::shared_ptr(new InfinilmModel::Output{std::get<1>(result->second.compiled)->logits->resume_from_blob_()}); + auto shared_output = result->second.replay_output; return std::make_tuple(graph, shared_output); } diff --git a/csrc/engine/compiler/paged_compiler.hpp b/csrc/engine/compiler/paged_compiler.hpp index a1125864d..0e372075b 100644 --- a/csrc/engine/compiler/paged_compiler.hpp +++ b/csrc/engine/compiler/paged_compiler.hpp @@ -21,6 +21,7 @@ class PagedCompiler : public GraphCompiler { struct CompiledResult { InfinilmModel::Input input; Compiled compiled; + std::shared_ptr replay_output; }; std::unordered_map< diff --git a/csrc/engine/compiler/static_batching_compiler.cpp b/csrc/engine/compiler/static_batching_compiler.cpp index dcd7f7143..3274553b9 100644 --- a/csrc/engine/compiler/static_batching_compiler.cpp +++ b/csrc/engine/compiler/static_batching_compiler.cpp @@ -28,6 +28,11 @@ void StaticBatchingCompiler::compile() { input.slot_mapping, }; + barrier_->wait(); + (void)model_->forward(input); + infinicore::context::syncStream(); + barrier_->wait(); + barrier_->wait(); infinicore::context::startGraphRecording(); auto output = model_->forward(input); @@ -35,8 +40,9 @@ void StaticBatchingCompiler::compile() { barrier_->wait(); auto shared_output = std::shared_ptr(new InfinilmModel::Output{infinicore::graph::GraphTensor(output.logits)}); + auto replay_output = std::shared_ptr(new InfinilmModel::Output{shared_output->logits->resume_from_blob_()}); - compiled_map_[std::make_tuple(b, 1)] = CompiledResult{std::move(input), std::make_tuple(graph, shared_output)}; + compiled_map_[std::make_tuple(b, 1)] = CompiledResult{std::move(input), std::make_tuple(graph, shared_output), replay_output}; } } @@ -56,7 +62,7 @@ StaticBatchingCompiler::Compiled StaticBatchingCompiler::get_compiled( graph_input.total_sequence_lengths.value()->copy_from(input.total_sequence_lengths.value()); auto graph = std::get<0>(result->second.compiled); - auto shared_output = std::shared_ptr(new InfinilmModel::Output{std::get<1>(result->second.compiled)->logits->resume_from_blob_()}); + auto shared_output = result->second.replay_output; return std::make_tuple(graph, shared_output); } } else { diff --git a/csrc/engine/compiler/static_batching_compiler.hpp b/csrc/engine/compiler/static_batching_compiler.hpp index fe1180fcf..297bb355d 100644 --- a/csrc/engine/compiler/static_batching_compiler.hpp +++ b/csrc/engine/compiler/static_batching_compiler.hpp @@ -25,6 +25,7 @@ class StaticBatchingCompiler : public GraphCompiler { struct CompiledResult { InfinilmModel::Input input; Compiled compiled; + std::shared_ptr replay_output; }; std::unordered_map< diff --git a/csrc/engine/infer_engine.cpp b/csrc/engine/infer_engine.cpp index 7c8429236..13b3e15b5 100644 --- a/csrc/engine/infer_engine.cpp +++ b/csrc/engine/infer_engine.cpp @@ -3,6 +3,7 @@ #include "spdlog/spdlog.h" #include #include +#include namespace infinilm::engine { @@ -67,10 +68,10 @@ void InferEngine::load_param(const std::string &name, const infinicore::Tensor & } } -void InferEngine::load_params(const std::unordered_map ¶ms) { +void InferEngine::load_params(const std::unordered_map ¶ms, bool strict) { if (workers_.size() <= 1 || weight_load_mode_ == "sync") { for (auto &worker : workers_) { - worker->load_params(params); + worker->load_params(params, strict); } return; } @@ -78,8 +79,8 @@ void InferEngine::load_params(const std::unordered_map> futures; futures.reserve(workers_.size()); for (auto &worker : workers_) { - futures.emplace_back(std::async(std::launch::async, [&worker, ¶ms] { - worker->load_params(params); + futures.emplace_back(std::async(std::launch::async, [&worker, ¶ms, strict] { + worker->load_params(params, strict); })); } for (auto &future : futures) { @@ -118,7 +119,16 @@ std::vector InferEngine::state_dict_keys() { if (0 == workers_.size()) { throw std::runtime_error(" Model object not found. "); } - return workers_.front()->state_dict_keys(); + std::vector keys; + std::unordered_set seen; + for (auto &worker : workers_) { + for (const auto &key : worker->state_dict_keys()) { + if (seen.emplace(key).second) { + keys.push_back(key); + } + } + } + return keys; } //------------------------------------------------------ diff --git a/csrc/engine/infer_engine.hpp b/csrc/engine/infer_engine.hpp index eb218e870..9d8515570 100644 --- a/csrc/engine/infer_engine.hpp +++ b/csrc/engine/infer_engine.hpp @@ -37,7 +37,7 @@ class InferEngine { void load_param(const std::string &name, const infinicore::Tensor ¶m); // Load a batch of parameters to all workers, syncing each worker once after the batch. - void load_params(const std::unordered_map ¶ms); + void load_params(const std::unordered_map ¶ms, bool strict = true); // process the weights after loading on all workers (e.g., for quantization) void process_weights_after_loading(); diff --git a/csrc/engine/rank_worker.cpp b/csrc/engine/rank_worker.cpp index 52bc237d7..07c1298b9 100644 --- a/csrc/engine/rank_worker.cpp +++ b/csrc/engine/rank_worker.cpp @@ -87,7 +87,7 @@ void RankWorker::load_param(const std::string &name, //------------------------------------------------------ // load_params -- synchronous batch load //------------------------------------------------------ -void RankWorker::load_params(const std::unordered_map ¶ms) { +void RankWorker::load_params(const std::unordered_map ¶ms, bool strict) { { std::lock_guard lock(mutex_); if (should_exit_) { @@ -95,6 +95,7 @@ void RankWorker::load_params(const std::unordered_map local_params; + bool local_params_strict = true; Input local_args; std::unique_ptr local_cache_config; @@ -314,6 +316,8 @@ void RankWorker::thread_loop() { local_param = pending_param_; } else if (local_cmd == Command::LOAD_BATCH) { local_params = std::move(pending_params_); + local_params_strict = pending_params_strict_; + pending_params_strict_ = true; pending_params_.clear(); } else if (local_cmd == Command::PREPROCESS) { @@ -353,7 +357,7 @@ void RankWorker::thread_loop() { } else if (local_cmd == Command::LOAD_BATCH) { try { - model_->load_parameters_no_sync(local_params); + model_->load_parameters_no_sync(local_params, local_params_strict); infinicore::context::syncStream(); } catch (const std::exception &e) { { diff --git a/csrc/engine/rank_worker.hpp b/csrc/engine/rank_worker.hpp index a26a3cd69..f015a405f 100644 --- a/csrc/engine/rank_worker.hpp +++ b/csrc/engine/rank_worker.hpp @@ -85,7 +85,7 @@ class RankWorker { void load_param(const std::string &name, const infinicore::Tensor ¶m); - void load_params(const std::unordered_map ¶ms); + void load_params(const std::unordered_map ¶ms, bool strict = true); void process_weights_after_loading(); @@ -148,6 +148,7 @@ class RankWorker { std::string pending_param_name_; infinicore::Tensor pending_param_; std::unordered_map pending_params_; + bool pending_params_strict_ = true; Input pending_args_; std::unique_ptr pending_cache_config_; diff --git a/csrc/layers/causal_lm_templates/text_decoder_layer.hpp b/csrc/layers/causal_lm_templates/text_decoder_layer.hpp index 8d70a0410..471276e76 100644 --- a/csrc/layers/causal_lm_templates/text_decoder_layer.hpp +++ b/csrc/layers/causal_lm_templates/text_decoder_layer.hpp @@ -7,6 +7,7 @@ #include "infinicore/ops.hpp" #include "infinicore/tensor.hpp" #include +#include #include namespace infinilm::layers::causal_lm_templates { @@ -32,7 +33,7 @@ class TextDecoderLayer : public infinicore::nn::Module { post_attention_layernorm_ = this->register_module("post_attention_layernorm", hidden_size, rms_norm_eps, dtype, device); self_attn_ = this->register_module("self_attn", model_config, layer_idx, device); - mlp_ = this->register_module("mlp", model_config, device); + mlp_ = register_mlp(model_config, layer_idx, device); } std::tuple forward(const infinicore::Tensor &positions, @@ -68,6 +69,20 @@ class TextDecoderLayer : public infinicore::nn::Module { INFINICORE_NN_MODULE(MLP, mlp); size_t layer_idx_; + +private: + std::shared_ptr register_mlp(std::shared_ptr model_config, + size_t layer_idx, + const infinicore::Device &device) { + if constexpr (std::is_constructible_v, + size_t, + const infinicore::Device &>) { + return this->register_module("mlp", model_config, layer_idx, device); + } else { + return this->register_module("mlp", model_config, device); + } + } }; } // namespace infinilm::layers::causal_lm_templates diff --git a/csrc/layers/common_modules.hpp b/csrc/layers/common_modules.hpp index 9904f7b92..eb49738fe 100644 --- a/csrc/layers/common_modules.hpp +++ b/csrc/layers/common_modules.hpp @@ -1,6 +1,5 @@ #pragma once #include "mlp/mlp.hpp" -#include "mlp/moe_mlp.hpp" #include "attention/attention.hpp" #include "causal_lm_templates/text_causal_lm.hpp" @@ -12,7 +11,6 @@ namespace infinilm::layers { using MLP = infinilm::layers::mlp::MLP; -using MoeMLP = infinilm::layers::moe_mlp::MoeMLP; namespace attention { diff --git a/csrc/layers/moe/common/moe_types.hpp b/csrc/layers/moe/common/moe_types.hpp new file mode 100644 index 000000000..8b784cf22 --- /dev/null +++ b/csrc/layers/moe/common/moe_types.hpp @@ -0,0 +1,93 @@ +#pragma once + +#include "topk_output.hpp" + +#include "infinicore/tensor.hpp" + +#include + +namespace infinilm::layers::moe { + +enum class DispatchOutputFormat { + Standard, + DeepEPNormal, + DeepEPLL, +}; + +enum class CombineInputFormat { + Standard, + DeepEPNormal, + DeepEPLL, +}; + +struct DispatchOutput { + DispatchOutputFormat format = DispatchOutputFormat::Standard; + infinicore::Tensor hidden_states; + infinicore::Tensor hidden_states_scale; + TopKOutput topk_output; + infinicore::Tensor expert_map; +}; + +struct MoeRoutingMetadata { + infinicore::Tensor sorted_token_ids; + infinicore::Tensor expert_ids; + infinicore::Tensor num_tokens_post_padded; + + infinicore::Tensor expert_offsets; + infinicore::Tensor blockscale_offsets; + infinicore::Tensor problem_sizes1; + infinicore::Tensor problem_sizes2; + infinicore::Tensor input_permutation; + infinicore::Tensor output_permutation; + + bool has_grouped_gemm_metadata = false; +}; + +struct CombineInput { + CombineInputFormat format = CombineInputFormat::Standard; + infinicore::Tensor hidden_states; + TopKOutput topk_output; + MoeRoutingMetadata routing_metadata; +}; + +struct MoeWeights { + infinicore::Tensor packed_w13; + infinicore::Tensor packed_w2; + + bool empty() const { + return !packed_w13 && !packed_w2; + } + + bool has_packed_dense_weights() const { + return packed_w13 && packed_w2; + } +}; + +struct MoeWorkspace { + infinicore::Tensor ep_gathered_hidden_states; + infinicore::Tensor ep_gathered_topk_weights; + infinicore::Tensor ep_gathered_topk_ids; + infinicore::Tensor ep_reduced_hidden_states; + infinicore::Tensor fused_moe_output; + + infinicore::Tensor sorted_token_ids; + infinicore::Tensor expert_ids; + infinicore::Tensor num_tokens_post_padded; + infinicore::Tensor expert_offsets; + infinicore::Tensor blockscale_offsets; + infinicore::Tensor problem_sizes1; + infinicore::Tensor problem_sizes2; + infinicore::Tensor input_permutation; + infinicore::Tensor output_permutation; + + size_t sorted_token_ids_capacity = 0; + size_t expert_ids_capacity = 0; + size_t ep_gathered_tokens_capacity = 0; + size_t ep_reduced_tokens_capacity = 0; + size_t fused_moe_output_tokens_capacity = 0; + size_t blockscale_offsets_capacity = 0; + size_t permutation_capacity = 0; + size_t prepared_num_experts = 0; +}; + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/common/topk_output.hpp b/csrc/layers/moe/common/topk_output.hpp new file mode 100644 index 000000000..eca114b32 --- /dev/null +++ b/csrc/layers/moe/common/topk_output.hpp @@ -0,0 +1,13 @@ +#pragma once + +#include "infinicore/tensor.hpp" + +namespace infinilm::layers::moe { + +struct TopKOutput { + infinicore::Tensor topk_weights; + infinicore::Tensor topk_ids; + infinicore::Tensor router_logits; +}; + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/dispatcher/base_dispatcher.hpp b/csrc/layers/moe/dispatcher/base_dispatcher.hpp new file mode 100644 index 000000000..9e878673e --- /dev/null +++ b/csrc/layers/moe/dispatcher/base_dispatcher.hpp @@ -0,0 +1,27 @@ +#pragma once + +#include "../common/moe_types.hpp" + +#include "infinicore/device.hpp" + +namespace infinilm::layers::moe { + +class BaseDispatcher { +public: + virtual ~BaseDispatcher() = default; + + virtual void initialize(const infinicore::Device &device, + MoeWorkspace &workspace) { + (void)device; + (void)workspace; + } + + virtual DispatchOutput dispatch(const infinicore::Tensor &hidden_states, + const TopKOutput &topk_output, + MoeWorkspace &workspace) const = 0; + + virtual infinicore::Tensor combine(const CombineInput &combine_input, + MoeWorkspace &workspace) const = 0; +}; + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/dispatcher/dispatcher_factory.cpp b/csrc/layers/moe/dispatcher/dispatcher_factory.cpp new file mode 100644 index 000000000..b641132e2 --- /dev/null +++ b/csrc/layers/moe/dispatcher/dispatcher_factory.cpp @@ -0,0 +1,28 @@ +#include "dispatcher_factory.hpp" + +#include "standard_dispatcher.hpp" +#include "../ep/allgather_reduce_scatter_dispatcher.hpp" +#include "../ep/deepep_dispatcher.hpp" +#include "../ep/local_allreduce_dispatcher.hpp" + +#include + +namespace infinilm::layers::moe { + +std::shared_ptr make_dispatcher(const EPConfig &ep_config, + size_t num_experts) { + switch (ep_config.backend) { + case EPBackend::Disabled: + return std::make_shared(); + case EPBackend::AllGatherReduceScatter: + return std::make_shared(ep_config, num_experts); + case EPBackend::LocalAllReduce: + return std::make_shared(ep_config, num_experts); + case EPBackend::DeepEP: + return std::make_shared(ep_config); + default: + throw std::runtime_error("Unsupported MoE EP backend"); + } +} + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/dispatcher/dispatcher_factory.hpp b/csrc/layers/moe/dispatcher/dispatcher_factory.hpp new file mode 100644 index 000000000..dba06ed14 --- /dev/null +++ b/csrc/layers/moe/dispatcher/dispatcher_factory.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include "base_dispatcher.hpp" +#include "../ep/ep_config.hpp" + +#include +#include + +namespace infinilm::layers::moe { + +std::shared_ptr make_dispatcher(const EPConfig &ep_config, + size_t num_experts); + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/dispatcher/standard_dispatcher.cpp b/csrc/layers/moe/dispatcher/standard_dispatcher.cpp new file mode 100644 index 000000000..27e5c0b23 --- /dev/null +++ b/csrc/layers/moe/dispatcher/standard_dispatcher.cpp @@ -0,0 +1,42 @@ +#include "standard_dispatcher.hpp" + +#include "../../../global_state/parallel_state.hpp" + +#include "infinicore/ops/distributed/allreduce.hpp" + +#include + +namespace infinilm::layers::moe { + +StandardDispatcher::StandardDispatcher() { + const auto &rank_info = infinilm::global_state::get_tensor_model_parallel_rank_info(); + tp_size_ = static_cast(rank_info.tp_size); + communicator_ = rank_info.comm; +} + +DispatchOutput StandardDispatcher::dispatch(const infinicore::Tensor &hidden_states, + const TopKOutput &topk_output, + MoeWorkspace &workspace) const { + (void)workspace; + return DispatchOutput{ + DispatchOutputFormat::Standard, + hidden_states, + infinicore::Tensor(), + topk_output, + }; +} + +infinicore::Tensor StandardDispatcher::combine(const CombineInput &combine_input, + MoeWorkspace &workspace) const { + (void)workspace; + if (tp_size_ > 1 && communicator_ != nullptr) { + infinicore::op::distributed::allreduce_( + combine_input.hidden_states, + combine_input.hidden_states, + INFINICCL_SUM, + communicator_); + } + return combine_input.hidden_states; +} + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/dispatcher/standard_dispatcher.hpp b/csrc/layers/moe/dispatcher/standard_dispatcher.hpp new file mode 100644 index 000000000..a10e1a44d --- /dev/null +++ b/csrc/layers/moe/dispatcher/standard_dispatcher.hpp @@ -0,0 +1,26 @@ +#pragma once + +#include "base_dispatcher.hpp" +#include "../common/moe_types.hpp" + +#include + +namespace infinilm::layers::moe { + +class StandardDispatcher final : public BaseDispatcher { +public: + StandardDispatcher(); + + DispatchOutput dispatch(const infinicore::Tensor &hidden_states, + const TopKOutput &topk_output, + MoeWorkspace &workspace) const override; + + infinicore::Tensor combine(const CombineInput &combine_input, + MoeWorkspace &workspace) const override; + +private: + size_t tp_size_ = 1; + infinicclComm_t communicator_ = nullptr; +}; + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/ep/allgather_reduce_scatter_dispatcher.cpp b/csrc/layers/moe/ep/allgather_reduce_scatter_dispatcher.cpp new file mode 100644 index 000000000..4741e5cf9 --- /dev/null +++ b/csrc/layers/moe/ep/allgather_reduce_scatter_dispatcher.cpp @@ -0,0 +1,162 @@ +#include "allgather_reduce_scatter_dispatcher.hpp" + +#include "infinicore/context/context.hpp" +#include "infinicore/ops/distributed/allgather.hpp" +#include "infinicore/ops/distributed/reduce_scatter.hpp" + +#include +#include + +namespace infinilm::layers::moe { +namespace { + +bool same_device(const infinicore::Tensor &tensor, const infinicore::Device &device) { + return tensor && + tensor->device().getType() == device.getType() && + tensor->device().getIndex() == device.getIndex(); +} + +void ensure_tensor(infinicore::Tensor &tensor, + const infinicore::Shape &shape, + infinicore::DataType dtype, + const infinicore::Device &device) { + if (!same_device(tensor, device) || + tensor->dtype() != dtype || + tensor->shape() != shape) { + if (infinicore::context::isGraphRecording()) { + throw std::runtime_error("MoE AG/RS workspace tensor was not initialized before graph capture"); + } + tensor = infinicore::Tensor::empty(shape, dtype, device); + } +} + +} // namespace + +AllGatherReduceScatterDispatcher::AllGatherReduceScatterDispatcher(EPConfig ep_config, + size_t num_experts) + : BaseEPDispatcher(std::move(ep_config), num_experts) {} + +void AllGatherReduceScatterDispatcher::all_gather_dim0_many_( + const std::vector &outputs, + const std::vector &inputs) const { + if (inputs.empty()) { + return; + } + const auto local_dim0 = inputs.front()->shape()[0]; + for (const auto &input : inputs) { + if (!input) { + throw std::runtime_error("MoE AG/RS all_gather_many does not support null tensors"); + } + if (input->ndim() == 0) { + throw std::runtime_error("MoE AG/RS all_gather_many expects tensors with dim 0"); + } + if (input->shape()[0] != local_dim0) { + throw std::runtime_error("MoE AG/RS all_gather_many expects all tensors to share dim 0"); + } + } + infinicore::op::distributed::allgatherv_many_( + outputs, + inputs, + equal_split_sizes(local_dim0), + communicator_); +} + +void AllGatherReduceScatterDispatcher::reduce_scatter_dim0_( + infinicore::Tensor output, + const infinicore::Tensor &input) const { + if (!input || !output) { + return; + } + if (input->ndim() == 0 || input->shape()[0] % config_.ep_size != 0) { + throw std::runtime_error("MoE AG/RS reduce_scatter expects dim 0 to be divisible by ep_size"); + } + const size_t local_dim0 = input->shape()[0] / config_.ep_size; + infinicore::op::distributed::reduce_scatterv_( + output, + input, + equal_split_sizes(local_dim0), + INFINICCL_SUM, + communicator_); +} + +DispatchOutput AllGatherReduceScatterDispatcher::dispatch( + const infinicore::Tensor &hidden_states, + const TopKOutput &topk_output, + MoeWorkspace &workspace) const { + if (config_.ep_size == 1) { + return DispatchOutput{ + DispatchOutputFormat::Standard, + hidden_states, + infinicore::Tensor(), + topk_output, + infinicore::Tensor(), + }; + } + + const size_t local_tokens = hidden_states->shape()[0]; + const size_t global_tokens = local_tokens * config_.ep_size; + const auto device = hidden_states->device(); + + auto hidden_shape = hidden_states->shape(); + hidden_shape[0] = global_tokens; + auto topk_weights_shape = topk_output.topk_weights->shape(); + topk_weights_shape[0] = global_tokens; + auto topk_ids_shape = topk_output.topk_ids->shape(); + topk_ids_shape[0] = global_tokens; + + ensure_tensor(workspace.ep_gathered_hidden_states, hidden_shape, hidden_states->dtype(), device); + ensure_tensor(workspace.ep_gathered_topk_weights, topk_weights_shape, topk_output.topk_weights->dtype(), device); + ensure_tensor(workspace.ep_gathered_topk_ids, topk_ids_shape, topk_output.topk_ids->dtype(), device); + workspace.ep_gathered_tokens_capacity = global_tokens; + + all_gather_dim0_many_( + { + workspace.ep_gathered_hidden_states, + workspace.ep_gathered_topk_weights, + workspace.ep_gathered_topk_ids, + }, + { + hidden_states, + topk_output.topk_weights, + topk_output.topk_ids, + }); + + TopKOutput dispatched_topk{ + workspace.ep_gathered_topk_weights, + workspace.ep_gathered_topk_ids, + topk_output.router_logits, + }; + return DispatchOutput{ + DispatchOutputFormat::Standard, + workspace.ep_gathered_hidden_states, + infinicore::Tensor(), + dispatched_topk, + expert_map(hidden_states->device()), + }; +} + +infinicore::Tensor AllGatherReduceScatterDispatcher::combine( + const CombineInput &combine_input, + MoeWorkspace &workspace) const { + if (config_.ep_size == 1) { + return combine_input.hidden_states; + } + if (!combine_input.hidden_states || combine_input.hidden_states->ndim() == 0 || + combine_input.hidden_states->shape()[0] % config_.ep_size != 0) { + throw std::runtime_error("MoE AG/RS combine expects hidden_states dim 0 to be divisible by ep_size"); + } + + auto output_shape = combine_input.hidden_states->shape(); + output_shape[0] /= config_.ep_size; + ensure_tensor( + workspace.ep_reduced_hidden_states, + output_shape, + combine_input.hidden_states->dtype(), + combine_input.hidden_states->device()); + workspace.ep_reduced_tokens_capacity = output_shape[0]; + + reduce_scatter_dim0_(workspace.ep_reduced_hidden_states, combine_input.hidden_states); + return workspace.ep_reduced_hidden_states; +} + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/ep/allgather_reduce_scatter_dispatcher.hpp b/csrc/layers/moe/ep/allgather_reduce_scatter_dispatcher.hpp new file mode 100644 index 000000000..6c2d45ddc --- /dev/null +++ b/csrc/layers/moe/ep/allgather_reduce_scatter_dispatcher.hpp @@ -0,0 +1,27 @@ +#pragma once + +#include "base_ep_dispatcher.hpp" + +#include + +namespace infinilm::layers::moe { + +class AllGatherReduceScatterDispatcher final : public BaseEPDispatcher { +public: + AllGatherReduceScatterDispatcher(EPConfig ep_config, size_t num_experts); + + DispatchOutput dispatch(const infinicore::Tensor &hidden_states, + const TopKOutput &topk_output, + MoeWorkspace &workspace) const override; + + infinicore::Tensor combine(const CombineInput &combine_input, + MoeWorkspace &workspace) const override; + +private: + void all_gather_dim0_many_(const std::vector &outputs, + const std::vector &inputs) const; + void reduce_scatter_dim0_(infinicore::Tensor output, + const infinicore::Tensor &input) const; +}; + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/ep/base_ep_dispatcher.cpp b/csrc/layers/moe/ep/base_ep_dispatcher.cpp new file mode 100644 index 000000000..e6dac79eb --- /dev/null +++ b/csrc/layers/moe/ep/base_ep_dispatcher.cpp @@ -0,0 +1,71 @@ +#include "base_ep_dispatcher.hpp" + +#include "../../../global_state/parallel_state.hpp" +#include "infinicore/context/context.hpp" + +#include +#include + +namespace infinilm::layers::moe { + +BaseEPDispatcher::BaseEPDispatcher(EPConfig ep_config, size_t num_experts) + : config_(std::move(ep_config)), + num_experts_(num_experts) { + const auto &rank_info = infinilm::global_state::get_tensor_model_parallel_rank_info(); + if (config_.ep_size != static_cast(rank_info.tp_size) || + config_.ep_rank != static_cast(rank_info.tp_rank)) { + throw std::runtime_error( + "MoE EP currently reuses the tensor parallel communication group, " + "so EP size/rank must match TP size/rank"); + } + if (config_.ep_size > 1) { + communicator_ = rank_info.comm; + if (communicator_ == nullptr) { + throw std::runtime_error("MoE EP requires a valid communication group when ep_size > 1"); + } + } +} + +void BaseEPDispatcher::initialize(const infinicore::Device &device, + MoeWorkspace &workspace) { + (void)workspace; + if (config_.ep_size > 1) { + (void)expert_map(device); + } +} + +std::vector BaseEPDispatcher::equal_split_sizes(size_t local_dim0) const { + return std::vector(config_.ep_size, local_dim0); +} + +infinicore::Tensor BaseEPDispatcher::expert_map(const infinicore::Device &device) const { + if (config_.ep_size == 1) { + return infinicore::Tensor(); + } + if (expert_map_ && expert_map_->device().getType() == device.getType() && + expert_map_->device().getIndex() == device.getIndex()) { + return expert_map_; + } + + const ExpertPlacement placement = make_expert_placement(config_, num_experts_); + std::vector map(num_experts_, -1); + for (size_t global_expert = placement.local_expert_start; + global_expert < placement.local_expert_end; + ++global_expert) { + map[global_expert] = static_cast(placement.global_to_local(global_expert)); + } + + if (infinicore::context::isGraphRecording()) { + throw std::runtime_error("MoE EP expert_map was not initialized before graph capture"); + } + auto cpu = infinicore::Tensor::from_blob( + map.data(), + {num_experts_}, + infinicore::DataType::I32, + infinicore::Device(infinicore::Device::Type::CPU, 0)); + expert_map_ = infinicore::Tensor::empty({num_experts_}, infinicore::DataType::I32, device); + expert_map_->copy_from(cpu); + return expert_map_; +} + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/ep/base_ep_dispatcher.hpp b/csrc/layers/moe/ep/base_ep_dispatcher.hpp new file mode 100644 index 000000000..d5d9b75e5 --- /dev/null +++ b/csrc/layers/moe/ep/base_ep_dispatcher.hpp @@ -0,0 +1,30 @@ +#pragma once + +#include "../dispatcher/base_dispatcher.hpp" +#include "ep_config.hpp" + +#include +#include + +namespace infinilm::layers::moe { + +class BaseEPDispatcher : public BaseDispatcher { +public: + BaseEPDispatcher(EPConfig ep_config, size_t num_experts); + + void initialize(const infinicore::Device &device, + MoeWorkspace &workspace) override; + +protected: + std::vector equal_split_sizes(size_t local_dim0) const; + infinicore::Tensor expert_map(const infinicore::Device &device) const; + + EPConfig config_; + size_t num_experts_ = 0; + infinicclComm_t communicator_ = nullptr; + +private: + mutable infinicore::Tensor expert_map_; +}; + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/ep/deepep_dispatcher.cpp b/csrc/layers/moe/ep/deepep_dispatcher.cpp new file mode 100644 index 000000000..d4d960c29 --- /dev/null +++ b/csrc/layers/moe/ep/deepep_dispatcher.cpp @@ -0,0 +1,47 @@ +#include "deepep_dispatcher.hpp" + +#include + +namespace infinilm::layers::moe { + +DeepEPDispatcher::DeepEPDispatcher(EPConfig config) + : config_(config) {} + +DispatchOutput DeepEPDispatcher::dispatch(const infinicore::Tensor &hidden_states, + const TopKOutput &topk_output, + MoeWorkspace &workspace) const { + (void)workspace; + dispatch_a(hidden_states, topk_output); + return dispatch_b(); +} + +infinicore::Tensor DeepEPDispatcher::combine(const CombineInput &combine_input, + MoeWorkspace &workspace) const { + (void)workspace; + combine_a(combine_input); + return combine_b(); +} + +void DeepEPDispatcher::dispatch_a(const infinicore::Tensor &hidden_states, + const TopKOutput &topk_output) const { + (void)config_; + (void)hidden_states; + (void)topk_output; + throw std::runtime_error("DeepEPDispatcher::dispatch_a is reserved but not implemented yet"); +} + +DispatchOutput DeepEPDispatcher::dispatch_b() const { + throw std::runtime_error("DeepEPDispatcher::dispatch_b is reserved but not implemented yet"); +} + +void DeepEPDispatcher::combine_a(const CombineInput &combine_input) const { + (void)config_; + (void)combine_input; + throw std::runtime_error("DeepEPDispatcher::combine_a is reserved but not implemented yet"); +} + +infinicore::Tensor DeepEPDispatcher::combine_b() const { + throw std::runtime_error("DeepEPDispatcher::combine_b is reserved but not implemented yet"); +} + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/ep/deepep_dispatcher.hpp b/csrc/layers/moe/ep/deepep_dispatcher.hpp new file mode 100644 index 000000000..842cfef02 --- /dev/null +++ b/csrc/layers/moe/ep/deepep_dispatcher.hpp @@ -0,0 +1,30 @@ +#pragma once + +#include "../dispatcher/base_dispatcher.hpp" +#include "ep_config.hpp" + +namespace infinilm::layers::moe { + +class DeepEPDispatcher final : public BaseDispatcher { +public: + explicit DeepEPDispatcher(EPConfig config); + + DispatchOutput dispatch(const infinicore::Tensor &hidden_states, + const TopKOutput &topk_output, + MoeWorkspace &workspace) const override; + + infinicore::Tensor combine(const CombineInput &combine_input, + MoeWorkspace &workspace) const override; + + void dispatch_a(const infinicore::Tensor &hidden_states, + const TopKOutput &topk_output) const; + DispatchOutput dispatch_b() const; + + void combine_a(const CombineInput &combine_input) const; + infinicore::Tensor combine_b() const; + +private: + EPConfig config_; +}; + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/ep/ep_config.cpp b/csrc/layers/moe/ep/ep_config.cpp new file mode 100644 index 000000000..2b2eac369 --- /dev/null +++ b/csrc/layers/moe/ep/ep_config.cpp @@ -0,0 +1,131 @@ +#include "ep_config.hpp" + +#include "../../../global_state/parallel_state.hpp" + +#include +#include +#include +#include + +namespace infinilm::layers::moe { + +namespace { + +std::string lower_string(std::string value) { + std::transform(value.begin(), value.end(), value.begin(), [](unsigned char c) { + return static_cast(std::tolower(c)); + }); + return value; +} + +EPBackend parse_backend(const std::string &backend) { + if (backend.empty() || backend == "0" || backend == "none" || backend == "off" || backend == "disabled" || backend == "standard") { + return EPBackend::Disabled; + } + if (backend == "naive" || backend == "ag_rs" || backend == "allgather_reducescatter" || backend == "all_gather_reduce_scatter") { + return EPBackend::AllGatherReduceScatter; + } + if (backend == "local_allreduce" || backend == "local_all_reduce" || backend == "tp_ep" || backend == "vllm_tp" || backend == "dp1") { + return EPBackend::LocalAllReduce; + } + if (backend == "deepep" || backend == "deep_ep") { + return EPBackend::DeepEP; + } + throw std::runtime_error("Unsupported MoE EP backend: " + backend); +} + +DeepEPMode parse_deepep_mode(const std::string &mode) { + if (mode.empty() || mode == "auto") { + return DeepEPMode::Auto; + } + if (mode == "normal") { + return DeepEPMode::Normal; + } + if (mode == "low_latency" || mode == "lowlatency" || mode == "ll") { + return DeepEPMode::LowLatency; + } + throw std::runtime_error("Unsupported MoE DeepEP mode: " + mode); +} + +} // namespace + +const char *ep_backend_name(EPBackend backend) { + switch (backend) { + case EPBackend::Disabled: + return "disabled"; + case EPBackend::AllGatherReduceScatter: + return "allgather_reducescatter"; + case EPBackend::LocalAllReduce: + return "local_allreduce"; + case EPBackend::DeepEP: + return "deepep"; + } + return "unknown"; +} + +EPConfig make_ep_config(const std::shared_ptr &model_config) { + EPConfig config; + auto &config_json = model_config->get_config_json(); + const std::string backend = config_json.contains("moe_ep_backend") && !config_json["moe_ep_backend"].is_null() + ? lower_string(config_json["moe_ep_backend"].get()) + : std::string(); + config.backend = parse_backend(backend); + + if (config.backend == EPBackend::Disabled) { + return config; + } + + const std::string deepep_mode = config_json.contains("moe_deepep_mode") && !config_json["moe_deepep_mode"].is_null() + ? lower_string(config_json["moe_deepep_mode"].get()) + : std::string(); + config.deepep_mode = parse_deepep_mode(deepep_mode); + + const size_t tp_size = infinilm::global_state::get_tensor_model_parallel_world_size(); + const size_t tp_rank = infinilm::global_state::get_tensor_model_parallel_rank(); + config.ep_size = config_json.contains("moe_ep_size") && !config_json["moe_ep_size"].is_null() + ? config_json["moe_ep_size"].get() + : tp_size; + config.ep_rank = tp_rank; + + if (config.ep_size == 0) { + throw std::runtime_error("MoE EP size must be greater than 0"); + } + if (config.ep_rank >= config.ep_size) { + throw std::runtime_error("MoE EP rank must be smaller than EP size"); + } + return config; +} + +ExpertPlacement make_expert_placement(const EPConfig &config, + size_t global_num_experts) { + if (global_num_experts == 0) { + throw std::runtime_error("MoE expert placement requires at least one expert"); + } + if (config.backend == EPBackend::Disabled || config.ep_size == 1) { + return ExpertPlacement{ + global_num_experts, + global_num_experts, + 0, + global_num_experts, + }; + } + if (config.ep_size == 0) { + throw std::runtime_error("MoE EP size must be greater than 0"); + } + if (config.ep_rank >= config.ep_size) { + throw std::runtime_error("MoE EP rank must be smaller than EP size"); + } + + const size_t base_experts = global_num_experts / config.ep_size; + const size_t remainder = global_num_experts % config.ep_size; + const size_t local_num_experts = base_experts + (config.ep_rank < remainder ? 1 : 0); + const size_t local_expert_start = config.ep_rank * base_experts + std::min(config.ep_rank, remainder); + return ExpertPlacement{ + global_num_experts, + local_num_experts, + local_expert_start, + local_expert_start + local_num_experts, + }; +} + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/ep/ep_config.hpp b/csrc/layers/moe/ep/ep_config.hpp new file mode 100644 index 000000000..b6e4d369f --- /dev/null +++ b/csrc/layers/moe/ep/ep_config.hpp @@ -0,0 +1,52 @@ +#pragma once + +#include "../../../config/model_config.hpp" + +#include +#include + +namespace infinilm::layers::moe { + +enum class EPBackend { + Disabled, + AllGatherReduceScatter, + LocalAllReduce, + DeepEP, +}; + +enum class DeepEPMode { + Normal, + LowLatency, + Auto, +}; + +struct EPConfig { + EPBackend backend = EPBackend::Disabled; + size_t ep_rank = 0; + size_t ep_size = 1; + DeepEPMode deepep_mode = DeepEPMode::Auto; +}; + +struct ExpertPlacement { + size_t global_num_experts = 0; + size_t local_num_experts = 0; + size_t local_expert_start = 0; + size_t local_expert_end = 0; + + bool owns(size_t global_expert) const { + return global_expert >= local_expert_start && global_expert < local_expert_end; + } + + size_t global_to_local(size_t global_expert) const { + return global_expert - local_expert_start; + } +}; + +const char *ep_backend_name(EPBackend backend); + +EPConfig make_ep_config(const std::shared_ptr &model_config); + +ExpertPlacement make_expert_placement(const EPConfig &config, + size_t global_num_experts); + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/ep/local_allreduce_dispatcher.cpp b/csrc/layers/moe/ep/local_allreduce_dispatcher.cpp new file mode 100644 index 000000000..e9bf28761 --- /dev/null +++ b/csrc/layers/moe/ep/local_allreduce_dispatcher.cpp @@ -0,0 +1,54 @@ +#include "local_allreduce_dispatcher.hpp" + +#include "infinicore/ops/distributed/allreduce.hpp" + +#include + +namespace infinilm::layers::moe { + +LocalAllReduceDispatcher::LocalAllReduceDispatcher(EPConfig ep_config, + size_t num_experts) + : BaseEPDispatcher(std::move(ep_config), num_experts) {} + +void LocalAllReduceDispatcher::allreduce_(infinicore::Tensor tensor) const { + if (!tensor) { + return; + } + infinicore::op::distributed::allreduce_(tensor, tensor, INFINICCL_SUM, communicator_); +} + +DispatchOutput LocalAllReduceDispatcher::dispatch( + const infinicore::Tensor &hidden_states, + const TopKOutput &topk_output, + MoeWorkspace &workspace) const { + (void)workspace; + if (config_.ep_size == 1) { + return DispatchOutput{ + DispatchOutputFormat::Standard, + hidden_states, + infinicore::Tensor(), + topk_output, + infinicore::Tensor(), + }; + } + return DispatchOutput{ + DispatchOutputFormat::Standard, + hidden_states, + infinicore::Tensor(), + topk_output, + expert_map(hidden_states->device()), + }; +} + +infinicore::Tensor LocalAllReduceDispatcher::combine( + const CombineInput &combine_input, + MoeWorkspace &workspace) const { + (void)workspace; + if (config_.ep_size == 1) { + return combine_input.hidden_states; + } + allreduce_(combine_input.hidden_states); + return combine_input.hidden_states; +} + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/ep/local_allreduce_dispatcher.hpp b/csrc/layers/moe/ep/local_allreduce_dispatcher.hpp new file mode 100644 index 000000000..7304ea623 --- /dev/null +++ b/csrc/layers/moe/ep/local_allreduce_dispatcher.hpp @@ -0,0 +1,22 @@ +#pragma once + +#include "base_ep_dispatcher.hpp" + +namespace infinilm::layers::moe { + +class LocalAllReduceDispatcher final : public BaseEPDispatcher { +public: + LocalAllReduceDispatcher(EPConfig ep_config, size_t num_experts); + + DispatchOutput dispatch(const infinicore::Tensor &hidden_states, + const TopKOutput &topk_output, + MoeWorkspace &workspace) const override; + + infinicore::Tensor combine(const CombineInput &combine_input, + MoeWorkspace &workspace) const override; + +private: + void allreduce_(infinicore::Tensor tensor) const; +}; + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/experts/fused_moe_experts.cpp b/csrc/layers/moe/experts/fused_moe_experts.cpp new file mode 100644 index 000000000..baee31f7b --- /dev/null +++ b/csrc/layers/moe/experts/fused_moe_experts.cpp @@ -0,0 +1,78 @@ +#include "fused_moe_experts.hpp" + +#include "../../../global_state/global_state.hpp" +#include "../ep/ep_config.hpp" + +#include + +namespace infinilm::layers::moe { + +FusedMoeExperts::FusedMoeExperts(std::shared_ptr model_config, + const infinicore::Device &device) { + num_experts_ = model_config->get("num_experts"); + hidden_size_ = model_config->get("hidden_size"); + const size_t intermediate_size = model_config->get("moe_intermediate_size"); + const auto dtype = model_config->get_dtype(); + ASSERT(num_experts_ > 0); + + const auto ep_config = make_ep_config(model_config); + const auto expert_placement = make_expert_placement(ep_config, num_experts_); + const size_t num_local_experts = expert_placement.local_num_experts; + + const engine::distributed::RankInfo &rank_info = infinilm::global_state::get_tensor_model_parallel_rank_info(); + const size_t tp_rank = static_cast(rank_info.tp_rank); + const size_t tp_size = static_cast(rank_info.tp_size); + const bool ep_enabled = ep_config.backend != EPBackend::Disabled; + if (ep_enabled) { + intermediate_size_per_partition_ = intermediate_size; + } else { + ASSERT(intermediate_size % tp_size == 0); + intermediate_size_per_partition_ = intermediate_size / tp_size; + } + const size_t expert_tp_rank = ep_enabled ? 0 : tp_rank; + const size_t expert_tp_size = ep_enabled ? 1 : tp_size; + + w13_weight_ = infinicore::nn::Parameter( + {num_local_experts, intermediate_size_per_partition_ * 2, hidden_size_}, + dtype, + device); + w2_weight_ = infinicore::nn::Parameter( + {num_local_experts, hidden_size_, intermediate_size_per_partition_}, + dtype, + device); + this->register_parameter("w13_weight", w13_weight_); + this->register_parameter("w2_weight", w2_weight_); + + for (size_t local_expert = 0; local_expert < num_local_experts; ++local_expert) { + const size_t global_expert = expert_placement.local_expert_start + local_expert; + auto gate_weight = w13_weight_ + ->narrow({{0, local_expert, 1}, {1, 0, intermediate_size_per_partition_}}) + ->squeeze(0); + auto up_weight = w13_weight_ + ->narrow({{0, local_expert, 1}, {1, intermediate_size_per_partition_, intermediate_size_per_partition_}}) + ->squeeze(0); + auto down_weight = w2_weight_ + ->narrow({{0, local_expert, 1}}) + ->squeeze(0); + + const std::string prefix = std::to_string(global_expert) + "."; + this->register_parameter( + prefix + "gate_proj.weight", + infinicore::nn::Parameter(gate_weight, 0, expert_tp_rank, expert_tp_size)); + this->register_parameter( + prefix + "up_proj.weight", + infinicore::nn::Parameter(up_weight, 0, expert_tp_rank, expert_tp_size)); + this->register_parameter( + prefix + "down_proj.weight", + infinicore::nn::Parameter(down_weight, 1, expert_tp_rank, expert_tp_size)); + } + + moe_weights_.packed_w13 = w13_weight_; + moe_weights_.packed_w2 = w2_weight_; +} + +const MoeWeights &FusedMoeExperts::moe_weights() const { + return moe_weights_; +} + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/experts/fused_moe_experts.hpp b/csrc/layers/moe/experts/fused_moe_experts.hpp new file mode 100644 index 000000000..3f231ec64 --- /dev/null +++ b/csrc/layers/moe/experts/fused_moe_experts.hpp @@ -0,0 +1,30 @@ +#pragma once + +#include "../common/moe_types.hpp" + +#include "../../../config/model_config.hpp" +#include "infinicore/nn/module.hpp" + +#include +#include + +namespace infinilm::layers::moe { + +class FusedMoeExperts : public infinicore::nn::Module { +public: + FusedMoeExperts(std::shared_ptr model_config, + const infinicore::Device &device); + + const MoeWeights &moe_weights() const; + +protected: + INFINICORE_NN_PARAMETER(w13_weight); + INFINICORE_NN_PARAMETER(w2_weight); + + size_t num_experts_{0}; + size_t hidden_size_{0}; + size_t intermediate_size_per_partition_{0}; + MoeWeights moe_weights_; +}; + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/fused_moe.cpp b/csrc/layers/moe/fused_moe.cpp new file mode 100644 index 000000000..867224777 --- /dev/null +++ b/csrc/layers/moe/fused_moe.cpp @@ -0,0 +1,49 @@ +#include "fused_moe.hpp" + +#include "dispatcher/dispatcher_factory.hpp" +#include "ep/ep_config.hpp" +#include "runner/cuda_fused_moe_runner.hpp" + +#include "../../global_state/parallel_state.hpp" + +#include + +namespace infinilm::layers::moe { + +FusedMoE::FusedMoE(std::shared_ptr model_config, + const infinicore::Device &device, + size_t layer_id) { + (void)layer_id; + + const EPConfig ep_config = make_ep_config(model_config); + const size_t num_experts = model_config->get("num_experts"); + const ExpertPlacement expert_placement = make_expert_placement(ep_config, num_experts); + const size_t hidden_size = model_config->get("hidden_size"); + const size_t tp_size = infinilm::global_state::get_tensor_model_parallel_world_size(); + const size_t intermediate_size = model_config->get("moe_intermediate_size"); + size_t intermediate_size_per_partition = intermediate_size; + if (ep_config.backend == EPBackend::Disabled) { + if (intermediate_size % tp_size != 0) { + throw std::runtime_error("moe_intermediate_size must be divisible by tensor parallel world size"); + } + intermediate_size_per_partition = intermediate_size / tp_size; + } + + dispatcher_ = make_dispatcher(ep_config, num_experts); + runner_ = std::make_shared( + expert_placement.local_num_experts, + hidden_size, + intermediate_size_per_partition, + model_config->get_or("moe_align_block_size", 16)); + dispatcher_->initialize(device, workspace_); +} + +infinicore::Tensor FusedMoE::forward(const infinicore::Tensor &hidden_states, + const TopKOutput &topk_output, + const MoeWeights &weights) const { + auto dispatch_output = dispatcher_->dispatch(hidden_states, topk_output, workspace_); + auto combine_input = runner_->run(dispatch_output, weights, workspace_); + return dispatcher_->combine(combine_input, workspace_); +} + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/fused_moe.hpp b/csrc/layers/moe/fused_moe.hpp new file mode 100644 index 000000000..81b3cad77 --- /dev/null +++ b/csrc/layers/moe/fused_moe.hpp @@ -0,0 +1,30 @@ +#pragma once + +#include "common/moe_types.hpp" +#include "dispatcher/base_dispatcher.hpp" +#include "runner/base_runner.hpp" + +#include "../../config/model_config.hpp" +#include "infinicore/nn/module.hpp" + +#include + +namespace infinilm::layers::moe { + +class FusedMoE final : public infinicore::nn::Module { +public: + FusedMoE(std::shared_ptr model_config, + const infinicore::Device &device, + size_t layer_id = 0); + + infinicore::Tensor forward(const infinicore::Tensor &hidden_states, + const TopKOutput &topk_output, + const MoeWeights &weights) const; + +private: + std::shared_ptr dispatcher_; + std::shared_ptr runner_; + mutable MoeWorkspace workspace_; +}; + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/mlp/moe_mlp.cpp b/csrc/layers/moe/legacy/moe_mlp.cpp similarity index 92% rename from csrc/layers/mlp/moe_mlp.cpp rename to csrc/layers/moe/legacy/moe_mlp.cpp index 9fec6be84..2c35eceb9 100644 --- a/csrc/layers/mlp/moe_mlp.cpp +++ b/csrc/layers/moe/legacy/moe_mlp.cpp @@ -1,8 +1,9 @@ #include "moe_mlp.hpp" -#include "../../global_state/global_state.hpp" + +#include "../../../global_state/global_state.hpp" #include "infinicore/ops.hpp" -namespace infinilm::layers::moe_mlp { +namespace infinilm::layers::moe::legacy { MoeMLP::MoeMLP(std::shared_ptr model_config, const infinicore::Device &device) { @@ -36,4 +37,5 @@ infinicore::Tensor MoeMLP::forward(const infinicore::Tensor &hidden_states) cons auto output = down_proj_->forward(intermediate); return output; } -} // namespace infinilm::layers::moe_mlp + +} // namespace infinilm::layers::moe::legacy diff --git a/csrc/layers/mlp/moe_mlp.hpp b/csrc/layers/moe/legacy/moe_mlp.hpp similarity index 86% rename from csrc/layers/mlp/moe_mlp.hpp rename to csrc/layers/moe/legacy/moe_mlp.hpp index 45bbe7459..77d39944d 100644 --- a/csrc/layers/mlp/moe_mlp.hpp +++ b/csrc/layers/moe/legacy/moe_mlp.hpp @@ -1,10 +1,10 @@ #pragma once -#include "../../config/model_config.hpp" -#include "../linear/linear.hpp" +#include "../../../config/model_config.hpp" +#include "../../linear/linear.hpp" #include "infinicore/nn/module.hpp" -namespace infinilm::layers::moe_mlp { +namespace infinilm::layers::moe::legacy { class MoeMLP : public infinicore::nn::Module { public: @@ -30,4 +30,4 @@ class MoeMLP : public infinicore::nn::Module { bool use_bias_; }; -} // namespace infinilm::layers::moe_mlp +} // namespace infinilm::layers::moe::legacy diff --git a/csrc/layers/moe/router/topk_router.cpp b/csrc/layers/moe/router/topk_router.cpp new file mode 100644 index 000000000..fe87782b7 --- /dev/null +++ b/csrc/layers/moe/router/topk_router.cpp @@ -0,0 +1,131 @@ +#include "topk_router.hpp" + +#include "infinicore/ops.hpp" + +#include +#include + +namespace infinilm::layers::moe { +namespace { + +TopKRouterBackend parse_router_backend(const std::string &backend) { + if (backend == "softmax") { + return TopKRouterBackend::Softmax; + } + if (backend == "sigmoid") { + return TopKRouterBackend::Sigmoid; + } + if (backend == "fused_gate" || backend == "noaux_tc") { + return TopKRouterBackend::FusedGate; + } + throw std::runtime_error("Unsupported MoE router backend: " + backend); +} + +std::string router_backend_name(const std::shared_ptr &model_config) { + auto &config_json = model_config->get_config_json(); + if (config_json.contains("moe_router_backend") && !config_json["moe_router_backend"].is_null()) { + return config_json["moe_router_backend"].get(); + } + if (config_json.contains("topk_method") && !config_json["topk_method"].is_null() && + config_json["topk_method"].get() == "noaux_tc") { + return "fused_gate"; + } + if (config_json.contains("scoring_func") && !config_json["scoring_func"].is_null()) { + return config_json["scoring_func"].get(); + } + return "softmax"; +} + +} // namespace + +TopKRouter::TopKRouter(std::shared_ptr model_config, + const infinicore::Device &device) { + num_experts_ = model_config->get("num_experts"); + num_experts_per_tok_ = model_config->get("num_experts_per_tok"); + norm_topk_prob_ = model_config->get_or("norm_topk_prob", false); + routed_scaling_factor_ = model_config->get_or("routed_scaling_factor", 1.0f); + moe_softcapping_ = model_config->get_or("moe_softcapping", 0.0f); + num_expert_group_ = model_config->get_or_alias("num_expert_group", "n_group", 0); + topk_group_ = model_config->get_or("topk_group", 0); + num_fused_shared_experts_ = model_config->get_or("num_fused_shared_experts", 0); + apply_routed_scaling_factor_on_output_ = + model_config->get_or("apply_routed_scaling_factor_on_output", false); + router_backend_ = parse_router_backend(router_backend_name(model_config)); + use_correction_bias_ = + model_config->contains_non_null("e_score_correction_bias") || + router_backend_ == TopKRouterBackend::FusedGate || + model_config->get_or("moe_router_use_correction_bias", false); + ASSERT((num_experts_ > 0) && (num_experts_per_tok_ > 0) && (num_experts_per_tok_ <= num_experts_)); + + INFINICORE_NN_PARAMETER_INIT( + weight, + ({num_experts_, model_config->get("hidden_size")}, model_config->get_dtype(), device)); + + if (use_correction_bias_) { + INFINICORE_NN_PARAMETER_INIT(e_score_correction_bias, ({num_experts_}, infinicore::DataType::F32, device)); + } + + if (router_backend_ == TopKRouterBackend::FusedGate) { + if (!use_correction_bias_) { + throw std::runtime_error("fused_gate MoE router requires correction bias"); + } + if (num_expert_group_ == 0 || topk_group_ == 0) { + throw std::runtime_error("fused_gate MoE router requires num_expert_group/n_group and topk_group"); + } + if (num_experts_ % num_expert_group_ != 0) { + throw std::runtime_error("fused_gate MoE router requires num_experts divisible by num_expert_group"); + } + if (num_experts_per_tok_ <= num_fused_shared_experts_) { + throw std::runtime_error("fused_gate MoE router requires num_experts_per_tok > num_fused_shared_experts"); + } + } +} + +std::tuple TopKRouter::forward(const infinicore::Tensor &hidden_states) const { + ASSERT(hidden_states->ndim() == 2); + + size_t ntoken = hidden_states->shape()[0]; + auto router_logits = infinicore::op::linear(hidden_states, weight_, std::nullopt, 1.0f); + + auto router_scores = infinicore::Tensor::empty({ntoken, num_experts_per_tok_}, infinicore::DataType::F32, hidden_states->device()); + auto router_indices = infinicore::Tensor::empty({ntoken, num_experts_per_tok_}, infinicore::DataType::I32, hidden_states->device()); + + const infinicore::Tensor correction_bias = + use_correction_bias_ ? static_cast(e_score_correction_bias_) : infinicore::Tensor(); + + switch (router_backend_) { + case TopKRouterBackend::Softmax: + infinicore::op::moe_topk_softmax_( + router_scores, + router_indices, + router_logits, + correction_bias, + norm_topk_prob_, + moe_softcapping_); + break; + case TopKRouterBackend::Sigmoid: + infinicore::op::moe_topk_sigmoid_( + router_scores, + router_indices, + router_logits, + correction_bias, + norm_topk_prob_); + break; + case TopKRouterBackend::FusedGate: + infinicore::op::moe_fused_gate_( + router_scores, + router_indices, + router_logits, + e_score_correction_bias_, + num_expert_group_, + topk_group_, + num_fused_shared_experts_, + routed_scaling_factor_, + apply_routed_scaling_factor_on_output_); + break; + } + + return std::make_tuple(router_scores, router_indices); +} + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/router/topk_router.hpp b/csrc/layers/moe/router/topk_router.hpp new file mode 100644 index 000000000..ce1d468b7 --- /dev/null +++ b/csrc/layers/moe/router/topk_router.hpp @@ -0,0 +1,42 @@ +#pragma once + +#include "../../../config/model_config.hpp" +#include "../../common_modules.hpp" + +#include +#include +#include + +namespace infinilm::layers::moe { + +enum class TopKRouterBackend { + Softmax, + Sigmoid, + FusedGate, +}; + +class TopKRouter : public infinicore::nn::Module { +public: + TopKRouter(std::shared_ptr model_config, + const infinicore::Device &device); + + std::tuple forward(const infinicore::Tensor &hidden_states) const; + +protected: + INFINICORE_NN_PARAMETER(weight); + INFINICORE_NN_PARAMETER(e_score_correction_bias); + + size_t num_experts_{0}; + size_t num_experts_per_tok_{0}; + size_t num_expert_group_{0}; + size_t topk_group_{0}; + size_t num_fused_shared_experts_{0}; + float routed_scaling_factor_{1.0f}; + float moe_softcapping_{0.0f}; + bool norm_topk_prob_{false}; + bool apply_routed_scaling_factor_on_output_{false}; + bool use_correction_bias_{false}; + TopKRouterBackend router_backend_{TopKRouterBackend::Softmax}; +}; + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/runner/base_runner.hpp b/csrc/layers/moe/runner/base_runner.hpp new file mode 100644 index 000000000..9930cacbd --- /dev/null +++ b/csrc/layers/moe/runner/base_runner.hpp @@ -0,0 +1,16 @@ +#pragma once + +#include "../common/moe_types.hpp" + +namespace infinilm::layers::moe { + +class MoeRunnerCore { +public: + virtual ~MoeRunnerCore() = default; + + virtual CombineInput run(const DispatchOutput &dispatch_output, + const MoeWeights &weights, + MoeWorkspace &workspace) const = 0; +}; + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/runner/cuda_fused_moe_runner.cpp b/csrc/layers/moe/runner/cuda_fused_moe_runner.cpp new file mode 100644 index 000000000..a959208be --- /dev/null +++ b/csrc/layers/moe/runner/cuda_fused_moe_runner.cpp @@ -0,0 +1,211 @@ +#include "cuda_fused_moe_runner.hpp" + +#include "infinicore/ops/moe_align.hpp" +#include "infinicore/ops/moe_fused_dense.hpp" +#include "infinicore/context/context.hpp" + +#include +#include +#include +#include + +namespace infinilm::layers::moe { + +CudaFusedMoeRunner::CudaFusedMoeRunner(size_t num_local_experts, + size_t hidden_size, + size_t intermediate_size_per_partition, + size_t align_block_size) + : num_local_experts_(num_local_experts), + hidden_size_(hidden_size), + intermediate_size_per_partition_(intermediate_size_per_partition), + align_block_size_(align_block_size) {} + +namespace { + +bool same_device(const infinicore::Tensor &tensor, const infinicore::Device &device) { + return tensor && + tensor->device().getType() == device.getType() && + tensor->device().getIndex() == device.getIndex(); +} + +void ensure_tensor(infinicore::Tensor &tensor, + const infinicore::Shape &shape, + infinicore::DataType dtype, + const infinicore::Device &device) { + if (!same_device(tensor, device) || + tensor->dtype() != dtype || + tensor->shape() != shape) { + if (infinicore::context::isGraphRecording()) { + throw std::runtime_error("MoE runner workspace tensor was not initialized before graph capture"); + } + tensor = infinicore::Tensor::empty(shape, dtype, device); + } +} + +std::string shape_to_string(const infinicore::Shape &shape) { + std::ostringstream oss; + oss << "["; + for (size_t i = 0; i < shape.size(); ++i) { + if (i != 0) { + oss << ", "; + } + oss << shape[i]; + } + oss << "]"; + return oss.str(); +} + +void check_packed_weight_tensor(const infinicore::Tensor &tensor, + const std::string &name, + const infinicore::Device &device, + const infinicore::DataType dtype, + const infinicore::Shape &shape) { + if (!tensor) { + throw std::runtime_error("MoE fused dense core requires " + name); + } + if (tensor->device().getType() != device.getType() || tensor->device().getIndex() != device.getIndex()) { + throw std::runtime_error("MoE fused dense core requires packed weights on the hidden_states device"); + } + if (tensor->dtype() != dtype) { + throw std::runtime_error("MoE fused dense core requires packed weights to have the same dtype as hidden_states"); + } + if (tensor->shape() != shape) { + throw std::runtime_error( + "MoE fused dense core packed weight shape mismatch for " + name + + ": expected " + shape_to_string(shape) + + ", got " + shape_to_string(tensor->shape())); + } +} + +} // namespace + +CombineInput CudaFusedMoeRunner::run(const DispatchOutput &dispatch_output, + const MoeWeights &weights, + MoeWorkspace &workspace) const { + auto runner_input = prepare_runner_input( + dispatch_output, + workspace); + + auto runner_output = run_fused_core(runner_input, weights, workspace); + + return CombineInput{ + CombineInputFormat::Standard, + runner_output.hidden_states, + runner_input.topk_output, + runner_input.routing_metadata, + }; +} + +CudaFusedMoeRunnerInput CudaFusedMoeRunner::prepare_runner_input(const DispatchOutput &dispatch_output, + MoeWorkspace &workspace) const { + const auto &topk_ids = dispatch_output.topk_output.topk_ids; + const auto &topk_shape = topk_ids->shape(); + if (topk_shape.size() != 2) { + throw std::runtime_error("MoE runner requires topk_ids to be a 2D tensor"); + } + const size_t num_pairs = topk_shape[0] * topk_shape[1]; + const size_t block_size = align_block_size_; + const size_t align_num_experts = num_local_experts_ + 1; + const size_t max_num_tokens_padded = + num_pairs < align_num_experts + ? num_pairs * block_size + : num_pairs + align_num_experts * (block_size - 1); + const size_t sorted_token_ids_capacity = ((max_num_tokens_padded + 3) / 4) * 4; + const size_t max_num_blocks = (max_num_tokens_padded + block_size - 1) / block_size; + const auto device = topk_ids->device(); + + if (!workspace.sorted_token_ids || workspace.sorted_token_ids_capacity < sorted_token_ids_capacity) { + if (infinicore::context::isGraphRecording()) { + throw std::runtime_error("MoE sorted_token_ids workspace was not initialized before graph capture"); + } + workspace.sorted_token_ids = infinicore::Tensor::empty( + {sorted_token_ids_capacity}, infinicore::DataType::I32, device); + workspace.sorted_token_ids_capacity = sorted_token_ids_capacity; + } + if (!workspace.expert_ids || workspace.expert_ids_capacity < max_num_blocks) { + if (infinicore::context::isGraphRecording()) { + throw std::runtime_error("MoE expert_ids workspace was not initialized before graph capture"); + } + workspace.expert_ids = infinicore::Tensor::empty( + {max_num_blocks}, infinicore::DataType::I32, device); + workspace.expert_ids_capacity = max_num_blocks; + } + if (!workspace.num_tokens_post_padded) { + if (infinicore::context::isGraphRecording()) { + throw std::runtime_error("MoE num_tokens_post_padded workspace was not initialized before graph capture"); + } + workspace.num_tokens_post_padded = infinicore::Tensor::empty( + {1}, infinicore::DataType::I32, device); + } + + if (dispatch_output.expert_map) { + infinicore::op::moe_align_with_expert_map_( + workspace.sorted_token_ids, + workspace.expert_ids, + workspace.num_tokens_post_padded, + topk_ids, + dispatch_output.expert_map, + num_local_experts_, + block_size, + true); + } else { + infinicore::op::moe_align_( + workspace.sorted_token_ids, + workspace.expert_ids, + workspace.num_tokens_post_padded, + topk_ids, + num_local_experts_, + block_size, + true); + } + return CudaFusedMoeRunnerInput{ + dispatch_output.hidden_states, + dispatch_output.topk_output, + MoeRoutingMetadata{ + workspace.sorted_token_ids, + workspace.expert_ids, + workspace.num_tokens_post_padded, + }, + }; +} + +CudaFusedMoeRunnerOutput CudaFusedMoeRunner::run_fused_core(const CudaFusedMoeRunnerInput &runner_input, + const MoeWeights &weights, + MoeWorkspace &workspace) const { + if (!weights.has_packed_dense_weights()) { + throw std::runtime_error("MoE fused dense runner requires load-time packed w13/w2 weights"); + } + check_packed_weight_tensor( + weights.packed_w13, + "w13", + runner_input.hidden_states->device(), + runner_input.hidden_states->dtype(), + {num_local_experts_, intermediate_size_per_partition_ * 2, hidden_size_}); + check_packed_weight_tensor( + weights.packed_w2, + "w2", + runner_input.hidden_states->device(), + runner_input.hidden_states->dtype(), + {num_local_experts_, hidden_size_, intermediate_size_per_partition_}); + ensure_tensor( + workspace.fused_moe_output, + runner_input.hidden_states->shape(), + runner_input.hidden_states->dtype(), + runner_input.hidden_states->device()); + workspace.fused_moe_output_tokens_capacity = runner_input.hidden_states->shape()[0]; + infinicore::op::moe_fused_dense_( + workspace.fused_moe_output, + runner_input.hidden_states, + weights.packed_w13, + weights.packed_w2, + runner_input.topk_output.topk_weights, + runner_input.topk_output.topk_ids, + runner_input.routing_metadata.sorted_token_ids, + runner_input.routing_metadata.expert_ids, + runner_input.routing_metadata.num_tokens_post_padded); + return CudaFusedMoeRunnerOutput{ + workspace.fused_moe_output, + }; +} + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/runner/cuda_fused_moe_runner.hpp b/csrc/layers/moe/runner/cuda_fused_moe_runner.hpp new file mode 100644 index 000000000..2a1a1f94e --- /dev/null +++ b/csrc/layers/moe/runner/cuda_fused_moe_runner.hpp @@ -0,0 +1,42 @@ +#pragma once + +#include "base_runner.hpp" + +namespace infinilm::layers::moe { + +struct CudaFusedMoeRunnerInput { + infinicore::Tensor hidden_states; + TopKOutput topk_output; + MoeRoutingMetadata routing_metadata; +}; + +struct CudaFusedMoeRunnerOutput { + infinicore::Tensor hidden_states; +}; + +class CudaFusedMoeRunner final : public MoeRunnerCore { +public: + CudaFusedMoeRunner(size_t num_local_experts, + size_t hidden_size, + size_t intermediate_size_per_partition, + size_t align_block_size); + + CombineInput run(const DispatchOutput &dispatch_output, + const MoeWeights &weights, + MoeWorkspace &workspace) const override; + +private: + CudaFusedMoeRunnerInput prepare_runner_input(const DispatchOutput &dispatch_output, + MoeWorkspace &workspace) const; + + CudaFusedMoeRunnerOutput run_fused_core(const CudaFusedMoeRunnerInput &runner_input, + const MoeWeights &weights, + MoeWorkspace &workspace) const; + + size_t num_local_experts_ = 0; + size_t hidden_size_ = 0; + size_t intermediate_size_per_partition_ = 0; + size_t align_block_size_ = 16; +}; + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/sparse_moe_block.cpp b/csrc/layers/moe/sparse_moe_block.cpp new file mode 100644 index 000000000..2ce2b2eb7 --- /dev/null +++ b/csrc/layers/moe/sparse_moe_block.cpp @@ -0,0 +1,34 @@ +#include "sparse_moe_block.hpp" + +namespace infinilm::layers::moe { + +SparseMoeBlock::SparseMoeBlock(std::shared_ptr model_config, + const infinicore::Device &device, + size_t layer_id) { + INFINICORE_NN_MODULE_INIT(gate, model_config, device); + INFINICORE_NN_MODULE_INIT(experts, model_config, device); + INFINICORE_NN_MODULE_INIT(fused_moe, model_config, device, layer_id); +} + +infinicore::Tensor SparseMoeBlock::forward(const infinicore::Tensor &hidden_states) const { + ASSERT(hidden_states->ndim() == 3); + + auto shape = hidden_states->shape(); + auto hidden_states_reshaped = hidden_states->view({shape[0] * shape[1], shape[2]}); + + auto [routing_weights, selected_experts] = gate_->forward(hidden_states_reshaped); + TopKOutput topk_output{ + routing_weights, + selected_experts, + infinicore::Tensor(), + }; + + auto final_hidden_states = fused_moe_->forward( + hidden_states_reshaped, + topk_output, + experts_->moe_weights()); + + return final_hidden_states->view({shape[0], shape[1], shape[2]}); +} + +} // namespace infinilm::layers::moe diff --git a/csrc/layers/moe/sparse_moe_block.hpp b/csrc/layers/moe/sparse_moe_block.hpp new file mode 100644 index 000000000..c2b6a37cd --- /dev/null +++ b/csrc/layers/moe/sparse_moe_block.hpp @@ -0,0 +1,29 @@ +#pragma once + +#include "experts/fused_moe_experts.hpp" +#include "fused_moe.hpp" +#include "router/topk_router.hpp" + +#include "../../config/model_config.hpp" +#include "infinicore/nn/module.hpp" + +#include +#include + +namespace infinilm::layers::moe { + +class SparseMoeBlock : public infinicore::nn::Module { +public: + SparseMoeBlock(std::shared_ptr model_config, + const infinicore::Device &device, + size_t layer_id = 0); + + infinicore::Tensor forward(const infinicore::Tensor &hidden_states) const; + +protected: + INFINICORE_NN_MODULE(TopKRouter, gate); + INFINICORE_NN_MODULE(FusedMoeExperts, experts); + INFINICORE_NN_MODULE(FusedMoE, fused_moe); +}; + +} // namespace infinilm::layers::moe diff --git a/csrc/models/deepseek_v2/deepseek_v2_moe.hpp b/csrc/models/deepseek_v2/deepseek_v2_moe.hpp index 9f788fc9a..11af3669c 100644 --- a/csrc/models/deepseek_v2/deepseek_v2_moe.hpp +++ b/csrc/models/deepseek_v2/deepseek_v2_moe.hpp @@ -4,6 +4,7 @@ #include "../../layers/common_modules.hpp" #include "../../layers/linear/linear.hpp" #include "../../layers/mlp/mlp.hpp" +#include "../../layers/moe/legacy/moe_mlp.hpp" #include "infinicore/device.hpp" #include "infinicore/nn/module.hpp" #include "infinicore/tensor.hpp" @@ -16,7 +17,7 @@ namespace infinilm::models::deepseek_v2 { using DeepseekV2MLP = infinilm::layers::mlp::MLP; -using DeepseekV2ExpertMLP = infinilm::layers::MoeMLP; +using DeepseekV2ExpertMLP = infinilm::layers::moe::legacy::MoeMLP; class DeepseekV2TopKRouter : public infinicore::nn::Module { public: diff --git a/csrc/models/qwen3/qwen3_attention.cpp b/csrc/models/qwen3/qwen3_attention.cpp index bff9c8d73..00f27d913 100644 --- a/csrc/models/qwen3/qwen3_attention.cpp +++ b/csrc/models/qwen3/qwen3_attention.cpp @@ -23,12 +23,10 @@ Qwen3Attention::Qwen3Attention(std::shared_ptr mo const engine::distributed::RankInfo &rank_info = infinilm::global_state::get_tensor_model_parallel_rank_info(); int tp_rank = infinilm::global_state::get_tensor_model_parallel_rank(); int tp_size = infinilm::global_state::get_tensor_model_parallel_world_size(); - if ((total_num_kv_heads < tp_size) || (0 != (total_num_kv_heads % tp_size))) { - throw std::runtime_error("infinilm::models::qwen3::Qwen3Attention: num_key_value_heads must be divisible by tp_size"); - } - num_attention_heads_ = total_num_heads / tp_size; - num_key_value_heads_ = total_num_kv_heads / tp_size; + num_key_value_heads_ = total_num_kv_heads < static_cast(tp_size) + ? 1 + : total_num_kv_heads / tp_size; auto quantization_method = model_config->get_quantization_method(); auto register_fn = [this](const std::string &n, infinicore::nn::Parameter p) { this->register_parameter(n, std::move(p)); }; diff --git a/csrc/models/qwen3_moe/qwen3_moe_experts.cpp b/csrc/models/qwen3_moe/qwen3_moe_experts.cpp deleted file mode 100644 index 9520b7e15..000000000 --- a/csrc/models/qwen3_moe/qwen3_moe_experts.cpp +++ /dev/null @@ -1,59 +0,0 @@ -#include "qwen3_moe_experts.hpp" - -namespace infinilm::models::qwen3_moe { -Qwen3MoeExperts::Qwen3MoeExperts(std::shared_ptr model_config, - const infinicore::Device &device) { - - num_experts_ = model_config->get("num_experts"); - num_experts_per_tok_ = model_config->get("num_experts_per_tok"); - - ASSERT((num_experts_ > 0) && (num_experts_per_tok_ > 0) && (num_experts_per_tok_ <= num_experts_)); - - for (size_t i = 0; i < num_experts_; ++i) { - experts_.push_back(this->register_module(std::to_string(i), model_config, device)); - } -} - -infinicore::Tensor Qwen3MoeExperts::forward(const infinicore::Tensor &hidden_states, - const infinicore::Tensor &top_k_index, - const infinicore::Tensor &top_k_weights) const { - ASSERT(hidden_states->ndim() == 2); - - auto top_k_weights_cpu = top_k_weights->to(infinicore::Device::Type::CPU); - auto top_k_index_cpu = top_k_index->to(infinicore::Device::Type::CPU); - - int *top_k_index_ptr = (int *)top_k_index_cpu->data(); - float *top_k_weights_ptr = (float *)top_k_weights_cpu->data(); - - size_t ntoken = hidden_states->shape()[0]; - int index; - float score; - - auto final_hidden_states = infinicore::Tensor::empty(hidden_states->shape(), hidden_states->dtype(), hidden_states->device()); - for (size_t itok = 0; itok < ntoken; ++itok) { - auto hidden_states_i = hidden_states->narrow({{0, itok, 1}}); - const size_t route_row = itok * num_experts_per_tok_; - - infinicore::Tensor final_hidden_states_i; - for (size_t k = 0; k < num_experts_per_tok_; ++k) { - index = top_k_index_ptr[route_row + k]; - score = top_k_weights_ptr[route_row + k]; - - ASSERT(index >= 0 && index < num_experts_); - - experts_[index]->set_alpha(score); - auto expert_out = experts_[index]->forward(hidden_states_i); - - if (k == 0) { - final_hidden_states_i = expert_out; - } else { - infinicore::op::add_(final_hidden_states_i, final_hidden_states_i, expert_out); - } - } - - final_hidden_states->narrow({{0, itok, 1}})->copy_from(final_hidden_states_i); - } - return final_hidden_states; -} - -} // namespace infinilm::models::qwen3_moe diff --git a/csrc/models/qwen3_moe/qwen3_moe_experts.hpp b/csrc/models/qwen3_moe/qwen3_moe_experts.hpp deleted file mode 100644 index 90f5739bf..000000000 --- a/csrc/models/qwen3_moe/qwen3_moe_experts.hpp +++ /dev/null @@ -1,25 +0,0 @@ -#pragma once -#include "../../layers/common_modules.hpp" - -#include - -namespace infinilm::models::qwen3_moe { - -using Qwen3MoeMLP = infinilm::layers::MoeMLP; - -class Qwen3MoeExperts : public infinicore::nn::Module { -public: - Qwen3MoeExperts(std::shared_ptr model_config, - const infinicore::Device &device); - - infinicore::Tensor forward(const infinicore::Tensor &hidden_states, - const infinicore::Tensor &top_k_index, - const infinicore::Tensor &top_k_weights) const; - -protected: - INFINICORE_NN_MODULE_VEC(Qwen3MoeMLP, experts); - size_t num_experts_per_tok_{0}; - size_t num_experts_{0}; -}; - -} // namespace infinilm::models::qwen3_moe diff --git a/csrc/models/qwen3_moe/qwen3_moe_sparse_moe_block.cpp b/csrc/models/qwen3_moe/qwen3_moe_sparse_moe_block.cpp index c617c61fc..4df3b981d 100644 --- a/csrc/models/qwen3_moe/qwen3_moe_sparse_moe_block.cpp +++ b/csrc/models/qwen3_moe/qwen3_moe_sparse_moe_block.cpp @@ -3,21 +3,13 @@ namespace infinilm::models::qwen3_moe { Qwen3MoeSparseMoeBlock::Qwen3MoeSparseMoeBlock(std::shared_ptr model_config, - const infinicore::Device &device) { - INFINICORE_NN_MODULE_INIT(gate, model_config, device); - INFINICORE_NN_MODULE_INIT(experts, model_config, device); + const infinicore::Device &device) + : Qwen3MoeSparseMoeBlock(model_config, 0, device) { } -infinicore::Tensor Qwen3MoeSparseMoeBlock::forward(const infinicore::Tensor &hidden_states) const { - ASSERT(hidden_states->ndim() == 3); - - auto shape = hidden_states->shape(); // shape[ 1 11 2048 ] - auto hidden_states_reshaped = hidden_states->view({shape[0] * shape[1], shape[2]}); - - auto [routing_weights, selected_experts] = gate_->forward(hidden_states_reshaped); - auto final_hidden_states = experts_->forward(hidden_states_reshaped, selected_experts, routing_weights); - - return final_hidden_states->view({shape[0], shape[1], shape[2]}); -} +Qwen3MoeSparseMoeBlock::Qwen3MoeSparseMoeBlock(std::shared_ptr model_config, + size_t layer_idx, + const infinicore::Device &device) + : infinilm::layers::moe::SparseMoeBlock(model_config, device, layer_idx) {} } // namespace infinilm::models::qwen3_moe diff --git a/csrc/models/qwen3_moe/qwen3_moe_sparse_moe_block.hpp b/csrc/models/qwen3_moe/qwen3_moe_sparse_moe_block.hpp index ad4bebd69..c03984135 100644 --- a/csrc/models/qwen3_moe/qwen3_moe_sparse_moe_block.hpp +++ b/csrc/models/qwen3_moe/qwen3_moe_sparse_moe_block.hpp @@ -1,20 +1,20 @@ #pragma once -#include "qwen3_moe_experts.hpp" -#include "qwen3_moe_topk_router.hpp" +#include "../../config/model_config.hpp" +#include "../../layers/moe/sparse_moe_block.hpp" + +#include +#include namespace infinilm::models::qwen3_moe { -class Qwen3MoeSparseMoeBlock : public infinicore::nn::Module { +class Qwen3MoeSparseMoeBlock final : public infinilm::layers::moe::SparseMoeBlock { public: Qwen3MoeSparseMoeBlock(std::shared_ptr model_config, const infinicore::Device &device); - - infinicore::Tensor forward(const infinicore::Tensor &hidden_states) const; - -protected: - INFINICORE_NN_MODULE(Qwen3MoeTopKRouter, gate); - INFINICORE_NN_MODULE(Qwen3MoeExperts, experts); + Qwen3MoeSparseMoeBlock(std::shared_ptr model_config, + size_t layer_idx, + const infinicore::Device &device); }; } // namespace infinilm::models::qwen3_moe diff --git a/csrc/models/qwen3_moe/qwen3_moe_topk_router.cpp b/csrc/models/qwen3_moe/qwen3_moe_topk_router.cpp deleted file mode 100644 index 64721fbe5..000000000 --- a/csrc/models/qwen3_moe/qwen3_moe_topk_router.cpp +++ /dev/null @@ -1,36 +0,0 @@ -#include "qwen3_moe_topk_router.hpp" - -#include "infinicore/ops.hpp" - -namespace infinilm::models::qwen3_moe { - -Qwen3MoeTopKRouter::Qwen3MoeTopKRouter(std::shared_ptr model_config, - const infinicore::Device &device) { - const auto &dtype{model_config->get_dtype()}; - - size_t hidden_size = model_config->get("hidden_size"); - size_t num_experts = model_config->get("num_experts"); - num_experts_per_tok_ = model_config->get("num_experts_per_tok"); - norm_topk_prob_ = model_config->get("norm_topk_prob"); - - ASSERT((num_experts > 0) && (num_experts_per_tok_ > 0) && (num_experts_per_tok_ <= num_experts)); - - INFINICORE_NN_PARAMETER_INIT(weight, ({num_experts, hidden_size}, dtype, device)); -} - -std::tuple Qwen3MoeTopKRouter::forward(const infinicore::Tensor &hidden_states) const { - - ASSERT(hidden_states->ndim() == 2); - - size_t ntoken = hidden_states->shape()[0]; - auto router_logits = infinicore::op::linear(hidden_states, weight_, std::nullopt, 1.0f); - - auto router_scores = infinicore::Tensor::empty({ntoken, num_experts_per_tok_}, infinicore::DataType::F32, hidden_states->device()); - auto router_indices = infinicore::Tensor::empty({ntoken, num_experts_per_tok_}, infinicore::DataType::I32, hidden_states->device()); - - infinicore::op::topksoftmax(router_scores, router_indices, router_logits, num_experts_per_tok_, norm_topk_prob_); - - return std::make_tuple(router_scores, router_indices); -} - -} // namespace infinilm::models::qwen3_moe diff --git a/csrc/models/qwen3_moe/qwen3_moe_topk_router.hpp b/csrc/models/qwen3_moe/qwen3_moe_topk_router.hpp deleted file mode 100644 index dadadb3ff..000000000 --- a/csrc/models/qwen3_moe/qwen3_moe_topk_router.hpp +++ /dev/null @@ -1,23 +0,0 @@ -#pragma once -#include "../../layers/common_modules.hpp" - -#include -#include - -namespace infinilm::models::qwen3_moe { - -class Qwen3MoeTopKRouter : public infinicore::nn::Module { -public: - Qwen3MoeTopKRouter(std::shared_ptr model_config, - const infinicore::Device &device); - - std::tuple forward(const infinicore::Tensor &hidden_states) const; - -protected: - INFINICORE_NN_PARAMETER(weight); - - size_t num_experts_per_tok_{0}; - bool norm_topk_prob_{false}; -}; - -} // namespace infinilm::models::qwen3_moe diff --git a/csrc/models/qwen3_next/qwen3_next_attention.cpp b/csrc/models/qwen3_next/qwen3_next_attention.cpp index 67fd38082..625d7d8ab 100644 --- a/csrc/models/qwen3_next/qwen3_next_attention.cpp +++ b/csrc/models/qwen3_next/qwen3_next_attention.cpp @@ -28,12 +28,10 @@ Qwen3NextAttention::Qwen3NextAttention(std::shared_ptr(tp_size) + ? 1 + : total_num_kv_heads / tp_size; auto quantization_method = model_config->get_quantization_method(); auto register_fn = [this](const std::string &n, infinicore::nn::Parameter p) { this->register_parameter(n, std::move(p)); }; diff --git a/csrc/pybind11/engine/engine.hpp b/csrc/pybind11/engine/engine.hpp index 4c5058ac5..4c2e0e470 100644 --- a/csrc/pybind11/engine/engine.hpp +++ b/csrc/pybind11/engine/engine.hpp @@ -92,7 +92,7 @@ inline void bind_infer_engine(py::module &m) { py::arg("name"), py::arg("param"), "Load a parameter tensor into all workers (each worker picks its shard)") .def("load_params", &InferEngine::load_params, - py::arg("params"), + py::arg("params"), py::arg("strict") = true, "Load a batch of parameter tensors into all workers, syncing once per worker") .def("state_dict_keyname", &InferEngine::state_dict_keys) .def("state_dict", [](InferEngine &self) { diff --git a/examples/bench.py b/examples/bench.py index 7e1b9de41..606393e2c 100644 --- a/examples/bench.py +++ b/examples/bench.py @@ -167,6 +167,86 @@ def repeat_prompt(input_ids: list[int], target_length: int): return (input_ids * repeat_times)[:target_length] +def _normalize_moe_ep_backend(backend: str) -> str: + backend = (backend or "auto").strip().lower() + aliases = { + "": "auto", + "none": "disabled", + "off": "disabled", + "standard": "disabled", + "0": "disabled", + "naive": "allgather_reducescatter", + "ag_rs": "allgather_reducescatter", + "all_gather_reduce_scatter": "allgather_reducescatter", + "local_all_reduce": "local_allreduce", + "tp_ep": "local_allreduce", + "vllm_tp": "local_allreduce", + "dp1": "local_allreduce", + "deep_ep": "deepep", + } + return aliases.get(backend, backend) + + +def _is_moe_model(model_path: str) -> bool: + config_path = os.path.join(model_path, "config.json") + if not os.path.exists(config_path): + return False + with open(config_path, "r") as f: + config = json.load(f) + model_type = str(config.get("model_type", "")).lower() + return "moe" in model_type or "num_experts" in config + + +def configure_moe_ep_backend( + tp: int, + dp: int, + ep: int | None, + backend: str, + model_path: str, +) -> tuple[str, int]: + if tp < 1: + raise ValueError("--tp must be greater than 0") + if dp < 1: + raise ValueError("--dp must be greater than 0") + if not _is_moe_model(model_path): + return "disabled", 1 + + if ep is None: + ep = tp + if ep < 1: + raise ValueError("--ep must be greater than 0") + + backend = _normalize_moe_ep_backend(backend) + + if backend == "auto": + if dp == 1: + backend = "disabled" if ep == 1 else "local_allreduce" + else: + backend = "allgather_reducescatter" + + if backend not in { + "disabled", + "local_allreduce", + "allgather_reducescatter", + "deepep", + }: + raise ValueError(f"Unsupported --moe-ep-backend: {backend}") + + if dp != 1 and backend != "disabled": + raise NotImplementedError( + "InfiniLM bench currently has only a TP communication group. " + "True DP>1 MoE EP needs DP rank/group support before selecting " + f"{backend}." + ) + if backend != "disabled" and ep != tp: + raise NotImplementedError( + "InfiniLM MoE EP currently reuses the TP communication group, " + f"so EP size must equal TP size. Got EP={ep}, TP={tp}." + ) + + return backend, ep + + class TestModel: model: infinicore.nn.Module input_ids_list: list[int] @@ -182,6 +262,8 @@ def __init__( attn_backend="default", use_mla=False, weight_load_mode="async", + moe_ep_backend="disabled", + moe_ep_size=1, ) -> None: model_path = os.path.expanduser(model_path) # ---------------------------------------------------------------------------- # @@ -197,6 +279,8 @@ def __init__( kv_cache_dtype=cfg.kv_cache_dtype, use_mla=use_mla, weight_load_mode=weight_load_mode, + moe_ep_backend=moe_ep_backend, + moe_ep_size=moe_ep_size, ) # ---------------------------------------------------------------------------- # @@ -287,6 +371,11 @@ def run( infini_device = infinicore.device(device_str, 0) tp = cfg.tp + dp = cfg.dp + moe_ep_backend, ep = configure_moe_ep_backend( + tp, dp, cfg.ep, cfg.moe_ep_backend, model_path + ) + print(f"MoE EP backend: {moe_ep_backend} TP={tp} DP={dp} EP={ep}") skip_load = cfg.skip_load @@ -341,6 +430,8 @@ def run( attn_backend=attn_backend, use_mla=cfg.use_mla, weight_load_mode=cfg.weight_load_mode, + moe_ep_backend=moe_ep_backend, + moe_ep_size=ep, ) # ---------------------------------------------------------------------------- # diff --git a/python/infinilm/base_config.py b/python/infinilm/base_config.py index 07c12ba05..19a92388c 100644 --- a/python/infinilm/base_config.py +++ b/python/infinilm/base_config.py @@ -56,6 +56,9 @@ def __init__(self): self.model = self.args.model self.device = self.args.device self.tp = self.args.tp + self.dp = self.args.dp + self.ep = self.args.ep + self.moe_ep_backend = self.args.moe_ep_backend self.attn = self.args.attn self.enable_graph = self.args.enable_graph @@ -115,6 +118,18 @@ def _add_common_args(self): self.parser.add_argument("--model", type=str, required=True) self.parser.add_argument("--device", type=str, default="cpu") self.parser.add_argument("--tp", "--tensor-parallel-size", type=int, default=1) + self.parser.add_argument("--dp", "--data-parallel-size", type=int, default=1) + self.parser.add_argument("--ep", "--expert-parallel-size", type=int, default=None) + self.parser.add_argument( + "--moe-ep-backend", + type=str, + default="auto", + help=( + "MoE EP backend selector for examples/bench.py. " + "Defaults to auto, with DP=1 and EP=TP unless explicitly set. " + "or one of disabled/local_allreduce/allgather_reducescatter/deepep." + ), + ) # --- Infer backend optimization --- self.parser.add_argument( diff --git a/python/infinilm/config/engine_config.py b/python/infinilm/config/engine_config.py index e931e9943..1a5fbedba 100644 --- a/python/infinilm/config/engine_config.py +++ b/python/infinilm/config/engine_config.py @@ -12,6 +12,8 @@ class EngineConfig: device: Device type string ('cpu', 'cuda', 'mlu', etc.). dtype: Data type string ('float16', 'bfloat16', 'float32'). tensor_parallel_size: Number of devices for tensor parallelism. + moe_ep_backend: MoE expert-parallel backend. + moe_ep_size: MoE expert-parallel size. cache_type: Cache type ('paged' or 'static'). max_batch_size: Maximum batch size for inference (only for paged cache). max_tokens: Default maximum tokens to generate. @@ -32,6 +34,8 @@ class EngineConfig: device: str = "cuda" dtype: str = "float16" tensor_parallel_size: int = 1 + moe_ep_backend: str = "disabled" + moe_ep_size: int = 1 cache_type: str = "paged" # "paged" or "static" max_batch_size: int = 16 max_tokens: int = 4096 diff --git a/python/infinilm/infer_engine.py b/python/infinilm/infer_engine.py index 10cf58be2..17519e760 100644 --- a/python/infinilm/infer_engine.py +++ b/python/infinilm/infer_engine.py @@ -69,9 +69,13 @@ def __init__( kv_cache_dtype=None, use_mla=False, weight_load_mode="async", + moe_ep_backend="disabled", + moe_ep_size=1, ): self.hf_config = read_hf_config(model_path) self.hf_generation_config = read_hf_generation_config(model_path) + self.hf_config["moe_ep_backend"] = moe_ep_backend + self.hf_config["moe_ep_size"] = moe_ep_size if device is None: device = infinicore.device() @@ -386,7 +390,8 @@ def state_dict_keyname(self): def load_state_dict(self, state_dict, strict=None): super().load_params( - {name: param._underlying for name, param in state_dict.items()} + {name: param._underlying for name, param in state_dict.items()}, + strict=True if strict is None else strict, ) def process_weights_after_loading(self): diff --git a/python/infinilm/llm/llm.py b/python/infinilm/llm/llm.py index fe90cfcf0..d01251cdd 100644 --- a/python/infinilm/llm/llm.py +++ b/python/infinilm/llm/llm.py @@ -290,6 +290,8 @@ def __init__( device: str = "cuda", dtype: str = "float16", tensor_parallel_size: int = 1, + moe_ep_backend: str = "disabled", + moe_ep_size: int = 1, cache_type: str = "paged", max_batch_size: int = 16, max_tokens: int = 4096, @@ -331,6 +333,8 @@ def __init__( device=device, dtype=dtype, tensor_parallel_size=tensor_parallel_size, + moe_ep_backend=moe_ep_backend, + moe_ep_size=moe_ep_size, cache_type=cache_type, max_batch_size=max_batch_size, max_tokens=max_tokens, @@ -487,6 +491,8 @@ def __init__( device: str = "cuda", dtype: str = "float16", tensor_parallel_size: int = 1, + moe_ep_backend: str = "disabled", + moe_ep_size: int = 1, cache_type: str = "paged", max_batch_size: int = 16, max_tokens: int = 512, @@ -531,6 +537,8 @@ def __init__( device=device, dtype=dtype, tensor_parallel_size=tensor_parallel_size, + moe_ep_backend=moe_ep_backend, + moe_ep_size=moe_ep_size, cache_type=cache_type, max_batch_size=max_batch_size, max_tokens=max_tokens, diff --git a/python/infinilm/llm/model_runner/model_runner.py b/python/infinilm/llm/model_runner/model_runner.py index daf60e143..01cc4e212 100644 --- a/python/infinilm/llm/model_runner/model_runner.py +++ b/python/infinilm/llm/model_runner/model_runner.py @@ -75,6 +75,8 @@ def __init__(self, config: EngineConfig): attention_backend=config.attn_backend, use_mla=config.use_mla, weight_load_mode=config.weight_load_mode, + moe_ep_backend=config.moe_ep_backend, + moe_ep_size=config.moe_ep_size, ) # Load model weights diff --git a/python/infinilm/modeling_utils.py b/python/infinilm/modeling_utils.py index ea2518708..c9e356b24 100644 --- a/python/infinilm/modeling_utils.py +++ b/python/infinilm/modeling_utils.py @@ -54,12 +54,22 @@ def parse_dtype(dtype_str: str): } +def _is_internal_moe_packed_weight(key: str) -> bool: + return key.endswith(".mlp.experts.w13_weight") or key.endswith( + ".mlp.experts.w2_weight" + ) + + def check_parameters(model_keys: list, already_loaded_keys: list): model_keys = set(model_keys) already_loaded_keys = set(already_loaded_keys) intersection = model_keys & already_loaded_keys - missing_keys = model_keys - intersection + missing_keys = { + key + for key in model_keys - intersection + if not _is_internal_moe_packed_weight(key) + } unexpected_keys = already_loaded_keys - intersection error_msgs: list[str] = [] @@ -183,7 +193,8 @@ def load_model_state_dict_by_file( already_loaded_keys = [] embed_tokens_torch_unscaled = None - file_list = glob.glob(os.path.join(model_path, "*.safetensors")) + remapper = _WEIGHT_REMAPPER.get(model_type) + file_list = sorted(glob.glob(os.path.join(model_path, "*.safetensors"))) if len(file_list) > 0: for file_path in tqdm(file_list, desc="Processing files"): tqdm.write(f"Processing: {os.path.basename(file_path)}") @@ -196,7 +207,6 @@ def load_model_state_dict_by_file( ) # Apply model-specific weight remapping - remapper = _WEIGHT_REMAPPER.get(model_type) if remapper is not None: model_param = remapper(model_param, config=model.hf_config) @@ -223,7 +233,6 @@ def load_model_state_dict_by_file( del model_param_infini del model_param gc.collect() - if not ( "lm_head.weight" in model_keys and "lm_head.weight" not in already_loaded_keys @@ -351,18 +360,19 @@ def load_model_state_dict_by_tensor( t2 = time.time() print(f" load weights over! {(t2 - t1) * 1000} ms \n") + # ============================================================================ # Common weight transformation utilities # ============================================================================ + def drop_keys( state_dict: Dict[str, torch.Tensor], substrings: List[str], ) -> Dict[str, torch.Tensor]: """Drop keys containing any of the given substrings.""" return { - k: v for k, v in state_dict.items() - if not any(sub in k for sub in substrings) + k: v for k, v in state_dict.items() if not any(sub in k for sub in substrings) } @@ -444,6 +454,7 @@ def split_fused_weight( return result + def split_fused_weight_with_sizes( state_dict: Dict[str, torch.Tensor], fused_key: str, @@ -484,6 +495,7 @@ def split_fused_weight_with_sizes( return result + # ============================================================================ # Model-specific remap functions # ============================================================================ @@ -530,18 +542,22 @@ def _remap_chatglm(state_dict, config=None): ) # 4. Rename keys - state_dict = rename_keys(state_dict, { - "transformer.encoder.layers.": "model.layers.", - "transformer.embedding.word_embeddings": "model.embed_tokens", - "transformer.encoder.final_layernorm": "model.norm", - "transformer.output_layer": "lm_head", - "self_attention.": "self_attn.", - "self_attn.dense": "self_attn.o_proj", - "mlp.dense_4h_to_h": "mlp.down_proj", - }) + state_dict = rename_keys( + state_dict, + { + "transformer.encoder.layers.": "model.layers.", + "transformer.embedding.word_embeddings": "model.embed_tokens", + "transformer.encoder.final_layernorm": "model.norm", + "transformer.output_layer": "lm_head", + "self_attention.": "self_attn.", + "self_attn.dense": "self_attn.o_proj", + "mlp.dense_4h_to_h": "mlp.down_proj", + }, + ) return state_dict + def _is_baichuan2(config): """ Baichuan1 and Baichuan2 share the same model_type "baichuan" in official HuggingFace configs, @@ -554,6 +570,7 @@ def _is_baichuan2(config): """ return config.get("vocab_size") == 125696 + def _remap_baichuan(state_dict, config=None): """Split Baichuan fused W_pack into q_proj, k_proj, v_proj and apply Baichuan2-specific fixes.""" diff --git a/python/infinilm/server/inference_server.py b/python/infinilm/server/inference_server.py index 8cfa8f2f2..a047a64ed 100644 --- a/python/infinilm/server/inference_server.py +++ b/python/infinilm/server/inference_server.py @@ -25,6 +25,85 @@ DEFAULT_REQUEST_TIMEOUT = 1000.0 +def _normalize_moe_ep_backend(backend: str) -> str: + backend = (backend or "auto").strip().lower() + aliases = { + "": "auto", + "none": "disabled", + "off": "disabled", + "standard": "disabled", + "0": "disabled", + "naive": "allgather_reducescatter", + "ag_rs": "allgather_reducescatter", + "all_gather_reduce_scatter": "allgather_reducescatter", + "local_all_reduce": "local_allreduce", + "tp_ep": "local_allreduce", + "vllm_tp": "local_allreduce", + "dp1": "local_allreduce", + "deep_ep": "deepep", + } + return aliases.get(backend, backend) + + +def _is_moe_model(model_path: str) -> bool: + config_path = os.path.join(model_path, "config.json") + if not os.path.exists(config_path): + return False + with open(config_path, "r") as f: + config = json.load(f) + model_type = str(config.get("model_type", "")).lower() + return "moe" in model_type or "num_experts" in config + + +def configure_moe_ep_backend( + tp: int, + dp: int, + ep: Optional[int], + backend: str, + model_path: str, +) -> tuple[str, int]: + if tp < 1: + raise ValueError("--tp must be greater than 0") + if dp < 1: + raise ValueError("--dp must be greater than 0") + if not _is_moe_model(model_path): + return "disabled", 1 + + if ep is None: + ep = tp + if ep < 1: + raise ValueError("--ep must be greater than 0") + + backend = _normalize_moe_ep_backend(backend) + if backend == "auto": + if dp == 1: + backend = "disabled" if ep == 1 else "local_allreduce" + else: + backend = "allgather_reducescatter" + + if backend not in { + "disabled", + "local_allreduce", + "allgather_reducescatter", + "deepep", + }: + raise ValueError(f"Unsupported --moe-ep-backend: {backend}") + + if dp != 1 and backend != "disabled": + raise NotImplementedError( + "InfiniLM server currently has only a TP communication group. " + "True DP>1 MoE EP needs DP rank/group support before selecting " + f"{backend}." + ) + if backend != "disabled" and ep != tp: + raise NotImplementedError( + "InfiniLM MoE EP currently reuses the TP communication group, " + f"so EP size must equal TP size. Got EP={ep}, TP={tp}." + ) + + return backend, ep + + def chunk_json( id_, content=None, role=None, finish_reason=None, model: str = "unknown" ): @@ -97,6 +176,8 @@ def __init__( device: str = "cuda", dtype: str = "float16", tensor_parallel_size: int = 1, + moe_ep_backend: str = "disabled", + moe_ep_size: int = 1, cache_type: str = "paged", max_tokens: int = 4096, max_batch_size: int = 16, @@ -122,6 +203,8 @@ def __init__( device: Device type ('cpu', 'cuda', 'mlu', 'moore'). dtype: Data type ('float16', 'bfloat16', 'float32'). tensor_parallel_size: Number of devices for tensor parallelism. + moe_ep_backend: MoE expert-parallel backend. + moe_ep_size: MoE expert-parallel size. cache_type: Cache type ('paged' or 'static'). max_tokens: Default maximum tokens to generate. max_batch_size: Maximum batch size for inference (only for paged cache). @@ -146,6 +229,8 @@ def __init__( self.device = device self.dtype = dtype self.tensor_parallel_size = tensor_parallel_size + self.moe_ep_backend = moe_ep_backend + self.moe_ep_size = moe_ep_size self.cache_type = cache_type self.max_tokens = max_tokens self.max_batch_size = max_batch_size @@ -183,6 +268,8 @@ async def lifespan(app: FastAPI): device=self.device, dtype=self.dtype, tensor_parallel_size=self.tensor_parallel_size, + moe_ep_backend=self.moe_ep_backend, + moe_ep_size=self.moe_ep_size, cache_type=self.cache_type, max_batch_size=self.max_batch_size, max_tokens=self.max_tokens, @@ -580,11 +667,24 @@ def main(): if cfg.kv_transfer_config: kv_transfer_config = parse_kv_transfer_config(cfg.kv_transfer_config) + moe_ep_backend, ep = configure_moe_ep_backend( + cfg.tp, cfg.dp, cfg.ep, cfg.moe_ep_backend, cfg.model + ) + logger.info( + "MoE EP backend: %s TP=%s DP=%s EP=%s", + moe_ep_backend, + cfg.tp, + cfg.dp, + ep, + ) + server = InferenceServer( model_path=cfg.model, device=device, dtype=cfg.dtype, tensor_parallel_size=cfg.tp, + moe_ep_backend=moe_ep_backend, + moe_ep_size=ep, cache_type="paged" if cfg.enable_paged_attn else "static", max_tokens=cfg.max_new_tokens, max_batch_size=cfg.max_batch_size,