Skip to content

vllm.v1.kv_offload.worker.worker

TransferResult module-attribute

# Outcome of one finished transfer, as a (job_id, success) pair.
TransferResult = tuple[int, bool]

TransferSpec module-attribute

TransferType module-attribute

# Key identifying a kind of transfer: (source medium, destination medium).
TransferType = tuple[str, str]

logger module-attribute

# Module-level logger; OffloadingWorker uses it to report the outcome of
# transfer submissions.
logger = init_logger(__name__)

OffloadingHandler

Bases: ABC

OffloadingHandler class for managing asynchronous KV data transfers

This class runs in the worker. It kicks off async KV data transfer requests, and allows collecting back completion statuses.

The class provides the following primitives:

transfer_async() - kicks off a new transfer job; get_finished() - returns a list of (job_id, success) results for newly finished transfers.

Source code in vllm/v1/kv_offload/worker/worker.py
class OffloadingHandler(ABC):
    """
    Abstract interface for handlers of asynchronous KV data transfers.

    This class runs in the worker.
    A handler accepts KV data transfer requests without waiting for them
    to complete, and lets the caller collect completion statuses later.

    Primitives offered by every handler:
        transfer_async() - submits a new transfer job.
        get_finished() - returns the (job_id, success) results of jobs
            that finished since the previous call.
    """

    @abstractmethod
    def transfer_async(self, job_id: int, spec: TransferSpec) -> bool:
        """
        Start an asynchronous transfer of KV data.

        Args:
            job_id: unique ID of the job; it is reported back when the
                transfer completes.
            spec: the (src, dst) pair describing the KV data transfer.

        Returns:
            True if the transfer was submitted successfully.
        """

    @abstractmethod
    def get_finished(self) -> list[TransferResult]:
        """
        Report the transfers that finished since the previous call.

        Returns:
            A list of (job_id, success) tuples, one per finished transfer.
        """

get_finished abstractmethod

get_finished() -> list[TransferResult]

Get transfers finished since last call.

Returns:

Type Description
list[TransferResult]

A list of (job_id, success) tuples, one per finished transfer.

Source code in vllm/v1/kv_offload/worker/worker.py
@abstractmethod
def get_finished(self) -> list[TransferResult]:
    """
    Report the transfers that finished since the previous call.

    Returns:
        A list of (job_id, success) tuples, one per finished transfer.
    """

transfer_async abstractmethod

transfer_async(job_id: int, spec: TransferSpec) -> bool

Initiates an asynchronous transfer of KV data.

Parameters:

Name Type Description Default
job_id int

a unique ID that will be used when notifying back on transfer completion.

required
spec TransferSpec

the (src, dst) spec of the KV data transfer.

required

Returns:

Type Description
bool

True if transfer was submitted successfully.

Source code in vllm/v1/kv_offload/worker/worker.py
@abstractmethod
def transfer_async(self, job_id: int, spec: TransferSpec) -> bool:
    """
    Start an asynchronous transfer of KV data.

    Args:
        job_id: unique ID of the job; it is reported back when the
            transfer completes.
        spec: the (src, dst) pair describing the KV data transfer.

    Returns:
        True if the transfer was submitted successfully.
    """

OffloadingWorker

OffloadingWorker class for managing asynchronous KV data transfers using multiple OffloadingHandlers

This class runs in the worker. It kicks off async KV data transfer requests, by delegating to one of its registered OffloadingHandlers, based on the transfer type.

The class provides the following primitives:

register_handler() - registers a new handler to handle a specific transfer type; transfer_async() - kicks off a new transfer job using one of the registered handlers; get_finished() - returns a list of (job_id, success) results for newly finished transfers from all handlers.

