Pipeline¶
The `Pipeline` is the central point in `distilabel`: it's the way to organize the steps used to create your datasets. Up to this point we've seen how we can define different `Steps` and `Tasks` that, together with an `LLM`, are the building blocks of our datasets. In this section we will take a look at how all these blocks are organized inside a `Pipeline`.
Note
Currently, `distilabel` implements a local version of the `Pipeline` and assumes that's the only one, but this could be extended in the future to include remote execution of the `Pipeline`.
How to create a pipeline¶
The most common way to create a `Pipeline` is by using the context manager: we just need to give our `Pipeline` a name, and optionally a description, and that's it:
```python
from distilabel.pipeline import Pipeline

with Pipeline("pipe-name", description="My first pipe") as pipeline:  # (1)
    ...
```
1. Use the context manager to create a `Pipeline` with a name and an optional description.
This way, we ensure that all the steps we define within the context manager are connected with each other under the same `Pipeline`. The next step is to define the steps of our `Pipeline`. It's mandatory that the root steps of the pipeline, i.e. the ones that don't have any predecessors, are `GeneratorStep`s such as `LoadDataFromDicts` or `LoadHubDataset`.
```python
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadHubDataset

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")  # (1)
    ...
```
1. Define the first step of the pipeline, in this case `LoadHubDataset`, a `GeneratorStep` used to load a dataset from the Hugging Face Hub.
Once we have a source of data, we can create other `Step`s that will consume and process the data generated by the `GeneratorStep`s. Let's assume that the dataset we're going to load from the Hub contains a `prompt` column and that we want to generate texts based on this prompt. We also want to use several `LLM`s for this task. To do so, we will create several `TextGeneration` tasks, each with a different `LLM`.
```python
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)  # (1)
        load_dataset.connect(task)  # (2)

    ...
```
1. Create a `TextGeneration` task for each `LLM` we want to use.
2. Connect the `LoadHubDataset` step with the `TextGeneration` task, so the output data from the dataset is passed to the task.
Note
The order of execution of the steps will be determined by their connections. In this case, the `TextGeneration` tasks will be executed after the `LoadHubDataset` step.
For each row of the dataset, the `TextGeneration` task will generate a text based on the `instruction` column and the `LLM` model, and store the result (a single string) in a new column called `generation`. As we would like to have all the responses in the same column, we will add an extra step that combines them all into a single column, so the value of this column is a list of strings or responses.
```python
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")

    combine_generations = CombineColumns(  # (1)
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        load_dataset.connect(task)
        task.connect(combine_generations)  # (2)
```
1. Create a `CombineColumns` step to combine all the `generation` columns into a single column called `generations`, and the `model_name` columns into a single column called `model_names`.
2. Connect the `TextGeneration` task with the `CombineColumns` step, so the output data from the task is passed to the step that will combine all the `generation` columns.
As `CombineColumns` is the last step, or rather a leaf step of the pipeline (it doesn't have any successors), its outputs will be included in the `Distiset` returned by the pipeline.
Note
One pipeline can have several leaf steps, in which case the outputs of all of them will be included in the returned `Distiset`, which will contain one subset per leaf step.
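Although not shown in this section, the subsets of a `Distiset` can be accessed by leaf step name, as it behaves like a dictionary. A minimal sketch, assuming `combine_generations` is the only leaf step, as in the pipeline above:

```python
distiset = pipeline.run(...)

# `Distiset` is dict-like, with one subset per leaf step, keyed by the
# step name (assumption: `combine_generations` is the only leaf step
# of the pipeline defined above).
combined = distiset["combine_generations"]
print(combined)
```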
Running the pipeline¶
Once we have created the pipeline, we can run it. To do so, we need to call the `run` method of the `Pipeline`, specifying the runtime parameters for each step:
```python
with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            "load_dataset": {
                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                "split": "test",
            },
            "text_generation_with_gpt-4-0125-preview": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_mistral-large-2402": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_gemini-1.5-pro": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
        },
    )
```
But if we run it, we will see that the `run` method will fail:
```
ValueError: Step 'text_generation_with_gpt-4-0125-preview' requires inputs ['instruction'] which are not available when the step gets to be executed in the pipeline. Please make sure previous steps to 'text_generation_with_gpt-4-0125-preview' are generating the required inputs. Available inputs are: ['prompt', 'completion', 'meta']
```
This is because, before actually running the pipeline, it is validated to verify that everything is correct and that all the steps are chainable, i.e. that each step has the necessary inputs to be executed. In this case, the `TextGeneration` task requires the `instruction` column, but the `LoadHubDataset` step generates the `prompt` column. To solve this, we can use the `output_mappings` argument that every `Step` has, to map or rename the output columns of a step to the required input columns of another step:
```python
with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},  # (1)
    )

    ...
```
1. Use the `output_mappings` argument to map the `prompt` column generated by the `LoadHubDataset` step to the `instruction` column required by the `TextGeneration` task.
If we execute the pipeline again, it will run successfully and we will have a `Distiset` with the outputs of all the leaf steps of the pipeline, which we can push to the Hugging Face Hub.
```python
if __name__ == "__main__":
    distiset = pipeline.run(...)
    distiset.push_to_hub("distilabel-internal-testing/instruction-dataset-mini-with-generations")
```
Cache¶
If we try to execute the pipeline again, it won't actually run: instead, it will load the outputs from the cache, and the results will be the same as in the previous run. If, for some reason, we decide to stop the pipeline execution in the middle of the process by pressing Ctrl+C, the pipeline will stop and its state and outputs will be stored in the cache, so we can resume the execution from the point where it was stopped.
If we want to force the pipeline to run again, then we can use the `use_cache` argument of the `run` method and set it to `False`:
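A minimal sketch, assuming `parameters` holds the same dictionary used in the previous run:

```python
if __name__ == "__main__":
    distiset = pipeline.run(
        parameters=parameters,  # assumption: the same parameters dict as before
        use_cache=False,  # ignore the cached results and execute every step again
    )
```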
Adjusting the batch size for each step¶
It's very likely that in some pipelines the batch size of the steps (the number of dictionaries that every `Step.process` method will receive when called) will need to be adjusted, in order to avoid memory issues or to achieve more efficient processing. To do so, we can use the `input_batch_size` argument that every `Step` has:
```python
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(
            name=f"text_generation_with_{llm.model_name}",
            llm=llm,
            input_batch_size=5,  # (1)
        )

    ...
```
1. Use the `input_batch_size` argument to set the batch size of the `TextGeneration` task to 5.
When we run the pipeline, the `TextGeneration` task will receive 5 dictionaries in every call to the `process` method. In addition, we can also adjust the size of the batches generated by the `GeneratorStep`s using the `batch_size` argument:
```python
with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},
        batch_size=10,  # (1)
    )

    ...
```
1. Use the `batch_size` argument to set the batch size of the `LoadHubDataset` step to 10.
By default, both arguments have a value of 50.
Serializing the pipeline¶
Sharing a pipeline with others is very easy, as we can serialize the pipeline object using the `save` method. We can save the pipeline in different formats, such as `yaml` or `json`:
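A minimal sketch, assuming the pipeline defined above and that `save` accepts a path and a `format` argument, as in recent `distilabel` versions:

```python
with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

# Save the pipeline in the desired format, e.g. YAML or JSON
pipeline.save("pipeline.yaml", format="yaml")
# pipeline.save("pipeline.json", format="json")
```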
To load the pipeline, we can use the `from_yaml` or `from_json` methods:
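A minimal sketch, assuming the files saved in the previous snippet:

```python
from distilabel.pipeline import Pipeline

# Recreate the pipeline from the serialized file
pipeline = Pipeline.from_yaml("pipeline.yaml")
# or, if it was saved as JSON:
# pipeline = Pipeline.from_json("pipeline.json")
```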
Serializing the pipeline is very useful when we want to share it with others, or store it for future use. It can even be hosted online, so the pipeline can be executed directly from the CLI, just by knowing its URL.
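For instance, something like the following could work with the `distilabel` CLI (the URL here is a hypothetical placeholder):

```bash
# Run a serialized pipeline hosted at a public URL (placeholder URL)
distilabel pipeline run --config "https://example.com/pipeline.yaml"
```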
Fully working example¶
To sum up, here is the full code of the pipeline we have created in this section. Note that you will need to change the name of the Hugging Face repository to which the resulting dataset will be pushed, set the `OPENAI_API_KEY` and `MISTRAL_API_KEY` environment variables, and have `gcloud` installed and configured:
Code
```python
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},
    )

    combine_generations = CombineColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        load_dataset.connect(task)
        task.connect(combine_generations)

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            "load_dataset": {
                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                "split": "test",
            },
            "text_generation_with_gpt-4-0125-preview": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_mistral-large-2402": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_gemini-1.5-pro": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
        },
    )
    distiset.push_to_hub(
        "distilabel-internal-testing/instruction-dataset-mini-with-generations"
    )
```