Skip to content

Argilla

This section contains the existing steps integrated with Argilla so as to easily push the generated datasets to Argilla.

ArgillaBase

Bases: Step, ABC

Abstract step that provides a class to subclass from, that contains the boilerplate code required to interact with Argilla, as well as some extra validations on top of it. It also defines the abstract methods that need to be implemented in order to add a new dataset type as a step.

Note

This class is not intended to be instanced directly, but via subclass.

Attributes:

Name Type Description
dataset_name RuntimeParameter[str]

The name of the dataset in Argilla where the records will be added.

dataset_workspace Optional[RuntimeParameter[str]]

The workspace where the dataset will be created in Argilla. Defaults to None, which means it will be created in the default workspace.

api_url Optional[RuntimeParameter[str]]

The URL of the Argilla API. Defaults to None, which means it will be read from the ARGILLA_API_URL environment variable.

api_key Optional[RuntimeParameter[SecretStr]]

The API key to authenticate with Argilla. Defaults to None, which means it will be read from the ARGILLA_API_KEY environment variable.

Runtime parameters
  • dataset_name: The name of the dataset in Argilla where the records will be added.
  • dataset_workspace: The workspace where the dataset will be created in Argilla. Defaults to None, which means it will be created in the default workspace.
  • api_url: The base URL to use for the Argilla API requests.
  • api_key: The API key to authenticate the requests to the Argilla API.
Input columns
  • dynamic, based on the inputs value provided
Source code in src/distilabel/steps/argilla/base.py
class ArgillaBase(Step, ABC):
    """Abstract step that provides a class to subclass from, that contains the boilerplate code
    required to interact with Argilla, as well as some extra validations on top of it. It also defines
    the abstract methods that need to be implemented in order to add a new dataset type as a step.

    Note:
        This class is not intended to be instanced directly, but via subclass.

    Attributes:
        dataset_name: The name of the dataset in Argilla where the records will be added.
        dataset_workspace: The workspace where the dataset will be created in Argilla. Defaults to
            `None`, which means it will be created in the default workspace.
        api_url: The URL of the Argilla API. Defaults to `None`, which means it will be read from
            the `ARGILLA_API_URL` environment variable.
        api_key: The API key to authenticate with Argilla. Defaults to `None`, which means it will
            be read from the `ARGILLA_API_KEY` environment variable.

    Runtime parameters:
        - `dataset_name`: The name of the dataset in Argilla where the records will be
            added.
        - `dataset_workspace`: The workspace where the dataset will be created in Argilla.
            Defaults to `None`, which means it will be created in the default workspace.
        - `api_url`: The base URL to use for the Argilla API requests.
        - `api_key`: The API key to authenticate the requests to the Argilla API.

    Input columns:
        - dynamic, based on the `inputs` value provided
    """

    dataset_name: RuntimeParameter[str] = Field(
        default=None, description="The name of the dataset in Argilla."
    )
    dataset_workspace: Optional[RuntimeParameter[str]] = Field(
        default=None,
        description="The workspace where the dataset will be created in Argilla. Defaults "
        "to `None` which means it will be created in the default workspace.",
    )

    api_url: Optional[RuntimeParameter[str]] = Field(
        default_factory=lambda: os.getenv(_ARGILLA_API_URL_ENV_VAR_NAME),
        description="The base URL to use for the Argilla API requests.",
    )
    api_key: Optional[RuntimeParameter[SecretStr]] = Field(
        default_factory=lambda: os.getenv(_ARGILLA_API_KEY_ENV_VAR_NAME),
        description="The API key to authenticate the requests to the Argilla API.",
    )

    _client: Optional["Argilla"] = PrivateAttr(...)
    _dataset: Optional["Dataset"] = PrivateAttr(...)

    def model_post_init(self, __context: Any) -> None:
        """Checks that the Argilla Python SDK is installed, and then filters the Argilla warnings."""
        super().model_post_init(__context)

        if importlib.util.find_spec("argilla") is None:
            raise ImportError(
                "Argilla is not installed. Please install it using `pip install argilla"
                " --upgrade`."
            )

    def _client_init(self) -> None:
        """Initializes the Argilla API client with the provided `api_url` and `api_key`."""
        try:
            self._client = rg.Argilla(  # type: ignore
                api_url=self.api_url,
                api_key=self.api_key.get_secret_value(),  # type: ignore
                headers={"Authorization": f"Bearer {os.environ['HF_TOKEN']}"}
                if isinstance(self.api_url, str)
                and "hf.space" in self.api_url
                and "HF_TOKEN" in os.environ
                else {},
            )
        except Exception as e:
            raise ValueError(f"Failed to initialize the Argilla API: {e}") from e

    @property
    def _dataset_exists_in_workspace(self) -> bool:
        """Checks if the dataset already exists in Argilla in the provided workspace if any.

        Returns:
            `True` if the dataset exists, `False` otherwise.
        """
        return (
            self._client.datasets(  # type: ignore
                name=self.dataset_name,  # type: ignore
                workspace=self.dataset_workspace,
            )
            is not None
        )

    @property
    def outputs(self) -> List[str]:
        """The outputs of the step is an empty list, since the steps subclassing from this one, will
        always be leaf nodes and won't propagate the inputs neither generate any outputs.
        """
        return []

    def load(self) -> None:
        """Method to perform any initialization logic before the `process` method is
        called. For example, to load an LLM, stablish a connection to a database, etc.
        """
        super().load()

        if self.api_url is None or self.api_key is None:
            raise ValueError(
                "`Argilla` step requires the `api_url` and `api_key` to be provided. Please,"
                " provide those at step instantiation, via environment variables `ARGILLA_API_URL`"
                " and `ARGILLA_API_KEY`, or as `Step` runtime parameters via `pipeline.run(parameters={...})`."
            )

        self._client_init()

    @property
    @abstractmethod
    def inputs(self) -> List[str]: ...

    @abstractmethod
    def process(self, *inputs: StepInput) -> "StepOutput": ...

