Skip to content

vllm.model_executor.models.qwen3_next_mtp

Inference-only Qwen3Next MTP model.

KVCache module-attribute

KVCache = tuple[Tensor, Tensor]

logger module-attribute

logger = init_logger(__name__)

Qwen3NextMTP

Bases: Module, SupportsPP

Source code in vllm/model_executor/models/qwen3_next_mtp.py
@support_torch_compile
class Qwen3NextMTP(nn.Module, SupportsPP):
    """Inference-only Qwen3Next multi-token-prediction (MTP) model.

    Wraps a ``Qwen3NextMultiTokenPredictor`` (stored as ``self.model``)
    together with an LM head and a logits processor.
    """

    # Fused parameter name -> names of the checkpoint projections packed
    # into it. ``gate_up_proj`` fuses ``gate_proj`` and ``up_proj``, matching
    # the shard names used when the predictor loads stacked weights;
    # ``down_proj`` is a separate projection and is not part of the fusion.
    packed_modules_mapping = {
        "qkv_proj": [
            "q_proj",
            "k_proj",
            "v_proj",
        ],
        "gate_up_proj": ["gate_proj", "up_proj"],
    }

    def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
        """Build the predictor, LM head and logits processor.

        Args:
            vllm_config: Engine configuration; the HF model config, cache
                config and quantization config are read from it.
            prefix: Parent module prefix used to name the submodules.

        Raises:
            AssertionError: If prefix caching is enabled in the cache config.
        """
        config = vllm_config.model_config.hf_config
        self.vllm_config = vllm_config
        cache_config = vllm_config.cache_config
        assert not cache_config.enable_prefix_caching, \
            "Qwen3NextMTP currently does not support prefix caching"

        self.quant_config = vllm_config.quant_config

        super().__init__()
        self.config = config
        # The predictor is registered as ``model`` but is given the ``mtp``
        # prefix.
        self.model = Qwen3NextMultiTokenPredictor(vllm_config=vllm_config,
                                                  prefix=maybe_prefix(
                                                      prefix, "mtp"))
        self.unpadded_vocab_size = config.vocab_size
        self.lm_head = ParallelLMHead(self.unpadded_vocab_size,
                                      config.hidden_size,
                                      org_num_embeddings=config.vocab_size,
                                      padding_size=DEFAULT_VOCAB_PADDING_SIZE,
                                      prefix=maybe_prefix(prefix, "lm_head"))
        self.logits_processor = LogitsProcessor(self.unpadded_vocab_size,
                                                config.vocab_size)
        # Re-export the predictor's factory on the wrapper itself.
        self.make_empty_intermediate_tensors = (
            self.model.make_empty_intermediate_tensors)

    def get_input_embeddings(self, input_ids: torch.Tensor) -> torch.Tensor:
        """Return the predictor's embeddings for ``input_ids``."""
        return self.model.get_input_embeddings(input_ids)

    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
        hidden_states: torch.Tensor,
        intermediate_tensors: Optional[IntermediateTensors] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        **kwargs: object,
    ):
        """Run the wrapped predictor and return its output unchanged.

        Extra keyword arguments are accepted but not forwarded, so the
        predictor runs with its default ``spec_step_idx``.
        """
        hidden_states = self.model(input_ids, positions, hidden_states,
                                   intermediate_tensors, inputs_embeds)
        return hidden_states

    def compute_logits(
        self,
        hidden_states: torch.Tensor,
        spec_step_idx: int = 0,
    ) -> Optional[torch.Tensor]:
        """Apply the logits processor with the LM head to ``hidden_states``.

        ``spec_step_idx`` is accepted but not used here.
        """
        return self.logits_processor(self.lm_head, hidden_states)

    def load_weights(self, weights: Iterable[tuple[str,
                                                   torch.Tensor]]) -> set[str]:
        """Load the MTP weights and the shared embedding / LM-head weights.

        Args:
            weights: ``(name, tensor)`` pairs from the checkpoint.

        Returns:
            Whatever ``AutoWeightsLoader.load_weights`` returns for the
            filtered and renamed weights.
        """
        shared_weight_names = ["embed_tokens", "lm_head"]

        def remap_weight_names(weights):
            # Keep "mtp.*" weights (renamed to "model.*", the attribute the
            # predictor is stored under) plus the shared weights; drop the
            # rest.
            for name, weight in weights:
                if name.startswith("mtp."):
                    # Rewrite only the leading prefix; a later "mtp."
                    # inside the name must be left untouched.
                    name = name.replace("mtp.", "model.", 1)
                elif not any(key in name for key in shared_weight_names):
                    continue
                yield name, weight

        loader = AutoWeightsLoader(self)
        return loader.load_weights(remap_weight_names(weights))

