
Pipeline

The Pipeline is the central piece of distilabel: it organizes the steps used to create your datasets. So far we've seen how to define Step and Task subclasses in Tutorial - Step and Tutorial - Task, respectively, which together with an LLM are the building blocks of our datasets. In this section we will look at how all these blocks are organized inside a Pipeline.

Note

Currently distilabel implements a local version of a Pipeline, and will assume that's the only definition, but this can be extended in the future to include remote execution of the Pipeline.

How to create a pipeline

The most common way to create a Pipeline is with the context manager: we just need to give our Pipeline a name, and optionally a description, and that's it (see footnote 1 at the end of this page):

from distilabel.pipeline import Pipeline

with Pipeline("pipe-name", description="My first pipe") as pipeline:  # (1)
    ...
  1. Use the context manager to create a Pipeline with a name and an optional description.

This way, we ensure that all the steps we define there are connected with each other under the same Pipeline. The next step is to define the steps of our Pipeline. The root steps of the pipeline, i.e. the ones that don't have any predecessors, must be GeneratorSteps such as LoadDataFromDicts or LoadHubDataset.

from distilabel.pipeline import Pipeline
from distilabel.steps import LoadHubDataset

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")  # (1)
    ...
  1. Define the first step of the pipeline, in this case LoadHubDataset, a GeneratorStep used to load a dataset from the Hugging Face Hub.

Once we have a source of data, we can create other Steps that will consume and process the data generated by the GeneratorSteps. Let's assume that the dataset we're going to load from the Hub contains a prompt column and that we want to generate texts based on this prompt. We also want to use several LLMs for this task. To do so, we will create several TextGeneration tasks, each with a different LLM.

from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)  # (1)
        load_dataset.connect(task)  # (2)

    ...
  1. Create a TextGeneration task for each LLM we want to use.
  2. Connect the TextGeneration task with the LoadHubDataset step, so the output data from the dataset is passed to the task.

Note

The order of the execution of the steps will be determined by the connections of the steps. In this case, the TextGeneration tasks will be executed after the LoadHubDataset step.
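To illustrate how connections alone can determine an execution order, here is a minimal sketch that uses the standard library's graphlib rather than distilabel itself. The predecessors mapping is a hypothetical stand-in for the connections defined above, not distilabel's internal representation.

```python
from graphlib import TopologicalSorter

# Hypothetical description of the connections: each step name maps to the
# set of steps that must run before it (its predecessors).
predecessors = {
    "load_dataset": set(),
    "text_generation_with_gpt-4-0125-preview": {"load_dataset"},
    "text_generation_with_mistral-large-2402": {"load_dataset"},
}

# A topological order always places a step after all of its predecessors.
execution_order = list(TopologicalSorter(predecessors).static_order())
print(execution_order[0])  # load_dataset
```

The two TextGeneration tasks may appear in either order, since neither depends on the other.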

For each row of the dataset, the TextGeneration task will generate a text based on the instruction column using its LLM, and store the result (a single string) in a new column called generation. Since we would like to have all the responses in the same column, we will add an extra step that combines them, so the value of this column is a list of strings (the responses).

from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")

    combine_generations = CombineColumns(  # (1)
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        load_dataset.connect(task)
        task.connect(combine_generations)  # (2)
  1. Create a CombineColumns step to combine all the generation columns into a single column called generations and the model_name columns into a single column called model_names.
  2. Connect the TextGeneration task with the CombineColumns step, so the output data from the task is passed to the step that will combine all the generation columns.
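To make the effect of combining columns concrete, the following plain-Python sketch merges the rows produced by several tasks into one row with list-valued columns. The combine_columns helper and the sample rows are hypothetical and only mimic the behaviour described above; they are not the CombineColumns implementation.

```python
def combine_columns(rows, columns, output_columns):
    """Merge `columns` from several rows into list-valued `output_columns`."""
    # Columns that are not being combined are taken from the first row.
    combined = {key: value for key, value in rows[0].items() if key not in columns}
    for column, output_column in zip(columns, output_columns):
        combined[output_column] = [row[column] for row in rows]
    return combined


# One row per TextGeneration task, all for the same instruction (made-up data).
rows = [
    {"instruction": "Say hi", "generation": "Hi!", "model_name": "model-a"},
    {"instruction": "Say hi", "generation": "Hello.", "model_name": "model-b"},
]

merged = combine_columns(
    rows,
    columns=["generation", "model_name"],
    output_columns=["generations", "model_names"],
)
print(merged["generations"])  # ['Hi!', 'Hello.']
```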

Since CombineColumns is the last step of the pipeline, i.e. a leaf step with no successors, its outputs will be included in the returned Distiset (more information about it in Advanced - Distiset).

Note

A pipeline can have several leaf steps. In that case, the outputs of all the leaf steps will be included in the returned Distiset, which will contain several subsets, one for each leaf step.
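As a small illustration of what root and leaf steps are, the sketch below derives them from a list of connections. The step names (including the extra keep_raw step) are hypothetical, and this is plain Python rather than distilabel code.

```python
# Each pair is (upstream step, downstream step); names are made up.
connections = [
    ("load_dataset", "task_a"),
    ("load_dataset", "task_b"),
    ("task_a", "combine_generations"),
    ("task_b", "combine_generations"),
    ("task_b", "keep_raw"),
]

steps = {step for edge in connections for step in edge}
# Root steps never appear as a destination; leaf steps never appear as a source.
roots = steps - {downstream for _, downstream in connections}
leaves = steps - {upstream for upstream, _ in connections}

print(sorted(roots))   # ['load_dataset']
print(sorted(leaves))  # ['combine_generations', 'keep_raw']
```

With two leaf steps, the resulting Distiset would hold two subsets, one per leaf.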

Connecting steps

In the previous example we saw how to create a Pipeline and connect different steps using the Step.connect method: step1.connect(step2). There's an alternative way that makes use of the >> operator. Let's see how it works, using the previous Pipeline as an example:

Each call to step1.connect(step2) has been replaced by step1 >> step2:

from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")

    combine_generations = CombineColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        load_dataset >> task >> combine_generations  # (1)
  1. Here load_dataset >> task >> combine_generations replaces the load_dataset.connect(task) and task.connect(combine_generations) calls.

All the connections from the load_dataset step to the different task objects, and from those to combine_generations, are made in a single statement:

from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(name="load_dataset")

    combine_generations = CombineColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    tasks = []
    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        tasks.append(
            TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        )

    load_dataset >> tasks >> combine_generations  # (1)
  1. Notice how tasks is a list of different Tasks. In a single call to the operator we are connecting load_dataset with all the tasks, and all of those again to combine_generations.
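For readers curious how an operator like >> can accept both a single step and a list of steps, here is a toy sketch based on Python's __rshift__ and __rrshift__ hooks. ToyStep is a hypothetical class written for this illustration; it is not how distilabel's Step is implemented.

```python
class ToyStep:
    """Hypothetical stand-in for a step; only records its successors."""

    def __init__(self, name):
        self.name = name
        self.successors = []

    def connect(self, other):
        self.successors.append(other.name)

    def __rshift__(self, other):
        # Handles `step >> step` and `step >> [step, ...]`.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.connect(target)
        return other  # returning the right-hand side allows chaining

    def __rrshift__(self, other):
        # Handles `[step, ...] >> step`: lists do not define `>>`,
        # so Python falls back to the right operand's __rrshift__.
        for source in other:
            source.connect(self)
        return self


load = ToyStep("load_dataset")
tasks = [ToyStep("task_a"), ToyStep("task_b")]
combine = ToyStep("combine_generations")

load >> tasks >> combine

print(load.successors)      # ['task_a', 'task_b']
print(tasks[0].successors)  # ['combine_generations']
```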

Routing batches to specific downstream steps

In some pipelines, several downstream steps receive batches from the same upstream step, but you want to route each batch only to specific downstream steps based on some condition. To do so, you can use a routing_batch_function: a simple function that receives a list with the names of the downstream steps to which a batch can be routed, and returns a list with the names of the steps to which the batch should be routed. Let's update the example above to route the batches loaded by the LoadHubDataset step to just 2 of the TextGeneration tasks. First, we will create our custom routing_batch_function, and then we will update the pipeline to use it:

import random
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline, routing_batch_function
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration

@routing_batch_function
def sample_two_steps(steps: list[str]) -> list[str]:
    return random.sample(steps, 2)

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},
    )

    tasks = []
    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        tasks.append(
            TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        )

    combine_generations = CombineColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    load_dataset >> sample_two_steps >> tasks >> combine_generations

As can be seen, the routing_batch_function can be used with the >> operator to route the batches to specific downstream steps. In this case, each batch yielded by the load_dataset step will be routed to just 2 of the TextGeneration tasks, and then all the outputs of the tasks will be combined in the CombineColumns step, so each row of the final dataset will contain generations from at most 2 LLMs. The routing_batch_function that we just built is a common one, so distilabel comes with an auxiliary function that can be used to achieve the same behavior:

from distilabel.pipeline import sample_n_steps

sample_two_steps = sample_n_steps(2)
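The idea behind such a helper can be sketched in plain Python as a function factory. The make_sampler name is hypothetical, and the sketch leaves out the routing_batch_function decorator so that it runs without distilabel installed; it only shows the selection logic.

```python
import random


def make_sampler(n):
    """Return a function that picks `n` step names at random (hypothetical helper)."""

    def route(steps):
        # random.sample returns `n` distinct elements from the list.
        return random.sample(steps, n)

    return route


sample_two = make_sampler(2)
chosen = sample_two(["task_a", "task_b", "task_c"])
print(len(chosen))  # 2
```

Because the choice is made per call, different batches can end up in different pairs of tasks.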

Running the pipeline

Pipeline.dry_run

Before running the Pipeline we may want to check all the components behave as expected. We can do a dry_run for this case:

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

if __name__ == "__main__":
    distiset = pipeline.dry_run(parameters=..., batch_size=1)

It takes the same parameters as the run method we will see in the following section, plus the batch_size we want the dry run to use (set to 1 by default). In this case, the Pipeline would select a single example from our generator steps and pass it through all the steps. Assuming the dry_run completes successfully, we are ready to run our pipeline.
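The idea of a dry run can be pictured with the toy sketch below: take only the first batch_size rows from the data source and push them through every step. All names here (toy_dry_run, load_rows, fake_generation) are hypothetical, and the code does not reflect how Pipeline.dry_run is implemented.

```python
from itertools import islice


def toy_dry_run(generator, steps, batch_size=1):
    """Pull `batch_size` rows from the generator and pass them through each step."""
    batch = list(islice(generator, batch_size))
    for step in steps:
        batch = step(batch)
    return batch


def load_rows():
    # Stand-in for a generator step yielding rows one by one.
    for i in range(100):
        yield {"instruction": f"prompt {i}"}


def fake_generation(batch):
    # Stand-in for a task: adds a "generation" column to every row.
    return [{**row, "generation": row["instruction"].upper()} for row in batch]


result = toy_dry_run(load_rows(), [fake_generation])
print(result)  # [{'instruction': 'prompt 0', 'generation': 'PROMPT 0'}]
```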

Pipeline.run

Once we have created the pipeline, we can run it. To do so, we need to call the run method of the Pipeline, and specify the runtime parameters for each step:

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            "load_dataset": {
                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                "split": "test",
            },
            "text_generation_with_gpt-4-0125-preview": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_mistral-large-2402": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_gemini-1.0-pro": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
        },
    )

But if we run it, we will see that the run method will fail:

ValueError: Step 'text_generation_with_gpt-4-0125-preview' requires inputs ['instruction'], but only the inputs=['prompt', 'completion', 'meta'] are available, which means that the inputs=['instruction'] are missing or not available
when the step gets to be executed in the pipeline. Please make sure previous steps to 'text_generation_with_gpt-4-0125-preview' are generating the required inputs.

This is because, before actually running the pipeline, the pipeline is validated to verify that everything is correct and that all the steps in the pipeline are chainable, i.e. each step has the necessary inputs to be executed. In this case, the TextGeneration task requires the instruction column, but the LoadHubDataset step generates the prompt column. To solve this, we can use the output_mappings argument that every Step has to map, or rename, the output columns of a step to the required input columns of another step:

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},  # (1)
    )

    ...
  1. Use the output_mappings argument to map the prompt column generated by the LoadHubDataset step to the instruction column required by the TextGeneration task.
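The check that fails above, and the way a column mapping fixes it, can be sketched in a few lines of plain Python. The helpers apply_output_mappings and missing_inputs are hypothetical names for this illustration; the actual validation logic in distilabel may differ.

```python
def apply_output_mappings(columns, output_mappings):
    """Rename the columns found in `output_mappings`, keep the rest unchanged."""
    return [output_mappings.get(column, column) for column in columns]


def missing_inputs(required, available):
    """Return the required columns that are not available."""
    return [column for column in required if column not in available]


dataset_columns = ["prompt", "completion", "meta"]  # columns from the error message
required = ["instruction"]                          # input required by TextGeneration

before = missing_inputs(required, dataset_columns)
after = missing_inputs(
    required,
    apply_output_mappings(dataset_columns, {"prompt": "instruction"}),
)

print(before)  # ['instruction']
print(after)   # []
```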

If we execute the pipeline again, it will run successfully and we will have a Distiset with the outputs of all the leaf steps of the pipeline, which we can push to the Hugging Face Hub.

if __name__ == "__main__":
    distiset = pipeline.run(...)
    distiset.push_to_hub("distilabel-internal-testing/instruction-dataset-mini-with-generations")
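The runtime parameters shown earlier repeat the same generation_kwargs for every task. As a convenience, the dictionary can be built programmatically. The sketch below is plain Python and assumes the step names follow the text_generation_with_<model name> pattern used in this section.

```python
# Shared generation settings and the model names used in this section.
generation_kwargs = {"temperature": 0.7, "max_new_tokens": 512}
model_names = ["gpt-4-0125-preview", "mistral-large-2402", "gemini-1.0-pro"]

parameters = {
    "load_dataset": {
        "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
        "split": "test",
    },
    # One entry per task; dict(...) gives each task its own copy of the settings.
    **{
        f"text_generation_with_{name}": {
            "llm": {"generation_kwargs": dict(generation_kwargs)}
        }
        for name in model_names
    },
}

print(len(parameters))  # 4
```

The resulting dictionary has the same shape as the one written out by hand and can be passed as the parameters argument of the run method.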

Stopping the pipeline

If you want to stop the pipeline while it's running, press Ctrl+c (Cmd+c in macOS): we automatically catch the signal and try to finish whatever steps are currently running. If the pipeline hangs for some reason, repeating the command 2 times will force it to close.

Note

When sending the signal to kill the process, you can expect to see the following log messages:

[Image: Pipeline ctrl+c]

Cache

If we try to execute the pipeline again, it won't actually run: it will load the dataset from the cache, and the outputs of the pipeline will be the same as in the previous run. If, for some reason, we decide to stop the pipeline in the middle of the process by pressing Ctrl+c, the pipeline will stop and its state and outputs will be stored in the cache, so we can resume the execution from the point where it was stopped.

If we want to force the pipeline to run again, then we can use the use_cache argument of the run method and set it to False:

if __name__ == "__main__":
    distiset = pipeline.run(parameters={...}, use_cache=False)

Adjusting the batch size for each step

It's very likely that in some pipelines the batch size of the steps (the number of dictionaries that every Step.process method will receive when called) will need to be adjusted, either to avoid memory issues or to make processing more efficient. To do so, we can use the input_batch_size argument of the step:

from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        task = TextGeneration(
            name=f"text_generation_with_{llm.model_name}",
            llm=llm,
            input_batch_size=5,  # (1)
        )

    ...
  1. Use the input_batch_size argument to set the batch size of the TextGeneration task to 5.

When we run the pipeline, the TextGeneration task will receive 5 dictionaries in every call to the process method. In addition, we can adjust the size of the batches generated by the GeneratorSteps using the batch_size argument:

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},
        batch_size=10  # (1)
    )

    ...
  1. Use the batch_size argument to set the batch size of the LoadHubDataset step to 10.

By default, both arguments have a value of 50.
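To picture what a batch size means in practice, the sketch below splits a list of rows into batches of a given size. The iter_batches helper is hypothetical and only illustrates the concept; it is not distilabel's batching code.

```python
from itertools import islice


def iter_batches(rows, batch_size):
    """Yield consecutive lists of at most `batch_size` rows."""
    iterator = iter(rows)
    # islice returns an empty list once the iterator is exhausted, ending the loop.
    while batch := list(islice(iterator, batch_size)):
        yield batch


rows = [{"instruction": f"prompt {i}"} for i in range(12)]
batch_sizes = [len(batch) for batch in iter_batches(rows, 5)]
print(batch_sizes)  # [5, 5, 2]
```

With a batch size of 5, a step would be called three times for these 12 rows, and the last call would receive the 2 remaining rows.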

Serializing the pipeline

Sharing a pipeline with others is very easy, as we can serialize the pipeline object using the save method. We can save the pipeline in different formats, such as yaml or json:

if __name__ == "__main__":
    pipeline.save("pipeline.yaml", format="yaml")

To load the pipeline, we can use the from_yaml or from_json methods:

pipeline = Pipeline.from_yaml("pipeline.yaml")

Serializing the pipeline is very useful when we want to share the pipeline with others, or when we want to store the pipeline for future use. It can even be hosted online, so the pipeline can be executed directly using the CLI knowing the URL of the pipeline.

Fully working example

To sum up, here is the full code of the pipeline we have created in this section. Note that you will need to change the name of the Hugging Face repository where the resulting dataset will be pushed, set the OPENAI_API_KEY and MISTRAL_API_KEY environment variables, and have gcloud installed and configured:

Code
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineColumns, LoadHubDataset
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},
    )

    combine_generations = CombineColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        load_dataset.connect(task)
        task.connect(combine_generations)

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            "load_dataset": {
                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                "split": "test",
            },
            "text_generation_with_gpt-4-0125-preview": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_mistral-large-2402": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_gemini-1.0-pro": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
        },
    )
    distiset.push_to_hub(
        "distilabel-internal-testing/instruction-dataset-mini-with-generations"
    )

  1. The Pipeline also accepts a cache_dir argument; for more information on this parameter, see the caching section.