Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions csrc/config/model_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ infinicore::DataType ModelConfig::get_dtype() const {
return parse_dtype(dtype_str);
}

// Returns true only when `key` is present in config_json and its value is not null.
bool ModelConfig::contains_non_null(const std::string &key) const {
    // A missing key counts the same as a null value.
    if (!config_json.contains(key)) {
        return false;
    }
    return !config_json.at(key).is_null();
}

size_t ModelConfig::get_rotary_dim() const {
size_t head_dim = get_head_dim();
double partial_rotary_factor = get_or<double>("partial_rotary_factor", 1.0);
Expand Down
14 changes: 14 additions & 0 deletions csrc/config/model_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,20 @@ class ModelConfig {
return default_value;
}
}

bool contains_non_null(const std::string &key) const;

// Reads a config value that may be stored under either of two names.
// `key` is tried first, then `alias`; the first name that is non-empty, present
// in config_json and not null is converted to T and returned. If neither name
// qualifies, `default_value` is returned.
template <typename T>
T get_or_alias(const std::string &key, const std::string &alias, const T &default_value) const {
    // Shared presence test for both candidate names.
    const auto has_value = [&](const std::string &name) {
        return !name.empty() && config_json.contains(name) && !config_json.at(name).is_null();
    };

    if (has_value(key)) {
        return config_json.at(key).get<T>();
    }
    if (has_value(alias)) {
        return config_json.at(alias).get<T>();
    }
    return default_value;
}

// Computes hidden_size * num_key_value_heads / num_attention_heads using
// size_t arithmetic (the multiplication happens before the truncating division).
size_t get_kv_dim() const {
    const size_t hidden_size = get<size_t>("hidden_size");
    const size_t kv_heads = get<size_t>("num_key_value_heads");
    const size_t attention_heads = get<size_t>("num_attention_heads");
    return hidden_size * kv_heads / attention_heads;
}
Expand Down
11 changes: 9 additions & 2 deletions csrc/engine/compiler/paged_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,28 @@ void PagedCompiler::compile() {
auto input = make_decode_input(b);

barrier_->wait();
(void)model_->forward(input);
infinicore::context::syncStream();
// Capture must not start with stale Marlin locks from previous
// warmup/capture attempts. This reset is intentionally outside
// graph capture; the current implementation still pays a memset
// before every graph replay in get_compiled().
model_->reset_runtime_state();
infinicore::context::syncStream();
barrier_->wait();

barrier_->wait();
infinicore::context::startGraphRecording();
auto output = model_->forward(input);
auto graph = infinicore::context::stopGraphRecording();
barrier_->wait();

auto shared_output = std::shared_ptr<InfinilmModel::Output>(
new InfinilmModel::Output{infinicore::graph::GraphTensor(output.logits)});
auto replay_output = std::shared_ptr<InfinilmModel::Output>(
new InfinilmModel::Output{shared_output->logits->resume_from_blob_()});

compiled_map_decode_[b] = CompiledResult{std::move(input), std::make_tuple(graph, shared_output)};
compiled_map_decode_[b] = CompiledResult{std::move(input), std::make_tuple(graph, shared_output), replay_output};
}
}
}
Expand Down Expand Up @@ -141,7 +148,7 @@ PagedCompiler::Compiled PagedCompiler::get_compiled(const InfinilmModel::Input &
model_->reset_runtime_state();

auto graph = std::get<0>(result->second.compiled);
auto shared_output = std::shared_ptr<InfinilmModel::Output>(new InfinilmModel::Output{std::get<1>(result->second.compiled)->logits->resume_from_blob_()});
auto shared_output = result->second.replay_output;

return std::make_tuple(graph, shared_output);
}
Expand Down
1 change: 1 addition & 0 deletions csrc/engine/compiler/paged_compiler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class PagedCompiler : public GraphCompiler {
// Everything cached for one captured graph.
// Field order matters: compile() fills this struct by positional brace-initialisation.
struct CompiledResult {
// Input the graph was recorded with (moved in by compile()).
InfinilmModel::Input input;
// Tuple of the recorded graph and the Output captured while recording.
Compiled compiled;
// Output built once in compile() by calling resume_from_blob_() on the captured logits.
// get_compiled() hands back this cached pointer instead of constructing a new
// Output on every lookup.
std::shared_ptr<InfinilmModel::Output> replay_output;
};

std::unordered_map<
Expand Down
10 changes: 8 additions & 2 deletions csrc/engine/compiler/static_batching_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,21 @@ void StaticBatchingCompiler::compile() {
input.slot_mapping,
};

barrier_->wait();
(void)model_->forward(input);
infinicore::context::syncStream();
barrier_->wait();

barrier_->wait();
infinicore::context::startGraphRecording();
auto output = model_->forward(input);
auto graph = infinicore::context::stopGraphRecording();
barrier_->wait();

auto shared_output = std::shared_ptr<InfinilmModel::Output>(new InfinilmModel::Output{infinicore::graph::GraphTensor(output.logits)});
auto replay_output = std::shared_ptr<InfinilmModel::Output>(new InfinilmModel::Output{shared_output->logits->resume_from_blob_()});

compiled_map_[std::make_tuple(b, 1)] = CompiledResult{std::move(input), std::make_tuple(graph, shared_output)};
compiled_map_[std::make_tuple(b, 1)] = CompiledResult{std::move(input), std::make_tuple(graph, shared_output), replay_output};
}
}