outputs: List[str] property

The outputs of the step is an empty list, since the steps subclassing from this one, will always be leaf nodes and won't propagate the inputs neither generate any outputs.

load()

Method to perform any initialization logic before the process method is called. For example, to load an LLM, stablish a connection to a database, etc.

Source code in src/distilabel/steps/argilla/base.py
def load(self) -> None:
    """Method to perform any initialization logic before the `process` method is
    called. For example, to load an LLM, stablish a connection to a database, etc.
    """
    super().load()

    if self.api_url is None or self.api_key is None:
        raise ValueError(
            "`Argilla` step requires the `api_url` and `api_key` to be provided. Please,"
            " provide those at step instantiation, via environment variables `ARGILLA_API_URL`"
            " and `ARGILLA_API_KEY`, or as `Step` runtime parameters via `pipeline.run(parameters={...})`."
        )

    self._client_init()

model_post_init(__context)

Checks that the Argilla Python SDK is installed, and then filters the Argilla warnings.

Source code in src/distilabel/steps/argilla/base.py
def model_post_init(self, __context: Any) -> None:
    """Checks that the Argilla Python SDK is installed, and then filters the Argilla warnings."""
    super().model_post_init(__context)

    if importlib.util.find_spec("argilla") is None:
        raise ImportError(
            "Argilla is not installed. Please install it using `pip install argilla"
            " --upgrade`."
        )

PreferenceToArgilla

Bases: ArgillaBase

Creates a preference dataset in Argilla.

Step that creates a dataset in Argilla during the load phase, and then pushes the input batches into it as records. This dataset is a preference dataset, where there's one field for the instruction and one extra field per each generation within the same record, and then a rating question per each of the generation fields. The rating question asks the annotator to set a rating from 1 to 5 for each of the provided generations.

Note

This step is meant to be used in conjunction with the UltraFeedback step, or any other step generating both ratings and responses for a given set of instruction and generations for the given instruction. But alternatively, it can also be used with any other task or step generating only the instruction and generations, as the ratings and rationales are optional.

Attributes:

Name Type Description
num_generations int

The number of generations to include in the dataset.

dataset_name int

The name of the dataset in Argilla.

dataset_workspace int

The workspace where the dataset will be created in Argilla. Defaults to None, which means it will be created in the default workspace.

api_url int

The URL of the Argilla API. Defaults to None, which means it will be read from the ARGILLA_API_URL environment variable.

api_key int

The API key to authenticate with Argilla. Defaults to None, which means it will be read from the ARGILLA_API_KEY environment variable.

Runtime parameters
  • api_url: The base URL to use for the Argilla API requests.
  • api_key: The API key to authenticate the requests to the Argilla API.
