Step
The Step is an abstract class that defines the interface for the building blocks used within a Pipeline. A Step can be seen as a node in a Directed Acyclic Graph (DAG) whose execution is orchestrated by the Pipeline.
Working with Steps
The Step is intended to be used within the scope of a Pipeline, which orchestrates the steps defined in it; nonetheless, steps can also be used standalone if needed.
Assuming that we have a Step already defined as follows:
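from typing import List

from distilabel.steps import Step, StepInput
from distilabel.steps.typing import StepOutput


class MyStep(Step):
    @property
    def inputs(self) -> List[str]:
        return ["input_field"]

    @property
    def outputs(self) -> List[str]:
        return ["output_field"]

    def process(self, inputs: StepInput) -> StepOutput:
        # Copy the input field into the output field of every row in the batch.
        for input in inputs:
            input["output_field"] = input["input_field"]
        yield inputs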
Then we can instantiate and use it as follows:
step = MyStep(name="my-step")
step.load()
next(step.process([{"input_field": "value"}]))
# [{'input_field': 'value', 'output_field': 'value'}]
Note
The load method must always be called when a Step, or any Step subclass, is used standalone. When the step is used within the Pipeline context manager there is no need to call load, since it is called automatically on Pipeline.run. In any other case, the load method of the parent class needs to be called.
Most of the time we'll end up using the pre-defined steps in distilabel, so there's no need to create custom steps; custom steps are nonetheless covered later on this page.
Let's now look at the arguments that can be used to map fields across steps, or to set a batch size specific to the step:
- input_mappings: a dictionary that maps keys from the input dictionaries to the keys expected by the step. For example, if input_mappings={"instruction": "prompt"}, the key prompt from the input dictionaries will be used as the key instruction for the step.
- output_mappings: a dictionary that maps the outputs of the step to other names. For example, if output_mappings={"conversation": "prompt"}, the key conversation generated by the step will be renamed to prompt, and the output dictionaries of this step will contain a key called prompt instead of conversation.
- input_batch_size (by default set to 50): independent for every step, it determines how many input dictionaries the step will process at once. It won't matter that much for this step but, as we will see later, other types of steps come with an LLM, so having this flexibility is really useful.
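The renaming and batching behaviour described above can be illustrated with plain dictionaries. This is a minimal sketch in pure Python, not distilabel's implementation: the helper names apply_input_mappings, apply_output_mappings, and batched are hypothetical and exist only for illustration.

```python
from typing import Dict, Iterator, List

Row = Dict[str, str]


def apply_input_mappings(row: Row, input_mappings: Dict[str, str]) -> Row:
    """Rename incoming keys to the names the step expects.

    `input_mappings` maps step key -> incoming key, e.g. {"instruction": "prompt"}.
    """
    incoming_to_step = {incoming: expected for expected, incoming in input_mappings.items()}
    return {incoming_to_step.get(key, key): value for key, value in row.items()}


def apply_output_mappings(row: Row, output_mappings: Dict[str, str]) -> Row:
    """Rename keys produced by the step, e.g. {"conversation": "prompt"}."""
    return {output_mappings.get(key, key): value for key, value in row.items()}


def batched(rows: List[Row], input_batch_size: int) -> Iterator[List[Row]]:
    """Split rows into consecutive batches of at most `input_batch_size` rows."""
    for start in range(0, len(rows), input_batch_size):
        yield rows[start : start + input_batch_size]


mapped_in = apply_input_mappings({"prompt": "Hi"}, {"instruction": "prompt"})
mapped_out = apply_output_mappings({"conversation": "Hi there"}, {"conversation": "prompt"})
batch_sizes = [len(batch) for batch in batched([{"x": str(i)} for i in range(5)], 2)]

print(mapped_in)    # {'instruction': 'Hi'}
print(mapped_out)   # {'prompt': 'Hi there'}
print(batch_sizes)  # [2, 2, 1]
```

Note the direction of each mapping: in input_mappings the dictionary key is the name the step expects, while in output_mappings the dictionary key is the name the step produces.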
Runtime parameters
Finally, let's introduce a special type of argument that we will find when dealing with Steps: the runtime parameters. For example, input_batch_size is of type RuntimeParameter:
from pydantic import Field, PositiveInt

from distilabel.mixins.runtime_parameters import RuntimeParameter


class Step(...):
    # Excerpt of the class definition; DEFAULT_INPUT_BATCH_SIZE is defined by distilabel.
    input_batch_size: RuntimeParameter[PositiveInt] = Field(
        default=DEFAULT_INPUT_BATCH_SIZE,
        description="The number of rows that will contain the batches processed by the"
        " step.",
    )
We can set these arguments when calling the Pipeline.run method, as we will see in the Pipeline section. They are useful for passing values to the steps after the pipeline has already been defined.
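The idea of supplying values after the pipeline has been defined can be sketched without distilabel at all. The following is a toy model under stated assumptions, not the behaviour of Pipeline.run: the step_defaults dictionary and the resolve_parameters helper are hypothetical, and the actual argument accepted by Pipeline.run is described in the Pipeline section.

```python
from typing import Any, Dict

# Hypothetical defaults keyed by step name; 50 is the default batch size
# mentioned earlier on this page.
step_defaults: Dict[str, Dict[str, Any]] = {
    "my-step": {"input_batch_size": 50},
}


def resolve_parameters(
    defaults: Dict[str, Dict[str, Any]],
    overrides: Dict[str, Dict[str, Any]],
) -> Dict[str, Dict[str, Any]]:
    """Return a copy of `defaults` with run-time overrides applied per step."""
    resolved = {name: dict(params) for name, params in defaults.items()}
    for name, params in overrides.items():
        if name not in resolved:
            raise KeyError(f"Unknown step: {name!r}")
        resolved[name].update(params)
    return resolved


# Values supplied at run time replace the defaults chosen at definition time.
resolved = resolve_parameters(step_defaults, {"my-step": {"input_batch_size": 10}})
print(resolved)  # {'my-step': {'input_batch_size': 10}}
```

The defaults are left untouched, so the same definition can be run several times with different run-time values.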
Types of Steps
Besides the default Step already described, distilabel provides the following abstract subclasses built on top of Step:
- GeneratorStep: a step that only produces / generates data and doesn't need any input data from previous steps; in most cases it is a parent node of the graph, i.e. the first Step in the Pipeline. More information about it at Components -> Step - GeneratorStep.
- GlobalStep: a step with the standard interface, i.e. it receives inputs and generates outputs, but it processes all the data at once; in most cases it is a leaf node of the graph, i.e. the last Step in the Pipeline. Because a GlobalStep requires the outputs from the previous steps, the previous steps need to finish before this step can start, and any steps connected to its outputs will need to wait until this step is done. More information about it at Components - Step - GlobalStep.
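The difference between these kinds of steps can be sketched with plain Python functions. This is only an illustration of the data flow, assuming hypothetical functions generate, double, and total; it does not use the GeneratorStep or GlobalStep classes.

```python
from typing import Dict, Iterator, List

Row = Dict[str, int]


def generate(num_rows: int, batch_size: int) -> Iterator[List[Row]]:
    """Generator-like step: takes no input rows and yields batches of new rows."""
    rows = [{"value": i} for i in range(num_rows)]
    for start in range(0, num_rows, batch_size):
        yield rows[start : start + batch_size]


def double(batch: List[Row]) -> List[Row]:
    """Regular step: processes one batch at a time."""
    return [{**row, "doubled": row["value"] * 2} for row in batch]


def total(all_rows: List[Row]) -> List[Row]:
    """Global-like step: needs every row, so it can only run once upstream is done."""
    return [{"total": sum(row["doubled"] for row in all_rows)}]


# The regular step runs batch by batch; the global step waits for all of them.
processed = [row for batch in generate(5, 2) for row in double(batch)]
result = total(processed)
print(result)  # [{'total': 20}]
```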
Additionally, distilabel defines another type of Step, the Task, which is essentially the same except that it expects an LLM as an attribute, and its process method is in charge of calling that LLM. In other words, a Task is a Step that works with an LLM.
More information about it at Components - Task.
Defining custom Steps
To define a custom step, we need to create a new subclass of the Step class and define both the inputs and outputs properties, as well as the process method.
That is, the following need to be defined:
- inputs: a property that returns a list of strings with the names of the required input fields.
- outputs: a property that returns a list of strings with the names of the output fields.
- process: a method that receives the input data and produces the output data. It should be a generator, meaning that it should yield the output data. It's important to preserve the default signature, def process(self, *inputs: StepInput) -> StepOutput, since that's the one the Pipeline uses to orchestrate the steps: the inputs argument must be kept, no additional arguments can be provided, and both the argument and return type-hints must be respected.
Note
The default signature for the process method is process(self, *inputs: StepInput) -> StepOutput, meaning that it should be able to receive any number of inputs by default, i.e. more than one Step at a time could be connected to the current one. When defining custom steps, this can be overridden with process(self, inputs: StepInput) -> StepOutput, so that the process method only receives the outputs from a single previous Step connected to it.
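The two signatures differ only in how many upstream batches reach the method. The sketch below shows that difference with plain functions and lists of dictionaries; process_many and process_one are hypothetical names, and the Batch alias merely stands in for StepInput.

```python
from typing import Dict, Iterator, List

Batch = List[Dict[str, str]]


def process_many(*inputs: Batch) -> Iterator[Batch]:
    """Default-style signature: one positional argument per connected upstream step."""
    for upstream_batch in inputs:
        yield [{**row, "seen": "yes"} for row in upstream_batch]


def process_one(inputs: Batch) -> Iterator[Batch]:
    """Overridden-style signature: a single batch coming from one upstream step."""
    yield [{**row, "seen": "yes"} for row in inputs]


from_a = [{"text": "a"}]
from_b = [{"text": "b1"}, {"text": "b2"}]

many = list(process_many(from_a, from_b))  # one output batch per upstream step
one = next(process_one(from_a))

print(len(many))  # 2
print(one)        # [{'text': 'a', 'seen': 'yes'}]
```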
Warning
For custom Step subclasses to work properly with distilabel, and with the validation and serialization performed by default over each Step in the Pipeline, the type-hints StepInput and StepOutput must be used directly: they should not be surrounded with double quotes or imported under typing.TYPE_CHECKING, otherwise the validation and/or serialization will fail.
from typing import List

from distilabel.steps import Step, StepInput
from distilabel.steps.typing import StepOutput


class CustomStep(Step):
    @property
    def inputs(self) -> List[str]:
        ...

    @property
    def outputs(self) -> List[str]:
        ...

    def process(self, *inputs: StepInput) -> StepOutput:
        # Each positional argument is the batch coming from one connected step.
        for input in inputs:
            ...
            yield input

    # When overridden (ideally under the `typing_extensions.override` decorator)
    # @typing_extensions.override
    # def process(self, inputs: StepInput) -> StepOutput:
    #     for input in inputs:
    #         ...
    #     yield inputs
Alternatively, a simpler way of defining custom Step subclasses is the @step decorator, which takes care of the boilerplate code and lets us declare the inputs, the outputs, and the process logic in a more straightforward way.
from distilabel.steps import StepInput, step
from distilabel.steps.typing import StepOutput


@step(inputs=[...], outputs=[...])
def CustomStep(inputs: StepInput) -> StepOutput:
    for input in inputs:
        ...
    yield inputs


# Use a different variable name so the imported `step` decorator is not shadowed.
custom_step = CustomStep(name="my-step")
Warning
One downside of the @step decorator is that it gives you no access to self, so attributes can be neither read nor set. If you need to access or set any attribute, you should go with the first approach and define a custom Step subclass.