vllm.multimodal.utils

_M module-attribute

_M = TypeVar('_M')

global_thread_pool module-attribute

global_thread_pool = ThreadPoolExecutor(
    max_workers=VLLM_MEDIA_LOADING_THREAD_COUNT
)

MediaConnector

Source code in vllm/multimodal/utils.py
class MediaConnector:

    def __init__(
        self,
        media_io_kwargs: Optional[dict[str, dict[str, Any]]] = None,
        connection: HTTPConnection = global_http_connection,
        *,
        allowed_local_media_path: str = "",
        allowed_media_domains: Optional[list[str]] = None,
    ) -> None:
        """
        Args:
            media_io_kwargs: Additional args passed to process media 
                             inputs, keyed by modalities. For example, 
                             to set num_frames for video, set 
                             `--media-io-kwargs '{"video":{"num_frames":40}}'`
            connection: HTTP connection client to download media contents.
            allowed_local_media_path: A local directory to load media files
                                      from.
        """
        super().__init__()

        self.media_io_kwargs: dict[str, dict[
            str, Any]] = media_io_kwargs if media_io_kwargs else {}
        self.connection = connection

        if allowed_local_media_path:
            allowed_local_media_path_ = Path(allowed_local_media_path)

            if not allowed_local_media_path_.exists():
                raise ValueError(
                    "Invalid `--allowed-local-media-path`: The path "
                    f"{allowed_local_media_path_} does not exist.")
            if not allowed_local_media_path_.is_dir():
                raise ValueError(
                    "Invalid `--allowed-local-media-path`: The path "
                    f"{allowed_local_media_path_} must be a directory.")
        else:
            allowed_local_media_path_ = None

        self.allowed_local_media_path = allowed_local_media_path_
        if allowed_media_domains is None:
            allowed_media_domains = []
        self.allowed_media_domains = allowed_media_domains

    def _load_data_url(
        self,
        url_spec: ParseResult,
        media_io: MediaIO[_M],
    ) -> _M:  # type: ignore[type-var]
        data_spec, data = url_spec.path.split(",", 1)
        media_type, data_type = data_spec.split(";", 1)

        if data_type != "base64":
            msg = "Only base64 data URLs are supported for now."
            raise NotImplementedError(msg)

        return media_io.load_base64(media_type, data)

    def _load_file_url(
        self,
        url_spec: ParseResult,
        media_io: MediaIO[_M],
    ) -> _M:  # type: ignore[type-var]
        allowed_local_media_path = self.allowed_local_media_path
        if allowed_local_media_path is None:
            raise RuntimeError("Cannot load local files without "
                               "`--allowed-local-media-path`.")

        filepath = Path(url2pathname(url_spec.path))
        if allowed_local_media_path not in filepath.resolve().parents:
            raise ValueError(
                f"The file path {filepath} must be a subpath "
                f"of `--allowed-local-media-path` {allowed_local_media_path}.")

        return media_io.load_file(filepath)

    def _assert_url_in_allowed_media_domains(self, url_spec) -> None:
        if self.allowed_media_domains and url_spec.hostname not in \
            self.allowed_media_domains:
            raise ValueError(
                f"The URL must be from one of the allowed domains: "
                f"{self.allowed_media_domains}. Input URL domain: "
                f"{url_spec.hostname}")

    def load_from_url(
        self,
        url: str,
        media_io: MediaIO[_M],
        *,
        fetch_timeout: Optional[int] = None,
    ) -> _M:  # type: ignore[type-var]
        url_spec = urlparse(url)

        if url_spec.scheme.startswith("http"):
            self._assert_url_in_allowed_media_domains(url_spec)

            connection = self.connection
            data = connection.get_bytes(url, timeout=fetch_timeout)

            return media_io.load_bytes(data)

        if url_spec.scheme == "data":
            return self._load_data_url(url_spec, media_io)

        if url_spec.scheme == "file":
            return self._load_file_url(url_spec, media_io)

        msg = "The URL must be either a HTTP, data or file URL."
        raise ValueError(msg)

    async def load_from_url_async(
        self,
        url: str,
        media_io: MediaIO[_M],
        *,
        fetch_timeout: Optional[int] = None,
    ) -> _M:
        url_spec = urlparse(url)
        loop = asyncio.get_running_loop()

        if url_spec.scheme.startswith("http"):
            self._assert_url_in_allowed_media_domains(url_spec)

            connection = self.connection
            data = await connection.async_get_bytes(url, timeout=fetch_timeout)
            future = loop.run_in_executor(global_thread_pool,
                                          media_io.load_bytes, data)
            return await future

        if url_spec.scheme == "data":
            future = loop.run_in_executor(global_thread_pool,
                                          self._load_data_url, url_spec,
                                          media_io)
            return await future

        if url_spec.scheme == "file":
            future = loop.run_in_executor(global_thread_pool,
                                          self._load_file_url, url_spec,
                                          media_io)
            return await future
        msg = "The URL must be either a HTTP, data or file URL."
        raise ValueError(msg)

    def fetch_audio(
        self,
        audio_url: str,
    ) -> tuple[np.ndarray, Union[int, float]]:
        """
        Load audio from a URL.
        """
        audio_io = AudioMediaIO(**self.media_io_kwargs.get("audio", {}))

        return self.load_from_url(
            audio_url,
            audio_io,
            fetch_timeout=envs.VLLM_AUDIO_FETCH_TIMEOUT,
        )

    async def fetch_audio_async(
        self,
        audio_url: str,
    ) -> tuple[np.ndarray, Union[int, float]]:
        """
        Asynchronously fetch audio from a URL.
        """
        audio_io = AudioMediaIO(**self.media_io_kwargs.get("audio", {}))

        return await self.load_from_url_async(
            audio_url,
            audio_io,
            fetch_timeout=envs.VLLM_AUDIO_FETCH_TIMEOUT,
        )

    def fetch_image(
        self,
        image_url: str,
        *,
        image_mode: str = "RGB",
    ) -> Image.Image:
        """
        Load a PIL image from an HTTP or base64 data URL.

        By default, the image is converted into RGB format.
        """
        image_io = ImageMediaIO(image_mode=image_mode,
                                **self.media_io_kwargs.get("image", {}))

        try:
            return self.load_from_url(
                image_url,
                image_io,
                fetch_timeout=envs.VLLM_IMAGE_FETCH_TIMEOUT,
            )
        except UnidentifiedImageError as e:
            # convert to ValueError to be properly caught upstream
            raise ValueError(str(e)) from e

    async def fetch_image_async(
        self,
        image_url: str,
        *,
        image_mode: str = "RGB",
    ) -> Image.Image:
        """
        Asynchronously load a PIL image from an HTTP or base64 data URL.

        By default, the image is converted into RGB format.
        """
        image_io = ImageMediaIO(image_mode=image_mode,
                                **self.media_io_kwargs.get("image", {}))

        try:
            return await self.load_from_url_async(
                image_url,
                image_io,
                fetch_timeout=envs.VLLM_IMAGE_FETCH_TIMEOUT,
            )
        except UnidentifiedImageError as e:
            # convert to ValueError to be properly caught upstream
            raise ValueError(str(e)) from e

    def fetch_video(
        self,
        video_url: str,
        *,
        image_mode: str = "RGB",
    ) -> tuple[npt.NDArray, dict[str, Any]]:
        """
        Load video from an HTTP or base64 data URL.
        """
        image_io = ImageMediaIO(image_mode=image_mode,
                                **self.media_io_kwargs.get("image", {}))
        video_io = VideoMediaIO(image_io,
                                **self.media_io_kwargs.get("video", {}))

        return self.load_from_url(
            video_url,
            video_io,
            fetch_timeout=envs.VLLM_VIDEO_FETCH_TIMEOUT,
        )

    async def fetch_video_async(
        self,
        video_url: str,
        *,
        image_mode: str = "RGB",
    ) -> tuple[npt.NDArray, dict[str, Any]]:
        """
        Asynchronously load video from an HTTP or base64 data URL.

        By default, the image is converted into RGB format.
        """
        image_io = ImageMediaIO(image_mode=image_mode,
                                **self.media_io_kwargs.get("image", {}))
        video_io = VideoMediaIO(image_io,
                                **self.media_io_kwargs.get("video", {}))

        return await self.load_from_url_async(
            video_url,
            video_io,
            fetch_timeout=envs.VLLM_VIDEO_FETCH_TIMEOUT,
        )

    def fetch_image_embedding(
        self,
        data: str,
    ) -> torch.Tensor:
        """
        Load image embedding from a URL.
        """
        image_embedding_io = ImageEmbeddingMediaIO()

        return image_embedding_io.load_base64("", data)

allowed_local_media_path instance-attribute

allowed_local_media_path = allowed_local_media_path_

allowed_media_domains instance-attribute

allowed_media_domains = allowed_media_domains

connection instance-attribute

connection = connection

media_io_kwargs instance-attribute

media_io_kwargs: dict[str, dict[str, Any]] = (
    media_io_kwargs if media_io_kwargs else {}
)

__init__

__init__(
    media_io_kwargs: Optional[
        dict[str, dict[str, Any]]
    ] = None,
    connection: HTTPConnection = global_http_connection,
    *,
    allowed_local_media_path: str = "",
    allowed_media_domains: Optional[list[str]] = None,
) -> None

Parameters:

media_io_kwargs (Optional[dict[str, dict[str, Any]]], default: None)
    Additional args passed to process media inputs, keyed by modalities. For example, to set num_frames for video, set --media-io-kwargs '{"video":{"num_frames":40}}'.

connection (HTTPConnection, default: global_http_connection)
    HTTP connection client to download media contents.

allowed_local_media_path (str, default: '')
    A local directory to load media files from.

Source code in vllm/multimodal/utils.py
def __init__(
    self,
    media_io_kwargs: Optional[dict[str, dict[str, Any]]] = None,
    connection: HTTPConnection = global_http_connection,
    *,
    allowed_local_media_path: str = "",
    allowed_media_domains: Optional[list[str]] = None,
) -> None:
    """
    Args:
        media_io_kwargs: Additional args passed to process media 
                         inputs, keyed by modalities. For example, 
                         to set num_frames for video, set 
                         `--media-io-kwargs '{"video":{"num_frames":40}}'`
        connection: HTTP connection client to download media contents.
        allowed_local_media_path: A local directory to load media files
                                  from.
    """
    super().__init__()

    self.media_io_kwargs: dict[str, dict[
        str, Any]] = media_io_kwargs if media_io_kwargs else {}
    self.connection = connection

    if allowed_local_media_path:
        allowed_local_media_path_ = Path(allowed_local_media_path)

        if not allowed_local_media_path_.exists():
            raise ValueError(
                "Invalid `--allowed-local-media-path`: The path "
                f"{allowed_local_media_path_} does not exist.")
        if not allowed_local_media_path_.is_dir():
            raise ValueError(
                "Invalid `--allowed-local-media-path`: The path "
                f"{allowed_local_media_path_} must be a directory.")
    else:
        allowed_local_media_path_ = None

    self.allowed_local_media_path = allowed_local_media_path_
    if allowed_media_domains is None:
        allowed_media_domains = []
    self.allowed_media_domains = allowed_media_domains
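
A minimal usage sketch (the URL, domain, and frame count below are illustrative, not part of the API): construct a connector that forwards num_frames to the video loader and only accepts downloads from an allowed domain, then fetch an image synchronously.

from vllm.multimodal.utils import MediaConnector

connector = MediaConnector(
    media_io_kwargs={"video": {"num_frames": 16}},
    allowed_media_domains=["example.com"],
)

# Raises ValueError if the URL's hostname is not in allowed_media_domains.
image = connector.fetch_image("https://example.com/cat.jpg")
print(image.size, image.mode)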

_assert_url_in_allowed_media_domains

_assert_url_in_allowed_media_domains(url_spec) -> None
Source code in vllm/multimodal/utils.py
def _assert_url_in_allowed_media_domains(self, url_spec) -> None:
    if self.allowed_media_domains and url_spec.hostname not in \
        self.allowed_media_domains:
        raise ValueError(
            f"The URL must be from one of the allowed domains: "
            f"{self.allowed_media_domains}. Input URL domain: "
            f"{url_spec.hostname}")

_load_data_url

_load_data_url(
    url_spec: ParseResult, media_io: MediaIO[_M]
) -> _M
Source code in vllm/multimodal/utils.py
def _load_data_url(
    self,
    url_spec: ParseResult,
    media_io: MediaIO[_M],
) -> _M:  # type: ignore[type-var]
    data_spec, data = url_spec.path.split(",", 1)
    media_type, data_type = data_spec.split(";", 1)

    if data_type != "base64":
        msg = "Only base64 data URLs are supported for now."
        raise NotImplementedError(msg)

    return media_io.load_base64(media_type, data)

_load_file_url

_load_file_url(
    url_spec: ParseResult, media_io: MediaIO[_M]
) -> _M
Source code in vllm/multimodal/utils.py
def _load_file_url(
    self,
    url_spec: ParseResult,
    media_io: MediaIO[_M],
) -> _M:  # type: ignore[type-var]
    allowed_local_media_path = self.allowed_local_media_path
    if allowed_local_media_path is None:
        raise RuntimeError("Cannot load local files without "
                           "`--allowed-local-media-path`.")

    filepath = Path(url2pathname(url_spec.path))
    if allowed_local_media_path not in filepath.resolve().parents:
        raise ValueError(
            f"The file path {filepath} must be a subpath "
            f"of `--allowed-local-media-path` {allowed_local_media_path}.")

    return media_io.load_file(filepath)
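
A sketch of the local-file gating (paths below are placeholders): file:// URLs are honored only when allowed_local_media_path is set, and the resolved path must lie inside that directory.

from vllm.multimodal.utils import MediaConnector

connector = MediaConnector(allowed_local_media_path="/data/media")

# Accepted: /data/media is a parent of the resolved file path.
image = connector.fetch_image("file:///data/media/samples/cat.png")

# Rejected with ValueError: the resolved path is outside the allowed directory.
# connector.fetch_image("file:///etc/passwd")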

fetch_audio

fetch_audio(
    audio_url: str,
) -> tuple[ndarray, Union[int, float]]

Load audio from a URL.

Source code in vllm/multimodal/utils.py
def fetch_audio(
    self,
    audio_url: str,
) -> tuple[np.ndarray, Union[int, float]]:
    """
    Load audio from a URL.
    """
    audio_io = AudioMediaIO(**self.media_io_kwargs.get("audio", {}))

    return self.load_from_url(
        audio_url,
        audio_io,
        fetch_timeout=envs.VLLM_AUDIO_FETCH_TIMEOUT,
    )

fetch_audio_async async

fetch_audio_async(
    audio_url: str,
) -> tuple[ndarray, Union[int, float]]

Asynchronously fetch audio from a URL.

Source code in vllm/multimodal/utils.py
async def fetch_audio_async(
    self,
    audio_url: str,
) -> tuple[np.ndarray, Union[int, float]]:
    """
    Asynchronously fetch audio from a URL.
    """
    audio_io = AudioMediaIO(**self.media_io_kwargs.get("audio", {}))

    return await self.load_from_url_async(
        audio_url,
        audio_io,
        fetch_timeout=envs.VLLM_AUDIO_FETCH_TIMEOUT,
    )

fetch_image

fetch_image(
    image_url: str, *, image_mode: str = "RGB"
) -> Image

Load a PIL image from an HTTP or base64 data URL.

By default, the image is converted into RGB format.

Source code in vllm/multimodal/utils.py
def fetch_image(
    self,
    image_url: str,
    *,
    image_mode: str = "RGB",
) -> Image.Image:
    """
    Load a PIL image from an HTTP or base64 data URL.

    By default, the image is converted into RGB format.
    """
    image_io = ImageMediaIO(image_mode=image_mode,
                            **self.media_io_kwargs.get("image", {}))

    try:
        return self.load_from_url(
            image_url,
            image_io,
            fetch_timeout=envs.VLLM_IMAGE_FETCH_TIMEOUT,
        )
    except UnidentifiedImageError as e:
        # convert to ValueError to be properly caught upstream
        raise ValueError(str(e)) from e

fetch_image_async async

fetch_image_async(
    image_url: str, *, image_mode: str = "RGB"
) -> Image

Asynchronously load a PIL image from an HTTP or base64 data URL.

By default, the image is converted into RGB format.

Source code in vllm/multimodal/utils.py
async def fetch_image_async(
    self,
    image_url: str,
    *,
    image_mode: str = "RGB",
) -> Image.Image:
    """
    Asynchronously load a PIL image from an HTTP or base64 data URL.

    By default, the image is converted into RGB format.
    """
    image_io = ImageMediaIO(image_mode=image_mode,
                            **self.media_io_kwargs.get("image", {}))

    try:
        return await self.load_from_url_async(
            image_url,
            image_io,
            fetch_timeout=envs.VLLM_IMAGE_FETCH_TIMEOUT,
        )
    except UnidentifiedImageError as e:
        # convert to ValueError to be properly caught upstream
        raise ValueError(str(e)) from e
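
An async usage sketch (the URL is a placeholder): the download is awaited on the event loop while decoding is offloaded to global_thread_pool.

import asyncio

from vllm.multimodal.utils import MediaConnector

async def main() -> None:
    connector = MediaConnector()
    image = await connector.fetch_image_async("https://example.com/cat.jpg")
    print(image.size)

asyncio.run(main())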

fetch_image_embedding

fetch_image_embedding(data: str) -> Tensor

Load image embedding from a URL.

Source code in vllm/multimodal/utils.py
def fetch_image_embedding(
    self,
    data: str,
) -> torch.Tensor:
    """
    Load image embedding from a URL.
    """
    image_embedding_io = ImageEmbeddingMediaIO()

    return image_embedding_io.load_base64("", data)

fetch_video

fetch_video(
    video_url: str, *, image_mode: str = "RGB"
) -> tuple[NDArray, dict[str, Any]]

Load video from an HTTP or base64 data URL.

Source code in vllm/multimodal/utils.py
def fetch_video(
    self,
    video_url: str,
    *,
    image_mode: str = "RGB",
) -> tuple[npt.NDArray, dict[str, Any]]:
    """
    Load video from an HTTP or base64 data URL.
    """
    image_io = ImageMediaIO(image_mode=image_mode,
                            **self.media_io_kwargs.get("image", {}))
    video_io = VideoMediaIO(image_io,
                            **self.media_io_kwargs.get("video", {}))

    return self.load_from_url(
        video_url,
        video_io,
        fetch_timeout=envs.VLLM_VIDEO_FETCH_TIMEOUT,
    )

fetch_video_async async

fetch_video_async(
    video_url: str, *, image_mode: str = "RGB"
) -> tuple[NDArray, dict[str, Any]]

Asynchronously load video from an HTTP or base64 data URL.

By default, the image is converted into RGB format.

Source code in vllm/multimodal/utils.py
async def fetch_video_async(
    self,
    video_url: str,
    *,
    image_mode: str = "RGB",
) -> tuple[npt.NDArray, dict[str, Any]]:
    """
    Asynchronously load video from an HTTP or base64 data URL.

    By default, the image is converted into RGB format.
    """
    image_io = ImageMediaIO(image_mode=image_mode,
                            **self.media_io_kwargs.get("image", {}))
    video_io = VideoMediaIO(image_io,
                            **self.media_io_kwargs.get("video", {}))

    return await self.load_from_url_async(
        video_url,
        video_io,
        fetch_timeout=envs.VLLM_VIDEO_FETCH_TIMEOUT,
    )

load_from_url

load_from_url(
    url: str,
    media_io: MediaIO[_M],
    *,
    fetch_timeout: Optional[int] = None,
) -> _M
Source code in vllm/multimodal/utils.py
def load_from_url(
    self,
    url: str,
    media_io: MediaIO[_M],
    *,
    fetch_timeout: Optional[int] = None,
) -> _M:  # type: ignore[type-var]
    url_spec = urlparse(url)

    if url_spec.scheme.startswith("http"):
        self._assert_url_in_allowed_media_domains(url_spec)

        connection = self.connection
        data = connection.get_bytes(url, timeout=fetch_timeout)

        return media_io.load_bytes(data)

    if url_spec.scheme == "data":
        return self._load_data_url(url_spec, media_io)

    if url_spec.scheme == "file":
        return self._load_file_url(url_spec, media_io)

    msg = "The URL must be either a HTTP, data or file URL."
    raise ValueError(msg)

load_from_url_async async

load_from_url_async(
    url: str,
    media_io: MediaIO[_M],
    *,
    fetch_timeout: Optional[int] = None,
) -> _M
Source code in vllm/multimodal/utils.py
async def load_from_url_async(
    self,
    url: str,
    media_io: MediaIO[_M],
    *,
    fetch_timeout: Optional[int] = None,
) -> _M:
    url_spec = urlparse(url)
    loop = asyncio.get_running_loop()

    if url_spec.scheme.startswith("http"):
        self._assert_url_in_allowed_media_domains(url_spec)

        connection = self.connection
        data = await connection.async_get_bytes(url, timeout=fetch_timeout)
        future = loop.run_in_executor(global_thread_pool,
                                      media_io.load_bytes, data)
        return await future

    if url_spec.scheme == "data":
        future = loop.run_in_executor(global_thread_pool,
                                      self._load_data_url, url_spec,
                                      media_io)
        return await future

    if url_spec.scheme == "file":
        future = loop.run_in_executor(global_thread_pool,
                                      self._load_file_url, url_spec,
                                      media_io)
        return await future
    msg = "The URL must be either a HTTP, data or file URL."
    raise ValueError(msg)
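
A lower-level sketch of load_from_url with an explicit MediaIO decoder and a base64 data URL; the ImageMediaIO import path and the input file are assumptions for illustration.

import base64

from vllm.multimodal.image import ImageMediaIO  # assumed import path
from vllm.multimodal.utils import MediaConnector

connector = MediaConnector()
image_io = ImageMediaIO(image_mode="RGB")

with open("cat.jpg", "rb") as f:  # placeholder input file
    payload = base64.b64encode(f.read()).decode("utf-8")

# Only base64 data URLs are supported; other encodings raise NotImplementedError.
image = connector.load_from_url(f"data:image/jpeg;base64,{payload}", image_io)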

argsort_mm_positions

argsort_mm_positions(
    mm_positions: MultiModalPlaceholderDict,
) -> list[tuple[str, int]]

Given a MultiModalPlaceholderDict, output a sequence of keys to sort the dictionary by offset (starting index in the input sequence) in ascending order.

Returns:

list[tuple[str, int]]
    A list of (modality, idx), which can be used to access an item by mm_positions[modality][idx].

Source code in vllm/multimodal/utils.py
def argsort_mm_positions(
        mm_positions: MultiModalPlaceholderDict) -> list[tuple[str, int]]:
    """
    Given a `MultiModalPlaceholderDict`, output a sequence of keys to
    sort the dictionary by `offset` (starting index in the input sequence)
    in ascending order.

    Returns:
        A list of `(modality, idx)`, which can be used to access an item
        by `mm_positions[modality][idx]`.
    """
    flat_items = ((modality, idx, item)
                  for modality, items in mm_positions.items()
                  for idx, item in enumerate(items))

    sorted_flat_items = sorted(flat_items, key=lambda x: x[2].offset)

    return [(modality, idx) for modality, idx, _ in sorted_flat_items]
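
A worked example (the PlaceholderRange import path is an assumption; the dict mirrors the MultiModalPlaceholderDict structure): items are returned in order of their starting offset in the prompt.

from vllm.multimodal.inputs import PlaceholderRange  # assumed import path
from vllm.multimodal.utils import argsort_mm_positions

mm_positions = {
    "image": [PlaceholderRange(offset=10, length=4),
              PlaceholderRange(offset=2, length=4)],
    "audio": [PlaceholderRange(offset=6, length=3)],
}

# -> [("image", 1), ("audio", 0), ("image", 0)]
print(argsort_mm_positions(mm_positions))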

encode_audio_base64

encode_audio_base64(
    audio: ndarray, sampling_rate: int
) -> str

Encode audio as base64.

Source code in vllm/multimodal/utils.py
def encode_audio_base64(
    audio: np.ndarray,
    sampling_rate: int,
) -> str:
    """Encode audio as base64."""
    audio_io = AudioMediaIO()
    return audio_io.encode_base64((audio, sampling_rate))

encode_image_base64

encode_image_base64(
    image: Image,
    *,
    image_mode: str = "RGB",
    format: str = "JPEG",
) -> str

Encode a pillow image to base64 format.

By default, the image is converted into RGB format before being encoded.

Source code in vllm/multimodal/utils.py
def encode_image_base64(
    image: Image.Image,
    *,
    image_mode: str = "RGB",
    format: str = "JPEG",
) -> str:
    """
    Encode a pillow image to base64 format.

    By default, the image is converted into RGB format before being encoded.
    """
    image_io = ImageMediaIO(image_mode=image_mode)
    return image_io.encode_base64(image, image_format=format)
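
A round-trip sketch: encode a PIL image to base64 JPEG, then load it back through the module-level fetch_image via a data URL.

from PIL import Image

from vllm.multimodal.utils import encode_image_base64, fetch_image

image = Image.new("RGB", (64, 64), color="red")
b64 = encode_image_base64(image, format="JPEG")

restored = fetch_image(f"data:image/jpeg;base64,{b64}")
assert restored.size == (64, 64)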

encode_video_base64

encode_video_base64(frames: NDArray) -> str
Source code in vllm/multimodal/utils.py
def encode_video_base64(frames: npt.NDArray) -> str:
    image_io = ImageMediaIO()
    video_io = VideoMediaIO(image_io)
    return video_io.encode_base64(frames)

fetch_audio

fetch_audio(
    audio_url: str,
    audio_io_kwargs: Optional[dict[str, Any]] = None,
) -> tuple[ndarray, Union[int, float]]

Parameters:

audio_url (str, required)
    URL of the audio file to fetch.

audio_io_kwargs (Optional[dict[str, Any]], default: None)
    Additional kwargs passed to handle audio IO.

Source code in vllm/multimodal/utils.py
def fetch_audio(
    audio_url: str,
    audio_io_kwargs: Optional[dict[str, Any]] = None,
) -> tuple[np.ndarray, Union[int, float]]:
    """
    Args:
        audio_url: URL of the audio file to fetch.
        audio_io_kwargs: Additional kwargs passed to handle audio IO.
    """
    media_io_kwargs = None if not audio_io_kwargs else {
        "audio": audio_io_kwargs
    }
    media_connector = MediaConnector(media_io_kwargs=media_io_kwargs)
    return media_connector.fetch_audio(audio_url)

fetch_image

fetch_image(
    image_url: str,
    image_io_kwargs: Optional[dict[str, Any]] = None,
) -> Image

Parameters:

image_url (str, required)
    URL of the image file to fetch.

image_io_kwargs (Optional[dict[str, Any]], default: None)
    Additional kwargs passed to handle image IO.

Source code in vllm/multimodal/utils.py
def fetch_image(
    image_url: str,
    image_io_kwargs: Optional[dict[str, Any]] = None,
) -> Image.Image:
    """
    Args:
        image_url: URL of the image file to fetch.
        image_io_kwargs: Additional kwargs passed to handle image IO.
    """
    media_io_kwargs = None if not image_io_kwargs else {
        "image": image_io_kwargs
    }
    media_connector = MediaConnector(media_io_kwargs=media_io_kwargs)
    return media_connector.fetch_image(image_url)

fetch_video

fetch_video(
    video_url: str,
    video_io_kwargs: Optional[dict[str, Any]] = None,
) -> tuple[NDArray, dict[str, Any]]

Parameters:

video_url (str, required)
    URL of the video file to fetch.

video_io_kwargs (Optional[dict[str, Any]], default: None)
    Additional kwargs passed to handle video IO.

Source code in vllm/multimodal/utils.py
def fetch_video(
    video_url: str,
    video_io_kwargs: Optional[dict[str, Any]] = None,
) -> tuple[npt.NDArray, dict[str, Any]]:
    """
    Args:
        video_url: URL of the video file to fetch.
        video_io_kwargs: Additional kwargs passed to handle video IO.
    """
    media_io_kwargs = None if not video_io_kwargs else {
        "video": video_io_kwargs
    }
    media_connector = MediaConnector(media_io_kwargs=media_io_kwargs)
    return media_connector.fetch_video(video_url)
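
A sketch of the convenience wrapper (the URL is a placeholder; num_frames mirrors the media_io_kwargs example above): video_io_kwargs is forwarded to VideoMediaIO through a temporary MediaConnector.

from vllm.multimodal.utils import fetch_video

frames, metadata = fetch_video(
    "https://example.com/clip.mp4",
    video_io_kwargs={"num_frames": 32},
)
print(frames.shape, metadata.keys())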

group_mm_inputs_by_modality

group_mm_inputs_by_modality(
    mm_inputs: list[MultiModalKwargsItems],
) -> list[list[MultiModalKwargsItems]]
Source code in vllm/multimodal/utils.py
@deprecated("`group_mm_inputs_by_modality` is superseded by "
            "`group_mm_kwargs_by_modality` and will be removed in v0.13. "
            "Please use `group_mm_kwargs_by_modality` instead.")
def group_mm_inputs_by_modality(
    mm_inputs: list[MultiModalKwargsItems]
) -> list[list[MultiModalKwargsItems]]:
    if not mm_inputs:
        return []

    def modality_group_func(
            mm_input: MultiModalKwargsItems) -> Union[str, int]:
        # If the input has multiple modalities, return an id as the unique key
        # for the mm_input input.
        if len(mm_input) > 1:
            return id(mm_input)

        elif len(mm_input) == 1:
            return next(iter(mm_input.keys()))

        raise AssertionError("This line should be unreachable.")

    return [
        list(group) for _, group in groupby(mm_inputs, key=modality_group_func)
    ]

group_mm_kwargs_by_modality

group_mm_kwargs_by_modality(
    mm_kwargs: list[MultiModalKwargsItem],
    *,
    device: Device = None,
    pin_memory: bool = False,
    merge_by_field_config: bool = False,
) -> Iterable[tuple[str, int, BatchedTensorInputs]]

Group consecutive MultiModalKwargsItems from mm_kwargs with the same modality together into the same MultiModalKwargs instance.

Parameters:

mm_kwargs (list[MultiModalKwargsItem], required)
    List of MultiModalKwargsItem.

device (Device, default: None)
    The device to place the grouped tensors on.

pin_memory (bool, default: False)
    Whether to pin memory for faster host-to-device transfer.

Yields:

Iterable[tuple[str, int, BatchedTensorInputs]]
    A tuple (modality, num_items, grouped_kwargs).

Source code in vllm/multimodal/utils.py
def group_mm_kwargs_by_modality(
    mm_kwargs: list[MultiModalKwargsItem],
    *,
    device: torch.types.Device = None,
    pin_memory: bool = False,
    merge_by_field_config: bool = False,
) -> Iterable[tuple[str, int, BatchedTensorInputs]]:
    """Group consecutive `MultiModalKwargsItem`s from `mm_kwargs` with the same
    modality together into the same `MultiModalKwargs` instance.

    Args:
        mm_kwargs: List of `MultiModalKwargsItem`.
        device: The device to place the grouped tensors on.
        pin_memory: Whether to pin memory for faster host-to-device transfer.

    Yields:
        A tuple `(modality, num_items, grouped_kwargs)`.
    """
    from vllm.multimodal.inputs import MultiModalKwargs, MultiModalKwargsItems

    for modality, items in groupby(mm_kwargs, key=lambda item: item.modality):
        items_lst = list(items)

        # TODO: Enable `merge_by_field_config` for all models
        # to avoid creating an extra batch dimension (except for fields
        # that are meant to be stacked anyway).
        # We will also need to update each model to remove `flatten_bn`.
        if merge_by_field_config:
            mm_kwargs_group: BatchedTensorInputs = dict(
                MultiModalKwargsItems.from_seq(items_lst).get_data(
                    pin_memory=pin_memory))

            if device is not None:
                mm_kwargs_group = json_map_leaves(
                    lambda x: x.to(device=device),
                    mm_kwargs_group,
                )
        else:
            mm_kwargs_group = MultiModalKwargs.as_kwargs(
                MultiModalKwargs.batch(
                    [
                        MultiModalKwargsItems.from_seq([item]).get_data()
                        for item in items_lst
                    ],
                    pin_memory=pin_memory,
                ),
                device=device,
            )

        yield modality, len(items_lst), mm_kwargs_group
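
An iteration sketch (assumes mm_kwargs is a flat list of MultiModalKwargsItem in prompt order, e.g. gathered by a model runner; the device and pin_memory settings are illustrative): consecutive items that share a modality are batched into a single kwargs group.

import torch

from vllm.multimodal.utils import group_mm_kwargs_by_modality

for modality, num_items, kwargs_group in group_mm_kwargs_by_modality(
        mm_kwargs,  # list[MultiModalKwargsItem], assumed to exist in scope
        device=torch.device("cuda"),
        pin_memory=True,
):
    # Dispatch the batched inputs to the encoder that handles this modality.
    print(modality, num_items)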