Input columns
  • instruction (str): The instruction that was used to generate the completion.
  • generations (List[str]): The completion that was generated based on the input instruction.
  • ratings (List[str], optional): The ratings for the generations. If not provided, the generated ratings won't be pushed to Argilla.
  • rationales (List[str], optional): The rationales for the ratings. If not provided, the generated rationales won't be pushed to Argilla.

Examples:

Push a preference dataset to an Argilla instance:

```python
from distilabel.steps import PreferenceToArgilla

to_argilla = PreferenceToArgilla(
    num_generations=2,
    api_url="https://dibt-demo-argilla-space.hf.space/",
    api_key="api.key",
    dataset_name="argilla_dataset",
    dataset_workspace="my_workspace",
)
to_argilla.load()

result = next(
    to_argilla.process(
        [
            {
                "instruction": "instruction",
                "generations": ["first_generation", "second_generation"],
            }
        ],
    )
)
# >>> result
# [{'instruction': 'instruction', 'generations': ['first_generation', 'second_generation']}]
```

It can also include ratings and rationales:

```python
result = next(
    to_argilla.process(
        [
            {
                "instruction": "instruction",
                "generations": ["first_generation", "second_generation"],
                "ratings": ["4", "5"],
                "rationales": ["rationale for 4", "rationale for 5"],
            }
        ],
    )
)
# >>> result
# [
#     {
#         'instruction': 'instruction',
#         'generations': ['first_generation', 'second_generation'],
#         'ratings': ['4', '5'],
#         'rationales': ['rationale for 4', 'rationale for 5']
#     }
# ]
```
Source code in src/distilabel/steps/argilla/preference.py
class PreferenceToArgilla(ArgillaBase):
    """Creates a preference dataset in Argilla.

    Step that creates a dataset in Argilla during the load phase, and then pushes the input
    batches into it as records. This dataset is a preference dataset, where there's one field
    for the instruction and one extra field per each generation within the same record, and then
    a rating question per each of the generation fields. The rating question asks the annotator to
    set a rating from 1 to 5 for each of the provided generations.

    Note:
        This step is meant to be used in conjunction with the `UltraFeedback` step, or any other step
        generating both ratings and responses for a given set of instruction and generations for the
        given instruction. But alternatively, it can also be used with any other task or step generating
        only the `instruction` and `generations`, as the `ratings` and `rationales` are optional.

    Attributes:
        num_generations: The number of generations to include in the dataset.
        dataset_name: The name of the dataset in Argilla.
        dataset_workspace: The workspace where the dataset will be created in Argilla. Defaults to
            `None`, which means it will be created in the default workspace.
        api_url: The URL of the Argilla API. Defaults to `None`, which means it will be read from
            the `ARGILLA_API_URL` environment variable.
        api_key: The API key to authenticate with Argilla. Defaults to `None`, which means it will
            be read from the `ARGILLA_API_KEY` environment variable.

    Runtime parameters:
        - `api_url`: The base URL to use for the Argilla API requests.
        - `api_key`: The API key to authenticate the requests to the Argilla API.

    Input columns:
        - instruction (`str`): The instruction that was used to generate the completion.
        - generations (`List[str]`): The completion that was generated based on the input instruction.
        - ratings (`List[str]`, optional): The ratings for the generations. If not provided, the
            generated ratings won't be pushed to Argilla.
        - rationales (`List[str]`, optional): The rationales for the ratings. If not provided, the
            generated rationales won't be pushed to Argilla.

    Examples:

        Push a preference dataset to an Argilla instance:

        ```python
        from distilabel.steps import PreferenceToArgilla

        to_argilla = PreferenceToArgilla(
            num_generations=2,
            api_url="https://dibt-demo-argilla-space.hf.space/",
            api_key="api.key",
            dataset_name="argilla_dataset",
            dataset_workspace="my_workspace",
        )
        to_argilla.load()

        result = next(
            to_argilla.process(
                [
                    {
                        "instruction": "instruction",
                        "generations": ["first_generation", "second_generation"],
                    }
                ],
            )
        )
        # >>> result
        # [{'instruction': 'instruction', 'generations': ['first_generation', 'second_generation']}]
        ```

        It can also include ratings and rationales:

        ```python
        result = next(
            to_argilla.process(
                [
                    {
                        "instruction": "instruction",
                        "generations": ["first_generation", "second_generation"],
                        "ratings": ["4", "5"],
                        "rationales": ["rationale for 4", "rationale for 5"],
                    }
                ],
            )
        )
        # >>> result
        # [
        #     {
        #         'instruction': 'instruction',
        #         'generations': ['first_generation', 'second_generation'],
        #         'ratings': ['4', '5'],
        #         'rationales': ['rationale for 4', 'rationale for 5']
        #     }
        # ]
        ```
    """

    num_generations: int

    _id: str = PrivateAttr(default="id")
    _instruction: str = PrivateAttr(...)
    _generations: str = PrivateAttr(...)
    _ratings: str = PrivateAttr(...)
    _rationales: str = PrivateAttr(...)

    def load(self) -> None:
        """Sets the `_instruction` and `_generations` attributes based on the `inputs_mapping`, otherwise
        uses the default values; and then uses those values to create a `FeedbackDataset` suited for
        the text-generation scenario. And then it pushes it to Argilla.
        """
        super().load()

        # Both `instruction` and `generations` will be used as the fields of the dataset
        self._instruction = self.input_mappings.get("instruction", "instruction")
        self._generations = self.input_mappings.get("generations", "generations")
        # Both `ratings` and `rationales` will be used as suggestions to the default questions of the dataset
        self._ratings = self.input_mappings.get("ratings", "ratings")
        self._rationales = self.input_mappings.get("rationales", "rationales")

        if self._dataset_exists_in_workspace:
            _dataset = self._client.datasets(  # type: ignore
                name=self.dataset_name,  # type: ignore
                workspace=self.dataset_workspace,  # type: ignore
            )

            for field in _dataset.fields:
                if not isinstance(field, rg.TextField):
                    continue
                if (
                    field.name
                    not in [self._id, self._instruction]  # type: ignore
                    + [
                        f"{self._generations}-{idx}"
                        for idx in range(self.num_generations)
                    ]
                    and field.required
                ):
                    raise ValueError(
                        f"The dataset '{self.dataset_name}' in the workspace '{self.dataset_workspace}'"
                        f" already exists, but contains at least a required field that is"
                        f" neither `{self._id}`, `{self._instruction}`, nor `{self._generations}`"
                        f" (one per generation starting from 0 up to {self.num_generations - 1})."
                    )

            self._dataset = _dataset
        else:
            _settings = rg.Settings(  # type: ignore
                fields=[
                    rg.TextField(name=self._id, title=self._id),  # type: ignore
                    rg.TextField(name=self._instruction, title=self._instruction),  # type: ignore
                    *self._generation_fields(),  # type: ignore
                ],
                questions=self._rating_rationale_pairs(),  # type: ignore
            )
            _dataset = rg.Dataset(  # type: ignore
                name=self.dataset_name,
                workspace=self.dataset_workspace,
                settings=_settings,
                client=self._client,
            )
            self._dataset = _dataset.create()

    def _generation_fields(self) -> List["TextField"]:
        """Method to generate the fields for each of the generations.

        Returns:
            A list containing `TextField`s for each text generation.
        """
        return [
            rg.TextField(  # type: ignore
                name=f"{self._generations}-{idx}",
                title=f"{self._generations}-{idx}",
                required=True if idx == 0 else False,
            )
            for idx in range(self.num_generations)
        ]

    def _rating_rationale_pairs(
        self,
    ) -> List[Union["RatingQuestion", "TextQuestion"]]:
        """Method to generate the rating and rationale questions for each of the generations.

        Returns:
            A list of questions containing a `RatingQuestion` and `TextQuestion` pair for
            each text generation.
        """
        questions = []
        for idx in range(self.num_generations):
            questions.extend(
                [
                    rg.RatingQuestion(  # type: ignore
                        name=f"{self._generations}-{idx}-rating",
                        title=f"Rate {self._generations}-{idx} given {self._instruction}.",
                        description=f"Ignore this question if the corresponding `{self._generations}-{idx}` field is not available."
                        if idx != 0
                        else None,
                        values=[1, 2, 3, 4, 5],
                        required=True if idx == 0 else False,
                    ),
                    rg.TextQuestion(  # type: ignore
                        name=f"{self._generations}-{idx}-rationale",
                        title=f"Specify the rationale for {self._generations}-{idx}'s rating.",
                        description=f"Ignore this question if the corresponding `{self._generations}-{idx}` field is not available."
                        if idx != 0
                        else None,
                        required=False,
                    ),
                ]
            )
        return questions

    @property
    def inputs(self) -> List[str]:
        """The inputs for the step are the `instruction` and the `generations`. Optionally, one could also
        provide the `ratings` and the `rationales` for the generations."""
        return ["instruction", "generations"]

    @property
    def optional_inputs(self) -> List[str]:
        """The optional inputs for the step are the `ratings` and the `rationales` for the generations."""
        return ["ratings", "rationales"]

    def _add_suggestions_if_any(self, input: Dict[str, Any]) -> List["Suggestion"]:
        """Method to generate the suggestions for the `rg.Record` based on the input.

        Returns:
            A list of `Suggestion`s for the rating and rationales questions.
        """
        # Since the `suggestions` i.e. answers to the `questions` are optional, will default to {}
        suggestions = []
        # If `ratings` is in `input`, then add those as suggestions
        if self._ratings in input:
            suggestions.extend(
                [
                    rg.Suggestion(  # type: ignore
                        value=rating,
                        question_name=f"{self._generations}-{idx}-rating",
                    )
                    for idx, rating in enumerate(input[self._ratings])
                    if rating is not None
                    and isinstance(rating, int)
                    and rating in [1, 2, 3, 4, 5]
                ],
            )
        # If `rationales` is in `input`, then add those as suggestions
        if self._rationales in input:
            suggestions.extend(
                [
                    rg.Suggestion(  # type: ignore
                        value=rationale,
                        question_name=f"{self._generations}-{idx}-rationale",
                    )
                    for idx, rationale in enumerate(input[self._rationales])
                    if rationale is not None and isinstance(rationale, str)
                ],
            )
        return suggestions

    @override
    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        """Creates and pushes the records as `rg.Record`s to the Argilla dataset.

        Args:
            inputs: A list of Python dictionaries with the inputs of the task.

        Returns:
            A list of Python dictionaries with the outputs of the task.
        """
        records = []
        for input in inputs:
            # Generate the SHA-256 hash of the instruction to use it as the metadata
            instruction_id = hashlib.sha256(
                input["instruction"].encode("utf-8")  # type: ignore
            ).hexdigest()

            generations = {
                f"{self._generations}-{idx}": generation
                for idx, generation in enumerate(input["generations"])  # type: ignore
            }

            records.append(  # type: ignore
                rg.Record(  # type: ignore
                    fields={
                        "id": instruction_id,
                        "instruction": input["instruction"],  # type: ignore
                        **generations,
                    },
                    suggestions=self._add_suggestions_if_any(input),  # type: ignore
                )
            )
        self._dataset.records.log(records)  # type: ignore
        yield inputs