Source code in vllm/v1/kv_offload/worker/worker.py
class OffloadingWorker:
    """
    OffloadingWorker class for managing asynchronous KV data transfers
    using multiple OffloadingHandlers

    This class runs in the worker.
    It kicks off async KV data transfer requests, by delegating
    to one of its registered OffloadingHandlers, based on the transfer type.

    The class provides the following primitives:
        register_handler() - registers a new handler to handle
            a specific transfer type
        transfer_async() - kicks off a new transfer job
            using one of the registered handlers.
        get_finished() - returns a list of (job_id, success) results of
            newly finished transfers, collected from all handlers.
    """

    def __init__(self):
        # Distinct handler objects. A handler registered for several
        # transfer types is stored, and therefore polled, only once.
        self.handlers: set[OffloadingHandler] = set()
        # Maps (source medium, destination medium) to its handler.
        self.transfer_type_to_handler: dict[TransferType,
                                            OffloadingHandler] = {}

    def register_handler(self, src_cls: type[LoadStoreSpec],
                         dst_cls: type[LoadStoreSpec],
                         handler: OffloadingHandler) -> None:
        """
        Registers a new handler.

        Args:
            src_cls: the source type of transfers handled by this handler.
            dst_cls: the destination type of transfers handled by this handler.
            handler: the handler that will handle transfers.

        Raises:
            ValueError: if a handler is already registered for the
                (source medium, destination medium) pair.
        """
        transfer_type = (src_cls.medium(), dst_cls.medium())
        # Raise explicitly instead of asserting: asserts are stripped under
        # `python -O`, which would let a duplicate registration silently
        # replace the mapping while the old handler stays in self.handlers.
        if transfer_type in self.transfer_type_to_handler:
            raise ValueError(
                f"A handler is already registered for transfer type "
                f"{transfer_type!r}")
        self.handlers.add(handler)
        self.transfer_type_to_handler[transfer_type] = handler

    def transfer_async(self, job_id: int, spec: TransferSpec) -> bool:
        """
        Initiates an asynchronous transfer of KV data.

        Args:
            job_id: a unique ID that will be used when notifying back on
                transfer completion.
            spec: the (src, dst) spec of the KV data transfer.

        Returns:
            True if transfer was submitted successfully. False if the
            handler reported a failed submission or raised an exception.

        Raises:
            ValueError: if no handler is registered for the transfer type
                derived from spec.
        """
        src, dst = spec
        transfer_type = (src.medium(), dst.medium())
        handler = self.transfer_type_to_handler.get(transfer_type)
        # Checked outside the try block and without assert: with asserts
        # stripped, a missing handler would raise AttributeError below and
        # be misreported as a failed transfer.
        if handler is None:
            raise ValueError(
                f"No handler registered for transfer type {transfer_type!r}")

        try:
            success = handler.transfer_async(job_id, spec)
        except Exception as e:
            # A handler failure is reported to the caller as an
            # unsuccessful submission rather than propagated.
            logger.warning("Exception in %r transfer %d: %r",
                           transfer_type,
                           job_id,
                           e,
                           exc_info=True)
            return False

        if not success:
            logger.warning("Failed to submit %r transfer %d", transfer_type,
                           job_id)
        else:
            logger.debug("Submitted %r transfer %d: %r", transfer_type, job_id,
                         spec)

        return success

    def get_finished(self) -> list[TransferResult]:
        """
        Get transfers finished since last call.

        Returns:
            A list of (job_id, success) tuples gathered from every
            registered handler.
        """
        return [
            result for handler in self.handlers
            for result in handler.get_finished()
        ]

handlers instance-attribute

handlers: set[OffloadingHandler] = set()

transfer_type_to_handler instance-attribute

transfer_type_to_handler: dict[
    TransferType, OffloadingHandler
] = {}

__init__

__init__()
Source code in vllm/v1/kv_offload/worker/worker.py
def __init__(self):
    """Create a worker with no registered handlers."""
    # Maps (source medium, destination medium) to its handler.
    self.transfer_type_to_handler: dict[TransferType, OffloadingHandler] = {}
    # Distinct handler objects; each one is stored only once.
    self.handlers: set[OffloadingHandler] = set()

