Skip to content

vllm.model_executor.models.nano_nemotron_vl

IMG_CONTEXT module-attribute

IMG_CONTEXT = '<image>'

IMG_END module-attribute

IMG_END = '</img>'

IMG_START module-attribute

IMG_START = '<img>'

MAX_FRAMES module-attribute

MAX_FRAMES = 16

NanoNemotronVLImageInputs module-attribute

NanoNemotronVLVideoInputs module-attribute

_I module-attribute

BaseNanoNemotronVLProcessingInfo

Bases: BaseProcessingInfo

Basic image-only ProcessingInfo for InternVL-style models.

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class BaseNanoNemotronVLProcessingInfo(BaseProcessingInfo):
    """Basic image-only ProcessingInfo for InternVL-style models."""

    @abstractmethod
    def get_hf_processor(
        self,
        **kwargs: object,
    ) -> BaseNanoNemotronVLProcessor:
        raise NotImplementedError

    def get_supported_mm_limits(self) -> Mapping[str, Optional[int]]:
        return {"image": None}

    def get_num_image_tokens(
        self,
        *,
        image_width: int,
        image_height: int,
        max_num_tiles: int,
        processor: Optional[BaseNanoNemotronVLProcessor],
    ) -> int:
        if processor is None:
            processor = self.get_hf_processor()

        return processor.get_num_image_tokens(
            image_width=image_width,
            image_height=image_height,
            max_num_tiles=max_num_tiles,
        )

    def get_image_size_with_most_features(self,
                                          max_num_tiles: int) -> ImageSize:
        processor = self.get_hf_processor()

        base_size = processor.image_size
        target_ratios = get_internvl_target_ratios(1, max_num_tiles)

        largest_feature_size, largest_feature_pinpoint = 0, None
        for wr, hr in target_ratios:
            width, height = base_size * wr, base_size * hr

            feat_size = self.get_num_image_tokens(
                image_width=width,
                image_height=height,
                max_num_tiles=max_num_tiles,
                processor=processor,
            )
            if feat_size > largest_feature_size:
                largest_feature_size = feat_size
                largest_feature_pinpoint = ImageSize(width=width,
                                                     height=height)

        if largest_feature_size == 0 or largest_feature_pinpoint is None:
            raise ValueError("Cannot have a largest feature size of 0!")

        return largest_feature_pinpoint

    def get_max_image_tokens(self) -> int:
        processor = self.get_hf_processor()
        # Use default max_num_tiles for max tokens calculation
        max_num_tiles = 12
        target_width, target_height = self.get_image_size_with_most_features(
            max_num_tiles)

        return self.get_num_image_tokens(
            image_width=target_width,
            image_height=target_height,
            max_num_tiles=max_num_tiles,
            processor=processor,
        )

get_hf_processor abstractmethod

get_hf_processor(
    **kwargs: object,
) -> BaseNanoNemotronVLProcessor
Source code in vllm/model_executor/models/nano_nemotron_vl.py
@abstractmethod
def get_hf_processor(
    self,
    **kwargs: object,
) -> BaseNanoNemotronVLProcessor:
    raise NotImplementedError

get_image_size_with_most_features

get_image_size_with_most_features(
    max_num_tiles: int,
) -> ImageSize
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_image_size_with_most_features(self,
                                      max_num_tiles: int) -> ImageSize:
    processor = self.get_hf_processor()

    base_size = processor.image_size
    target_ratios = get_internvl_target_ratios(1, max_num_tiles)

    largest_feature_size, largest_feature_pinpoint = 0, None
    for wr, hr in target_ratios:
        width, height = base_size * wr, base_size * hr

        feat_size = self.get_num_image_tokens(
            image_width=width,
            image_height=height,
            max_num_tiles=max_num_tiles,
            processor=processor,
        )
        if feat_size > largest_feature_size:
            largest_feature_size = feat_size
            largest_feature_pinpoint = ImageSize(width=width,
                                                 height=height)

    if largest_feature_size == 0 or largest_feature_pinpoint is None:
        raise ValueError("Cannot have a largest feature size of 0!")

    return largest_feature_pinpoint

get_max_image_tokens

get_max_image_tokens() -> int
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_max_image_tokens(self) -> int:
    processor = self.get_hf_processor()
    # Use default max_num_tiles for max tokens calculation
    max_num_tiles = 12
    target_width, target_height = self.get_image_size_with_most_features(
        max_num_tiles)

    return self.get_num_image_tokens(
        image_width=target_width,
        image_height=target_height,
        max_num_tiles=max_num_tiles,
        processor=processor,
    )

get_num_image_tokens

get_num_image_tokens(
    *,
    image_width: int,
    image_height: int,
    max_num_tiles: int,
    processor: Optional[BaseNanoNemotronVLProcessor],
) -> int
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_num_image_tokens(
    self,
    *,
    image_width: int,
    image_height: int,
    max_num_tiles: int,
    processor: Optional[BaseNanoNemotronVLProcessor],
) -> int:
    if processor is None:
        processor = self.get_hf_processor()

    return processor.get_num_image_tokens(
        image_width=image_width,
        image_height=image_height,
        max_num_tiles=max_num_tiles,
    )

get_supported_mm_limits

get_supported_mm_limits() -> Mapping[str, Optional[int]]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_supported_mm_limits(self) -> Mapping[str, Optional[int]]:
    return {"image": None}

BaseNanoNemotronVLProcessor

Bases: ABC

This model doesn't define its own HF processor, so we implement our own one here.

