Skip to content

vllm.v1.structured_output.utils

CACHE module-attribute

CACHE = None

logger module-attribute

logger = init_logger(__name__)

re_llama_byte_token module-attribute

re_llama_byte_token = compile('^<0x[0-9A-F]{2}>$')

re_replacement_seq module-attribute

re_replacement_seq = compile('^.{0,6}�+.{0,6}$')

OutlinesVocabulary

Wrapper class for outlines_core.Vocabulary, which allows us to store a hash with the vocabulary

Source code in vllm/v1/structured_output/utils.py
class OutlinesVocabulary:
    """
    Wrapper class for `outlines_core.Vocabulary`,
    which allows us to store a hash with the vocabulary
    """

    def __init__(self, vocabulary: oc.Vocabulary) -> None:
        # Actual vocabulary object
        self.inner = vocabulary
        # Have to do abs(hash()) because python hashes can
        # be negative, and we are using hash as a cache key.
        hex_str = hashlib.sha256(
            vocabulary.__repr__().encode('utf-8')).hexdigest()
        hash_int = int(hex_str, 16)
        self._hash = hash_int

_hash instance-attribute

_hash = hash_int

inner instance-attribute

inner = vocabulary

__init__

__init__(vocabulary: Vocabulary) -> None
Source code in vllm/v1/structured_output/utils.py
def __init__(self, vocabulary: oc.Vocabulary) -> None:
    # Actual vocabulary object
    self.inner = vocabulary
    # Have to do abs(hash()) because python hashes can
    # be negative, and we are using hash as a cache key.
    hex_str = hashlib.sha256(
        vocabulary.__repr__().encode('utf-8')).hexdigest()
    hash_int = int(hex_str, 16)
    self._hash = hash_int

_reduced_vocabulary

_reduced_vocabulary(
    tokenizer: AnyTokenizer, eos_token_id: int
) -> dict[bytes, list[int]]

Create a map from vocabulary tokens to lists of equivalent token ids.

Returns:

Type Description
dict[bytes, list[int]]

A Dict of token string -> equivalent token ids

Source code in vllm/v1/structured_output/utils.py
def _reduced_vocabulary(
    tokenizer: AnyTokenizer,
    eos_token_id: int,
) -> dict[bytes, list[int]]:
    """Create a map from vocabulary tokens to lists of equivalent token ids.

    Returns:
        A Dict of token string -> equivalent token ids
    """

    unicode_to_bytes = {
        v: k
        for k, v in tokenization_gpt2.bytes_to_unicode().items()
    }

    def convert_token_to_string(token: str) -> str:

        string = tokenizer.convert_tokens_to_string([token])

        # A hack to handle missing spaces to HF's Llama tokenizers
        if (type(token) is str
                and token.startswith(file_utils.SPIECE_UNDERLINE)
                or token == "<0x20>"):
            return " " + string

        return string

    vocabulary: dict[bytes, list[int]] = {}
    empty_token_ids: list[int] = []
    for token, token_idx in tokenizer.get_vocab().items():
        if token in tokenizer.all_special_tokens:  # type: ignore
            continue

        token_str = convert_token_to_string(token)
        if token_str:
            if isinstance(token, (bytes, bytearray)):
                # For BPE tokenizers where tokens are stored as bytes.

                # safe to ignore since token_str is of type (bytearray, bytes)
                # by this point.
                token_bytes = bytes(token_str)  # type: ignore[arg-type]

            elif "\ufffd" in token_str and not re_replacement_seq.match(
                    token_str):
                # Handle tokens with invalid UTF-8 sequences.
                if re_llama_byte_token.match(token):
                    # Llama-like tokenizers use <0xXX> for incomplete sequences.
                    token_bytes = bytes([int(token[3:5], 16)])
                else:
                    # GPT2 tokenizers: map each byte back using unicode_to_bytes
                    byte_vals = [unicode_to_bytes.get(c) for c in token]
                    if None in byte_vals:
                        raise RuntimeError(
                            f"Cannot convert token `{token}`"
                            f" ({token_idx}) to bytes: {token_str}")
                    # safe to ignore, since if None in byte_vals,
                    # an error is thrown.
                    token_bytes = bytes(byte_vals)  # type: ignore[arg-type]
            else:
                token_bytes = token_str.encode('utf-8')

            if token_idx != eos_token_id:
                vocabulary.setdefault(token_bytes, []).append(token_idx)
        else:
            empty_token_ids.append(token_idx)

    return vocabulary

apply_grammar_bitmask

