

This section lists the existing steps that integrate with Argilla, so that the generated datasets can be pushed to Argilla easily.


Argilla

Bases: Step, ABC

Abstract step to subclass from. It contains the boilerplate code required to interact with Argilla, plus some extra validations on top of it, and defines the abstract methods that must be implemented in order to add a new dataset type as a step.

This class is not intended to be instantiated directly, but through a subclass.


Attributes
  • dataset_name (RuntimeParameter[str]): The name of the dataset in Argilla where the records will be added.
  • dataset_workspace (Optional[RuntimeParameter[str]]): The workspace where the dataset will be created in Argilla. Defaults to None, which means it will be created in the default workspace.
  • api_url (Optional[RuntimeParameter[str]]): The URL of the Argilla API. Defaults to None, which means it will be read from the ARGILLA_API_URL environment variable.
  • api_key (Optional[RuntimeParameter[SecretStr]]): The API key to authenticate with Argilla. Defaults to None, which means it will be read from the ARGILLA_API_KEY environment variable.

Runtime parameters
  • dataset_name: The name of the dataset in Argilla where the records will be added.
  • dataset_workspace: The workspace where the dataset will be created in Argilla. Defaults to None, which means it will be created in the default workspace.
  • api_url: The base URL to use for the Argilla API requests.
  • api_key: The API key to authenticate the requests to the Argilla API.
Input columns
  • dynamic, based on the inputs value provided
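The environment-variable fallback described for api_url and api_key can be sketched with the standard library alone. This is a minimal illustration, not distilabel code: the helper name `resolve_argilla_credentials` is hypothetical, and only the `ARGILLA_API_URL` and `ARGILLA_API_KEY` variable names come from the documentation above.

```python
import os
from typing import Optional, Tuple


def resolve_argilla_credentials(
    api_url: Optional[str] = None, api_key: Optional[str] = None
) -> Tuple[Optional[str], Optional[str]]:
    """Hypothetical helper: explicit arguments win, otherwise read the environment."""
    if api_url is None:
        # Falls back to the variable named in the attribute description.
        api_url = os.getenv("ARGILLA_API_URL")
    if api_key is None:
        api_key = os.getenv("ARGILLA_API_KEY")
    return api_url, api_key
```

Both values may still be None when neither the argument nor the variable is set, so a caller would need to check for that before contacting the API.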
Source code in src/distilabel/steps/argilla/
class Argilla(Step, ABC):
    """Abstract step that provides a class to subclass from, that contains the boilerplate code
    required to interact with Argilla, as well as some extra validations on top of it. It also defines
    the abstract methods that need to be implemented in order to add a new dataset type as a step.

    Note:
        This class is not intended to be instantiated directly, but via subclass.

    Attributes:
        dataset_name: The name of the dataset in Argilla where the records will be added.
        dataset_workspace: The workspace where the dataset will be created in Argilla. Defaults to
            `None`, which means it will be created in the default workspace.
        api_url: The URL of the Argilla API. Defaults to `None`, which means it will be read from
            the `ARGILLA_API_URL` environment variable.
        api_key: The API key to authenticate with Argilla. Defaults to `None`, which means it will
            be read from the `ARGILLA_API_KEY` environment variable.

    Runtime parameters:
        - `dataset_name`: The name of the dataset in Argilla where the records will be
            added.
        - `dataset_workspace`: The workspace where the dataset will be created in Argilla.
            Defaults to `None`, which means it will be created in the default workspace.
        - `api_url`: The base URL to use for the Argilla API requests.
        - `api_key`: The API key to authenticate the requests to the Argilla API.

    Input columns:
        - dynamic, based on the `inputs` value provided
    """

    dataset_name: RuntimeParameter[str] = Field(
        default=None, description="The name of the dataset in Argilla."
    )
    dataset_workspace: Optional[RuntimeParameter[str]] = Field(
        default=None,
        description="The workspace where the dataset will be created in Argilla. Defaults "
        "to `None` which means it will be created in the default workspace.",
    )

    api_url: Optional[RuntimeParameter[str]] = Field(
        default_factory=lambda: os.getenv("ARGILLA_API_URL"),
        description="The base URL to use for the Argilla API requests.",
    )
    api_key: Optional[RuntimeParameter[SecretStr]] = Field(
        default_factory=lambda: os.getenv(_ARGILLA_API_KEY_ENV_VAR_NAME),
        description="The API key to authenticate the requests to the Argilla API.",
    )

    _rg_dataset: Optional["RemoteFeedbackDataset"] = PrivateAttr(...)

    def model_post_init(self, __context: Any) -> None:
        """Checks that the Argilla Python SDK is installed, and then filters the Argilla warnings."""
        super().model_post_init(__context)

        try:
            import argilla as rg  # noqa
        except ImportError as ie:
            raise ImportError(
                "Argilla is not installed. Please install it using `pip install argilla`."
            ) from ie

        warnings.filterwarnings("ignore")

    def _rg_init(self) -> None:
        """Initializes the Argilla API client with the provided `api_url` and `api_key`."""
        try:
            # Send the Hugging Face token as a bearer header when it is available
            if "" in self.api_url and "HF_TOKEN" in os.environ:
                headers = {"Authorization": f"Bearer {os.environ['HF_TOKEN']}"}
            else:
                headers = None
            rg.init(
                api_url=self.api_url,
                api_key=self.api_key.get_secret_value(),
                extra_headers=headers,
            )  # type: ignore
        except Exception as e:
            raise ValueError(f"Failed to initialize the Argilla API: {e}") from e

    def _rg_dataset_exists(self) -> bool:
        """Checks if the dataset already exists in Argilla."""
        return self.dataset_name in [
            dataset.name
            for dataset in rg.FeedbackDataset.list(workspace=self.dataset_workspace)  # type: ignore
        ]

    @property
    def outputs(self) -> List[str]:
        """The outputs of the step are an empty list, since the steps subclassing this one will
        always be leaf nodes and will neither propagate the inputs nor generate any outputs.
        """
        return []

    def load(self) -> None:
        """Method to perform any initialization logic before the `process` method is
        called. For example, to load an LLM, establish a connection to a database, etc.
        """
        super().load()
        self._rg_init()

    @property
    @abstractmethod
    def inputs(self) -> List[str]:
        ...

    @abstractmethod
    def process(self, *inputs: StepInput) -> "StepOutput":
        ...

outputs: List[str] property

The outputs of the step are an empty list, since the steps subclassing this one will always be leaf nodes and will neither propagate the inputs nor generate any outputs.


load()

Method to perform any initialization logic before the process method is called. For example, to load an LLM, establish a connection to a database, etc.


model_post_init(__context)

Checks that the Argilla Python SDK is installed, and then filters the Argilla warnings.



PreferenceToArgilla

Bases: Argilla

Creates a preference dataset in Argilla.

Step that creates a dataset in Argilla during the load phase, and then pushes the input batches into it as records. This dataset is a preference dataset: each record has one field for the instruction, one extra field per generation, and one rating question per generation field. The rating question asks the annotator to rate each of the provided generations from 1 to 5.
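To make the layout concrete, here is a plain-dictionary sketch of the fields and questions for a given number of generations. The function `preference_schema` is hypothetical and does not call the Argilla SDK. The `-rating` and `-rationale` suffixes and the rule that only the first generation is required follow the class source listing on this page, while marking `id` and the instruction as required and the rationale as optional are assumptions.

```python
from typing import Any, Dict, List


def preference_schema(
    num_generations: int,
    instruction: str = "instruction",
    generations: str = "generations",
) -> Dict[str, List[Dict[str, Any]]]:
    """Hypothetical plain-dict view of a preference dataset layout."""
    # Assumption: the id and instruction fields are always required.
    fields: List[Dict[str, Any]] = [
        {"name": "id", "required": True},
        {"name": instruction, "required": True},
    ]
    questions: List[Dict[str, Any]] = []
    for idx in range(num_generations):
        # Only the first generation is mandatory; later ones may be absent.
        fields.append({"name": f"{generations}-{idx}", "required": idx == 0})
        questions.append(
            {
                "name": f"{generations}-{idx}-rating",
                "values": [1, 2, 3, 4, 5],
                "required": idx == 0,
            }
        )
        # Assumption: the free-text rationale is never required.
        questions.append({"name": f"{generations}-{idx}-rationale", "required": False})
    return {"fields": fields, "questions": questions}
```

With `num_generations=2`, this yields the fields `id`, `instruction`, `generations-0`, and `generations-1`, plus a rating and a rationale question for each generation.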


This step is meant to be used together with the UltraFeedback step, or any other step that generates both ratings and rationales for a given instruction and its generations. It can also be used with any other task or step that generates only the instruction and generations, since the ratings and rationales are optional.


Attributes
  • num_generations (int): The number of generations to include in the dataset.
  • dataset_name (RuntimeParameter[str]): The name of the dataset in Argilla.
  • dataset_workspace (Optional[RuntimeParameter[str]]): The workspace where the dataset will be created in Argilla. Defaults to None, which means it will be created in the default workspace.
  • api_url (Optional[RuntimeParameter[str]]): The URL of the Argilla API. Defaults to None, which means it will be read from the ARGILLA_API_URL environment variable.
  • api_key (Optional[RuntimeParameter[SecretStr]]): The API key to authenticate with Argilla. Defaults to None, which means it will be read from the ARGILLA_API_KEY environment variable.

Runtime parameters
  • api_url: The base URL to use for the Argilla API requests.
  • api_key: The API key to authenticate the requests to the Argilla API.
Input columns
  • instruction (str): The instruction that was used to generate the completion.
  • generations (List[str]): The completions that were generated based on the input instruction.
  • ratings (List[str], optional): The ratings for the generations. If not provided, the generated ratings won't be pushed to Argilla.
  • rationales (List[str], optional): The rationales for the ratings. If not provided, the generated rationales won't be pushed to Argilla.
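Because ratings and rationales are optional, suggestions are only built from the values that are present and valid. The sketch below mirrors the filter in the class source listing on this page, which keeps integer ratings between 1 and 5 and string rationales. The function `build_suggestions` is hypothetical, and unlike the listing it also treats a missing or None column as empty.

```python
from typing import Any, Dict, List

VALID_RATINGS = (1, 2, 3, 4, 5)


def build_suggestions(
    row: Dict[str, Any], generations: str = "generations"
) -> List[Dict[str, Any]]:
    """Hypothetical helper returning one suggestion per usable rating or rationale."""
    suggestions: List[Dict[str, Any]] = []
    for idx, rating in enumerate(row.get("ratings") or []):
        # Skip missing, non-integer, or out-of-range ratings.
        if isinstance(rating, int) and rating in VALID_RATINGS:
            suggestions.append(
                {"question_name": f"{generations}-{idx}-rating", "value": rating}
            )
    for idx, rationale in enumerate(row.get("rationales") or []):
        # Skip missing or non-string rationales.
        if isinstance(rationale, str):
            suggestions.append(
                {"question_name": f"{generations}-{idx}-rationale", "value": rationale}
            )
    return suggestions
```

The index in each question name stays aligned with the position of the generation, so a skipped rating for the first generation does not shift the suggestion for the second.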
Source code in src/distilabel/steps/argilla/
class PreferenceToArgilla(Argilla):
    """Creates a preference dataset in Argilla.

    Step that creates a dataset in Argilla during the load phase, and then pushes the input
    batches into it as records. This dataset is a preference dataset, where there's one field
    for the instruction and one extra field per each generation within the same record, and then
    a rating question per each of the generation fields. The rating question asks the annotator to
    set a rating from 1 to 5 for each of the provided generations.

    Note:
        This step is meant to be used in conjunction with the `UltraFeedback` step, or any other step
        generating both ratings and responses for a given set of instruction and generations for the
        given instruction. But alternatively, it can also be used with any other task or step generating
        only the `instruction` and `generations`, as the `ratings` and `rationales` are optional.

    Attributes:
        num_generations: The number of generations to include in the dataset.
        dataset_name: The name of the dataset in Argilla.
        dataset_workspace: The workspace where the dataset will be created in Argilla. Defaults to
            `None`, which means it will be created in the default workspace.
        api_url: The URL of the Argilla API. Defaults to `None`, which means it will be read from
            the `ARGILLA_API_URL` environment variable.
        api_key: The API key to authenticate with Argilla. Defaults to `None`, which means it will
            be read from the `ARGILLA_API_KEY` environment variable.

    Runtime parameters:
        - `api_url`: The base URL to use for the Argilla API requests.
        - `api_key`: The API key to authenticate the requests to the Argilla API.

    Input columns:
        - instruction (`str`): The instruction that was used to generate the completion.
        - generations (`List[str]`): The completions that were generated based on the input instruction.
        - ratings (`List[str]`, optional): The ratings for the generations. If not provided, the
            generated ratings won't be pushed to Argilla.
        - rationales (`List[str]`, optional): The rationales for the ratings. If not provided, the
            generated rationales won't be pushed to Argilla.
    """

    num_generations: int

    _id: str = PrivateAttr(default="id")
    _instruction: str = PrivateAttr(...)
    _generations: str = PrivateAttr(...)
    _ratings: str = PrivateAttr(...)
    _rationales: str = PrivateAttr(...)

    def load(self) -> None:
        """Sets the `_instruction` and `_generations` attributes based on the `input_mappings`, otherwise
        uses the default values; and then uses those values to create a `FeedbackDataset` suited for
        the text-generation scenario. And then it pushes it to Argilla.
        """
        super().load()

        # Both `instruction` and `generations` will be used as the fields of the dataset
        self._instruction = self.input_mappings.get("instruction", "instruction")
        self._generations = self.input_mappings.get("generations", "generations")
        # Both `ratings` and `rationales` will be used as suggestions to the default questions of the dataset
        self._ratings = self.input_mappings.get("ratings", "ratings")
        self._rationales = self.input_mappings.get("rationales", "rationales")

        if self._rg_dataset_exists():
            _rg_dataset = rg.FeedbackDataset.from_argilla(  # type: ignore
                name=self.dataset_name,
                workspace=self.dataset_workspace,
            )

            # Reject an existing dataset that requires a field this step cannot fill
            for field in _rg_dataset.fields:
                if (
                    field.name
                    not in [self._id, self._instruction]
                    + [
                        f"{self._generations}-{idx}"
                        for idx in range(self.num_generations)
                    ]
                    and field.required
                ):
                    raise ValueError(
                        f"The dataset {self.dataset_name} in the workspace {self.dataset_workspace} already exists,"
                        f" but contains at least a required field that is neither `{self._id}`, `{self._instruction}`,"
                        f" nor `{self._generations}`."
                    )

            self._rg_dataset = _rg_dataset
        else:
            _rg_dataset = rg.FeedbackDataset(  # type: ignore
                fields=[
                    rg.TextField(name=self._id, title=self._id),  # type: ignore
                    rg.TextField(name=self._instruction, title=self._instruction),  # type: ignore
                    *self._generation_fields(),  # type: ignore
                ],
                questions=self._rating_rationale_pairs(),  # type: ignore
            )
            self._rg_dataset = _rg_dataset.push_to_argilla(
                name=self.dataset_name,  # type: ignore
                workspace=self.dataset_workspace,
            )

    def _generation_fields(self) -> List["TextField"]:
        """Method to generate the fields for each of the generations."""
        return [
            rg.TextField(  # type: ignore
                name=f"{self._generations}-{idx}",
                title=f"{self._generations}-{idx}",
                required=True if idx == 0 else False,
            )
            for idx in range(self.num_generations)
        ]

    def _rating_rationale_pairs(
        self,
    ) -> List[Union["RatingQuestion", "TextQuestion"]]:
        """Method to generate the rating and rationale questions for each of the generations."""
        questions = []
        for idx in range(self.num_generations):
            questions.extend(
                [
                    rg.RatingQuestion(  # type: ignore
                        name=f"{self._generations}-{idx}-rating",
                        title=f"Rate {self._generations}-{idx} given {self._instruction}.",
                        description=f"Ignore this question if the corresponding `{self._generations}-{idx}` field is not available."
                        if idx != 0
                        else None,
                        values=[1, 2, 3, 4, 5],
                        required=True if idx == 0 else False,
                    ),
                    rg.TextQuestion(  # type: ignore
                        name=f"{self._generations}-{idx}-rationale",
                        title=f"Specify the rationale for {self._generations}-{idx}'s rating.",
                        description=f"Ignore this question if the corresponding `{self._generations}-{idx}` field is not available."
                        if idx != 0
                        else None,
                        required=False,
                    ),
                ]
            )
        return questions

    @property
    def inputs(self) -> List[str]:
        """The inputs for the step are the `instruction` and the `generations`. Optionally, one could also
        provide the `ratings` and the `rationales` for the generations."""
        return ["instruction", "generations"]

    def _add_suggestions_if_any(
        self, input: Dict[str, Any]
    ) -> List["SuggestionSchema"]:
        """Method to generate the suggestions for the `FeedbackRecord` based on the input."""
        # Since the `suggestions` i.e. answers to the `questions` are optional, will default to an empty list
        suggestions = []
        # If `ratings` is in `input`, then add those as suggestions
        if self._ratings in input:
            suggestions.extend(
                [
                    {
                        "question_name": f"{self._generations}-{idx}-rating",
                        "value": rating,
                    }
                    for idx, rating in enumerate(input[self._ratings])
                    if rating is not None
                    and isinstance(rating, int)
                    and rating in [1, 2, 3, 4, 5]
                ]
            )
        # If `rationales` is in `input`, then add those as suggestions
        if self._rationales in input:
            suggestions.extend(
                [
                    {
                        "question_name": f"{self._generations}-{idx}-rationale",
                        "value": rationale,
                    }
                    for idx, rationale in enumerate(input[self._rationales])
                    if rationale is not None and isinstance(rationale, str)
                ]
            )
        return suggestions

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        """Creates and pushes the records as FeedbackRecords to the Argilla dataset.

        Args:
            inputs: A list of Python dictionaries with the inputs of the task.

        Returns:
            A list of Python dictionaries with the outputs of the task.
        """
        records = []
        for input in inputs:
            # Generate the SHA-256 hash of the instruction to use it as the record `id`
            instruction_id = hashlib.sha256(
                input["instruction"].encode("utf-8")  # type: ignore
            ).hexdigest()

            generations = {
                f"{self._generations}-{idx}": generation
                for idx, generation in enumerate(input["generations"])  # type: ignore
            }

            records.append(  # type: ignore
                rg.FeedbackRecord(  # type: ignore
                    # Keys must match the field names the dataset was created with
                    fields={
                        self._id: instruction_id,
                        self._instruction: input["instruction"],  # type: ignore
                        **generations,
                    },
                    suggestions=self._add_suggestions_if_any(input),  # type: ignore
                )
            )
        self._rg_dataset.add_records(records)  # type: ignore
        yield inputs

inputs: List[str] property

The inputs for the step are the instruction and the generations. Optionally, one could also provide the ratings and the rationales for the generations.


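load()

Sets the _instruction, _generations, _ratings and _rationales attributes based on the input_mappings, falling back to the default column names. If a dataset with the given name already exists in Argilla, it is reused after checking that it has no unexpected required fields; otherwise a new FeedbackDataset is created from those values and pushed to Argilla.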
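When the dataset already exists, the class source listing rejects it if it requires a field that the step cannot fill. A minimal sketch of that check, independent of the Argilla SDK, could look as follows. The function `unexpected_required_fields` is hypothetical, and existing fields are modelled as plain `(name, required)` tuples rather than Argilla field objects.

```python
from typing import Iterable, List, Tuple


def unexpected_required_fields(
    existing_fields: Iterable[Tuple[str, bool]],
    num_generations: int,
    id_column: str = "id",
    instruction: str = "instruction",
    generations: str = "generations",
) -> List[str]:
    """Hypothetical check: names of required fields the step would not populate."""
    # The step only fills the id, the instruction, and one field per generation.
    allowed = {id_column, instruction} | {
        f"{generations}-{idx}" for idx in range(num_generations)
    }
    return [
        name for name, required in existing_fields if required and name not in allowed
    ]
```

An empty result means the existing dataset can be reused. Any returned name corresponds to the situation in which the listing raises a ValueError.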


process(inputs)

Creates and pushes the records as FeedbackRecords to the Argilla dataset.

Parameters
  • inputs (StepInput): A list of Python dictionaries with the inputs of the task.

Returns
  • StepOutput: A list of Python dictionaries with the outputs of the task.
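The record layout can be illustrated without the Argilla SDK. The sketch below builds the field dictionary for one input row, using the SHA-256 hash of the instruction as the record id and one `generations-{idx}` key per generation, as in the class source listing. The function name `build_record_fields` is hypothetical.

```python
import hashlib
from typing import Dict, List


def build_record_fields(
    instruction: str, generations: List[str], generations_column: str = "generations"
) -> Dict[str, str]:
    """Hypothetical helper returning the fields of a single preference record."""
    # The same instruction always maps to the same id.
    instruction_id = hashlib.sha256(instruction.encode("utf-8")).hexdigest()
    fields = {"id": instruction_id, "instruction": instruction}
    for idx, generation in enumerate(generations):
        fields[f"{generations_column}-{idx}"] = generation
    return fields
```

Since the id is derived only from the instruction text, two rows with an identical instruction share an id even if their generations differ.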



TextGenerationToArgilla

Bases: Argilla

Creates a text generation dataset in Argilla.

Step that creates a dataset in Argilla during the load phase, and then pushes the input batches into it as records. This dataset is a text-generation dataset, with one field per input and a label question to rate the quality of the completion as either bad (represented with 👎) or good (represented with 👍).


This step is meant to be used in conjunction with a TextGeneration step and no column mapping is needed, as it will use the default values for the instruction and generation columns.


Attributes
  • dataset_name (RuntimeParameter[str]): The name of the dataset in Argilla.
  • dataset_workspace (Optional[RuntimeParameter[str]]): The workspace where the dataset will be created in Argilla. Defaults to None, which means it will be created in the default workspace.
  • api_url (Optional[RuntimeParameter[str]]): The URL of the Argilla API. Defaults to None, which means it will be read from the ARGILLA_API_URL environment variable.
  • api_key (Optional[RuntimeParameter[SecretStr]]): The API key to authenticate with Argilla. Defaults to None, which means it will be read from the ARGILLA_API_KEY environment variable.

Runtime parameters
  • api_url: The base URL to use for the Argilla API requests.
  • api_key: The API key to authenticate the requests to the Argilla API.
Input columns
  • instruction (str): The instruction that was used to generate the completion.
  • generation (str or List[str]): The completions that were generated based on the input instruction.
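Since generation may be either a single string or a list, the class source listing first wraps a single value into a list and then skips repeated completions so that each one is pushed only once per row. A small stand-alone sketch of that normalization, with the hypothetical name `unique_generations`:

```python
from typing import List, Union


def unique_generations(generation: Union[str, List[str]]) -> List[str]:
    """Hypothetical helper: wrap a single string and drop repeated completions."""
    generations = generation if isinstance(generation, list) else [generation]
    seen = set()
    unique: List[str] = []
    for item in generations:
        if item in seen:
            # Already queued for this row, so skip the duplicate.
            continue
        seen.add(item)
        unique.append(item)
    return unique
```

The first occurrence of each completion is kept and the original order is preserved.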
Source code in src/distilabel/steps/argilla/
class TextGenerationToArgilla(Argilla):
    """Creates a text generation dataset in Argilla.

    `Step` that creates a dataset in Argilla during the load phase, and then pushes the input
    batches into it as records. This dataset is a text-generation dataset, where there's one field
    per each input, and then a label question to rate the quality of the completion in either bad
    (represented with 👎) or good (represented with 👍).

    Note:
        This step is meant to be used in conjunction with a `TextGeneration` step and no column mapping
        is needed, as it will use the default values for the `instruction` and `generation` columns.

    Attributes:
        dataset_name: The name of the dataset in Argilla.
        dataset_workspace: The workspace where the dataset will be created in Argilla. Defaults to
            `None`, which means it will be created in the default workspace.
        api_url: The URL of the Argilla API. Defaults to `None`, which means it will be read from
            the `ARGILLA_API_URL` environment variable.
        api_key: The API key to authenticate with Argilla. Defaults to `None`, which means it will
            be read from the `ARGILLA_API_KEY` environment variable.

    Runtime parameters:
        - `api_url`: The base URL to use for the Argilla API requests.
        - `api_key`: The API key to authenticate the requests to the Argilla API.

    Input columns:
        - instruction (`str`): The instruction that was used to generate the completion.
        - generation (`str` or `List[str]`): The completions that were generated based on the input instruction.
    """

    _id: str = PrivateAttr(default="id")
    _instruction: str = PrivateAttr(...)
    _generation: str = PrivateAttr(...)

    def load(self) -> None:
        """Sets the `_instruction` and `_generation` attributes based on the `input_mappings`, otherwise
        uses the default values; and then uses those values to create a `FeedbackDataset` suited for
        the text-generation scenario. And then it pushes it to Argilla.
        """
        super().load()

        self._instruction = self.input_mappings.get("instruction", "instruction")
        self._generation = self.input_mappings.get("generation", "generation")

        if self._rg_dataset_exists():
            _rg_dataset = rg.FeedbackDataset.from_argilla(  # type: ignore
                name=self.dataset_name,
                workspace=self.dataset_workspace,
            )

            # Reject an existing dataset that requires a field this step cannot fill
            for field in _rg_dataset.fields:
                if (
                    field.name
                    not in [self._id, self._instruction, self._generation]
                    and field.required
                ):
                    raise ValueError(
                        f"The dataset {self.dataset_name} in the workspace {self.dataset_workspace} already exists,"
                        f" but contains at least a required field that is neither `{self._id}`, `{self._instruction}`"
                        f", nor `{self._generation}`."
                    )

            self._rg_dataset = _rg_dataset
        else:
            _rg_dataset = rg.FeedbackDataset(  # type: ignore
                fields=[
                    rg.TextField(name=self._id, title=self._id),  # type: ignore
                    rg.TextField(name=self._instruction, title=self._instruction),  # type: ignore
                    rg.TextField(name=self._generation, title=self._generation),  # type: ignore
                ],
                questions=[
                    rg.LabelQuestion(  # type: ignore
                        name="quality",
                        title=f"What's the quality of the {self._generation} for the given {self._instruction}?",
                        labels={"bad": "👎", "good": "👍"},
                    )
                ],
            )
            self._rg_dataset = _rg_dataset.push_to_argilla(
                name=self.dataset_name,  # type: ignore
                workspace=self.dataset_workspace,
            )

    @property
    def inputs(self) -> List[str]:
        """The inputs for the step are the `instruction` and the `generation`."""
        return ["instruction", "generation"]

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        """Creates and pushes the records as FeedbackRecords to the Argilla dataset.

        Args:
            inputs: A list of Python dictionaries with the inputs of the task.

        Returns:
            A list of Python dictionaries with the outputs of the task.
        """
        records = []
        for input in inputs:
            # Generate the SHA-256 hash of the instruction to use it as the record `id`
            instruction_id = hashlib.sha256(
                input["instruction"].encode("utf-8")
            ).hexdigest()

            generations = input["generation"]

            # If the `generation` is not a list, then convert it into a list
            if not isinstance(generations, list):
                generations = [generations]

            # Create a `generations_set` to avoid adding duplicates
            generations_set = set()

            for generation in generations:
                # If the generation is already in the set, then skip it
                if generation in generations_set:
                    continue
                # Otherwise, add it to the set
                generations_set.add(generation)

                records.append(
                    rg.FeedbackRecord(  # type: ignore
                        fields={
                            self._id: instruction_id,
                            self._instruction: input["instruction"],
                            self._generation: generation,
                        },
                    )
                )
        self._rg_dataset.add_records(records)  # type: ignore
        yield inputs

inputs: List[str] property

The inputs for the step are the instruction and the generation.


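load()

Sets the _instruction and _generation attributes based on the input_mappings, falling back to the default column names. If a dataset with the given name already exists in Argilla, it is reused after checking that it has no unexpected required fields; otherwise a new FeedbackDataset suited for the text-generation scenario is created from those values and pushed to Argilla.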



process(inputs)

Creates and pushes the records as FeedbackRecords to the Argilla dataset.

Parameters
  • inputs (StepInput): A list of Python dictionaries with the inputs of the task.

Returns
  • StepOutput: A list of Python dictionaries with the outputs of the task.