The code to insert image tokens is based on: https://huggingface.co/OpenGVLab/InternVL2-1B/blob/main/modeling_internvl_chat.py#L252

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class BaseNanoNemotronVLProcessor(ABC):
    """
    This model doesn't define its own HF processor,
    so we implement our own one here.

    The code to insert image tokens is based on:
    https://huggingface.co/OpenGVLab/InternVL2-1B/blob/main/modeling_internvl_chat.py#L252
    """

    def __init__(self, config: PretrainedConfig, tokenizer: AnyTokenizer,
                 *args, **kwargs) -> None:
        super().__init__()

        self.config = config
        self.tokenizer = tokenizer

        image_size: int = config.force_image_size
        patch_size: int = config.patch_size

        self.num_image_token = int(
            (image_size // patch_size)**2 * (config.downsample_ratio**2))
        self.image_size = image_size
        self.use_thumbnail: bool = config.use_thumbnail
        self.norm_mean = torch.Tensor(config.norm_mean).reshape(1, 3, 1, 1)
        self.norm_std = torch.Tensor(config.norm_std).reshape(1, 3, 1, 1)

    @property
    @abstractmethod
    def image_token_id(self) -> int:
        raise NotImplementedError

    @abstractmethod
    def get_image_repl(
        self,
        feature_size: int,
        num_patches: Optional[int],
    ) -> PromptUpdateDetails[str]:
        raise NotImplementedError

    def get_num_image_tokens(
        self,
        *,
        image_width: int,
        image_height: int,
        max_num_tiles: int,
    ) -> int:
        target_ratios = get_internvl_target_ratios(1, max_num_tiles)

        num_patches, _, _ = calculate_internvl_targets(
            orig_width=image_width,
            orig_height=image_height,
            target_ratios=target_ratios,
            image_size=self.image_size,
            use_thumbnail=self.use_thumbnail,
        )

        return num_patches * self.num_image_token

    def _images_to_pixel_values_lst(
        self,
        images: list[Image.Image],
        max_num_tiles: int,
    ) -> list[torch.Tensor]:
        return [
            image_to_pixel_values(
                image,
                input_size=self.image_size,
                max_num=max_num_tiles,
                use_thumbnail=self.use_thumbnail,
                idx=idx,
            ) for idx, image in enumerate(images)
        ]

    def _preprocess_image(
        self,
        text: list[str],
        images: list[Image.Image],
        max_num_tiles: int,
    ) -> tuple[list[str], dict[str, torch.Tensor]]:
        if len(images) == 0:
            image_inputs = {}
        else:
            pixel_values_lst = self._images_to_pixel_values_lst(
                images, max_num_tiles)
            image_inputs: dict[str, NestedTensors] = {
                "pixel_values_flat":
                torch.cat(pixel_values_lst),
                "image_num_patches":
                torch.tensor([len(item) for item in pixel_values_lst]),
            }

            for pixel_values in pixel_values_lst:
                num_patches = pixel_values.shape[0]
                feature_size = num_patches * self.num_image_token
                image_repl = self.get_image_repl(feature_size, num_patches)
                text = [t.replace('<image>', image_repl.full, 1) for t in text]
        return text, image_inputs

    def _make_batch_input(self,
                          input_item: Optional[Union[Any, list[Any]]] = None):
        if input_item is None:
            input_item = []
        if not isinstance(input_item, list):
            input_item = [input_item]
        return input_item

    def __call__(
        self,
        text: Optional[Union[str, list[str]]] = None,
        images: Optional[Union[Image.Image, list[Image.Image]]] = None,
        return_tensors: Optional[Union[str, TensorType]] = None,
        max_num_tiles: Optional[int] = None,
    ) -> Mapping[str, NestedTensors]:
        # Use default if not provided
        if max_num_tiles is None:
            max_num_tiles = 12

        text, images = [self._make_batch_input(x) for x in (text, images)]

        text, image_inputs = self._preprocess_image(
            text=text,
            images=images,
            max_num_tiles=max_num_tiles,
        )

        text_inputs = self.tokenizer(text, add_special_tokens=False)

        return {
            **BatchEncoding(text_inputs, tensor_type=return_tensors),
            **image_inputs,
        }

config instance-attribute

config = config

image_size instance-attribute

image_size = image_size

image_token_id abstractmethod property

image_token_id: int

norm_mean instance-attribute

norm_mean = reshape(1, 3, 1, 1)

norm_std instance-attribute

norm_std = reshape(1, 3, 1, 1)

num_image_token instance-attribute

num_image_token = int(
    (image_size // patch_size) ** 2 * downsample_ratio**2
)

tokenizer instance-attribute

tokenizer = tokenizer

use_thumbnail instance-attribute

use_thumbnail: bool = use_thumbnail

__call__

__call__(
    text: Optional[Union[str, list[str]]] = None,
    images: Optional[Union[Image, list[Image]]] = None,
    return_tensors: Optional[Union[str, TensorType]] = None,
    max_num_tiles: Optional[int] = None,
) -> Mapping[str, NestedTensors]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def __call__(
    self,
    text: Optional[Union[str, list[str]]] = None,
    images: Optional[Union[Image.Image, list[Image.Image]]] = None,
    return_tensors: Optional[Union[str, TensorType]] = None,
    max_num_tiles: Optional[int] = None,
) -> Mapping[str, NestedTensors]:
    # Use default if not provided
    if max_num_tiles is None:
        max_num_tiles = 12

    text, images = [self._make_batch_input(x) for x in (text, images)]

    text, image_inputs = self._preprocess_image(
        text=text,
        images=images,
        max_num_tiles=max_num_tiles,
    )

    text_inputs = self.tokenizer(text, add_special_tokens=False)

    return {
        **BatchEncoding(text_inputs, tensor_type=return_tensors),
        **image_inputs,
    }

__init__

__init__(
    config: PretrainedConfig,
    tokenizer: AnyTokenizer,
    *args,
    **kwargs,
) -> None
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def __init__(self, config: PretrainedConfig, tokenizer: AnyTokenizer,
             *args, **kwargs) -> None:
    super().__init__()

    self.config = config
    self.tokenizer = tokenizer

    image_size: int = config.force_image_size
    patch_size: int = config.patch_size

    self.num_image_token = int(
        (image_size // patch_size)**2 * (config.downsample_ratio**2))
    self.image_size = image_size
    self.use_thumbnail: bool = config.use_thumbnail
    self.norm_mean = torch.Tensor(config.norm_mean).reshape(1, 3, 1, 1)
    self.norm_std = torch.Tensor(config.norm_std).reshape(1, 3, 1, 1)

_images_to_pixel_values_lst

_images_to_pixel_values_lst(
    images: list[Image], max_num_tiles: int
) -> list[Tensor]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _images_to_pixel_values_lst(
    self,
    images: list[Image.Image],
    max_num_tiles: int,
) -> list[torch.Tensor]:
    return [
        image_to_pixel_values(
            image,
            input_size=self.image_size,
            max_num=max_num_tiles,
            use_thumbnail=self.use_thumbnail,
            idx=idx,
        ) for idx, image in enumerate(images)
    ]

_make_batch_input

_make_batch_input(
    input_item: Optional[Union[Any, list[Any]]] = None,
)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _make_batch_input(self,
                      input_item: Optional[Union[Any, list[Any]]] = None):
    if input_item is None:
        input_item = []
    if not isinstance(input_item, list):
        input_item = [input_item]
    return input_item

_preprocess_image

_preprocess_image(
    text: list[str], images: list[Image], max_num_tiles: int
) -> tuple[list[str], dict[str, Tensor]]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _preprocess_image(
    self,
    text: list[str],
    images: list[Image.Image],
    max_num_tiles: int,
) -> tuple[list[str], dict[str, torch.Tensor]]:
    if len(images) == 0:
        image_inputs = {}
    else:
        pixel_values_lst = self._images_to_pixel_values_lst(
            images, max_num_tiles)
        image_inputs: dict[str, NestedTensors] = {
            "pixel_values_flat":
            torch.cat(pixel_values_lst),
            "image_num_patches":
            torch.tensor([len(item) for item in pixel_values_lst]),
        }

        for pixel_values in pixel_values_lst:
            num_patches = pixel_values.shape[0]
            feature_size = num_patches * self.num_image_token
            image_repl = self.get_image_repl(feature_size, num_patches)
            text = [t.replace('<image>', image_repl.full, 1) for t in text]
    return text, image_inputs

get_image_repl abstractmethod

get_image_repl(
    feature_size: int, num_patches: Optional[int]
) -> PromptUpdateDetails[str]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
@abstractmethod
def get_image_repl(
    self,
    feature_size: int,
    num_patches: Optional[int],
) -> PromptUpdateDetails[str]:
    raise NotImplementedError

get_num_image_tokens

get_num_image_tokens(
    *,
    image_width: int,
    image_height: int,
    max_num_tiles: int,
) -> int
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_num_image_tokens(
    self,
    *,
    image_width: int,
    image_height: int,
    max_num_tiles: int,
) -> int:
    target_ratios = get_internvl_target_ratios(1, max_num_tiles)

    num_patches, _, _ = calculate_internvl_targets(
        orig_width=image_width,
        orig_height=image_height,
        target_ratios=target_ratios,
        image_size=self.image_size,
        use_thumbnail=self.use_thumbnail,
    )

    return num_patches * self.num_image_token

NanoNemotronBaseVLMultiModalProcessor

Bases: BaseMultiModalProcessor[_I]

Basic image-only MultiModalProcessor for InternVL-style models.

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronBaseVLMultiModalProcessor(BaseMultiModalProcessor[_I]):
    """Basic image-only MultiModalProcessor for InternVL-style models."""

    def _call_hf_processor(
        self,
        prompt: str,
        mm_data: Mapping[str, object],
        mm_kwargs: Mapping[str, object],
        tok_kwargs: Mapping[str, object],
    ) -> Mapping[str, NestedTensors]:
        processed_outputs = super()._call_hf_processor(
            prompt=prompt,
            mm_data=mm_data,
            mm_kwargs=mm_kwargs,
            tok_kwargs=tok_kwargs,
        )

        hf_processor = self.info.get_hf_processor(**mm_kwargs)
        image_token_id = hf_processor.image_token_id

        # Since there may be extra tokens in the feature placeholders,
        # we need to pass the image token ID to the model to select the
        # tokens to merge from the vision encoder outputs
        processed_outputs["image_token_id"] = torch.tensor(image_token_id)

        return processed_outputs

    def _get_mm_fields_config(
        self,
        hf_inputs: Mapping[str, NestedTensors],
        hf_processor_mm_kwargs: Mapping[str, object],
    ) -> Mapping[str, MultiModalFieldConfig]:
        image_num_patches = hf_inputs.get("image_num_patches", torch.empty(0))
        num_images = len(image_num_patches)

        return dict(
            pixel_values_flat=MultiModalFieldConfig.flat_from_sizes(
                "image", image_num_patches),
            image_num_patches=MultiModalFieldConfig.batched("image"),
            image_embeds=MultiModalFieldConfig.batched("image"),
            image_token_id=MultiModalFieldConfig.shared("image", num_images),
        )

    def _get_prompt_updates(
        self,
        mm_items: MultiModalDataItems,
        hf_processor_mm_kwargs: Mapping[str, object],
        out_mm_kwargs: MultiModalKwargs,
    ) -> Sequence[PromptUpdate]:
        hf_processor = self.info.get_hf_processor(**hf_processor_mm_kwargs)

        if "image_num_patches" in out_mm_kwargs:
            image_num_patches = out_mm_kwargs["image_num_patches"]
            assert isinstance(image_num_patches, torch.Tensor)
            image_num_patches = image_num_patches.tolist()
        elif "image_embeds" in out_mm_kwargs:
            # to compute num_patches (similar to Qwen2-VL)
            image_num_patches = [None] * len(out_mm_kwargs["image_embeds"])
        else:
            image_num_patches = []

        def get_replacement_custom(item_idx: int):
            images = mm_items.get_items(
                "image", (ImageEmbeddingItems, ImageProcessorItems))

            if isinstance(images, ImageEmbeddingItems):
                feature_size = images.get_feature_size(item_idx)
            else:
                image_size = images.get_image_size(item_idx)
                # Extract max_num_tiles from kwargs, default to 12
                max_num_tiles = hf_processor_mm_kwargs.get("max_num_tiles", 12)
                feature_size = self.info.get_num_image_tokens(
                    image_width=image_size.width,
                    image_height=image_size.height,
                    max_num_tiles=max_num_tiles,
                    processor=hf_processor,
                )

            num_patches = None
            local_image_num_patches = image_num_patches
            if isinstance(local_image_num_patches, torch.Tensor):
                local_image_num_patches = local_image_num_patches.tolist()
            if isinstance(
                    local_image_num_patches,
                (list, tuple)) and item_idx < len(local_image_num_patches):
                num_patches = int(local_image_num_patches[item_idx])

            return hf_processor.get_image_repl(feature_size, num_patches)

        return [
            PromptReplacement(
                modality="image",
                target="<image>",
                replacement=get_replacement_custom,
            )
        ]

_call_hf_processor

_call_hf_processor(
    prompt: str,
    mm_data: Mapping[str, object],
    mm_kwargs: Mapping[str, object],
    tok_kwargs: Mapping[str, object],
) -> Mapping[str, NestedTensors]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _call_hf_processor(
    self,
    prompt: str,
    mm_data: Mapping[str, object],
    mm_kwargs: Mapping[str, object],
    tok_kwargs: Mapping[str, object],
) -> Mapping[str, NestedTensors]:
    processed_outputs = super()._call_hf_processor(
        prompt=prompt,
        mm_data=mm_data,
        mm_kwargs=mm_kwargs,
        tok_kwargs=tok_kwargs,
    )

    hf_processor = self.info.get_hf_processor(**mm_kwargs)
    image_token_id = hf_processor.image_token_id

    # Since there may be extra tokens in the feature placeholders,
    # we need to pass the image token ID to the model to select the
    # tokens to merge from the vision encoder outputs
    processed_outputs["image_token_id"] = torch.tensor(image_token_id)

    return processed_outputs

_get_mm_fields_config

_get_mm_fields_config(
    hf_inputs: Mapping[str, NestedTensors],
    hf_processor_mm_kwargs: Mapping[str, object],
) -> Mapping[str, MultiModalFieldConfig]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _get_mm_fields_config(
    self,
    hf_inputs: Mapping[str, NestedTensors],
    hf_processor_mm_kwargs: Mapping[str, object],
) -> Mapping[str, MultiModalFieldConfig]:
    image_num_patches = hf_inputs.get("image_num_patches", torch.empty(0))
    num_images = len(image_num_patches)

    return dict(
        pixel_values_flat=MultiModalFieldConfig.flat_from_sizes(
            "image", image_num_patches),
        image_num_patches=MultiModalFieldConfig.batched("image"),
        image_embeds=MultiModalFieldConfig.batched("image"),
        image_token_id=MultiModalFieldConfig.shared("image", num_images),
    )

_get_prompt_updates

_get_prompt_updates(
    mm_items: MultiModalDataItems,
    hf_processor_mm_kwargs: Mapping[str, object],
    out_mm_kwargs: MultiModalKwargs,
) -> Sequence[PromptUpdate]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _get_prompt_updates(
    self,
    mm_items: MultiModalDataItems,
    hf_processor_mm_kwargs: Mapping[str, object],
    out_mm_kwargs: MultiModalKwargs,
) -> Sequence[PromptUpdate]:
    hf_processor = self.info.get_hf_processor(**hf_processor_mm_kwargs)

    if "image_num_patches" in out_mm_kwargs:
        image_num_patches = out_mm_kwargs["image_num_patches"]
        assert isinstance(image_num_patches, torch.Tensor)
        image_num_patches = image_num_patches.tolist()
    elif "image_embeds" in out_mm_kwargs:
        # to compute num_patches (similar to Qwen2-VL)
        image_num_patches = [None] * len(out_mm_kwargs["image_embeds"])
    else:
        image_num_patches = []

    def get_replacement_custom(item_idx: int):
        images = mm_items.get_items(
            "image", (ImageEmbeddingItems, ImageProcessorItems))

        if isinstance(images, ImageEmbeddingItems):
            feature_size = images.get_feature_size(item_idx)
        else:
            image_size = images.get_image_size(item_idx)
            # Extract max_num_tiles from kwargs, default to 12
            max_num_tiles = hf_processor_mm_kwargs.get("max_num_tiles", 12)
            feature_size = self.info.get_num_image_tokens(
                image_width=image_size.width,
                image_height=image_size.height,
                max_num_tiles=max_num_tiles,
                processor=hf_processor,
            )

        num_patches = None
        local_image_num_patches = image_num_patches
        if isinstance(local_image_num_patches, torch.Tensor):
            local_image_num_patches = local_image_num_patches.tolist()
        if isinstance(
                local_image_num_patches,
            (list, tuple)) and item_idx < len(local_image_num_patches):
            num_patches = int(local_image_num_patches[item_idx])

        return hf_processor.get_image_repl(feature_size, num_patches)

    return [
        PromptReplacement(
            modality="image",
            target="<image>",
            replacement=get_replacement_custom,
        )
    ]

NanoNemotronVLDummyInputsBuilder

Bases: NanoNemotronVLDummyInputsBuilder[NanoNemotronVLProcessingInfo]

DummyInputsBuilder extended for video support

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronVLDummyInputsBuilder(
        NanoNemotronVLDummyInputsBuilder[NanoNemotronVLProcessingInfo]):
    """DummyInputsBuilder extended for video support"""

    def get_dummy_text(self, mm_counts: Mapping[str, int]) -> str:
        num_videos = mm_counts.get("video", 0)

        return super().get_dummy_text(mm_counts) + "<video>" * num_videos

    def get_dummy_mm_data(
        self,
        seq_len: int,
        mm_counts: Mapping[str, int],
    ) -> MultiModalDataDict:
        dummy_image = super().get_dummy_mm_data(seq_len=seq_len,
                                                mm_counts=mm_counts)
        if self.info.supports_video:
            config = self.info.get_hf_config()
            image_size: int = config.force_image_size
            target_num_frames = \
                self.info.get_num_frames_with_most_features(seq_len, mm_counts)
            num_videos = mm_counts.get("video", 0)
            dummy_video = {
                "video":
                self._get_dummy_videos(width=image_size,
                                       height=image_size,
                                       num_frames=target_num_frames,
                                       num_videos=num_videos)
            }
        else:
            dummy_video = {}
        return {**dummy_image, **dummy_video}

get_dummy_mm_data

get_dummy_mm_data(
    seq_len: int, mm_counts: Mapping[str, int]
) -> MultiModalDataDict
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_dummy_mm_data(
    self,
    seq_len: int,
    mm_counts: Mapping[str, int],
) -> MultiModalDataDict:
    dummy_image = super().get_dummy_mm_data(seq_len=seq_len,
                                            mm_counts=mm_counts)
    if self.info.supports_video:
        config = self.info.get_hf_config()
        image_size: int = config.force_image_size
        target_num_frames = \
            self.info.get_num_frames_with_most_features(seq_len, mm_counts)
        num_videos = mm_counts.get("video", 0)
        dummy_video = {
            "video":
            self._get_dummy_videos(width=image_size,
                                   height=image_size,
                                   num_frames=target_num_frames,
                                   num_videos=num_videos)
        }
    else:
        dummy_video = {}
    return {**dummy_image, **dummy_video}

get_dummy_text

get_dummy_text(mm_counts: Mapping[str, int]) -> str
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_dummy_text(self, mm_counts: Mapping[str, int]) -> str:
    num_videos = mm_counts.get("video", 0)

    return super().get_dummy_text(mm_counts) + "<video>" * num_videos

NanoNemotronVLImageEmbeddinInputs

Bases: TypedDict

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronVLImageEmbeddinInputs(TypedDict):
    type: Literal["image_embeds"]
    data: Union[torch.Tensor, list[torch.Tensor]]
    """ 
    A tensor of shape `(num_images, total_image_feature_size, hidden_size)`
    or a list of tensors of shape `(total_image_feature_size, hidden_size)`

    `hidden_size` must match the hidden size of language model backbone.
    """

data instance-attribute

A tensor of shape (num_images, total_image_feature_size, hidden_size) or a list of tensors of shape (total_image_feature_size, hidden_size)

hidden_size must match the hidden size of language model backbone.

type instance-attribute

type: Literal['image_embeds']

NanoNemotronVLImagePixelInputs

Bases: TypedDict

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronVLImagePixelInputs(TypedDict):
    type: Literal["pixel_values"]
    pixel_values_flat: torch.Tensor
    """
    Shape:
    `(batch_size * num_images * (1 + num_patches), num_channels, height, width)`
    """

    num_patches: torch.Tensor
    """Shape: `(batch_size * num_images)`"""

num_patches instance-attribute

num_patches: Tensor

Shape: (batch_size * num_images)

pixel_values_flat instance-attribute

pixel_values_flat: Tensor

Shape: (batch_size * num_images * (1 + num_patches), num_channels, height, width)

type instance-attribute

type: Literal['pixel_values']

NanoNemotronVLMultiModalProcessor

Bases: NanoNemotronBaseVLMultiModalProcessor[NanoNemotronVLProcessingInfo]

MultiModalProcessor extended for video support

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronVLMultiModalProcessor(
        NanoNemotronBaseVLMultiModalProcessor[NanoNemotronVLProcessingInfo]):
    """MultiModalProcessor extended for video support"""

    def _call_hf_processor(
        self,
        prompt: str,
        mm_data: Mapping[str, object],
        mm_kwargs: Mapping[str, object],
        tok_kwargs: Mapping[str, object],
    ) -> Mapping[str, NestedTensors]:
        processed_outputs = super()._call_hf_processor(prompt, mm_data,
                                                       mm_kwargs, tok_kwargs)

        hf_processor = self.info.get_hf_processor(**mm_kwargs)
        if self.info.supports_video and (
                video_token_id := hf_processor.video_token_id) is not None:
            processed_outputs["video_token_id"] = torch.tensor(video_token_id)
        return processed_outputs

    def _get_mm_fields_config(
        self,
        hf_inputs: Mapping[str, NestedTensors],
        hf_processor_mm_kwargs: Mapping[str, object],
    ) -> Mapping[str, MultiModalFieldConfig]:
        image_fields = super()._get_mm_fields_config(hf_inputs,
                                                     hf_processor_mm_kwargs)
        if self.info.supports_video:
            video_num_patches = hf_inputs.get("video_num_patches",
                                              torch.empty(0))
            num_videos = len(video_num_patches)
            video_fields = dict(
                pixel_values_flat_video=MultiModalFieldConfig.flat_from_sizes(
                    "video", video_num_patches),
                video_num_patches=MultiModalFieldConfig.batched("video"),
                video_token_id=MultiModalFieldConfig.shared(
                    "video", num_videos))
        else:
            video_fields = {}

        return image_fields | video_fields

    def _get_prompt_updates(
        self,
        mm_items: MultiModalDataItems,
        hf_processor_mm_kwargs: Mapping[str, object],
        out_mm_kwargs: MultiModalKwargsItems,
    ) -> Sequence[PromptUpdate]:
        prompt_repl = super()._get_prompt_updates(
            mm_items=mm_items,
            hf_processor_mm_kwargs=hf_processor_mm_kwargs,
            out_mm_kwargs=out_mm_kwargs,
        )

        hf_processor = self.info.get_hf_processor(**hf_processor_mm_kwargs)

        out_mm_data = out_mm_kwargs.get_data()
        if "video_num_patches" in out_mm_data:
            video_num_patches = out_mm_data["video_num_patches"]
            assert isinstance(video_num_patches, torch.Tensor)
            video_num_patches = video_num_patches.tolist()
        else:
            video_num_patches = []

        def get_video_replacement_internvl(item_idx: int):
            feature_size = hf_processor.num_image_token
            num_patches = video_num_patches[item_idx]
            if num_patches is not None:
                assert isinstance(num_patches, int)

            return hf_processor.get_video_repl(
                feature_size,
                num_patches,
                video_context_token=hf_processor.video_token)

        if self.info.supports_video:
            prompt_repl = [
                *prompt_repl,
                PromptReplacement(
                    modality="video",
                    target="<video>",
                    replacement=get_video_replacement_internvl,
                )
            ]

        return prompt_repl

_call_hf_processor

_call_hf_processor(
    prompt: str,
    mm_data: Mapping[str, object],
    mm_kwargs: Mapping[str, object],
    tok_kwargs: Mapping[str, object],
) -> Mapping[str, NestedTensors]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _call_hf_processor(
    self,
    prompt: str,
    mm_data: Mapping[str, object],
    mm_kwargs: Mapping[str, object],
    tok_kwargs: Mapping[str, object],
) -> Mapping[str, NestedTensors]:
    processed_outputs = super()._call_hf_processor(prompt, mm_data,
                                                   mm_kwargs, tok_kwargs)

    hf_processor = self.info.get_hf_processor(**mm_kwargs)
    if self.info.supports_video and (
            video_token_id := hf_processor.video_token_id) is not None:
        processed_outputs["video_token_id"] = torch.tensor(video_token_id)
    return processed_outputs

_get_mm_fields_config

_get_mm_fields_config(
    hf_inputs: Mapping[str, NestedTensors],
    hf_processor_mm_kwargs: Mapping[str, object],
) -> Mapping[str, MultiModalFieldConfig]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _get_mm_fields_config(
    self,
    hf_inputs: Mapping[str, NestedTensors],
    hf_processor_mm_kwargs: Mapping[str, object],
) -> Mapping[str, MultiModalFieldConfig]:
    image_fields = super()._get_mm_fields_config(hf_inputs,
                                                 hf_processor_mm_kwargs)
    if self.info.supports_video:
        video_num_patches = hf_inputs.get("video_num_patches",
                                          torch.empty(0))
        num_videos = len(video_num_patches)
        video_fields = dict(
            pixel_values_flat_video=MultiModalFieldConfig.flat_from_sizes(
                "video", video_num_patches),
            video_num_patches=MultiModalFieldConfig.batched("video"),
            video_token_id=MultiModalFieldConfig.shared(
                "video", num_videos))
    else:
        video_fields = {}

    return image_fields | video_fields

_get_prompt_updates

_get_prompt_updates(
    mm_items: MultiModalDataItems,
    hf_processor_mm_kwargs: Mapping[str, object],
    out_mm_kwargs: MultiModalKwargsItems,
) -> Sequence[PromptUpdate]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _get_prompt_updates(
    self,
    mm_items: MultiModalDataItems,
    hf_processor_mm_kwargs: Mapping[str, object],
    out_mm_kwargs: MultiModalKwargsItems,
) -> Sequence[PromptUpdate]:
    prompt_repl = super()._get_prompt_updates(
        mm_items=mm_items,
        hf_processor_mm_kwargs=hf_processor_mm_kwargs,
        out_mm_kwargs=out_mm_kwargs,
    )

    hf_processor = self.info.get_hf_processor(**hf_processor_mm_kwargs)

    out_mm_data = out_mm_kwargs.get_data()
    if "video_num_patches" in out_mm_data:
        video_num_patches = out_mm_data["video_num_patches"]
        assert isinstance(video_num_patches, torch.Tensor)
        video_num_patches = video_num_patches.tolist()
    else:
        video_num_patches = []

    def get_video_replacement_internvl(item_idx: int):
        feature_size = hf_processor.num_image_token
        num_patches = video_num_patches[item_idx]
        if num_patches is not None:
            assert isinstance(num_patches, int)

        return hf_processor.get_video_repl(
            feature_size,
            num_patches,
            video_context_token=hf_processor.video_token)

    if self.info.supports_video:
        prompt_repl = [
            *prompt_repl,
            PromptReplacement(
                modality="video",
                target="<video>",
                replacement=get_video_replacement_internvl,
            )
        ]

    return prompt_repl

NanoNemotronVLProcessingInfo

Bases: BaseNanoNemotronVLProcessingInfo

ProcessingInfo extended for video processing

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronVLProcessingInfo(BaseNanoNemotronVLProcessingInfo):
    """ ProcessingInfo extended for video processing"""

    @property
    def supports_video(self):
        return self.get_hf_processor().supports_video

    def get_supported_mm_limits(self):
        video_limit = {"video": None} if self.supports_video else {}
        return {**super().get_supported_mm_limits(), **video_limit}

    def get_video_token(self) -> Optional[str]:
        return IMG_CONTEXT

    def get_num_frames_with_most_features(
        self,
        seq_len: int,
        mm_counts: Mapping[str, int],
    ) -> int:
        max_images = mm_counts.get("image", 0)
        max_videos = mm_counts.get("video", 0)

        processor = self.get_hf_processor()  # we get the CustomProcessor here

        max_image_tokens = self.get_max_image_tokens() * max_images
        max_total_frames = (seq_len -
                            max_image_tokens) // processor.num_image_token
        max_frames_per_video = max_total_frames // max(max_videos, 1)

        max_frames_per_video = min(max_frames_per_video, MAX_FRAMES)
        return max(max_frames_per_video, 1)

    def get_hf_processor(self, **kwargs: object) -> NanoNemotronVLProcessor:
        return self.ctx.init_processor(
            NanoNemotronVLProcessor,
            config=self.get_hf_config(),
            tokenizer=self.get_tokenizer(),
            video_token=self.get_video_token(),
            **kwargs,
        )

supports_video property

supports_video

get_hf_processor

get_hf_processor(
    **kwargs: object,
) -> NanoNemotronVLProcessor
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_hf_processor(self, **kwargs: object) -> NanoNemotronVLProcessor:
    return self.ctx.init_processor(
        NanoNemotronVLProcessor,
        config=self.get_hf_config(),
        tokenizer=self.get_tokenizer(),
        video_token=self.get_video_token(),
        **kwargs,
    )

get_num_frames_with_most_features

get_num_frames_with_most_features(
    seq_len: int, mm_counts: Mapping[str, int]
) -> int
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_num_frames_with_most_features(
    self,
    seq_len: int,
    mm_counts: Mapping[str, int],
) -> int:
    max_images = mm_counts.get("image", 0)
    max_videos = mm_counts.get("video", 0)

    processor = self.get_hf_processor()  # we get the CustomProcessor here

    max_image_tokens = self.get_max_image_tokens() * max_images
    max_total_frames = (seq_len -
                        max_image_tokens) // processor.num_image_token
    max_frames_per_video = max_total_frames // max(max_videos, 1)

    max_frames_per_video = min(max_frames_per_video, MAX_FRAMES)
    return max(max_frames_per_video, 1)

get_supported_mm_limits

get_supported_mm_limits()
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_supported_mm_limits(self):
    video_limit = {"video": None} if self.supports_video else {}
    return {**super().get_supported_mm_limits(), **video_limit}

get_video_token

get_video_token() -> Optional[str]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_video_token(self) -> Optional[str]:
    return IMG_CONTEXT

NanoNemotronVLProcessor

Bases: BaseNanoNemotronVLProcessor

HF Processor with extended video processing logic. Code for video processing is adapted from video example: https://huggingface.co/OpenGVLab/InternVL3-1B#inference-with-transformers

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronVLProcessor(BaseNanoNemotronVLProcessor):
    """
    HF Processor  with extended video processing logic.
    Code for video processing is adapted from video example:
    https://huggingface.co/OpenGVLab/InternVL3-1B#inference-with-transformers
    """

    def __init__(
        self,
        config: PretrainedConfig,
        tokenizer: AnyTokenizer,
        *,
        min_dynamic_patch: Optional[int] = None,
        max_dynamic_patch: Optional[int] = None,
        dynamic_image_size: Optional[bool] = None,
        video_token: Optional[str] = None,
    ) -> None:
        super().__init__(
            config=config,
            tokenizer=tokenizer,
            min_dynamic_patch=min_dynamic_patch,
            max_dynamic_patch=max_dynamic_patch,
            dynamic_image_size=dynamic_image_size,
        )
        # add extra video token for video processing
        self.video_token = video_token

    @property
    def supports_video(self) -> bool:
        return self.video_token_id is not None

    @property
    def video_token_id(self) -> Optional[int]:
        if self.video_token is None:
            return None
        return self.tokenizer.get_vocab().get(self.video_token, None)

    @property
    def image_token_id(self) -> int:
        return self.tokenizer.convert_tokens_to_ids(IMG_CONTEXT)

    def _videos_to_pixel_values_lst(
        self,
        videos: list[npt.NDArray],
        max_num_tiles: int,
        dynamic_image_size: Optional[bool] = None,
    ) -> list[torch.Tensor]:

        return [
            video_to_pixel_values(
                video,
                input_size=self.image_size,
                max_num_tiles=max_num_tiles,
                use_thumbnail=self.use_thumbnail,
            ) for video in videos
        ]

    def _preprocess_video(
        self,
        text: list[str],
        videos: list[npt.NDArray],
        max_num_tiles: int,
        dynamic_image_size: Optional[bool] = None,
    ):
        if len(videos) == 0 or not self.supports_video:
            video_inputs = {}
        else:
            pixel_values_lst_video = self._videos_to_pixel_values_lst(
                videos,
                max_num_tiles=max_num_tiles,
                dynamic_image_size=dynamic_image_size,
            )

            video_inputs: dict[str, NestedTensors] = {
                "pixel_values_flat_video":
                torch.cat(pixel_values_lst_video),
                "video_num_patches":
                torch.tensor([len(item) for item in pixel_values_lst_video]),
            }

            for pixel_values in pixel_values_lst_video:
                num_patches = pixel_values.shape[0]

                video_repl = self.get_video_repl(self.num_image_token,
                                                 num_patches, self.video_token)
                text = [t.replace('<video>', video_repl.full, 1) for t in text]
        return text, video_inputs

    def __call__(
        self,
        text: Optional[Union[str, list[str]]] = None,
        images: Optional[Union[Image.Image, list[Image.Image]]] = None,
        videos: Optional[Union[npt.NDArray, list[npt.NDArray]]] = None,
        return_tensors: Optional[Union[str, TensorType]] = None,
        max_num_tiles: Optional[int] = None,
        dynamic_image_size: Optional[bool] = None,
    ) -> Mapping[str, NestedTensors]:
        # Use default if not provided
        if max_num_tiles is None:
            max_num_tiles = 12

        text, images, videos = [
            self._make_batch_input(x) for x in (text, images, videos)
        ]

        text, image_inputs = self._preprocess_image(
            text=text,
            images=images,
            max_num_tiles=max_num_tiles,
        )

        text, video_inputs = self._preprocess_video(
            text=text,
            videos=videos,
            max_num_tiles=max_num_tiles,
            dynamic_image_size=dynamic_image_size,
        )

        text_inputs = self.tokenizer(text, add_special_tokens=False)

        return BatchFeature({
            **BatchEncoding(text_inputs, tensor_type=return_tensors),
            **image_inputs,
            **video_inputs,
        })

    def get_image_repl(
        self,
        feature_size: int,
        num_patches: Optional[int],
    ) -> PromptUpdateDetails[str]:
        repl_features = IMG_CONTEXT * feature_size
        repl_full = IMG_START + repl_features + IMG_END

        return PromptUpdateDetails.select_text(repl_full, IMG_CONTEXT)

    def get_video_repl(
        self,
        feature_size: int,
        num_patches: Optional[int] = None,
        video_context_token: str = IMG_CONTEXT,
    ) -> PromptUpdateDetails[str]:
        repl_features = video_context_token * self.num_image_token
        repl_features_with_sep = IMG_START + repl_features + IMG_END
        # num_patches is equal to num_frames
        repl_full = ''.join([
            f'Frame{i+1}: {repl_features_with_sep}' for i in range(num_patches)
        ])

        return PromptUpdateDetails.select_text(repl_full, video_context_token)

image_token_id property

image_token_id: int

supports_video property

supports_video: bool

video_token instance-attribute

video_token = video_token

video_token_id property

video_token_id: Optional[int]

__call__

__call__(
    text: Optional[Union[str, list[str]]] = None,
    images: Optional[Union[Image, list[Image]]] = None,
    videos: Optional[Union[NDArray, list[NDArray]]] = None,
    return_tensors: Optional[Union[str, TensorType]] = None,
    max_num_tiles: Optional[int] = None,
    dynamic_image_size: Optional[bool] = None,
) -> Mapping[str, NestedTensors]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def __call__(
    self,
    text: Optional[Union[str, list[str]]] = None,
    images: Optional[Union[Image.Image, list[Image.Image]]] = None,
    videos: Optional[Union[npt.NDArray, list[npt.NDArray]]] = None,
    return_tensors: Optional[Union[str, TensorType]] = None,
    max_num_tiles: Optional[int] = None,
    dynamic_image_size: Optional[bool] = None,
) -> Mapping[str, NestedTensors]:
    # Use default if not provided
    if max_num_tiles is None:
        max_num_tiles = 12

    text, images, videos = [
        self._make_batch_input(x) for x in (text, images, videos)
    ]

    text, image_inputs = self._preprocess_image(
        text=text,
        images=images,
        max_num_tiles=max_num_tiles,
    )

    text, video_inputs = self._preprocess_video(
        text=text,
        videos=videos,
        max_num_tiles=max_num_tiles,
        dynamic_image_size=dynamic_image_size,
    )

    text_inputs = self.tokenizer(text, add_special_tokens=False)

    return BatchFeature({
        **BatchEncoding(text_inputs, tensor_type=return_tensors),
        **image_inputs,
        **video_inputs,
    })

__init__

__init__(
    config: PretrainedConfig,
    tokenizer: AnyTokenizer,
    *,
    min_dynamic_patch: Optional[int] = None,
    max_dynamic_patch: Optional[int] = None,
    dynamic_image_size: Optional[bool] = None,
    video_token: Optional[str] = None,
) -> None
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def __init__(
    self,
    config: PretrainedConfig,
    tokenizer: AnyTokenizer,
    *,
    min_dynamic_patch: Optional[int] = None,
    max_dynamic_patch: Optional[int] = None,
    dynamic_image_size: Optional[bool] = None,
    video_token: Optional[str] = None,
) -> None:
    super().__init__(
        config=config,
        tokenizer=tokenizer,
        min_dynamic_patch=min_dynamic_patch,
        max_dynamic_patch=max_dynamic_patch,
        dynamic_image_size=dynamic_image_size,
    )
    # add extra video token for video processing
    self.video_token = video_token

_preprocess_video

_preprocess_video(
    text: list[str],
    videos: list[NDArray],
    max_num_tiles: int,
    dynamic_image_size: Optional[bool] = None,
)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _preprocess_video(
    self,
    text: list[str],
    videos: list[npt.NDArray],
    max_num_tiles: int,
    dynamic_image_size: Optional[bool] = None,
):
    if len(videos) == 0 or not self.supports_video:
        video_inputs = {}
    else:
        pixel_values_lst_video = self._videos_to_pixel_values_lst(
            videos,
            max_num_tiles=max_num_tiles,
            dynamic_image_size=dynamic_image_size,
        )

        video_inputs: dict[str, NestedTensors] = {
            "pixel_values_flat_video":
            torch.cat(pixel_values_lst_video),
            "video_num_patches":
            torch.tensor([len(item) for item in pixel_values_lst_video]),
        }

        for pixel_values in pixel_values_lst_video:
            num_patches = pixel_values.shape[0]

            video_repl = self.get_video_repl(self.num_image_token,
                                             num_patches, self.video_token)
            text = [t.replace('<video>', video_repl.full, 1) for t in text]
    return text, video_inputs

_videos_to_pixel_values_lst

_videos_to_pixel_values_lst(
    videos: list[NDArray],
    max_num_tiles: int,
    dynamic_image_size: Optional[bool] = None,
) -> list[Tensor]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _videos_to_pixel_values_lst(
    self,
    videos: list[npt.NDArray],
    max_num_tiles: int,
    dynamic_image_size: Optional[bool] = None,
) -> list[torch.Tensor]:

    return [
        video_to_pixel_values(
            video,
            input_size=self.image_size,
            max_num_tiles=max_num_tiles,
            use_thumbnail=self.use_thumbnail,
        ) for video in videos
    ]

get_image_repl

get_image_repl(
    feature_size: int, num_patches: Optional[int]
) -> PromptUpdateDetails[str]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_image_repl(
    self,
    feature_size: int,
    num_patches: Optional[int],
) -> PromptUpdateDetails[str]:
    repl_features = IMG_CONTEXT * feature_size
    repl_full = IMG_START + repl_features + IMG_END

    return PromptUpdateDetails.select_text(repl_full, IMG_CONTEXT)

get_video_repl

get_video_repl(
    feature_size: int,
    num_patches: Optional[int] = None,
    video_context_token: str = IMG_CONTEXT,
) -> PromptUpdateDetails[str]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_video_repl(
    self,
    feature_size: int,
    num_patches: Optional[int] = None,
    video_context_token: str = IMG_CONTEXT,
) -> PromptUpdateDetails[str]:
    repl_features = video_context_token * self.num_image_token
    repl_features_with_sep = IMG_START + repl_features + IMG_END
    # num_patches is equal to num_frames
    repl_full = ''.join([
        f'Frame{i+1}: {repl_features_with_sep}' for i in range(num_patches)
    ])

    return PromptUpdateDetails.select_text(repl_full, video_context_token)

NanoNemotronVLVideoEmbeddingInputs

Bases: TensorSchema

Dimensions
  • n: Number of videos
  • f: Total video feature size
  • h: Hidden size (must match the hidden size of language model backbone)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronVLVideoEmbeddingInputs(TensorSchema):
    """
    Dimensions:
        - n: Number of videos
        - f: Total video feature size
        - h: Hidden size (must match the hidden size of language model backbone)
    """
    type: Literal["video_embeds"]
    data: Annotated[Union[torch.Tensor, list[torch.Tensor]],
                    TensorShape("n", "f", "h")]

data instance-attribute

data: Annotated[
    Union[Tensor, list[Tensor]], TensorShape(n, f, h)
]

type instance-attribute

type: Literal['video_embeds']

NanoNemotronVLVideoPixelInputs

Bases: TensorSchema

Dimensions
  • bvf: Batch size * number of videos * num_frames
  • bn: Batch size * number of images
  • c: Number of channels (3)
  • h: Height of each video frame
  • w: Width of each video frame
Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronVLVideoPixelInputs(TensorSchema):
    """
    Dimensions:
        - bvf: Batch size * number of videos * num_frames
        - bn: Batch size * number of images
        - c: Number of channels (3)
        - h: Height of each video frame
        - w: Width of each video frame
    """
    type: Literal["pixel_values_videos"]
    pixel_values_flat: Annotated[torch.Tensor, TensorShape("bvf", 3, "h", "w")]
    num_patches: Annotated[torch.Tensor, TensorShape("bn")]

num_patches instance-attribute

num_patches: Annotated[Tensor, TensorShape(bn)]

pixel_values_flat instance-attribute

pixel_values_flat: Annotated[
    Tensor, TensorShape(bvf, 3, h, w)
]

type instance-attribute

type: Literal['pixel_values_videos']

NemotronH_Nano_VL_V2

Bases: Module, HasInnerState, IsHybrid, SupportsMultiModal

Source code in vllm/model_executor/models/nano_nemotron_vl.py
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
@MULTIMODAL_REGISTRY.register_processor(
    NanoNemotronVLMultiModalProcessor,
    info=NanoNemotronVLProcessingInfo,
    dummy_inputs=NanoNemotronVLDummyInputsBuilder,
)
class NemotronH_Nano_VL_V2(nn.Module, HasInnerState, IsHybrid,
                           SupportsMultiModal):

    @classmethod
    def get_placeholder_str(cls, modality: str, i: int) -> Optional[str]:
        if modality.startswith("image"):
            return "<image>"
        if modality.startswith("video"):
            return "<video>"
        return None

    def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
        super().__init__()
        config = vllm_config.model_config.hf_config

        image_size = config.force_image_size
        patch_size = config.patch_size
        self.patch_size = patch_size
        self.template = config.template
        self.num_image_token = int(
            (image_size // patch_size)**2 * (config.downsample_ratio**2))
        self.downsample_ratio = config.downsample_ratio
        self.ps_version = config.ps_version
        self.image_tag_type = config.image_tag_type

        self.language_model = init_vllm_registered_model(
            vllm_config=vllm_config,
            hf_config=config.text_config,
            prefix=maybe_prefix(prefix, "language_model"),
        )
        self.vision_model = self.get_vit_model_from_radio_config(config).to(
            self.language_model.config.torch_dtype)

        # Construct the vision projection.
        vit_hidden_size = config.vit_hidden_size
        vision_projection_hidden_size = config.projector_hidden_size
        llm_hidden_size = config.text_config.hidden_size

        self.mlp1 = nn.Sequential(
            RMSNorm(hidden_size=vit_hidden_size *
                    int(1 / self.downsample_ratio)**2,
                    eps=1e-5),
            nn.Linear(
                vit_hidden_size * int(1 / self.downsample_ratio)**2,
                vision_projection_hidden_size,
                bias=False,
            ),
            ReLUSquaredActivation(),
            nn.Linear(vision_projection_hidden_size,
                      llm_hidden_size,
                      bias=False),
        )
        self.mlp1 = self.mlp1.to(self.language_model.config.torch_dtype)

        self.img_context_token_id = None
        self.video_context_token_id = None
        self.config = config

    def pixel_shuffle(self, x, scale_factor=0.5):
        n, w, h, c = x.size()
        # N, W, H, C --> N, W, H * scale, C // scale
        x = x.view(
            n,
            w,
            int(h * scale_factor),
            int(c / scale_factor),
        )
        # N, W, H * scale, C // scale --> N, H * scale, W, C // scale
        x = x.permute(0, 2, 1, 3).contiguous()
        # N, H * scale, W, C // scale -->
        # N, H * scale, W * scale, C // (scale ** 2)
        x = x.view(
            n,
            int(h * scale_factor),
            int(w * scale_factor),
            int(c / (scale_factor * scale_factor)),
        )
        if self.ps_version == "v1":
            warnings.warn(
                "In ps_version 'v1', the height and width have not "
                "been swapped back, which results in a transposed image.",
                stacklevel=2,
            )
        else:
            x = x.permute(0, 2, 1, 3).contiguous()
        return x

    def extract_feature(self, pixel_values):
        vit_embeds = self.vision_model(pixel_values)
        vit_embeds = vit_embeds.to(dtype=torch.bfloat16)
        h = w = int(vit_embeds.shape[1]**0.5)
        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], h, w, -1)
        vit_embeds = self.pixel_shuffle(vit_embeds,
                                        scale_factor=self.downsample_ratio)
        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], -1,
                                        vit_embeds.shape[-1])
        vit_embeds = self.mlp1(vit_embeds)
        return vit_embeds

    def _parse_and_validate_image_input(
            self, **kwargs: object) -> Optional[NanoNemotronVLImageInputs]:
        pixel_values_flat = kwargs.pop("pixel_values_flat", None)
        image_num_patches = kwargs.pop("image_num_patches", None)
        image_embeds = kwargs.pop("image_embeds", None)

        if pixel_values_flat is None and image_embeds is None:
            return None

        if image_embeds is not None:
            if not isinstance(image_embeds, (torch.Tensor, list)):
                raise ValueError("Incorrect type of image embeddings. "
                                 f"Got type: {type(image_embeds)}")

            return NanoNemotronVLImageEmbeddinInputs(
                type="image_embeds",
                data=flatten_bn(image_embeds),
            )

        image_token_id = kwargs["image_token_id"]
        assert isinstance(image_token_id, torch.Tensor)
        self.img_context_token_id = image_token_id.flatten().unique().item()

        if pixel_values_flat is not None:
            if not isinstance(pixel_values_flat, (torch.Tensor, list)):
                raise ValueError("Incorrect type of pixel values. "
                                 f"Got type: {type(pixel_values_flat)}")

            if not isinstance(image_num_patches, (torch.Tensor, list)):
                raise ValueError("Incorrect type of image_num_patches. "
                                 f"Got type: {type(image_num_patches)}")

            pixel_values_flat = flatten_bn(pixel_values_flat, concat=True)
            image_num_patches = flatten_bn(image_num_patches, concat=True)

            return NanoNemotronVLImagePixelInputs(
                type="pixel_values",
                pixel_values_flat=pixel_values_flat,
                num_patches=image_num_patches,
            )

        raise AssertionError("This line should be unreachable.")

    def _process_image_input(
            self, image_input: NanoNemotronVLImageInputs) -> torch.Tensor:
        if image_input["type"] == "image_embeds":
            return image_input["data"]

        assert self.vision_model is not None

        image_embeds = self.extract_feature(image_input["pixel_values_flat"])
        num_patches = image_input["num_patches"]

        # Only one image in the current batch
        if len(num_patches) == 1:
            return (image_embeds.view(-1,
                                      self.config.text_config.hidden_size), )

        # NOTE: Image embeddings are split into separate tensors for each image
        # by the size of each embedding.
        feature_size = image_embeds.shape[1]
        image_embeds = image_embeds.view(-1,
                                         self.config.text_config.hidden_size)
        image_feature_sizes = [
            num_patches * feature_size for num_patches in num_patches
        ]
        return image_embeds.split(image_feature_sizes)

    def _parse_and_validate_video_input(
            self,
            **kwargs: object) -> Optional[NanoNemotronVLVideoPixelInputs]:
        pixel_values_flat_video = kwargs.pop("pixel_values_flat_video", None)
        video_num_patches = kwargs.pop("video_num_patches", None)
        video_embeds = kwargs.pop("video_embeds", None)

        if pixel_values_flat_video is None and video_embeds is None:
            return None

        if video_embeds is not None:
            return NanoNemotronVLVideoEmbeddingInputs(
                type="video_embeds",
                data=flatten_bn(video_embeds),
            )

        video_token_id = kwargs["video_token_id"]
        assert isinstance(video_token_id, torch.Tensor)
        self.video_context_token_id = video_token_id.flatten().unique().item()

        if pixel_values_flat_video is not None:
            if not isinstance(pixel_values_flat_video, (torch.Tensor, list)):
                raise ValueError("Incorrect type of pixel values. "
                                 f"Got type: {type(pixel_values_flat_video)}")

            if not isinstance(video_num_patches, (torch.Tensor, list)):
                raise ValueError("Incorrect type of image_num_patches. "
                                 f"Got type: {type(video_num_patches)}")

            pixel_values_flat_video = flatten_bn(pixel_values_flat_video,
                                                 concat=True)
            video_num_patches = flatten_bn(video_num_patches, concat=True)
            expected_h = expected_w = self.config.force_image_size
            resolve_bindings = {"h": expected_h, "w": expected_w}

            return NanoNemotronVLVideoPixelInputs(
                type="pixel_values_videos",
                pixel_values_flat=pixel_values_flat_video,
                num_patches=video_num_patches,
                resolve_bindings=resolve_bindings,
            )

        raise AssertionError("This line should be unreachable.")

    def _parse_and_validate_multimodal_inputs(self, **kwargs: object) -> dict:
        modalities = {}
        # Preserve the order of modalities if there are multiple of them
        # from the order of kwargs.
        for input_key in kwargs:
            if input_key in ("pixel_values_flat",
                             "image_embeds") and "images" not in modalities:
                modalities["images"] = self._parse_and_validate_image_input(
                    **kwargs)
            if input_key in ("pixel_values_flat_video",
                             ) and "videos" not in modalities:
                modalities["videos"] = self._parse_and_validate_video_input(
                    **kwargs)

        return modalities

    def get_multimodal_embeddings(self,
                                  **kwargs: object) -> MultiModalEmbeddings:
        # Validate the multimodal input keyword arguments
        modalities = self._parse_and_validate_multimodal_inputs(**kwargs)
        if modalities is None:
            return []

        # # The result multimodal_embeddings is tuple of tensors, with each
        # tensor correspoending to a multimodal data item (image or video).
        multimodal_embeddings: tuple[torch.Tensor, ...] = ()

        # NOTE: It is important to iterate over the keys in this dictionary
        # to preserve the order of the modalities.
        for modality in modalities:
            if modality == "images":
                image_input = modalities["images"]
                vision_embeddings = self._process_image_input(image_input)
                multimodal_embeddings += vision_embeddings
            if modality == "videos":
                video_input = modalities["videos"]
                video_embeddings = self._process_image_input(video_input)
                multimodal_embeddings += video_embeddings

        return multimodal_embeddings

    def get_language_model(self) -> torch.nn.Module:
        return self.language_model

    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
        intermediate_tensors: Optional[IntermediateTensors] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        **kwargs: object,
    ) -> Union[torch.Tensor, IntermediateTensors]:
        if intermediate_tensors is not None:
            input_ids = None
            inputs_embeds = None

        hidden_states = self.language_model(
            input_ids=input_ids,
            positions=positions,
            intermediate_tensors=intermediate_tensors,
            inputs_embeds=inputs_embeds,
            **kwargs,
        )

        return hidden_states

    def get_mm_mapping(self) -> MultiModelKeys:
        """
        Get the module prefix in multimodal models
        """
        return MultiModelKeys.from_string_field(
            language_model="language_model",
            connector="mlp1",
            tower_model="vision_model",
        )

    def compute_logits(
        self,
        hidden_states: torch.Tensor,
    ) -> Optional[torch.Tensor]:
        return self.language_model.compute_logits(hidden_states)

    def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]):
        adapter_dict = dict(self.mlp1.named_parameters())

        def is_llm(name: str) -> bool:
            return name.startswith("language_model")

        def is_adapter_weights(weight: tuple[str, torch.Tensor]):
            return weight[0].startswith("mlp1")

        def is_vision_weights(name: str) -> bool:
            return name.startswith("vision_model.radio_model.")

        # Separate weights by component
        llm_weights = []
        vision_weights = []

        for name, w in weights:
            if is_llm(name):
                # Strip 'language_model.' prefix for LLM weights
                llm_weights.append((".".join(name.split(".")[1:]), w))
            elif is_adapter_weights((name, w)):
                # Load vision-language adapter weights directly
                trimmed_name = ".".join(name.split(".")[1:])
                param = adapter_dict[trimmed_name]
                with torch.no_grad():
                    default_weight_loader(param, w)
            elif is_vision_weights(name):
                # Convert: vision_model.radio_model.* → radio_model.*
                hf_key = name[len(
                    "vision_model."):]  # Remove "vision_model." prefix
                vision_weights.append((hf_key, w))

        self.language_model.load_weights(llm_weights)
        self.vision_model.load_weights(vision_weights)

    def print_architecture(self,
                           detailed: bool = True,
                           save_to_file: str = None):
        """
        Print model architecture with parameter names, shapes, and sizes.

        Args:
            detailed: If True, show detailed parameter breakdown
            save_to_file: If provided, save output to this file path
        """
        import sys
        from io import StringIO

        # Capture output if saving to file
        original_stdout = sys.stdout
        if save_to_file:
            sys.stdout = StringIO()

        try:
            print("=" * 100)
            print("NemotronH_Nano_VL_V2 Model Architecture")
            print("=" * 100)

            total_params = 0
            param_groups = {
                "language_model": [],
                "vision_model": [],
                "mlp1": [],
                "other": [],
            }

            for name, param in self.named_parameters():
                param_size = param.numel()
                total_params += param_size

                # Group parameters by main component
                if name.startswith("language_model"):
                    param_groups["language_model"].append(
                        (name, param.shape, param_size, param.dtype))
                elif name.startswith("vision_model"):
                    param_groups["vision_model"].append(
                        (name, param.shape, param_size, param.dtype))
                elif name.startswith("mlp1"):
                    param_groups["mlp1"].append(
                        (name, param.shape, param_size, param.dtype))
                else:
                    param_groups["other"].append(
                        (name, param.shape, param_size, param.dtype))

                if detailed:
                    print(f"{name:<70} | Shape: {str(param.shape):<25} | "
                          f"Size: {param_size:>12,} | Dtype: {param.dtype}")

            print("=" * 100)
            print("Summary by Component:")
            print("-" * 60)

            for component, params in param_groups.items():
                if params:  # Only show components that have parameters
                    component_total = sum(size for _, _, size, _ in params)
                    percentage = ((component_total / total_params) *
                                  100 if total_params > 0 else 0)
                    print(f"{component:<20} | Parameters: {len(params):>4} | "
                          f"Total Size: {component_total:>15,} | "
                          f"{percentage:>6.2f}%")

            print("-" * 60)
            print(f"{'Total Parameters':<20} | {total_params:>15,}")

            # Estimate memory usage (assuming bfloat16 = 2 bytes per parameter)
            memory_mb = total_params * 2 / (1024**2)
            memory_gb = memory_mb / 1024
            print(f"{'Est. Memory (MB)':<20} | {memory_mb:>15.2f}")
            print(f"{'Est. Memory (GB)':<20} | {memory_gb:>15.2f}")
            print("=" * 100)

            # Save to file if requested
            if save_to_file:
                output = sys.stdout.getvalue()
                sys.stdout = original_stdout
                with open(save_to_file, "w") as f:
                    f.write(output)
                print(f"Architecture saved to: {save_to_file}")
                print(output)  # Also print to console

        finally:
            if save_to_file and sys.stdout != original_stdout:
                sys.stdout = original_stdout

    def get_model_info(self):
        """
        Get basic model information as a dictionary.
        """
        total_params = sum(p.numel() for p in self.parameters())

        component_info = {}
        for name, param in self.named_parameters():
            component = name.split(".")[0]
            if component not in component_info:
                component_info[component] = {"params": 0, "size": 0}
            component_info[component]["params"] += 1
            component_info[component]["size"] += param.numel()

        return {
            "model_name": "NemotronH_Nano_VL_V2",
            "total_parameters": total_params,
            "memory_estimate_mb": total_params * 2 / (1024**2),  # bfloat16
            "components": component_info,
            "config": {
                "image_size": getattr(self.config, "force_image_size", None),
                "patch_size": getattr(self.config, "patch_size", None),
                "num_image_token": self.num_image_token,
                "downsample_ratio": self.downsample_ratio,
            },
        }

    def get_vit_model_from_radio_config(self, hf_config):
        hf_config_vision = hf_config.vision_config
        model_name = hf_config_vision.args.get("model")
        if model_name is None:
            raise ValueError(f'Unsupported vit model type: {model_name}')

        preferred_resolution = getattr(hf_config_vision,
                                       "preferred_resolution", None)
        image_size = preferred_resolution[0] if preferred_resolution else 224
        patch_size = getattr(hf_config_vision, "patch_size", 16)

        radio_config = RadioConfig(
            model_name=model_name,
            image_size=image_size,
            patch_size=patch_size,
            norm_mean=hf_config.norm_mean,
            norm_std=hf_config.norm_std,
            reg_tokens=(hf_config_vision.args.get("register_multiple")
                        if hasattr(hf_config_vision, "args")
                        and isinstance(hf_config_vision.args, dict) else None),
        )

        return RadioModel(config=radio_config)

    def copy_inputs_before_cuda_graphs(self, input_buffers, **kwargs):
        return self.language_model.mamba_cache.copy_inputs_before_cuda_graphs(
            input_buffers, **kwargs)

    def get_seqlen_agnostic_capture_inputs(self, batch_size: int):
        return (self.language_model.mamba_cache.
                get_seqlen_agnostic_capture_inputs(batch_size))

    @classmethod
    def get_mamba_state_shape_from_config(cls, vllm_config: "VllmConfig"):
        text_config = vllm_config.model_config.hf_config.text_config
        temp_vllm_config = copy.deepcopy(vllm_config)
        temp_vllm_config.model_config.hf_config = text_config
        return NemotronHForCausalLM.get_mamba_state_shape_from_config(
            temp_vllm_config)

    @classmethod
    def get_mamba_state_dtype_from_config(cls, vllm_config: "VllmConfig"):
        text_config = vllm_config.model_config.hf_config.text_config
        temp_vllm_config = copy.deepcopy(vllm_config)
        temp_vllm_config.model_config.hf_config = text_config
        return NemotronHForCausalLM.get_mamba_state_dtype_from_config(
            temp_vllm_config)

