Argilla

This section documents the existing steps that integrate with Argilla, which make it easy to push generated datasets to Argilla.

Argilla

Bases: Step, ABC

Abstract base step to subclass from. It contains the boilerplate code required to interact with Argilla, as well as some extra validations on top of it. It also defines the abstract methods that need to be implemented in order to add a new dataset type as a step.

Note

This class is not intended to be instantiated directly, but via a subclass.

Attributes:

Name Type Description
dataset_name RuntimeParameter[str]

The name of the dataset in Argilla where the records will be added.

dataset_workspace Optional[RuntimeParameter[str]]

The workspace where the dataset will be created in Argilla. Defaults to None, which means it will be created in the default workspace.

api_url Optional[RuntimeParameter[str]]

The URL of the Argilla API. Defaults to None, which means it will be read from the ARGILLA_API_URL environment variable.

api_key Optional[RuntimeParameter[SecretStr]]

The API key to authenticate with Argilla. Defaults to None, which means it will be read from the ARGILLA_API_KEY environment variable.

Runtime parameters
  • dataset_name: The name of the dataset in Argilla where the records will be added.
  • dataset_workspace: The workspace where the dataset will be created in Argilla. Defaults to None, which means it will be created in the default workspace.
  • api_url: The base URL to use for the Argilla API requests.
  • api_key: The API key to authenticate the requests to the Argilla API.
Input columns
  • dynamic, based on the inputs value provided
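The environment-variable fallback described for `api_url` and `api_key` can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not distilabel's implementation: `resolve_argilla_credentials` is a hypothetical helper, and only the variable names `ARGILLA_API_URL` and `ARGILLA_API_KEY` come from the attribute descriptions above.

```python
import os
from typing import Mapping, Optional, Tuple


def resolve_argilla_credentials(
    api_url: Optional[str] = None,
    api_key: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Tuple[Optional[str], Optional[str]]:
    """Hypothetical helper: explicit values win, otherwise read the environment."""
    # `env` is injectable so the behaviour can be checked without touching os.environ.
    env = os.environ if env is None else env
    if api_url is None:
        api_url = env.get("ARGILLA_API_URL")
    if api_key is None:
        api_key = env.get("ARGILLA_API_KEY")
    return api_url, api_key
```

Passing a value explicitly bypasses the environment lookup, and a variable that is not set simply resolves to `None`.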
Source code in src/distilabel/steps/argilla/base.py
class Argilla(Step, ABC):
    """Abstract base step to subclass from. It contains the boilerplate code required to interact
    with Argilla, as well as some extra validations on top of it. It also defines the abstract
    methods that need to be implemented in order to add a new dataset type as a step.

    Note:
        This class is not intended to be instantiated directly, but via a subclass.

    Attributes:
        dataset_name: The name of the dataset in Argilla where the records will be added.
        dataset_workspace: The workspace where the dataset will be created in Argilla. Defaults to
            `None`, which means it will be created in the default workspace.
        api_url: The URL of the Argilla API. Defaults to `None`, which means it will be read from
            the `ARGILLA_API_URL` environment variable.
        api_key: The API key to authenticate with Argilla. Defaults to `None`, which means it will
            be read from the `ARGILLA_API_KEY` environment variable.

    Runtime parameters:
        - `dataset_name`: The name of the dataset in Argilla where the records will be
            added.
        - `dataset_workspace`: The workspace where the dataset will be created in Argilla.
            Defaults to `None`, which means it will be created in the default workspace.
        - `api_url`: The base URL to use for the Argilla API requests.
        - `api_key`: The API key to authenticate the requests to the Argilla API.

    Input columns:
        - dynamic, based on the `inputs` value provided
    """

    dataset_name: RuntimeParameter[str] = Field(
        default=None, description="The name of the dataset in Argilla."
    )
    dataset_workspace: Optional[RuntimeParameter[str]] = Field(
        default=None,
        description="The workspace where the dataset will be created in Argilla. Defaults "
        "to `None` which means it will be created in the default workspace.",
    )

    api_url: Optional[RuntimeParameter[str]] = Field(
        default_factory=lambda: os.getenv("ARGILLA_API_URL"),
        description="The base URL to use for the Argilla API requests.",
    )
    api_key: Optional[RuntimeParameter[SecretStr]] = Field(
        default_factory=lambda: os.getenv(_ARGILLA_API_KEY_ENV_VAR_NAME),
        description="The API key to authenticate the requests to the Argilla API.",
    )

    _rg_dataset: Optional["RemoteFeedbackDataset"] = PrivateAttr(...)

    def model_post_init(self, __context: Any) -> None:
        """Checks that the Argilla Python SDK is installed, and then filters the Argilla warnings."""
        super().model_post_init(__context)

        try:
            import argilla as rg  # noqa
        except ImportError as ie:
            raise ImportError(
                "Argilla is not installed. Please install it using `pip install argilla`."
            ) from ie

        warnings.filterwarnings("ignore")

    def _rg_init(self) -> None:
        """Initializes the Argilla API client with the provided `api_url` and `api_key`."""
        try:
            if "hf.space" in self.api_url and "HF_TOKEN" in os.environ:
                headers = {"Authorization": f"Bearer {os.environ['HF_TOKEN']}"}
            else:
                headers = None
            rg.init(
                api_url=self.api_url,
                api_key=self.api_key.get_secret_value(),
                extra_headers=headers,
            )  # type: ignore
        except Exception as e:
            raise ValueError(f"Failed to initialize the Argilla API: {e}") from e

    def _rg_dataset_exists(self) -> bool:
        """Checks if the dataset already exists in Argilla."""
        return self.dataset_name in [
            dataset.name
            for dataset in rg.FeedbackDataset.list(workspace=self.dataset_workspace)  # type: ignore
        ]

    @property
    def outputs(self) -> List[str]:
        """The outputs of the step are an empty list, since the steps subclassing from this one will
        always be leaf nodes and will neither propagate the inputs nor generate any outputs.
        """
        return []

    def load(self) -> None:
        """Method to perform any initialization logic before the `process` method is
        called. For example, to load an LLM, establish a connection to a database, etc.
        """
        super().load()

        self._rg_init()

    @property
    @abstractmethod
    def inputs(self) -> List[str]:
        ...

    @abstractmethod
    def process(self, *inputs: StepInput) -> "StepOutput":
        ...
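The header selection inside `_rg_init` can be isolated into a small pure function, which makes the Hugging Face Spaces case easier to follow. The sketch below assumes nothing beyond the condition shown in the source; `build_extra_headers` is a hypothetical name, and the environment is passed in as a mapping purely for illustration.

```python
from typing import Dict, Mapping, Optional


def build_extra_headers(
    api_url: str, env: Mapping[str, str]
) -> Optional[Dict[str, str]]:
    """Hypothetical helper mirroring the header logic in `_rg_init`."""
    # A bearer token is only attached when the URL points at a Hugging Face Space
    # and an HF_TOKEN is available; otherwise no extra headers are sent.
    if "hf.space" in api_url and "HF_TOKEN" in env:
        return {"Authorization": f"Bearer {env['HF_TOKEN']}"}
    return None
```

Both conditions must hold: a Space URL without a token, or a token with a non-Space URL, yields `None`.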

outputs: List[str] property

The outputs of the step are an empty list, since the steps subclassing from this one will always be leaf nodes and will neither propagate the inputs nor generate any outputs.

load()

Method to perform any initialization logic before the process method is called. For example, to load an LLM, establish a connection to a database, etc.

Source code in src/distilabel/steps/argilla/base.py
def load(self) -> None:
    """Method to perform any initialization logic before the `process` method is
    called. For example, to load an LLM, establish a connection to a database, etc.
    """
    super().load()

    self._rg_init()

model_post_init(__context)

Checks that the Argilla Python SDK is installed, and then filters the Argilla warnings.
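An optional-dependency check of this kind can also be written without importing the package at all. The sketch below is a variation rather than the method used by this step, which wraps `import argilla` in a `try`/`except`: it uses `importlib.util.find_spec`, and `ensure_installed` is a hypothetical helper name.

```python
import importlib.util


def ensure_installed(package: str) -> None:
    """Hypothetical helper: raise a descriptive ImportError if `package` is missing."""
    # find_spec returns None for a top-level package that cannot be located.
    if importlib.util.find_spec(package) is None:
        raise ImportError(
            f"{package} is not installed. Please install it using `pip install {package}`."
        )
```

Calling `ensure_installed("argilla")` would then either return silently or raise an error carrying the install hint.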

Source code in src/distilabel/steps/argilla/base.py
def model_post_init(self, __context: Any) -> None:
    """Checks that the Argilla Python SDK is installed, and then filters the Argilla warnings."""
    super().model_post_init(__context)

    try:
        import argilla as rg  # noqa
    except ImportError as ie:
        raise ImportError(
            "Argilla is not installed. Please install it using `pip install argilla`."
        ) from ie

    warnings.filterwarnings("ignore")

PreferenceToArgilla

Bases: Argilla

Creates a preference dataset in Argilla.

Step that creates a dataset in Argilla during the load phase, and then pushes the input batches into it as records. This dataset is a preference dataset: each record has one field for the instruction and one extra field per generation, plus a rating question for each generation field. The rating question asks the annotator to rate each of the provided generations from 1 to 5.

Note

This step is meant to be used in conjunction with the UltraFeedback step, or any other step generating both ratings and responses for a given set of instruction and generations for the given instruction. But alternatively, it can also be used with any other task or step generating only the instruction and generations, as the ratings and rationales are optional.
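To make the dataset layout concrete, the following sketch lists the field and question names that result for a given `num_generations`. It follows the naming pattern visible in the step's source (`{generations}-{idx}`, with `-rating` and `-rationale` suffixes), but `preference_schema_names` itself is a hypothetical helper that only returns plain strings and does not touch Argilla.

```python
from typing import Dict, List


def preference_schema_names(
    num_generations: int,
    instruction: str = "instruction",
    generations: str = "generations",
) -> Dict[str, List[str]]:
    """Hypothetical helper: names of the fields and questions of the preference dataset."""
    # One `id` field, one instruction field, and one field per generation.
    fields = ["id", instruction] + [
        f"{generations}-{idx}" for idx in range(num_generations)
    ]
    # One rating question and one rationale question per generation.
    questions: List[str] = []
    for idx in range(num_generations):
        questions.append(f"{generations}-{idx}-rating")
        questions.append(f"{generations}-{idx}-rationale")
    return {"fields": fields, "questions": questions}
```

With `num_generations=2`, this gives four fields and four questions, two of them rating questions.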

Attributes:

Name Type Description
num_generations int

The number of generations to include in the dataset.

dataset_name RuntimeParameter[str]

The name of the dataset in Argilla.

dataset_workspace Optional[RuntimeParameter[str]]

The workspace where the dataset will be created in Argilla. Defaults to None, which means it will be created in the default workspace.

api_url Optional[RuntimeParameter[str]]

The URL of the Argilla API. Defaults to None, which means it will be read from the ARGILLA_API_URL environment variable.

api_key Optional[RuntimeParameter[SecretStr]]

The API key to authenticate with Argilla. Defaults to None, which means it will be read from the ARGILLA_API_KEY environment variable.

Runtime parameters
  • api_url: The base URL to use for the Argilla API requests.
  • api_key: The API key to authenticate the requests to the Argilla API.
Input columns
  • instruction (str): The instruction that was used to generate the completion.
  • generations (List[str]): The completions that were generated based on the input instruction.
  • ratings (List[int], optional): The ratings for the generations, as integers from 1 to 5. If not provided, the generated ratings won't be pushed to Argilla.
  • rationales (List[str], optional): The rationales for the ratings. If not provided, the generated rationales won't be pushed to Argilla.
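Judging from `_add_suggestions_if_any` in the source of this step, only ratings that are integers between 1 and 5 become suggestions; anything else is silently skipped. A minimal sketch of that filter, using a hypothetical standalone function and plain dictionaries in place of Argilla objects:

```python
from typing import Any, Dict, List


def rating_suggestions(
    ratings: List[Any], generations: str = "generations"
) -> List[Dict[str, Any]]:
    """Hypothetical helper: keep only valid integer ratings as suggestions."""
    return [
        {"question_name": f"{generations}-{idx}-rating", "value": rating}
        # The index is taken before filtering, so it still points at the
        # matching generation field even when earlier ratings are dropped.
        for idx, rating in enumerate(ratings)
        if isinstance(rating, int) and rating in (1, 2, 3, 4, 5)
    ]
```

A rating passed as the string `"3"` is dropped by this check, so string ratings would need to be converted to integers beforehand.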
Source code in src/distilabel/steps/argilla/preference.py
class PreferenceToArgilla(Argilla):
    """Creates a preference dataset in Argilla.

    Step that creates a dataset in Argilla during the load phase, and then pushes the input
    batches into it as records. This dataset is a preference dataset, where there's one field
    for the instruction and one extra field per each generation within the same record, and then
    a rating question per each of the generation fields. The rating question asks the annotator to
    set a rating from 1 to 5 for each of the provided generations.

    Note:
        This step is meant to be used in conjunction with the `UltraFeedback` step, or any other step
        generating both ratings and responses for a given set of instruction and generations for the
        given instruction. But alternatively, it can also be used with any other task or step generating
        only the `instruction` and `generations`, as the `ratings` and `rationales` are optional.

    Attributes:
        num_generations: The number of generations to include in the dataset.
        dataset_name: The name of the dataset in Argilla.
        dataset_workspace: The workspace where the dataset will be created in Argilla. Defaults to
            `None`, which means it will be created in the default workspace.
        api_url: The URL of the Argilla API. Defaults to `None`, which means it will be read from
            the `ARGILLA_API_URL` environment variable.
        api_key: The API key to authenticate with Argilla. Defaults to `None`, which means it will
            be read from the `ARGILLA_API_KEY` environment variable.

    Runtime parameters:
        - `api_url`: The base URL to use for the Argilla API requests.
        - `api_key`: The API key to authenticate the requests to the Argilla API.

    Input columns:
        - instruction (`str`): The instruction that was used to generate the completion.
        - generations (`List[str]`): The completions that were generated based on the input instruction.
        - ratings (`List[int]`, optional): The ratings for the generations. If not provided, the
            generated ratings won't be pushed to Argilla.
        - rationales (`List[str]`, optional): The rationales for the ratings. If not provided, the
            generated rationales won't be pushed to Argilla.
    """

    num_generations: int

    _id: str = PrivateAttr(default="id")
    _instruction: str = PrivateAttr(...)
    _generations: str = PrivateAttr(...)
    _ratings: str = PrivateAttr(...)
    _rationales: str = PrivateAttr(...)

    def load(self) -> None:
        """Sets the `_instruction` and `_generations` attributes based on the `input_mappings`, otherwise
        uses the default values; and then uses those values to create a `FeedbackDataset` suited for
        the preference scenario, which is then pushed to Argilla.
        """
        super().load()

        # Both `instruction` and `generations` will be used as the fields of the dataset
        self._instruction = self.input_mappings.get("instruction", "instruction")
        self._generations = self.input_mappings.get("generations", "generations")
        # Both `ratings` and `rationales` will be used as suggestions to the default questions of the dataset
        self._ratings = self.input_mappings.get("ratings", "ratings")
        self._rationales = self.input_mappings.get("rationales", "rationales")

        if self._rg_dataset_exists():
            _rg_dataset = rg.FeedbackDataset.from_argilla(  # type: ignore
                name=self.dataset_name,
                workspace=self.dataset_workspace,
            )

            for field in _rg_dataset.fields:
                if (
                    field.name
                    not in [self._id, self._instruction]
                    + [
                        f"{self._generations}-{idx}"
                        for idx in range(self.num_generations)
                    ]
                    and field.required
                ):
                    raise ValueError(
                        f"The dataset {self.dataset_name} in the workspace {self.dataset_workspace} already exists,"
                        f" but contains at least a required field that is neither `{self._id}`, `{self._instruction}`,"
                        f" nor `{self._generations}`."
                    )

            self._rg_dataset = _rg_dataset
        else:
            _rg_dataset = rg.FeedbackDataset(  # type: ignore
                fields=[
                    rg.TextField(name=self._id, title=self._id),  # type: ignore
                    rg.TextField(name=self._instruction, title=self._instruction),  # type: ignore
                    *self._generation_fields(),  # type: ignore
                ],
                questions=self._rating_rationale_pairs(),  # type: ignore
            )
            self._rg_dataset = _rg_dataset.push_to_argilla(
                name=self.dataset_name,  # type: ignore
                workspace=self.dataset_workspace,
            )

    def _generation_fields(self) -> List["TextField"]:
        """Method to generate the fields for each of the generations."""
        return [
            rg.TextField(  # type: ignore
                name=f"{self._generations}-{idx}",
                title=f"{self._generations}-{idx}",
                required=True if idx == 0 else False,
            )
            for idx in range(self.num_generations)
        ]

    def _rating_rationale_pairs(
        self,
    ) -> List[Union["RatingQuestion", "TextQuestion"]]:
        """Method to generate the rating and rationale questions for each of the generations."""
        questions = []
        for idx in range(self.num_generations):
            questions.extend(
                [
                    rg.RatingQuestion(  # type: ignore
                        name=f"{self._generations}-{idx}-rating",
                        title=f"Rate {self._generations}-{idx} given {self._instruction}.",
                        description=f"Ignore this question if the corresponding `{self._generations}-{idx}` field is not available."
                        if idx != 0
                        else None,
                        values=[1, 2, 3, 4, 5],
                        required=True if idx == 0 else False,
                    ),
                    rg.TextQuestion(  # type: ignore
                        name=f"{self._generations}-{idx}-rationale",
                        title=f"Specify the rationale for {self._generations}-{idx}'s rating.",
                        description=f"Ignore this question if the corresponding `{self._generations}-{idx}` field is not available."
                        if idx != 0
                        else None,
                        required=False,
                    ),
                ]
            )
        return questions

    @property
    def inputs(self) -> List[str]:
        """The inputs for the step are the `instruction` and the `generations`. Optionally, one could also
        provide the `ratings` and the `rationales` for the generations."""
        return ["instruction", "generations"]

    def _add_suggestions_if_any(
        self, input: Dict[str, Any]
    ) -> List["SuggestionSchema"]:
        """Method to generate the suggestions for the `FeedbackRecord` based on the input."""
        # Since the `suggestions` i.e. answers to the `questions` are optional, will default to an empty list
        suggestions = []
        # If `ratings` is in `input`, then add those as suggestions
        if self._ratings in input:
            suggestions.extend(
                [
                    {
                        "question_name": f"{self._generations}-{idx}-rating",
                        "value": rating,
                    }
                    for idx, rating in enumerate(input[self._ratings])
                    if rating is not None
                    and isinstance(rating, int)
                    and rating in [1, 2, 3, 4, 5]
                ],
            )
        # If `rationales` is in `input`, then add those as suggestions
        if self._rationales in input:
            suggestions.extend(
                [
                    {
                        "question_name": f"{self._generations}-{idx}-rationale",
                        "value": rationale,
                    }
                    for idx, rationale in enumerate(input[self._rationales])
                    if rationale is not None and isinstance(rationale, str)
                ],
            )
        return suggestions

    @override
    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        """Creates and pushes the records as FeedbackRecords to the Argilla dataset.

        Args:
            inputs: A list of Python dictionaries with the inputs of the task.

        Returns:
            A list of Python dictionaries with the outputs of the task.
        """
        records = []
        for input in inputs:
            # Generate the SHA-256 hash of the instruction to use it as the record's `id` field
            instruction_id = hashlib.sha256(
                input["instruction"].encode("utf-8")  # type: ignore
            ).hexdigest()

            generations = {
                f"{self._generations}-{idx}": generation
                for idx, generation in enumerate(input["generations"])  # type: ignore
            }

            records.append(  # type: ignore
                rg.FeedbackRecord(  # type: ignore
                    fields={
                        # Use the same (possibly mapped) field names the dataset was created with
                        self._id: instruction_id,
                        self._instruction: input["instruction"],  # type: ignore
                        **generations,
                    },
                    suggestions=self._add_suggestions_if_any(input),  # type: ignore
                )
            )
        self._rg_dataset.add_records(records)  # type: ignore
        yield inputs

inputs: List[str] property

The inputs for the step are the instruction and the generations. Optionally, one could also provide the ratings and the rationales for the generations.

load()

Sets the _instruction and _generations attributes based on the input_mappings, otherwise uses the default values; and then uses those values to create a FeedbackDataset suited for the preference scenario, which is then pushed to Argilla.
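When a dataset with the same name already exists, `load()` checks that it has no required field the step cannot fill. The check can be sketched independently of Argilla by representing each existing field as a `(name, required)` pair; `unexpected_required_fields` is a hypothetical helper written for this illustration.

```python
from typing import Iterable, List, Tuple


def unexpected_required_fields(
    existing_fields: Iterable[Tuple[str, bool]],
    num_generations: int,
    instruction: str = "instruction",
    generations: str = "generations",
) -> List[str]:
    """Hypothetical helper: required fields the step would not be able to populate."""
    allowed = {"id", instruction} | {
        f"{generations}-{idx}" for idx in range(num_generations)
    }
    # Optional fields are tolerated; only required ones outside `allowed` are a problem.
    return [
        name for name, required in existing_fields if required and name not in allowed
    ]
```

A non-empty result corresponds to the situation in which the step raises a `ValueError`.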

Source code in src/distilabel/steps/argilla/preference.py
def load(self) -> None:
    """Sets the `_instruction` and `_generations` attributes based on the `input_mappings`, otherwise
    uses the default values; and then uses those values to create a `FeedbackDataset` suited for
    the preference scenario, which is then pushed to Argilla.
    """
    super().load()

    # Both `instruction` and `generations` will be used as the fields of the dataset
    self._instruction = self.input_mappings.get("instruction", "instruction")
    self._generations = self.input_mappings.get("generations", "generations")
    # Both `ratings` and `rationales` will be used as suggestions to the default questions of the dataset
    self._ratings = self.input_mappings.get("ratings", "ratings")
    self._rationales = self.input_mappings.get("rationales", "rationales")

    if self._rg_dataset_exists():
        _rg_dataset = rg.FeedbackDataset.from_argilla(  # type: ignore
            name=self.dataset_name,
            workspace=self.dataset_workspace,
        )

        for field in _rg_dataset.fields:
            if (
                field.name
                not in [self._id, self._instruction]
                + [
                    f"{self._generations}-{idx}"
                    for idx in range(self.num_generations)
                ]
                and field.required
            ):
                raise ValueError(
                    f"The dataset {self.dataset_name} in the workspace {self.dataset_workspace} already exists,"
                    f" but contains at least a required field that is neither `{self._id}`, `{self._instruction}`,"
                    f" nor `{self._generations}`."
                )

        self._rg_dataset = _rg_dataset
    else:
        _rg_dataset = rg.FeedbackDataset(  # type: ignore
            fields=[
                rg.TextField(name=self._id, title=self._id),  # type: ignore
                rg.TextField(name=self._instruction, title=self._instruction),  # type: ignore
                *self._generation_fields(),  # type: ignore
            ],
            questions=self._rating_rationale_pairs(),  # type: ignore
        )
        self._rg_dataset = _rg_dataset.push_to_argilla(
            name=self.dataset_name,  # type: ignore
            workspace=self.dataset_workspace,
        )

process(inputs)

Creates and pushes the records as FeedbackRecords to the Argilla dataset.

Parameters:

Name Type Description Default
inputs StepInput

A list of Python dictionaries with the inputs of the task.

required

Returns:

Type Description
StepOutput

A list of Python dictionaries with the outputs of the task.
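The per-row transformation performed by `process` can be sketched as a pure function that returns the record's fields as a dictionary. This is an illustration under stated assumptions: it uses the default column names, returns a plain `dict` instead of an Argilla `FeedbackRecord`, and `preference_record_fields` is a hypothetical name.

```python
import hashlib
from typing import Any, Dict


def preference_record_fields(row: Dict[str, Any]) -> Dict[str, str]:
    """Hypothetical helper: fields of one preference record, keyed by field name."""
    # The SHA-256 hex digest of the instruction acts as a deterministic identifier.
    instruction_id = hashlib.sha256(row["instruction"].encode("utf-8")).hexdigest()
    # Each generation gets its own numbered field: generations-0, generations-1, ...
    generations = {
        f"generations-{idx}": generation
        for idx, generation in enumerate(row["generations"])
    }
    return {"id": instruction_id, "instruction": row["instruction"], **generations}
```

Because the identifier is derived only from the instruction text, two rows with the same instruction share the same `id`.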

Source code in src/distilabel/steps/argilla/preference.py
@override
def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
    """Creates and pushes the records as FeedbackRecords to the Argilla dataset.

    Args:
        inputs: A list of Python dictionaries with the inputs of the task.

    Returns:
        A list of Python dictionaries with the outputs of the task.
    """
    records = []
    for input in inputs:
        # Generate the SHA-256 hash of the instruction to use it as the record's `id` field
        instruction_id = hashlib.sha256(
            input["instruction"].encode("utf-8")  # type: ignore
        ).hexdigest()

        generations = {
            f"{self._generations}-{idx}": generation
            for idx, generation in enumerate(input["generations"])  # type: ignore
        }

        records.append(  # type: ignore
            rg.FeedbackRecord(  # type: ignore
                fields={
                    # Use the same (possibly mapped) field names the dataset was created with
                    self._id: instruction_id,
                    self._instruction: input["instruction"],  # type: ignore
                    **generations,
                },
                suggestions=self._add_suggestions_if_any(input),  # type: ignore
            )
        )
    self._rg_dataset.add_records(records)  # type: ignore
    yield inputs

TextGenerationToArgilla

Bases: Argilla

Creates a text generation dataset in Argilla.

Step that creates a dataset in Argilla during the load phase, and then pushes the input batches into it as records. This dataset is a text-generation dataset, where there's one field per each input, and then a label question to rate the quality of the completion in either bad (represented with 👎) or good (represented with 👍).

Note

This step is meant to be used in conjunction with a TextGeneration step and no column mapping is needed, as it will use the default values for the instruction and generation columns.

Attributes:

Name Type Description
dataset_name

The name of the dataset in Argilla.

dataset_workspace

The workspace where the dataset will be created in Argilla. Defaults to None, which means it will be created in the default workspace.

api_url

The URL of the Argilla API. Defaults to None, which means it will be read from the ARGILLA_API_URL environment variable.

api_key

The API key to authenticate with Argilla. Defaults to None, which means it will be read from the ARGILLA_API_KEY environment variable.

Runtime parameters
  • api_url: The base URL to use for the Argilla API requests.
  • api_key: The API key to authenticate the requests to the Argilla API.
Input columns
  • instruction (str): The instruction that was used to generate the completion.
  • generation (str or List[str]): The completions that were generated based on the input instruction.
Source code in src/distilabel/steps/argilla/text_generation.py
class TextGenerationToArgilla(Argilla):
    """Creates a text generation dataset in Argilla.

    `Step` that creates a dataset in Argilla during the load phase, and then pushes the input
    batches into it as records. This dataset is a text-generation dataset, where there's one field
    per each input, and then a label question to rate the quality of the completion in either bad
    (represented with 👎) or good (represented with 👍).

    Note:
        This step is meant to be used in conjunction with a `TextGeneration` step and no column mapping
        is needed, as it will use the default values for the `instruction` and `generation` columns.

    Attributes:
        dataset_name: The name of the dataset in Argilla.
        dataset_workspace: The workspace where the dataset will be created in Argilla. Defaults to
            `None`, which means it will be created in the default workspace.
        api_url: The URL of the Argilla API. Defaults to `None`, which means it will be read from
            the `ARGILLA_API_URL` environment variable.
        api_key: The API key to authenticate with Argilla. Defaults to `None`, which means it will
            be read from the `ARGILLA_API_KEY` environment variable.

    Runtime parameters:
        - `api_url`: The base URL to use for the Argilla API requests.
        - `api_key`: The API key to authenticate the requests to the Argilla API.

    Input columns:
        - instruction (`str`): The instruction that was used to generate the completion.
        - generation (`str` or `List[str]`): The completions that were generated based on the input instruction.
    """

    _id: str = PrivateAttr(default="id")
    _instruction: str = PrivateAttr(...)
    _generation: str = PrivateAttr(...)

    def load(self) -> None:
        """Sets the `_instruction` and `_generation` attributes based on the `input_mappings`, otherwise
        uses the default values; and then uses those values to create a `FeedbackDataset` suited for
        the text-generation scenario, which is then pushed to Argilla.
        """
        super().load()

        self._instruction = self.input_mappings.get("instruction", "instruction")
        self._generation = self.input_mappings.get("generation", "generation")

        if self._rg_dataset_exists():
            _rg_dataset = rg.FeedbackDataset.from_argilla(  # type: ignore
                name=self.dataset_name,
                workspace=self.dataset_workspace,
            )

            for field in _rg_dataset.fields:
                if (
                    field.name not in [self._id, self._instruction, self._generation]
                    and field.required
                ):
                    raise ValueError(
                        f"The dataset {self.dataset_name} in the workspace {self.dataset_workspace} already exists,"
                        f" but contains at least a required field that is neither `{self._id}`, `{self._instruction}`"
                        f", nor `{self._generation}`."
                    )

            self._rg_dataset = _rg_dataset
        else:
            _rg_dataset = rg.FeedbackDataset(  # type: ignore
                fields=[
                    rg.TextField(name=self._id, title=self._id),  # type: ignore
                    rg.TextField(name=self._instruction, title=self._instruction),  # type: ignore
                    rg.TextField(name=self._generation, title=self._generation),  # type: ignore
                ],
                questions=[
                    rg.LabelQuestion(  # type: ignore
                        name="quality",
                        title=f"What's the quality of the {self._generation} for the given {self._instruction}?",
                        labels={"bad": "👎", "good": "👍"},
                    )
                ],
            )
            self._rg_dataset = _rg_dataset.push_to_argilla(
                name=self.dataset_name,  # type: ignore
                workspace=self.dataset_workspace,
            )

    @property
    def inputs(self) -> List[str]:
        """The inputs for the step are the `instruction` and the `generation`."""
        return ["instruction", "generation"]

    @override
    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        """Creates and pushes the records as FeedbackRecords to the Argilla dataset.

        Args:
            inputs: A list of Python dictionaries with the inputs of the task.

        Returns:
            A list of Python dictionaries with the outputs of the task.
        """
        records = []
        for input in inputs:
            # Generate the SHA-256 hash of the instruction to use it as the record's `id` field
            instruction_id = hashlib.sha256(
                input["instruction"].encode("utf-8")
            ).hexdigest()

            generations = input["generation"]

            # If the `generation` is not a list, then convert it into a list
            if not isinstance(generations, list):
                generations = [generations]

            # Create a `generations_set` to avoid adding duplicates
            generations_set = set()

            for generation in generations:
                # If the generation is already in the set, then skip it
                if generation in generations_set:
                    continue
                # Otherwise, add it to the set
                generations_set.add(generation)

                records.append(
                    rg.FeedbackRecord(  # type: ignore
                        fields={
                            self._id: instruction_id,
                            self._instruction: input["instruction"],
                            self._generation: generation,
                        },
                    )
                )
        self._rg_dataset.add_records(records)  # type: ignore
        yield inputs

inputs: List[str] property

The inputs for the step are the instruction and the generation.

load()

Sets the _instruction and _generation attributes based on the input_mappings, otherwise uses the default values; and then uses those values to create a FeedbackDataset suited for the text-generation scenario, which is then pushed to Argilla.

Source code in src/distilabel/steps/argilla/text_generation.py
def load(self) -> None:
    """Sets the `_instruction` and `_generation` attributes based on the `input_mappings`, otherwise
    uses the default values; and then uses those values to create a `FeedbackDataset` suited for
    the text-generation scenario, which is then pushed to Argilla.
    """
    super().load()

    self._instruction = self.input_mappings.get("instruction", "instruction")
    self._generation = self.input_mappings.get("generation", "generation")

    if self._rg_dataset_exists():
        _rg_dataset = rg.FeedbackDataset.from_argilla(  # type: ignore
            name=self.dataset_name,
            workspace=self.dataset_workspace,
        )

        for field in _rg_dataset.fields:
            if (
                field.name not in [self._id, self._instruction, self._generation]
                and field.required
            ):
                raise ValueError(
                    f"The dataset {self.dataset_name} in the workspace {self.dataset_workspace} already exists,"
                    f" but contains at least a required field that is neither `{self._id}`, `{self._instruction}`"
                    f", nor `{self._generation}`."
                )

        self._rg_dataset = _rg_dataset
    else:
        _rg_dataset = rg.FeedbackDataset(  # type: ignore
            fields=[
                rg.TextField(name=self._id, title=self._id),  # type: ignore
                rg.TextField(name=self._instruction, title=self._instruction),  # type: ignore
                rg.TextField(name=self._generation, title=self._generation),  # type: ignore
            ],
            questions=[
                rg.LabelQuestion(  # type: ignore
                    name="quality",
                    title=f"What's the quality of the {self._generation} for the given {self._instruction}?",
                    labels={"bad": "👎", "good": "👍"},
                )
            ],
        )
        self._rg_dataset = _rg_dataset.push_to_argilla(
            name=self.dataset_name,  # type: ignore
            workspace=self.dataset_workspace,
        )

process(inputs)

Creates and pushes the records as FeedbackRecords to the Argilla dataset.

Parameters:

Name Type Description Default
inputs StepInput

A list of Python dictionaries with the inputs of the task.

required

Returns:

Type Description
StepOutput

A list of Python dictionaries with the outputs of the task.
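For the text-generation case, a single input row may expand into several records, one per distinct generation. The sketch below reproduces that expansion with plain dictionaries and the default column names; `text_generation_records` is a hypothetical helper, and no Argilla objects are created.

```python
import hashlib
from typing import Any, Dict, List


def text_generation_records(row: Dict[str, Any]) -> List[Dict[str, str]]:
    """Hypothetical helper: one record per unique generation of a single row."""
    instruction_id = hashlib.sha256(row["instruction"].encode("utf-8")).hexdigest()
    generations = row["generation"]
    # A single string is treated as a one-element list.
    if not isinstance(generations, list):
        generations = [generations]
    seen = set()
    records = []
    for generation in generations:
        # Skip repeated completions while preserving first-seen order.
        if generation in seen:
            continue
        seen.add(generation)
        records.append(
            {
                "id": instruction_id,
                "instruction": row["instruction"],
                "generation": generation,
            }
        )
    return records
```

All records produced from the same row share one `id`, since the identifier depends only on the instruction.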

Source code in src/distilabel/steps/argilla/text_generation.py
@override
def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
    """Creates and pushes the records as FeedbackRecords to the Argilla dataset.

    Args:
        inputs: A list of Python dictionaries with the inputs of the task.

    Returns:
        A list of Python dictionaries with the outputs of the task.
    """
    records = []
    for input in inputs:
        # Generate the SHA-256 hash of the instruction to use it as the record's `id` field
        instruction_id = hashlib.sha256(
            input["instruction"].encode("utf-8")
        ).hexdigest()

        generations = input["generation"]

        # If the `generation` is not a list, then convert it into a list
        if not isinstance(generations, list):
            generations = [generations]

        # Create a `generations_set` to avoid adding duplicates
        generations_set = set()

        for generation in generations:
            # If the generation is already in the set, then skip it
            if generation in generations_set:
                continue
            # Otherwise, add it to the set
            generations_set.add(generation)

            records.append(
                rg.FeedbackRecord(  # type: ignore
                    fields={
                        self._id: instruction_id,
                        self._instruction: input["instruction"],
                        self._generation: generation,
                    },
                )
            )
    self._rg_dataset.add_records(records)  # type: ignore
    yield inputs