config instance-attribute

config = config

lm_head instance-attribute

lm_head = ParallelLMHead(
    unpadded_vocab_size,
    hidden_size,
    org_num_embeddings=vocab_size,
    padding_size=DEFAULT_VOCAB_PADDING_SIZE,
    prefix=maybe_prefix(prefix, "lm_head"),
)

logits_processor instance-attribute

logits_processor = LogitsProcessor(
    unpadded_vocab_size, vocab_size
)

make_empty_intermediate_tensors instance-attribute

make_empty_intermediate_tensors = (
    make_empty_intermediate_tensors
)

model instance-attribute

model = Qwen3NextMultiTokenPredictor(
    vllm_config=vllm_config,
    prefix=maybe_prefix(prefix, "mtp"),
)

packed_modules_mapping class-attribute instance-attribute

packed_modules_mapping = {
    "qkv_proj": ["q_proj", "k_proj", "v_proj"],
    "gate_up_proj": ["up_proj", "down_proj"],
}

quant_config instance-attribute

quant_config = quant_config

unpadded_vocab_size instance-attribute

unpadded_vocab_size = vocab_size

vllm_config instance-attribute

vllm_config = vllm_config

__init__

__init__(*, vllm_config: VllmConfig, prefix: str = '')
Source code in vllm/model_executor/models/qwen3_next_mtp.py
def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
    """Build the predictor, LM head and logits processor.

    Args:
        vllm_config: Engine configuration; the HF model config, cache
            config and quantization config are read from it.
        prefix: Parent module prefix used to name the submodules.

    Raises:
        AssertionError: If prefix caching is enabled in the cache config.
    """
    hf_config = vllm_config.model_config.hf_config
    self.vllm_config = vllm_config
    assert not vllm_config.cache_config.enable_prefix_caching, \
        "Qwen3NextMTP currently does not support prefix caching"

    self.quant_config = vllm_config.quant_config

    super().__init__()
    self.config = hf_config

    # The predictor is registered as ``model`` but named with "mtp".
    self.model = Qwen3NextMultiTokenPredictor(
        vllm_config=vllm_config,
        prefix=maybe_prefix(prefix, "mtp"),
    )

    vocab_size = hf_config.vocab_size
    self.unpadded_vocab_size = vocab_size
    self.lm_head = ParallelLMHead(
        vocab_size,
        hf_config.hidden_size,
        org_num_embeddings=vocab_size,
        padding_size=DEFAULT_VOCAB_PADDING_SIZE,
        prefix=maybe_prefix(prefix, "lm_head"),
    )
    self.logits_processor = LogitsProcessor(vocab_size, vocab_size)

    # Re-export the predictor's factory on the wrapper itself.
    empty_tensors_factory = self.model.make_empty_intermediate_tensors
    self.make_empty_intermediate_tensors = empty_tensors_factory

compute_logits

compute_logits(
    hidden_states: Tensor, spec_step_idx: int = 0
) -> Optional[Tensor]
Source code in vllm/model_executor/models/qwen3_next_mtp.py
def compute_logits(
    self,
    hidden_states: torch.Tensor,
    spec_step_idx: int = 0,
) -> Optional[torch.Tensor]:
    """Apply the logits processor with the LM head to ``hidden_states``.

    ``spec_step_idx`` is accepted but not used here.
    """
    logits = self.logits_processor(self.lm_head, hidden_states)
    return logits

forward

