GlobalStep¶
The GlobalStep is a subclass of Step that waits until all the input batches from the previous steps have been received before it runs. This makes it useful for operations that need the whole dataset at once rather than one batch at a time. Alternatively, it can also be used as a standalone step.
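To make the difference concrete, here is a minimal pure-Python sketch (not distilabel code) that contrasts batch-wise processing with global processing. The helpers `run_per_batch` and `run_global` are hypothetical names, and deduplication is used only as an example of an operation that needs every row at once.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]
Batch = List[Row]


def deduplicate(rows: Batch) -> Batch:
    """Keep the first row seen for each distinct "text" value."""
    seen = set()
    unique = []
    for row in rows:
        if row["text"] not in seen:
            seen.add(row["text"])
            unique.append(row)
    return unique


def run_per_batch(batches: List[Batch], fn: Callable[[Batch], Batch]) -> Batch:
    """Regular-step behaviour: apply fn to each batch as it arrives."""
    out: Batch = []
    for batch in batches:
        out.extend(fn(batch))
    return out


def run_global(batches: List[Batch], fn: Callable[[Batch], Batch]) -> Batch:
    """Global-step behaviour: accumulate every batch, then apply fn once."""
    all_rows: Batch = [row for batch in batches for row in batch]
    return fn(all_rows)


batches = [
    [{"text": "a"}, {"text": "b"}],
    [{"text": "b"}, {"text": "c"}],
]
per_batch_result = run_per_batch(batches, deduplicate)
global_result = run_global(batches, deduplicate)
```

Processed per batch, the duplicate "b" that spans two batches survives. Processed globally, it is removed, which is the kind of case a global step is meant for.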
Defining custom GlobalSteps¶
We can define a custom step by creating a new subclass of GlobalStep and defining the following:
- inputs: a property that returns either a list of strings with the names of the required input fields, or a dictionary whose keys are the column names and whose values are booleans indicating whether each column is required.
- outputs: a property that returns either a list of strings with the names of the output fields, or a dictionary whose keys are the column names and whose values are booleans indicating whether each column is required.
- process: a method that receives the input data and returns the output data. It should be a generator, meaning that it should yield the output data.
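The two accepted shapes for inputs and outputs can be sketched as follows. This is an illustration only: the `Columns` alias, the `required_columns` helper, and the column names are hypothetical and are not part of distilabel.

```python
from typing import Dict, List, Union

# Assumed shape of a column declaration, following the description above.
Columns = Union[List[str], Dict[str, bool]]


def required_columns(columns: Columns) -> List[str]:
    """Return the names of the columns marked as required."""
    if isinstance(columns, dict):
        return [name for name, required in columns.items() if required]
    # In list form, every listed column is treated as required.
    return list(columns)


as_list: Columns = ["instruction", "response"]
as_dict: Columns = {"instruction": True, "system_prompt": False}
```

The dictionary form is the one to reach for when some columns are optional, since the list form has no way to express that.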
Note
The default signature for the process method is process(self, *inputs: StepInput) -> StepOutput. The inputs argument must be kept and no additional arguments can be added. The type hints for the argument and the return value must be kept too, because by default the method should be able to receive any number of inputs, i.e. more than one Step at a time could be connected to the current one.
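The variadic signature can be illustrated with a standalone generator. In this sketch `StepInput` and `StepOutput` are local stand-ins with assumed shapes (a list of dictionaries, and an iterator over such lists), not the distilabel types, and the "seen" column is made up for the example.

```python
from typing import Any, Dict, Iterator, List

# Stand-ins for distilabel's StepInput and StepOutput (assumed shapes).
StepInput = List[Dict[str, Any]]
StepOutput = Iterator[List[Dict[str, Any]]]


def process(*inputs: StepInput) -> StepOutput:
    """Yield one output batch per upstream step connected to this one."""
    for upstream_rows in inputs:
        yield [{**row, "seen": True} for row in upstream_rows]


# Two upstream steps feed the same process call.
from_step_a = [{"text": "a"}]
from_step_b = [{"text": "b"}, {"text": "c"}]
results = list(process(from_step_a, from_step_b))
```

Because `*inputs` collects positional arguments into a tuple, the same function body works whether one upstream step or several are connected.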
Warning
For custom GlobalStep subclasses to work properly with distilabel, and with the validation and serialization performed by default over each Step in the Pipeline, the type hints for both StepInput and StepOutput must be used as-is: they should not be surrounded with double quotes or imported under typing.TYPE_CHECKING. Otherwise, the validation and/or serialization will fail.
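The general Python mechanism that is presumably behind this warning can be shown with the standard library alone. How distilabel actually inspects the hints is not shown on this page, so treat the following as an illustration rather than a description of its internals; `StepInput` here is a local stand-in.

```python
import inspect
from typing import Any, Dict, List

# Stand-in for distilabel's StepInput (assumed shape).
StepInput = List[Dict[str, Any]]


def unquoted(inputs: StepInput) -> None:
    ...


def quoted(inputs: "StepInput") -> None:
    ...


# With an unquoted hint, runtime introspection sees the real type object.
unquoted_hint = inspect.signature(unquoted).parameters["inputs"].annotation

# With a quoted hint, it only sees a string that still has to be resolved.
quoted_hint = inspect.signature(quoted).parameters["inputs"].annotation
```

Any code that compares the annotation against the expected type at runtime will match the first function but not the second. A name imported only under `typing.TYPE_CHECKING` has a similar problem, because it does not exist at runtime when the string needs to be resolved.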
We can inherit from the GlobalStep class and define the inputs and outputs properties and the process method as follows:
from typing import TYPE_CHECKING

from distilabel.steps import GlobalStep, StepInput
# Imported at runtime (not under TYPE_CHECKING) so that the return type-hint
# of `process` can be used unquoted.
from distilabel.steps.typing import StepOutput

if TYPE_CHECKING:
    from distilabel.steps.typing import StepColumns


class CustomStep(GlobalStep):
    @property
    def inputs(self) -> "StepColumns":
        ...

    @property
    def outputs(self) -> "StepColumns":
        ...

    def process(self, *inputs: StepInput) -> StepOutput:
        # `inputs` holds one list of rows per upstream step.
        for upstream_step_inputs in inputs:
            for item in upstream_step_inputs:
                ...
            # Yield the processed rows (a list of dictionaries).
            yield upstream_step_inputs

    # When overridden (ideally under the `typing_extensions.override` decorator)
    # @typing_extensions.override
    # def process(self, inputs: StepInput) -> StepOutput:
    #     for input in inputs:
    #         ...
    #     yield inputs
Alternatively, the @step decorator takes care of the boilerplate code and lets you define the inputs, the outputs, and the process function in a more straightforward way. One downside is that it does not let you access or set attributes on self, so if you need to do either, you should go with the first approach of defining a custom GlobalStep subclass.
from distilabel.steps import StepInput, step
# Imported at runtime (not under TYPE_CHECKING) so that the return type-hint
# can be used unquoted.
from distilabel.steps.typing import StepOutput


@step(inputs=[...], outputs=[...], step_type="global")
def CustomStep(inputs: StepInput) -> StepOutput:
    for item in inputs:
        ...
    yield inputs


# Named `custom_step` so that it does not shadow the imported `step` decorator.
custom_step = CustomStep(name="my-step")