config instance-attribute

config = config

downsample_ratio instance-attribute

downsample_ratio = downsample_ratio

image_tag_type instance-attribute

image_tag_type = image_tag_type

img_context_token_id instance-attribute

img_context_token_id = None

language_model instance-attribute

language_model = init_vllm_registered_model(
    vllm_config=vllm_config,
    hf_config=text_config,
    prefix=maybe_prefix(prefix, "language_model"),
)

mlp1 instance-attribute

mlp1 = to(torch_dtype)

num_image_token instance-attribute

num_image_token = int(
    (image_size // patch_size) ** 2 * downsample_ratio**2
)

patch_size instance-attribute

patch_size = patch_size

ps_version instance-attribute

ps_version = ps_version

template instance-attribute

template = template

video_context_token_id instance-attribute

video_context_token_id = None

vision_model instance-attribute

vision_model = to(torch_dtype)

__init__

__init__(*, vllm_config: VllmConfig, prefix: str = '')
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
    super().__init__()
    config = vllm_config.model_config.hf_config

    image_size = config.force_image_size
    patch_size = config.patch_size
    self.patch_size = patch_size
    self.template = config.template
    self.num_image_token = int(
        (image_size // patch_size)**2 * (config.downsample_ratio**2))
    self.downsample_ratio = config.downsample_ratio
    self.ps_version = config.ps_version
    self.image_tag_type = config.image_tag_type

    self.language_model = init_vllm_registered_model(
        vllm_config=vllm_config,
        hf_config=config.text_config,
        prefix=maybe_prefix(prefix, "language_model"),
    )
    self.vision_model = self.get_vit_model_from_radio_config(config).to(
        self.language_model.config.torch_dtype)

    # Construct the vision projection.
    vit_hidden_size = config.vit_hidden_size
    vision_projection_hidden_size = config.projector_hidden_size
    llm_hidden_size = config.text_config.hidden_size

    self.mlp1 = nn.Sequential(
        RMSNorm(hidden_size=vit_hidden_size *
                int(1 / self.downsample_ratio)**2,
                eps=1e-5),
        nn.Linear(
            vit_hidden_size * int(1 / self.downsample_ratio)**2,
            vision_projection_hidden_size,
            bias=False,
        ),
        ReLUSquaredActivation(),
        nn.Linear(vision_projection_hidden_size,
                  llm_hidden_size,
                  bias=False),
    )
    self.mlp1 = self.mlp1.to(self.language_model.config.torch_dtype)

    self.img_context_token_id = None
    self.video_context_token_id = None
    self.config = config

_parse_and_validate_image_input

_parse_and_validate_image_input(
    **kwargs: object,
) -> Optional[NanoNemotronVLImageInputs]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _parse_and_validate_image_input(
        self, **kwargs: object) -> Optional[NanoNemotronVLImageInputs]:
    pixel_values_flat = kwargs.pop("pixel_values_flat", None)
    image_num_patches = kwargs.pop("image_num_patches", None)
    image_embeds = kwargs.pop("image_embeds", None)

    if pixel_values_flat is None and image_embeds is None:
        return None

    if image_embeds is not None:
        if not isinstance(image_embeds, (torch.Tensor, list)):
            raise ValueError("Incorrect type of image embeddings. "
                             f"Got type: {type(image_embeds)}")

        return NanoNemotronVLImageEmbeddinInputs(
            type="image_embeds",
            data=flatten_bn(image_embeds),
        )

    image_token_id = kwargs["image_token_id"]
    assert isinstance(image_token_id, torch.Tensor)
    self.img_context_token_id = image_token_id.flatten().unique().item()

    if pixel_values_flat is not None:
        if not isinstance(pixel_values_flat, (torch.Tensor, list)):
            raise ValueError("Incorrect type of pixel values. "
                             f"Got type: {type(pixel_values_flat)}")

        if not isinstance(image_num_patches, (torch.Tensor, list)):
            raise ValueError("Incorrect type of image_num_patches. "
                             f"Got type: {type(image_num_patches)}")

        pixel_values_flat = flatten_bn(pixel_values_flat, concat=True)
        image_num_patches = flatten_bn(image_num_patches, concat=True)

        return NanoNemotronVLImagePixelInputs(
            type="pixel_values",
            pixel_values_flat=pixel_values_flat,
            num_patches=image_num_patches,
        )

    raise AssertionError("This line should be unreachable.")

_parse_and_validate_multimodal_inputs

_parse_and_validate_multimodal_inputs(
    **kwargs: object,
) -> dict
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _parse_and_validate_multimodal_inputs(self, **kwargs: object) -> dict:
    modalities = {}
    # Preserve the order of modalities if there are multiple of them
    # from the order of kwargs.
    for input_key in kwargs:
        if input_key in ("pixel_values_flat",
                         "image_embeds") and "images" not in modalities:
            modalities["images"] = self._parse_and_validate_image_input(
                **kwargs)
        if input_key in ("pixel_values_flat_video",
                         ) and "videos" not in modalities:
            modalities["videos"] = self._parse_and_validate_video_input(
                **kwargs)

    return modalities

_parse_and_validate_video_input

_parse_and_validate_video_input(
    **kwargs: object,
) -> Optional[NanoNemotronVLVideoPixelInputs]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _parse_and_validate_video_input(
        self,
        **kwargs: object) -> Optional[NanoNemotronVLVideoPixelInputs]:
    pixel_values_flat_video = kwargs.pop("pixel_values_flat_video", None)
    video_num_patches = kwargs.pop("video_num_patches", None)
    video_embeds = kwargs.pop("video_embeds", None)

    if pixel_values_flat_video is None and video_embeds is None:
        return None

    if video_embeds is not None:
        return NanoNemotronVLVideoEmbeddingInputs(
            type="video_embeds",
            data=flatten_bn(video_embeds),
        )

    video_token_id = kwargs["video_token_id"]
    assert isinstance(video_token_id, torch.Tensor)
    self.video_context_token_id = video_token_id.flatten().unique().item()

    if pixel_values_flat_video is not None:
        if not isinstance(pixel_values_flat_video, (torch.Tensor, list)):
            raise ValueError("Incorrect type of pixel values. "
                             f"Got type: {type(pixel_values_flat_video)}")

        if not isinstance(video_num_patches, (torch.Tensor, list)):
            raise ValueError("Incorrect type of image_num_patches. "
                             f"Got type: {type(video_num_patches)}")

        pixel_values_flat_video = flatten_bn(pixel_values_flat_video,
                                             concat=True)
        video_num_patches = flatten_bn(video_num_patches, concat=True)
        expected_h = expected_w = self.config.force_image_size
        resolve_bindings = {"h": expected_h, "w": expected_w}

        return NanoNemotronVLVideoPixelInputs(
            type="pixel_values_videos",
            pixel_values_flat=pixel_values_flat_video,
            num_patches=video_num_patches,
            resolve_bindings=resolve_bindings,
        )

    raise AssertionError("This line should be unreachable.")

_process_image_input

_process_image_input(
    image_input: NanoNemotronVLImageInputs,
) -> Tensor
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _process_image_input(
        self, image_input: NanoNemotronVLImageInputs) -> torch.Tensor:
    if image_input["type"] == "image_embeds":
        return image_input["data"]

    assert self.vision_model is not None

    image_embeds = self.extract_feature(image_input["pixel_values_flat"])
    num_patches = image_input["num_patches"]

    # Only one image in the current batch
    if len(num_patches) == 1:
        return (image_embeds.view(-1,
                                  self.config.text_config.hidden_size), )

    # NOTE: Image embeddings are split into separate tensors for each image
    # by the size of each embedding.
    feature_size = image_embeds.shape[1]
    image_embeds = image_embeds.view(-1,
                                     self.config.text_config.hidden_size)
    image_feature_sizes = [
        num_patches * feature_size for num_patches in num_patches
    ]
    return image_embeds.split(image_feature_sizes)

compute_logits

compute_logits(hidden_states: Tensor) -> Optional[Tensor]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def compute_logits(
    self,
    hidden_states: torch.Tensor,
) -> Optional[torch.Tensor]:
    return self.language_model.compute_logits(hidden_states)

copy_inputs_before_cuda_graphs

copy_inputs_before_cuda_graphs(input_buffers, **kwargs)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def copy_inputs_before_cuda_graphs(self, input_buffers, **kwargs):
    return self.language_model.mamba_cache.copy_inputs_before_cuda_graphs(
        input_buffers, **kwargs)

extract_feature

extract_feature(pixel_values)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def extract_feature(self, pixel_values):
    vit_embeds = self.vision_model(pixel_values)
    vit_embeds = vit_embeds.to(dtype=torch.bfloat16)
    h = w = int(vit_embeds.shape[1]**0.5)
    vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], h, w, -1)
    vit_embeds = self.pixel_shuffle(vit_embeds,
                                    scale_factor=self.downsample_ratio)
    vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], -1,
                                    vit_embeds.shape[-1])
    vit_embeds = self.mlp1(vit_embeds)
    return vit_embeds