forward(
    input_ids: Tensor,
    positions: Tensor,
    hidden_states: Tensor,
    intermediate_tensors: Optional[
        IntermediateTensors
    ] = None,
    inputs_embeds: Optional[Tensor] = None,
    **kwargs: object,
)
Source code in vllm/model_executor/models/qwen3_next_mtp.py
def forward(
    self,
    input_ids: torch.Tensor,
    positions: torch.Tensor,
    hidden_states: torch.Tensor,
    intermediate_tensors: Optional[IntermediateTensors] = None,
    inputs_embeds: Optional[torch.Tensor] = None,
    **kwargs: object,
):
    """Run the wrapped predictor and return its output unchanged.

    The five named inputs are passed positionally to ``self.model``; any
    extra keyword arguments are accepted but not forwarded.
    """
    return self.model(
        input_ids,
        positions,
        hidden_states,
        intermediate_tensors,
        inputs_embeds,
    )

get_input_embeddings

get_input_embeddings(input_ids: Tensor) -> Tensor
Source code in vllm/model_executor/models/qwen3_next_mtp.py
def get_input_embeddings(self, input_ids: torch.Tensor) -> torch.Tensor:
    """Return the wrapped predictor's embeddings for ``input_ids``."""
    predictor = self.model
    return predictor.get_input_embeddings(input_ids)

load_weights

load_weights(
    weights: Iterable[tuple[str, Tensor]],
) -> set[str]
Source code in vllm/model_executor/models/qwen3_next_mtp.py
def load_weights(self, weights: Iterable[tuple[str,
                                               torch.Tensor]]) -> set[str]:
    """Load the MTP weights and the shared embedding / LM-head weights.

    Args:
        weights: ``(name, tensor)`` pairs from the checkpoint.

    Returns:
        Whatever ``AutoWeightsLoader.load_weights`` returns for the
        filtered and renamed weights.
    """
    shared_weight_names = ["embed_tokens", "lm_head"]

    def remap_weight_names(weights):
        # Keep "mtp.*" weights (renamed to "model.*") plus the shared
        # weights; drop the rest.
        for name, weight in weights:
            if name.startswith("mtp."):
                # Rewrite only the leading prefix; a later "mtp." inside
                # the name must be left untouched.
                name = name.replace("mtp.", "model.", 1)
            elif not any(key in name for key in shared_weight_names):
                continue
            yield name, weight

    loader = AutoWeightsLoader(self)
    return loader.load_weights(remap_weight_names(weights))

Qwen3NextMultiTokenPredictor

Bases: Module