inputs: List[str] property

The inputs for the step are the instruction and the generations. Optionally, one could also provide the ratings and the rationales for the generations.

optional_inputs: List[str] property

The optional inputs for the step are the ratings and the rationales for the generations.

load()

Sets the _instruction and _generations attributes based on the inputs_mapping, otherwise uses the default values; and then uses those values to create a FeedbackDataset suited for the text-generation scenario. And then it pushes it to Argilla.

Source code in src/distilabel/steps/argilla/preference.py
def load(self) -> None:
    """Sets the `_instruction` and `_generations` attributes based on the `inputs_mapping`, otherwise
    uses the default values; and then uses those values to create a `FeedbackDataset` suited for
    the text-generation scenario. And then it pushes it to Argilla.
    """
    super().load()

    # Both `instruction` and `generations` will be used as the fields of the dataset
    self._instruction = self.input_mappings.get("instruction", "instruction")
    self._generations = self.input_mappings.get("generations", "generations")
    # Both `ratings` and `rationales` will be used as suggestions to the default questions of the dataset
    self._ratings = self.input_mappings.get("ratings", "ratings")
    self._rationales = self.input_mappings.get("rationales", "rationales")

    if self._dataset_exists_in_workspace:
        _dataset = self._client.datasets(  # type: ignore
            name=self.dataset_name,  # type: ignore
            workspace=self.dataset_workspace,  # type: ignore
        )

        for field in _dataset.fields:
            if not isinstance(field, rg.TextField):
                continue
            if (
                field.name
                not in [self._id, self._instruction]  # type: ignore
                + [
                    f"{self._generations}-{idx}"
                    for idx in range(self.num_generations)
                ]
                and field.required
            ):
                raise ValueError(
                    f"The dataset '{self.dataset_name}' in the workspace '{self.dataset_workspace}'"
                    f" already exists, but contains at least a required field that is"
                    f" neither `{self._id}`, `{self._instruction}`, nor `{self._generations}`"
                    f" (one per generation starting from 0 up to {self.num_generations - 1})."
                )

        self._dataset = _dataset
    else:
        _settings = rg.Settings(  # type: ignore
            fields=[
                rg.TextField(name=self._id, title=self._id),  # type: ignore
                rg.TextField(name=self._instruction, title=self._instruction),  # type: ignore
                *self._generation_fields(),  # type: ignore
            ],
            questions=self._rating_rationale_pairs(),  # type: ignore
        )
        _dataset = rg.Dataset(  # type: ignore
            name=self.dataset_name,
            workspace=self.dataset_workspace,
            settings=_settings,
            client=self._client,
        )
        self._dataset = _dataset.create()

