Skip to content

Pipeline

The Pipeline is the central point in distilabel, the way to organize the steps to create your datasets. Up to this point we've seen how we can define different Steps and Tasks, that together with an LLM are the building blocks of our datasets, in this section we will take a look at how all these blocks are organized inside a Pipeline.

Note

Currently distilabel implements a local version of a Pipeline, and will assume that's the only definition, but this can be extended in the future to include remote execution of the Pipeline.

How to create a pipeline

The most common way a Pipeline should be created is by making use of the context manager, we just need to give our Pipeline a name, and optionally a description, and that's it1:

from distilabel.pipeline import Pipeline

with Pipeline("pipe-name", description="My first pipe") as pipeline:  # (1)
    ...
  1. Use the context manager to create a Pipeline with a name and an optional description.

This way, we ensure all the steps we define there are connected with each other under the same Pipeline. The next step is to define the steps of our Pipeline. It's mandatory that the root steps of the pipeline i.e. the ones that doesn't have any predecessors, are GeneratorSteps such as LoadDataFromDicts or LoadHubDataset.

from distilabel.pipeline import Pipeline
from distilabel.steps import LoadHubDataset

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")  # (1)
    ...
  1. Define the first step of the pipeline, in this case LoadHubDataset, a GeneratorStep used to load a dataset from the Hugging Face Hub.

Once we have a source of data, we can create another Steps that will consume and process the data generated by the GeneratorSteps. Let's assume that the dataset we're going to load from the Hub contains a prompt column and that we want to generate texts based on this prompt. We also want to use several LLMs for this task. To do so, we will create several TextGeneration tasks, each with a different LLM.

from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)  # (1)
        task.connect(load_dataset)  # (2)

    ...
  1. Create a TextGeneration task for each LLM we want to use.
  2. Connect the TextGeneration task with the LoadHubDataset step, so the output data from the dataset is passed to the task.

Note

The order of the execution of the steps will be determined by the connections of the steps. In this case, the TextGeneration tasks will be executed after the LoadHubDataset step.

For each row of the dataset, the TextGeneration task will generate a text based on the instruction column and the LLM model, and store the result (a single string) in a new column called generation. As we would like to have all the responses in the same column, we will add an extra step to combine them all in the same column, so the value of this column is a list of strings or responses.

from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")

    combine_generations = CombineColumns(  # (1)
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        load_dataset.connect(task)
        task.connect(combine_generations)  # (2)
  1. Create a CombineColumns step to combine all the generation columns into a single column called generations and the model_name columns into a single column called model_names.
  2. Connect the TextGeneration task with the CombineColumns step, so the output data from the task is passed to the step that will combine all the generation columns.

As the CombineColumns is the last step or it's a leaf step of the pipeline because it doesn't have any successors, that means that the outputs of this step will be included in the returned Distiset by the pipeline.

Note

One pipeline can have several leaf steps, which means that the outputs of all the leaf steps will be included in the returned Distiset, which will contain several subsets, one for each leaf step.

Running the pipeline

Once we have created the pipeline, we can run it. To do so, we need to call the run method of the Pipeline, and specify the runtime parameters for each step:

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            "load_dataset": {
                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                "split": "test",
            },
            "text_generation_with_gpt-4-0125-preview": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_mistral-large-2402": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_gemini-1.0-pro": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
        },
    )

But if we run it, we will see that the run method will fail:

ValueError: Step 'text_generation_with_gpt-4-0125-preview' requires inputs ['instruction'] which are not available when the step gets to be executed in the pipeline. Please make sure previous steps to 'text_generation_with_gpt-4-0125-preview' are 
generating the required inputs. Available inputs are: ['prompt', 'completion', 'meta']

This is because, before actually running the pipeline, the pipeline is validated to verify that everything is correct and all the steps in the pipeline are chainable i.e. each step has the necessary inputs to be executed. In this case, the TextGeneration task requires the instruction column, but the LoadHubDataset step generates the prompt column. To solve this, we can use the output_mappings argument that every Step has, to map or rename the output columns of a step to the required input columns of another step:

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},  # (1)
    )

    ...
  1. Use the output_mappings argument to map the prompt column generated by the LoadHubDataset step to the instruction column required by the TextGeneration task.

If we execute the pipeline again, it will run successfully and we will have a Distiset with the outputs of all the leaf steps of the pipeline which we can push to the Hugging Face Hub.

if __name__ == "__main__":
    distiset = pipeline.run(...)
    distiset.push_to_hub("distilabel-internal-testing/instruction-dataset-mini-with-generations")

Cache

If we try to execute the pipeline again, the pipeline won't execute as it will load the dataset from the cache, and the outputs of the pipeline will be the same as the previous run. If for some reason, we decide to stop the pipeline execution in the middle of the process pressing CTRL + C, the pipeline will stop and the state of the pipeline and the outputs will be stored in the cache, so we can resume the pipeline execution from the point where it was stopped.

If we want to force the pipeline to run again, then we can use the use_cache argument of the run method and set it to False:

if __name__ == "__main__":
    distiset = pipeline.run(parameters={...}, use_cache=False)

Adjusting the batch size for each step

It's very likely that in some pipelines the batch size of the steps (the number of dictionaries that will receive every Step.process method when called) will need to be adjusted in order to avoid memory issues or a more efficient processing. To do so, we can use the input_batch_size argument of the run method:

from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(
            name=f"text_generation_with_{llm.model_name}", 
            llm=llm,
            input_batch_size=5,  # (1)
        )

    ...
  1. Use the input_batch_size argument to set the batch size of the TextGeneration task to 5.

When we run the pipeline, the TextGeneration task will receive 5 dictionaries in every call to the process method. In addition, we can also adjust the batch size of the generated batches by the GeneratorSteps using the batch_size argument:

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},
        batch_size=10  # (1)
    )

    ...
  1. Use the batch_size argument to set the batch size of the LoadHubDataset step to 10.

By default, both arguments have a value of 50.

Serializing the pipeline

Sharing a pipeline with others is very easy, as we can serialize the pipeline object using the save method. We can save the pipeline in different formats, such as yaml or json:

if __name__ == "__main__":
    pipeline.save("pipeline.yaml", format="yaml")

To load the pipeline, we can use the from_yaml or from_json methods:

pipeline = Pipeline.from_yaml("pipeline.yaml")

Serializing the pipeline is very useful when we want to share the pipeline with others, or when we want to store the pipeline for future use. It can even be hosted online, so the pipeline can be executed directly using the CLI knowing the URL of the pipeline.

Fully working example

To sump up, here is the full code of the pipeline we have created in this section. Note that you will need to change the name of the Hugging Face repository where the resulting will be pushed, set OPENAI_API_KEY environment variable, set MISTRAL_API_KEY and have gcloud installed and configured:

Code
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},
    )

    combine_generations = CombineColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        load_dataset.connect(task)
        task.connect(combine_generations)

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            "load_dataset": {
                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                "split": "test",
            },
            "text_generation_with_gpt-4-0125-preview": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_mistral-large-2402": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_gemini-1.0-pro": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
        },
    )
    distiset.push_to_hub(
        "distilabel-internal-testing/instruction-dataset-mini-with-generations"
    )

  1. We also have the cache_dir argument to pass, for more information on this parameter, we refer the reader to the caching section.