forward

forward(
    input_ids: Tensor,
    positions: Tensor,
    intermediate_tensors: Optional[
        IntermediateTensors
    ] = None,
    inputs_embeds: Optional[Tensor] = None,
    **kwargs: object,
) -> Union[Tensor, IntermediateTensors]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def forward(
    self,
    input_ids: torch.Tensor,
    positions: torch.Tensor,
    intermediate_tensors: Optional[IntermediateTensors] = None,
    inputs_embeds: Optional[torch.Tensor] = None,
    **kwargs: object,
) -> Union[torch.Tensor, IntermediateTensors]:
    if intermediate_tensors is not None:
        input_ids = None
        inputs_embeds = None

    hidden_states = self.language_model(
        input_ids=input_ids,
        positions=positions,
        intermediate_tensors=intermediate_tensors,
        inputs_embeds=inputs_embeds,
        **kwargs,
    )

    return hidden_states

get_language_model

get_language_model() -> Module
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_language_model(self) -> torch.nn.Module:
    return self.language_model

get_mamba_state_dtype_from_config classmethod

get_mamba_state_dtype_from_config(vllm_config: VllmConfig)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
@classmethod
def get_mamba_state_dtype_from_config(cls, vllm_config: "VllmConfig"):
    text_config = vllm_config.model_config.hf_config.text_config
    temp_vllm_config = copy.deepcopy(vllm_config)
    temp_vllm_config.model_config.hf_config = text_config
    return NemotronHForCausalLM.get_mamba_state_dtype_from_config(
        temp_vllm_config)