process(inputs)

Creates and pushes the records as rg.Records to the Argilla dataset.

Parameters:

Name Type Description Default
inputs StepInput

A list of Python dictionaries with the inputs of the task.

required

Returns:

Type Description
StepOutput

A list of Python dictionaries with the outputs of the task.

Source code in src/distilabel/steps/argilla/preference.py
@override
def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
    """Creates and pushes the records as `rg.Record`s to the Argilla dataset.

    Args:
        inputs: A list of Python dictionaries with the inputs of the task.

    Returns:
        A list of Python dictionaries with the outputs of the task.
    """
    records = []
    for input in inputs:
        # Generate the SHA-256 hash of the instruction to use it as the metadata
        instruction_id = hashlib.sha256(
            input["instruction"].encode("utf-8")  # type: ignore
        ).hexdigest()

        generations = {
            f"{self._generations}-{idx}": generation
            for idx, generation in enumerate(input["generations"])  # type: ignore
        }

        records.append(  # type: ignore
            rg.Record(  # type: ignore
                fields={
                    "id": instruction_id,
                    "instruction": input["instruction"],  # type: ignore
                    **generations,
                },
                suggestions=self._add_suggestions_if_any(input),  # type: ignore
            )
        )
    self._dataset.records.log(records)  # type: ignore
    yield inputs

