Steps for processing data

Working with Steps

A Step is intended to be used within a Pipeline, which orchestrates the steps defined in it, but it can also be used standalone.

Assuming that we have a Step already defined as follows:

from typing import TYPE_CHECKING
from distilabel.steps import Step, StepInput

if TYPE_CHECKING:
    from distilabel.typing import StepColumns, StepOutput

class MyStep(Step):
    @property
    def inputs(self) -> "StepColumns":
        return ["input_field"]

    @property
    def outputs(self) -> "StepColumns":
        return ["output_field"]

    def process(self, inputs: StepInput) -> "StepOutput":
        for input in inputs:
            input["output_field"] = input["input_field"]
        yield inputs

Then we can use it as follows:

step = MyStep(name="my-step")
step.load()

next(step.process([{"input_field": "value"}]))
# [{'input_field': 'value', 'output_field': 'value'}]

Note

Step.load() always needs to be called when the step is used standalone. Within a pipeline, it is called automatically during pipeline execution.

Arguments

  • input_mappings: a dictionary that maps the keys expected by the step to the keys present in the input dictionaries. For example, input_mappings={"instruction": "prompt"} means that the input key prompt will be used as the key instruction by the current step.

  • output_mappings: a dictionary that maps the outputs of the step to other names. For example, output_mappings={"conversation": "prompt"} means that the output key conversation will be renamed to prompt for the next step.

  • input_batch_size (50 by default): set independently for every step; it determines how many input dictionaries the step processes at once.
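The direction of the two mappings is easy to mix up, so the sketch below models the renaming described above in plain Python. It does not use distilabel; the helper names apply_input_mappings and apply_output_mappings are hypothetical and only illustrate the documented behaviour, not how the library implements it.

```python
from typing import Any, Dict

Row = Dict[str, Any]


def apply_input_mappings(row: Row, input_mappings: Dict[str, str]) -> Row:
    """Rename incoming keys to the names the step expects.

    `input_mappings` has the form {key_expected_by_step: incoming_key}.
    """
    incoming_to_expected = {
        incoming: expected for expected, incoming in input_mappings.items()
    }
    return {incoming_to_expected.get(key, key): value for key, value in row.items()}


def apply_output_mappings(row: Row, output_mappings: Dict[str, str]) -> Row:
    """Rename the step's output keys; the form is {output_key: new_name}."""
    return {output_mappings.get(key, key): value for key, value in row.items()}


# The upstream data carries "prompt", but the step reads "instruction".
incoming = {"prompt": "What is a Step?"}
seen_by_step = apply_input_mappings(incoming, {"instruction": "prompt"})

# The step writes "conversation", but the next step should receive "prompt".
produced = {"conversation": "Hello!"}
passed_on = apply_output_mappings(produced, {"conversation": "prompt"})
```

In both cases the dictionary key is the name on the step's side and the value is the name on the outside.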

Runtime parameters

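Steps can also have RuntimeParameter attributes, which are parameters that can only be used after the pipeline initialisation, when calling Pipeline.run.

from pydantic import Field, PositiveInt

from distilabel.mixins.runtime_parameters import RuntimeParameter

# Default batch size, as stated in the Arguments section above.
DEFAULT_INPUT_BATCH_SIZE = 50

# Excerpt showing how the base Step declares a runtime parameter.
class Step(...):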
    input_batch_size: RuntimeParameter[PositiveInt] = Field(
        default=DEFAULT_INPUT_BATCH_SIZE,
        description="The number of rows that will contain the batches processed by the"
        " step.",
    )

Types of Steps

There are three special types of Step in distilabel:

  • GeneratorStep: a step that only generates data. It doesn't need any input data from previous steps and is normally the first node in a Pipeline. More information: Components - Step - GeneratorStep.

  • GlobalStep: a step with the standard interface, i.e. it receives inputs and generates outputs, but it processes all the data at once and is often the final step in the Pipeline. Because of this, a GlobalStep requires the previous steps to finish before it can start. More information: Components - Step - GlobalStep.

  • Task: essentially the same as a default Step, but it relies on an LLM as an attribute, and the process method is in charge of calling that LLM. More information: Components - Task.
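To make the difference between a regular step and a GlobalStep concrete, the following plain-Python sketch contrasts batch-by-batch processing with processing everything in one call. It does not use distilabel; iter_batches is a hypothetical helper, and the batch size of 2 is chosen only for illustration.

```python
from typing import Any, Dict, Iterator, List

Row = Dict[str, Any]


def iter_batches(rows: List[Row], batch_size: int) -> Iterator[List[Row]]:
    """Yield consecutive slices of at most `batch_size` rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


rows = [{"input_field": i} for i in range(5)]

# A regular step is called once per batch (here with a batch size of 2).
regular_call_sizes = [len(batch) for batch in iter_batches(rows, batch_size=2)]

# A global step is called once with every row, after upstream steps finish.
global_call_sizes = [len(rows)]
```

With five rows, the regular step is called three times (with 2, 2, and 1 rows), while the global step is called once with all 5.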

Defining custom Steps

We can define a custom step by creating a new subclass of the Step and defining the following:

  • inputs: is a property that returns a list of strings with the names of the required input fields, or a dictionary in which the keys are the names of the columns and the values are booleans indicating whether each column is required or not.

  • outputs: is a property that returns a list of strings with the names of the output fields, or a dictionary in which the keys are the names of the columns and the values are booleans indicating whether each column is required or not.

  • process: is a method that receives the input data and returns the output data, and it should be a generator, meaning that it should yield the output data.
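The two accepted forms for inputs and outputs can be compared with a small plain-Python sketch. It does not use distilabel; missing_required_columns is a hypothetical helper that only illustrates how a list (all columns required) relates to a dictionary of required/optional flags.

```python
from typing import Any, Dict, List, Union

Columns = Union[List[str], Dict[str, bool]]


def missing_required_columns(columns: Columns, row: Dict[str, Any]) -> List[str]:
    """Return the required column names that are absent from `row`."""
    if isinstance(columns, list):
        # List form: every listed column is treated as required.
        columns = {name: True for name in columns}
    return [
        name for name, required in columns.items() if required and name not in row
    ]


row = {"instruction": "Summarise this text."}

# List form: both columns are required, so "system_prompt" is reported.
missing_from_list = missing_required_columns(["instruction", "system_prompt"], row)

# Dictionary form: "system_prompt" is marked optional, so nothing is missing.
missing_from_dict = missing_required_columns(
    {"instruction": True, "system_prompt": False}, row
)
```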

Note

The default signature for the process method is process(self, *inputs: StepInput) -> StepOutput. The inputs argument must be kept and no other arguments can be added; the argument and return type-hints must be kept too. The signature is variadic because, by default, a step should be able to receive any number of inputs, i.e. more than one Step at a time could be connected to the current one.
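As a rough illustration of why the signature is variadic, the plain-Python function below mimics process without depending on distilabel: each positional argument stands for the batch coming from one upstream step. The field names are the ones used in the MyStep example above; the Batch alias is introduced here for illustration only.

```python
from typing import Any, Dict, Iterator, List

Batch = List[Dict[str, Any]]


def process(*inputs: Batch) -> Iterator[Batch]:
    """Mimic the variadic signature: one positional argument per upstream step."""
    for upstream_step_inputs in inputs:
        for row in upstream_step_inputs:
            row["output_field"] = row["input_field"]
        # One output batch is yielded for each upstream batch received.
        yield upstream_step_inputs


from_step_a = [{"input_field": "a"}]
from_step_b = [{"input_field": "b1"}, {"input_field": "b2"}]
results = list(process(from_step_a, from_step_b))
```

Here results holds two batches, one per upstream step, with one and two rows respectively.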

Warning

For custom Step subclasses to work properly with distilabel, and with the validation and serialization performed by default over each Step in the Pipeline, the StepInput and StepOutput type-hints should both be used, and they should not be surrounded with double quotes or imported under typing.TYPE_CHECKING. Otherwise, the validation and/or serialization will fail.

We can inherit from the Step class and define the inputs, outputs, and process methods as follows:

from typing import TYPE_CHECKING
from distilabel.steps import Step, StepInput

if TYPE_CHECKING:
    from distilabel.typing import StepColumns, StepOutput

class CustomStep(Step):
    @property
    def inputs(self) -> "StepColumns":
        ...

    @property
    def outputs(self) -> "StepColumns":
        ...

    def process(self, *inputs: StepInput) -> "StepOutput":
        for upstream_step_inputs in inputs:
            ...
            # Yield the processed batch coming from each upstream step.
            yield upstream_step_inputs

    # When overridden (ideally under the `typing_extensions.override` decorator)
    # @typing_extensions.override
    # def process(self, inputs: StepInput) -> StepOutput:
    #     for input in inputs:
    #         ...
    #     yield inputs

Alternatively, the @step decorator takes care of the boilerplate code and lets us define the inputs, outputs, and process method in a more straightforward way. One downside is that it doesn't let you access or set attributes on self, so if you need to do either, you should go with the first approach of defining a custom Step subclass.

from typing import TYPE_CHECKING
from distilabel.steps import StepInput, step

if TYPE_CHECKING:
    from distilabel.typing import StepOutput

@step(inputs=[...], outputs=[...])
def CustomStep(inputs: StepInput) -> "StepOutput":
    for input in inputs:
        ...
    yield inputs

# Use a different variable name so the imported `step` decorator isn't shadowed.
custom_step = CustomStep(name="my-step")