Expand All @@ -56,7 +62,7 @@ StaticBatchingCompiler::Compiled StaticBatchingCompiler::get_compiled(
graph_input.total_sequence_lengths.value()->copy_from(input.total_sequence_lengths.value());

auto graph = std::get<0>(result->second.compiled);
auto shared_output = std::shared_ptr<InfinilmModel::Output>(new InfinilmModel::Output{std::get<1>(result->second.compiled)->logits->resume_from_blob_()});
auto shared_output = result->second.replay_output;
return std::make_tuple(graph, shared_output);
}
} else {
Expand Down
1 change: 1 addition & 0 deletions csrc/engine/compiler/static_batching_compiler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class StaticBatchingCompiler : public GraphCompiler {
// Everything cached for one captured graph.
// Field order matters: compile() fills this struct by positional brace-initialisation.
struct CompiledResult {
// Input the graph was recorded with (moved in by compile()).
InfinilmModel::Input input;
// Tuple of the recorded graph and the Output captured while recording.
Compiled compiled;
// Output built once in compile() by calling resume_from_blob_() on the captured logits.
// get_compiled() hands back this cached pointer instead of constructing a new
// Output on every lookup.
std::shared_ptr<InfinilmModel::Output> replay_output;

@pengcheng888 pengcheng888 Jun 26, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个新增的replay_output变量,以及graph编译时新增和修改的代码。可以注释或解释一下么,不知道啥意思

};

std::unordered_map<
Expand Down
20 changes: 15 additions & 5 deletions csrc/engine/infer_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "spdlog/spdlog.h"
#include <future>
#include <stdexcept>
#include <unordered_set>

namespace infinilm::engine {

Expand Down Expand Up @@ -67,19 +68,19 @@ void InferEngine::load_param(const std::string &name, const infinicore::Tensor &
}
}

void InferEngine::load_params(const std::unordered_map<std::string, infinicore::Tensor> &params) {
void InferEngine::load_params(const std::unordered_map<std::string, infinicore::Tensor> &params, bool strict) {
if (workers_.size() <= 1 || weight_load_mode_ == "sync") {
for (auto &worker : workers_) {
worker->load_params(params);
worker->load_params(params, strict);
}
return;
}

std::vector<std::future<void>> futures;
futures.reserve(workers_.size());
for (auto &worker : workers_) {
futures.emplace_back(std::async(std::launch::async, [&worker, &params] {
worker->load_params(params);
futures.emplace_back(std::async(std::launch::async, [&worker, &params, strict] {
worker->load_params(params, strict);
}));
}
for (auto &future : futures) {
Expand Down Expand Up @@ -118,7 +119,16 @@ std::vector<std::string> InferEngine::state_dict_keys() {
if (0 == workers_.size()) {
throw std::runtime_error(" Model object not found. ");
}
return workers_.front()->state_dict_keys();
std::vector<std::string> keys;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个写法,我看了好一会才看懂。
其是等价于下面的写法。先求set, 最后再赋值给key_vec.
```cpp
std::unordered_set<std::string> keys;
for (const auto& worker : workers_) {
const auto& worker_keys = worker->state_dict_keys();
keys.insert(worker_keys.begin(), worker_keys.end());
}

std::vector<std::string> keys_vec(keys.begin(), keys.end());
return keys_vec;
```

std::unordered_set<std::string> seen;
for (auto &worker : workers_) {
for (const auto &key : worker->state_dict_keys()) {
if (seen.emplace(key).second) {
keys.push_back(key);
}
}
}
return keys;
}

//------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion csrc/engine/infer_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class InferEngine {
void load_param(const std::string &name, const infinicore::Tensor &param);

// Load a batch of parameters to all workers, syncing each worker once after the batch.
void load_params(const std::unordered_map<std::string, infinicore::Tensor> &params);
void load_params(const std::unordered_map<std::string, infinicore::Tensor> &params, bool strict = true);

// process the weights after loading on all workers (e.g., for quantization)
void process_weights_after_loading();
Expand Down
8 changes: 6 additions & 2 deletions csrc/engine/rank_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,15 @@ void RankWorker::load_param(const std::string &name,
//------------------------------------------------------
// load_params -- synchronous batch load
//------------------------------------------------------
void RankWorker::load_params(const std::unordered_map<std::string, infinicore::Tensor> &params) {
void RankWorker::load_params(const std::unordered_map<std::string, infinicore::Tensor> &params, bool strict) {
{
std::lock_guard<std::mutex> lock(mutex_);
if (should_exit_) {
throw std::runtime_error("RankWorker is closing; cannot load_params");
}

pending_params_ = params;
pending_params_strict_ = strict;
job_cmd_ = Command::LOAD_BATCH;
has_job_ = true;
job_done_ = false;
Expand Down Expand Up @@ -295,6 +296,7 @@ void RankWorker::thread_loop() {
std::string local_param_name;
infinicore::Tensor local_param;
std::unordered_map<std::string, infinicore::Tensor> local_params;
bool local_params_strict = true;
Input local_args;
std::unique_ptr<cache::CacheConfig> local_cache_config;

Expand All @@ -314,6 +316,8 @@ void RankWorker::thread_loop() {
local_param = pending_param_;
} else if (local_cmd == Command::LOAD_BATCH) {
local_params = std::move(pending_params_);
local_params_strict = pending_params_strict_;
pending_params_strict_ = true;
pending_params_.clear();
} else if (local_cmd == Command::PREPROCESS) {

Expand Down Expand Up @@ -353,7 +357,7 @@ void RankWorker::thread_loop() {

} else if (local_cmd == Command::LOAD_BATCH) {
try {
model_->load_parameters_no_sync(local_params);
model_->load_parameters_no_sync(local_params, local_params_strict);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

等价于这个写法么 model_->load_parameters_no_sync(local_params, strict);

infinicore::context::syncStream();
} catch (const std::exception &e) {
{
Expand Down
3 changes: 2 additions & 1 deletion csrc/engine/rank_worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class RankWorker {
void load_param(const std::string &name,
const infinicore::Tensor &param);

void load_params(const std::unordered_map<std::string, infinicore::Tensor> &params);
void load_params(const std::unordered_map<std::string, infinicore::Tensor> &params, bool strict = true);

void process_weights_after_loading();

Expand Down Expand Up @@ -148,6 +148,7 @@ class RankWorker {
std::string pending_param_name_;
infinicore::Tensor pending_param_;
std::unordered_map<std::string, infinicore::Tensor> pending_params_;
bool pending_params_strict_ = true;
Input pending_args_;
std::unique_ptr<cache::CacheConfig> pending_cache_config_;

Expand Down
17 changes: 16 additions & 1 deletion csrc/layers/causal_lm_templates/text_decoder_layer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "infinicore/ops.hpp"
#include "infinicore/tensor.hpp"
#include <memory>
#include <type_traits>
#include <tuple>
namespace infinilm::layers::causal_lm_templates {

Expand All @@ -32,7 +33,7 @@ class TextDecoderLayer : public infinicore::nn::Module {
post_attention_layernorm_ = this->register_module<infinicore::nn::RMSNorm>("post_attention_layernorm", hidden_size, rms_norm_eps, dtype, device);

self_attn_ = this->register_module<Attention>("self_attn", model_config, layer_idx, device);
mlp_ = this->register_module<MLP>("mlp", model_config, device);
mlp_ = register_mlp(model_config, layer_idx, device);
}

std::tuple<infinicore::Tensor, infinicore::Tensor> forward(const infinicore::Tensor &positions,
Expand Down Expand Up @@ -68,6 +69,20 @@ class TextDecoderLayer : public infinicore::nn::Module {
INFINICORE_NN_MODULE(MLP, mlp);

size_t layer_idx_;

private:
// Registers the "mlp" submodule. If MLP can be constructed from
// (model_config, layer_idx, device), layer_idx is forwarded to it; otherwise
// only (model_config, device) is passed.
std::shared_ptr<MLP> register_mlp(std::shared_ptr<infinilm::config::ModelConfig> model_config,
                                  size_t layer_idx,
                                  const infinicore::Device &device) {
    // Compile-time probe for the constructor overload that takes a layer index.
    constexpr bool accepts_layer_idx = std::is_constructible_v<MLP,
                                                               std::shared_ptr<infinilm::config::ModelConfig>,
                                                               size_t,
                                                               const infinicore::Device &>;
    if constexpr (!accepts_layer_idx) {
        return this->register_module<MLP>("mlp", model_config, device);
    } else {
        return this->register_module<MLP>("mlp", model_config, layer_idx, device);
    }
}
};

} // namespace infinilm::layers::causal_lm_templates
2 changes: 0 additions & 2 deletions csrc/layers/common_modules.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#pragma once
#include "mlp/mlp.hpp"
#include "mlp/moe_mlp.hpp"

#include "attention/attention.hpp"
#include "causal_lm_templates/text_causal_lm.hpp"
Expand All @@ -12,7 +11,6 @@
namespace infinilm::layers {

using MLP = infinilm::layers::mlp::MLP;
using MoeMLP = infinilm::layers::moe_mlp::MoeMLP;

namespace attention {

Expand Down
93 changes: 93 additions & 0 deletions csrc/layers/moe/common/moe_types.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#pragma once

#include "topk_output.hpp"

#include "infinicore/tensor.hpp"

#include <cstddef>

namespace infinilm::layers::moe {

// Format tag stored in DispatchOutput::format.
// NOTE(review): DeepEPNormal / DeepEPLL presumably correspond to DeepEP's normal
// and low-latency modes — confirm against the dispatcher implementation.
enum class DispatchOutputFormat {
Standard,
DeepEPNormal,
DeepEPLL,
};

// Format tag stored in CombineInput::format; its enumerators mirror DispatchOutputFormat.
// NOTE(review): DeepEPNormal / DeepEPLL presumably correspond to DeepEP's normal
// and low-latency modes — confirm against the combiner implementation.
enum class CombineInputFormat {
Standard,
DeepEPNormal,
DeepEPLL,
};

// Format-tagged bundle of tensors plus the router's top-k selection.
// NOTE(review): per-field meanings below are inferred from names only — confirm
// against the code that produces this struct.
struct DispatchOutput {
// Format tag; defaults to Standard.
DispatchOutputFormat format = DispatchOutputFormat::Standard;
infinicore::Tensor hidden_states;
// presumably scales accompanying quantized hidden_states — TODO confirm
infinicore::Tensor hidden_states_scale;
// Top-k weights, ids and router logits (see TopKOutput).
TopKOutput topk_output;
// presumably a mapping between global and local expert indices — TODO confirm
infinicore::Tensor expert_map;
};

// Tensors carried alongside a CombineInput as routing metadata.
// NOTE(review): per-field meanings are inferred from names only — confirm
// against the kernels that consume them.
struct MoeRoutingMetadata {
// First group: token ordering, expert ids and a padded token count.
infinicore::Tensor sorted_token_ids;
infinicore::Tensor expert_ids;
infinicore::Tensor num_tokens_post_padded;

// Second group: offsets, problem sizes and permutations.
// presumably only meaningful when has_grouped_gemm_metadata is true — TODO confirm
infinicore::Tensor expert_offsets;
infinicore::Tensor blockscale_offsets;
infinicore::Tensor problem_sizes1;
infinicore::Tensor problem_sizes2;
infinicore::Tensor input_permutation;
infinicore::Tensor output_permutation;

// Defaults to false.
bool has_grouped_gemm_metadata = false;
};

// Format-tagged bundle of hidden states, the router's top-k selection and routing metadata.
// NOTE(review): presumably the input to the MoE combine step — confirm with the caller.
struct CombineInput {
// Format tag; defaults to Standard.
CombineInputFormat format = CombineInputFormat::Standard;
infinicore::Tensor hidden_states;
// Top-k weights, ids and router logits (see TopKOutput).
TopKOutput topk_output;
// Routing tensors (see MoeRoutingMetadata).
MoeRoutingMetadata routing_metadata;
};

// Holds the two packed expert weight tensors and reports which of them are set.
struct MoeWeights {
    infinicore::Tensor packed_w13;
    infinicore::Tensor packed_w2;

    // True when neither packed tensor is set.
    bool empty() const {
        const bool w13_missing = !packed_w13;
        const bool w2_missing = !packed_w2;
        return w13_missing && w2_missing;
    }

    // True only when both packed tensors are set.
    bool has_packed_dense_weights() const {
        const bool has_w13 = static_cast<bool>(packed_w13);
        const bool has_w2 = static_cast<bool>(packed_w2);
        return has_w13 && has_w2;
    }
};

// Collection of tensors together with size_t capacity counters.
// NOTE(review): presumably reusable scratch buffers that are grown on demand,
// with the *_capacity fields tracking the current allocation sizes — confirm
// against the code that prepares the workspace.
struct MoeWorkspace {
// presumably buffers for expert-parallel gather/reduce and the fused MoE output — TODO confirm
infinicore::Tensor ep_gathered_hidden_states;
infinicore::Tensor ep_gathered_topk_weights;
infinicore::Tensor ep_gathered_topk_ids;
infinicore::Tensor ep_reduced_hidden_states;
infinicore::Tensor fused_moe_output;

// These nine names match the tensor fields of MoeRoutingMetadata one-for-one.
infinicore::Tensor sorted_token_ids;
infinicore::Tensor expert_ids;
infinicore::Tensor num_tokens_post_padded;
infinicore::Tensor expert_offsets;
infinicore::Tensor blockscale_offsets;
infinicore::Tensor problem_sizes1;
infinicore::Tensor problem_sizes2;
infinicore::Tensor input_permutation;
infinicore::Tensor output_permutation;

// All counters start at 0.
size_t sorted_token_ids_capacity = 0;
size_t expert_ids_capacity = 0;
size_t ep_gathered_tokens_capacity = 0;
size_t ep_reduced_tokens_capacity = 0;
size_t fused_moe_output_tokens_capacity = 0;
size_t blockscale_offsets_capacity = 0;
size_t permutation_capacity = 0;
size_t prepared_num_experts = 0;
};

} // namespace infinilm::layers::moe
13 changes: 13 additions & 0 deletions csrc/layers/moe/common/topk_output.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#pragma once

#include "infinicore/tensor.hpp"

namespace infinilm::layers::moe {

// Three tensors grouped as the result of a top-k selection.
// NOTE(review): presumably the per-token selected expert weights and ids plus the
// raw router logits — confirm shapes and dtypes with the router implementation.
struct TopKOutput {
infinicore::Tensor topk_weights;
infinicore::Tensor topk_ids;
infinicore::Tensor router_logits;
};

} // namespace infinilm::layers::moe
Loading