TextGenerationToArgilla

Bases: ArgillaBase

Creates a text generation dataset in Argilla.

Step that creates a dataset in Argilla during the load phase, and then pushes the input batches into it as records. This dataset is a text-generation dataset, where there's one field per each input, and then a label question to rate the quality of the completion in either bad (represented with 👎) or good (represented with 👍).

Note

This step is meant to be used in conjunction with a TextGeneration step and no column mapping is needed, as it will use the default values for the instruction and generation columns.

Attributes:

Name Type Description
dataset_name

The name of the dataset in Argilla.

dataset_workspace

The workspace where the dataset will be created in Argilla. Defaults to None, which means it will be created in the default workspace.

api_url

The URL of the Argilla API. Defaults to None, which means it will be read from the ARGILLA_API_URL environment variable.

api_key

The API key to authenticate with Argilla. Defaults to None, which means it will be read from the ARGILLA_API_KEY environment variable.

Runtime parameters
  • api_url: The base URL to use for the Argilla API requests.
  • api_key: The API key to authenticate the requests to the Argilla API.
Input columns
  • instruction (str): The instruction that was used to generate the completion.
  • generation (str or List[str]): The completions that were generated based on the input instruction.

Examples:

Push a text generation dataset to an Argilla instance:

```python
from distilabel.steps import PreferenceToArgilla

to_argilla = TextGenerationToArgilla(
    num_generations=2,
    api_url="https://dibt-demo-argilla-space.hf.space/",
    api_key="api.key",
    dataset_name="argilla_dataset",
    dataset_workspace="my_workspace",
)
to_argilla.load()

result = next(
    to_argilla.process(
        [
            {
                "instruction": "instruction",
                "generation": "generation",
            }
        ],
    )
)
# >>> result
# [{'instruction': 'instruction', 'generation': 'generation'}]
```
Source code in src/distilabel/steps/argilla/text_generation.py
class TextGenerationToArgilla(ArgillaBase):
    """Creates a text generation dataset in Argilla.

    `Step` that creates a dataset in Argilla during the load phase, and then pushes the input
    batches into it as records. This dataset is a text-generation dataset, where there's one field
    per each input, and then a label question to rate the quality of the completion in either bad
    (represented with 👎) or good (represented with 👍).

    Note:
        This step is meant to be used in conjunction with a `TextGeneration` step and no column mapping
        is needed, as it will use the default values for the `instruction` and `generation` columns.

    Attributes:
        dataset_name: The name of the dataset in Argilla.
        dataset_workspace: The workspace where the dataset will be created in Argilla. Defaults to
            `None`, which means it will be created in the default workspace.
        api_url: The URL of the Argilla API. Defaults to `None`, which means it will be read from
            the `ARGILLA_API_URL` environment variable.
        api_key: The API key to authenticate with Argilla. Defaults to `None`, which means it will
            be read from the `ARGILLA_API_KEY` environment variable.

    Runtime parameters:
        - `api_url`: The base URL to use for the Argilla API requests.
        - `api_key`: The API key to authenticate the requests to the Argilla API.

    Input columns:
        - instruction (`str`): The instruction that was used to generate the completion.
        - generation (`str` or `List[str]`): The completions that were generated based on the input instruction.

    Examples:

        Push a text generation dataset to an Argilla instance:

        ```python
        from distilabel.steps import PreferenceToArgilla

        to_argilla = TextGenerationToArgilla(
            num_generations=2,
            api_url="https://dibt-demo-argilla-space.hf.space/",
            api_key="api.key",
            dataset_name="argilla_dataset",
            dataset_workspace="my_workspace",
        )
        to_argilla.load()

        result = next(
            to_argilla.process(
                [
                    {
                        "instruction": "instruction",
                        "generation": "generation",
                    }
                ],
            )
        )
        # >>> result
        # [{'instruction': 'instruction', 'generation': 'generation'}]
        ```
    """

    _id: str = PrivateAttr(default="id")
    _instruction: str = PrivateAttr(...)
    _generation: str = PrivateAttr(...)

    def load(self) -> None:
        """Sets the `_instruction` and `_generation` attributes based on the `inputs_mapping`, otherwise
        uses the default values; and then uses those values to create a `FeedbackDataset` suited for
        the text-generation scenario. And then it pushes it to Argilla.
        """
        super().load()

        self._instruction = self.input_mappings.get("instruction", "instruction")
        self._generation = self.input_mappings.get("generation", "generation")

        if self._dataset_exists_in_workspace:
            _dataset = self._client.datasets(  # type: ignore
                name=self.dataset_name,  # type: ignore
                workspace=self.dataset_workspace,  # type: ignore
            )

            for field in _dataset.fields:
                if not isinstance(field, rg.TextField):  # type: ignore
                    continue
                if (
                    field.name not in [self._id, self._instruction, self._generation]
                    and field.required
                ):
                    raise ValueError(
                        f"The dataset '{self.dataset_name}' in the workspace '{self.dataset_workspace}'"
                        f" already exists, but contains at least a required field that is"
                        f" neither `{self._id}`, `{self._instruction}`, nor `{self._generation}`,"
                        " so it cannot be reused for this dataset."
                    )

            self._dataset = _dataset
        else:
            _settings = rg.Settings(  # type: ignore
                fields=[
                    rg.TextField(name=self._id, title=self._id),  # type: ignore
                    rg.TextField(name=self._instruction, title=self._instruction),  # type: ignore
                    rg.TextField(name=self._generation, title=self._generation),  # type: ignore
                ],
                questions=[
                    rg.LabelQuestion(  # type: ignore
                        name="quality",
                        title=f"What's the quality of the {self._generation} for the given {self._instruction}?",
                        labels={"bad": "👎", "good": "👍"},  # type: ignore
                    )
                ],
            )
            _dataset = rg.Dataset(  # type: ignore
                name=self.dataset_name,
                workspace=self.dataset_workspace,
                settings=_settings,
                client=self._client,
            )
            self._dataset = _dataset.create()

    @property
    def inputs(self) -> List[str]:
        """The inputs for the step are the `instruction` and the `generation`."""
        return ["instruction", "generation"]

    @override
    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        """Creates and pushes the records as FeedbackRecords to the Argilla dataset.

        Args:
            inputs: A list of Python dictionaries with the inputs of the task.

        Returns:
            A list of Python dictionaries with the outputs of the task.
        """
        records = []
        for input in inputs:
            # Generate the SHA-256 hash of the instruction to use it as the metadata
            instruction_id = hashlib.sha256(
                input["instruction"].encode("utf-8")
            ).hexdigest()

            generations = input["generation"]

            # If the `generation` is not a list, then convert it into a list
            if not isinstance(generations, list):
                generations = [generations]

            # Create a `generations_set` to avoid adding duplicates
            generations_set = set()

            for generation in generations:
                # If the generation is already in the set, then skip it
                if generation in generations_set:
                    continue
                # Otherwise, add it to the set
                generations_set.add(generation)

                records.append(
                    rg.Record(  # type: ignore
                        fields={
                            self._id: instruction_id,
                            self._instruction: input["instruction"],
                            self._generation: generation,
                        },
                    ),
                )
        self._dataset.records.log(records)  # type: ignore
        yield inputs