get_mamba_state_shape_from_config classmethod

get_mamba_state_shape_from_config(vllm_config: VllmConfig)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
@classmethod
def get_mamba_state_shape_from_config(cls, vllm_config: "VllmConfig"):
    text_config = vllm_config.model_config.hf_config.text_config
    temp_vllm_config = copy.deepcopy(vllm_config)
    temp_vllm_config.model_config.hf_config = text_config
    return NemotronHForCausalLM.get_mamba_state_shape_from_config(
        temp_vllm_config)

get_mm_mapping

get_mm_mapping() -> MultiModelKeys

Get the module prefix in multimodal models

Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_mm_mapping(self) -> MultiModelKeys:
    """
    Get the module prefix in multimodal models
    """
    return MultiModelKeys.from_string_field(
        language_model="language_model",
        connector="mlp1",
        tower_model="vision_model",
    )

get_model_info

get_model_info()

Get basic model information as a dictionary.

Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_model_info(self):
    """
    Get basic model information as a dictionary.
    """
    total_params = sum(p.numel() for p in self.parameters())

    component_info = {}
    for name, param in self.named_parameters():
        component = name.split(".")[0]
        if component not in component_info:
            component_info[component] = {"params": 0, "size": 0}
        component_info[component]["params"] += 1
        component_info[component]["size"] += param.numel()

    return {
        "model_name": "NemotronH_Nano_VL_V2",
        "total_parameters": total_params,
        "memory_estimate_mb": total_params * 2 / (1024**2),  # bfloat16
        "components": component_info,
        "config": {
            "image_size": getattr(self.config, "force_image_size", None),
            "patch_size": getattr(self.config, "patch_size", None),
            "num_image_token": self.num_image_token,
            "downsample_ratio": self.downsample_ratio,
        },
    }

