GeneratorStep¶
The GeneratorStep
is a subclass of Step
that is intended to be used as the first step within a Pipeline
, because it doesn't require input and generates data that can be used by other steps. Alternatively, it can also be used as a standalone.
from typing import List, TYPE_CHECKING
from typing_extensions import override
from distilabel.steps import GeneratorStep
if TYPE_CHECKING:
from distilabel.typing import StepColumns, GeneratorStepOutput
class MyGeneratorStep(GeneratorStep):
instructions: List[str]
@override
def process(self, offset: int = 0) -> "GeneratorStepOutput":
if offset:
self.instructions = self.instructions[offset:]
while self.instructions:
batch = [
{
"instruction": instruction
} for instruction in self.instructions[: self.batch_size]
]
self.instructions = self.instructions[self.batch_size :]
yield (
batch,
True if len(self.instructions) == 0 else False,
)
@property
def outputs(self) -> "StepColumns":
return ["instruction"]
Then we can use it as follows:
step = MyGeneratorStep(
name="my-generator-step",
instructions=["Tell me a joke.", "Tell me a story."],
batch_size=1,
)
step.load()
next(step.process(offset=0))
# ([{'instruction': 'Tell me a joke.'}], False)
next(step.process(offset=1))
# ([{'instruction': 'Tell me a story.'}], True)
Note
The Step.load()
always needs to be executed when being used as a standalone. Within a pipeline, this will be done automatically during pipeline execution.
Defining custom GeneratorSteps¶
We can define a custom generator step by creating a new subclass of the GeneratorStep
and defining the following:
-
outputs
: is a property that returns a list of strings with the names of the output fields or a dictionary in which the keys are the names of the columns and the values are boolean indicating whether the column is required or not. -
process
: is a method that yields output data and a boolean flag indicating whether that's the last batch to be generated.
Note
The default signature for the process
method is process(self, offset: int = 0) -> GeneratorStepOutput
. The argument offset
should be respected, no more arguments can be provided, and the type-hints and return type-hints should be respected too because it should be able to receive any number of inputs by default i.e. more than one Step
at a time could be connected to the current one.
Warning
For the custom Step
subclasses to work properly with distilabel
and with the validation and serialization performed by default over each Step
in the Pipeline
, the type-hint for both StepInput
and StepOutput
should be used and not surrounded with double-quotes or imported under typing.TYPE_CHECKING
, otherwise, the validation and/or serialization will fail.
We can inherit from the GeneratorStep
class and define the outputs
, and process
methods as follows:
from typing import List, TYPE_CHECKING
from typing_extensions import override
from distilabel.steps import GeneratorStep
if TYPE_CHECKING:
from distilabel.typing import StepColumns, GeneratorStepOutput
class MyGeneratorStep(GeneratorStep):
instructions: List[str]
@override
def process(self, offset: int = 0) -> "GeneratorStepOutput":
...
@property
def outputs(self) -> "StepColumns":
...
The @step
decorator will take care of the boilerplate code, and will allow to define the outputs
, and process
methods in a more straightforward way. One downside is that it won't let you access the self
attributes if any, neither set those, so if you need to access or set any attribute, you should go with the first approach of defining the custom GeneratorStep
subclass.
from typing import TYPE_CHECKING
from distilabel.steps import step
if TYPE_CHECKING:
from distilabel.typing import GeneratorStepOutput
@step(outputs=[...], step_type="generator")
def CustomGeneratorStep(offset: int = 0) -> "GeneratorStepOutput":
yield (
...,
True if offset == 10 else False,
)
step = CustomGeneratorStep(name="my-step")