inputs: List[str] property

The inputs for the step are the instruction and the generation.

load()

Sets the _instruction and _generation attributes based on the inputs_mapping, otherwise uses the default values; and then uses those values to create a FeedbackDataset suited for the text-generation scenario. And then it pushes it to Argilla.

Source code in src/distilabel/steps/argilla/text_generation.py
def load(self) -> None:
    """Sets the `_instruction` and `_generation` attributes based on the `inputs_mapping`, otherwise
    uses the default values; and then uses those values to create a `FeedbackDataset` suited for
    the text-generation scenario. And then it pushes it to Argilla.
    """
    super().load()

    self._instruction = self.input_mappings.get("instruction", "instruction")
    self._generation = self.input_mappings.get("generation", "generation")

    if self._dataset_exists_in_workspace:
        _dataset = self._client.datasets(  # type: ignore
            name=self.dataset_name,  # type: ignore
            workspace=self.dataset_workspace,  # type: ignore
        )

        for field in _dataset.fields:
            if not isinstance(field, rg.TextField):  # type: ignore
                continue
            if (
                field.name not in [self._id, self._instruction, self._generation]
                and field.required
            ):
                raise ValueError(
                    f"The dataset '{self.dataset_name}' in the workspace '{self.dataset_workspace}'"
                    f" already exists, but contains at least a required field that is"
                    f" neither `{self._id}`, `{self._instruction}`, nor `{self._generation}`,"
                    " so it cannot be reused for this dataset."
                )

        self._dataset = _dataset
    else:
        _settings = rg.Settings(  # type: ignore
            fields=[
                rg.TextField(name=self._id, title=self._id),  # type: ignore
                rg.TextField(name=self._instruction, title=self._instruction),  # type: ignore
                rg.TextField(name=self._generation, title=self._generation),  # type: ignore
            ],
            questions=[
                rg.LabelQuestion(  # type: ignore
                    name="quality",
                    title=f"What's the quality of the {self._generation} for the given {self._instruction}?",
                    labels={"bad": "👎", "good": "👍"},  # type: ignore
                )
            ],
        )
        _dataset = rg.Dataset(  # type: ignore
            name=self.dataset_name,
            workspace=self.dataset_workspace,
            settings=_settings,
            client=self._client,
        )
        self._dataset = _dataset.create()