apply_grammar_bitmask(
    scheduler_output: SchedulerOutput,
    input_batch: InputBatch,
    logits: Tensor,
    device: device,
) -> None

Apply grammar bitmask to output logits of the model with xgrammar function.

Parameters:

Name Type Description Default
scheduler_output SchedulerOutput

The result of engine scheduling.

required
input_batch InputBatch

The input of model runner.

required
logits Tensor

The output logits of model forward.

required
device device

The device that model runner running on.

required
Source code in vllm/v1/structured_output/utils.py
def apply_grammar_bitmask(
    scheduler_output: SchedulerOutput,
    input_batch: InputBatch,
    logits: torch.Tensor,
    device: torch.device,
) -> None:
    """
    Apply grammar bitmask to output logits of the model with xgrammar function.

    Args:
        scheduler_output (SchedulerOutput): The result of engine scheduling.
        input_batch (InputBatch): The input of model runner.
        logits (torch.Tensor): The output logits of model forward.
        device (torch.device): The device that model runner running on.
    """
    grammar_bitmask = scheduler_output.grammar_bitmask
    if grammar_bitmask is None:
        return

    # We receive the structured output bitmask from the scheduler,
    # compacted to contain bitmasks only for structured output requests.
    # The order of the requests in the bitmask is not guaranteed to be the
    # same as the order of the requests in the gpu runner's batch. We need
    # to sort the bitmask to match the order of the requests used here.

    # Get the batch indices of the structured output requests.
    # Keep track of the number of speculative tokens scheduled for every
    # request in the batch, as the logit indices are offset by this amount.
    struct_out_req_batch_indices: dict[str, int] = {}
    cumulative_offset = 0
    seq = sorted(input_batch.req_id_to_index.items(), key=lambda x: x[1])
    for req_id, batch_index in seq:
        logit_index = batch_index + cumulative_offset
        cumulative_offset += len(
            scheduler_output.scheduled_spec_decode_tokens.get(req_id, []))
        if req_id in scheduler_output.structured_output_request_ids:
            struct_out_req_batch_indices[req_id] = logit_index

    out_indices = []

    # Reorder the bitmask to match the order of the requests in the batch.
    sorted_bitmask = np.full(shape=(logits.shape[0], grammar_bitmask.shape[1]),
                             fill_value=-1,
                             dtype=grammar_bitmask.dtype)
    cumulative_index = 0
    seq = sorted(scheduler_output.structured_output_request_ids.items(),
                 key=lambda x: x[1])
    for req_id, _ in seq:
        num_spec_tokens = len(
            scheduler_output.scheduled_spec_decode_tokens.get(req_id, []))
        if req_id in struct_out_req_batch_indices:
            logit_index = struct_out_req_batch_indices[req_id]
            for i in range(1 + num_spec_tokens):
                sorted_bitmask[logit_index + i] = \
                    grammar_bitmask[cumulative_index + i]
                out_indices.append(logit_index + i)
        cumulative_index += 1 + num_spec_tokens
    grammar_bitmask = sorted_bitmask

    # If the length of out indices and the logits have the same shape
    # we don't need to pass indices to the kernel,
    # since the bitmask is already aligned with the logits.
    skip_out_indices = len(out_indices) == logits.shape[0]

    # Serialization of np.ndarray is much more efficient than a tensor,
    # so we receive it in that format.
    grammar_bitmask = torch.from_numpy(grammar_bitmask).contiguous()

    xgr.apply_token_bitmask_inplace(
        logits,
        grammar_bitmask.to(device, non_blocking=True),
        indices=out_indices if not skip_out_indices else None,
    )

choice_as_grammar

choice_as_grammar(choice: list[str]) -> str
Source code in vllm/v1/structured_output/utils.py
def choice_as_grammar(choice: list[str]) -> str:

    def escape_ebnf_string(s: str) -> str:
        """Escape special characters in a EBNF string."""
        # Escape double quotes and backslashes
        return re.sub(r'(["\\])', r'\\\1', s)

    escaped_choices = (escape_ebnf_string(c) for c in choice)
    grammar = ('root ::= ' + ' | '.join(f'"{c}"' for c in escaped_choices))
    return grammar

convert_lark_to_ebnf

convert_lark_to_ebnf(grammar_str: str) -> str

Convert a Lark grammar string to EBNF format.

EBNF reference: https://github.com/ggerganov/llama.cpp/blob/master/grammars/README.md Lark grammar reference: https://lark-parser.readthedocs.io/en/latest/grammar.html

Parameters:

Name Type Description Default
grammar_str str

Input grammar in Lark format

required

Returns:

Name Type Description
str str