Source code in vllm/model_executor/models/qwen3_next_mtp.py
@support_torch_compile
class Qwen3NextMultiTokenPredictor(nn.Module):
    """Multi-token-predictor body used by ``Qwen3NextMTP``.

    Fuses token embeddings with incoming hidden states through ``fc`` and
    runs one decoder layer chosen by the speculative step index.
    """

    def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
        """Create the embedding, fusion projection, layers and norms.

        Args:
            vllm_config: Engine configuration; the HF model config,
                quantization config and LoRA config are read from it.
            prefix: Module prefix used to name ``fc`` and the layers.
        """
        super().__init__()

        model_config = vllm_config.model_config
        quant_config = vllm_config.quant_config
        lora_config = vllm_config.lora_config
        config: Qwen3NextConfig = model_config.hf_config

        self.config = config
        # Extra vocabulary rows reserved for LoRA (0 without a LoRA config).
        lora_vocab = ((lora_config.lora_extra_vocab_size *
                       (lora_config.max_loras or 1)) if lora_config else 0)
        self.vocab_size = config.vocab_size + lora_vocab
        self.org_vocab_size = config.vocab_size

        self.mtp_start_layer_idx = config.num_hidden_layers
        # Defaults to a single MTP layer when the config does not say.
        self.num_mtp_layers = getattr(config, "num_nextn_predict_layers", 1)

        self.embed_tokens = VocabParallelEmbedding(
            self.vocab_size,
            config.hidden_size,
            org_num_embeddings=config.vocab_size,
        )

        # Projects the concatenated [embedding, hidden] pair (2 * hidden)
        # back down to hidden size.
        self.fc = ColumnParallelLinear(self.config.hidden_size * 2,
                                       self.config.hidden_size,
                                       gather_output=True,
                                       bias=False,
                                       return_bias=False,
                                       quant_config=quant_config,
                                       prefix=f'{prefix}.fc')

        self.layers = torch.nn.ModuleList(
            Qwen3NextDecoderLayer(
                vllm_config,
                layer_type="full_attention",
                prefix=f'{prefix}.layers.{idx}',
            ) for idx in range(self.num_mtp_layers))

        self.make_empty_intermediate_tensors = (
            make_empty_intermediate_tensors_factory(
                ["hidden_states", "residual"], config.hidden_size))

        self.norm = Qwen3NextRMSNorm(config.hidden_size,
                                     eps=config.rms_norm_eps)
        self.pre_fc_norm_hidden = Qwen3NextRMSNorm(config.hidden_size,
                                                   eps=config.rms_norm_eps)
        self.pre_fc_norm_embedding = Qwen3NextRMSNorm(config.hidden_size,
                                                      eps=config.rms_norm_eps)

    def get_input_embeddings(self, input_ids: torch.Tensor) -> torch.Tensor:
        """Return the token embeddings for ``input_ids``."""
        return self.embed_tokens(input_ids)

    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
        hidden_states: torch.Tensor,
        intermediate_tensors: Optional[IntermediateTensors] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        spec_step_idx: int = 0,
    ) -> torch.Tensor:
        """Run one MTP step.

        On the first pipeline rank the embeddings and ``hidden_states`` are
        normalized separately, concatenated on the last dimension
        (embeddings first) and projected by ``fc``. Other ranks read
        ``hidden_states`` and ``residual`` from ``intermediate_tensors``.

        Returns:
            The normalized hidden states on the last pipeline rank;
            otherwise an ``IntermediateTensors`` holding ``hidden_states``
            and ``residual`` (despite the ``torch.Tensor`` annotation).
        """
        if get_pp_group().is_first_rank:
            if inputs_embeds is None:
                inputs_embeds = self.get_input_embeddings(input_ids)
            assert hidden_states.shape[-1] == inputs_embeds.shape[-1]
            inputs_embeds = self.pre_fc_norm_embedding(inputs_embeds)
            hidden_states = self.pre_fc_norm_hidden(hidden_states)
            hidden_states = torch.cat([inputs_embeds, hidden_states], dim=-1)
            hidden_states = self.fc(hidden_states)
            residual = None
        else:
            assert intermediate_tensors is not None
            hidden_states = intermediate_tensors["hidden_states"]
            residual = intermediate_tensors["residual"]

        # Step indices wrap around the available MTP layers.
        current_step_idx = (spec_step_idx % self.num_mtp_layers)
        hidden_states, residual = self.layers[current_step_idx](
            positions=positions,
            hidden_states=hidden_states,
            residual=residual,
        )

        if not get_pp_group().is_last_rank:
            return IntermediateTensors({
                "hidden_states": hidden_states,
                "residual": residual
            })

        hidden_states, _ = self.norm(hidden_states, residual)
        return hidden_states

    def load_weights(self, weights: Iterable[tuple[str,
                                                   torch.Tensor]]) -> set[str]:
        """Load checkpoint tensors into this module's parameters.

        Each weight is tried, in order, as a stacked (fused) projection
        shard, as an expert weight, and finally as a plain parameter.

        Args:
            weights: ``(name, tensor)`` pairs from the checkpoint.

        Returns:
            The parameter names that were loaded (after name mapping).

        Raises:
            KeyError: If a weight that is not skipped has no matching
                parameter.
        """
        stacked_params_mapping = [
            # (param_name, shard_name, shard_id)
            ("qkv_proj", "q_proj", "q"),
            ("qkv_proj", "k_proj", "k"),
            ("qkv_proj", "v_proj", "v"),
            ("gate_up_proj", "gate_proj", 0),
            ("gate_up_proj", "up_proj", 1),
        ]

        # Params for weights, fp8 weight scales, fp8 activation scales
        # (param_name, weight_name, expert_id, shard_id)
        expert_params_mapping = FusedMoE.make_expert_params_mapping(
            ckpt_gate_proj_name="gate_proj",
            ckpt_down_proj_name="down_proj",
            ckpt_up_proj_name="up_proj",
            num_experts=self.config.num_experts)

        params_dict = dict(self.named_parameters())
        loaded_params: set[str] = set()
        for name, loaded_weight in weights:
            if "rotary_emb.inv_freq" in name:
                continue

            for param_name, weight_name, shard_id in stacked_params_mapping:
                if weight_name not in name:
                    continue

                if "mlp.experts" in name:
                    continue

                # Do not rebind `name` until the weight is really loaded:
                # the loop may continue, and later entries (and the
                # fallback branches) must still see the checkpoint name.
                # E.g. "qkv_proj" itself contains "v_proj".
                name_mapped = name.replace(weight_name, param_name)
                # Skip loading extra bias for GPTQ models.
                if (name_mapped.endswith(".bias")
                        and name_mapped not in params_dict):
                    continue
                # Skip layers on other devices.
                if is_pp_missing_parameter(name_mapped, self):
                    continue
                if name_mapped not in params_dict:
                    continue
                param = params_dict[name_mapped]
                weight_loader = param.weight_loader
                weight_loader(param, loaded_weight, shard_id)
                name = name_mapped
                break
            else:
                for mapping in expert_params_mapping:
                    param_name, weight_name, expert_id, shard_id = mapping
                    if weight_name not in name:
                        continue
                    # Same rule as above: keep `name` intact on a skip.
                    name_mapped = name.replace(weight_name, param_name)
                    # Skip layers on other devices.
                    if is_pp_missing_parameter(name_mapped, self):
                        continue
                    # Skip loading extra bias for GPTQ models.
                    if ((name_mapped.endswith(".bias")
                         or name_mapped.endswith("_bias"))
                            and name_mapped not in params_dict):
                        continue
                    param = params_dict[name_mapped]
                    weight_loader = param.weight_loader
                    weight_loader(param,
                                  loaded_weight,
                                  name_mapped,
                                  shard_id=shard_id,
                                  expert_id=expert_id)
                    name = name_mapped
                    break
                else:
                    # These `continue`s belong to the outer weights loop,
                    # so a skipped weight is not recorded as loaded.
                    # Skip loading extra bias for GPTQ models.
                    if name.endswith(".bias") and name not in params_dict:
                        continue
                    if is_pp_missing_parameter(name, self):
                        continue

                    param = params_dict[name]
                    weight_loader = getattr(param, "weight_loader",
                                            default_weight_loader)
                    weight_loader(param, loaded_weight)
            loaded_params.add(name)
        return loaded_params

