
GeneratorStep

This section contains the API reference for the GeneratorStep class.

For more information and examples on how to use existing generator steps or create custom ones, please refer to Tutorial - Step - GeneratorStep.

GeneratorStep

Bases: _Step, ABC

A special kind of Step that generates data, i.e., it does not receive any input from previous steps.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| batch_size | RuntimeParameter[int] | The number of rows in each batch generated by the step. Defaults to 50. |

Runtime parameters
  • batch_size: The number of rows in each batch generated by the step. Defaults to 50.
Source code in src/distilabel/steps/base.py
class GeneratorStep(_Step, ABC):
    """A special kind of `Step` that is able to generate data i.e. it doesn't receive
    any input from the previous steps.

    Attributes:
        batch_size: The number of rows that will contain the batches generated by the
            step. Defaults to `50`.

    Runtime parameters:
        - `batch_size`: The number of rows that will contain the batches generated by
            the step. Defaults to `50`.
    """

    batch_size: RuntimeParameter[int] = Field(
        default=50,
        description="The number of rows that will contain the batches generated by the"
        " step.",
    )

    @abstractmethod
    def process(self, offset: int = 0) -> "GeneratorStepOutput":
        """Method that defines the generation logic of the step. It should yield the
        output rows and a boolean indicating if it's the last batch or not.

        Args:
            offset: The offset to start the generation from. Defaults to 0.

        Yields:
            The output rows and a boolean indicating if it's the last batch or not.
        """
        pass

    def process_applying_mappings(self, offset: int = 0) -> "GeneratorStepOutput":
        """Runs the `process` method of the step applying the `outputs_mappings` to the
        output rows. This is the function that should be used to run the generation logic
        of the step.

        Args:
            offset: The offset to start the generation from. Defaults to 0.

        Yields:
            The output rows and a boolean indicating if it's the last batch or not.
        """

        # If the `Step` was built using the `@step` decorator, then we need to pass
        # the runtime parameters as `kwargs`, so they can be used within the processing
        # function
        generator = (
            self.process(offset=offset)
            if not self._built_from_decorator
            else self.process(offset=offset, **self._runtime_parameters)
        )

        for output_rows, last_batch in generator:
            yield (
                [
                    {self.output_mappings.get(k, k): v for k, v in row.items()}
                    for row in output_rows
                ],
                last_batch,
            )

process(offset=0) abstractmethod

Method that defines the generation logic of the step. It should yield the output rows and a boolean indicating if it's the last batch or not.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| offset | int | The offset to start the generation from. | 0 |

Yields:

| Type | Description |
| --- | --- |
| GeneratorStepOutput | The output rows and a boolean indicating if it's the last batch or not. |

Source code in src/distilabel/steps/base.py
@abstractmethod
def process(self, offset: int = 0) -> "GeneratorStepOutput":
    """Method that defines the generation logic of the step. It should yield the
    output rows and a boolean indicating if it's the last batch or not.

    Args:
        offset: The offset to start the generation from. Defaults to 0.

    Yields:
        The output rows and a boolean indicating if it's the last batch or not.
    """
    pass
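
To make the contract of process more concrete, the following is a minimal standalone sketch that does not use distilabel itself. It only illustrates yielding (rows, last_batch) pairs while honoring an offset and a batch size. The function name generate_batches, the Batch alias, and the choice to emit a single empty final batch when no rows remain are assumptions made for illustration, not part of the library.

```python
from typing import Any, Dict, Iterator, List, Tuple

# Hypothetical alias mirroring the (rows, last_batch) pairs described above.
Batch = Tuple[List[Dict[str, Any]], bool]


def generate_batches(
    data: List[Dict[str, Any]], batch_size: int = 50, offset: int = 0
) -> Iterator[Batch]:
    """Yield (rows, last_batch) pairs, skipping the first `offset` rows."""
    remaining = data[offset:]
    if not remaining:
        # Assumption of this sketch: signal completion with one empty batch.
        yield [], True
        return
    for start in range(0, len(remaining), batch_size):
        rows = remaining[start : start + batch_size]
        # The batch is the last one when no rows are left after it.
        last_batch = start + batch_size >= len(remaining)
        yield rows, last_batch


rows = [{"instruction": f"task {i}"} for i in range(5)]
batches = list(generate_batches(rows, batch_size=2, offset=1))
```

A real subclass would place equivalent logic inside its own process method; the offset is what would let generation resume from a given row instead of starting over.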

process_applying_mappings(offset=0)

Runs the process method of the step, applying the output_mappings to the output rows. This is the function that should be used to run the generation logic of the step.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| offset | int | The offset to start the generation from. | 0 |

Yields:

| Type | Description |
| --- | --- |
| GeneratorStepOutput | The output rows and a boolean indicating if it's the last batch or not. |

Source code in src/distilabel/steps/base.py
def process_applying_mappings(self, offset: int = 0) -> "GeneratorStepOutput":
    """Runs the `process` method of the step applying the `outputs_mappings` to the
    output rows. This is the function that should be used to run the generation logic
    of the step.

    Args:
        offset: The offset to start the generation from. Defaults to 0.

    Yields:
        The output rows and a boolean indicating if it's the last batch or not.
    """

    # If the `Step` was built using the `@step` decorator, then we need to pass
    # the runtime parameters as `kwargs`, so they can be used within the processing
    # function
    generator = (
        self.process(offset=offset)
        if not self._built_from_decorator
        else self.process(offset=offset, **self._runtime_parameters)
    )

    for output_rows, last_batch in generator:
        yield (
            [
                {self.output_mappings.get(k, k): v for k, v in row.items()}
                for row in output_rows
            ],
            last_batch,
        )
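
The key-renaming step in the listing above can be tried in isolation. The sketch below reproduces only the dictionary comprehension, outside of any step class. The helper name apply_output_mappings and the sample rows are hypothetical.

```python
from typing import Any, Dict, List


def apply_output_mappings(
    rows: List[Dict[str, Any]], output_mappings: Dict[str, str]
) -> List[Dict[str, Any]]:
    """Rename the keys listed in `output_mappings`; keep all other keys as-is."""
    return [
        {output_mappings.get(key, key): value for key, value in row.items()}
        for row in rows
    ]


rows = [{"prompt": "Hello", "id": 1}]
renamed = apply_output_mappings(rows, {"prompt": "instruction"})
```

Here the "prompt" column is renamed to "instruction", while "id" passes through unchanged because it has no entry in the mapping.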

make_generator_step(dataset, pipeline=None, batch_size=50, input_mappings=None, output_mappings=None, resources=StepResources(), repo_id='default_name')

Helper function to create a GeneratorStep from a dataset.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dataset | Union[Dataset, DataFrame, List[Dict[str, str]]] | The dataset to use in the Pipeline. | required |
| pipeline | Union[BasePipeline, None] | The pipeline passed to the created step. | None |
| batch_size | int | The batch size. Defaults to 50, the same default used by GeneratorStep. | 50 |
| input_mappings | Optional[Dict[str, str]] | Works the same as in any other step. | None |
| output_mappings | Optional[Dict[str, str]] | Works the same as in any other step. | None |
| resources | StepResources | Works the same as in any other step. | StepResources() |
| repo_id | Optional[str] | The repository ID to use in the LoadDataFromHub step. It should not normally be needed, but if an error occurs, the step will try to load the dataset internally with load_dataset, in which case the repo_id is used. | 'default_name' |

Raises:

| Type | Description |
| --- | --- |
| DistilabelUserError | If the dataset type is not one of the supported ones. |

Returns:

| Type | Description |
| --- | --- |
| GeneratorStep | A LoadDataFromDicts instance if the input is a list of dicts, or a LoadDataFromHub instance if the input is a pd.DataFrame or a Dataset. |

Source code in src/distilabel/steps/generators/utils.py
def make_generator_step(
    dataset: Union[Dataset, pd.DataFrame, List[Dict[str, str]]],
    pipeline: Union["BasePipeline", None] = None,
    batch_size: int = 50,
    input_mappings: Optional[Dict[str, str]] = None,
    output_mappings: Optional[Dict[str, str]] = None,
    resources: StepResources = StepResources(),
    repo_id: Optional[str] = "default_name",
) -> "GeneratorStep":
    """Helper method to create a `GeneratorStep` from a dataset, to simplify

    Args:
        dataset: The dataset to use in the `Pipeline`.
        batch_size: The batch_size, will default to the same used by the `GeneratorStep`s.
            Defaults to `50`.
        input_mappings: Applies the same as any other step. Defaults to `None`.
        output_mappings: Applies the same as any other step. Defaults to `None`.
        resources: Applies the same as any other step. Defaults to `StepResources()`.
        repo_id: The repository ID to use in the `LoadDataFromHub` step.
            This shouldn't be necessary, but in case of error, the dataset will try to be loaded
            using `load_dataset` internally. If that case happens, the `repo_id` will be used.

    Raises:
        ValueError: If the format is different from the ones supported.

    Returns:
        A `LoadDataFromDicts` if the input is a list of dicts, or `LoadDataFromHub` instance
        if the input is a `pd.DataFrame` or a `Dataset`.
    """
    from distilabel.steps import LoadDataFromDicts, LoadDataFromHub

    if isinstance(dataset, list):
        return LoadDataFromDicts(
            pipeline=pipeline,
            data=dataset,
            batch_size=batch_size,
            input_mappings=input_mappings or {},
            output_mappings=output_mappings or {},
            resources=resources,
        )

    if isinstance(dataset, pd.DataFrame):
        dataset = Dataset.from_pandas(dataset, preserve_index=False)

    if not isinstance(dataset, Dataset):
        raise DistilabelUserError(
            f"Dataset type not allowed: {type(dataset)}, must be one of: "
            "`datasets.Dataset`, `pd.DataFrame`, `List[Dict[str, str]]`",
            page="sections/how_to_guides/basic/pipeline/?h=make_#__tabbed_1_2",
        )

    loader = LoadDataFromHub(
        pipeline=pipeline,
        repo_id=repo_id,
        batch_size=batch_size,
        input_mappings=input_mappings or {},
        output_mappings=output_mappings or {},
        resources=resources,
    )
    super(loader.__class__, loader).load()  # Ensure the logger is loaded
    loader._dataset = dataset
    loader.num_examples = len(dataset)
    loader._dataset_info = {"default": dataset.info}
    return loader