get_multimodal_embeddings

get_multimodal_embeddings(
    **kwargs: object,
) -> MultiModalEmbeddings
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_multimodal_embeddings(self,
                              **kwargs: object) -> MultiModalEmbeddings:
    # Validate the multimodal input keyword arguments
    modalities = self._parse_and_validate_multimodal_inputs(**kwargs)
    if modalities is None:
        return []

    # # The result multimodal_embeddings is tuple of tensors, with each
    # tensor correspoending to a multimodal data item (image or video).
    multimodal_embeddings: tuple[torch.Tensor, ...] = ()

    # NOTE: It is important to iterate over the keys in this dictionary
    # to preserve the order of the modalities.
    for modality in modalities:
        if modality == "images":
            image_input = modalities["images"]
            vision_embeddings = self._process_image_input(image_input)
            multimodal_embeddings += vision_embeddings
        if modality == "videos":
            video_input = modalities["videos"]
            video_embeddings = self._process_image_input(video_input)
            multimodal_embeddings += video_embeddings

    return multimodal_embeddings

get_placeholder_str classmethod

get_placeholder_str(modality: str, i: int) -> Optional[str]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
@classmethod
def get_placeholder_str(cls, modality: str, i: int) -> Optional[str]:
    if modality.startswith("image"):
        return "<image>"
    if modality.startswith("video"):
        return "<video>"
    return None

