
Steps

The Step is the base class in distilabel: every unit of work in a Pipeline inherits from it.

What's a Step in distilabel?

It is a base class in charge of processing data, which in the end is a list of dictionaries. To do so, it defines two properties, inputs and outputs, each a list of strings naming the columns that the step needs as input or produces as output, respectively: that is, the keys that must be present in the dictionaries received by the step, and the keys that will be added to those dictionaries after running it.
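To make the role of inputs concrete, here is a minimal sketch in plain Python of the kind of check that declared column names make possible. It is not distilabel's implementation; the helper name `missing_columns` is hypothetical and only illustrates that the declared names are keys expected in every row.

```python
from typing import Any, Dict, List


def missing_columns(rows: List[Dict[str, Any]], required: List[str]) -> List[str]:
    """Return the required column names that are absent from at least one row."""
    return [name for name in required if any(name not in row for row in rows)]


rows = [
    {"instruction": "Hello", "response": "Hi"},
    {"instruction": "Bye"},  # this row lacks the "response" key
]
print(missing_columns(rows, ["instruction", "response"]))
# ['response']
```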

Every Step is connected to a Pipeline, which in practice means that we will build them in the context of a Pipeline.

Lastly, these steps inherit from pydantic.BaseModel, so all the attributes of a step will be validated upon definition.

An example: ConversationTemplate

Let's look at one simple type of step as an example, the ConversationTemplate. Here is its definition (the docstrings are removed for clarity, but they can be reviewed in the API reference):

class ConversationTemplate(Step):

    @property
    def inputs(self) -> List[str]:
        return ["instruction", "response"]

    @property
    def outputs(self) -> List[str]:
        return ["conversation"]

    def process(self, inputs: StepInput) -> "StepOutput":
        for input in inputs:
            input["conversation"] = [
                {"role": "user", "content": input["instruction"]},
                {"role": "assistant", "content": input["response"]},
            ]
        yield inputs

At the very minimum, we need to define the inputs and outputs properties, with the column names required as input and returned as output respectively, and the processing logic of the step in the process method.

In this example, we see that it takes inputs as argument, annotated as StepInput, which is a list of dictionaries with the data, and yields a StepOutput.

Working with the step

Let's see how to instantiate this Step as a standalone object:

from distilabel.pipeline.local import Pipeline
from distilabel.steps.conversation import ConversationTemplate

conversation_template = ConversationTemplate(
    name="my-conversation-template",
    pipeline=Pipeline(name="my-pipeline"),
)

As we mentioned, every Step must be defined in the context of a Pipeline, which means we need to pass it as an argument if we decide to create a standalone step. If we take a look at the conversation_template step, we see the following fields:

conversation_template
# ConversationTemplate(name='my-conversation-template', input_mappings={}, output_mappings={}, input_batch_size=50)
  • The name of the Step, a mandatory field to identify the Step within the Pipeline.
  • input_mappings, which is a dictionary that can be useful to map keys from the input dictionaries to the keys expected by the step. For example, if input_mappings={"instruction": "prompt"}, that means that the key prompt from the input dictionaries will be used as the key instruction for the step.
  • output_mappings, which is a dictionary that can be used to map the outputs of the step to other names. For example, if output_mappings={"conversation": "prompt"}, that means that the key conversation generated by the step will be renamed to prompt and the output dictionaries of this step will contain a key called prompt instead of conversation.
  • input_batch_size (by default set to 50), which is independent for every step and determines how many input dictionaries the step will process at once. It won't matter that much in this step, but as we will see later, other types of steps will come with an LLM, so having this flexibility will be really useful.
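The renaming described for input_mappings and output_mappings can be sketched in plain Python. This is only an illustration of the semantics stated above, not distilabel's code; the helper names `apply_input_mappings` and `apply_output_mappings` and the output key `messages` are hypothetical.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def apply_input_mappings(rows: List[Row], input_mappings: Dict[str, str]) -> List[Row]:
    """Rename incoming keys to the names the step expects.

    `input_mappings` maps step key -> incoming key, e.g. {"instruction": "prompt"}.
    """
    incoming_to_step = {incoming: step_key for step_key, incoming in input_mappings.items()}
    return [{incoming_to_step.get(key, key): value for key, value in row.items()} for row in rows]


def apply_output_mappings(rows: List[Row], output_mappings: Dict[str, str]) -> List[Row]:
    """Rename keys produced by the step (step key -> outgoing key)."""
    return [{output_mappings.get(key, key): value for key, value in row.items()} for row in rows]


rows = [{"prompt": "Hello", "response": "Hi"}]

# The step expects "instruction", but the data calls that column "prompt".
step_rows = apply_input_mappings(rows, {"instruction": "prompt"})
print(step_rows)
# [{'instruction': 'Hello', 'response': 'Hi'}]

# Pretend the step ran and added its "conversation" column.
for row in step_rows:
    row["conversation"] = [
        {"role": "user", "content": row["instruction"]},
        {"role": "assistant", "content": row["response"]},
    ]

# Expose the generated column under a different name.
renamed = apply_output_mappings(step_rows, {"conversation": "messages"})
print(sorted(renamed[0]))
# ['instruction', 'messages', 'response']
```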

Processing data

Internally, the Pipeline will call the process method when appropriate, but we can see it in action with some dummy data:

next(conversation_template.process([{"instruction": "Hello", "response": "Hi"}]))
# [
#   {
#     "instruction": "Hello",
#     "response": "Hi",
#     "conversation": [
#       {
#         "role": "user",
#         "content": "Hello"
#       },
#       {
#         "role": "assistant",
#         "content": "Hi"
#       }
#     ]
#   }
# ]

It takes the dictionary with the data, adds a new key, conversation, holding the data formatted as a conversation template, and passes this data to the following step.

This is a small step that shows what to expect when creating our own Step objects, which can start from something as simple as generating a conversation template from some columns of a dataset.
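As a rough picture of how a caller might feed process in chunks of input_batch_size, the sketch below re-implements the processing logic as a plain generator and drives it batch by batch. The driver `run_in_batches` is hypothetical and much simpler than what a Pipeline does; it only illustrates the batching idea.

```python
from typing import Any, Dict, Iterator, List

Row = Dict[str, Any]


def process(inputs: List[Row]) -> Iterator[List[Row]]:
    """Same logic as ConversationTemplate.process, written as a plain generator."""
    for row in inputs:
        row["conversation"] = [
            {"role": "user", "content": row["instruction"]},
            {"role": "assistant", "content": row["response"]},
        ]
    yield inputs


def run_in_batches(rows: List[Row], input_batch_size: int) -> List[Row]:
    """Hypothetical driver: call `process` once per batch and collect the outputs."""
    results: List[Row] = []
    for start in range(0, len(rows), input_batch_size):
        batch = rows[start : start + input_batch_size]
        for output in process(batch):
            results.extend(output)
    return results


rows = [{"instruction": f"q{i}", "response": f"a{i}"} for i in range(5)]
processed = run_in_batches(rows, input_batch_size=2)  # batches of 2, 2 and 1 rows
print(len(processed))
# 5
```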

Runtime Parameters

Let's take a look at a special kind of argument that we will find when dealing with Steps: the runtime parameters. Let's inspect them using the previous example class:

print(conversation_template.runtime_parameters_names)
# {'input_batch_size': True}

The ConversationTemplate only has one runtime parameter, input_batch_size, which is inherited from the Step class, where it is defined as follows:

from distilabel.mixins.runtime_parameters import RuntimeParameter

class Step(...):
    ...
    input_batch_size: RuntimeParameter[PositiveInt] = Field(
        default=DEFAULT_INPUT_BATCH_SIZE,
        description="The number of rows that will contain the batches processed by the"
        " step.",
    )

When we define input_batch_size as a RuntimeParameter, the most direct effect is that we get access to some extra information, thanks to the RuntimeParametersMixin:

print(conversation_template.get_runtime_parameters_info())
# [{'name': 'input_batch_size', 'optional': True, 'description': 'The number of rows that will contain the batches processed by the step.'}]

But beyond exposing this extra information, these parameters let us set or modify the arguments of our Steps when running them in the context of a Pipeline. We will see them in action once we interact with the Steps inside of a Pipeline.
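The idea of flagging some attributes as adjustable at run time can be sketched with the standard library alone. The example below uses dataclass field metadata instead of pydantic and is not how distilabel implements RuntimeParameter; the class `SketchStep`, the `runtime` metadata flag, and both helper functions are hypothetical.

```python
from dataclasses import MISSING, dataclass, field, fields
from typing import Any, Dict


@dataclass
class SketchStep:
    name: str
    # The metadata flag marks this field as adjustable at run time (hypothetical convention).
    input_batch_size: int = field(default=50, metadata={"runtime": True})


def runtime_parameters_names(step: Any) -> Dict[str, bool]:
    """Map each flagged field name to whether it is optional, i.e. has a default."""
    return {
        f.name: f.default is not MISSING or f.default_factory is not MISSING
        for f in fields(step)
        if f.metadata.get("runtime")
    }


def set_runtime_parameters(step: Any, values: Dict[str, Any]) -> None:
    """Override only the fields flagged as runtime parameters; reject anything else."""
    allowed = runtime_parameters_names(step)
    for key, value in values.items():
        if key not in allowed:
            raise KeyError(f"{key!r} is not a runtime parameter")
        setattr(step, key, value)


sketch_step = SketchStep(name="my-conversation-template")
print(runtime_parameters_names(sketch_step))
# {'input_batch_size': True}

set_runtime_parameters(sketch_step, {"input_batch_size": 10})
print(sketch_step.input_batch_size)
# 10
```

Here `name` is not flagged, so trying to override it through `set_runtime_parameters` raises a `KeyError`, which mirrors the distinction between ordinary attributes and runtime parameters.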

step decorator

If all that we want to apply in a step is some simple processing, it can be easier to just create a plain function, and decorate it. We can find more examples in the API reference, but let's see how we could define the previous step as a function and use the decorator:

from distilabel.steps import StepInput, StepOutput, step

# Normal step
@step(inputs=["instruction", "response"], outputs=["conversation"])
def ConversationTemplate(inputs: StepInput) -> StepOutput:
    for input in inputs:
        input["conversation"] = [
            {"role": "user", "content": input["instruction"]},
            {"role": "assistant", "content": input["response"]},
        ]
    yield inputs

It can be instantiated exactly like the ConversationTemplate class:

conversation_template = ConversationTemplate(
    name="my-conversation-template",
    pipeline=Pipeline(name="my-pipeline"),
)

The @step decorator also accepts a step_type argument that selects the type of step to create, which will be better understood once we see the different types of steps.

Types of steps

Other than the general or normal steps we have seen, there are special types of steps that have a restricted behaviour compared to the general Step.

Generator steps

These are steps that generate data and, unlike normal steps, don't need to receive any input from a previous step. A typical use is loading data, as can be seen in LoadDataFromDicts. For this type of step we only need to define the process method, and we can optionally pass a batch_size argument that determines the size of the generated batches.
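A generator-style process can be pictured as a function that takes no rows from a previous step and yields batches of its own. The sketch below is plain Python, not LoadDataFromDicts itself; the function name `load_from_dicts` is hypothetical, and the last-batch flag yielded alongside each batch is an assumption of this sketch rather than something stated above.

```python
from typing import Any, Dict, Iterator, List, Tuple

Row = Dict[str, Any]


def load_from_dicts(data: List[Row], batch_size: int = 50) -> Iterator[Tuple[List[Row], bool]]:
    """Yield (batch, is_last_batch) pairs built from in-memory data."""
    for start in range(0, len(data), batch_size):
        batch = data[start : start + batch_size]
        # The flag is True only for the batch that reaches the end of the data.
        yield batch, start + batch_size >= len(data)


data = [{"instruction": f"q{i}"} for i in range(5)]
batches = list(load_from_dicts(data, batch_size=2))
print([(len(batch), is_last) for batch, is_last in batches])
# [(2, False), (2, False), (1, True)]
```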

Global steps

Another special type of step is the global step. These steps don't have any inputs or outputs, and their process method receives all the data at once instead of in batches. This behavior is necessary, for example, to push a dataset to a specific place or to filter the whole dataset before continuing with the pipeline.
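Deduplication is one case where seeing all the data at once matters, since a duplicate may sit in a different batch than its original. The following plain-Python sketch shows a process-like function working on the whole dataset; the function name `drop_duplicate_instructions` is hypothetical and this is not a step shipped with distilabel.

```python
from typing import Any, Dict, Iterator, List

Row = Dict[str, Any]


def drop_duplicate_instructions(inputs: List[Row]) -> Iterator[List[Row]]:
    """Keep the first row for each distinct instruction across the whole dataset."""
    seen = set()
    unique: List[Row] = []
    for row in inputs:
        if row["instruction"] not in seen:
            seen.add(row["instruction"])
            unique.append(row)
    yield unique


all_rows = [
    {"instruction": "Hello", "response": "Hi"},
    {"instruction": "Bye", "response": "See you"},
    {"instruction": "Hello", "response": "Hey"},  # duplicate instruction
]
filtered = next(drop_duplicate_instructions(all_rows))
print([row["response"] for row in filtered])
# ['Hi', 'See you']
```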