get_finished

get_finished() -> list[TransferResult]

Get transfers finished since last call.

Returns:

Type Description
list[TransferResult]

A list of (job_id, success) tuples, one per finished transfer.

Source code in vllm/v1/kv_offload/worker/worker.py
def get_finished(self) -> list[TransferResult]:
    """
    Collect the transfers that finished since the previous call.

    Returns:
        A list of (job_id, success) tuples gathered from every
        registered handler.
    """
    return [
        result for handler in self.handlers
        for result in handler.get_finished()
    ]

register_handler

register_handler(
    src_cls: type[LoadStoreSpec],
    dst_cls: type[LoadStoreSpec],
    handler: OffloadingHandler,
) -> None

Registers a new handler.

Parameters:

Name Type Description Default
src_cls type[LoadStoreSpec]

the source type of transfers handled by this handler.

required
dst_cls type[LoadStoreSpec]

the destination type of transfers handled by this handler.

required
handler OffloadingHandler

the handler that will handle transfers.

required
Source code in vllm/v1/kv_offload/worker/worker.py
def register_handler(self, src_cls: type[LoadStoreSpec],
                     dst_cls: type[LoadStoreSpec],
                     handler: OffloadingHandler) -> None:
    """
    Registers a new handler.

    Args:
        src_cls: the source type of transfers handled by this handler.
        dst_cls: the destination type of transfers handled by this handler.
        handler: the handler that will handle transfers.

    Raises:
        ValueError: if a handler is already registered for the
            (source medium, destination medium) pair.
    """
    transfer_type = (src_cls.medium(), dst_cls.medium())
    # Raise explicitly instead of asserting: asserts are stripped under
    # `python -O`, which would let a duplicate registration silently
    # replace the mapping while the old handler stays in self.handlers.
    if transfer_type in self.transfer_type_to_handler:
        raise ValueError(
            f"A handler is already registered for transfer type "
            f"{transfer_type!r}")
    self.handlers.add(handler)
    self.transfer_type_to_handler[transfer_type] = handler

transfer_async

transfer_async(job_id: int, spec: TransferSpec) -> bool

Initiates an asynchronous transfer of KV data.

Parameters:

Name Type Description Default
job_id int

a unique ID that will be used when notifying back on transfer completion.

required
spec TransferSpec

the (src, dst) spec of the KV data transfer.

required

Returns:

Type Description
bool

True if transfer was submitted successfully.

Source code in vllm/v1/kv_offload/worker/worker.py
def transfer_async(self, job_id: int, spec: TransferSpec) -> bool:
    """
    Initiates an asynchronous transfer of KV data.

    Args:
        job_id: a unique ID that will be used when notifying back on
            transfer completion.
        spec: the (src, dst) spec of the KV data transfer.

    Returns:
        True if transfer was submitted successfully. False if the
        handler reported a failed submission or raised an exception.

    Raises:
        ValueError: if no handler is registered for the transfer type
            derived from spec.
    """
    src, dst = spec
    transfer_type = (src.medium(), dst.medium())
    handler = self.transfer_type_to_handler.get(transfer_type)
    # Checked outside the try block and without assert: with asserts
    # stripped under `python -O`, a missing handler would raise
    # AttributeError below and be misreported as a failed transfer.
    if handler is None:
        raise ValueError(
            f"No handler registered for transfer type {transfer_type!r}")

    try:
        success = handler.transfer_async(job_id, spec)
    except Exception as e:
        # A handler failure is reported to the caller as an unsuccessful
        # submission rather than propagated.
        logger.warning("Exception in %r transfer %d: %r",
                       transfer_type,
                       job_id,
                       e,
                       exc_info=True)
        return False

    if not success:
        logger.warning("Failed to submit %r transfer %d", transfer_type,
                       job_id)
    else:
        logger.debug("Submitted %r transfer %d: %r", transfer_type, job_id,
                     spec)

    return success