Skip to content

vllm.v1.kv_offload.lru_manager

LRUOffloadingManager

Bases: OffloadingManager

An OffloadingManager with a pluggable backend that evicts blocks in least-recently-used (LRU) order.

Source code in vllm/v1/kv_offload/lru_manager.py
class LRUOffloadingManager(OffloadingManager):
    """
    An OffloadingManager with a pluggable backend, which evicts blocks by LRU.

    Stored blocks are tracked in an OrderedDict whose iteration order is the
    LRU order: the first entry is the least recently used, ``touch`` moves
    entries to the end, and ``prepare_store`` evicts from the front. Only
    blocks with ``ref_cnt == 0`` are eviction candidates; ``prepare_load``
    increments ``ref_cnt`` and ``complete_load`` decrements it.
    """

    def __init__(self, backend: Backend, enable_events: bool = False):
        """
        Args:
            backend: allocates/frees blocks and builds load/store specs.
            enable_events: if True, keep a list of OffloadingEvents (stored /
                evicted blocks) that can be drained with ``take_events``;
                otherwise ``self.events`` is None.
        """
        self.backend: Backend = backend
        # block_hash -> BlockStatus, ordered from least to most recently used
        self.blocks: OrderedDict[BlockHash, BlockStatus] = OrderedDict()
        self.events: Optional[list[OffloadingEvent]] = \
            [] if enable_events else None

    def lookup(self, block_hashes: Iterable[BlockHash]) -> int:
        """
        Return how many leading hashes of ``block_hashes`` are stored and
        ready. Counting stops at the first hash that is missing or whose
        block is not ready yet.
        """
        hit_count = 0
        for block_hash in block_hashes:
            block = self.blocks.get(block_hash)
            if block is None or not block.is_ready:
                break
            hit_count += 1
        return hit_count

    def prepare_load(self, block_hashes: Iterable[BlockHash]) -> LoadStoreSpec:
        """
        Pin the given (ready) blocks for reading and return their load spec.

        Each block's ``ref_cnt`` is incremented, which keeps it out of
        ``prepare_store``'s eviction scan until ``complete_load`` releases it.

        Raises:
            KeyError: if a hash is not stored.
        """
        # Materialize once: the hashes are iterated here AND handed to the
        # backend, so a one-shot iterator would otherwise reach the backend
        # already exhausted.
        block_hashes = list(block_hashes)
        blocks = []
        for block_hash in block_hashes:
            block = self.blocks[block_hash]
            assert block.is_ready
            block.ref_cnt += 1
            blocks.append(block)

        return self.backend.get_load_store_spec(block_hashes, blocks)

    def touch(self, block_hashes: Iterable[BlockHash]):
        """
        Mark the given blocks as most recently used; unknown hashes are
        ignored.

        Iterating in reverse moves ``block_hashes[0]`` last, so it ends up as
        the most recently used entry. Because eviction scans from the front,
        earlier hashes in the list are evicted after later ones.
        """
        for block_hash in reversed(list(block_hashes)):
            if block_hash in self.blocks:
                self.blocks.move_to_end(block_hash)

    def complete_load(self, block_hashes: Iterable[BlockHash]):
        """Release the read references taken by ``prepare_load``."""
        for block_hash in block_hashes:
            block = self.blocks[block_hash]
            assert block.ref_cnt > 0
            block.ref_cnt -= 1

    def prepare_store(
            self,
            block_hashes: Iterable[BlockHash]) -> Optional[PrepareStoreOutput]:
        """
        Allocate backend blocks for the hashes that are not stored yet,
        evicting unreferenced LRU blocks if the backend lacks free space.

        Returns:
            A PrepareStoreOutput with the hashes to store, their store spec
            and the hashes evicted to make room; or None if not enough
            blocks could be evicted (nothing is evicted or allocated then).
        """
        # Filter out blocks that are already stored. dict.fromkeys also drops
        # repeated hashes (first-seen order kept): allocating a hash twice
        # would let the second block overwrite the first in self.blocks,
        # leaking the first backend block.
        block_hashes_to_store = list(dict.fromkeys(
            block_hash for block_hash in block_hashes
            if block_hash not in self.blocks))

        num_blocks_to_evict = (len(block_hashes_to_store) -
                               self.backend.get_num_free_blocks())

        # Build the eviction list from the LRU end; only blocks nobody is
        # reading (ref_cnt == 0) qualify. NOTE(review): presumably blocks
        # whose store has not completed have a different ref_cnt (see
        # BlockStatus) — confirm. A block already stored under one of
        # block_hashes may also be picked here.
        to_evict: list[BlockHash] = []
        if num_blocks_to_evict > 0:
            for block_hash, block in self.blocks.items():
                if block.ref_cnt == 0:
                    to_evict.append(block_hash)
                    num_blocks_to_evict -= 1
                    if num_blocks_to_evict == 0:
                        break
            else:
                # Scanned everything without enough candidates; bail out
                # before mutating any state.
                return None

        # evict blocks
        for block_hash in to_evict:
            self.backend.free(self.blocks.pop(block_hash))

        if to_evict and self.events is not None:
            self.events.append(
                OffloadingEvent(block_hashes=to_evict,
                                block_size=self.backend.block_size,
                                medium=self.backend.medium,
                                removed=True))

        blocks = self.backend.allocate_blocks(block_hashes_to_store)
        assert len(blocks) == len(block_hashes_to_store)

        # New entries go to the MRU end of the OrderedDict.
        for block_hash, block in zip(block_hashes_to_store, blocks):
            self.blocks[block_hash] = block

        # build store specs for allocated blocks
        store_spec = self.backend.get_load_store_spec(block_hashes_to_store,
                                                      blocks)

        return PrepareStoreOutput(block_hashes_to_store=block_hashes_to_store,
                                  store_spec=store_spec,
                                  block_hashes_evicted=to_evict)

    def complete_store(self,
                       block_hashes: Iterable[BlockHash],
                       success: bool = True):
        """
        Finalize a store started by ``prepare_store``.

        On success, not-yet-ready blocks get ``ref_cnt = 0`` and one
        non-removal event is recorded for them (if events are enabled). On
        failure, not-yet-ready blocks are freed in the backend and removed.
        Already-ready blocks are left untouched either way.
        """
        stored_block_hashes: list[BlockHash] = []
        if success:
            for block_hash in block_hashes:
                block = self.blocks[block_hash]
                if not block.is_ready:
                    block.ref_cnt = 0
                    stored_block_hashes.append(block_hash)
        else:
            for block_hash in block_hashes:
                block = self.blocks[block_hash]
                if not block.is_ready:
                    self.backend.free(block)
                    del self.blocks[block_hash]

        if stored_block_hashes and self.events is not None:
            self.events.append(
                OffloadingEvent(block_hashes=stored_block_hashes,
                                block_size=self.backend.block_size,
                                medium=self.backend.medium,
                                removed=False))

    def take_events(self) -> Iterable[OffloadingEvent]:
        """
        Yield the recorded events, clearing the list once all are yielded.

        This is a generator: nothing happens until it is iterated. Yields
        nothing when events are disabled.
        """
        if self.events is not None:
            yield from self.events
            self.events.clear()

