
Step

The Step is an abstract class that defines the interface for the building blocks used within the context of a Pipeline. A Step can be seen as a node within a Directed Acyclic Graph (DAG) whose execution is orchestrated by the Pipeline.

Working with Steps

The Step is intended to be used within the scope of a Pipeline, which will orchestrate the different steps defined; nonetheless, a Step can also be used standalone if needed.

Assuming that we have a Step already defined as follows:

from typing import List

from distilabel.steps import Step, StepInput
from distilabel.steps.typing import StepOutput

class MyStep(Step):
    @property
    def inputs(self) -> List[str]:
        return ["input_field"]

    @property
    def outputs(self) -> List[str]:
        return ["output_field"]

    def process(self, inputs: StepInput) -> StepOutput:
        # Copy the value of `input_field` into `output_field` for every input dictionary
        for input in inputs:
            input["output_field"] = input["input_field"]
        yield inputs

Then we can instantiate and use it as follows:

step = MyStep(name="my-step")
step.load()

next(step.process([{"input_field": "value"}]))
# [{'input_field': 'value', 'output_field': 'value'}]

Note

The load method ALWAYS needs to be called when using a Step (or any Step subclass) standalone. When the step is used within a Pipeline there is no need to call it explicitly, since Pipeline.run will call it automatically for every step; in any other case, load must be called.
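For illustration, here is a minimal sketch of how the same step could be used within a Pipeline instead, assuming the pre-defined LoadDataFromDicts generator step is used to feed it (the Pipeline itself is covered in detail in the Pipeline section):

from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromDicts

with Pipeline(name="my-pipeline") as pipeline:
    loader = LoadDataFromDicts(name="load-data", data=[{"input_field": "value"}])
    my_step = MyStep(name="my-step")
    # Connect the generator step to our step
    loader >> my_step

# `load` is called automatically for every step by `Pipeline.run`
distiset = pipeline.run()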

That said, most of the time we'll end up using the pre-defined steps in distilabel, so there's no need to create custom steps; in any case, custom steps are covered later in this page.

Let's now look at a set of arguments that can be used to map fields across steps or to set the batch size specific to the step (a usage sketch follows the list):

  • input_mappings, which is a dictionary that can be useful to map keys from the input dictionaries to the keys expected by the step. For example, if input_mappings={"instruction": "prompt"}, that means that the key prompt from the input dictionaries will be used as the key instruction for the step.

  • output_mappings, which is a dictionary that can be used to map the outputs of the step to other names. For example, if output_mappings={"conversation": "prompt"}, that means that the key conversation generated by the step will be renamed to prompt and the output dictionaries of this step will contain a key called prompt instead of conversation.

  • input_batch_size (set to 50 by default), which is independent for every step and determines how many input dictionaries will be processed at once. It won't matter that much for this step, but as we will see later, other types of steps come with an LLM, so having this flexibility will be really useful.
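As a minimal sketch, assuming the MyStep class defined above (the mapped field names are just illustrative placeholders), these arguments are passed when instantiating the step:

step = MyStep(
    name="my-step",
    # Use the `prompt` key of the incoming dictionaries as the `input_field` input
    input_mappings={"input_field": "prompt"},
    # Rename the `output_field` key generated by the step to `generation`
    output_mappings={"output_field": "generation"},
    # Process 10 input dictionaries at a time instead of the default 50
    input_batch_size=10,
)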

Runtime parameters

Finally, let's introduce a special type of argument that we will find when dealing with the Steps: the Runtime parameters. For example, input_batch_size is of type RuntimeParameter:

from distilabel.mixins.runtime_parameters import RuntimeParameter

class Step(...):
    input_batch_size: RuntimeParameter[PositiveInt] = Field(
        default=DEFAULT_INPUT_BATCH_SIZE,
        description="The number of rows that will contain the batches processed by the"
        " step.",
    )

We can interact with these types of arguments when calling the Pipeline.run method, as we will see in the Pipeline section. These arguments can be really useful to pass information to the steps after the pipeline has been defined.
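As a rough sketch, assuming a pipeline containing the my-step step like the one sketched earlier (the values below are just placeholders; the exact usage is covered in the Pipeline section), runtime parameters are provided per step when calling Pipeline.run:

pipeline.run(
    parameters={
        # Override the runtime parameters of the step named "my-step"
        "my-step": {"input_batch_size": 10},
    },
)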

Types of Steps

Besides the default Step already described, in distilabel we find the following abstract subclasses built on top of Step.

  • GeneratorStep: a step that only produces / generates data and doesn't need any input data from previous steps; in most cases it is a root node of the graph, i.e. the first Step in the Pipeline (a sketch follows this list).

    More information about it at Components - Step - GeneratorStep.

  • GlobalStep: a step with the standard interface, i.e. it receives inputs and generates outputs, but it processes all the data at once; in most cases it is a leaf node of the graph, i.e. the last Step in the Pipeline. Since a GlobalStep requires the outputs of the previous steps, those steps need to finish before this step can start, and the steps connected to its outputs, if any, will need to wait until this step is done.

    More information about it at Components - Step - GlobalStep.
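For illustration, here is a rough sketch of what a custom GeneratorStep could look like, assuming that GeneratorStep exposes a batch_size attribute and that GeneratorStepOutput yields tuples of (batch, last_batch flag); check the GeneratorStep page linked above for the exact interface:

from typing import List

from distilabel.steps import GeneratorStep
from distilabel.steps.typing import GeneratorStepOutput

class MyGeneratorStep(GeneratorStep):
    # Illustrative attribute holding the data to be generated
    instructions: List[str]

    @property
    def outputs(self) -> List[str]:
        return ["instruction"]

    def process(self, offset: int = 0) -> GeneratorStepOutput:
        # Yield batches of dictionaries plus a flag indicating whether it's the last batch
        remaining = self.instructions[offset:]
        while remaining:
            batch, remaining = remaining[: self.batch_size], remaining[self.batch_size :]
            yield (
                [{"instruction": instruction} for instruction in batch],
                len(remaining) == 0,
            )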

Additionally, distilabel also defines another type of Step, the Task, which is essentially the same, except that a task expects an LLM as an attribute and its process method will be in charge of calling that LLM. So one could say that the Task is a Step that works with an LLM.

More information about it at Components - Task.
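As a quick, hedged sketch, assuming the pre-defined TextGeneration task and OpenAILLM are available under the imports shown below (the Task section covers the exact interface), a Task is instantiated with the LLM it will call:

from distilabel.llms import OpenAILLM
from distilabel.steps.tasks import TextGeneration

task = TextGeneration(
    name="text-generation",
    # The `process` method of the task will call this LLM under the hood
    llm=OpenAILLM(model="gpt-4"),
)
task.load()

next(task.process([{"instruction": "What's the capital of Spain?"}]))
# [{'instruction': "What's the capital of Spain?", 'generation': '...', ...}]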

Defining custom Steps

In order to define custom steps, we need to create a new subclass of the Step class and define both the inputs and outputs properties, as well as the process method.

To sum up, the following need to be defined:

  • inputs: is a property that returns a list of strings with the names of the required input fields.

  • outputs: is a property that returns a list of strings with the names of the output fields.

  • process: is a method that receives the input data and returns the output data, and it should be a generator, meaning that it should yield the output data. It's important to preserve the default signature of the method, def process(self, *inputs: StepInput) -> StepOutput, since that's the one used by the Pipeline to orchestrate the steps: the inputs argument should be respected, no extra arguments can be added, and the type-hints and return type-hints should be respected too.

Note

The default signature for the process method is process(self, *inputs: StepInput) -> StepOutput, meaning that it should be able to receive any number of inputs by default, i.e. more than one Step at a time could be connected to the current one. When defining custom steps, that can be overridden with process(self, inputs: StepInput) -> StepOutput, so that the process method only receives the outputs of a single previous Step connected to it.

Warning

For the custom Step subclasses to work properly with distilabel and with the validation and serialization performed by default over each Step in the Pipeline, the type-hints for both StepInput and StepOutput should be used directly and neither surrounded with double-quotes nor imported under typing.TYPE_CHECKING; otherwise, the validation and/or serialization will fail.

from typing import List

from distilabel.steps import Step, StepInput
from distilabel.steps.typing import StepOutput

class CustomStep(Step):
    @property
    def inputs(self) -> List[str]:
        ...

    @property
    def outputs(self) -> List[str]:
        ...

    def process(self, *inputs: StepInput) -> StepOutput:
        # Each `input` is a batch coming from one of the connected upstream steps
        for input in inputs:
            ...
            yield input

    # When overridden (ideally under the `typing_extensions.override` decorator)
    # @typing_extensions.override
    # def process(self, inputs: StepInput) -> StepOutput:
    #     for input in inputs:
    #         ...
    #     yield inputs

Alternatively, a simpler and more convenient way of defining custom Step subclasses is via the @step decorator, which takes care of the boilerplate code and allows defining the inputs, outputs, and process method in a more straightforward way.

from distilabel.steps import StepInput, step
from distilabel.steps.typing import StepOutput

@step(inputs=[...], outputs=[...])
def CustomStep(inputs: StepInput) -> StepOutput:
    for input in inputs:
        ...
    yield inputs

step = CustomStep(name="my-step")

Warning

One downside of the @step decorator is that it won't let you access or set any self attributes, so if you need to access or set an attribute, you should go with the first approach of defining a custom Step subclass.
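For instance, a hypothetical AppendSuffix step with a configurable suffix attribute (both names are just illustrative) would need to be defined as a subclass, assuming that attributes can be declared as class-level fields on the Step subclass:

from typing import List

from distilabel.steps import Step, StepInput
from distilabel.steps.typing import StepOutput

class AppendSuffix(Step):
    # Illustrative attribute, accessible via `self.suffix` within `process`
    suffix: str = "!"

    @property
    def inputs(self) -> List[str]:
        return ["input_field"]

    @property
    def outputs(self) -> List[str]:
        return ["output_field"]

    def process(self, inputs: StepInput) -> StepOutput:
        for input in inputs:
            input["output_field"] = input["input_field"] + self.suffix
        yield inputs

step = AppendSuffix(name="append-suffix", suffix="?")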