Converted grammar in EBNF format

Examples:

>>> print(convert_lark_to_ebnf("rule: 'hello'"))
root ::= rule
rule ::= "hello"
Source code in vllm/v1/structured_output/utils.py
def convert_lark_to_ebnf(grammar_str: str) -> str:
    """
    Convert a Lark grammar string to EBNF format.

    EBNF reference:
    https://github.com/ggerganov/llama.cpp/blob/master/grammars/README.md
    Lark grammar reference:
    https://lark-parser.readthedocs.io/en/latest/grammar.html

    Args:
        grammar_str: Input grammar in Lark format

    Returns:
        str: Converted grammar in EBNF format

    Examples:
        >>> print(convert_lark_to_ebnf("rule: 'hello'"))
        root ::= rule
        rule ::= "hello"
    """
    if not isinstance(grammar_str, str):
        raise ValueError(f"Grammar must be a string, got {type(grammar_str)}")
    if not grammar_str.strip():
        raise ValueError("Grammar string cannot be empty")

    defined_rules = set()
    referenced_rules = set()
    output_lines = []

    def clean_line(line: str) -> str:
        """Remove comments and whitespace from line."""
        return re.sub(r'(#|//).*$', '', line).strip()

    def check_quotes(text: str, rule_name: str, line_num: int) -> None:
        """Validate quote matching in text."""
        if text.count("'") % 2 != 0 or text.count('"') % 2 != 0:
            raise ValueError(
                f"Mismatched quotes in {rule_name} on line {line_num}")

    def extract_references(text: str) -> set[str]:
        """Extract rule references from text."""
        # Remove quoted strings and special characters
        text = re.sub(r'"[^"]*"', '', text)
        text = re.sub(r'[+*?()|\[\]{}]', ' ', text)
        return set(re.findall(r'\b[a-zA-Z_][a-zA-Z0-9_]*\b', text))

    # First pass: Find root rule and validate rule definitions
    lines = [clean_line(line) for line in grammar_str.split('\n')]
    first_rule = None

    for line_num, line in enumerate(lines, 1):
        if not line or line.startswith('|'):
            continue

        if ':' in line:
            try:
                name = line.split(':', 1)[0].strip().strip('?')
                defined_rules.add(name)
                if first_rule is None:
                    first_rule = name
                if name == 'start':
                    first_rule = 'start'
            except IndexError as e:
                raise ValueError(f"Invalid rule format on line {line_num}. "
                                 "Expected 'rule_name: definition'") from e

    if not defined_rules:
        raise ValueError("No valid rules found in grammar")

    # Add root rule
    output_lines.append(f"root ::= {first_rule}")

    # Second pass: Process rule definitions and alternatives
    current_rule = None
    current_definition = []

    for line_num, line in enumerate(lines, 1):
        if not line:
            continue

        try:
            if ':' in line and not line.startswith('|'):
                # Save previous rule if exists
                if current_rule:
                    output_lines.append(
                        f"{current_rule} ::= {' | '.join(current_definition)}")

                # Process new rule
                name, definition = line.split(':', 1)
                current_rule = name.strip().strip('?')

                check_quotes(definition, f"rule '{current_rule}'", line_num)
                definition = re.sub(r"'([^']*)'", r'"\1"', definition)
                referenced_rules.update(extract_references(definition))
                current_definition = [definition.strip()]

            elif line.startswith('|'):
                if not current_rule:
                    raise ValueError(f"Alternative '|' on line {line_num} "
                                     "without a preceding rule definition")

                alt_def = line[1:].strip()
                check_quotes(alt_def, f"alternative for rule '{current_rule}'",
                             line_num)
                alt_def = re.sub(r"'([^']*)'", r'"\1"', alt_def)
                referenced_rules.update(extract_references(alt_def))
                current_definition.append(alt_def)

        except ValueError as e:
            raise ValueError(f"Error on line {line_num}: {str(e)}") from e

    # Add final rule if exists
    if current_rule:
        output_lines.append(
            f"{current_rule} ::= {' | '.join(current_definition)}")

    # Validate all rules are defined
    undefined_rules = referenced_rules - defined_rules - {'root'}
    if undefined_rules:
        raise ValueError("Referenced rules are not defined: "
                         f"{', '.join(sorted(undefined_rules))}")

    return '\n'.join(output_lines)

get_outlines_cache

get_outlines_cache()

Get the Cache instance to be used for index caching

