GlobalStep

The GlobalStep is a subclass of Step that waits until all the input batches from the previous steps have been received before it runs. This makes it useful for any step that needs the complete input data at once, rather than one batch at a time.
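
To make the difference concrete, here is a minimal, library-free sketch that contrasts a step that handles each batch as it arrives with one that collects every batch first. The names `per_batch_step`, `global_step`, and the `seen_rows` column are hypothetical and exist only for this illustration; this is not how distilabel implements batching internally.

```python
from typing import Any, Dict, Iterable, Iterator, List

Batch = List[Dict[str, Any]]


def per_batch_step(batches: Iterable[Batch]) -> Iterator[Batch]:
    """Toy regular step: handles each batch as soon as it arrives."""
    for batch in batches:
        # Only the rows of the current batch are visible here.
        yield [{**row, "seen_rows": len(batch)} for row in batch]


def global_step(batches: Iterable[Batch]) -> Iterator[Batch]:
    """Toy global step: collects every batch first, then runs once."""
    all_rows: Batch = [row for batch in batches for row in batch]
    # Every row of every batch is visible here.
    yield [{**row, "seen_rows": len(all_rows)} for row in all_rows]


batches = [[{"text": "a"}, {"text": "b"}], [{"text": "c"}]]
per_batch_out = list(per_batch_step(batches))
global_out = list(global_step(batches))
```

In this sketch the per-batch version yields two outputs and never sees more than two rows at once, while the global version yields a single output computed over all three rows.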

Working with GlobalSteps

The GlobalStep is intended to be used within the scope of a Pipeline, after some previous steps have been defined. It can also be used as a standalone step if needed, but in that case a regular Step would be more appropriate.

Defining custom GlobalSteps

To define a custom step, create a new subclass of the GlobalStep class and define both the inputs and outputs properties, as well as the process method.

Specifically, the following need to be defined:

  • inputs: a property that returns a list of strings with the names of the required input fields.

  • outputs: a property that returns a list of strings with the names of the output fields.

  • process: a method that receives the input data and returns the output data. It should be a generator, meaning that it should yield the output data. It's important to preserve the default signature def process(self, *inputs: StepInput) -> StepOutput, since that's the one the Pipeline uses to orchestrate the steps: the inputs argument must be kept, no extra arguments can be added, and both the argument type-hint and the return type-hint must be kept too.

Note

The default signature for the process method is process(self, *inputs: StepInput) -> StepOutput, meaning that it can receive any number of inputs by default, i.e. more than one Step at a time can be connected to the current one. When defining custom steps, this can be overridden with process(self, inputs: StepInput) -> StepOutput, so that the process method only receives the outputs from a single previous Step connected to it.

Warning

For custom GlobalStep subclasses to work properly with distilabel, and with the validation and serialization performed by default over each Step in the Pipeline, the StepInput and StepOutput type-hints must be used as-is: they should not be surrounded with double-quotes, and they should not be imported under typing.TYPE_CHECKING. Otherwise, the validation and/or serialization will fail.
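
The following standard-library sketch shows why quoting matters for any code that reads annotations at runtime. It assumes the validation inspects the annotations of process at runtime, which this page does not confirm in detail. `ToyStepInput` and `ToyStepOutput` are local stand-ins defined here for illustration, not the distilabel types.

```python
import inspect
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, get_type_hints

# Local stand-ins for illustration only; NOT the distilabel types.
ToyStepInput = List[Dict[str, Any]]
ToyStepOutput = Iterator[List[Dict[str, Any]]]

if TYPE_CHECKING:
    # Only static type checkers see this import; it never runs.
    from fractions import Fraction


def unquoted(inputs: ToyStepInput) -> ToyStepOutput:
    yield inputs


def quoted(inputs: "ToyStepInput") -> "ToyStepOutput":
    yield inputs


def guarded(value: "Fraction") -> None:
    return None


# The unquoted hint is available as a real object at runtime.
unquoted_hint = inspect.signature(unquoted).parameters["inputs"].annotation
# The quoted hint is stored as a plain string.
quoted_hint = inspect.signature(quoted).parameters["inputs"].annotation

# A name imported only under TYPE_CHECKING cannot be resolved at runtime.
try:
    get_type_hints(guarded)
    guarded_resolved = True
except NameError:
    guarded_resolved = False
```

Here `unquoted_hint` is the actual type object, `quoted_hint` is just the string `"ToyStepInput"`, and resolving the hint of `guarded` fails because `Fraction` was never imported at runtime.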

from typing import List

from distilabel.steps import GlobalStep, StepInput
from distilabel.steps.typing import StepOutput

class CustomStep(GlobalStep):
    @property
    def inputs(self) -> List[str]:
        ...

    @property
    def outputs(self) -> List[str]:
        ...

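    def process(self, *inputs: StepInput) -> StepOutput:
        # Each `input` holds the rows coming from one connected step
        for input in inputs:
            for item in input:
                ...
            # Yield the whole list of rows, not a single row
            yield input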

    # When overridden (ideally under the `typing_extensions.override` decorator)
    # @typing_extensions.override
    # def process(self, inputs: StepInput) -> StepOutput:
    #     for input in inputs:
    #         ...
    #     yield inputs
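
The skeleton above leaves the method bodies elided. As a library-free illustration of the two process signatures, the sketch below uses plain generator functions. `ToyStepInput`, `ToyStepOutput`, `process_many`, `process_one`, and the `length` column are hypothetical names chosen for this example; they stand in for the distilabel types and are not part of its API.

```python
from typing import Any, Dict, Iterator, List

# Local stand-ins for illustration only; NOT the distilabel types.
ToyStepInput = List[Dict[str, Any]]
ToyStepOutput = Iterator[List[Dict[str, Any]]]


def process_many(*inputs: ToyStepInput) -> ToyStepOutput:
    """Variadic form: one positional argument per connected upstream step."""
    for upstream_rows in inputs:
        # Yield a list of rows per upstream step.
        yield [{**row, "length": len(row["text"])} for row in upstream_rows]


def process_one(inputs: ToyStepInput) -> ToyStepOutput:
    """Single-input form: receives the rows of exactly one upstream step."""
    yield [{**row, "length": len(row["text"])} for row in inputs]


rows_a = [{"text": "hello"}]
rows_b = [{"text": "hi"}, {"text": "hey"}]

many_out = list(process_many(rows_a, rows_b))
one_out = list(process_one(rows_a))
```

Both functions are generators that yield lists of rows; the only difference is whether the rows of several upstream steps arrive as separate positional arguments or a single list is passed in.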

Alternatively, a simpler way of defining custom GlobalStep subclasses is the @step decorator with step_type="global". It takes care of the boilerplate code and lets you define the inputs, the outputs, and the process logic in a more straightforward way.

from distilabel.steps import StepInput, step
from distilabel.steps.typing import StepOutput

@step(inputs=[...], outputs=[...], step_type="global")
def CustomStep(inputs: StepInput) -> StepOutput:
    for input in inputs:
        ...
    yield inputs

# Use a different name to avoid shadowing the imported `step` decorator
custom_step = CustomStep(name="my-step")

Warning

One downside of the @step decorator is that it won't let you access or set attributes on self. If you need to access or set any attribute, you should go with the first approach and define a custom GlobalStep subclass.
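
The trade-off can be sketched without the library. Below, `toy_step` is a hypothetical decorator written for this example (it is not distilabel's @step and makes no claim about its internals), and `KeepTopN` with its `n` attribute and `score` column is likewise invented to show the kind of instance state that only the class-based approach can hold.

```python
from typing import Any, Callable, Dict, Iterator, List

Rows = List[Dict[str, Any]]


def toy_step(func: Callable[[Rows], Iterator[Rows]]) -> type:
    """Hypothetical decorator: wraps a plain function in a generated class."""

    class Wrapped:
        def __init__(self, name: str) -> None:
            self.name = name

        def process(self, inputs: Rows) -> Iterator[Rows]:
            # The wrapped function is called without `self`, so it cannot
            # read or set attributes of the instance.
            yield from func(inputs)

    return Wrapped


@toy_step
def Uppercase(inputs: Rows) -> Iterator[Rows]:
    yield [{**row, "text": row["text"].upper()} for row in inputs]


class KeepTopN:
    """Class-based variant: `n` is instance state that `process` can read."""

    def __init__(self, name: str, n: int) -> None:
        self.name = name
        self.n = n

    def process(self, inputs: Rows) -> Iterator[Rows]:
        ranked = sorted(inputs, key=lambda row: row["score"], reverse=True)
        yield ranked[: self.n]


upper_out = list(Uppercase(name="upper").process([{"text": "a"}]))
top_out = list(KeepTopN(name="top", n=1).process([{"score": 1}, {"score": 3}]))
```

The decorated function works well for stateless transformations, while a configurable value such as `n` needs the class-based form because it lives on the instance.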