Steps

This section contains the API reference for the distilabel steps. For an example of how to create and use a step, see the Tutorial - Steps.

StepInput = Annotated[List[Dict[str, Any]], _STEP_INPUT_ANNOTATION] module-attribute

StepInput is an Annotated alias of List[Dict[str, Any]] that carries extra metadata, which allows distilabel to validate the process method defined in each Step.

GeneratorStep

Bases: _Step, ABC

A special kind of Step that is able to generate data, i.e. it doesn't receive any input from previous steps.

Attributes:

Name Type Description
batch_size RuntimeParameter[int]

The number of rows in each batch generated by the step. Defaults to 50.

Runtime parameters
  • batch_size: The number of rows in each batch generated by the step. Defaults to 50.
Source code in src/distilabel/steps/base.py
class GeneratorStep(_Step, ABC):
    """A special kind of `Step` that is able to generate data i.e. it doesn't receive
    any input from the previous steps.

    Attributes:
        batch_size: The number of rows that will contain the batches generated by the
            step. Defaults to `50`.

    Runtime parameters:
        - `batch_size`: The number of rows that will contain the batches generated by
            the step. Defaults to `50`.
    """

    batch_size: RuntimeParameter[int] = Field(
        default=50,
        description="The number of rows that will contain the batches generated by the"
        " step.",
    )

    @abstractmethod
    def process(self, offset: int = 0) -> "GeneratorStepOutput":
        """Method that defines the generation logic of the step. It should yield the
        output rows and a boolean indicating if it's the last batch or not.

        Args:
            offset: The offset to start the generation from. Defaults to 0.

        Yields:
            The output rows and a boolean indicating if it's the last batch or not.
        """
        pass

    def process_applying_mappings(self, offset: int = 0) -> "GeneratorStepOutput":
        """Runs the `process` method of the step applying the `output_mappings` to the
        output rows. This is the function that should be used to run the generation logic
        of the step.

        Args:
            offset: The offset to start the generation from. Defaults to 0.

        Yields:
            The output rows and a boolean indicating if it's the last batch or not.
        """

        # If the `Step` was built using the `@step` decorator, then we need to pass
        # the runtime parameters as `kwargs`, so they can be used within the processing
        # function
        generator = (
            self.process(offset=offset)
            if not self._built_from_decorator
            else self.process(offset=offset, **self._runtime_parameters)
        )

        for output_rows, last_batch in generator:
            yield (
                [
                    {self.output_mappings.get(k, k): v for k, v in row.items()}
                    for row in output_rows
                ],
                last_batch,
            )

process(offset=0) abstractmethod

Method that defines the generation logic of the step. It should yield the output rows and a boolean indicating if it's the last batch or not.

Parameters:

Name Type Description Default
offset int

The offset to start the generation from. Defaults to 0.

0

Yields:

Type Description
GeneratorStepOutput

The output rows and a boolean indicating if it's the last batch or not.
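The contract described above can be sketched with a plain generator function. This is a minimal illustration that does not import distilabel: `generate_numbers`, its `total` argument, and the `GeneratorOutput` alias are hypothetical names; in a real step the logic would live in the `process` method of a `GeneratorStep` subclass.

```python
from typing import Any, Dict, Iterator, List, Tuple

# Hypothetical alias mirroring the shape described for `GeneratorStepOutput`:
# a batch of rows plus a flag telling whether it is the last batch.
GeneratorOutput = Iterator[Tuple[List[Dict[str, Any]], bool]]


def generate_numbers(
    total: int, batch_size: int = 50, offset: int = 0
) -> GeneratorOutput:
    """Yield batches of rows starting at `offset`, flagging the last batch."""
    start = offset
    while start < total:
        end = min(start + batch_size, total)
        rows = [{"number": i} for i in range(start, end)]
        # The flag is True only for the batch that reaches the end of the data.
        yield rows, end >= total
        start = end


batches = list(generate_numbers(total=5, batch_size=2))
for rows, last_batch in batches:
    print(len(rows), last_batch)
```

Starting from a non-zero `offset` simply skips the rows before it, which is what makes resuming a partially completed generation possible.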

Source code in src/distilabel/steps/base.py
@abstractmethod
def process(self, offset: int = 0) -> "GeneratorStepOutput":
    """Method that defines the generation logic of the step. It should yield the
    output rows and a boolean indicating if it's the last batch or not.

    Args:
        offset: The offset to start the generation from. Defaults to 0.

    Yields:
        The output rows and a boolean indicating if it's the last batch or not.
    """
    pass

process_applying_mappings(offset=0)

Runs the process method of the step, applying the output_mappings to the output rows. This is the function that should be used to run the generation logic of the step.

Parameters:

Name Type Description Default
offset int

The offset to start the generation from. Defaults to 0.

0

Yields:

Type Description
GeneratorStepOutput

The output rows and a boolean indicating if it's the last batch or not.
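The renaming applied to each output row can be shown in isolation. The sketch below reuses the same dictionary comprehension as the source listing; the `apply_output_mappings` function name and the sample column names are illustrative only.

```python
from typing import Any, Dict, List


def apply_output_mappings(
    rows: List[Dict[str, Any]], output_mappings: Dict[str, str]
) -> List[Dict[str, Any]]:
    """Rename the keys found in `output_mappings`; leave the others untouched."""
    return [
        {output_mappings.get(k, k): v for k, v in row.items()}
        for row in rows
    ]


renamed = apply_output_mappings(
    [{"prompt": "hi", "id": 1}],
    output_mappings={"prompt": "instruction"},
)
print(renamed)
```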

Source code in src/distilabel/steps/base.py
def process_applying_mappings(self, offset: int = 0) -> "GeneratorStepOutput":
    """Runs the `process` method of the step applying the `output_mappings` to the
    output rows. This is the function that should be used to run the generation logic
    of the step.

    Args:
        offset: The offset to start the generation from. Defaults to 0.

    Yields:
        The output rows and a boolean indicating if it's the last batch or not.
    """

    # If the `Step` was built using the `@step` decorator, then we need to pass
    # the runtime parameters as `kwargs`, so they can be used within the processing
    # function
    generator = (
        self.process(offset=offset)
        if not self._built_from_decorator
        else self.process(offset=offset, **self._runtime_parameters)
    )

    for output_rows, last_batch in generator:
        yield (
            [
                {self.output_mappings.get(k, k): v for k, v in row.items()}
                for row in output_rows
            ],
            last_batch,
        )

GlobalStep

Bases: Step, ABC

A special kind of Step whose process method receives all the data processed by its previous steps at once, instead of receiving it in batches. This kind of step is useful when the processing logic requires all the data at once, for example to train a model or to perform a global aggregation.
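To see why some logic needs every row at once, consider attaching the mean score to each row. The sketch below is plain Python with hypothetical names (`add_mean_score`, the `score` column) and does not use distilabel; it only contrasts running the same function over the full data against running it per batch.

```python
from typing import Any, Dict, Iterator, List

Row = Dict[str, Any]


def add_mean_score(rows: List[Row]) -> Iterator[List[Row]]:
    """Attach the mean of `score` over the rows received to every row."""
    mean = sum(row["score"] for row in rows) / len(rows)
    yield [{**row, "mean_score": mean} for row in rows]


rows = [{"score": 1}, {"score": 2}, {"score": 3}, {"score": 6}]

# All rows at once: the mean is computed over the whole dataset.
global_result = next(add_mean_score(rows))

# Batches of two rows: each batch only sees its own local mean.
batched_result = [
    row
    for start in range(0, len(rows), 2)
    for row in next(add_mean_score(rows[start : start + 2]))
]

print([row["mean_score"] for row in global_result])
print([row["mean_score"] for row in batched_result])
```

The batched run produces a different mean for each batch, which is the kind of discrepancy that receiving all the data at once avoids.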

Source code in src/distilabel/steps/base.py
class GlobalStep(Step, ABC):
    """A special kind of `Step` whose `process` method receives all the data processed
    by its previous steps at once, instead of receiving it in batches. This kind of step
    is useful when the processing logic requires all the data at once, for example to
    train a model or to perform a global aggregation.
    """

    @property
    def inputs(self) -> List[str]:
        return []

    @property
    def outputs(self) -> List[str]:
        return []

Step

Bases: _Step, ABC

Base class for the steps that can be included in a Pipeline.

Attributes:

Name Type Description
input_batch_size RuntimeParameter[PositiveInt]

The number of rows in each batch processed by the step. Defaults to 50.

Runtime parameters
  • input_batch_size: The number of rows in each batch processed by the step. Defaults to 50.
Source code in src/distilabel/steps/base.py
class Step(_Step, ABC):
    """Base class for the steps that can be included in a `Pipeline`.

    Attributes:
        input_batch_size: The number of rows that will contain the batches processed by
            the step. Defaults to `50`.

    Runtime parameters:
        - `input_batch_size`: The number of rows that will contain the batches processed
            by the step. Defaults to `50`.
    """

    input_batch_size: RuntimeParameter[PositiveInt] = Field(
        default=DEFAULT_INPUT_BATCH_SIZE,
        description="The number of rows that will contain the batches processed by the"
        " step.",
    )

    @abstractmethod
    def process(self, *inputs: StepInput) -> "StepOutput":
        """Method that defines the processing logic of the step. It should yield the
        output rows.

        Args:
            *inputs: An argument used to receive the outputs of the previous steps. The
                number of arguments depends on the number of previous steps. It doesn't
                need to be an `*args` argument, it can be a regular argument annotated
                with `StepInput` if the step has only one previous step.
        """
        pass

    def process_applying_mappings(self, *args: List[Dict[str, Any]]) -> "StepOutput":
        """Runs the `process` method of the step applying the `input_mappings` to the input
        rows and the `output_mappings` to the output rows. This is the function that
        should be used to run the processing logic of the step.

        Yields:
            The output rows.
        """

        inputs = self._apply_input_mappings(args) if self.input_mappings else args

        # If the `Step` was built using the `@step` decorator, then we need to pass
        # the runtime parameters as kwargs, so they can be used within the processing
        # function
        generator = (
            self.process(*inputs)
            if not self._built_from_decorator
            else self.process(*inputs, **self._runtime_parameters)
        )

        for output_rows in generator:
            yield [
                {
                    # Apply output mapping and revert input mapping
                    self.output_mappings.get(k, None)
                    or self.input_mappings.get(k, None)
                    or k: v
                    for k, v in row.items()
                }
                for row in output_rows
            ]

    def _revert_input_mappings(self, input: Dict[str, Any]) -> Dict[str, Any]:
        """Reverts the `input_mappings` of the step to the input row.

        Args:
            input: The input row.

        Returns:
            The input row with the `input_mappings` reverted.
        """
        return {self.input_mappings.get(k, k): v for k, v in input.items()}

    def _apply_input_mappings(
        self, inputs: Tuple[List[Dict[str, Any]], ...]
    ) -> List[List[Dict[str, Any]]]:
        """Applies the `input_mappings` to the input rows.

        Args:
            inputs: The input rows.

        Returns:
            The input rows with the `input_mappings` applied.
        """
        reverted_input_mappings = {v: k for k, v in self.input_mappings.items()}

        return [
            [
                {reverted_input_mappings.get(k, k): v for k, v in row.items()}
                for row in row_inputs
            ]
            for row_inputs in inputs
        ]

process(*inputs) abstractmethod

Method that defines the processing logic of the step. It should yield the output rows.

Parameters:

Name Type Description Default
*inputs StepInput

An argument used to receive the outputs of the previous steps. The number of arguments depends on the number of previous steps. It doesn't need to be an *args argument; it can be a regular argument annotated with StepInput if the step has only one previous step.

()
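The two argument styles mentioned above can be sketched as plain generator functions. Both functions and the `Row` alias are hypothetical and do not subclass `Step`; they only illustrate the shape of the inputs and of the yielded output rows.

```python
from typing import Any, Dict, Iterator, List

Row = Dict[str, Any]


def process_single(inputs: List[Row]) -> Iterator[List[Row]]:
    """One previous step: a regular argument receives its batch of rows."""
    yield [{**row, "length": len(row["text"])} for row in inputs]


def process_many(*inputs: List[Row]) -> Iterator[List[Row]]:
    """Several previous steps: each positional argument is one step's batch."""
    yield [row for batch in inputs for row in batch]


single = next(process_single([{"text": "hello"}]))
merged = next(process_many([{"text": "a"}], [{"text": "b"}]))
print(single)
print(merged)
```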
Source code in src/distilabel/steps/base.py
@abstractmethod
def process(self, *inputs: StepInput) -> "StepOutput":
    """Method that defines the processing logic of the step. It should yield the
    output rows.

    Args:
        *inputs: An argument used to receive the outputs of the previous steps. The
            number of arguments depends on the number of previous steps. It doesn't
            need to be an `*args` argument, it can be a regular argument annotated
            with `StepInput` if the step has only one previous step.
    """
    pass

process_applying_mappings(*args)

Runs the process method of the step, applying the input_mappings to the input rows and the output_mappings to the output rows. This is the function that should be used to run the processing logic of the step.

Yields:

Type Description
StepOutput

The output rows.
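The round trip of both mappings can be followed with a standalone sketch that mirrors the logic of the source listing. It assumes, as the listing suggests, that `input_mappings` maps the column name the step expects to the upstream column name; `run_with_mappings`, `shout`, and the column names are hypothetical.

```python
from typing import Any, Callable, Dict, Iterator, List

Row = Dict[str, Any]


def run_with_mappings(
    process: Callable[..., Iterator[List[Row]]],
    batches: List[List[Row]],
    input_mappings: Dict[str, str],
    output_mappings: Dict[str, str],
) -> Iterator[List[Row]]:
    # Rename upstream columns to the names the step expects.
    upstream_to_step = {v: k for k, v in input_mappings.items()}
    inputs = [
        [{upstream_to_step.get(k, k): v for k, v in row.items()} for row in batch]
        for batch in batches
    ]
    for output_rows in process(*inputs):
        # Apply the output mapping, or revert the input mapping, or keep the key.
        yield [
            {
                (output_mappings.get(k) or input_mappings.get(k) or k): v
                for k, v in row.items()
            }
            for row in output_rows
        ]


def shout(inputs: List[Row]) -> Iterator[List[Row]]:
    """Toy processing function that reads `instruction` and adds `generation`."""
    yield [{**row, "generation": row["instruction"].upper()} for row in inputs]


result = list(
    run_with_mappings(
        shout,
        batches=[[{"prompt": "hi"}]],
        input_mappings={"instruction": "prompt"},
        output_mappings={"generation": "response"},
    )
)
print(result)
```

In this sketch the upstream `prompt` column is presented to the function as `instruction`, restored to `prompt` on the way out, and the new `generation` column is emitted as `response`.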

Source code in src/distilabel/steps/base.py
def process_applying_mappings(self, *args: List[Dict[str, Any]]) -> "StepOutput":
    """Runs the `process` method of the step applying the `input_mappings` to the input
    rows and the `output_mappings` to the output rows. This is the function that
    should be used to run the processing logic of the step.

    Yields:
        The output rows.
    """

    inputs = self._apply_input_mappings(args) if self.input_mappings else args

    # If the `Step` was built using the `@step` decorator, then we need to pass
    # the runtime parameters as kwargs, so they can be used within the processing
    # function
    generator = (
        self.process(*inputs)
        if not self._built_from_decorator
        else self.process(*inputs, **self._runtime_parameters)
    )

    for output_rows in generator:
        yield [
            {
                # Apply output mapping and revert input mapping
                self.output_mappings.get(k, None)
                or self.input_mappings.get(k, None)
                or k: v
                for k, v in row.items()
            }
            for row in output_rows
        ]