get_seqlen_agnostic_capture_inputs

get_seqlen_agnostic_capture_inputs(batch_size: int)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_seqlen_agnostic_capture_inputs(self, batch_size: int):
    return (self.language_model.mamba_cache.
            get_seqlen_agnostic_capture_inputs(batch_size))

get_vit_model_from_radio_config

get_vit_model_from_radio_config(hf_config)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_vit_model_from_radio_config(self, hf_config):
    hf_config_vision = hf_config.vision_config
    model_name = hf_config_vision.args.get("model")
    if model_name is None:
        raise ValueError(f'Unsupported vit model type: {model_name}')

    preferred_resolution = getattr(hf_config_vision,
                                   "preferred_resolution", None)
    image_size = preferred_resolution[0] if preferred_resolution else 224
    patch_size = getattr(hf_config_vision, "patch_size", 16)

    radio_config = RadioConfig(
        model_name=model_name,
        image_size=image_size,
        patch_size=patch_size,
        norm_mean=hf_config.norm_mean,
        norm_std=hf_config.norm_std,
        reg_tokens=(hf_config_vision.args.get("register_multiple")
                    if hasattr(hf_config_vision, "args")
                    and isinstance(hf_config_vision.args, dict) else None),
    )

    return RadioModel(config=radio_config)