config instance-attribute

config = config

embed_tokens instance-attribute

embed_tokens = VocabParallelEmbedding(
    vocab_size, hidden_size, org_num_embeddings=vocab_size
)

fc instance-attribute

fc = ColumnParallelLinear(
    hidden_size * 2,
    hidden_size,
    gather_output=True,
    bias=False,
    return_bias=False,
    quant_config=quant_config,
    prefix=f"{prefix}.fc",
)

layers instance-attribute

layers = ModuleList(
    (
        Qwen3NextDecoderLayer(
            vllm_config,
            layer_type="full_attention",
            prefix=f"{prefix}.layers.{idx}",
        )
    )
    for idx in (range(num_mtp_layers))
)

make_empty_intermediate_tensors instance-attribute

make_empty_intermediate_tensors = (
    make_empty_intermediate_tensors_factory(
        ["hidden_states", "residual"], hidden_size
    )
)

mtp_start_layer_idx instance-attribute

mtp_start_layer_idx = num_hidden_layers

norm instance-attribute

norm = Qwen3NextRMSNorm(hidden_size, eps=rms_norm_eps)

num_mtp_layers instance-attribute

num_mtp_layers = getattr(
    config, "num_nextn_predict_layers", 1
)

org_vocab_size instance-attribute

org_vocab_size = vocab_size

pre_fc_norm_embedding instance-attribute

pre_fc_norm_embedding = Qwen3NextRMSNorm(
    hidden_size, eps=rms_norm_eps
)

pre_fc_norm_hidden instance-attribute

pre_fc_norm_hidden = Qwen3NextRMSNorm(
    hidden_size, eps=rms_norm_eps
)

vocab_size instance-attribute

vocab_size = vocab_size + lora_vocab

__init__