backend instance-attribute

backend: Backend = backend

blocks instance-attribute

blocks: OrderedDict[BlockHash, BlockStatus] = OrderedDict()

events instance-attribute

events: Optional[list[OffloadingEvent]] = (
    [] if enable_events else None
)

__init__

__init__(backend: Backend, enable_events: bool = False)
Source code in vllm/v1/kv_offload/lru_manager.py
def __init__(self, backend: Backend, enable_events: bool = False):
    """
    Create an empty, LRU-ordered block table on top of ``backend``.

    Args:
        backend: allocates/frees blocks and builds load/store specs.
        enable_events: when True, keep an event list for ``take_events``;
            otherwise ``self.events`` stays None.
    """
    self.backend: Backend = backend
    # Maps block_hash -> BlockStatus; iteration order is the LRU order.
    self.blocks: OrderedDict[BlockHash, BlockStatus] = OrderedDict()
    self.events: Optional[list[OffloadingEvent]] = None
    if enable_events:
        self.events = []

complete_load

complete_load(block_hashes: Iterable[BlockHash])
Source code in vllm/v1/kv_offload/lru_manager.py
def complete_load(self, block_hashes: Iterable[BlockHash]):
    """
    Release one read reference per listed block (the counterpart of
    ``prepare_load``). Every block must currently hold a reference.
    """
    statuses = (self.blocks[h] for h in block_hashes)
    for status in statuses:
        assert status.ref_cnt > 0
        status.ref_cnt -= 1

complete_store

complete_store(
    block_hashes: Iterable[BlockHash], success: bool = True
)
Source code in vllm/v1/kv_offload/lru_manager.py
def complete_store(self,
                   block_hashes: Iterable[BlockHash],
                   success: bool = True):
    """
    Finalize a store started by ``prepare_store``.

    On failure, each listed block that is not ready yet is handed back to
    the backend and dropped from ``self.blocks``; no event is recorded.
    On success, each listed block that is not ready yet gets ``ref_cnt``
    set to 0 (which ``prepare_store`` treats as evictable), and, when
    events are enabled, a single non-removal OffloadingEvent covering those
    hashes is recorded. Blocks that are already ready are skipped.
    """
    if not success:
        for h in block_hashes:
            status = self.blocks[h]
            if status.is_ready:
                continue
            self.backend.free(status)
            del self.blocks[h]
        return

    newly_ready: list[BlockHash] = []
    for h in block_hashes:
        status = self.blocks[h]
        if status.is_ready:
            continue
        status.ref_cnt = 0
        newly_ready.append(h)

    if not newly_ready or self.events is None:
        return
    self.events.append(
        OffloadingEvent(block_hashes=newly_ready,
                        block_size=self.backend.block_size,
                        medium=self.backend.medium,
                        removed=False))

lookup

