Pipeline¶
The Pipeline is the central point in distilabel: it's the way to organize the steps used to create your datasets. Up to this point we've seen how to define different Step and Task subclasses in Tutorial - Step and Tutorial - Task, respectively, which together with an LLM are the building blocks of our datasets. In this section we will take a look at how all these blocks are organized inside a Pipeline.
Note
Currently distilabel implements a local version of a Pipeline, and will assume that's the only definition, but this can be extended in the future to include remote execution of the Pipeline.
How to create a pipeline¶
The most common way to create a Pipeline is by using the context manager: we just need to give our Pipeline a name and, optionally, a description, and that's it:
from distilabel.pipeline import Pipeline

with Pipeline("pipe-name", description="My first pipe") as pipeline:  # (1)
    ...
- Use the context manager to create a Pipeline with a name and an optional description.
This way, we ensure that all the steps we define there are connected to each other under the same Pipeline. The next step is to define the steps of our Pipeline. It's mandatory that the root steps of the pipeline, i.e. the ones that don't have any predecessors, are GeneratorSteps such as LoadDataFromDicts or LoadHubDataset.
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadHubDataset

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")  # (1)
    ...
- Define the first step of the pipeline, in this case LoadHubDataset, a GeneratorStep used to load a dataset from the Hugging Face Hub.
Once we have a source of data, we can create other Steps that will consume and process the data generated by the GeneratorSteps. Let's assume that the dataset we're going to load from the Hub contains a prompt column and that we want to generate texts based on this prompt. We also want to use several LLMs for this task. To do so, we will create several TextGeneration tasks, each with a different LLM.
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)  # (1)
        load_dataset.connect(task)  # (2)

    ...
- Create a TextGeneration task for each LLM we want to use.
- Connect the TextGeneration task with the LoadHubDataset step, so the output data from the dataset is passed to the task.
Note
The order of execution of the steps is determined by their connections. In this case, the TextGeneration tasks will be executed after the LoadHubDataset step.
For each row of the dataset, the TextGeneration task will generate a text based on the instruction column and the LLM model, and store the result (a single string) in a new column called generation. As we would like to have all the responses in the same column, we will add an extra step that combines them all into the same column, so the value of this column is a list of strings or responses.
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")

    combine_generations = CombineColumns(  # (1)
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        load_dataset.connect(task)
        task.connect(combine_generations)  # (2)
- Create a CombineColumns step to combine all the generation columns into a single column called generations, and the model_name columns into a single column called model_names.
- Connect the TextGeneration task with the CombineColumns step, so the output data from the task is passed to the step that will combine all the generation columns.
As CombineColumns is the last step, or rather a leaf step of the pipeline (it doesn't have any successors), its outputs will be included in the returned Distiset (more information about it in Advanced - Distiset).
Note
One pipeline can have several leaf steps, which means that the outputs of all the leaf steps will be included in the returned Distiset, which will contain several subsets, one for each leaf step.
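As a hypothetical sketch of what this looks like once the pipeline has run, assuming the Distiset behaves like a dictionary with one entry per leaf step (the exact keys depend on the pipeline):

if __name__ == "__main__":
    distiset = pipeline.run(...)

    # Assumed: one subset per leaf step; here the only leaf step is "combine_generations".
    for subset_name, subset in distiset.items():
        print(subset_name, subset)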
Connecting steps¶
In the previous example we saw how to create a Pipeline and connect different steps using the Step.connect method: step1.connect(step2). There's an alternative way to do it, using the >> operator. Let's see how, using the previous Pipeline as an example.
Each call to step1.connect(step2) has been exchanged for step1 >> step2:
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration
with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")

    combine_generations = CombineColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        load_dataset >> task >> combine_generations  # (1)
- Here load_dataset >> task >> combine_generations replaces the separate load_dataset.connect(task) and task.connect(combine_generations) calls from the previous example.
All the connections from the load_dataset step to the different task objects, and from those to combine_generations, can even be done in a single call:
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration
with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")

    combine_generations = CombineColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    tasks = []
    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        tasks.append(
            TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        )

    load_dataset >> tasks >> combine_generations  # (1)
- Notice how tasks is a list of different Tasks. In a single call to the operator we are connecting load_dataset with all the tasks, and all of those again to combine_generations.
Routing batches to specific downstream steps¶
In some pipelines, you will likely have a list of downstream steps receiving batches from the same upstream step, but want to route each batch to specific downstream steps based on some condition. To do so, you can use a routing_batch_function, a simple function that receives a list of the downstream steps to which a batch can be routed and returns a list containing the names of the steps to which the batch should actually be routed. Let's update the example above to route the batches loaded by the LoadHubDataset step to just 2 of the TextGeneration tasks. First, we will create our custom routing_batch_function, and then we will update the pipeline to use it:
import random

from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline, routing_batch_function
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration


@routing_batch_function
def sample_two_steps(steps: list[str]) -> list[str]:
    return random.sample(steps, 2)


with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},
    )

    tasks = []
    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        tasks.append(
            TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        )

    combine_generations = CombineColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    load_dataset >> sample_two_steps >> tasks >> combine_generations
As can be seen, the routing_batch_function can be used with the >> operator to route the batches to specific downstream steps. In this case, each batch yielded by the load_dataset step will be routed to just 2 of the TextGeneration tasks, and then all the outputs of the tasks will be combined in the CombineColumns step, so each row of the final dataset will contain generations from at most 2 LLMs. The routing_batch_function that we just built is a common one, so distilabel comes with an auxiliary function that can be used to achieve the same behavior, as sketched below.
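A minimal sketch of what using that auxiliary function could look like, assuming it is called sample_n_steps and can be imported from distilabel.pipeline (check the API reference for the exact name and signature):

from distilabel.pipeline import sample_n_steps  # assumed helper name

# Builds a routing_batch_function that randomly routes each batch to `n` downstream
# steps, equivalent to the sample_two_steps function defined above.
random_routing = sample_n_steps(n=2)

# It would then replace sample_two_steps in the pipeline definition:
# load_dataset >> random_routing >> tasks >> combine_generations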
Running the pipeline¶
Pipeline.dry_run¶
Before running the Pipeline we may want to check that all the components behave as expected. We can do a dry_run for this:
with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

if __name__ == "__main__":
    distiset = pipeline.dry_run(parameters=..., batch_size=1)
It takes the same parameters as the run method that we will see in the following section, plus the batch_size we want the dry run to use (1 by default). In this case, the Pipeline would select a single example from our generator steps and pass it through all the steps. Assuming the dry_run runs successfully, we are ready to run our pipeline.
Pipeline.run¶
Once we have created the pipeline, we can run it. To do so, we need to call the run method of the Pipeline, and specify the runtime parameters for each step:
with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            "load_dataset": {
                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                "split": "test",
            },
            "text_generation_with_gpt-4-0125-preview": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_mistral-large-2402": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_gemini-1.0-pro": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
        },
    )
But if we run it, we will see that the run method will fail:
ValueError: Step 'text_generation_with_gpt-4-0125-preview' requires inputs ['instruction'], but only the inputs=['prompt', 'completion', 'meta'] are available, which means that the inputs=['instruction'] are missing or not available
when the step gets to be executed in the pipeline. Please make sure previous steps to 'text_generation_with_gpt-4-0125-preview' are generating the required inputs.
This is because, before actually running the pipeline, it is validated to verify that everything is correct and that all the steps in the pipeline are chainable, i.e. each step has the necessary inputs to be executed. In this case, the TextGeneration task requires the instruction column, but the LoadHubDataset step generates the prompt column. To solve this, we can use the output_mappings argument that every Step has, to map or rename the output columns of a step to the required input columns of another step:
with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},  # (1)
    )
    ...
- Use the output_mappings argument to map the prompt column generated by the LoadHubDataset step to the instruction column required by the TextGeneration task.
If we execute the pipeline again, it will run successfully and we will have a Distiset with the outputs of all the leaf steps of the pipeline, which we can push to the Hugging Face Hub.
if __name__ == "__main__":
    distiset = pipeline.run(...)
    distiset.push_to_hub("distilabel-internal-testing/instruction-dataset-mini-with-generations")
Stopping the pipeline¶
In case you want to stop the pipeline while it's running, press Ctrl+C (Cmd+C on macOS): we automatically catch the signal and try to finish whatever steps are currently running. If the pipeline hangs for some reason, repeating the command twice will force it to close.
Note
When sending the signal to stop the process, you can expect distilabel to log messages informing you of what's happening while it tries to finish the running steps.
Cache¶
If we try to execute the pipeline again, it won't actually execute: it will load the dataset from the cache, and the outputs of the pipeline will be the same as in the previous run. If, for some reason, we decide to stop the pipeline execution in the middle of the process by pressing Ctrl+C, the pipeline will stop and its state and outputs will be stored in the cache, so we can resume the execution from the point where it was stopped.
If we want to force the pipeline to run again, we can use the use_cache argument of the run method and set it to False:
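A minimal sketch of forcing a fresh run (the runtime parameters are the same ones shown in the Pipeline.run section):

if __name__ == "__main__":
    # use_cache=False ignores any cached results from previous runs
    # and executes the whole pipeline again from scratch.
    distiset = pipeline.run(parameters=..., use_cache=False)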
Adjusting the batch size for each step¶
It's very likely that in some pipelines the batch size of the steps (the number of dictionaries that every call to the Step.process method will receive) will need to be adjusted in order to avoid memory issues or to achieve more efficient processing. To do so, we can use the input_batch_size argument of the step:
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration
with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(
            name=f"text_generation_with_{llm.model_name}",
            llm=llm,
            input_batch_size=5,  # (1)
        )

    ...
- Use the input_batch_size argument to set the batch size of the TextGeneration task to 5.
When we run the pipeline, the TextGeneration task will receive 5 dictionaries in every call to the process method. In addition, we can also adjust the size of the batches generated by the GeneratorSteps using the batch_size argument:
with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},
        batch_size=10,  # (1)
    )
    ...
- Use the batch_size argument to set the batch size of the LoadHubDataset step to 10.
By default, both arguments have a value of 50.
Serializing the pipeline¶
Sharing a pipeline with others is very easy, as we can serialize the pipeline object using the save method. We can save the pipeline in different formats, such as yaml or json:
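A minimal sketch of what saving could look like, assuming save accepts a path and a format argument (check the API reference for the exact signature):

# Assumed signature: save(path, format=...), with "yaml" and "json" supported.
pipeline.save("pipeline.yaml", format="yaml")
pipeline.save("pipeline.json", format="json")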
To load the pipeline, we can use the from_yaml or from_json methods:
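And, under the same assumptions, loading it back:

from distilabel.pipeline import Pipeline

# Assumed class methods matching the formats above.
pipeline = Pipeline.from_yaml("pipeline.yaml")
# pipeline = Pipeline.from_json("pipeline.json")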
Serializing the pipeline is very useful when we want to share it with others, or when we want to store it for future use. It can even be hosted online, so the pipeline can be executed directly using the CLI just by knowing its URL.
Fully working example¶
To sum up, here is the full code of the pipeline we have created in this section. Note that you will need to change the name of the Hugging Face repository where the resulting dataset will be pushed, set the OPENAI_API_KEY environment variable, set MISTRAL_API_KEY, and have gcloud installed and configured:
Code
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},
    )

    combine_generations = CombineColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        load_dataset.connect(task)
        task.connect(combine_generations)

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            "load_dataset": {
                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                "split": "test",
            },
            "text_generation_with_gpt-4-0125-preview": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_mistral-large-2402": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_gemini-1.0-pro": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
        },
    )
    distiset.push_to_hub(
        "distilabel-internal-testing/instruction-dataset-mini-with-generations"
    )