__init__(*, vllm_config: VllmConfig, prefix: str = '')
Source code in vllm/model_executor/models/qwen3_next_mtp.py
def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
    """Create the embedding, fusion projection, layers and norms.

    Args:
        vllm_config: Engine configuration; the HF model config,
            quantization config and LoRA config are read from it.
        prefix: Module prefix used to name ``fc`` and the layers.
    """
    super().__init__()

    quant_config = vllm_config.quant_config
    lora_config = vllm_config.lora_config
    config: Qwen3NextConfig = vllm_config.model_config.hf_config
    self.config = config

    # Extra vocabulary rows reserved for LoRA (0 without a LoRA config).
    if lora_config:
        lora_vocab = (lora_config.lora_extra_vocab_size *
                      (lora_config.max_loras or 1))
    else:
        lora_vocab = 0
    self.vocab_size = config.vocab_size + lora_vocab
    self.org_vocab_size = config.vocab_size

    self.mtp_start_layer_idx = config.num_hidden_layers
    # Defaults to a single MTP layer when the config does not say.
    self.num_mtp_layers = getattr(config, "num_nextn_predict_layers", 1)

    self.embed_tokens = VocabParallelEmbedding(
        self.vocab_size,
        config.hidden_size,
        org_num_embeddings=config.vocab_size,
    )

    # Projects the concatenated [embedding, hidden] pair (2 * hidden)
    # back down to hidden size.
    self.fc = ColumnParallelLinear(
        config.hidden_size * 2,
        config.hidden_size,
        gather_output=True,
        bias=False,
        return_bias=False,
        quant_config=quant_config,
        prefix=f'{prefix}.fc',
    )

    decoder_layers = [
        Qwen3NextDecoderLayer(
            vllm_config,
            layer_type="full_attention",
            prefix=f'{prefix}.layers.{idx}',
        ) for idx in range(self.num_mtp_layers)
    ]
    self.layers = torch.nn.ModuleList(decoder_layers)

    self.make_empty_intermediate_tensors = (
        make_empty_intermediate_tensors_factory(
            ["hidden_states", "residual"], config.hidden_size))

    # Three independent RMS norms sharing the same size and epsilon.
    self.norm = Qwen3NextRMSNorm(config.hidden_size,
                                 eps=config.rms_norm_eps)
    self.pre_fc_norm_hidden = Qwen3NextRMSNorm(config.hidden_size,
                                               eps=config.rms_norm_eps)
    self.pre_fc_norm_embedding = Qwen3NextRMSNorm(config.hidden_size,
                                                  eps=config.rms_norm_eps)

forward

forward(
    input_ids: Tensor,
    positions: Tensor,
    hidden_states: Tensor,
    intermediate_tensors: Optional[
        IntermediateTensors
    ] = None,
    inputs_embeds: Optional[Tensor] = None,
    spec_step_idx: int = 0,
) -> Tensor
Source code in vllm/model_executor/models/qwen3_next_mtp.py
def forward(
    self,
    input_ids: torch.Tensor,
    positions: torch.Tensor,
    hidden_states: torch.Tensor,
    intermediate_tensors: Optional[IntermediateTensors] = None,
    inputs_embeds: Optional[torch.Tensor] = None,
    spec_step_idx: int = 0,
) -> torch.Tensor:
    """Run one MTP step.

    On the first pipeline rank the embeddings and ``hidden_states`` are
    normalized separately, concatenated on the last dimension (embeddings
    first) and projected by ``fc``. Other ranks read ``hidden_states`` and
    ``residual`` from ``intermediate_tensors``.

    Returns:
        The normalized hidden states on the last pipeline rank; otherwise
        an ``IntermediateTensors`` holding ``hidden_states`` and
        ``residual`` (despite the ``torch.Tensor`` annotation).
    """
    if not get_pp_group().is_first_rank:
        assert intermediate_tensors is not None
        hidden_states = intermediate_tensors["hidden_states"]
        residual = intermediate_tensors["residual"]
    else:
        if inputs_embeds is None:
            inputs_embeds = self.get_input_embeddings(input_ids)
        assert hidden_states.shape[-1] == inputs_embeds.shape[-1]
        normed_embeds = self.pre_fc_norm_embedding(inputs_embeds)
        normed_hidden = self.pre_fc_norm_hidden(hidden_states)
        fused = torch.cat([normed_embeds, normed_hidden], dim=-1)
        hidden_states = self.fc(fused)
        residual = None

    # Step indices wrap around the available MTP layers.
    mtp_layer = self.layers[spec_step_idx % self.num_mtp_layers]
    hidden_states, residual = mtp_layer(
        positions=positions,
        hidden_states=hidden_states,
        residual=residual,
    )

    if get_pp_group().is_last_rank:
        hidden_states, _ = self.norm(hidden_states, residual)
        return hidden_states

    return IntermediateTensors({
        "hidden_states": hidden_states,
        "residual": residual
    })