process(inputs)

Creates and pushes the records as FeedbackRecords to the Argilla dataset.

Parameters:

Name Type Description Default
inputs StepInput

A list of Python dictionaries with the inputs of the task.

required

Returns:

Type Description
StepOutput

A list of Python dictionaries with the outputs of the task.

Source code in src/distilabel/steps/argilla/text_generation.py
@override
def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
    """Creates and pushes the records as FeedbackRecords to the Argilla dataset.

    Args:
        inputs: A list of Python dictionaries with the inputs of the task.

    Returns:
        A list of Python dictionaries with the outputs of the task.
    """
    records = []
    for input in inputs:
        # Generate the SHA-256 hash of the instruction to use it as the metadata
        instruction_id = hashlib.sha256(
            input["instruction"].encode("utf-8")
        ).hexdigest()

        generations = input["generation"]

        # If the `generation` is not a list, then convert it into a list
        if not isinstance(generations, list):
            generations = [generations]

        # Create a `generations_set` to avoid adding duplicates
        generations_set = set()

        for generation in generations:
            # If the generation is already in the set, then skip it
            if generation in generations_set:
                continue
            # Otherwise, add it to the set
            generations_set.add(generation)

            records.append(
                rg.Record(  # type: ignore
                    fields={
                        self._id: instruction_id,
                        self._instruction: input["instruction"],
                        self._generation: generation,
                    },
                ),
            )
    self._dataset.records.log(records)  # type: ignore
    yield inputs