
llm

AnyscaleLLM

Bases: OpenAILLM

Source code in src/distilabel/llm/anyscale.py
class AnyscaleLLM(OpenAILLM):
    def __init__(
        self,
        task: "Task",
        model: str,
        client: Union["OpenAI", None] = None,
        api_key: Union[str, None] = None,
        max_new_tokens: int = 128,
        frequency_penalty: float = 0.0,
        presence_penalty: float = 0.0,
        temperature: float = 1.0,
        top_p: float = 1.0,
        num_threads: Union[int, None] = None,
        prompt_format: Union["SupportedFormats", None] = None,
        prompt_formatting_fn: Union[Callable[..., str], None] = None,
    ) -> None:
        """Initializes the AnyscaleLLM class.

        Args:
            task (Task): the task to be performed by the LLM.
            model (str, optional): the model to be used for generation.
            client (Union[OpenAI, None], optional): an OpenAI client to be used for generation.
                If `None`, a new client will be created. Defaults to `None`.
            api_key (Union[str, None], optional): the Anyscale API key to be used for generation.
                If `None`, the `ANYSCALE_API_KEY` environment variable will be used. Defaults to `None`.
                Visit "https://docs.endpoints.anyscale.com/guides/authenticate/" for more information.
            max_new_tokens (int, optional): the maximum number of tokens to be generated.
                Defaults to 128.
            frequency_penalty (float, optional): the frequency penalty to be used for generation.
                Defaults to 0.0.
            presence_penalty (float, optional): the presence penalty to be used for generation.
                Defaults to 0.0.
            temperature (float, optional): the temperature to be used for generation.
                Defaults to 1.0.
            top_p (float, optional): the top-p value to be used for generation.
                Defaults to 1.0.
            num_threads (Union[int, None], optional): the number of threads to be used
                for parallel generation. If `None`, no parallel generation will be performed.
                Defaults to `None`.
            prompt_format (Union[SupportedFormats, None], optional): the format to be used
                for the prompt. If `None`, the default format of the task will be used, available
                formats are `openai`, `chatml`, `llama2`, `zephyr`, and `default`. Defaults to `None`,
                but `default` (concatenation of `system_prompt` and `formatted_prompt` with a line-break)
                will be used if no `prompt_formatting_fn` is provided.
            prompt_formatting_fn (Union[Callable[..., str], None], optional): a function to be
                applied to the prompt before generation. If `None`, no formatting will be applied.
                Defaults to `None`.

        Raises:
            AssertionError: if the provided `model` is not available in your Anyscale account.

        Examples:
            >>> import os
            >>> from distilabel.tasks import TextGenerationTask
            >>> from distilabel.llm import AnyscaleLLM
            >>> llm = AnyscaleLLM(model="HuggingFaceH4/zephyr-7b-beta", task=TextGenerationTask(), api_key=os.getenv("ANYSCALE_API_KEY", None))
            >>> llm.generate([{"input": "What's the capital of Spain?"}])
        """
        LLM.__init__(
            self,
            task=task,
            num_threads=num_threads,
            prompt_format=prompt_format,
            prompt_formatting_fn=prompt_formatting_fn,
        )

        if not _OPENAI_AVAILABLE:
            raise ImportError(
                "`AnyscaleLLM` cannot be used as `openai` is not installed, please "
                " install it with `pip install openai`."
            )

        self.max_tokens = max_new_tokens
        self.frequency_penalty = frequency_penalty
        self.presence_penalty = presence_penalty
        self.temperature = temperature
        self.top_p = top_p

        self.client = client or OpenAI(
            api_key=api_key or os.getenv("ANYSCALE_API_KEY"),
            max_retries=6,
            base_url="https://api.endpoints.anyscale.com/v1",
        )

        assert (
            model in self.available_models
        ), f"Provided `model` is not available in your Anyscale account, available models are {self.available_models}"
        self.model = model

__init__(task, model, client=None, api_key=None, max_new_tokens=128, frequency_penalty=0.0, presence_penalty=0.0, temperature=1.0, top_p=1.0, num_threads=None, prompt_format=None, prompt_formatting_fn=None)

Initializes the AnyscaleLLM class.

Parameters:

task (Task): the task to be performed by the LLM. Required.
model (str): the model to be used for generation. Required.
client (Union[OpenAI, None]): an OpenAI client to be used for generation. If None, a new client will be created. Defaults to None.
api_key (Union[str, None]): the Anyscale API key to be used for generation. If None, the ANYSCALE_API_KEY environment variable will be used. Visit "https://docs.endpoints.anyscale.com/guides/authenticate/" for more information. Defaults to None.
max_new_tokens (int): the maximum number of tokens to be generated. Defaults to 128.
frequency_penalty (float): the frequency penalty to be used for generation. Defaults to 0.0.
presence_penalty (float): the presence penalty to be used for generation. Defaults to 0.0.
temperature (float): the temperature to be used for generation. Defaults to 1.0.
top_p (float): the top-p value to be used for generation. Defaults to 1.0.
num_threads (Union[int, None]): the number of threads to be used for parallel generation. If None, no parallel generation will be performed. Defaults to None.
prompt_format (Union[SupportedFormats, None]): the format to be used for the prompt. If None, the default format of the task will be used; available formats are openai, chatml, llama2, zephyr, and default. Defaults to None, but default (concatenation of system_prompt and formatted_prompt with a line-break) will be used if no prompt_formatting_fn is provided.
prompt_formatting_fn (Union[Callable[..., str], None]): a function to be applied to the prompt before generation. If None, no formatting will be applied. Defaults to None.

Raises:

AssertionError: if the provided model is not available in your Anyscale account.

Examples:

>>> import os
>>> from distilabel.tasks import TextGenerationTask
>>> from distilabel.llm import AnyscaleLLM
>>> llm = AnyscaleLLM(model="HuggingFaceH4/zephyr-7b-beta", task=TextGenerationTask(), api_key=os.getenv("ANYSCALE_API_KEY", None))
>>> llm.generate([{"input": "What's the capital of Spain?"}])
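
As a brief follow-up to the example above, a minimal sketch of consuming the result. One assumption not documented in this page: LLMOutput is treated as a mapping here, following the keyword construction shown in the source below.

import os

from distilabel.tasks import TextGenerationTask
from distilabel.llm import AnyscaleLLM

llm = AnyscaleLLM(
    model="HuggingFaceH4/zephyr-7b-beta",
    task=TextGenerationTask(),
    api_key=os.getenv("ANYSCALE_API_KEY", None),
)

# Without `num_threads`, `generate` runs synchronously and returns one inner
# list per input, with one LLMOutput per generation.
outputs = llm.generate(
    [{"input": "What's the capital of Spain?"}],
    num_generations=2,
)
for generations in outputs:
    for generation in generations:
        # `parsed_output` is None whenever the task fails to parse the raw response.
        print(generation["model_name"], generation["parsed_output"])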
Source code in src/distilabel/llm/anyscale.py
def __init__(
    self,
    task: "Task",
    model: str,
    client: Union["OpenAI", None] = None,
    api_key: Union[str, None] = None,
    max_new_tokens: int = 128,
    frequency_penalty: float = 0.0,
    presence_penalty: float = 0.0,
    temperature: float = 1.0,
    top_p: float = 1.0,
    num_threads: Union[int, None] = None,
    prompt_format: Union["SupportedFormats", None] = None,
    prompt_formatting_fn: Union[Callable[..., str], None] = None,
) -> None:
    """Initializes the AnyscaleLLM class.

    Args:
        task (Task): the task to be performed by the LLM.
        model (str, optional): the model to be used for generation.
        client (Union[OpenAI, None], optional): an OpenAI client to be used for generation.
            If `None`, a new client will be created. Defaults to `None`.
        api_key (Union[str, None], optional): the Anyscale API key to be used for generation.
            If `None`, the `ANYSCALE_API_KEY` environment variable will be used. Defaults to `None`.
            Visit "https://docs.endpoints.anyscale.com/guides/authenticate/" for more information.
        max_new_tokens (int, optional): the maximum number of tokens to be generated.
            Defaults to 128.
        frequency_penalty (float, optional): the frequency penalty to be used for generation.
            Defaults to 0.0.
        presence_penalty (float, optional): the presence penalty to be used for generation.
            Defaults to 0.0.
        temperature (float, optional): the temperature to be used for generation.
            Defaults to 1.0.
        top_p (float, optional): the top-p value to be used for generation.
            Defaults to 1.0.
        num_threads (Union[int, None], optional): the number of threads to be used
            for parallel generation. If `None`, no parallel generation will be performed.
            Defaults to `None`.
        prompt_format (Union[SupportedFormats, None], optional): the format to be used
            for the prompt. If `None`, the default format of the task will be used, available
            formats are `openai`, `chatml`, `llama2`, `zephyr`, and `default`. Defaults to `None`,
            but `default` (concatenation of `system_prompt` and `formatted_prompt` with a line-break)
            will be used if no `prompt_formatting_fn` is provided.
        prompt_formatting_fn (Union[Callable[..., str], None], optional): a function to be
            applied to the prompt before generation. If `None`, no formatting will be applied.
            Defaults to `None`.

    Raises:
        AssertionError: if the provided `model` is not available in your Anyscale account.

    Examples:
        >>> import os
        >>> from distilabel.tasks import TextGenerationTask
        >>> from distilabel.llm import AnyscaleLLM
        >>> llm = AnyscaleLLM(model="HuggingFaceH4/zephyr-7b-beta", task=TextGenerationTask(), api_key=os.getenv("ANYSCALE_API_KEY", None))
        >>> llm.generate([{"input": "What's the capital of Spain?"}])
    """
    LLM.__init__(
        self,
        task=task,
        num_threads=num_threads,
        prompt_format=prompt_format,
        prompt_formatting_fn=prompt_formatting_fn,
    )

    if not _OPENAI_AVAILABLE:
        raise ImportError(
            "`AnyscaleLLM` cannot be used as `openai` is not installed, please "
            " install it with `pip install openai`."
        )

    self.max_tokens = max_new_tokens
    self.frequency_penalty = frequency_penalty
    self.presence_penalty = presence_penalty
    self.temperature = temperature
    self.top_p = top_p

    self.client = client or OpenAI(
        api_key=api_key or os.getenv("ANYSCALE_API_KEY"),
        max_retries=6,
        base_url="https://api.endpoints.anyscale.com/v1",
    )

    assert (
        model in self.available_models
    ), f"Provided `model` is not available in your Anyscale account, available models are {self.available_models}"
    self.model = model

InferenceEndpointsLLM

Bases: LLM

Source code in src/distilabel/llm/huggingface/inference_endpoints.py
class InferenceEndpointsLLM(LLM):
    def __init__(
        self,
        endpoint_name_or_model_id: str,
        task: "Task",
        endpoint_namespace: Union[str, None] = None,
        token: Union[str, None] = None,
        max_new_tokens: int = 128,
        repetition_penalty: Union[float, None] = None,
        seed: Union[int, None] = None,
        do_sample: bool = False,
        temperature: Union[float, None] = None,
        top_k: Union[int, None] = None,
        top_p: Union[float, None] = None,
        typical_p: Union[float, None] = None,
        stop_sequences: Union[List[str], None] = None,
        num_threads: Union[int, None] = None,
        prompt_format: Union["SupportedFormats", None] = None,
        prompt_formatting_fn: Union[Callable[..., str], None] = None,
    ) -> None:
        """Initializes the InferenceEndpointsLLM class.

        Args:
            endpoint_name_or_model_id (str): The name of the endpoint or a Hugging Face Model Id.
            task (Task): The task to be performed by the LLM.
            endpoint_namespace (Union[str, None]): The namespace of the endpoint. Defaults to None.
            token (Union[str, None]): The token for the endpoint. Defaults to None.
            max_new_tokens (int): The maximum number of tokens to be generated. Defaults to 128.
            repetition_penalty (Union[float, None]): The repetition penalty to be used for generation. Defaults to None.
            seed (Union[int, None]): The seed for generation. Defaults to None.
            do_sample (bool): Whether to do sampling. Defaults to False.
            temperature (Union[float, None]): The temperature for generation. Defaults to None.
            top_k (Union[int, None]): The top_k for generation. Defaults to None.
            top_p (Union[float, None]): The top_p for generation. Defaults to None.
            typical_p (Union[float, None]): The typical_p for generation. Defaults to None.
            stop_sequences (Union[List[str], None]): The stop sequences for generation. Defaults to None.
            num_threads (Union[int, None]): The number of threads. Defaults to None.
            prompt_format (Union["SupportedFormats", None]): The format of the prompt. Defaults to None.
            prompt_formatting_fn (Union[Callable[..., str], None]): The function for formatting the prompt. Defaults to None.

        Examples:
            >>> # Inference Endpoint example
            >>> from distilabel.tasks.text_generation import TextGenerationTask as Task
            >>> from distilabel.llm import InferenceEndpointsLLM
            >>> task = Task()
            >>> llm = InferenceEndpointsLLM(
            ...     endpoint_name_or_model_id="<MODEL_ID_OR_INFERENCE_ENDPOINT>",
            ...     task=task,
            ... )
        """
        super().__init__(
            task=task,
            num_threads=num_threads,
            prompt_format=prompt_format,
            prompt_formatting_fn=prompt_formatting_fn,
        )

        if not _HUGGINGFACE_HUB_AVAILABLE:
            raise ImportError(
                "`InferenceEndpointsLLM` cannot be used as `huggingface-hub` is not "
                "installed, please install it with `pip install huggingface-hub`."
            )

        self.do_sample = do_sample
        self.max_new_tokens = max_new_tokens
        self.repetition_penalty = repetition_penalty
        self.seed = seed
        self.temperature = temperature
        self.top_k = top_k
        self.top_p = top_p
        self.typical_p = typical_p
        self.stop_sequences = stop_sequences

        if is_serverless_endpoint_available(model_id=endpoint_name_or_model_id):
            logger.info("Using Serverless Inference Endpoint")
            self.client = InferenceClient(model=endpoint_name_or_model_id, token=token)
            self._model_name = endpoint_name_or_model_id
        else:
            logger.info("Using Dedicated Inference Endpoint")
            inference_endpoint = get_inference_endpoint(
                name=endpoint_name_or_model_id,
                namespace=endpoint_namespace,
                token=token,
            )
            if inference_endpoint.status in ["paused", "scaledToZero"]:
                logger.info("Waiting for Inference Endpoint to be ready...")
                inference_endpoint.resume().wait(timeout=30)

            self.client = inference_endpoint.client
            self._model_name = inference_endpoint.repository

    def __rich_repr__(self) -> Generator[Any, None, None]:
        yield from super().__rich_repr__()
        yield (
            "parameters",
            {
                "do_sample": self.do_sample,
                "max_new_tokens": self.max_new_tokens,
                "repetition_penalty": self.repetition_penalty,
                "seed": self.seed,
                "temperature": self.temperature,
                "top_k": self.top_k,
                "top_p": self.top_p,
                "typical_p": self.typical_p,
                "stop_sequences": self.stop_sequences,
            },
        )

    @property
    def model_name(self) -> str:
        """Returns the model name of the endpoint."""
        return self._model_name

    @retry(
        retry=retry_if_exception_type(_INFERENCE_ENDPOINTS_API_RETRY_ON_EXCEPTIONS),
        stop=stop_after_attempt(_INFERENCE_ENDPOINTS_API_STOP_AFTER_ATTEMPT),
        wait=wait_random_exponential(
            multiplier=_INFERENCE_ENDPOINTS_API_WAIT_RANDOM_EXPONENTIAL_MULTIPLIER,
            max=_INFERENCE_ENDPOINTS_API_WAIT_RANDOM_EXPONENTIAL_MAX,
        ),
        before_sleep=before_sleep_log(logger, logging.INFO),
        after=after_log(logger, logging.INFO),
    )
    def _text_generation_with_backoff(self, **kwargs: Any) -> Any:
        """Performs text generation with backoff in case of an error."""
        return self.client.text_generation(**kwargs)  # type: ignore

    def _generate(
        self, inputs: List[Dict[str, Any]], num_generations: int = 1
    ) -> List[List[LLMOutput]]:
        """Generates `num_generations` for each input in `inputs`.

        Args:
            inputs (List[Dict[str, Any]]): the inputs to be used for generation.
            num_generations (int, optional): the number of generations to be performed for each
                input. Defaults to 1.

        Returns:
            List[List[LLMOutput]]: the outputs of the LLM.
        """
        prompts = self._generate_prompts(inputs, default_format=None)
        outputs = []
        for prompt in prompts:
            raw_responses = [
                self._text_generation_with_backoff(
                    prompt=prompt,
                    do_sample=self.do_sample,
                    max_new_tokens=self.max_new_tokens,
                    repetition_penalty=self.repetition_penalty,
                    seed=self.seed,
                    temperature=self.temperature,
                    top_k=self.top_k,
                    top_p=self.top_p,
                    typical_p=self.typical_p,
                    stop_sequences=self.stop_sequences,
                )
                for _ in range(num_generations)
            ]
            output = []
            for raw_response in raw_responses:
                try:
                    parsed_response = self.task.parse_output(raw_response)
                except Exception as e:
                    logger.error(f"Error parsing Inference Endpoints output: {e}")
                    parsed_response = None
                output.append(
                    LLMOutput(
                        model_name=self.model_name,
                        prompt_used=prompt,
                        raw_output=raw_response,
                        parsed_output=parsed_response,
                    )
                )
            outputs.append(output)
        return outputs

model_name: str property

Returns the model name of the endpoint.

__init__(endpoint_name_or_model_id, task, endpoint_namespace=None, token=None, max_new_tokens=128, repetition_penalty=None, seed=None, do_sample=False, temperature=None, top_k=None, top_p=None, typical_p=None, stop_sequences=None, num_threads=None, prompt_format=None, prompt_formatting_fn=None)

Initializes the InferenceEndpointsLLM class.

Parameters:

endpoint_name_or_model_id (str): The name of the endpoint or a Hugging Face Model Id. Required.
task (Task): The task to be performed by the LLM. Required.
endpoint_namespace (Union[str, None]): The namespace of the endpoint. Defaults to None.
token (Union[str, None]): The token for the endpoint. Defaults to None.
max_new_tokens (int): The maximum number of tokens to be generated. Defaults to 128.
repetition_penalty (Union[float, None]): The repetition penalty to be used for generation. Defaults to None.
seed (Union[int, None]): The seed for generation. Defaults to None.
do_sample (bool): Whether to do sampling. Defaults to False.
temperature (Union[float, None]): The temperature for generation. Defaults to None.
top_k (Union[int, None]): The top_k for generation. Defaults to None.
top_p (Union[float, None]): The top_p for generation. Defaults to None.
typical_p (Union[float, None]): The typical_p for generation. Defaults to None.
stop_sequences (Union[List[str], None]): The stop sequences for generation. Defaults to None.
num_threads (Union[int, None]): The number of threads. Defaults to None.
prompt_format (Union[SupportedFormats, None]): The format of the prompt. Defaults to None.
prompt_formatting_fn (Union[Callable[..., str], None]): The function for formatting the prompt. Defaults to None.

Examples:

>>> # Inference Endpoint example
>>> from distilabel.tasks.text_generation import TextGenerationTask as Task
>>> from distilabel.llm import InferenceEndpointsLLM
>>> task = Task()
>>> llm = InferenceEndpointsLLM(
...     endpoint_name_or_model_id="<MODEL_ID_OR_INFERENCE_ENDPOINT>",
...     task=task,
... )
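
A short usage sketch, not taken from the library docs: the placeholder endpoint/model id stays as-is, and the HF_TOKEN environment variable is an assumption about where the Hugging Face token lives.

import os

from distilabel.tasks.text_generation import TextGenerationTask as Task
from distilabel.llm import InferenceEndpointsLLM

llm = InferenceEndpointsLLM(
    endpoint_name_or_model_id="<MODEL_ID_OR_INFERENCE_ENDPOINT>",
    task=Task(),
    token=os.getenv("HF_TOKEN", None),  # assumed location of the Hugging Face token
    max_new_tokens=256,
    do_sample=True,
    temperature=0.7,
)

# Returns List[List[LLMOutput]] since no `num_threads` was passed.
outputs = llm.generate([{"input": "What's the capital of Spain?"}])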
Source code in src/distilabel/llm/huggingface/inference_endpoints.py
def __init__(
    self,
    endpoint_name_or_model_id: str,
    task: "Task",
    endpoint_namespace: Union[str, None] = None,
    token: Union[str, None] = None,
    max_new_tokens: int = 128,
    repetition_penalty: Union[float, None] = None,
    seed: Union[int, None] = None,
    do_sample: bool = False,
    temperature: Union[float, None] = None,
    top_k: Union[int, None] = None,
    top_p: Union[float, None] = None,
    typical_p: Union[float, None] = None,
    stop_sequences: Union[List[str], None] = None,
    num_threads: Union[int, None] = None,
    prompt_format: Union["SupportedFormats", None] = None,
    prompt_formatting_fn: Union[Callable[..., str], None] = None,
) -> None:
    """Initializes the InferenceEndpointsLLM class.

    Args:
        endpoint_name_or_model_id (str): The name of the endpoint or a Hugging Face Model Id.
        task (Task): The task to be performed by the LLM.
        endpoint_namespace (Union[str, None]): The namespace of the endpoint. Defaults to None.
        token (Union[str, None]): The token for the endpoint. Defaults to None.
        max_new_tokens (int): The maximum number of tokens to be generated. Defaults to 128.
        repetition_penalty (Union[float, None]): The repetition penalty to be used for generation. Defaults to None.
        seed (Union[int, None]): The seed for generation. Defaults to None.
        do_sample (bool): Whether to do sampling. Defaults to False.
        temperature (Union[float, None]): The temperature for generation. Defaults to None.
        top_k (Union[int, None]): The top_k for generation. Defaults to None.
        top_p (Union[float, None]): The top_p for generation. Defaults to None.
        typical_p (Union[float, None]): The typical_p for generation. Defaults to None.
        stop_sequences (Union[List[str], None]): The stop sequences for generation. Defaults to None.
        num_threads (Union[int, None]): The number of threads. Defaults to None.
        prompt_format (Union["SupportedFormats", None]): The format of the prompt. Defaults to None.
        prompt_formatting_fn (Union[Callable[..., str], None]): The function for formatting the prompt. Defaults to None.

    Examples:
        >>> # Inference Endpoint example
        >>> from distilabel.tasks.text_generation import TextGenerationTask as Task
        >>> from distilabel.llm import InferenceEndpointsLLM
        >>> task = Task()
        >>> llm = InferenceEndpointsLLM(
        ...     endpoint_name_or_model_id="<MODEL_ID_OR_INFERENCE_ENDPOINT>",
        ...     task=task,
        ... )
    """
    super().__init__(
        task=task,
        num_threads=num_threads,
        prompt_format=prompt_format,
        prompt_formatting_fn=prompt_formatting_fn,
    )

    if not _HUGGINGFACE_HUB_AVAILABLE:
        raise ImportError(
            "`InferenceEndpointsLLM` cannot be used as `huggingface-hub` is not "
            "installed, please install it with `pip install huggingface-hub`."
        )

    self.do_sample = do_sample
    self.max_new_tokens = max_new_tokens
    self.repetition_penalty = repetition_penalty
    self.seed = seed
    self.temperature = temperature
    self.top_k = top_k
    self.top_p = top_p
    self.typical_p = typical_p
    self.stop_sequences = stop_sequences

    if is_serverless_endpoint_available(model_id=endpoint_name_or_model_id):
        logger.info("Using Serverless Inference Endpoint")
        self.client = InferenceClient(model=endpoint_name_or_model_id, token=token)
        self._model_name = endpoint_name_or_model_id
    else:
        logger.info("Using Dedicated Inference Endpoint")
        inference_endpoint = get_inference_endpoint(
            name=endpoint_name_or_model_id,
            namespace=endpoint_namespace,
            token=token,
        )
        if inference_endpoint.status in ["paused", "scaledToZero"]:
            logger.info("Waiting for Inference Endpoint to be ready...")
            inference_endpoint.resume().wait(timeout=30)

        self.client = inference_endpoint.client
        self._model_name = inference_endpoint.repository

LLM

Bases: ABC

Source code in src/distilabel/llm/base.py
class LLM(ABC):
    def __init__(
        self,
        task: Task,
        num_threads: Union[int, None] = None,
        prompt_format: Union["SupportedFormats", None] = None,
        prompt_formatting_fn: Union[Callable[..., str], None] = None,
    ) -> None:
        """Initializes the LLM base class.

        Note:
            This class is intended to be used internally, but anyone can still create
            a subclass, implement the `abstractmethod`s and use it.

        Args:
            task (Task): the task to be performed by the LLM.
            num_threads (Union[int, None], optional): the number of threads to be used
                for parallel generation. If `None`, no parallel generation will be performed.
                Defaults to `None`.
            prompt_format (Union["SupportedFormats", None], optional): the format to be used
                for the prompt. If `None`, the default format of the task will be used, available
                formats are `openai`, `chatml`, `llama2`, `zephyr`, and `default`. Defaults to `None`,
                but `default` (concatenation of `system_prompt` and `formatted_prompt` with a line-break)
                will be used if no `prompt_formatting_fn` is provided.
            prompt_formatting_fn (Union[Callable[..., str], None], optional): a function to be
                applied to the prompt before generation. If `None`, no formatting will be applied.
                Defaults to `None`.
        """
        self.task = task

        self.thread_pool_executor = (
            ThreadPoolExecutor(max_workers=num_threads)
            if num_threads is not None
            else None
        )

        self.prompt_format = prompt_format
        self.prompt_formatting_fn = prompt_formatting_fn

    def __del__(self) -> None:
        """Shuts down the thread pool executor if it is not `None`."""
        if self.thread_pool_executor is not None:
            self.thread_pool_executor.shutdown()

    @property
    def num_threads(self) -> Union[int, None]:
        if self.thread_pool_executor:
            return self.thread_pool_executor._max_workers

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(task={self.task.__class__.__name__}, num_threads={self.num_threads}, promp_format='{self.prompt_format}', model='{self.model_name}')"

    def __rich_repr__(self) -> Generator[Any, None, None]:
        yield "task", self.task
        yield "num_threads", self.num_threads
        yield "prompt_format", self.prompt_format
        if self.prompt_formatting_fn is not None:
            args = f"({', '.join(self.prompt_formatting_fn.__code__.co_varnames)})"
            representation = self.prompt_formatting_fn.__name__ + args
            yield "prompt_formatting_fn", representation
        yield "model", self.model_name

    @property
    @abstractmethod
    def model_name(self) -> str:
        pass

    def _generate_prompts(
        self,
        inputs: List[Dict[str, Any]],
        default_format: Union["SupportedFormats", None] = None,
    ) -> List[Any]:
        """Generates the prompts to be used for generation.

        Args:
            inputs (List[Dict[str, Any]]): the inputs to be used for generation.
            default_format (Union["SupportedFormats", None], optional): the default format to be used
                for the prompt if no `prompt_format` is specified. Defaults to `None`.

        Returns:
            List[Any]: the generated prompts.

        Raises:
            ValueError: if the generated prompt is not of the expected type.
        """
        prompts = []
        for input in inputs:
            prompt = self.task.generate_prompt(**input)
            if not isinstance(prompt, Prompt) and self.prompt_formatting_fn is not None:
                warnings.warn(
                    "The method `generate_prompt` is not returning a `Prompt` class but a prompt"
                    f" of `type={type(prompt)}`, meaning that a pre-formatting has already been"
                    " applied in the `task.generate_prompt` method, so the usage of a `prompt_formatting_fn`"
                    " is discouraged.",
                    UserWarning,
                    stacklevel=2,
                )
                prompt = self.prompt_formatting_fn(prompt)
            elif isinstance(prompt, Prompt) and self.prompt_formatting_fn is None:
                if self.prompt_format is not None or default_format is not None:
                    prompt = prompt.format_as(
                        format=self.prompt_format or default_format  # type: ignore
                    )
                else:
                    warnings.warn(
                        "No `prompt_format` has been specified and no `default_format` is set, so"
                        " the prompt will be concatenated with a line-break and no specific formatting"
                        " by default.",
                        UserWarning,
                        stacklevel=2,
                    )
                    prompt = prompt.format_as(format="default")
            prompts.append(prompt)
        return prompts

    @abstractmethod
    def _generate(
        self, inputs: List[Dict[str, Any]], num_generations: int = 1
    ) -> List[List["LLMOutput"]]:
        pass

    def _get_valid_inputs(
        self, inputs: List[Dict[str, Any]]
    ) -> Tuple[List[Dict[str, Any]], List[int]]:
        """Returns the valid inputs and the indices of the invalid inputs.

        A valid input is an input that contains all the arguments required by the task.

        Args:
            inputs (List[Dict[str, Any]]): the inputs to be used for generation.

        Returns:
            Tuple[List[Dict[str, Any]], List[int]]: a tuple containing the valid inputs and
                the indices of the invalid inputs.
        """

        valid_inputs = []
        not_valid_inputs_indices = []
        for i, input in enumerate(inputs):
            if not all(input_arg in input for input_arg in self.task.input_args_names):
                logger.warn(
                    f"Missing {self.task.__class__.__name__} input argument in batch element {i}"
                )
                not_valid_inputs_indices.append(i)
                continue

            valid_inputs.append(input)

        return valid_inputs, not_valid_inputs_indices

    def _fill_missing_inputs(
        self,
        generations: List[List[LLMOutput]],
        invalid_inputs_indices: List[int],
        num_generations: int,
    ) -> List[List[LLMOutput]]:
        """Fills the `generations` list with empty `LLMOutput`s for the inputs that were
        not valid for the associated task of this `LLM`.

        Args:
            generations (List[List[LLMOutput]]): the generations to be filled.
            invalid_inputs_indices (List[int]): the indices of the inputs that were not
                valid for the associated task of this `LLM`.
            num_generations (int): the number of generations to be performed for each input.

        Returns:
            List[List[LLMOutput]]: the filled generations.
        """

        filled_generations = generations.copy()
        for idx in invalid_inputs_indices:
            filled_generations.insert(
                idx,
                [
                    LLMOutput(
                        model_name=self.model_name,
                        prompt_used=None,
                        raw_output=None,
                        parsed_output=None,
                    )
                    for _ in range(num_generations)
                ],
            )
        return filled_generations

    def generate(
        self,
        inputs: List[Dict[str, Any]],
        num_generations: int = 1,
        progress_callback_func: Union[Callable, None] = None,
    ) -> Union[List[List["LLMOutput"]], Future[List[List["LLMOutput"]]]]:
        """Generates the outputs for the given inputs using the LLM.

        Args:
            inputs (List[Dict[str, Any]]): the inputs to be used for generation.
            num_generations (int, optional): the number of generations to be performed for each input.
                Defaults to `1`.
            progress_callback_func (Union[Callable, None], optional): a function to be called at each
                generation step. Defaults to `None`.

        Returns:
            Union[List[List["LLMOutput"]], Future[List[List["LLMOutput"]]]]: the generated outputs.
        """

        def _progress():
            if progress_callback_func is not None:
                progress_callback_func(advance=num_generations * len(inputs))

        valid_inputs, invalid_inputs_indices = self._get_valid_inputs(inputs)

        if self.thread_pool_executor is not None:
            futures = []
            for input in valid_inputs:
                future = self.thread_pool_executor.submit(
                    self._generate, [input], num_generations
                )
                futures.append(future)
            future = when_all_complete(
                futures=futures,
                callback=lambda generations: self._fill_missing_inputs(
                    generations, invalid_inputs_indices, num_generations
                ),
            )
            future.add_done_callback(lambda _: _progress())
            return future

        generations = self._generate(valid_inputs, num_generations)

        generations = self._fill_missing_inputs(
            generations, invalid_inputs_indices, num_generations
        )

        _progress()
        return generations

    @property
    def return_futures(self) -> bool:
        """Whether the `LLM` returns futures"""
        return self.thread_pool_executor is not None

return_futures: bool property

Whether the LLM returns futures

__del__()

Shuts down the thread pool executor if it is not None.

Source code in src/distilabel/llm/base.py
def __del__(self) -> None:
    """Shuts down the thread pool executor if it is not `None`."""
    if self.thread_pool_executor is not None:
        self.thread_pool_executor.shutdown()

__init__(task, num_threads=None, prompt_format=None, prompt_formatting_fn=None)

Initializes the LLM base class.

Note

This class is intended to be used internally, but anyone can still create a subclass, implement the abstractmethods and use it.

Parameters:

task (Task): the task to be performed by the LLM. Required.
num_threads (Union[int, None]): the number of threads to be used for parallel generation. If None, no parallel generation will be performed. Defaults to None.
prompt_format (Union[SupportedFormats, None]): the format to be used for the prompt. If None, the default format of the task will be used; available formats are openai, chatml, llama2, zephyr, and default. Defaults to None, but default (concatenation of system_prompt and formatted_prompt with a line-break) will be used if no prompt_formatting_fn is provided.
prompt_formatting_fn (Union[Callable[..., str], None]): a function to be applied to the prompt before generation. If None, no formatting will be applied. Defaults to None.
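
To make the note above concrete, a minimal subclass sketch (illustrative only, not part of the library): it implements the two abstract members, model_name and _generate, and reuses _generate_prompts from the base class. The EchoLLM name and the LLMOutput import path are assumptions.

from typing import Any, Dict, List

from distilabel.llm.base import LLM
from distilabel.llm.utils import LLMOutput  # assumed import path
from distilabel.tasks import TextGenerationTask


class EchoLLM(LLM):
    """Toy LLM that echoes the formatted prompt back, for illustration only."""

    @property
    def model_name(self) -> str:
        return "echo"

    def _generate(
        self, inputs: List[Dict[str, Any]], num_generations: int = 1
    ) -> List[List[LLMOutput]]:
        prompts = self._generate_prompts(inputs, default_format=None)
        return [
            [
                LLMOutput(
                    model_name=self.model_name,
                    prompt_used=prompt,
                    raw_output=prompt,
                    # A real subclass would run `self.task.parse_output` on the raw response.
                    parsed_output=None,
                )
                for _ in range(num_generations)
            ]
            for prompt in prompts
        ]


llm = EchoLLM(task=TextGenerationTask())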
Source code in src/distilabel/llm/base.py
def __init__(
    self,
    task: Task,
    num_threads: Union[int, None] = None,
    prompt_format: Union["SupportedFormats", None] = None,
    prompt_formatting_fn: Union[Callable[..., str], None] = None,
) -> None:
    """Initializes the LLM base class.

    Note:
        This class is intended to be used internally, but anyone can still create
        a subclass, implement the `abstractmethod`s and use it.

    Args:
        task (Task): the task to be performed by the LLM.
        num_threads (Union[int, None], optional): the number of threads to be used
            for parallel generation. If `None`, no parallel generation will be performed.
            Defaults to `None`.
        prompt_format (Union["SupportedFormats", None], optional): the format to be used
            for the prompt. If `None`, the default format of the task will be used, available
            formats are `openai`, `chatml`, `llama2`, `zephyr`, and `default`. Defaults to `None`,
            but `default` (concatenation of `system_prompt` and `formatted_prompt` with a line-break)
            will be used if no `prompt_formatting_fn` is provided.
        prompt_formatting_fn (Union[Callable[..., str], None], optional): a function to be
            applied to the prompt before generation. If `None`, no formatting will be applied.
            Defaults to `None`.
    """
    self.task = task

    self.thread_pool_executor = (
        ThreadPoolExecutor(max_workers=num_threads)
        if num_threads is not None
        else None
    )

    self.prompt_format = prompt_format
    self.prompt_formatting_fn = prompt_formatting_fn

generate(inputs, num_generations=1, progress_callback_func=None)

Generates the outputs for the given inputs using the LLM.

Parameters:

inputs (List[Dict[str, Any]]): the inputs to be used for generation. Required.
num_generations (int): the number of generations to be performed for each input. Defaults to 1.
progress_callback_func (Union[Callable, None]): a function to be called at each generation step. Defaults to None.

Returns:

Union[List[List[LLMOutput]], Future[List[List[LLMOutput]]]]: the generated outputs, as a plain list or as a Future when num_threads is set.
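
A hedged sketch of the two return modes: without num_threads the call above returns the list directly, while with num_threads set it returns a Future that has to be resolved with .result(). The AnyscaleLLM setup below just reuses the earlier example.

import os

from distilabel.tasks import TextGenerationTask
from distilabel.llm import AnyscaleLLM

llm = AnyscaleLLM(
    model="HuggingFaceH4/zephyr-7b-beta",
    task=TextGenerationTask(),
    api_key=os.getenv("ANYSCALE_API_KEY", None),
    num_threads=4,  # enables the thread pool, so `generate` returns a Future
)

future = llm.generate(
    [{"input": "What's the capital of Spain?"}, {"input": "And of France?"}],
    num_generations=3,
)
generations = future.result()  # blocks until every per-input generation finishes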

Source code in src/distilabel/llm/base.py
def generate(
    self,
    inputs: List[Dict[str, Any]],
    num_generations: int = 1,
    progress_callback_func: Union[Callable, None] = None,
) -> Union[List[List["LLMOutput"]], Future[List[List["LLMOutput"]]]]:
    """Generates the outputs for the given inputs using the LLM.

    Args:
        inputs (List[Dict[str, Any]]): the inputs to be used for generation.
        num_generations (int, optional): the number of generations to be performed for each input.
            Defaults to `1`.
        progress_callback_func (Union[Callable, None], optional): a function to be called at each
            generation step. Defaults to `None`.

    Returns:
        Union[List[List["LLMOutput"]], Future[List[List["LLMOutput"]]]]: the generated outputs.
    """

    def _progress():
        if progress_callback_func is not None:
            progress_callback_func(advance=num_generations * len(inputs))

    valid_inputs, invalid_inputs_indices = self._get_valid_inputs(inputs)

    if self.thread_pool_executor is not None:
        futures = []
        for input in valid_inputs:
            future = self.thread_pool_executor.submit(
                self._generate, [input], num_generations
            )
            futures.append(future)
        future = when_all_complete(
            futures=futures,
            callback=lambda generations: self._fill_missing_inputs(
                generations, invalid_inputs_indices, num_generations
            ),
        )
        future.add_done_callback(lambda _: _progress())
        return future

    generations = self._generate(valid_inputs, num_generations)

    generations = self._fill_missing_inputs(
        generations, invalid_inputs_indices, num_generations
    )

    _progress()
    return generations

LLMPool

LLMPool is a class that wraps multiple ProcessLLMs and performs generation in parallel using them. Depending on the number of LLMs and the parameter num_generations, the LLMPool will decide how many generations to perform for each LLM:

  • If num_generations is less than the number of LLMs, then num_generations LLMs will be chosen randomly and each of them will perform 1 generation.

  • If num_generations is equal to the number of LLMs, then each LLM will perform 1 generation.

  • If num_generations is greater than the number of LLMs, then each LLM will perform num_generations // num_llms generations, and the remaining num_generations % num_llms generations will be performed by num_generations % num_llms randomly chosen LLMs.

Attributes:

llms (List[ProcessLLM]): the ProcessLLMs to be used for generation.
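
The distribution rules listed above come down to a few lines of arithmetic; below is a standalone sketch (mirroring _get_num_generations_per_llm in the source that follows) for a pool of 3 LLMs.

import random

def split_generations(num_generations: int, num_llms: int) -> dict:
    # Every LLM gets the integer share; the remainder goes to randomly chosen LLMs.
    per_llm = {i: num_generations // num_llms for i in range(num_llms)}
    for i in random.sample(range(num_llms), k=num_generations % num_llms):
        per_llm[i] += 1
    return per_llm

print(split_generations(2, 3))  # e.g. {0: 1, 1: 0, 2: 1} -> two random LLMs generate once
print(split_generations(3, 3))  # {0: 1, 1: 1, 2: 1}
print(split_generations(7, 3))  # e.g. {0: 3, 1: 2, 2: 2}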

Source code in src/distilabel/llm/base.py
class LLMPool:
    """LLMPool is a class that wraps multiple `ProcessLLM`s and performs generation in
    parallel using them. Depending on the number of `LLM`s and the parameter `num_generations`,
    the `LLMPool` will decide how many generations to perform for each `LLM`:

    - If `num_generations` is less than the number of `LLM`s, then `num_generations` LLMs
    will be chosen randomly and each of them will perform 1 generation.


    - If `num_generations` is equal to the number of `LLM`s, then each `LLM` will perform
    1 generation.

    - If `num_generations` is greater than the number of `LLM`s, then each `LLM` will
    perform `num_generations // num_llms` generations, and the remaining `num_generations % num_llms`
    generations will be performed by `num_generations % num_llms` randomly chosen `LLM`s.

    Attributes:
        llms (List[ProcessLLM]): the `ProcessLLM`s to be used for generation.
    """

    def __init__(self, llms: List[ProcessLLM]) -> None:
        """Initializes the `LLMPool` class.

        Args:
            llms: the `ProcessLLM`s to be used for generation. The list must contain at
                least 2 `ProcessLLM`s.

        Raises:
            ValueError: if the `llms` argument contains fewer than 2 `ProcessLLM`s, if it
                contains objects that are not `ProcessLLM`s, or if the `ProcessLLM`s in
                `llms` have different tasks.
        """
        if len(llms) < 2:
            raise ValueError(
                "The `llms` argument must contain at least 2 `ProcessLLM`s. If you want"
                " to use a single `ProcessLLM`, use the `ProcessLLM` directly instead."
            )

        if not all(isinstance(llm, ProcessLLM) for llm in llms):
            raise ValueError("The `llms` argument must contain only `ProcessLLM`s.")

        # Note: The following piece of code is used to check that all the `ProcessLLM`s
        # have the same task or a subclass of it.
        mros = [(type(llm.task), len(type(llm.task).mro())) for llm in llms]
        min_common_class = min(mros, key=lambda x: x[1])[0]
        if not all(isinstance(llm.task, min_common_class) for llm in llms):
            # This can fail for example with 3 different TextGenerationTasks
            # Task1(TextGenerationTask), Task2(TextGenerationTask), Task3(TextGenerationTask)
            # because they share the same parent class but we don't check the common one
            # TODO(plaguss): We check that they all have the same parent class, this should be simplified
            # with the previous check
            parent_classes = [type(llm.task).mro()[1] for llm in llms]
            if not len(set(parent_classes)) == 1:
                raise ValueError(
                    "All the `ProcessLLM` in `llms` must share the same task (either as the instance or the parent class)."
                )

        self.llms = llms
        self.num_llms = len(llms)

    def _get_num_generations_per_llm(self, num_generations: int) -> Dict[int, int]:
        """Returns the number of generations to be performed by each `LLM`.

        Args:
            num_generations: the number of generations to be performed.

        Returns:
            Dict[int, int]: a dictionary where the keys are the ids of the `LLM`s and the
            values are the number of generations to be performed by each `LLM`.
        """
        llms_ids = list(range(self.num_llms))
        generations_per_llm = {i: num_generations // self.num_llms for i in llms_ids}

        for i in random.sample(llms_ids, k=num_generations % self.num_llms):
            generations_per_llm[i] += 1

        return generations_per_llm

    def generate(
        self,
        inputs: List[Dict[str, Any]],
        num_generations: int = 1,
        progress_callback_func: Union[Callable, None] = None,
    ) -> List[List["LLMOutput"]]:
        """Generates the outputs for the given inputs using the pool of `ProcessLLM`s.

        Args:
            inputs (List[Dict[str, Any]]): the inputs to be used for generation.
            num_generations (int, optional): the number of generations to be performed for each input.
                Defaults to `1`.
            progress_callback_func (Union[Callable, None], optional): a function to be called at each
                generation step. Defaults to `None`.

        Returns:
            Future[List[List["LLMOutput"]]]: the generated outputs as a `Future`.
        """
        num_generations_per_llm = self._get_num_generations_per_llm(num_generations)

        futures = [
            llm.generate(
                inputs,
                num_generations=num_generations_per_llm[i],
                progress_callback_func=progress_callback_func,
            )
            for i, llm in enumerate(self.llms)
            if num_generations_per_llm[i] > 0
        ]
        llms_generations = [future.result() for future in futures]

        generations = []
        for llms_row_generations in zip(*llms_generations):
            row_generations = []
            for llm_row_generations in llms_row_generations:
                for generation in llm_row_generations:
                    row_generations.append(generation)
            generations.append(row_generations)

        return generations

    def teardown(self) -> None:
        """Stops the `ProcessLLM`s."""
        for llm in self.llms:
            llm.teardown()

    @property
    def task(self) -> "Task":
        """Returns the task that will be used by the `ProcessLLM`s of this pool.

        Returns:
            Task: the task that will be used by the `ProcessLLM`s of this pool.
        """
        return self.llms[0].task

    @property
    def return_futures(self) -> bool:
        """Whether the `LLM` returns futures"""
        return False

return_futures: bool property

Whether the LLM returns futures

task: 'Task' property

Returns the task that will be used by the ProcessLLMs of this pool.

Returns:

Task: the task that will be used by the ProcessLLMs of this pool.

__init__(llms)

Initializes the LLMPool class.

Parameters:

llms (List[ProcessLLM]): the ProcessLLMs to be used for generation. The list must contain at least 2 ProcessLLMs. Required.

Raises:

ValueError: if the llms argument contains fewer than 2 ProcessLLMs, contains objects that are not ProcessLLMs, or contains ProcessLLMs with different tasks.

Source code in src/distilabel/llm/base.py
def __init__(self, llms: List[ProcessLLM]) -> None:
    """Initializes the `LLMPool` class.

    Args:
        llms: the `ProcessLLM`s to be used for generation. The list must contain at
            least 2 `ProcessLLM`s.

    Raises:
        ValueError: if the `llms` argument contains fewer than 2 `ProcessLLM`s, if it
            contains objects that are not `ProcessLLM`s, or if the `ProcessLLM`s in
            `llms` have different tasks.
    """
    if len(llms) < 2:
        raise ValueError(
            "The `llms` argument must contain at least 2 `ProcessLLM`s. If you want"
            " to use a single `ProcessLLM`, use the `ProcessLLM` directly instead."
        )

    if not all(isinstance(llm, ProcessLLM) for llm in llms):
        raise ValueError("The `llms` argument must contain only `ProcessLLM`s.")

    # Note: The following piece of code is used to check that all the `ProcessLLM`s
    # have the same task or a subclass of it.
    mros = [(type(llm.task), len(type(llm.task).mro())) for llm in llms]
    min_common_class = min(mros, key=lambda x: x[1])[0]
    if not all(isinstance(llm.task, min_common_class) for llm in llms):
        # This can fail for example with 3 different TextGenerationTasks
        # Task1(TextGenerationTask), Task2(TextGenerationTask), Task3(TextGenerationTask)
        # because they share the same parent class but we don't check the common one
        # TODO(plaguss): We check that they all have the same parent class, this should be simplified
        # with the previous check
        parent_classes = [type(llm.task).mro()[1] for llm in llms]
        if not len(set(parent_classes)) == 1:
            raise ValueError(
                "All the `ProcessLLM` in `llms` must share the same task (either as the instance or the parent class)."
            )

    self.llms = llms
    self.num_llms = len(llms)

generate(inputs, num_generations=1, progress_callback_func=None)

Generates the outputs for the given inputs using the pool of ProcessLLMs.

Parameters:

inputs (List[Dict[str, Any]]): the inputs to be used for generation. Required.
num_generations (int): the number of generations to be performed for each input. Defaults to 1.
progress_callback_func (Union[Callable, None]): a function to be called at each generation step. Defaults to None.

Returns:

List[List[LLMOutput]]: the generated outputs.

Source code in src/distilabel/llm/base.py
def generate(
    self,
    inputs: List[Dict[str, Any]],
    num_generations: int = 1,
    progress_callback_func: Union[Callable, None] = None,
) -> List[List["LLMOutput"]]:
    """Generates the outputs for the given inputs using the pool of `ProcessLLM`s.

    Args:
        inputs (List[Dict[str, Any]]): the inputs to be used for generation.
        num_generations (int, optional): the number of generations to be performed for each input.
            Defaults to `1`.
        progress_callback_func (Union[Callable, None], optional): a function to be called at each
            generation step. Defaults to `None`.

    Returns:
        Future[List[List["LLMOutput"]]]: the generated outputs as a `Future`.
    """
    num_generations_per_llm = self._get_num_generations_per_llm(num_generations)

    futures = [
        llm.generate(
            inputs,
            num_generations=num_generations_per_llm[i],
            progress_callback_func=progress_callback_func,
        )
        for i, llm in enumerate(self.llms)
        if num_generations_per_llm[i] > 0
    ]
    llms_generations = [future.result() for future in futures]

    generations = []
    for llms_row_generations in zip(*llms_generations):
        row_generations = []
        for llm_row_generations in llms_row_generations:
            for generation in llm_row_generations:
                row_generations.append(generation)
        generations.append(row_generations)

    return generations

teardown()

Stops the ProcessLLMs.

Source code in src/distilabel/llm/base.py
def teardown(self) -> None:
    """Stops the `ProcessLLM`s."""
    for llm in self.llms:
        llm.teardown()

LlamaCppLLM

Bases: LLM

Source code in src/distilabel/llm/llama_cpp.py
class LlamaCppLLM(LLM):
    def __init__(
        self,
        model: "Llama",
        task: "Task",
        max_new_tokens: int = 128,
        temperature: float = 0.8,
        top_p: float = 0.95,
        top_k: int = 40,
        repeat_penalty: float = 1.1,
        seed: int = 1337,
        prompt_format: Union[SupportedFormats, None] = None,
        prompt_formatting_fn: Union[Callable[..., str], None] = None,
    ) -> None:
        """Initializes the LlamaCppLLM class.

        Args:
            model (Llama): the llama-cpp model to be used.
            task (Task): the task to be performed by the LLM.
            max_new_tokens (int, optional): the maximum number of tokens to be generated.
                Defaults to 128.
            temperature (float, optional): the temperature to be used for generation.
                Defaults to 0.8.
            top_p (float, optional): the top-p value to be used for generation.
                Defaults to 0.95.
            top_k (int, optional): the top-k value to be used for generation.
                Defaults to 40.
            repeat_penalty (float, optional): the repeat penalty to be used for generation.
                Defaults to 1.1.
            seed (int, optional): the seed to be used for generation, setting it to -1 implies
                that a different response will be generated on each generation, similarly to
                HuggingFace's `do_sample` arg. Defaults to 1337.
            prompt_format (Union[SupportedFormats, None], optional): the format to be used
                for the prompt. If `None`, the default format of the task will be used, available
                formats are `openai`, `chatml`, `llama2`, `zephyr`, and `default`. Defaults to `None`,
                but `default` (concatenation of `system_prompt` and `formatted_prompt` with a line-break)
                will be used if no `prompt_formatting_fn` is provided.
            prompt_formatting_fn (Union[Callable[..., str], None], optional): a function to be
                applied to the prompt before generation. If `None`, no formatting will be applied.
                Defaults to `None`.

        Examples:
            >>> from llama_cpp import Llama
            >>> from distilabel.tasks.text_generation import TextGenerationTask as Task
            >>> from distilabel.llm import LlamaCppLLM
            >>> model = Llama(model_path="path/to/model")
            >>> task = Task()
            >>> llm = LlamaCppLLM(model=model, task=task)
        """
        super().__init__(
            task=task,
            prompt_format=prompt_format,
            prompt_formatting_fn=prompt_formatting_fn,
        )

        if not _LLAMA_CPP_AVAILABLE:
            raise ImportError(
                "`LlamaCppLLM` cannot be used as `llama_cpp` is not installed, please "
                " install it with `pip install llama-cpp-python`."
            )

        self.max_tokens = max_new_tokens
        self.temperature = temperature
        self.top_p = top_p
        self.top_k = top_k
        self.repeat_penalty = repeat_penalty
        self.seed = seed

        self.model = model

    def __rich_repr__(self) -> Generator[Any, None, None]:
        yield from super().__rich_repr__()
        yield (
            "parameters",
            {
                "max_new_tokens": self.max_tokens,
                "temperature": self.temperature,
                "top_p": self.top_p,
                "top_k": self.top_k,
                "repeat_penalty": self.repeat_penalty,
            },
        )

    @property
    def model_name(self) -> str:
        """Returns the name of the llama-cpp model, which is the same as the model path."""
        return self.model.model_path

    def _generate(
        self, inputs: List[Dict[str, Any]], num_generations: int = 1
    ) -> List[List[LLMOutput]]:
        """Generates `num_generations` for each input in `inputs`.

        Args:
            inputs (List[Dict[str, Any]]): the inputs to be used for generation.
            num_generations (int, optional): the number of generations to be performed for each
                input. Defaults to 1.

        Returns:
            List[List[LLMOutput]]: the generated outputs.
        """
        prompts = self._generate_prompts(inputs, default_format=None)
        outputs = []
        for prompt in prompts:
            output = []
            for _ in range(num_generations):
                raw_output = self.model.create_completion(
                    prompt,
                    max_tokens=self.max_tokens,
                    temperature=self.temperature,
                    top_p=self.top_p,
                    top_k=self.top_k,
                    repeat_penalty=self.repeat_penalty,
                )
                try:
                    parsed_output = self.task.parse_output(
                        raw_output["choices"][0]["text"].strip()
                    )
                except Exception as e:
                    logger.error(f"Error parsing llama-cpp output: {e}")
                    parsed_output = None
                output.append(
                    LLMOutput(
                        model_name=self.model_name,
                        prompt_used=prompt,
                        raw_output=raw_output,
                        parsed_output=parsed_output,
                    )
                )
            outputs.append(output)
        return outputs

model_name: str property

Returns the name of the llama-cpp model, which is the same as the model path.

__init__(model, task, max_new_tokens=128, temperature=0.8, top_p=0.95, top_k=40, repeat_penalty=1.1, seed=1337, prompt_format=None, prompt_formatting_fn=None)

Initializes the LlamaCppLLM class.

Parameters:

    model (Llama, required): the llama-cpp model to be used.
    task (Task, required): the task to be performed by the LLM.
    max_new_tokens (int): the maximum number of tokens to be generated. Defaults to 128.
    temperature (float): the temperature to be used for generation. Defaults to 0.8.
    top_p (float): the top-p value to be used for generation. Defaults to 0.95.
    top_k (int): the top-k value to be used for generation. Defaults to 40.
    repeat_penalty (float): the repeat penalty to be used for generation. Defaults to 1.1.
    seed (int): the seed to be used for generation; setting it to -1 implies that a different response will be generated on each generation, similarly to HuggingFace's do_sample arg. Defaults to 1337.
    prompt_format (Union[SupportedFormats, None]): the format to be used for the prompt. If None, the default format of the task will be used; available formats are openai, chatml, llama2, zephyr, and default. Defaults to None, but default (concatenation of system_prompt and formatted_prompt with a line-break) will be used if no prompt_formatting_fn is provided.
    prompt_formatting_fn (Union[Callable[..., str], None]): a function to be applied to the prompt before generation. If None, no formatting will be applied. Defaults to None.

Examples:

>>> from llama_cpp import Llama
>>> from distilabel.tasks.text_generation import TextGenerationTask as Task
>>> from distilabel.llm import LlamaCppLLM
>>> model = Llama(model_path="path/to/model")
>>> task = Task()
>>> llm = LlamaCppLLM(model=model, task=task)
Source code in src/distilabel/llm/llama_cpp.py
def __init__(
    self,
    model: "Llama",
    task: "Task",
    max_new_tokens: int = 128,
    temperature: float = 0.8,
    top_p: float = 0.95,
    top_k: int = 40,
    repeat_penalty: float = 1.1,
    seed: int = 1337,
    prompt_format: Union[SupportedFormats, None] = None,
    prompt_formatting_fn: Union[Callable[..., str], None] = None,
) -> None:
    """Initializes the LlamaCppLLM class.

    Args:
        model (Llama): the llama-cpp model to be used.
        task (Task): the task to be performed by the LLM.
        max_new_tokens (int, optional): the maximum number of tokens to be generated.
            Defaults to 128.
        temperature (float, optional): the temperature to be used for generation.
            Defaults to 0.8.
        top_p (float, optional): the top-p value to be used for generation.
            Defaults to 0.95.
        top_k (int, optional): the top-k value to be used for generation.
            Defaults to 40.
        repeat_penalty (float, optional): the repeat penalty to be used for generation.
            Defaults to 1.1.
        seed (int, optional): the seed to be used for generation, setting it to -1 implies
            that a different response will be generated on each generation, similarly to
            HuggingFace's `do_sample` arg. Defaults to 1337.
        prompt_format (Union[SupportedFormats, None], optional): the format to be used
            for the prompt. If `None`, the default format of the task will be used, available
            formats are `openai`, `chatml`, `llama2`, `zephyr`, and `default`. Defaults to `None`,
            but `default` (concatenation of `system_prompt` and `formatted_prompt` with a line-break)
            will be used if no `prompt_formatting_fn` is provided.
        prompt_formatting_fn (Union[Callable[..., str], None], optional): a function to be
            applied to the prompt before generation. If `None`, no formatting will be applied.
            Defaults to `None`.

    Examples:
        >>> from llama_cpp import Llama
        >>> from distilabel.tasks.text_generation import TextGenerationTask as Task
        >>> from distilabel.llm import LlamaCppLLM
        >>> model = Llama(model_path="path/to/model")
        >>> task = Task()
        >>> llm = LlamaCppLLM(model=model, task=task)
    """
    super().__init__(
        task=task,
        prompt_format=prompt_format,
        prompt_formatting_fn=prompt_formatting_fn,
    )

    if not _LLAMA_CPP_AVAILABLE:
        raise ImportError(
            "`LlamaCppLLM` cannot be used as `llama_cpp` is not installed, please "
            " install it with `pip install llama-cpp-python`."
        )

    self.max_tokens = max_new_tokens
    self.temperature = temperature
    self.top_p = top_p
    self.top_k = top_k
    self.repeat_penalty = repeat_penalty
    self.seed = seed

    self.model = model
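
For reference, a slightly fuller usage sketch of `LlamaCppLLM` end to end; the model path, context size, and sampling values below are placeholders rather than recommendations:

from llama_cpp import Llama

from distilabel.llm import LlamaCppLLM
from distilabel.tasks import TextGenerationTask

# Load a local GGUF model (the path and context size are placeholders).
model = Llama(model_path="path/to/model.gguf", n_ctx=2048)

llm = LlamaCppLLM(
    model=model,
    task=TextGenerationTask(),
    max_new_tokens=256,
    temperature=0.7,
)

# `generate` returns one list of `LLMOutput`s per input, each carrying
# `model_name`, `prompt_used`, `raw_output`, and `parsed_output`.
outputs = llm.generate([{"input": "What's the capital of Spain?"}], num_generations=1)
print(outputs[0][0])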

OllamaLLM

Bases: LLM

Source code in src/distilabel/llm/ollama.py
class OllamaLLM(LLM):
    OLLAMA_HOST = os.environ.get("OLLAMA_HOST", "http://localhost:11434")

    def __init__(
        self,
        model: str,
        task: "Task",
        max_new_tokens: int = None,  # num_predict
        temperature: Union[float, None] = None,
        top_k: Union[int, None] = None,
        top_p: Union[float, None] = None,
        num_threads: Union[int, None] = None,
        prompt_format: Union["SupportedFormats", None] = None,
        prompt_formatting_fn: Union[Callable[..., str], None] = None,
    ) -> None:
        """
        Initializes the OllamaLLM class. The generation parameters are aligned with https://github.com/jmorganca/ollama/blob/main/docs/modelfile.md#valid-parameters-and-values

        Args:
            model (str): the model to be used for generation.
            task (Task): the task to be performed by the LLM.

            max_new_tokens (int, optional): the maximum number of tokens to be generated.
                Defaults to `None`.
            temperature (float, optional): the temperature to be used for generation.
                Defaults to `None`.
            top_k (int, optional): the top-k value to be used for generation.
                Defaults to `None`.
            top_p (float, optional): the top-p value to be used for generation.
                Defaults to `None`.
            num_threads (Union[int, None], optional): the number of threads to be used
                for parallel generation. If `None`, no parallel generation will be performed.
                Defaults to `None`.
            prompt_format (Union[SupportedFormats, None], optional): the format to be used
                for the prompt. If `None`, the default format of the task will be used, available
                formats are `openai`, `chatml`, `llama2`, `zephyr`, and `default`. Defaults to `None`,
                but `default` (concatenation of `system_prompt` and `formatted_prompt` with a line-break)
                will be used if no `prompt_formatting_fn` is provided.
            prompt_formatting_fn (Union[Callable[..., str], None], optional): a function to be
                applied to the prompt before generation. If `None`, no formatting will be applied.
                Defaults to `None`.

        Raises:
            ValueError: if the model is not available.
            ValueError: if the Ollama API request failed.

        Examples:
            >>> from distilabel.tasks.text_generation import TextGenerationTask as Task
            >>> from distilabel.llm import OllamaLLM
            >>> task = Task()
            >>> llm = OllamaLLM(model="notus", task=task)
        """
        super().__init__(
            task=task,
            num_threads=num_threads,
            prompt_format=prompt_format,
            prompt_formatting_fn=prompt_formatting_fn,
        )

        self.model = model
        self.max_new_tokens = max_new_tokens
        self.temperature = temperature
        self.top_k = top_k
        self.top_p = top_p

        self._api_available()
        self._api_model_available()

    @property
    def model_name(self) -> str:
        """Returns the name of the Ollama model."""
        return self.model

    def _api_available(self):
        """calls GET {OLLAMA_HOST}"""
        msg = f"Could not connect to Ollama as {self.OLLAMA_HOST}. Check https://github.com/jmorganca/ollama for deployment guide."
        try:
            response = request.urlopen(self.OLLAMA_HOST)
            if response.getcode() != 200:
                raise Exception
        except Exception as e:
            raise ValueError(msg) from e

    def _api_model_available(self):
        msg = f"Model {self.model} is not available. Run `ollama run {self.model}` to serve the model."
        try:
            self._text_generation_with_backoff(
                prompt=[{"role": "user", "content": "hi"}], max_tokens=1
            )
        except Exception as e:
            raise ValueError(msg) from e

    @retry(
        retry=retry_if_exception_type(_OLLAMA_API_RETRY_ON_EXCEPTIONS),
        stop=stop_after_attempt(_OLLAMA_API_STOP_AFTER_ATTEMPT),
        wait=wait_random_exponential(
            multiplier=_OLLAMA_API_WAIT_RANDOM_EXPONENTIAL_MULTIPLIER,
            max=_OLLAMA_API_WAIT_RANDOM_EXPONENTIAL_MAX,
        ),
        before_sleep=before_sleep_log(logger, logging.INFO),
        after=after_log(logger, logging.INFO),
    )
    def _text_generation_with_backoff(self, prompt: str, **kwargs) -> str:
        """Calls POST {OLLAMA_HOST}/api/chat"""
        # Request payload
        payload = {
            "model": self.model,
            "messages": prompt,
            "stream": False,
        }
        options = {
            "num_predict": kwargs.get("max_new_tokens") or self.max_new_tokens,
            "temperature": self.temperature,
            "top_p": self.top_p,
            "top_k": self.top_k,
        }
        # remove None values
        options = {k: v for k, v in options.items() if v is not None}
        payload["options"] = options

        # Convert payload to JSON
        data = json.dumps(payload).encode("utf-8")

        # Create the request
        url = f"{self.OLLAMA_HOST}/api/chat"
        req = request.Request(
            url, data=data, headers={"Content-Type": "application/json"}
        )
        with request.urlopen(req) as response:
            # Check if the request was successful (status code 200)
            if response.getcode() == 200:
                # Parse and return the response JSON
                return json.loads(response.read().decode("utf-8"))
            elif response.getcode() >= 500:
                # If the request failed, try again with backoff
                raise error.HTTPError(
                    url=url,
                    code=response.getcode(),
                    msg=f"Server Error {response.getcode()}",
                    hdrs=response.getheaders(),
                    fp=None,
                )
            else:
                raise ValueError(
                    f"Ollama API request failed with status_code {response.getcode()}."
                )

    def __rich_repr__(self) -> Generator[Any, None, None]:
        yield from super().__rich_repr__()
        yield (
            "parameters",
            {
                "model": self.model,
                "max_new_tokens": self.max_new_tokens,
                "temperature": self.temperature,
                "top_k": self.top_k,
                "top_p": self.top_p,
            },
        )

    def _generate(
        self, inputs: List[Dict[str, Any]], num_generations: int = 1
    ) -> List[List[LLMOutput]]:
        prompts = self._generate_prompts(inputs, default_format="openai")
        outputs = []
        for prompt in prompts:
            responses = [
                self._text_generation_with_backoff(prompt=prompt)
                for _ in range(num_generations)
            ]
            output = []
            for response in responses:
                raw_output = response.get("message", {}).get("content")
                try:
                    parsed_response = self.task.parse_output(raw_output.strip())
                except Exception as e:
                    logger.error(f"Error parsing OpenAI response: {e}")
                    parsed_response = None
                output.append(
                    LLMOutput(
                        model_name=self.model_name,
                        prompt_used=prompt,
                        raw_output=raw_output,
                        parsed_output=parsed_response,
                    )
                )
            outputs.append(output)
        return outputs

model_name: str property

Returns the name of the Ollama model.

__init__(model, task, max_new_tokens=None, temperature=None, top_k=None, top_p=None, num_threads=None, prompt_format=None, prompt_formatting_fn=None)

Initializes the OllamaLLM class. The generation parameters are aligned with https://github.com/jmorganca/ollama/blob/main/docs/modelfile.md#valid-parameters-and-values

Parameters:

    model (str, required): the model to be used for generation.
    task (Task, required): the task to be performed by the LLM.
    max_new_tokens (int): the maximum number of tokens to be generated. Defaults to None.
    temperature (float): the temperature to be used for generation. Defaults to None.
    top_k (int): the top-k value to be used for generation. Defaults to None.
    top_p (float): the top-p value to be used for generation. Defaults to None.
    num_threads (Union[int, None]): the number of threads to be used for parallel generation. If None, no parallel generation will be performed. Defaults to None.
    prompt_format (Union[SupportedFormats, None]): the format to be used for the prompt. If None, the default format of the task will be used; available formats are openai, chatml, llama2, zephyr, and default. Defaults to None, but default (concatenation of system_prompt and formatted_prompt with a line-break) will be used if no prompt_formatting_fn is provided.
    prompt_formatting_fn (Union[Callable[..., str], None]): a function to be applied to the prompt before generation. If None, no formatting will be applied. Defaults to None.

Raises:

    ValueError: if the model is not available.
    ValueError: if the Ollama API request failed.

Examples:

>>> from distilabel.tasks.text_generation import TextGenerationTask as Task
>>> from distilabel.llm import OllamaLLM
>>> task = Task()
>>> llm = OllamaLLM(model="notus", task=task)
Source code in src/distilabel/llm/ollama.py
def __init__(
    self,
    model: str,
    task: "Task",
    max_new_tokens: int = None,  # num_predict
    temperature: Union[float, None] = None,
    top_k: Union[int, None] = None,
    top_p: Union[float, None] = None,
    num_threads: Union[int, None] = None,
    prompt_format: Union["SupportedFormats", None] = None,
    prompt_formatting_fn: Union[Callable[..., str], None] = None,
) -> None:
    """
    Initializes the OllamaLLM class. The generation parameters are aligned with https://github.com/jmorganca/ollama/blob/main/docs/modelfile.md#valid-parameters-and-values

    Args:
        model (str): the model to be used for generation.
        task (Task): the task to be performed by the LLM.

        max_new_tokens (int, optional): the maximum number of tokens to be generated.
            Defaults to `None`.
        temperature (float, optional): the temperature to be used for generation.
            Defaults to `None`.
        top_k (int, optional): the top-k value to be used for generation.
            Defaults to `None`.
        top_p (float, optional): the top-p value to be used for generation.
            Defaults to `None`.
        num_threads (Union[int, None], optional): the number of threads to be used
            for parallel generation. If `None`, no parallel generation will be performed.
            Defaults to `None`.
        prompt_format (Union[SupportedFormats, None], optional): the format to be used
            for the prompt. If `None`, the default format of the task will be used, available
            formats are `openai`, `chatml`, `llama2`, `zephyr`, and `default`. Defaults to `None`,
            but `default` (concatenation of `system_prompt` and `formatted_prompt` with a line-break)
            will be used if no `prompt_formatting_fn` is provided.
        prompt_formatting_fn (Union[Callable[..., str], None], optional): a function to be
            applied to the prompt before generation. If `None`, no formatting will be applied.
            Defaults to `None`.

    Raises:
        ValueError: if the model is not available.
        ValueError: if the Ollama API request failed.

    Examples:
        >>> from distilabel.tasks.text_generation import TextGenerationTask as Task
        >>> from distilabel.llm import OllamaLLM
        >>> task = Task()
        >>> llm = OllamaLLM(model="notus", task=task)
    """
    super().__init__(
        task=task,
        num_threads=num_threads,
        prompt_format=prompt_format,
        prompt_formatting_fn=prompt_formatting_fn,
    )

    self.model = model
    self.max_new_tokens = max_new_tokens
    self.temperature = temperature
    self.top_k = top_k
    self.top_p = top_p

    self._api_available()
    self._api_model_available()
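
Since `OllamaLLM` talks to a locally running Ollama server over plain HTTP, the request built by `_text_generation_with_backoff` can be reproduced standalone. A minimal sketch, assuming an Ollama server on the default host and a model named `notus` already pulled (both are placeholders):

import json
from urllib import request

OLLAMA_HOST = "http://localhost:11434"

payload = {
    "model": "notus",  # placeholder: any model already served by Ollama
    "messages": [{"role": "user", "content": "What's the capital of Spain?"}],
    "stream": False,
    # only the options that are not `None` are forwarded by `OllamaLLM`
    "options": {"num_predict": 128, "temperature": 0.7},
}

req = request.Request(
    f"{OLLAMA_HOST}/api/chat",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
with request.urlopen(req) as response:
    body = json.loads(response.read().decode("utf-8"))

# `_generate` reads the generated text from `message.content`.
print(body["message"]["content"])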

OpenAILLM

Bases: LLM

Source code in src/distilabel/llm/openai.py
class OpenAILLM(LLM):
    def __init__(
        self,
        task: "Task",
        model: str = "gpt-3.5-turbo",
        client: Union["OpenAI", None] = None,
        openai_api_key: Union[str, None] = None,
        max_new_tokens: int = 128,
        frequency_penalty: float = 0.0,
        presence_penalty: float = 0.0,
        temperature: float = 1.0,
        top_p: float = 1.0,
        num_threads: Union[int, None] = None,
        prompt_format: Union["SupportedFormats", None] = None,
        prompt_formatting_fn: Union[Callable[..., str], None] = None,
    ) -> None:
        """Initializes the OpenAILLM class.

        Args:
            task (Task): the task to be performed by the LLM.
            model (str, optional): the model to be used for generation. Defaults to "gpt-3.5-turbo".
            client (Union[OpenAI, None], optional): an OpenAI client to be used for generation.
                If `None`, a new client will be created. Defaults to `None`.
            openai_api_key (Union[str, None], optional): the OpenAI API key to be used for generation.
                If `None`, the `OPENAI_API_KEY` environment variable will be used. Defaults to `None`.
            max_new_tokens (int, optional): the maximum number of tokens to be generated.
                Defaults to 128.
            frequency_penalty (float, optional): the frequency penalty to be used for generation.
                Defaults to 0.0.
            presence_penalty (float, optional): the presence penalty to be used for generation.
                Defaults to 0.0.
            temperature (float, optional): the temperature to be used for generation.
                Defaults to 1.0.
            top_p (float, optional): the top-p value to be used for generation.
                Defaults to 1.0.
            num_threads (Union[int, None], optional): the number of threads to be used
                for parallel generation. If `None`, no parallel generation will be performed.
                Defaults to `None`.
            prompt_format (Union[SupportedFormats, None], optional): the format to be used
                for the prompt. If `None`, the default format of the task will be used, available
                formats are `openai`, `chatml`, `llama2`, `zephyr`, and `default`. Defaults to `None`,
                but `default` (concatenation of `system_prompt` and `formatted_prompt` with a line-break)
                will be used if no `prompt_formatting_fn` is provided.
            prompt_formatting_fn (Union[Callable[..., str], None], optional): a function to be
                applied to the prompt before generation. If `None`, no formatting will be applied.
                Defaults to `None`.

        Raises:
            AssertionError: if the provided `model` is not available in your OpenAI account.

        Examples:
            >>> from distilabel.tasks.text_generation import TextGenerationTask as Task
            >>> from distilabel.llm import OpenAILLM
            >>> task = Task()
            >>> llm = OpenAILLM(model="gpt-3.5-turbo", task=task)
        """
        super().__init__(
            task=task,
            num_threads=num_threads,
            prompt_format=prompt_format,
            prompt_formatting_fn=prompt_formatting_fn,
        )

        if not _OPENAI_AVAILABLE:
            raise ImportError(
                "`OpenAILLM` cannot be used as `openai` is not installed, please "
                " install it with `pip install openai`."
            )

        self.max_tokens = max_new_tokens
        self.frequency_penalty = frequency_penalty
        self.presence_penalty = presence_penalty
        self.temperature = temperature
        self.top_p = top_p

        self.client = client or OpenAI(api_key=openai_api_key, max_retries=6)

        assert (
            model in self.available_models
        ), f"Provided `model` is not available in your OpenAI account, available models are {self.available_models}"
        self.model = model

    def __rich_repr__(self) -> Generator[Any, None, None]:
        yield from super().__rich_repr__()
        yield (
            "parameters",
            {
                "max_tokens": self.max_tokens,
                "frequency_penalty": self.frequency_penalty,
                "presence_penalty": self.presence_penalty,
                "temperature": self.temperature,
                "top_p": self.top_p,
            },
        )

    @cached_property
    def available_models(self) -> List[str]:
        """Returns the list of available models in your OpenAI account."""
        return [model.id for model in self.client.models.list().data]

    @property
    def model_name(self) -> str:
        """Returns the name of the OpenAI model."""
        return self.model

    def _generate(
        self,
        inputs: List[Dict[str, Any]],
        num_generations: int = 1,
    ) -> List[List[LLMOutput]]:
        """Generates `num_generations` for each input in `inputs`.

        Args:
            inputs (List[Dict[str, Any]]): the inputs to be used for generation.
            num_generations (int, optional): the number of generations to be performed for each
                input. Defaults to 1.

        Returns:
            List[List[LLMOutput]]: the generated outputs.
        """
        prompts = self._generate_prompts(inputs, default_format="openai")
        outputs = []
        for prompt in prompts:
            chat_completions = self.client.chat.completions.create(
                messages=prompt,
                model=self.model,
                n=num_generations,
                max_tokens=self.max_tokens,
                frequency_penalty=self.frequency_penalty,
                presence_penalty=self.presence_penalty,
                temperature=self.temperature,
                top_p=self.top_p,
                timeout=50,
            )

            output = []
            for chat_completion in chat_completions.choices:
                try:
                    parsed_response = self.task.parse_output(
                        chat_completion.message.content.strip()
                    )
                except Exception as e:
                    logger.error(f"Error parsing OpenAI response: {e}")
                    parsed_response = None
                output.append(
                    LLMOutput(
                        model_name=self.model_name,
                        prompt_used=prompt,
                        raw_output=chat_completion.message.content,
                        parsed_output=parsed_response,
                    )
                )
            outputs.append(output)
        return outputs

available_models: List[str] cached property

Returns the list of available models in your OpenAI account.

model_name: str property

Returns the name of the OpenAI model.

__init__(task, model='gpt-3.5-turbo', client=None, openai_api_key=None, max_new_tokens=128, frequency_penalty=0.0, presence_penalty=0.0, temperature=1.0, top_p=1.0, num_threads=None, prompt_format=None, prompt_formatting_fn=None)

Initializes the OpenAILLM class.

Parameters:

    task (Task, required): the task to be performed by the LLM.
    model (str): the model to be used for generation. Defaults to "gpt-3.5-turbo".
    client (Union[OpenAI, None]): an OpenAI client to be used for generation. If None, a new client will be created. Defaults to None.
    openai_api_key (Union[str, None]): the OpenAI API key to be used for generation. If None, the OPENAI_API_KEY environment variable will be used. Defaults to None.
    max_new_tokens (int): the maximum number of tokens to be generated. Defaults to 128.
    frequency_penalty (float): the frequency penalty to be used for generation. Defaults to 0.0.
    presence_penalty (float): the presence penalty to be used for generation. Defaults to 0.0.
    temperature (float): the temperature to be used for generation. Defaults to 1.0.
    top_p (float): the top-p value to be used for generation. Defaults to 1.0.
    num_threads (Union[int, None]): the number of threads to be used for parallel generation. If None, no parallel generation will be performed. Defaults to None.
    prompt_format (Union[SupportedFormats, None]): the format to be used for the prompt. If None, the default format of the task will be used; available formats are openai, chatml, llama2, zephyr, and default. Defaults to None, but default (concatenation of system_prompt and formatted_prompt with a line-break) will be used if no prompt_formatting_fn is provided.
    prompt_formatting_fn (Union[Callable[..., str], None]): a function to be applied to the prompt before generation. If None, no formatting will be applied. Defaults to None.

Raises:

    AssertionError: if the provided model is not available in your OpenAI account.

Examples:

>>> from distilabel.tasks.text_generation import TextGenerationTask as Task
>>> from distilabel.llm import OpenAILLM
>>> task = Task()
>>> llm = OpenAILLM(model="gpt-3.5-turbo", task=task)
Source code in src/distilabel/llm/openai.py
def __init__(
    self,
    task: "Task",
    model: str = "gpt-3.5-turbo",
    client: Union["OpenAI", None] = None,
    openai_api_key: Union[str, None] = None,
    max_new_tokens: int = 128,
    frequency_penalty: float = 0.0,
    presence_penalty: float = 0.0,
    temperature: float = 1.0,
    top_p: float = 1.0,
    num_threads: Union[int, None] = None,
    prompt_format: Union["SupportedFormats", None] = None,
    prompt_formatting_fn: Union[Callable[..., str], None] = None,
) -> None:
    """Initializes the OpenAILLM class.

    Args:
        task (Task): the task to be performed by the LLM.
        model (str, optional): the model to be used for generation. Defaults to "gpt-3.5-turbo".
        client (Union[OpenAI, None], optional): an OpenAI client to be used for generation.
            If `None`, a new client will be created. Defaults to `None`.
        openai_api_key (Union[str, None], optional): the OpenAI API key to be used for generation.
            If `None`, the `OPENAI_API_KEY` environment variable will be used. Defaults to `None`.
        max_new_tokens (int, optional): the maximum number of tokens to be generated.
            Defaults to 128.
        frequency_penalty (float, optional): the frequency penalty to be used for generation.
            Defaults to 0.0.
        presence_penalty (float, optional): the presence penalty to be used for generation.
            Defaults to 0.0.
        temperature (float, optional): the temperature to be used for generation.
            Defaults to 1.0.
        top_p (float, optional): the top-p value to be used for generation.
            Defaults to 1.0.
        num_threads (Union[int, None], optional): the number of threads to be used
            for parallel generation. If `None`, no parallel generation will be performed.
            Defaults to `None`.
        prompt_format (Union[SupportedFormats, None], optional): the format to be used
            for the prompt. If `None`, the default format of the task will be used, available
            formats are `openai`, `chatml`, `llama2`, `zephyr`, and `default`. Defaults to `None`,
            but `default` (concatenation of `system_prompt` and `formatted_prompt` with a line-break)
            will be used if no `prompt_formatting_fn` is provided.
        prompt_formatting_fn (Union[Callable[..., str], None], optional): a function to be
            applied to the prompt before generation. If `None`, no formatting will be applied.
            Defaults to `None`.

    Raises:
        AssertionError: if the provided `model` is not available in your OpenAI account.

    Examples:
        >>> from distilabel.tasks.text_generation import TextGenerationTask as Task
        >>> from distilabel.llm import OpenAILLM
        >>> task = Task()
        >>> llm = OpenAILLM(model="gpt-3.5-turbo", task=task)
    """
    super().__init__(
        task=task,
        num_threads=num_threads,
        prompt_format=prompt_format,
        prompt_formatting_fn=prompt_formatting_fn,
    )

    if not _OPENAI_AVAILABLE:
        raise ImportError(
            "`OpenAILLM` cannot be used as `openai` is not installed, please "
            " install it with `pip install openai`."
        )

    self.max_tokens = max_new_tokens
    self.frequency_penalty = frequency_penalty
    self.presence_penalty = presence_penalty
    self.temperature = temperature
    self.top_p = top_p

    self.client = client or OpenAI(api_key=openai_api_key, max_retries=6)

    assert (
        model in self.available_models
    ), f"Provided `model` is not available in your OpenAI account, available models are {self.available_models}"
    self.model = model
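
Because `client` accepts any pre-built `OpenAI` instance, an existing client (for example one with a custom retry policy) can be reused. A minimal sketch, assuming `OPENAI_API_KEY` is set in the environment; the model and sampling values are placeholders:

import os

from openai import OpenAI

from distilabel.llm import OpenAILLM
from distilabel.tasks import TextGenerationTask

# Reuse an existing client instead of letting `OpenAILLM` create one.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"), max_retries=2)

llm = OpenAILLM(
    task=TextGenerationTask(),
    model="gpt-3.5-turbo",
    client=client,
    max_new_tokens=256,
    temperature=0.3,
)

# Two generations per input are requested via the `n` parameter of the
# chat completions endpoint under the hood.
outputs = llm.generate([{"input": "What's the capital of Spain?"}], num_generations=2)
for generation in outputs[0]:
    print(generation)  # an `LLMOutput` with model_name, prompt_used, raw_output, parsed_output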

ProcessLLM

A class that wraps an LLM and performs generation in a separate process. The result is a Future that will be set when the generation is completed.

This class creates a new child process that will load the LLM and perform the text generation. In order to communicate with this child process, a bridge thread is created in the main process. The bridge thread will send and receive the results from the child process using multiprocessing.Queues. The communication between the bridge thread and the main process is done using Futures. This architecture was inspired by the ProcessPoolExecutor from the concurrent.futures module and it's a simplified version of it.
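
A minimal usage sketch, assuming `OPENAI_API_KEY` is set so that the child process can build the wrapped `LLM` (the `load_openai_llm` helper below is hypothetical):

from distilabel.llm import OpenAILLM, ProcessLLM
from distilabel.tasks import TextGenerationTask


def load_openai_llm(task):
    # Executed inside the child process; it must return an `LLM` instance.
    return OpenAILLM(task=task, model="gpt-3.5-turbo")


llm = ProcessLLM(task=TextGenerationTask(), load_llm_fn=load_openai_llm)

# `generate` returns a `Future` that resolves to `List[List[LLMOutput]]`.
future = llm.generate([{"input": "What's the capital of Spain?"}], num_generations=1)
outputs = future.result()

# Stop the bridge thread and the child generation process.
llm.teardown()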

Source code in src/distilabel/llm/base.py
class ProcessLLM:
    """A class that wraps an `LLM` and performs generation in a separate process. The
    result is a `Future` that will be set when the generation is completed.

    This class creates a new child process that will load the `LLM` and perform the
    text generation. In order to communicate with this child process, a bridge thread
    is created in the main process. The bridge thread will send and receive the results
    from the child process using `multiprocessing.Queue`s. The communication between the
    bridge thread and the main process is done using `Future`s. This architecture was
    inspired by the `ProcessPoolExecutor` from the `concurrent.futures` module and it's
    a simplified version of it.
    """

    def __init__(self, task: Task, load_llm_fn: Callable[[Task], LLM]) -> None:
        """Initializes the `ProcessLLM` class.

        Args:
            task: the task to be performed by the `LLM`. This task will be used by the
                child process when calling the `load_llm_fn`.
            load_llm_fn (Callable[[Task], LLM]): a function that will be executed in the
                child process to load the `LLM`. It must return an `LLM` instance.
        """
        self.task = task

        self._load_llm_fn = load_llm_fn

        # The bridge thread will act as a bridge between the main process and the child
        # process for communication. It will send the generation requests to the child
        # process and receive the results from the child process.
        self._bridge_thread = None

        # The child process which will load the `LLM` and perform the generation.
        self._generation_process = None

        # The `Semaphore` that will be used to synchronize the loading of the `LLM`.
        # `_BridgeThread` will be blocked until `_GenerationProcess` has called the
        # `load_llm_fn` and the `LLM` has been loaded.
        self._load_llm_sem = mp.Semaphore(0)

        # This thread will create text generation requests
        self.pending_text_generation_request: Dict[int, _TextGenerationRequest] = {}
        self.text_generation_request_count = 0
        self.text_generation_request_ids_queue: queue.Queue[int] = queue.Queue()

        # Queues for the communication between the `_BridgeThread` and the `_GenerationProcess`
        self._call_queue = mp.Queue()
        self._result_queue = mp.Queue()

        # Shared memory object for transfering the `model_name` to the main process
        # once the `LLM` is loaded
        self._model_name = mp.Array(c_char, MAX_MODEL_NAME_LENGTH)

    def _start_bridge_thread(self) -> None:
        """Starts the bridge thread and the generation process."""
        if self._bridge_thread is None:
            self._generation_process = _GenerationProcess(self)
            self._generation_process.start()
            pid = self._generation_process.pid
            logger.debug(f"Generation process with PID {pid} started!")

            self._bridge_thread = _BridgeThread(self)
            self._bridge_thread.start()
            logger.debug("Bridge thread for process with PID {pid} started!")

    def _add_text_generation_request(
        self,
        inputs: List[Dict[str, Any]],
        num_generations: int = 1,
        progress_callback_func: Union[Callable, None] = None,
    ) -> Future[List[List["LLMOutput"]]]:
        """Creates and send a new text generation request to the bridge thread. This thread
        and the bridge thread shares a dictionary used to store the text generation requests.
        This thread will add the text generation requests to the dictionary and the bridge
        thread will only read from it. In order for the bridge thread to know that a new
        text generation request has been added to the dictionary, this thread will put the
        id of the request in a queue. The bridge thread will read from this queue and get
        the text generation request from the dictionary.
        """

        def _progress():
            if progress_callback_func is not None:
                progress_callback_func(advance=num_generations * len(inputs))

        text_generation_request = _TextGenerationRequest(
            inputs=inputs, num_generations=num_generations
        )
        # Put the request information in the dictionary associated to the request id
        self.pending_text_generation_request[
            self.text_generation_request_count
        ] = text_generation_request
        # Put the request id in the queue (for the `_BridgeThread` to consume it)
        self.text_generation_request_ids_queue.put(self.text_generation_request_count)
        self.text_generation_request_count += 1
        text_generation_request.future.add_done_callback(lambda _: _progress())
        return text_generation_request.future

    def generate(
        self,
        inputs: List[Dict[str, Any]],
        num_generations: int = 1,
        progress_callback_func: Union[Callable, None] = None,
    ) -> Future[List[List["LLMOutput"]]]:
        """Generates the outputs for the given inputs using the `ProcessLLM` and its loaded
        `LLM`.

        Args:
            inputs (List[Dict[str, Any]]): the inputs to be used for generation.
            num_generations (int, optional): the number of generations to be performed for each input.
                Defaults to `1`.
            progress_callback_func (Union[Callable, None], optional): a function to be called at each
                generation step. Defaults to `None`.

        Returns:
            Future[List[List["LLMOutput"]]]: the generated outputs as a `Future`.
        """
        self._start_bridge_thread()
        return self._add_text_generation_request(
            inputs, num_generations, progress_callback_func
        )

    def teardown(self) -> None:
        """Stops the bridge thread and the generation process."""
        if self._generation_process is not None:
            self._generation_process.stop()
            self._generation_process.join()

        if self._bridge_thread is not None:
            self._bridge_thread.stop()
            self._bridge_thread.join()

    @cached_property
    def model_name(self) -> str:
        """Returns the model name of the `LLM` once it has been loaded."""
        with self._model_name:
            return "".join([c.decode() for c in self._model_name if c != b"\0"])

    @property
    def return_futures(self) -> bool:
        """Whether the `LLM` returns futures"""
        return True

model_name: str cached property

Returns the model name of the LLM once it has been loaded.

return_futures: bool property

Whether the LLM returns futures

__init__(task, load_llm_fn)

Initializes the ProcessLLM class.

Parameters:

    task (Task, required): the task to be performed by the LLM. This task will be used by the child process when calling the load_llm_fn.
    load_llm_fn (Callable[[Task], LLM], required): a function that will be executed in the child process to load the LLM. It must return an LLM instance.
Source code in src/distilabel/llm/base.py
def __init__(self, task: Task, load_llm_fn: Callable[[Task], LLM]) -> None:
    """Initializes the `ProcessLLM` class.

    Args:
        task: the task to be performed by the `LLM`. This task will be used by the
            child process when calling the `load_llm_fn`.
        load_llm_fn (Callable[[Task], LLM]): a function that will be executed in the
            child process to load the `LLM`. It must return an `LLM` instance.
    """
    self.task = task

    self._load_llm_fn = load_llm_fn

    # The bridge thread will act as a bridge between the main process and the child
    # process for communication. It will send the generation requests to the child
    # process and receive the results from the child process.
    self._bridge_thread = None

    # The child process which will load the `LLM` and perform the generation.
    self._generation_process = None

    # The `Semaphore` that will be used to synchronize the loading of the `LLM`.
    # `_BridgeThread` will be blocked until `_GenerationProcess` has called the
    # `load_llm_fn` and the `LLM` has been loaded.
    self._load_llm_sem = mp.Semaphore(0)

    # This thread will create text generation requests
    self.pending_text_generation_request: Dict[int, _TextGenerationRequest] = {}
    self.text_generation_request_count = 0
    self.text_generation_request_ids_queue: queue.Queue[int] = queue.Queue()

    # Queues for the communication between the `_BridgeThread` and the `_GenerationProcess`
    self._call_queue = mp.Queue()
    self._result_queue = mp.Queue()

    # Shared memory object for transfering the `model_name` to the main process
    # once the `LLM` is loaded
    self._model_name = mp.Array(c_char, MAX_MODEL_NAME_LENGTH)

generate(inputs, num_generations=1, progress_callback_func=None)

Generates the outputs for the given inputs using the ProcessLLM and its loaded LLM.

Parameters:

    inputs (List[Dict[str, Any]], required): the inputs to be used for generation.
    num_generations (int): the number of generations to be performed for each input. Defaults to 1.
    progress_callback_func (Union[Callable, None]): a function to be called at each generation step. Defaults to None.

Returns:

    Future[List[List["LLMOutput"]]]: the generated outputs as a Future.

Source code in src/distilabel/llm/base.py
def generate(
    self,
    inputs: List[Dict[str, Any]],
    num_generations: int = 1,
    progress_callback_func: Union[Callable, None] = None,
) -> Future[List[List["LLMOutput"]]]:
    """Generates the outputs for the given inputs using the `ProcessLLM` and its loaded
    `LLM`.

    Args:
        inputs (List[Dict[str, Any]]): the inputs to be used for generation.
        num_generations (int, optional): the number of generations to be performed for each input.
            Defaults to `1`.
        progress_callback_func (Union[Callable, None], optional): a function to be called at each
            generation step. Defaults to `None`.

    Returns:
        Future[List[List["LLMOutput"]]]: the generated outputs as a `Future`.
    """
    self._start_bridge_thread()
    return self._add_text_generation_request(
        inputs, num_generations, progress_callback_func
    )

teardown()

Stops the bridge thread and the generation process.

Source code in src/distilabel/llm/base.py
def teardown(self) -> None:
    """Stops the bridge thread and the generation process."""
    if self._generation_process is not None:
        self._generation_process.stop()
        self._generation_process.join()

    if self._bridge_thread is not None:
        self._bridge_thread.stop()
        self._bridge_thread.join()

TogetherInferenceLLM

Bases: LLM

Source code in src/distilabel/llm/together.py
class TogetherInferenceLLM(LLM):
    def __init__(
        self,
        task: "Task",
        model: str,
        api_key: Union[str, None] = None,
        max_new_tokens: int = 128,
        repetition_penalty: float = 1.0,
        temperature: float = 1.0,
        top_p: float = 1.0,
        top_k: int = 1,
        stop: Union[List[str], None] = None,
        logprobs: int = 0,
        num_threads: Union[int, None] = None,
        prompt_format: Union["SupportedFormats", None] = None,
        prompt_formatting_fn: Union[Callable[..., str], None] = None,
    ) -> None:
        """Initializes the OpenAILLM class.

        Args:
            task (Task): the task to be performed by the LLM.
            model (str): the model to be used for generation.
            max_new_tokens (int, optional): the maximum number of tokens to be generated.
                Defaults to 128.
            temperature (float, optional): the temperature to be used for generation. From the Together
                Inference docs: "A decimal number that determines the degree of randomness in the response.
                A value of 0 will always yield the same output. A temperature much less than 1 favors more
                correctness and is appropriate for question answering or summarization. A value approaching
                1 introduces more randomness in the output.". Defaults to 1.0.
            repetition_penalty (float, optional): the repetition penalty to be used for generation. From the
                Together Inference docs: "Controls the diversity of generated text by reducing the likelihood
                of repeated sequences. Higher values decrease repetition.". Defaults to 1.0.
            top_p (float, optional): the top-p value to be used for generation. From the Together
                Inference docs: "used to dynamically adjust the number of choices for each predicted
                token based on the cumulative probabilities. It specifies a probability threshold,
                below which all less likely tokens are filtered out. This technique helps to maintain
                diversity and generate more fluent and natural-sounding text.". Defaults to 1.0.
            top_k (int, optional): the top-k value to be used for generation. From the Together Inference
                docs: "used to limit the number of choices for the next predicted word or token. It specifies
                the maximum number of tokens to consider at each step, based on their probability of occurrence.
                This technique helps to speed up the generation process and can improve the quality of the
                generated text by focusing on the most likely options.". Defaults to 1.
            stop (List[str], optional): strings to delimitate the generation process, so that when the
                model generates any of the provided characters, the generation process is considered completed.
                Defaults to None.
            logprobs (int, optional): the number of logprobs to be returned for each token. From the
                Together Inference docs: "An integer that specifies how many top token log probabilities
                are included in the response for each token generation step.". Defaults to 0.
            num_threads (Union[int, None], optional): the number of threads to be used
                for parallel generation. If `None`, no parallel generation will be performed.
                Defaults to `None`.
            prompt_format (Union[SupportedFormats, None], optional): the format to be used
                for the prompt. If `None`, the default format of the task will be used, available
                formats are `openai`, `chatml`, `llama2`, `zephyr`, and `default`. Defaults to `None`,
                but `default` (concatenation of `system_prompt` and `formatted_prompt` with a line-break)
                will be used if no `prompt_formatting_fn` is provided.
            prompt_formatting_fn (Union[Callable[..., str], None], optional): a function to be
                applied to the prompt before generation. If `None`, no formatting will be applied.
                Defaults to `None`.

        Raises:
            AssertionError: if the provided `model` is not available in Together Inference.

        Examples:
            >>> from distilabel.tasks.text_generation import TextGenerationTask as Task
            >>> from distilabel.llm import TogetherInferenceLLM
            >>> task = Task()
            >>> llm = TogetherInferenceLLM(model="togethercomputer/llama-2-7b", task=task, prompt_format="llama2")
        """
        if not _TOGETHER_AVAILABLE:
            raise ImportError(
                "`TogetherInferenceLLM` cannot be used as `together` is not installed, please "
                " install it with `pip install together`."
            )

        together.api_key = api_key or os.getenv("TOGETHER_API_KEY", None)
        if together.api_key is None:
            raise ValueError(
                "No `api_key` provided, please provide one or set the `TOGETHER_API_KEY` "
                "environment variable."
            )

        super().__init__(
            task=task,
            num_threads=num_threads,
            prompt_format=prompt_format,
            prompt_formatting_fn=prompt_formatting_fn,
        )

        assert (
            model in self.available_models
        ), f"Provided `model` is not available in Together Inference, available models are {self.available_models}"
        self.model = model

        self.max_new_tokens = max_new_tokens
        self.temperature = temperature
        self.top_p = top_p
        self.top_k = top_k
        self.repetition_penalty = repetition_penalty
        self.stop = stop
        self.logprobs = logprobs

    def __rich_repr__(self) -> Generator[Any, None, None]:
        yield from super().__rich_repr__()
        yield (
            "parameters",
            {
                "max_new_tokens": self.max_new_tokens,
                "temperature": self.temperature,
                "repetition_penalty": self.repetition_penalty,
                "top_p": self.top_p,
                "top_k": self.top_k,
                "stop": self.stop,
                "logprobs": self.logprobs,
            },
        )

    @cached_property
    def available_models(self) -> List[str]:
        """Returns the list of available models in Together Inference."""
        # TODO: exclude the image models
        return [model["name"] for model in together.Models.list()]

    @property
    def model_name(self) -> str:
        """Returns the name of the Together Inference model."""
        return self.model

    def _generate_single_output(self, prompt: str) -> LLMOutput:
        """Runs the Together Inference text generation function over a single prompt
        producing a single `LLMOutput`.

        Args:
            prompt (str): the formatted prompt to be provided to the Together Inference
                endpoint.

        Raises:
            RuntimeError: raised if the Together Inference endpoint fails.
        """
        try:
            output = together.Complete.create(
                prompt=prompt,
                model=self.model,
                max_tokens=self.max_new_tokens,
                stop=self.stop,
                temperature=self.temperature,
                top_k=self.top_k,
                top_p=self.top_p,
                repetition_penalty=self.repetition_penalty,
                logprobs=self.logprobs,
            )
        except Exception as e:
            raise RuntimeError(
                f"Together Inference generation failed with exception: {e}"
            ) from e

        if output["output"]["choices"] is None or len(output["output"]["choices"]) < 1:  # type: ignore
            raise RuntimeError("Together Inference generation returned no generations.")

        choice = output["output"]["choices"][0]  # type: ignore
        try:
            parsed_response = self.task.parse_output(choice["text"].strip())
        except Exception as e:
            logger.error(f"Error parsing Together Inference response: {e}")
            parsed_response = None

        return LLMOutput(
            model_name=self.model_name,
            prompt_used=prompt,
            raw_output=choice["text"] or None,
            parsed_output=parsed_response,
        )

    def _generate(
        self,
        inputs: List[Dict[str, Any]],
        num_generations: int = 1,
    ) -> List[List[LLMOutput]]:
        """Generates `num_generations` for each input in `inputs`.

        Args:
            inputs (List[Dict[str, Any]]): the inputs to be used for generation.
            num_generations (int, optional): the number of generations to be performed for each
                input. Defaults to 1.

        Returns:
            List[List[LLMOutput]]: the generated outputs.
        """
        prompts = self._generate_prompts(inputs, default_format=None)
        outputs = []
        for prompt in prompts:
            outputs.append(
                [self._generate_single_output(prompt) for _ in range(num_generations)]
            )
        return outputs

available_models: List[str] cached property

Returns the list of available models in Together Inference.

model_name: str property

Returns the name of the Together Inference model.

__init__(task, model, api_key=None, max_new_tokens=128, repetition_penalty=1.0, temperature=1.0, top_p=1.0, top_k=1, stop=None, logprobs=0, num_threads=None, prompt_format=None, prompt_formatting_fn=None)

Initializes the TogetherInferenceLLM class.

Parameters:

    task (Task, required): the task to be performed by the LLM.
    model (str, required): the model to be used for generation.
    api_key (Union[str, None]): the Together API key to be used for generation. If None, the TOGETHER_API_KEY environment variable will be used. Defaults to None.
    max_new_tokens (int): the maximum number of tokens to be generated. Defaults to 128.
    temperature (float): the temperature to be used for generation. From the Together Inference docs: "A decimal number that determines the degree of randomness in the response. A value of 0 will always yield the same output. A temperature much less than 1 favors more correctness and is appropriate for question answering or summarization. A value approaching 1 introduces more randomness in the output.". Defaults to 1.0.
    repetition_penalty (float): the repetition penalty to be used for generation. From the Together Inference docs: "Controls the diversity of generated text by reducing the likelihood of repeated sequences. Higher values decrease repetition.". Defaults to 1.0.
    top_p (float): the top-p value to be used for generation. From the Together Inference docs: "used to dynamically adjust the number of choices for each predicted token based on the cumulative probabilities. It specifies a probability threshold, below which all less likely tokens are filtered out. This technique helps to maintain diversity and generate more fluent and natural-sounding text.". Defaults to 1.0.
    top_k (int): the top-k value to be used for generation. From the Together Inference docs: "used to limit the number of choices for the next predicted word or token. It specifies the maximum number of tokens to consider at each step, based on their probability of occurrence. This technique helps to speed up the generation process and can improve the quality of the generated text by focusing on the most likely options.". Defaults to 1.
    stop (List[str]): strings that delimit the generation process, so that when the model generates any of the provided strings, the generation is considered completed. Defaults to None.
    logprobs (int): the number of logprobs to be returned for each token. From the Together Inference docs: "An integer that specifies how many top token log probabilities are included in the response for each token generation step.". Defaults to 0.
    num_threads (Union[int, None]): the number of threads to be used for parallel generation. If None, no parallel generation will be performed. Defaults to None.
    prompt_format (Union[SupportedFormats, None]): the format to be used for the prompt. If None, the default format of the task will be used; available formats are openai, chatml, llama2, zephyr, and default. Defaults to None, but default (concatenation of system_prompt and formatted_prompt with a line-break) will be used if no prompt_formatting_fn is provided.
    prompt_formatting_fn (Union[Callable[..., str], None]): a function to be applied to the prompt before generation. If None, no formatting will be applied. Defaults to None.

Raises:

    AssertionError: if the provided model is not available in Together Inference.

Examples:

>>> from distilabel.tasks.text_generation import TextGenerationTask as Task
>>> from distilabel.llm import TogetherInferenceLLM
>>> task = Task()
>>> llm = TogetherInferenceLLM(model="togethercomputer/llama-2-7b", task=task, prompt_format="llama2")
Source code in src/distilabel/llm/together.py
def __init__(
    self,
    task: "Task",
    model: str,
    api_key: Union[str, None] = None,
    max_new_tokens: int = 128,
    repetition_penalty: float = 1.0,
    temperature: float = 1.0,
    top_p: float = 1.0,
    top_k: int = 1,
    stop: Union[List[str], None] = None,
    logprobs: int = 0,
    num_threads: Union[int, None] = None,
    prompt_format: Union["SupportedFormats", None] = None,
    prompt_formatting_fn: Union[Callable[..., str], None] = None,
) -> None:
    """Initializes the OpenAILLM class.

    Args:
        task (Task): the task to be performed by the LLM.
        model (str): the model to be used for generation.
        max_new_tokens (int, optional): the maximum number of tokens to be generated.
            Defaults to 128.
        temperature (float, optional): the temperature to be used for generation. From the Together
            Inference docs: "A decimal number that determines the degree of randomness in the response.
            A value of 0 will always yield the same output. A temperature much less than 1 favors more
            correctness and is appropriate for question answering or summarization. A value approaching
            1 introduces more randomness in the output.". Defaults to 1.0.
        repetition_penalty (float, optional): the repetition penalty to be used for generation. From the
            Together Inference docs: "Controls the diversity of generated text by reducing the likelihood
            of repeated sequences. Higher values decrease repetition.". Defaults to 1.0.
        top_p (float, optional): the top-p value to be used for generation. From the Together
            Inference docs: "used to dynamically adjust the number of choices for each predicted
            token based on the cumulative probabilities. It specifies a probability threshold,
            below which all less likely tokens are filtered out. This technique helps to maintain
            diversity and generate more fluent and natural-sounding text.". Defaults to 1.0.
        top_k (int, optional): the top-k value to be used for generation. From the Together Inference
            docs: "used to limit the number of choices for the next predicted word or token. It specifies
            the maximum number of tokens to consider at each step, based on their probability of occurrence.
            This technique helps to speed up the generation process and can improve the quality of the
            generated text by focusing on the most likely options.". Defaults to 1.
        stop (List[str], optional): strings to delimitate the generation process, so that when the
            model generates any of the provided characters, the generation process is considered completed.
            Defaults to None.
        logprobs (int, optional): the number of logprobs to be returned for each token. From the
            Together Inference docs: "An integer that specifies how many top token log probabilities
            are included in the response for each token generation step.". Defaults to 0.
        num_threads (Union[int, None], optional): the number of threads to be used
            for parallel generation. If `None`, no parallel generation will be performed.
            Defaults to `None`.
        prompt_format (Union[SupportedFormats, None], optional): the format to be used
            for the prompt. If `None`, the default format of the task will be used, available
            formats are `openai`, `chatml`, `llama2`, `zephyr`, and `default`. Defaults to `None`,
            but `default` (concatenation of `system_prompt` and `formatted_prompt` with a line-break)
            will be used if no `prompt_formatting_fn` is provided.
        prompt_formatting_fn (Union[Callable[..., str], None], optional): a function to be
            applied to the prompt before generation. If `None`, no formatting will be applied.
            Defaults to `None`.

    Raises:
        AssertionError: if the provided `model` is not available in Together Inference.

    Examples:
        >>> from distilabel.tasks.text_generation import TextGenerationTask as Task
        >>> from distilabel.llm import TogetherInferenceLLM
        >>> task = Task()
        >>> llm = TogetherInferenceLLM(model="togethercomputer/llama-2-7b", task=task, prompt_format="llama2")
    """
    if not _TOGETHER_AVAILABLE:
        raise ImportError(
            "`TogetherInferenceLLM` cannot be used as `together` is not installed, please "
            " install it with `pip install together`."
        )

    together.api_key = api_key or os.getenv("TOGETHER_API_KEY", None)
    if together.api_key is None:
        raise ValueError(
            "No `api_key` provided, please provide one or set the `TOGETHER_API_KEY` "
            "environment variable."
        )

    super().__init__(
        task=task,
        num_threads=num_threads,
        prompt_format=prompt_format,
        prompt_formatting_fn=prompt_formatting_fn,
    )

    assert (
        model in self.available_models
    ), f"Provided `model` is not available in Together Inference, available models are {self.available_models}"
    self.model = model

    self.max_new_tokens = max_new_tokens
    self.temperature = temperature
    self.top_p = top_p
    self.top_k = top_k
    self.repetition_penalty = repetition_penalty
    self.stop = stop
    self.logprobs = logprobs
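
Usage sketch (illustrative, not taken from the library's own examples): the snippet below assumes the together package is installed, a valid TOGETHER_API_KEY is set, and that the chosen model is still listed in Together Inference; the sampling values are arbitrary.

import os

from distilabel.llm import TogetherInferenceLLM
from distilabel.tasks import TextGenerationTask

# Assumes `together` is installed and TOGETHER_API_KEY is exported.
llm = TogetherInferenceLLM(
    model="togethercomputer/llama-2-7b",  # must be available in Together Inference (see `available_models`)
    task=TextGenerationTask(),
    api_key=os.getenv("TOGETHER_API_KEY"),
    max_new_tokens=64,
    temperature=0.7,
    prompt_format="llama2",
)

# `generate` takes a list of input dicts and returns one list of LLMOutput entries per input.
outputs = llm.generate([{"input": "What's the capital of Spain?"}], num_generations=1)
print(outputs[0][0]["parsed_output"])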

TransformersLLM

Bases: LLM

Source code in src/distilabel/llm/huggingface/transformers.py
class TransformersLLM(LLM):
    def __init__(
        self,
        model: "PreTrainedModel",
        tokenizer: "PreTrainedTokenizer",
        task: "Task",
        max_new_tokens: int = 128,
        do_sample: bool = False,
        temperature: float = 1.0,
        top_k: int = 50,
        top_p: float = 1.0,
        typical_p: float = 1.0,
        num_threads: Union[int, None] = None,
        prompt_format: Union["SupportedFormats", None] = None,
        prompt_formatting_fn: Union[Callable[..., str], None] = None,
    ) -> None:
        """Initializes the TransformersLLM class.

        Args:
            model (PreTrainedModel): the model to be used for generation.
            tokenizer (PreTrainedTokenizer): the tokenizer to be used for generation.
            task (Task): the task to be performed by the LLM.
            max_new_tokens (int, optional): the maximum number of tokens to be generated.
                Defaults to 128.
            do_sample (bool, optional): whether to sample from the model or not.
                Defaults to False.
            temperature (float, optional): the temperature to be used for generation.
                Defaults to 1.0.
            top_k (int, optional): the top-k value to be used for generation.
                Defaults to 50.
            top_p (float, optional): the top-p value to be used for generation.
                Defaults to 1.0.
            typical_p (float, optional): the typical-p value to be used for generation.
                Defaults to 1.0.
            num_threads (Union[int, None], optional): the number of threads to be used for generation.
                If `None`, the number of threads will be set to the number of available CPUs.
                Defaults to `None`.
            prompt_format (Union[SupportedFormats, None], optional): the format to be used
                for formatting the prompts. If `None`, the prompts will not be formatted.
                Defaults to `None`.
            prompt_formatting_fn (Union[Callable[..., str], None], optional): the function to be used
                for formatting the prompts. If `None`, the prompts will not be formatted.

        Examples:
            >>> from transformers import AutoModelForCausalLM, AutoTokenizer
            >>> from distilabel.tasks.text_generation import TextGenerationTask as Task
            >>> from distilabel.llm import TransformersLLM
            >>> model = AutoModelForCausalLM.from_pretrained("gpt2")
            >>> tokenizer = AutoTokenizer.from_pretrained("gpt2")
            >>> task = Task()
            >>> llm = TransformersLLM(
            ...     model=model,
            ...     tokenizer=tokenizer,
            ...     task=task,
            ... )
        """
        super().__init__(
            task=task,
            num_threads=num_threads,
            prompt_format=prompt_format,
            prompt_formatting_fn=prompt_formatting_fn,
        )

        self.max_new_tokens = max_new_tokens
        self.do_sample = do_sample
        self.temperature = temperature
        self.top_k = top_k
        self.top_p = top_p
        self.typical_p = typical_p

        self.model = model
        self.tokenizer = tokenizer

        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        if (
            hasattr(self.tokenizer, "use_default_system_prompt")
            and self.tokenizer.use_default_system_prompt  # type: ignore
        ):
            # The `tokenizer` also has a method named `apply_chat_template` that expects a `Conversation` as OpenAI does with the ChatML format
            warnings.warn(
                "The provided `tokenizer` has `use_default_system_prompt=True` which means that the default system prompt will be used, which may collide with the `task` provided as an arg to this class.",
                UserWarning,
                stacklevel=2,
            )

    def __rich_repr__(self) -> Generator[Any, None, None]:
        yield from super().__rich_repr__()
        yield (
            "parameters",
            {
                "max_new_tokens": self.max_new_tokens,
                "do_sample": self.do_sample,
                "temperature": self.temperature,
                "top_k": self.top_k,
                "top_p": self.top_p,
                "typical_p": self.typical_p,
            },
        )

    @property
    def model_name(self) -> str:
        """Returns the name of the Transformers model."""
        return self.model.config.name_or_path

    def _generate(
        self, inputs: List[Dict[str, Any]], num_generations: int = 1
    ) -> List[List[LLMOutput]]:
        """Generates `num_generations` for each input in `inputs`.

        Args:
            inputs (List[Dict[str, Any]]): the inputs to be used for generation.
            num_generations (int, optional): the number of generations to be performed for each
                input. Defaults to 1.

        Returns:
            List[List[LLMOutput]]: the outputs of the LLM.
        """
        prompts = self._generate_prompts(inputs, default_format=None)
        encodings = self.tokenizer(prompts, padding=True, return_tensors="pt")
        encodings = encodings.to(self.model.device)
        with torch.inference_mode():
            generated_ids = self.model.generate(
                **encodings,  # type: ignore
                pad_token_id=self.tokenizer.eos_token_id,
                generation_config=GenerationConfig(
                    do_sample=self.do_sample,
                    temperature=self.temperature,
                    max_new_tokens=self.max_new_tokens,
                    top_k=self.top_k,
                    top_p=self.top_p,
                    typical_p=self.typical_p,
                    num_return_sequences=num_generations,
                ),
            )
        raw_outputs = self.tokenizer.batch_decode(
            generated_ids[:, encodings.input_ids.shape[1] :],
            skip_special_tokens=True,
            clean_up_tokenization_spaces=True,
        )
        outputs = []
        for prompt, i in zip(prompts, range(0, len(raw_outputs), num_generations)):
            output = []
            for raw_output in raw_outputs[i : i + num_generations]:
                try:
                    parsed_output = self.task.parse_output(raw_output)
                except Exception as e:
                    logger.error(f"Error parsing Transformers output: {e}")
                    parsed_output = None
                output.append(
                    LLMOutput(
                        model_name=self.model_name,
                        prompt_used=prompt,
                        raw_output=raw_output,
                        parsed_output=parsed_output,
                    )
                )
            outputs.append(output)
        return outputs

model_name: str property

Returns the name of the Transformers model.

__init__(model, tokenizer, task, max_new_tokens=128, do_sample=False, temperature=1.0, top_k=50, top_p=1.0, typical_p=1.0, num_threads=None, prompt_format=None, prompt_formatting_fn=None)

Initializes the TransformersLLM class.

Parameters:

Name Type Description Default
model PreTrainedModel

the model to be used for generation.

required
tokenizer PreTrainedTokenizer

the tokenizer to be used for generation.

required
task Task

the task to be performed by the LLM.

required
max_new_tokens int

the maximum number of tokens to be generated. Defaults to 128.

128
do_sample bool

whether to sample from the model or not. Defaults to False.

False
temperature float

the temperature to be used for generation. Defaults to 1.0.

1.0
top_k int

the top-k value to be used for generation. Defaults to 50.

50
top_p float

the top-p value to be used for generation. Defaults to 1.0.

1.0
typical_p float

the typical-p value to be used for generation. Defaults to 1.0.

1.0
num_threads Union[int, None]

the number of threads to be used for generation. If None, the number of threads will be set to the number of available CPUs. Defaults to None.

None
prompt_format Union[SupportedFormats, None]

the format to be used for formatting the prompts. If None, the prompts will not be formatted. Defaults to None.

None
prompt_formatting_fn Union[Callable[..., str], None]

the function to be used for formatting the prompts. If None, the prompts will not be formatted.

None

Examples:

>>> from transformers import AutoModelForCausalLM, AutoTokenizer
>>> from distilabel.tasks.text_generation import TextGenerationTask as Task
>>> from distilabel.llm import TransformersLLM
>>> model = AutoModelForCausalLM.from_pretrained("gpt2")
>>> tokenizer = AutoTokenizer.from_pretrained("gpt2")
>>> task = Task()
>>> llm = TransformersLLM(
...     model=model,
...     tokenizer=tokenizer,
...     task=task,
... )
Source code in src/distilabel/llm/huggingface/transformers.py
def __init__(
    self,
    model: "PreTrainedModel",
    tokenizer: "PreTrainedTokenizer",
    task: "Task",
    max_new_tokens: int = 128,
    do_sample: bool = False,
    temperature: float = 1.0,
    top_k: int = 50,
    top_p: float = 1.0,
    typical_p: float = 1.0,
    num_threads: Union[int, None] = None,
    prompt_format: Union["SupportedFormats", None] = None,
    prompt_formatting_fn: Union[Callable[..., str], None] = None,
) -> None:
    """Initializes the TransformersLLM class.

    Args:
        model (PreTrainedModel): the model to be used for generation.
        tokenizer (PreTrainedTokenizer): the tokenizer to be used for generation.
        task (Task): the task to be performed by the LLM.
        max_new_tokens (int, optional): the maximum number of tokens to be generated.
            Defaults to 128.
        do_sample (bool, optional): whether to sample from the model or not.
            Defaults to False.
        temperature (float, optional): the temperature to be used for generation.
            Defaults to 1.0.
        top_k (int, optional): the top-k value to be used for generation.
            Defaults to 50.
        top_p (float, optional): the top-p value to be used for generation.
            Defaults to 1.0.
        typical_p (float, optional): the typical-p value to be used for generation.
            Defaults to 1.0.
        num_threads (Union[int, None], optional): the number of threads to be used for generation.
            If `None`, the number of threads will be set to the number of available CPUs.
            Defaults to `None`.
        prompt_format (Union[SupportedFormats, None], optional): the format to be used
            for formatting the prompts. If `None`, the prompts will not be formatted.
            Defaults to `None`.
        prompt_formatting_fn (Union[Callable[..., str], None], optional): the function to be used
            for formatting the prompts. If `None`, the prompts will not be formatted.

    Examples:
        >>> from transformers import AutoModelForCausalLM, AutoTokenizer
        >>> from distilabel.tasks.text_generation import TextGenerationTask as Task
        >>> from distilabel.llm import TransformersLLM
        >>> model = AutoModelForCausalLM.from_pretrained("gpt2")
        >>> tokenizer = AutoTokenizer.from_pretrained("gpt2")
        >>> task = Task()
        >>> llm = TransformersLLM(
        ...     model=model,
        ...     tokenizer=tokenizer,
        ...     task=task,
        ... )
    """
    super().__init__(
        task=task,
        num_threads=num_threads,
        prompt_format=prompt_format,
        prompt_formatting_fn=prompt_formatting_fn,
    )

    self.max_new_tokens = max_new_tokens
    self.do_sample = do_sample
    self.temperature = temperature
    self.top_k = top_k
    self.top_p = top_p
    self.typical_p = typical_p

    self.model = model
    self.tokenizer = tokenizer

    if self.tokenizer.pad_token is None:
        self.tokenizer.pad_token = self.tokenizer.eos_token
    if (
        hasattr(self.tokenizer, "use_default_system_prompt")
        and self.tokenizer.use_default_system_prompt  # type: ignore
    ):
        # The `tokenizer` also has a method named `apply_chat_template` that expects a `Conversation` as OpenAI does with the ChatML format
        warnings.warn(
            "The provided `tokenizer` has `use_default_system_prompt=True` which means that the default system prompt will be used, which may collide with the `task` provided as an arg to this class.",
            UserWarning,
            stacklevel=2,
        )
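
As a complement to the constructor example above, this is a plausible end-to-end sketch for TransformersLLM; the model name and sampling settings are illustrative choices, not prescribed values.

from transformers import AutoModelForCausalLM, AutoTokenizer

from distilabel.llm import TransformersLLM
from distilabel.tasks import TextGenerationTask

# Any causal LM / tokenizer pair works the same way; `gpt2` is used only because it is small.
model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer.padding_side = "left"  # keeps generated tokens at the end of the sequence when batching

llm = TransformersLLM(
    model=model,
    tokenizer=tokenizer,
    task=TextGenerationTask(),
    max_new_tokens=32,
    do_sample=True,
    temperature=0.7,
)

# One inner list per input, one LLMOutput per requested generation.
outputs = llm.generate([{"input": "What's the capital of Spain?"}], num_generations=2)
for generation in outputs[0]:
    print(generation["raw_output"])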

VertexAIEndpointLLM

Bases: LLM

An LLM which uses a Vertex AI Online prediction endpoint for generation.

More information about Vertex AI Endpoints can be found here:

- https://cloud.google.com/vertex-ai/docs/general/deployment#deploy_a_model_to_an_endpoint
Source code in src/distilabel/llm/google/vertexai.py
class VertexAIEndpointLLM(LLM):
    """An `LLM` which uses a Vertex AI Online prediction endpoint for the generation.

    More information about Vertex AI Endpoints can be found here:

        - https://cloud.google.com/vertex-ai/docs/general/deployment#deploy_a_model_to_an_endpoint
    """

    def __init__(
        self,
        task: "Task",
        endpoint_id: str,
        project: Optional[str] = None,
        location: str = "us-central1",
        generation_kwargs: Optional[Dict[str, Any]] = None,
        prompt_argument: str = "prompt",
        num_generations_argument: str = "n",
        num_threads: Union[int, None] = None,
        prompt_format: Union["SupportedFormats", None] = None,
        prompt_formatting_fn: Union[Callable[..., str], None] = None,
    ) -> None:
        """Initializes the `VertexAIEndpointLLM` class.

        Args:
            task (Task): the task to be performed by the LLM.
            endpoint_id (str): the ID of the Vertex AI endpoint to be used for generation.
            project (Optional[str], optional): the project to be used for generation. If `None`,
                the default project will be used. Defaults to `None`.
            location (str, optional): the location of the Vertex AI endpoint to be used for
                generation. Defaults to "us-central1".
            generation_kwargs (Optional[Dict[str, Any]], optional): the generation parameters
                to be used for generation. The name of the parameters will depend on the
                Docker image used to deploy the model to the Vertex AI endpoint. Defaults
                to `None`.
            prompt_argument (str, optional): the name of the Vertex AI Endpoint key to
                be used for the prompt. Defaults to "prompt".
            num_generations_argument (str, optional): the name of the Vertex AI Endpoint
                key to be used to specify the number of generations per prompt. Defaults
                to "n".
            num_threads (Union[int, None], optional): the number of threads to be used
                for parallel generation. If `None`, no parallel generation will be performed.
                Defaults to `None`.
            prompt_format (Union[SupportedFormats, None], optional): the format to be used
                for the prompt. If `None`, the default format of the task will be used, available
                formats are `openai`, `chatml`, `llama2`, `zephyr`, and `default`. Defaults to `None`,
                but `default` (concatenation of `system_prompt` and `formatted_prompt` with a line-break)
                will be used if no `prompt_formatting_fn` is provided.
            prompt_formatting_fn (Union[Callable[..., str], None], optional): a function to be
                applied to the prompt before generation. If `None`, no formatting will be applied.
                Defaults to `None`.
        """
        super().__init__(
            task=task,
            num_threads=num_threads,
            prompt_format=prompt_format,
            prompt_formatting_fn=prompt_formatting_fn,
        )

        if not _VERTEXAI_AVAILABLE:
            raise ImportError(
                "`VertexAIEndpointLLM` cannot be used as `google-cloud-aiplatform` is not"
                " installed, please install it with `pip install google-cloud-aiplatform`"
            )

        if project is None:
            try:
                project = google.auth.default()[1]
            except DefaultCredentialsError as e:
                raise ValueError(
                    "No `project` was specified and no default credentials were found."
                ) from e

        if generation_kwargs is None:
            generation_kwargs = {}

        self.endpoint_id = endpoint_id
        self.project = project
        self.location = location
        self.generation_kwargs = generation_kwargs
        self.prompt_argument = prompt_argument
        self.num_generations_argument = num_generations_argument

        self.client = PredictionServiceClient(
            client_options=ClientOptions(
                api_endpoint=f"{self.location}-aiplatform.googleapis.com"
            )
        )

    @cached_property
    def model_name(self) -> str:
        """Returns the name of the model used for generation."""
        client = EndpointServiceClient(
            client_options=ClientOptions(
                api_endpoint=f"{self.location}-aiplatform.googleapis.com"
            )
        )
        endpoint = client.get_endpoint(name=self.endpoint_path)
        return endpoint.deployed_models[0].display_name

    @property
    def endpoint_path(self) -> str:
        """Returns the path of the Vertex AI endpoint to be used for generation."""
        return self.client.endpoint_path(
            project=self.project,  # type: ignore
            location=self.location,
            endpoint=self.endpoint_id,
        )

    @_vertexai_retry_decorator
    def _call_vertexai_endpoint(self, instances: List[Any]) -> Any:
        return self.client.predict(endpoint=self.endpoint_path, instances=instances)

    def _prepare_instances(
        self, prompts: List[str], num_generations: int
    ) -> List["Value"]:
        """Prepares the instances to be sent to the Vertex AI endpoint.

        Args:
            prompts (List[str]): the prompts to be used for generation.
            num_generations (int): the number of generations to be performed for each prompt.

        Returns:
            The instances to be sent to the Vertex AI endpoint.
        """
        instances = []
        for prompt in prompts:
            instance = json_format.ParseDict(
                {
                    self.prompt_argument: prompt,
                    self.num_generations_argument: num_generations,
                    **self.generation_kwargs,
                },
                Value(),
            )
            instances.append(instance)
        return instances

    def _single_output(self, instance: Any) -> List[LLMOutput]:
        try:
            # NOTE: `predict` method accepts a list of instances, but depending on the
            # deployed Docker image, it can just accept one instance.
            response = self._call_vertexai_endpoint(instances=[instance])
        except exceptions.InternalServerError as e:
            raise ValueError(
                "The Vertex AI endpoint returned 500 Internal Server Error. This is"
                " usually caused due to wrong generation parameters. Please check the"
                " `generation_parameters` and try again."
            ) from e

        output = []
        for prediction in response.predictions:
            # Vertex endpoint output is `Prompt:\n{{ model_prompt }}\nOutput:\n{{ model_output }}`
            # so we need to do a pre-parsing to remove the `Prompt:` and `Output:` parts.
            match = _PARSE_VERTEXAI_ENDPOINT_PREDICTION_REGEX.search(prediction)
            if not match:
                raise ValueError(
                    "Couldn't parse the response from the Vertex AI endpoint."
                )

            model_output = match.group(1).strip()

            try:
                parsed_output = self.task.parse_output(model_output)
            except Exception as e:
                logger.error(f"Error parsing Vertex AI endpoint model response: {e}")
                parsed_output = None
            output.append(
                LLMOutput(
                    model_name=self.model_name,
                    prompt_used=instance.struct_value[self.prompt_argument],
                    raw_output=model_output,
                    parsed_output=parsed_output,
                )
            )
        return output

    def _generate(
        self, inputs: List[Dict[str, Any]], num_generations: int = 1
    ) -> List[List["LLMOutput"]]:
        prompts = self._generate_prompts(inputs)
        instances = self._prepare_instances(
            prompts=prompts, num_generations=num_generations
        )
        return [self._single_output(instance) for instance in instances]

endpoint_path: str property

Returns the path of the Vertex AI endpoint to be used for generation.

model_name: str cached property

Returns the name of the model used for generation.

__init__(task, endpoint_id, project=None, location='us-central1', generation_kwargs=None, prompt_argument='prompt', num_generations_argument='n', num_threads=None, prompt_format=None, prompt_formatting_fn=None)

Initializes the VertexAIEndpointLLM class.

Parameters:

Name Type Description Default
task Task

the task to be performed by the LLM.

required
endpoint_id str

the ID of the Vertex AI endpoint to be used for generation.

required
project Optional[str]

the project to be used for generation. If None, the default project will be used. Defaults to None.

None
location str

the location of the Vertex AI endpoint to be used for generation. Defaults to "us-central1".

'us-central1'
generation_kwargs Optional[Dict[str, Any]]

the generation parameters to be used for generation. The name of the parameters will depend on the Docker image used to deploy the model to the Vertex AI endpoint. Defaults to None.

None
prompt_argument str

the name of the Vertex AI Endpoint key to be used for the prompt. Defaults to "prompt".

'prompt'
num_generations_argument str

the name of the Vertex AI Endpoint key to be used to specify the number of generations per prompt. Defaults to "n".

'n'
num_threads Union[int, None]

the number of threads to be used for parallel generation. If None, no parallel generation will be performed. Defaults to None.

None
prompt_format Union[SupportedFormats, None]

the format to be used for the prompt. If None, the default format of the task will be used, available formats are openai, chatml, llama2, zephyr, and default. Defaults to None, but default (concatenation of system_prompt and formatted_prompt with a line-break) will be used if no prompt_formatting_fn is provided.

None
prompt_formatting_fn Union[Callable[..., str], None]

a function to be applied to the prompt before generation. If None, no formatting will be applied. Defaults to None.

None
Source code in src/distilabel/llm/google/vertexai.py
def __init__(
    self,
    task: "Task",
    endpoint_id: str,
    project: Optional[str] = None,
    location: str = "us-central1",
    generation_kwargs: Optional[Dict[str, Any]] = None,
    prompt_argument: str = "prompt",
    num_generations_argument: str = "n",
    num_threads: Union[int, None] = None,
    prompt_format: Union["SupportedFormats", None] = None,
    prompt_formatting_fn: Union[Callable[..., str], None] = None,
) -> None:
    """Initializes the `VertexAIEndpointLLM` class.

    Args:
        task (Task): the task to be performed by the LLM.
        endpoint_id (str): the ID of the Vertex AI endpoint to be used for generation.
        project (Optional[str], optional): the project to be used for generation. If `None`,
            the default project will be used. Defaults to `None`.
        location (str, optional): the location of the Vertex AI endpoint to be used for
            generation. Defaults to "us-central1".
        generation_kwargs (Optional[Dict[str, Any]], optional): the generation parameters
            to be used for generation. The name of the parameters will depend on the
            Docker image used to deploy the model to the Vertex AI endpoint. Defaults
            to `None`.
        prompt_argument (str, optional): the name of the Vertex AI Endpoint key to
            be used for the prompt. Defaults to "prompt".
        num_generations_argument (str, optional): the name of the Vertex AI Endpoint
            key to be used to specify the number of generations per prompt. Defaults
            to "n".
        num_threads (Union[int, None], optional): the number of threads to be used
            for parallel generation. If `None`, no parallel generation will be performed.
            Defaults to `None`.
        prompt_format (Union[SupportedFormats, None], optional): the format to be used
            for the prompt. If `None`, the default format of the task will be used, available
            formats are `openai`, `chatml`, `llama2`, `zephyr`, and `default`. Defaults to `None`,
            but `default` (concatenation of `system_prompt` and `formatted_prompt` with a line-break)
            will be used if no `prompt_formatting_fn` is provided.
        prompt_formatting_fn (Union[Callable[..., str], None], optional): a function to be
            applied to the prompt before generation. If `None`, no formatting will be applied.
            Defaults to `None`.
    """
    super().__init__(
        task=task,
        num_threads=num_threads,
        prompt_format=prompt_format,
        prompt_formatting_fn=prompt_formatting_fn,
    )

    if not _VERTEXAI_AVAILABLE:
        raise ImportError(
            "`VertexAIEndpointLLM` cannot be used as `google-cloud-aiplatform` is not"
            " installed, please install it with `pip install google-cloud-aiplatform`"
        )

    if project is None:
        try:
            project = google.auth.default()[1]
        except DefaultCredentialsError as e:
            raise ValueError(
                "No `project` was specified and no default credentials were found."
            ) from e

    if generation_kwargs is None:
        generation_kwargs = {}

    self.endpoint_id = endpoint_id
    self.project = project
    self.location = location
    self.generation_kwargs = generation_kwargs
    self.prompt_argument = prompt_argument
    self.num_generations_argument = num_generations_argument

    self.client = PredictionServiceClient(
        client_options=ClientOptions(
            api_endpoint=f"{self.location}-aiplatform.googleapis.com"
        )
    )
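
The sketch below shows one plausible way to instantiate VertexAIEndpointLLM; the endpoint ID, project name, and the keys inside generation_kwargs are placeholders, since they depend entirely on the Docker image the model was deployed with.

from distilabel.llm import VertexAIEndpointLLM
from distilabel.tasks import TextGenerationTask

# All identifiers below are placeholders; replace them with the values of your own deployment.
llm = VertexAIEndpointLLM(
    task=TextGenerationTask(),
    endpoint_id="1234567890123456789",  # placeholder Vertex AI endpoint ID
    project="my-gcp-project",           # placeholder; omit to fall back to the default credentials' project
    location="us-central1",
    generation_kwargs={"temperature": 0.7, "max_tokens": 128},  # accepted keys depend on the deployed image
    prompt_argument="prompt",
    num_generations_argument="n",
)

outputs = llm.generate([{"input": "What's the capital of Spain?"}])
print(outputs[0][0]["parsed_output"])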

VertexAILLM

Bases: LLM

An LLM which allows using Google's proprietary models from the Vertex AI APIs:

  • Gemini API: https://cloud.google.com/vertex-ai/docs/generative-ai/model-reference/gemini
  • Codey API: https://cloud.google.com/vertex-ai/docs/generative-ai/code/code-models-overview
  • Text API: https://cloud.google.com/vertex-ai/docs/generative-ai/model-reference/text

To use the VertexAILLM, Google Cloud authentication must be configured using one of these methods:

  • Setting GOOGLE_CLOUD_CREDENTIALS environment variable
  • Using gcloud auth application-default login command
  • Using vertexai.init function from the google-cloud-aiplatform library
Source code in src/distilabel/llm/google/vertexai.py
class VertexAILLM(LLM):
    """An `LLM` which allows to use Google's proprietary models from the Vertex AI APIs:

    - Gemini API: https://cloud.google.com/vertex-ai/docs/generative-ai/model-reference/gemini
    - Codey API: https://cloud.google.com/vertex-ai/docs/generative-ai/code/code-models-overview
    - Text API: https://cloud.google.com/vertex-ai/docs/generative-ai/model-reference/text

    To use the `VertexAILLM` is necessary to have configured the Google Cloud authentication
    using one of these methods:

    - Setting `GOOGLE_CLOUD_CREDENTIALS` environment variable
    - Using `gcloud auth application-default login` command
    - Using `vertexai.init` function from the `google-cloud-aiplatform` library
    """

    def __init__(
        self,
        task: "Task",
        model: str = "gemini-pro",
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        top_k: Optional[int] = None,
        max_new_tokens: int = 128,
        stop_sequences: Optional[List[str]] = None,
        num_threads: Union[int, None] = None,
    ) -> None:
        """Initializes the `VertexGenerativeModelLLM` class.

        Args:
            task (Task): the task to be performed by the LLM.
            model (str, optional): the model to be used for generation. Defaults to "gemini-pro".
            temperature (float, optional): the temperature to be used for generation.
                Defaults to 1.0.
            top_p (float, optional): the top-p value to be used for generation.
                Defaults to 1.0.
            top_k (int, optional): the top-k value to be used for generation.
                Defaults to 40.
            max_new_tokens (int, optional): the maximum number of tokens to be generated.
                Defaults to 128.
            num_threads (Union[int, None], optional): the number of threads to be used
                for parallel generation. If `None`, no parallel generation will be performed.
                Defaults to `None`.
        """
        super().__init__(task=task, num_threads=num_threads)

        if not _VERTEXAI_AVAILABLE:
            raise ImportError(
                "`VertexAILLM` cannot be used as `google-cloud-aiplatform` is not installed,"
                " please install it with `pip install google-cloud-aiplatform`"
            )

        self.temperature = temperature
        self.top_p = top_p
        self.top_k = top_k
        self.max_output_tokens = max_new_tokens
        self.stop_sequences = stop_sequences

        if is_gemini_model(model):
            self.model = GenerativeModel(model)
        elif is_codey_model(model):
            self.model = CodeGenerationModel.from_pretrained(model)
        else:
            self.model = TextGenerationModel.from_pretrained(model)

    @property
    def model_name(self) -> str:
        """Returns the name of the model used for generation."""
        if isinstance(self.model, GenerativeModel):
            return self.model._model_name

        return self.model._model_id

    def _generate_contents(self, prompts: List[str]) -> List[List[Dict[str, Any]]]:
        """Generates a list of valid dicts that can be parsed to `vertexai.preview.generative_models.Content`
        objects for each input.

        Args:
            prompts (List[str]): the prompts to be used for generation.

        Returns:
            List[List[Dict[str, Any]]]: the list of valid `vertexai.preview.generative_models.Content`
                objects.
        """
        return [[{"role": "user", "parts": [{"text": prompt}]}] for prompt in prompts]

    @_vertexai_retry_decorator
    def _call_generative_model_with_backoff(
        self, contents: List[Dict[str, Any]], **kwargs: Any
    ) -> "GenerationResponse":
        return self.model.generate_content(  # type: ignore
            contents=contents,
            # TODO: update `candidate_count` to have `num_generations` as value once valid range is not [1, 2)
            generation_config=GenerationConfig(candidate_count=1, **kwargs),
        )

    def _generative_model_single_output(
        self, contents: List[Dict[str, Any]]
    ) -> LLMOutput:
        raw_output = None
        try:
            response = self._call_generative_model_with_backoff(
                contents=contents,
                temperature=self.temperature,
                top_p=self.top_p,
                top_k=self.top_k,
                max_output_tokens=self.max_output_tokens,
                stop_sequences=self.stop_sequences,
            )
            raw_output = response.text
            parsed_output = self.task.parse_output(raw_output)
        except ValueError as e:
            logger.error(f"Vertex AI Gemini API model didn't return content: {e}")
            return LLMOutput(
                model_name=self.model_name,
                prompt_used=contents,
                raw_output=None,
                parsed_output=None,
            )
        except Exception as e:
            logger.error(f"Error parsing Vertex AI Gemini API model response: {e}")
            parsed_output = None

        return LLMOutput(
            model_name=self.model_name,
            prompt_used=contents,
            raw_output=raw_output,
            parsed_output=parsed_output,
        )

    def _generate_with_generative_model(
        self, inputs: List[Dict[str, Any]], num_generations: int = 1
    ) -> List[List["LLMOutput"]]:
        """Generate `num_generations` for each input in `inputs` using a Vertex AI Gemini
        API model."""
        prompts = self._generate_prompts(inputs, default_format="default")
        inputs_contents = self._generate_contents(prompts)
        outputs = []
        for contents in inputs_contents:
            output = []
            # TODO: remove this for-loop once `GenerationConfig.candidate_count` valid range is not [1, 2)
            for _ in range(num_generations):
                output.append(self._generative_model_single_output(contents=contents))
            outputs.append(output)
        return outputs

    @_vertexai_retry_decorator
    def _call_text_generation_model(
        self, **kwargs: Any
    ) -> "MultiCandidateTextGenerationResponse":
        return self.model.predict(**kwargs)  # type: ignore

    def _text_generation_model_single_output(
        self, prompt: str, num_generations: int
    ) -> List[LLMOutput]:
        response = self._call_text_generation_model(
            prompt=prompt,
            max_output_tokens=self.max_output_tokens,
            temperature=self.temperature,
            top_k=self.top_k,
            top_p=self.top_p,
            stop_sequences=self.stop_sequences,
            # WARNING: The model can return < `candidate_count` generations depending
            # on the generation parameters and the input.
            candidate_count=num_generations,
        )

        output = []
        for candidate in response.candidates:
            try:
                parsed_response = self.task.parse_output(candidate.text)
            except Exception as e:
                logger.error(
                    f"Error parsing Vertex AI Text/Code API model response: {e}"
                )
                parsed_response = None

            output.append(
                LLMOutput(
                    model_name=self.model_name,
                    prompt_used=prompt,
                    raw_output=candidate.text,
                    parsed_output=parsed_response,
                )
            )
        return output

    def _generate_with_text_generation_model(
        self, inputs: List[Dict[str, Any]], num_generations: int = 1
    ) -> List[List["LLMOutput"]]:
        """Generate `num_generations` for each input in `inputs` using a Vertex AI Text/Code
        API model."""
        prompts = self._generate_prompts(inputs, default_format="default")
        outputs = []
        for prompt in prompts:
            outputs.append(
                self._text_generation_model_single_output(prompt, num_generations)
            )
        return outputs

    def _generate(
        self, inputs: List[Dict[str, Any]], num_generations: int = 1
    ) -> List[List["LLMOutput"]]:
        if isinstance(self.model, GenerativeModel):
            return self._generate_with_generative_model(inputs, num_generations)

        return self._generate_with_text_generation_model(inputs, num_generations)

model_name: str property

Returns the name of the model used for generation.

__init__(task, model='gemini-pro', temperature=None, top_p=None, top_k=None, max_new_tokens=128, stop_sequences=None, num_threads=None)

Initializes the VertexAILLM class.

Parameters:

Name Type Description Default
task Task

the task to be performed by the LLM.

required
model str

the model to be used for generation. Defaults to "gemini-pro".

'gemini-pro'
temperature float

the temperature to be used for generation. Defaults to None.

None
top_p float

the top-p value to be used for generation. Defaults to None.

None
top_k int

the top-k value to be used for generation. Defaults to None.

None
max_new_tokens int

the maximum number of tokens to be generated. Defaults to 128.

128
num_threads Union[int, None]

the number of threads to be used for parallel generation. If None, no parallel generation will be performed. Defaults to None.

None
Source code in src/distilabel/llm/google/vertexai.py
def __init__(
    self,
    task: "Task",
    model: str = "gemini-pro",
    temperature: Optional[float] = None,
    top_p: Optional[float] = None,
    top_k: Optional[int] = None,
    max_new_tokens: int = 128,
    stop_sequences: Optional[List[str]] = None,
    num_threads: Union[int, None] = None,
) -> None:
    """Initializes the `VertexGenerativeModelLLM` class.

    Args:
        task (Task): the task to be performed by the LLM.
        model (str, optional): the model to be used for generation. Defaults to "gemini-pro".
        temperature (float, optional): the temperature to be used for generation.
            Defaults to 1.0.
        top_p (float, optional): the top-p value to be used for generation.
            Defaults to 1.0.
        top_k (int, optional): the top-k value to be used for generation.
            Defaults to 40.
        max_new_tokens (int, optional): the maximum number of tokens to be generated.
            Defaults to 128.
        num_threads (Union[int, None], optional): the number of threads to be used
            for parallel generation. If `None`, no parallel generation will be performed.
            Defaults to `None`.
    """
    super().__init__(task=task, num_threads=num_threads)

    if not _VERTEXAI_AVAILABLE:
        raise ImportError(
            "`VertexAILLM` cannot be used as `google-cloud-aiplatform` is not installed,"
            " please install it with `pip install google-cloud-aiplatform`"
        )

    self.temperature = temperature
    self.top_p = top_p
    self.top_k = top_k
    self.max_output_tokens = max_new_tokens
    self.stop_sequences = stop_sequences

    if is_gemini_model(model):
        self.model = GenerativeModel(model)
    elif is_codey_model(model):
        self.model = CodeGenerationModel.from_pretrained(model)
    else:
        self.model = TextGenerationModel.from_pretrained(model)
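
A short usage sketch for VertexAILLM, assuming Google Cloud authentication has already been configured with one of the methods listed above; the sampling values are illustrative.

from distilabel.llm import VertexAILLM
from distilabel.tasks import TextGenerationTask

# Assumes `google-cloud-aiplatform` is installed and application-default credentials are available.
llm = VertexAILLM(
    task=TextGenerationTask(),
    model="gemini-pro",
    temperature=0.7,
    max_new_tokens=128,
)

outputs = llm.generate([{"input": "What's the capital of Spain?"}])
print(outputs[0][0]["parsed_output"])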

vLLM

Bases: LLM

Source code in src/distilabel/llm/vllm.py
class vLLM(LLM):
    def __init__(
        self,
        vllm: "_vLLM",
        task: "Task",
        max_new_tokens: int = 128,
        presence_penalty: float = 0.0,
        frequency_penalty: float = 0.0,
        temperature: float = 1.0,
        top_p: float = 1.0,
        top_k: int = -1,
        prompt_format: Union["SupportedFormats", None] = None,
        prompt_formatting_fn: Union[Callable[..., str], None] = None,
    ) -> None:
        """Initializes the vLLM class.

        Args:
            vllm (_vLLM): the vLLM model to be used.
            task (Task): the task to be performed by the LLM.
            max_new_tokens (int, optional): the maximum number of tokens to be generated.
                Defaults to 128.
            presence_penalty (float, optional): the presence penalty to be used for generation.
                Defaults to 0.0.
            frequency_penalty (float, optional): the frequency penalty to be used for generation.
                Defaults to 0.0.
            temperature (float, optional): the temperature to be used for generation.
                Defaults to 1.0.
            top_p (float, optional): the top-p value to be used for generation.
                Defaults to 1.0.
            top_k (int, optional): the top-k value to be used for generation.
                Defaults to -1.
            prompt_format (Union[SupportedFormats, None], optional): the format to be used
                for the prompt. If `None`, the default format of the task will be used, available
                formats are `openai`, `chatml`, `llama2`, `zephyr`, and `default`. Defaults to `None`,
                but `default` (concatenation of `system_prompt` and `formatted_prompt` with a line-break)
                will be used if no `prompt_formatting_fn` is provided.
            prompt_formatting_fn (Union[Callable[..., str], None], optional): a function to be
                applied to the prompt before generation. If `None`, no formatting will be applied.

        Examples:
            >>> from vllm import LLM
            >>> from distilabel.tasks.text_generation import TextGenerationTask as Task
            >>> from distilabel.llm import vLLM
            >>> model = LLM(model="gpt2")
            >>> task = Task()
            >>> llm = vLLM(model=model, task=task)
        """
        super().__init__(
            task=task,
            prompt_format=prompt_format,
            prompt_formatting_fn=prompt_formatting_fn,
        )

        if not _VLLM_AVAILABLE:
            raise ImportError(
                "`vLLM` cannot be used as `vllm` is not installed, please "
                " install it with `pip install vllm`."
            )

        self.presence_penalty = presence_penalty
        self.frequency_penalty = frequency_penalty
        self.temperature = temperature
        self.top_p = top_p
        self.top_k = top_k
        self.max_tokens = max_new_tokens

        self.vllm = vllm

    def __rich_repr__(self) -> Generator[Any, None, None]:
        yield from super().__rich_repr__()
        yield (
            "parameters",
            {
                "max_tokens": self.max_tokens,
                "presence_penalty": self.presence_penalty,
                "frequency_penalty": self.frequency_penalty,
                "temperature": self.temperature,
                "top_p": self.top_p,
                "top_k": self.top_k,
            },
        )

    @property
    def model_name(self) -> str:
        """Returns the name of the vLLM model."""
        return self.vllm.llm_engine.model_config.model  # type: ignore

    def _generate(
        self, inputs: List[Dict[str, Any]], num_generations: int = 1
    ) -> List[List[LLMOutput]]:
        """Generates `num_generations` for each input in `inputs`.

        Args:
            inputs (List[Dict[str, Any]]): the inputs to be used for generation.
            num_generations (int, optional): the number of generations to be performed for each
                input. Defaults to 1.

        Returns:
            List[List[LLMOutput]]: the outputs of the LLM.
        """
        prompts = self._generate_prompts(inputs, default_format=None)
        requests = self.vllm.generate(
            prompts,
            SamplingParams(  # type: ignore
                n=num_generations,
                presence_penalty=self.presence_penalty,
                frequency_penalty=self.frequency_penalty,
                temperature=self.temperature,
                top_p=self.top_p,
                top_k=self.top_k,
                max_tokens=self.max_tokens,
            ),
            use_tqdm=False,  # type: ignore
        )
        outputs = []
        for request, prompt in zip(requests, prompts):
            output = []
            for request_output in request.outputs:
                try:
                    parsed_output = self.task.parse_output(request_output.text)
                except Exception as e:
                    logger.error(f"Error parsing vLLM output: {e}")
                    parsed_output = None
                output.append(
                    LLMOutput(
                        model_name=self.model_name,
                        prompt_used=prompt,
                        raw_output=request_output.text,
                        parsed_output=parsed_output,
                    )
                )
            outputs.append(output)
        return outputs

model_name: str property

Returns the name of the vLLM model.

__init__(vllm, task, max_new_tokens=128, presence_penalty=0.0, frequency_penalty=0.0, temperature=1.0, top_p=1.0, top_k=-1, prompt_format=None, prompt_formatting_fn=None)

Initializes the vLLM class.

Parameters:

Name Type Description Default
vllm LLM

the vLLM model to be used.

required
task Task

the task to be performed by the LLM.

required
max_new_tokens int

the maximum number of tokens to be generated. Defaults to 128.

128
presence_penalty float

the presence penalty to be used for generation. Defaults to 0.0.

0.0
frequency_penalty float

the frequency penalty to be used for generation. Defaults to 0.0.

0.0
temperature float

the temperature to be used for generation. Defaults to 1.0.

1.0
top_p float

the top-p value to be used for generation. Defaults to 1.0.

1.0
top_k int

the top-k value to be used for generation. Defaults to -1.

-1
prompt_format Union[SupportedFormats, None]

the format to be used for the prompt. If None, the default format of the task will be used, available formats are openai, chatml, llama2, zephyr, and default. Defaults to None, but default (concatenation of system_prompt and formatted_prompt with a line-break) will be used if no prompt_formatting_fn is provided.

None
prompt_formatting_fn Union[Callable[..., str], None]

a function to be applied to the prompt before generation. If None, no formatting will be applied.

None

Examples:

>>> from vllm import LLM
>>> from distilabel.tasks.text_generation import TextGenerationTask as Task
>>> from distilabel.llm import vLLM
>>> model = LLM(model="gpt2")
>>> task = Task()
>>> llm = vLLM(model=model, task=task)
Source code in src/distilabel/llm/vllm.py
def __init__(
    self,
    vllm: "_vLLM",
    task: "Task",
    max_new_tokens: int = 128,
    presence_penalty: float = 0.0,
    frequency_penalty: float = 0.0,
    temperature: float = 1.0,
    top_p: float = 1.0,
    top_k: int = -1,
    prompt_format: Union["SupportedFormats", None] = None,
    prompt_formatting_fn: Union[Callable[..., str], None] = None,
) -> None:
    """Initializes the vLLM class.

    Args:
        vllm (_vLLM): the vLLM model to be used.
        task (Task): the task to be performed by the LLM.
        max_new_tokens (int, optional): the maximum number of tokens to be generated.
            Defaults to 128.
        presence_penalty (float, optional): the presence penalty to be used for generation.
            Defaults to 0.0.
        frequency_penalty (float, optional): the frequency penalty to be used for generation.
            Defaults to 0.0.
        temperature (float, optional): the temperature to be used for generation.
            Defaults to 1.0.
        top_p (float, optional): the top-p value to be used for generation.
            Defaults to 1.0.
        top_k (int, optional): the top-k value to be used for generation.
            Defaults to -1.
        prompt_format (Union[SupportedFormats, None], optional): the format to be used
            for the prompt. If `None`, the default format of the task will be used, available
            formats are `openai`, `chatml`, `llama2`, `zephyr`, and `default`. Defaults to `None`,
            but `default` (concatenation of `system_prompt` and `formatted_prompt` with a line-break)
            will be used if no `prompt_formatting_fn` is provided.
        prompt_formatting_fn (Union[Callable[..., str], None], optional): a function to be
            applied to the prompt before generation. If `None`, no formatting will be applied.

    Examples:
        >>> from vllm import LLM
        >>> from distilabel.tasks.text_generation import TextGenerationTask as Task
        >>> from distilabel.llm import vLLM
        >>> model = LLM(model="gpt2")
        >>> task = Task()
        >>> llm = vLLM(model=model, task=task)
    """
    super().__init__(
        task=task,
        prompt_format=prompt_format,
        prompt_formatting_fn=prompt_formatting_fn,
    )

    if not _VLLM_AVAILABLE:
        raise ImportError(
            "`vLLM` cannot be used as `vllm` is not installed, please "
            " install it with `pip install vllm`."
        )

    self.presence_penalty = presence_penalty
    self.frequency_penalty = frequency_penalty
    self.temperature = temperature
    self.top_p = top_p
    self.top_k = top_k
    self.max_tokens = max_new_tokens

    self.vllm = vllm
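
Finally, a minimal sketch of running generations with the vLLM wrapper, assuming vllm is installed and a suitable GPU is available; the model and sampling values are illustrative.

from vllm import LLM

from distilabel.llm import vLLM
from distilabel.tasks import TextGenerationTask

# `gpt2` is used only as a small example; any model supported by vLLM can be loaded here.
model = LLM(model="gpt2")

llm = vLLM(
    vllm=model,
    task=TextGenerationTask(),
    max_new_tokens=64,
    temperature=0.8,
    top_p=0.95,
)

# Three sampled completions per input; each entry is an LLMOutput with the parsed result.
outputs = llm.generate([{"input": "What's the capital of Spain?"}], num_generations=3)
for generation in outputs[0]:
    print(generation["parsed_output"])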