GlobalStep
The GlobalStep is a subclass of Step that waits until it has received all the input batches from the previous steps before it runs. This makes it useful for any operation that needs the complete input data at once rather than batch by batch.
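The difference from a regular step can be sketched in plain Python. This is a minimal illustration of the waiting behaviour described above, not distilabel's actual scheduler; run_per_batch, run_global, sort_by_score, and the Batch alias are hypothetical names introduced only for this example.

```python
from typing import Any, Callable, Dict, Iterable, List

# Hypothetical stand-in for a batch of rows; not a distilabel type.
Batch = List[Dict[str, Any]]


def run_per_batch(batches: Iterable[Batch], fn: Callable[[Batch], Batch]) -> List[Batch]:
    """Regular-step behaviour: call `fn` once for every incoming batch."""
    return [fn(batch) for batch in batches]


def run_global(batches: Iterable[Batch], fn: Callable[[Batch], Batch]) -> List[Batch]:
    """Global-step behaviour: buffer every batch, then call `fn` a single time."""
    all_rows: Batch = [row for batch in batches for row in batch]
    return [fn(all_rows)]


def sort_by_score(rows: Batch) -> Batch:
    # Sorting only gives a meaningful ranking when it sees the full dataset.
    return sorted(rows, key=lambda row: row["score"], reverse=True)


batches = [[{"score": 1}, {"score": 3}], [{"score": 2}]]
per_batch_result = run_per_batch(batches, sort_by_score)
global_result = run_global(batches, sort_by_score)
```

With per-batch execution each batch is sorted in isolation, whereas the global variant produces one ranking over all rows.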
Working with GlobalSteps
The GlobalStep is intended to be used within a Pipeline, after some previous steps have been defined. It can also be used as a standalone Step if needed, but in that case a regular Step would be more appropriate.
Defining custom GlobalSteps
To define a custom global step, create a new subclass of the GlobalStep class and define the inputs and outputs properties as well as the process method. In short, the following need to be defined:
- inputs: a property that returns a list of strings with the names of the required input fields.
- outputs: a property that returns a list of strings with the names of the output fields.
- process: a method that receives the input data and returns the output data. It should be a generator, meaning that it should yield the output data. It is important to preserve the default signature, def process(self, *inputs: StepInput) -> StepOutput, since that is the one the Pipeline uses to orchestrate the steps. This means the inputs argument must keep its name, no additional arguments can be added, and both the argument type hint and the return type hint must be kept.
Note
The default signature for the process method is process(self, *inputs: StepInput) -> StepOutput, meaning that it can receive any number of inputs by default, i.e. more than one Step at a time can be connected to the current one. When defining custom steps, this can be overridden with process(self, inputs: StepInput) -> StepOutput, so that the process method only receives the outputs from a single previous Step connected to it.
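The two signatures in this note differ only in how many upstream outputs they accept. The following sketch uses plain functions instead of methods and simplified stand-in aliases for StepInput and StepOutput (assumptions made for illustration, not distilabel's own definitions); process_many and process_one are hypothetical names.

```python
from typing import Any, Dict, Iterator, List

# Simplified stand-ins; distilabel's real `StepInput`/`StepOutput` may differ.
StepInput = List[Dict[str, Any]]
StepOutput = Iterator[List[Dict[str, Any]]]


def process_many(*inputs: StepInput) -> StepOutput:
    """Default shape: one positional argument per connected upstream step."""
    for upstream_rows in inputs:
        yield upstream_rows


def process_one(inputs: StepInput) -> StepOutput:
    """Overridden shape: the rows coming from a single upstream step."""
    yield inputs


rows_a = [{"text": "a"}]
rows_b = [{"text": "b"}]

many_result = list(process_many(rows_a, rows_b))
one_result = list(process_one(rows_a))
```

Here process_many sees a tuple with one entry per upstream step, while process_one receives a single list of rows directly.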
Warning
For custom GlobalStep subclasses to work properly with distilabel, and with the validation and serialization performed by default on each Step in the Pipeline, the StepInput and StepOutput type hints must be used as they are: they must not be surrounded with double quotes or imported under typing.TYPE_CHECKING. Otherwise, the validation and/or serialization will fail.
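One likely reason for this restriction, assuming the library inspects the process signature at runtime, is that a quoted annotation is stored as a plain string rather than as the type object. The sketch below shows that effect using only the standard library; the StepInput and StepOutput aliases are simplified stand-ins, and process_plain and process_quoted are hypothetical names.

```python
import inspect
from typing import Any, Dict, Iterator, List

# Simplified stand-ins for illustration; distilabel's real aliases may differ.
StepInput = List[Dict[str, Any]]
StepOutput = Iterator[List[Dict[str, Any]]]


def process_plain(inputs: StepInput) -> StepOutput:
    yield inputs


def process_quoted(inputs: "StepInput") -> "StepOutput":
    yield inputs


plain_annotation = inspect.signature(process_plain).parameters["inputs"].annotation
quoted_annotation = inspect.signature(process_quoted).parameters["inputs"].annotation

# The unquoted hint is the alias object itself, so it can be compared or inspected.
plain_is_alias = plain_annotation is StepInput
# The quoted hint is only a string; the type information has not been resolved.
quoted_is_string = isinstance(quoted_annotation, str)
```

Any code that checks the annotation against the expected type object would accept the first function and fail to recognise the second.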
from typing import List

from distilabel.steps import GlobalStep, StepInput
from distilabel.steps.typing import StepOutput


class CustomStep(GlobalStep):
    @property
    def inputs(self) -> List[str]:
        ...

    @property
    def outputs(self) -> List[str]:
        ...

    def process(self, *inputs: StepInput) -> StepOutput:
        # Each positional argument holds the rows coming from one upstream step.
        for input in inputs:
            for item in input:
                ...
            # Yield the processed list of rows, not a single row.
            yield input

    # When overridden (ideally under the `typing_extensions.override` decorator)
    # @typing_extensions.override
    # def process(self, inputs: StepInput) -> StepOutput:
    #     for input in inputs:
    #         ...
    #     yield inputs
Alternatively, a simpler way of defining custom GlobalStep subclasses is via the @step decorator with step_type="global", which takes care of the boilerplate code and lets you define the inputs, the outputs, and the process logic in a more straightforward way.
from distilabel.steps import StepInput, step
from distilabel.steps.typing import StepOutput


@step(inputs=[...], outputs=[...], step_type="global")
def CustomStep(inputs: StepInput) -> StepOutput:
    for input in inputs:
        ...
    yield inputs


# Named `custom_step` so that the imported `step` decorator is not shadowed.
custom_step = CustomStep(name="my-step")
Warning
One downside of the @step decorator is that it does not let you access or set attributes on self. If you need to do either, use the first approach and define a custom GlobalStep subclass.
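As an illustration of a step that depends on self, consider a top-k filter whose k is a configurable attribute. The class below is a hypothetical stand-in that mirrors the inputs, outputs, and process shape described earlier; it deliberately does not inherit from GlobalStep so that it runs without distilabel installed, and it uses a dataclass where a real subclass would declare its attributes in whatever way the library expects.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List

# Simplified stand-ins; distilabel's real `StepInput`/`StepOutput` may differ.
StepInput = List[Dict[str, Any]]
StepOutput = Iterator[List[Dict[str, Any]]]


@dataclass
class TopKByScore:
    """Hypothetical stand-in for a `GlobalStep` subclass with a configurable attribute."""

    k: int = 2

    @property
    def inputs(self) -> List[str]:
        return ["score"]

    @property
    def outputs(self) -> List[str]:
        return ["rank"]

    def process(self, inputs: StepInput) -> StepOutput:
        # Ranking needs every row at once, and `self.k` needs attribute access.
        ranked = sorted(inputs, key=lambda row: row["score"], reverse=True)
        yield [{**row, "rank": position} for position, row in enumerate(ranked[: self.k], start=1)]


rows = [{"score": 1}, {"score": 3}, {"score": 2}]
top_rows = next(TopKByScore(k=2).process(rows))
```

A function defined through the decorator has no self to read k from, which is the limitation the warning describes.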