Source code in vllm/v1/structured_output/utils.py
def get_outlines_cache():
    """Get the Cache instance to be used for index caching"""

    cache_dir = get_outlines_cache_path()
    if envs.VLLM_V1_USE_OUTLINES_CACHE:
        logger.warning("Enabling outlines cache. This is an unbounded on-disk "
                       "cache. It may consume a lot of disk space and should "
                       "not be used with untrusted clients.")
        cache = Cache(cache_dir, eviction_policy="none", cull_limit=0)
        outlines_version = importlib.metadata.version("outlines_core")

        cached_version = cache.get('__version__', None)
        if cached_version != outlines_version:
            cache.clear()
        cache.set('__version__', outlines_version)
        return cache
    else:
        return LRUCache(maxsize=128)

get_outlines_cache_path

get_outlines_cache_path() -> str

Get the context object that contains previously-computed return values

Source code in vllm/v1/structured_output/utils.py
def get_outlines_cache_path() -> str:
    """Get the context object that contains previously-computed return values"""
    outlines_cache_dir = os.getenv("OUTLINES_CACHE_DIR")
    xdg_cache_home = os.getenv("XDG_CACHE_HOME")
    home_dir = os.path.expanduser("~")

    if outlines_cache_dir:
        # OUTLINES_CACHE_DIR takes precedence
        return outlines_cache_dir
    elif xdg_cache_home:
        return os.path.join(xdg_cache_home, ".cache", "outlines")
    # If homedir is "/", we may be inside a container, and thus writing to
    # root would be problematic, so we fall back to using a tempfile.
    # Also validate the path exists, since os.path.expanduser does
    # not guarantee existence.
    elif os.path.isdir(home_dir) and home_dir != "/":
        # Default Unix fallback: ~/.cache/outlines
        return os.path.join(home_dir, ".cache", "outlines")
    else:
        import tempfile

        # home_dir may be / inside a docker container without existing user
        tempdir = tempfile.gettempdir()
        return os.path.join(tempdir, ".cache", "outlines")

get_outlines_vocabulary

get_outlines_vocabulary(
    tokenizer: AnyTokenizer,
) -> Vocabulary

Get the Vocabulary object for a given tokenizer.

Source code in vllm/v1/structured_output/utils.py
def get_outlines_vocabulary(tokenizer: AnyTokenizer) -> oc.Vocabulary:
    """Get the `Vocabulary` object for a given tokenizer.
    """
    if hasattr(tokenizer, "_outlines_vocabulary"):
        return tokenizer._outlines_vocabulary  # type: ignore

    try:
        if hasattr(
                tokenizer,
                "eos_token_id",
        ) and tokenizer.eos_token_id is not None:
            eos_token_id = tokenizer.eos_token_id
        else:
            raise ValueError(
                f"Error during structured outputs setup for outlines: Tokenizer ({type(tokenizer)}) has no `eos_token_id` property, but `eos_token_id` is required for structured outputs to work properly."  # noqa: E501
            )

        reduced_vocab = _reduced_vocabulary(
            tokenizer,
            eos_token_id  #type: ignore
        )
        vocabulary = OutlinesVocabulary(
            oc.Vocabulary(eos_token_id, reduced_vocab))
        tokenizer._outlines_vocabulary = vocabulary  # type: ignore

        return vocabulary
    except AttributeError as e:
        raise ValueError(f"Cannot get the vocabulary of the tokenizer "
                         f"({type(tokenizer)}). The tokenizer should have a "
                         "get_vocab method.") from e

grammar_is_likely_lark

grammar_is_likely_lark(grammar_str: str) -> bool

Check if grammar appears to use Lark syntax.

Parameters:

Name Type Description Default
grammar_str str

Input grammar string

required

Returns:

Name Type Description
bool bool

True if grammar appears to be in Lark format, False otherwise

Examples:

>>> grammar_is_likely_lark("rule: 'abc'")
True
>>> grammar_is_likely_lark("rule ::= 'abc'")
False
Source code in vllm/v1/structured_output/utils.py
def grammar_is_likely_lark(grammar_str: str) -> bool:
    """
    Check if grammar appears to use Lark syntax.

    Args:
        grammar_str: Input grammar string

    Returns:
        bool: True if grammar appears to be in Lark format, False otherwise

    Examples:
        >>> grammar_is_likely_lark("rule: 'abc'")
        True
        >>> grammar_is_likely_lark("rule ::= 'abc'")
        False
    """
    if not grammar_str or not isinstance(grammar_str, str):
        return False

    for line in grammar_str.split('\n'):
        # Remove both comment styles
        line = re.sub(r'(#|//).*$', '', line).strip()
        if not line:
            continue

        # Look for EBNF rule definition
        if '::=' in line:
            return False

    return True