lookup(block_hashes: Iterable[BlockHash]) -> int
Source code in vllm/v1/kv_offload/lru_manager.py
def lookup(self, block_hashes: Iterable[BlockHash]) -> int:
    """
    Count the prefix of ``block_hashes`` whose blocks are stored and ready.
    Counting stops at the first hash that is missing or not ready.
    """
    hits = 0
    for h in block_hashes:
        status = self.blocks.get(h)
        if not (status is not None and status.is_ready):
            break
        hits += 1
    return hits

prepare_load

prepare_load(
    block_hashes: Iterable[BlockHash],
) -> LoadStoreSpec
Source code in vllm/v1/kv_offload/lru_manager.py
def prepare_load(self, block_hashes: Iterable[BlockHash]) -> LoadStoreSpec:
    """
    Pin the given (ready) blocks for reading and return their load spec.

    Each block's ``ref_cnt`` is incremented, which keeps it out of
    ``prepare_store``'s eviction scan until ``complete_load`` releases it.

    Raises:
        KeyError: if a hash is not stored.
    """
    # Materialize once: the hashes are iterated here AND handed to the
    # backend, so a one-shot iterator would otherwise reach the backend
    # already exhausted.
    block_hashes = list(block_hashes)
    blocks = []
    for block_hash in block_hashes:
        block = self.blocks[block_hash]
        assert block.is_ready
        block.ref_cnt += 1
        blocks.append(block)

    return self.backend.get_load_store_spec(block_hashes, blocks)

prepare_store

prepare_store(
    block_hashes: Iterable[BlockHash],
) -> Optional[PrepareStoreOutput]
Source code in vllm/v1/kv_offload/lru_manager.py
def prepare_store(
        self,
        block_hashes: Iterable[BlockHash]) -> Optional[PrepareStoreOutput]:
    """
    Allocate backend blocks for the hashes that are not stored yet,
    evicting unreferenced LRU blocks if the backend lacks free space.

    Returns:
        A PrepareStoreOutput with the hashes to store, their store spec
        and the hashes evicted to make room; or None if not enough blocks
        could be evicted (nothing is evicted or allocated then).
    """
    # Filter out blocks that are already stored. dict.fromkeys also drops
    # repeated hashes (first-seen order kept): allocating a hash twice would
    # let the second block overwrite the first in self.blocks, leaking the
    # first backend block.
    block_hashes_to_store = list(dict.fromkeys(
        block_hash for block_hash in block_hashes
        if block_hash not in self.blocks))

    num_blocks_to_evict = (len(block_hashes_to_store) -
                           self.backend.get_num_free_blocks())

    # Build the eviction list from the LRU end; only blocks nobody is
    # reading (ref_cnt == 0) qualify. NOTE(review): presumably blocks whose
    # store has not completed have a different ref_cnt (see BlockStatus) —
    # confirm. A block already stored under one of block_hashes may also be
    # picked here.
    to_evict: list[BlockHash] = []
    if num_blocks_to_evict > 0:
        for block_hash, block in self.blocks.items():
            if block.ref_cnt == 0:
                to_evict.append(block_hash)
                num_blocks_to_evict -= 1
                if num_blocks_to_evict == 0:
                    break
        else:
            # Scanned everything without enough candidates; bail out before
            # mutating any state.
            return None

    # evict blocks
    for block_hash in to_evict:
        self.backend.free(self.blocks.pop(block_hash))

    if to_evict and self.events is not None:
        self.events.append(
            OffloadingEvent(block_hashes=to_evict,
                            block_size=self.backend.block_size,
                            medium=self.backend.medium,
                            removed=True))

    blocks = self.backend.allocate_blocks(block_hashes_to_store)
    assert len(blocks) == len(block_hashes_to_store)

    # New entries go to the MRU end of the OrderedDict.
    for block_hash, block in zip(block_hashes_to_store, blocks):
        self.blocks[block_hash] = block

    # build store specs for allocated blocks
    store_spec = self.backend.get_load_store_spec(block_hashes_to_store,
                                                  blocks)

    return PrepareStoreOutput(block_hashes_to_store=block_hashes_to_store,
                              store_spec=store_spec,
                              block_hashes_evicted=to_evict)

take_events

take_events() -> Iterable[OffloadingEvent]
Source code in vllm/v1/kv_offload/lru_manager.py
def take_events(self) -> Iterable[OffloadingEvent]:
    """
    Yield the recorded events, then empty the event list.

    This is a generator: nothing happens until it is iterated, and the list
    is cleared only after every event has been yielded. When event
    recording is disabled (``self.events is None``) it yields nothing.
    """
    pending = self.events
    if pending is None:
        return
    yield from pending
    pending.clear()

touch

touch(block_hashes: Iterable[BlockHash])
Source code in vllm/v1/kv_offload/lru_manager.py
def touch(self, block_hashes: Iterable[BlockHash]):
    """
    Mark the given blocks as most recently used; unknown hashes are ignored.

    Iterating in reverse moves ``block_hashes[0]`` last, so it ends up as
    the most recently used entry. Because eviction in ``prepare_store``
    scans from the front, earlier hashes in the list are evicted after
    later ones.
    """
    for block_hash in reversed(list(block_hashes)):
        # Membership test rather than truthiness of the stored BlockStatus.
        if block_hash in self.blocks:
            self.blocks.move_to_end(block_hash)