load_weights

load_weights(weights: Iterable[tuple[str, Tensor]])
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]):
    adapter_dict = dict(self.mlp1.named_parameters())

    def is_llm(name: str) -> bool:
        return name.startswith("language_model")

    def is_adapter_weights(weight: tuple[str, torch.Tensor]):
        return weight[0].startswith("mlp1")

    def is_vision_weights(name: str) -> bool:
        return name.startswith("vision_model.radio_model.")

    # Separate weights by component
    llm_weights = []
    vision_weights = []

    for name, w in weights:
        if is_llm(name):
            # Strip 'language_model.' prefix for LLM weights
            llm_weights.append((".".join(name.split(".")[1:]), w))
        elif is_adapter_weights((name, w)):
            # Load vision-language adapter weights directly
            trimmed_name = ".".join(name.split(".")[1:])
            param = adapter_dict[trimmed_name]
            with torch.no_grad():
                default_weight_loader(param, w)
        elif is_vision_weights(name):
            # Convert: vision_model.radio_model.* → radio_model.*
            hf_key = name[len(
                "vision_model."):]  # Remove "vision_model." prefix
            vision_weights.append((hf_key, w))

    self.language_model.load_weights(llm_weights)
    self.vision_model.load_weights(vision_weights)

pixel_shuffle

pixel_shuffle(x, scale_factor=0.5)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def pixel_shuffle(self, x, scale_factor=0.5):
    n, w, h, c = x.size()
    # N, W, H, C --> N, W, H * scale, C // scale
    x = x.view(
        n,
        w,
        int(h * scale_factor),
        int(c / scale_factor),
    )
    # N, W, H * scale, C // scale --> N, H * scale, W, C // scale
    x = x.permute(0, 2, 1, 3).contiguous()
    # N, H * scale, W, C // scale -->
    # N, H * scale, W * scale, C // (scale ** 2)
    x = x.view(
        n,
        int(h * scale_factor),
        int(w * scale_factor),
        int(c / (scale_factor * scale_factor)),
    )
    if self.ps_version == "v1":
        warnings.warn(
            "In ps_version 'v1', the height and width have not "
            "been swapped back, which results in a transposed image.",
            stacklevel=2,
        )
    else:
        x = x.permute(0, 2, 1, 3).contiguous()
    return x

print_architecture

print_architecture(
    detailed: bool = True, save_to_file: str = None
)

Print model architecture with parameter names, shapes, and sizes.

Parameters:

Name Type Description Default
detailed bool

If True, show detailed parameter breakdown

True
save_to_file str

If provided, save output to this file path

None
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def print_architecture(self,
                       detailed: bool = True,
                       save_to_file: str = None):
    """
    Print model architecture with parameter names, shapes, and sizes.

    Args:
        detailed: If True, show detailed parameter breakdown
        save_to_file: If provided, save output to this file path
    """
    import sys
    from io import StringIO

    # Capture output if saving to file
    original_stdout = sys.stdout
    if save_to_file:
        sys.stdout = StringIO()

    try:
        print("=" * 100)
        print("NemotronH_Nano_VL_V2 Model Architecture")
        print("=" * 100)

        total_params = 0
        param_groups = {
            "language_model": [],
            "vision_model": [],
            "mlp1": [],
            "other": [],
        }

        for name, param in self.named_parameters():
            param_size = param.numel()
            total_params += param_size

            # Group parameters by main component
            if name.startswith("language_model"):
                param_groups["language_model"].append(
                    (name, param.shape, param_size, param.dtype))
            elif name.startswith("vision_model"):
                param_groups["vision_model"].append(
                    (name, param.shape, param_size, param.dtype))
            elif name.startswith("mlp1"):
                param_groups["mlp1"].append(
                    (name, param.shape, param_size, param.dtype))
            else:
                param_groups["other"].append(
                    (name, param.shape, param_size, param.dtype))

            if detailed:
                print(f"{name:<70} | Shape: {str(param.shape):<25} | "
                      f"Size: {param_size:>12,} | Dtype: {param.dtype}")

        print("=" * 100)
        print("Summary by Component:")
        print("-" * 60)

        for component, params in param_groups.items():
            if params:  # Only show components that have parameters
                component_total = sum(size for _, _, size, _ in params)
                percentage = ((component_total / total_params) *
                              100 if total_params > 0 else 0)
                print(f"{component:<20} | Parameters: {len(params):>4} | "
                      f"Total Size: {component_total:>15,} | "
                      f"{percentage:>6.2f}%")

        print("-" * 60)
        print(f"{'Total Parameters':<20} | {total_params:>15,}")

        # Estimate memory usage (assuming bfloat16 = 2 bytes per parameter)
        memory_mb = total_params * 2 / (1024**2)
        memory_gb = memory_mb / 1024
        print(f"{'Est. Memory (MB)':<20} | {memory_mb:>15.2f}")
        print(f"{'Est. Memory (GB)':<20} | {memory_gb:>15.2f}")
        print("=" * 100)

        # Save to file if requested
        if save_to_file:
            output = sys.stdout.getvalue()
            sys.stdout = original_stdout
            with open(save_to_file, "w") as f:
                f.write(output)
            print(f"Architecture saved to: {save_to_file}")
            print(output)  # Also print to console

    finally:
        if save_to_file and sys.stdout != original_stdout:
            sys.stdout = original_stdout

dynamic_preprocess

dynamic_preprocess(
    image,
    *,
    image_size=512,
    max_num_tiles=12,
    use_thumbnail=True,
    idx=0,
)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def dynamic_preprocess(image,
                       *,
                       image_size=512,
                       max_num_tiles=12,
                       use_thumbnail=True,
                       idx=0):
    orig_width, orig_height = image.size

    target_ratios = get_internvl_target_ratios(1, max_num_tiles)

    blocks, target_width, target_height = calculate_internvl_targets(
        orig_width=orig_width,
        orig_height=orig_height,
        target_ratios=target_ratios,
        image_size=image_size,
        use_thumbnail=False)
    # resize the image
    resized_img = image.resize((target_width, target_height))
    processed_images = []
    for i in range(blocks):
        box = (
            (i % (target_width // image_size)) * image_size,
            (i // (target_width // image_size)) * image_size,
            ((i % (target_width // image_size)) + 1) * image_size,
            ((i // (target_width // image_size)) + 1) * image_size,
        )
        # split the image
        split_img = resized_img.crop(box)
        processed_images.append(split_img)
    assert len(processed_images) == blocks
    if use_thumbnail and len(processed_images) != 1:
        thumbnail_img = image.resize((image_size, image_size))
        processed_images.append(thumbnail_img)

    processed_images = [
        img.convert("RGB") if img.mode != "RGB" else img
        for img in processed_images
    ]
    processed_images = [
        T.Resize((image_size, image_size),
                 interpolation=T.InterpolationMode.BICUBIC)(img)
        for img in processed_images
    ]
    processed_images = [T.ToTensor()(img) for img in processed_images]
    return processed_images

image_to_pixel_values

image_to_pixel_values(
    image: Image,
    *,
    input_size: int,
    max_num: int,
    use_thumbnail: bool,
    idx: int,
) -> Tensor
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def image_to_pixel_values(
    image: Image.Image,
    *,
    input_size: int,
    max_num: int,
    use_thumbnail: bool,
    idx: int,
) -> torch.Tensor:
    images = dynamic_preprocess(
        image,
        image_size=input_size,
        max_num_tiles=max_num,
        use_thumbnail=use_thumbnail,
        idx=idx,
    )

    pixel_values = torch.stack(images)
    return pixel_values

video_to_pixel_values

video_to_pixel_values(
    video: NDArray,
    *,
    input_size: int,
    max_num_tiles: int = 1,
    use_thumbnail: bool,
) -> Tensor
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def video_to_pixel_values(
    video: npt.NDArray,
    *,
    input_size: int,
    max_num_tiles: int = 1,
    use_thumbnail: bool,
) -> torch.Tensor:
    # Convert each frame to a single resized tile tensor consistent
    # with image path
    frames_tensors: list[torch.Tensor] = []
    for frame in video:
        pil_frame = dynamic_preprocess(
            Image.fromarray(frame, mode="RGB"),
            image_size=input_size,
            max_num_tiles=max_num_tiles,
            use_thumbnail=use_thumbnail,
            idx=0,
        )
        # dynamic_preprocess returns tensors already; take the single tile
        assert len(pil_frame) >= 1
        frames_tensors.append(pil_frame[0])

    return torch.stack(frames_tensors)