Steps for processing data¶
Working with Steps¶
The Step is intended to be used within the scope of a Pipeline, which orchestrates the different steps defined, but it can also be used standalone.
Assuming that we have a Step already defined as follows:
from typing import TYPE_CHECKING

from distilabel.steps import Step, StepInput

if TYPE_CHECKING:
    from distilabel.steps.typing import StepColumns, StepOutput


class MyStep(Step):
    @property
    def inputs(self) -> "StepColumns":
        return ["input_field"]

    @property
    def outputs(self) -> "StepColumns":
        return ["output_field"]

    def process(self, inputs: StepInput) -> "StepOutput":
        # Copy the input column into the output column for every row in the batch.
        for input in inputs:
            input["output_field"] = input["input_field"]
        yield inputs
Then we can use it as follows:
step = MyStep(name="my-step")
step.load()
next(step.process([{"input_field": "value"}]))
# [{'input_field': 'value', 'output_field': 'value'}]
Note
Step.load() always needs to be called when a step is used standalone. Within a pipeline, this is done automatically during pipeline execution.
Arguments¶
- input_mappings: a dictionary that maps keys from the input dictionaries to the keys expected by the step. For example, input_mappings={"instruction": "prompt"} means that the input key prompt will be used as the key instruction for the current step.
- output_mappings: a dictionary that can be used to map the outputs of the step to other names. For example, output_mappings={"conversation": "prompt"} means that the output key conversation will be renamed to prompt for the next step.
- input_batch_size (set to 50 by default): independent for every step; it determines how many input dictionaries the step processes at once.
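The effect of these three arguments can be illustrated with a small, library-independent sketch. The helpers rename_keys and batched below are hypothetical and are not part of distilabel; they only mimic the behaviour described above, assuming plain dictionaries as rows.

```python
from typing import Any, Dict, Iterator, List

Row = Dict[str, Any]


def rename_keys(row: Row, renames: Dict[str, str]) -> Row:
    """Return a copy of `row` with keys renamed according to `renames` (old -> new)."""
    return {renames.get(key, key): value for key, value in row.items()}


def batched(rows: List[Row], batch_size: int) -> Iterator[List[Row]]:
    """Yield consecutive chunks of `rows` holding at most `batch_size` rows each."""
    for start in range(0, len(rows), batch_size):
        yield rows[start : start + batch_size]


incoming = [{"prompt": "a"}, {"prompt": "b"}, {"prompt": "c"}]

# input_mappings is written as {expected_by_step: incoming_key}, so it is
# inverted here to obtain an old -> new rename table.
input_mappings = {"instruction": "prompt"}
incoming_to_expected = {src: expected for expected, src in input_mappings.items()}
step_inputs = [rename_keys(row, incoming_to_expected) for row in incoming]

# output_mappings is written as {output_key: new_name}.
output_mappings = {"conversation": "prompt"}
step_output = rename_keys({"conversation": ["a", "reply"]}, output_mappings)

# input_batch_size=2 splits the three rows into a batch of 2 and a batch of 1.
batches = list(batched(step_inputs, batch_size=2))
```

Note the opposite directions: input_mappings is keyed by the name the step expects, while output_mappings is keyed by the name the step produces.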
Runtime parameters¶
Steps can also have RuntimeParameters, which are parameters that can only be used after the pipeline initialisation, when calling Pipeline.run.
from pydantic import Field, PositiveInt

from distilabel.mixins.runtime_parameters import RuntimeParameter


# Excerpt of the Step definition; DEFAULT_INPUT_BATCH_SIZE is the default of 50.
class Step(...):
    input_batch_size: RuntimeParameter[PositiveInt] = Field(
        default=DEFAULT_INPUT_BATCH_SIZE,
        description="The number of rows that will contain the batches processed by the"
        " step.",
    )
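To make the idea of a value supplied at run time concrete, here is a minimal sketch that does not use distilabel at all. ToyStep and ToyPipeline are hypothetical stand-ins, and the shape of the parameters argument (a dictionary keyed by step name) is an assumption made for illustration; check the distilabel API reference for the exact signature of Pipeline.run.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ToyStep:
    """Hypothetical stand-in for a step with a single runtime parameter."""

    name: str
    input_batch_size: int = 50  # default mentioned in the Arguments section


@dataclass
class ToyPipeline:
    """Hypothetical stand-in for a pipeline; not distilabel's Pipeline."""

    steps: List[ToyStep] = field(default_factory=list)

    def run(self, parameters: Dict[str, Dict[str, Any]]) -> None:
        # Runtime values are looked up by step name and applied just before
        # the steps would be executed.
        for step in self.steps:
            for key, value in parameters.get(step.name, {}).items():
                if not hasattr(step, key):
                    raise AttributeError(f"{step.name} has no parameter {key!r}")
                setattr(step, key, value)


pipeline = ToyPipeline(steps=[ToyStep(name="my-step"), ToyStep(name="other-step")])
pipeline.run(parameters={"my-step": {"input_batch_size": 10}})
```

Only the step named in the dictionary is changed; the other one keeps its default batch size.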
Types of Steps¶
There are three special types of Step in distilabel:
- GeneratorStep: a step that only generates data. It doesn't need any input data from previous steps and is normally the first node in a Pipeline. More information: Components -> Step - GeneratorStep.
- GlobalStep: a step with the standard interface, i.e. it receives inputs and generates outputs, but it processes all the data at once and is often the final step in the Pipeline. Note that a GlobalStep requires the previous steps to finish before it can start. More information: Components - Step - GlobalStep.
- Task: essentially the same as a default Step, but it relies on an LLM as an attribute, and the process method is in charge of calling that LLM. More information: Components - Task.
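The difference between generating, batch-wise and global processing can be sketched with plain Python generators. The three functions below are hypothetical and only imitate the data flow described above; they are not distilabel classes, and the Task type is left out because it would require an LLM.

```python
from typing import Any, Dict, Iterator, List

Row = Dict[str, Any]


def generator_step(num_rows: int, batch_size: int) -> Iterator[List[Row]]:
    """Produce batches from scratch: no upstream input is required."""
    rows = [{"value": i} for i in range(num_rows)]
    for start in range(0, num_rows, batch_size):
        yield rows[start : start + batch_size]


def regular_step(batch: List[Row]) -> Iterator[List[Row]]:
    """Handle one batch at a time, as soon as it arrives."""
    for row in batch:
        row["doubled"] = row["value"] * 2
    yield batch


def global_step(all_rows: List[Row]) -> Iterator[List[Row]]:
    """Needs every row at once, here to flag the dataset-wide maximum."""
    largest = max(row["doubled"] for row in all_rows)
    for row in all_rows:
        row["is_max"] = row["doubled"] == largest
    yield all_rows


processed: List[Row] = []
for batch in generator_step(num_rows=5, batch_size=2):
    processed.extend(next(regular_step(batch)))

# The global step can only start once the loop above has consumed every batch.
result = next(global_step(processed))
```

The global step cannot flag the maximum until it has seen all five rows, which is why it has to wait for the upstream steps to finish.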
Defining custom Steps¶
We can define a custom step by creating a new subclass of Step and defining the following:
- inputs: a property that returns either a list of strings with the names of the required input fields, or a dictionary in which the keys are the names of the columns and the values are booleans indicating whether the column is required or not.
- outputs: a property that returns either a list of strings with the names of the output fields, or a dictionary in which the keys are the names of the columns and the values are booleans indicating whether the column is required or not.
- process: a method that receives the input data and returns the output data. It should be a generator, meaning that it should yield the output data.
Note
The default signature for the process method is process(self, *inputs: StepInput) -> StepOutput. The inputs argument must be kept and no additional arguments can be provided. The type hints for the argument and the return value should be respected too, because the method should be able to receive any number of inputs by default, i.e. more than one Step at a time could be connected to the current one.
Warning
For custom Step subclasses to work properly with distilabel, and with the validation and serialization performed by default over each Step in the Pipeline, the type hints for both StepInput and StepOutput should be used, and they should not be surrounded with double quotes or imported under typing.TYPE_CHECKING; otherwise, the validation and/or serialization will fail.
We can inherit from the Step class and define the inputs, outputs, and process methods as follows:
from typing import TYPE_CHECKING

from distilabel.steps import Step, StepInput

if TYPE_CHECKING:
    from distilabel.steps.typing import StepColumns, StepOutput


class CustomStep(Step):
    @property
    def inputs(self) -> "StepColumns":
        ...

    @property
    def outputs(self) -> "StepColumns":
        ...

    def process(self, *inputs: StepInput) -> "StepOutput":
        # One positional argument is received per connected upstream step.
        for upstream_step_inputs in inputs:
            ...
            yield upstream_step_inputs

    # When overridden (ideally under the `typing_extensions.override` decorator)
    # @typing_extensions.override
    # def process(self, inputs: StepInput) -> StepOutput:
    #     for input in inputs:
    #         ...
    #     yield inputs
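The variadic *inputs signature can be exercised without distilabel. The function below is a hypothetical, free-standing version of a process method (no self, plain lists of dictionaries instead of StepInput) that shows how one batch per upstream step arrives as a separate positional argument.

```python
from typing import Any, Dict, Iterator, List

Row = Dict[str, Any]


def process(*inputs: List[Row]) -> Iterator[List[Row]]:
    """Accept one batch per upstream step and yield each batch after tagging it."""
    for upstream_index, upstream_batch in enumerate(inputs):
        for row in upstream_batch:
            # Record which upstream connection the row came from.
            row["source"] = upstream_index
        yield upstream_batch


batch_from_a = [{"text": "a1"}, {"text": "a2"}]
batch_from_b = [{"text": "b1"}]

# Two upstream steps connected: two batches in, two batches out.
outputs = list(process(batch_from_a, batch_from_b))

# A single upstream step is the same call with one argument.
single = list(process(batch_from_a))
```

Because the signature is variadic, the same method body works whether one or several steps feed into the current one.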
Alternatively, the @step decorator takes care of the boilerplate code and lets you define the inputs, outputs, and process method in a more straightforward way. One downside is that it does not let you access or set any attributes on self, so if you need to do either, use the first approach and define a custom Step subclass.
from typing import TYPE_CHECKING

from distilabel.steps import StepInput, step

if TYPE_CHECKING:
    from distilabel.steps.typing import StepOutput


@step(inputs=[...], outputs=[...])
def CustomStep(inputs: StepInput) -> "StepOutput":
    for input in inputs:
        ...
    yield inputs


step = CustomStep(name="my-step")
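Why a decorated function cannot reach self can be seen in a toy version of such a decorator. The toy_step decorator below is hypothetical and is not how distilabel implements @step; it is only a sketch, under the assumption that the function is wrapped in a generated class and called without the instance.

```python
from typing import Any, Callable, Dict, Iterator, List

Row = Dict[str, Any]


def toy_step(inputs: List[str], outputs: List[str]) -> Callable:
    """Hypothetical decorator that wraps a function in a minimal step-like class."""

    def decorator(func: Callable[[List[Row]], Iterator[List[Row]]]) -> type:
        class FunctionStep:
            def __init__(self, name: str) -> None:
                self.name = name
                self.inputs = inputs
                self.outputs = outputs

            def process(self, rows: List[Row]) -> Iterator[List[Row]]:
                # `func` is called without `self`, so it cannot read or set
                # instance attributes such as `self.name`.
                return func(rows)

        FunctionStep.__name__ = func.__name__
        return FunctionStep

    return decorator


@toy_step(inputs=["input_field"], outputs=["output_field"])
def CopyField(rows: List[Row]) -> Iterator[List[Row]]:
    for row in rows:
        row["output_field"] = row["input_field"]
    yield rows


copy_step = CopyField(name="my-step")
result = next(copy_step.process([{"input_field": "value"}]))
```

The decorated name ends up bound to a class, which is why it is instantiated with a name just like a regular Step subclass.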