get_input_embeddings

get_input_embeddings(input_ids: Tensor) -> Tensor
Source code in vllm/model_executor/models/qwen3_next_mtp.py
def get_input_embeddings(self, input_ids: torch.Tensor) -> torch.Tensor:
    """Return the token embeddings for ``input_ids``."""
    embeddings = self.embed_tokens(input_ids)
    return embeddings

load_weights

load_weights(
    weights: Iterable[tuple[str, Tensor]],
) -> set[str]
Source code in vllm/model_executor/models/qwen3_next_mtp.py
def load_weights(self, weights: Iterable[tuple[str,
                                               torch.Tensor]]) -> set[str]:
    """Load checkpoint tensors into this module's parameters.

    Each weight is tried, in order, as a stacked (fused) projection shard,
    as an expert weight, and finally as a plain parameter.

    Args:
        weights: ``(name, tensor)`` pairs from the checkpoint.

    Returns:
        The parameter names that were loaded (after name mapping).

    Raises:
        KeyError: If a weight that is not skipped has no matching
            parameter.
    """
    stacked_params_mapping = [
        # (param_name, shard_name, shard_id)
        ("qkv_proj", "q_proj", "q"),
        ("qkv_proj", "k_proj", "k"),
        ("qkv_proj", "v_proj", "v"),
        ("gate_up_proj", "gate_proj", 0),
        ("gate_up_proj", "up_proj", 1),
    ]

    # Params for weights, fp8 weight scales, fp8 activation scales
    # (param_name, weight_name, expert_id, shard_id)
    expert_params_mapping = FusedMoE.make_expert_params_mapping(
        ckpt_gate_proj_name="gate_proj",
        ckpt_down_proj_name="down_proj",
        ckpt_up_proj_name="up_proj",
        num_experts=self.config.num_experts)

    params_dict = dict(self.named_parameters())
    loaded_params: set[str] = set()
    for name, loaded_weight in weights:
        if "rotary_emb.inv_freq" in name:
            continue

        for param_name, weight_name, shard_id in stacked_params_mapping:
            if weight_name not in name:
                continue

            if "mlp.experts" in name:
                continue

            # Do not rebind `name` until the weight is really loaded: the
            # loop may continue, and later entries (and the fallback
            # branches) must still see the checkpoint name. E.g.
            # "qkv_proj" itself contains "v_proj".
            name_mapped = name.replace(weight_name, param_name)
            # Skip loading extra bias for GPTQ models.
            if (name_mapped.endswith(".bias")
                    and name_mapped not in params_dict):
                continue
            # Skip layers on other devices.
            if is_pp_missing_parameter(name_mapped, self):
                continue
            if name_mapped not in params_dict:
                continue
            param = params_dict[name_mapped]
            weight_loader = param.weight_loader
            weight_loader(param, loaded_weight, shard_id)
            name = name_mapped
            break
        else:
            for mapping in expert_params_mapping:
                param_name, weight_name, expert_id, shard_id = mapping
                if weight_name not in name:
                    continue
                # Same rule as above: keep `name` intact on a skip.
                name_mapped = name.replace(weight_name, param_name)
                # Skip layers on other devices.
                if is_pp_missing_parameter(name_mapped, self):
                    continue
                # Skip loading extra bias for GPTQ models.
                if ((name_mapped.endswith(".bias")
                     or name_mapped.endswith("_bias"))
                        and name_mapped not in params_dict):
                    continue
                param = params_dict[name_mapped]
                weight_loader = param.weight_loader
                weight_loader(param,
                              loaded_weight,
                              name_mapped,
                              shard_id=shard_id,
                              expert_id=expert_id)
                name = name_mapped
                break
            else:
                # These `continue`s belong to the outer weights loop, so a
                # skipped weight is not recorded as loaded.
                # Skip loading extra bias for GPTQ models.
                if name.endswith(".bias") and name not in params_dict:
                    continue
                if is_pp_missing_parameter(name, self):
                    continue

                param = params_dict[name]
                weight_loader = getattr(param, "weight_loader",
                                        default_weight_loader)
                weight_loader(param, loaded_weight)
        loaded_params.add(name)
    return loaded_params