Pipelines

This section describes the Pipeline class and explains how to create and use one.

Pipeline

The Pipeline class is a central component in distilabel, responsible for crafting datasets. It manages the generation of datasets and oversees the interaction between the generator and labeller LLMs.

You create an instance of the Pipeline by providing a generator LLM, a labeller LLM, or both. You interact with it through its generate method, which requires a dataset, takes num_generations to set how many generations are created per input, and accepts additional parameters such as batch_size to control the generation process.
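To make the roles of num_generations and batch_size concrete, here is a minimal, library-free sketch. It assumes that batch_size sets how many input rows are handled per batch and that every row receives num_generations outputs; iter_batches and fake_generate are hypothetical helpers written for this illustration, not part of distilabel.

```python
from typing import Iterator, List


def iter_batches(rows: List[str], batch_size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of `rows` holding at most `batch_size` items."""
    for start in range(0, len(rows), batch_size):
        yield rows[start : start + batch_size]


def fake_generate(prompt: str, num_generations: int) -> List[str]:
    """Stand-in for an LLM call: returns `num_generations` placeholder strings."""
    return [f"{prompt} -> generation {i}" for i in range(num_generations)]


rows = ["prompt A", "prompt B", "prompt C"]

# Three rows with batch_size=2 give one full batch and one partial batch.
batches = list(iter_batches(rows, batch_size=2))

# Every row ends up with a list of `num_generations` outputs.
outputs = [fake_generate(p, num_generations=2) for batch in batches for p in batch]
```

The output keeps one entry per input row, which mirrors the generations column shown later in this section, where each row holds a list of generations.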

Let's start with a Pipeline that uses a single LLM as a generator.

Generator

We will create a Pipeline that uses Notus served from a HuggingFace Inference Endpoint. To do so, we need to create a TextGenerationTask and specify the prompt format we want to use, in this case notus, which is the same format used by Zephyr.

import os

from distilabel.llm import InferenceEndpointsLLM
from distilabel.pipeline import Pipeline
from distilabel.tasks import TextGenerationTask

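# Read the endpoint settings from the environment, falling back to the example values.
# (With `"literal" or os.getenv(...)` the environment variable would never be consulted.)
endpoint_name = os.getenv("HF_INFERENCE_ENDPOINT_NAME", "aws-notus-7b-v1-4052")
endpoint_namespace = os.getenv("HF_NAMESPACE", "argilla")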

pipe_generation = Pipeline(
    generator=InferenceEndpointsLLM(
        endpoint_name_or_model_id=endpoint_name,  # The name given of the deployed model
        endpoint_namespace=endpoint_namespace,  # This usually corresponds to the organization, in this case "argilla"
        token=os.getenv("HF_TOKEN"),  # hf_...
        task=TextGenerationTask(),
        max_new_tokens=512,
        do_sample=True,
        prompt_format="notus",
    ),
)

We've set up our pipeline using a specialized TextGenerationTask (refer to the tasks section for more task details), and an InferenceEndpointsLLM configured for notus-7b-v1, although any of the available LLMs will work.
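As a rough illustration of what a prompt format does, the sketch below renders a single-turn prompt in the chat layout published with Zephyr's chat template, which the text above says Notus shares. format_zephyr_prompt is a hypothetical helper, and the exact string that distilabel sends to the endpoint may differ.

```python
def format_zephyr_prompt(system_prompt: str, user_input: str) -> str:
    """Render one system message and one user message in a Zephyr-style layout."""
    return (
        f"<|system|>\n{system_prompt}</s>\n"
        f"<|user|>\n{user_input}</s>\n"
        "<|assistant|>\n"  # the model continues writing from here
    )


prompt = format_zephyr_prompt(
    "You are a helpful assistant.",  # hypothetical system prompt
    "Create an easy dinner recipe with few ingredients",
)
print(prompt)
```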

To use the Pipeline for dataset generation, we call the generate method with the input dataset and the desired number of generations. To illustrate the process, we've prepared a Dataset with a single row, and we'll trigger 2 generations from it:

from datasets import Dataset

dataset = Dataset.from_dict(
    {"input": ["Create an easy dinner recipe with few ingredients"]}
)
dataset_generated = pipe_generation.generate(dataset, num_generations=2)

Now, let's examine the dataset that was generated. It's a CustomDataset, equipped with additional features for seamless interaction with Argilla.

print(dataset_generated)
# Dataset({
#     features: ['input', 'generation_model', 'generation_prompt', 'raw_generation_responses', 'generations'],
#     num_rows: 1
# })

print(dataset_generated[0]["generations"][0])
# Here's a simple and delicious dinner recipe with only a few ingredients:

# Garlic Butter Chicken with Roasted Vegetables

# Ingredients:
# - 4 boneless, skinless chicken breasts
# - 4 tablespoons butter
# - 4 cloves garlic, minced
# - 1 teaspoon dried oregano
# - 1/2 teaspoon salt
# - 1/4 teaspoon black pepper
# - 1 zucchini, sliced
# - 1 red bell pepper, sliced
# - 1 cup cherry tomatoes

# Instructions:

# 1. Preheat oven to 400°F (200°C).

# 2. Melt butter in a small saucepan over low heat. Add minced garlic and heat until fragrant, about 1-2 minutes.

# 3. Place chicken breasts in a baking dish and brush garlic butter over each one.

# 4. Sprinkle oregano, salt, and black pepper over the chicken.

# 5. In a separate baking dish, add sliced zucchini, red bell pepper, and cherry tomatoes. Brush with remaining garlic butter.

# 6. Roast the chicken and vegetables in the preheated oven for 25-30 minutes or until cooked through and the vegetables are tender and lightly browned.

# 7. Transfer the chicken to plates and serve with the roasted vegetables alongside. Enjoy!

# This recipe requires simple ingredients and is easy to prepare, making it perfect for a quick, satisfying dinner. The garlic butter adds maximum flavor, while the roasted vegetables complement the chicken beautifully, providing additional nutrition and texture. With minimal effort, you can have a delicious and balanced meal on the table in no time.

Labeller

Next, we move on to labelling a dataset. Just as before, we need an LLM for our Pipeline. In this case we will use OpenAILLM with gpt-4 and a PreferenceTask: UltraFeedbackTask for instruction following.

import os

from distilabel.llm import OpenAILLM
from distilabel.pipeline import Pipeline
from distilabel.tasks import UltraFeedbackTask

pipe_labeller = Pipeline(
    labeller=OpenAILLM(
        model="gpt-4",
        task=UltraFeedbackTask.for_instruction_following(),
        max_new_tokens=256,
        num_threads=8,
        api_key=os.getenv("OPENAI_API_KEY", None),
        temperature=0.3,
    ),
)

For this example dataset, we've extracted 2 sample rows from the UltraFeedback binarized dataset, formatted as expected by the default LLM and Task.

We've selected two distinct examples, one correctly labeled and the other incorrectly labeled in the original dataset. In this case, the input dataset includes two columns: input, as seen in the generator example, and generations, which contains the model's responses.

from datasets import Dataset

dataset_test = Dataset.from_dict(
    {
        "input": [
            "Describe the capital of Spain in 25 words.",
            "Design a conversation between a customer and a customer service agent.",
        ],
        "generations": [
            ["Santo Domingo is the capital of Dominican Republic"],
            [
                "Customer: Hello, I'm having trouble with my purchase.\n\nCustomer Service Agent: I'm sorry to hear that. Could you please tell me more about the issue you are facing?\n\nCustomer: Yes, I ordered a pair of shoes from your company a week ago, but I haven't received them yet.\n\nCustomer Service Agent: I apologize for the inconvenience. Could you please provide me with your order number and full name so I can look into this for you?\n\nCustomer: Sure, my name is John Doe and my order number is ABCD1234.\n\nCustomer Service Agent: Thank you, John. I have checked on your order and it appears that it is still being processed. It should be shipped out within the next 24 hours.\n\nCustomer: That's good to hear, but can you also tell me the expected delivery time?\n\nCustomer Service Agent: Absolutely, based on your location, the estimated delivery time is 3-5 business days after shipping. You will receive a tracking number via email once the item is shipped, which will provide real-time updates on your package.\n\nCustomer: Thanks for the information. One more thing, what is your return policy if the shoes don't fit?\n\nCustomer Service Agent: Our company offers a 30-day return policy. If you are not satisfied with the product or if it doesn't fit, you can return it for a full refund or an exchange within 30 days of delivery. Please keep in mind that the product must be in its original packaging and in the same condition as when you received it.\n\nCustomer: Okay, that's good to know. Thank you for your help.\n\nCustomer Service Agent: You're welcome, John. I'm glad I could assist you. If you have any further questions or concerns, please don't hesitate to reach out to us. Have a great day!"
            ],
        ],
    }
)

ds_labelled = pipe_labeller.generate(dataset_test)

Let's select the relevant columns from the labelled dataset, and take a look at the first record. This allows us to observe the rating and the accompanying rationale that provides an explanation.

ds_labelled.select_columns(["input", "generations", "rating", "rationale"])[0]
# {
#     "input": "Describe the capital of Spain in 25 words.",
#     "generations": ["Santo Domingo is the capital of Dominican Republic"],
#     "rating": [1.0],
#     "rationale": [
#         "The text is irrelevant to the instruction. It describes the capital of the Dominican Republic instead of Spain."
#     ],
# }
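The record above stores generations, rating and rationale as parallel lists, one entry per generation. A minimal sketch of pairing them up for inspection, using a plain dict that copies the record shown above (pair_labels is a hypothetical helper, not a distilabel function):

```python
from typing import Any, Dict, List

# Plain-dict copy of the labelled record shown above.
row: Dict[str, Any] = {
    "input": "Describe the capital of Spain in 25 words.",
    "generations": ["Santo Domingo is the capital of Dominican Republic"],
    "rating": [1.0],
    "rationale": [
        "The text is irrelevant to the instruction. It describes the capital "
        "of the Dominican Republic instead of Spain."
    ],
}


def pair_labels(record: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Zip the parallel lists so each generation carries its own rating and rationale."""
    return [
        {"generation": generation, "rating": rating, "rationale": rationale}
        for generation, rating, rationale in zip(
            record["generations"], record["rating"], record["rationale"]
        )
    ]


labelled = pair_labels(row)
```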

Generator and Labeller

In the final scenario, we have a Pipeline utilizing both a generator and a labeller LLM. Once more, we'll employ the Inference Endpoint with notus-7b-v1 for the generator, using a different system prompt this time. As for the labeller, we'll use gpt-3.5-turbo, which will label the examples for instruction following.

import os

from distilabel.llm import InferenceEndpointsLLM, OpenAILLM
from distilabel.pipeline import Pipeline
from distilabel.tasks import TextGenerationTask, UltraFeedbackTask

pipe_full = Pipeline(
    generator=InferenceEndpointsLLM(
        endpoint_name_or_model_id=endpoint_name,
        endpoint_namespace=endpoint_namespace,
        task=TextGenerationTask(
            system_prompt="You are an expert writer of XKCD, a webcomic of romance, sarcasm, math, and language."
        ),
        max_new_tokens=512,
        do_sample=True,
        prompt_format="notus",
    ),
    labeller=OpenAILLM(
        model="gpt-3.5-turbo",
        task=UltraFeedbackTask.for_instruction_following(),
        max_new_tokens=256,
        num_threads=4,
        api_key=os.getenv("OPENAI_API_KEY", None),
        temperature=0.3,
    ),
)

For this example, we'll set up a pipeline to generate and label a dataset of short stories inspired by XKCD. To do this, we define the system_prompt of the TextGenerationTask. The dataset follows the same format we used in the generator scenario: an input column with the examples, in this case just one.

from datasets import Dataset

xkcd_instructions = Dataset.from_dict(
    {"input": ["Could you imagine an interview process going sideways?"]}
)
ds_xkcd = pipe_full.generate(xkcd_instructions, num_generations=3)

We will now take a look at one of the generations, along with the rating and rationale given by our labeller LLM:

# The input dataset has a single row, so we read row 0.
print(ds_xkcd[0]["generations"][0])
print("-----" * 5)
print("RATING: ", ds_xkcd[0]["rating"][0])
print("RATIONALE: ", ds_xkcd[0]["rationale"][0])

# Yes, absolutely! Here's a fictional interview scenario turned into an XKCD-style comic:

# (Interviewee meets with an unsmiling interviewer)

# Interviewer: Good morning! Have a seat. Tell me about your experience working with teams.

# Interviewee: Well, I've worked in large teams on group projects before. It could be challenging, but we always managed to pull through.

# (Smugly) Interviewer: Challenging, huh? (tapping pen on desk) And how did you manage to overcome these challenges?

# Interviewee: (confidently) Communication was key. I made sure to stay in touch with the team and keep everyone updated on our progress.

# Interviewer: Communication. Hm. And what if communication failed?

# Interviewee: (thrown off balance) Well, I mean...there was one time when we couldn't connect via video call. But we picked up the phone, and we all understood what needed to be done.

# Interviewer: But what if the communication on the technical level failed, say, like a computer system with a software glitch?

# Interviewee: (feeling the pressure) That's never happened to me before, but if it did, we would have to troubleshoot and find a workaround, right?

# Interviewer: (smirking) Oh, but finding a workaround could mean delegating responsibilities among the team, which requires communication. It's a vicious cycle!

# (Interviewee visibly uncomfortable)

# Interviewer: And what if there was a communication breakdown among the team members themselves?

# Interviewee: (unsure) I think we would try to sort it out as soon as possible to avoid any further problems.

# Interviewer: (sarcastically) Yes, avoiding further problems is critical. Don't want to let those deadlines slip, do we?

# (Interviewer types frantically on their computer keyboard)

# Interviewer: (softly but wordily) Note to self: Avoid this candidate for team projects.

# (The interviewer returns his attention back to the interviewee)

# Interviewer: Well, moving on...
# -------------------------
# RATING:  4.0
# RATIONALE:  The text provides a fictional interview scenario that aligns with the task goal of imagining an interview process going sideways. It includes dialogue between an interviewer and interviewee, showcasing a breakdown in communication and the interviewer's sarcastic and dismissive attitude towards the interviewee's responses.
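With several generations per input, the ratings can be used to pick the best one. The sketch below shows one way to do that on plain lists; best_generation is a hypothetical helper, and both the generations and the ratings are placeholders rather than real pipeline output.

```python
from typing import List, Tuple


def best_generation(generations: List[str], ratings: List[float]) -> Tuple[int, str, float]:
    """Return (index, text, rating) of the highest-rated generation."""
    if not generations or len(generations) != len(ratings):
        raise ValueError("generations and ratings must be non-empty and the same length")
    # On equal ratings, max() keeps the first index it encounters.
    best_index = max(range(len(ratings)), key=ratings.__getitem__)
    return best_index, generations[best_index], ratings[best_index]


generations = ["story one", "story two", "story three"]  # placeholder texts
ratings = [4.0, 2.0, 5.0]  # hypothetical ratings

index, text, rating = best_generation(generations, ratings)
```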

Running several generators in parallel

distilabel also allows using several LLMs as generators in parallel, thanks to the ProcessLLM and LLMPool classes. This comes in handy when we want to feed several LLMs the same input, so that we can later compare their outputs (to see which one is better) or even create a preference dataset, following a process similar to the UltraFeedback dataset generation.

For this example, we will load four 7B LLMs using vLLM on a machine with 4 GPUs (so that each LLM is loaded on a different GPU). Then we will give the same instructions to all of them, and we will use GPT-4 to label the generations using the UltraFeedbackTask for instruction-following.

First of all, we will need to load each LLM using a ProcessLLM. ProcessLLM will create a child process which will load the LLM using the load_llm_fn.

from distilabel.llm import LLM, ProcessLLM
from distilabel.tasks import Task, TextGenerationTask


def load_notus(task: Task) -> LLM:  # (1)
    import os

    from distilabel.llm import vLLM
    from vllm import LLM

    os.environ["CUDA_VISIBLE_DEVICES"] = "0"  # (2)

    return vLLM(
        model=LLM(model="argilla/notus-7b-v1"),
        task=task,
        max_new_tokens=512,
        temperature=0.7,
        prompt_format="notus",
    )


llm = ProcessLLM(task=TextGenerationTask(), load_llm_fn=load_notus)
  1. The ProcessLLM will create a child process in which the LLM will be loaded. Therefore, we will need to define a function that will be executed by the child process to load the LLM. The child process will pass the provided Task to the load_llm_fn.
  2. We set the CUDA_VISIBLE_DEVICES environment variable to make sure that each LLM is loaded on a different GPU.

We will repeat this pattern 4 times, each time with a different LLM and a different GPU.

from distilabel.llm import LLM, ProcessLLM
from distilabel.tasks import Task, TextGenerationTask


def load_notus(task: Task) -> LLM:
    import os

    from distilabel.llm import vLLM
    from vllm import LLM

    os.environ["CUDA_VISIBLE_DEVICES"] = "0"

    return vLLM(
        model=LLM(model="argilla/notus-7b-v1"),
        task=task,
        max_new_tokens=512,
        temperature=0.7,
        prompt_format="notus",
    )


def load_zephyr(task: Task) -> LLM:
    import os

    from distilabel.llm import vLLM
    from vllm import LLM

    os.environ["CUDA_VISIBLE_DEVICES"] = "1"

    return vLLM(
        model=LLM(model="HuggingFaceH4/zephyr-7b-beta"),
        task=task,
        max_new_tokens=512,
        temperature=0.7,
        prompt_format="notus",
    )


def load_starling(task: Task) -> LLM:
    import os

    from distilabel.llm import vLLM
    from vllm import LLM

    os.environ["CUDA_VISIBLE_DEVICES"] = "2"

    return vLLM(
        model=LLM(model="berkeley-nest/Starling-LM-7B-alpha"),
        task=task,
        max_new_tokens=512,
        temperature=0.7,
        prompt_format="notus",
    )


def load_neural_chat(task: Task) -> LLM:
    import os

    from distilabel.llm import vLLM
    from vllm import LLM

    os.environ["CUDA_VISIBLE_DEVICES"] = "3"

    return vLLM(
        model=LLM(model="Intel/neural-chat-7b-v3-3"),
        task=task,
        max_new_tokens=512,
        temperature=0.7,
        prompt_format="notus",
    )


notus = ProcessLLM(task=TextGenerationTask(), load_llm_fn=load_notus)
zephyr = ProcessLLM(task=TextGenerationTask(), load_llm_fn=load_zephyr)
starling = ProcessLLM(task=TextGenerationTask(), load_llm_fn=load_starling)
neural_chat = ProcessLLM(task=TextGenerationTask(), load_llm_fn=load_neural_chat)

In order to distribute the generations among the different LLMs, we will use an LLMPool. This class expects a list of ProcessLLM. Calling the generate method of the LLMPool will call the generate method of each ProcessLLM in parallel and wait for all of them to finish, returning a list of lists of LLMOutputs with the generations.

from distilabel.llm import LLMPool

pool = LLMPool(llms=[notus, zephyr, starling, neural_chat])
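The fan-out-and-wait behaviour described above can be sketched without GPUs. Note the substitution: this illustration uses threads from the standard library instead of the child processes that ProcessLLM creates, and the LLMs are fake functions. make_fake_llm and generate_in_parallel are hypothetical names, not distilabel APIs.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List

FakeLLM = Callable[[List[str]], List[str]]


def make_fake_llm(name: str) -> FakeLLM:
    """Build a stand-in generator that tags every answer with the model name."""

    def generate(inputs: List[str]) -> List[str]:
        return [f"[{name}] answer to: {text}" for text in inputs]

    return generate


fake_llms: Dict[str, FakeLLM] = {
    name: make_fake_llm(name)
    for name in ("notus", "zephyr", "starling", "neural_chat")
}


def generate_in_parallel(inputs: List[str]) -> List[List[str]]:
    """Send the same inputs to every fake LLM and wait for all of them to finish."""
    with ThreadPoolExecutor(max_workers=len(fake_llms)) as executor:
        futures = [executor.submit(llm, inputs) for llm in fake_llms.values()]
        # result() blocks until that worker is done; order follows `fake_llms`.
        return [future.result() for future in futures]


results = generate_in_parallel(["Hello"])
```

The result is a list with one inner list per model, which is the same "list of lists" shape described for the pool's output.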

We will use this LLMPool as the generator for our pipeline, and we will use GPT-4 to label the generations using the UltraFeedbackTask for instruction-following.

from distilabel.llm import LLM, ProcessLLM
from distilabel.pipeline import Pipeline
from distilabel.tasks import UltraFeedbackTask


def load_gpt_4(task: UltraFeedbackTask) -> LLM:
    from distilabel.llm import OpenAILLM

    return OpenAILLM(
        model="gpt-4-1106-preview",
        task=task,
        max_new_tokens=512,
        num_threads=4,
    )


pipeline = Pipeline(
    generator=pool,
    labeller=ProcessLLM(task=UltraFeedbackTask.for_instruction_following(), load_llm_fn=load_gpt_4),  # (1)
)
  1. We also execute the calls to the OpenAI API in a separate process using ProcessLLM. This avoids blocking the main process (and its GIL) and lets the generator continue with the next batch.

Then, we will load the dataset and call the generate method of the pipeline. For each input in the dataset, the LLMPool will randomly select two LLMs and request one generation from each, for 2 generations in total. The generations will be labelled by GPT-4 using the UltraFeedbackTask for instruction-following. Finally, we will push the generated dataset to Argilla in order to review the generations and the automatically generated labels, and to correct them manually if needed.

from datasets import load_dataset

dataset = (
    load_dataset("HuggingFaceH4/instruction-dataset", split="test[:50]")
    .remove_columns(["completion", "meta"])
    .rename_column("prompt", "input")
)

dataset = pipeline.generate(
    dataset=dataset,
    num_generations=2,
    batch_size=5,
    display_progress_bar=True,
)

dataset.to_argilla().push_to_argilla(name="preference-dataset", workspace="admin")
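The random choice of LLMs per input can be pictured with a few lines of standard-library Python. This is only an illustration of sampling two distinct models per input under the description above, not the LLMPool implementation; pick_llms and the prompt strings are hypothetical.

```python
import random
from typing import Dict, List

LLM_NAMES = ["notus", "zephyr", "starling", "neural_chat"]


def pick_llms(num_generations: int, rng: random.Random) -> List[str]:
    """Sample `num_generations` distinct model names, one generation per model."""
    return rng.sample(LLM_NAMES, k=num_generations)


rng = random.Random(0)  # seeded only to make the illustration repeatable
prompts = ["prompt A", "prompt B", "prompt C"]  # placeholder inputs

assignments: Dict[str, List[str]] = {
    prompt: pick_llms(num_generations=2, rng=rng) for prompt in prompts
}
```

Because different inputs can be answered by different pairs of models, keeping track of which model produced each generation matters when the ratings are compared later.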

With a few lines of code, we have easily generated a dataset with 2 generations per input, using 4 different LLMs, and labelled the generations using GPT-4. You can check the full code here.

Dataset checkpoints

With long pipelines, it may be useful to review the dataset during the process, or to have it saved in case something fails before the final dataset is obtained. We can use the checkpoint_strategy argument of the Pipeline.generate method for this purpose:

from pathlib import Path
from distilabel.dataset import DatasetCheckpoint

# Assuming we want to save the dataset every 10% of the records generated.

freq = len(dataset) // 10
dataset_checkpoint = DatasetCheckpoint(path=Path.cwd() / "checkpoint_folder", save_frequency=freq)

By passing the checkpoint strategy to the generate method, the dataset will be saved to disk automatically every freq generations:

new_ds = pipe.generate(
    dataset=dataset,
    num_generations=1,
    checkpoint_strategy=dataset_checkpoint,
)
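A quick way to reason about save_frequency is to list the record counts at which a save would happen. The sketch below assumes a save is triggered each time the number of generated records reaches a multiple of save_frequency; checkpoint_points is a hypothetical helper. It also guards against a detail of the snippet above: len(dataset) // 10 evaluates to 0 for datasets with fewer than 10 rows.

```python
from typing import List


def checkpoint_points(num_records: int, fraction: int = 10) -> List[int]:
    """Record counts at which a checkpoint would be written (illustrative only)."""
    # Integer division yields 0 for small datasets, so fall back to saving every record.
    save_frequency = max(1, num_records // fraction)
    return list(range(save_frequency, num_records + 1, save_frequency))


points_50 = checkpoint_points(50)  # 50 // 10 == 5, so every 5 records
points_3 = checkpoint_points(3)    # 3 // 10 == 0, guarded to every record
```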

The dataset can be recovered from the checkpoint by calling the CustomDataset.load_from_disk method.

from distilabel.dataset import CustomDataset
new_ds = CustomDataset.load_from_disk(dataset_checkpoint.path)

With the dataset recovered, we can call push_to_argilla on it to review it.

Since distilabel 0.6.0, the DatasetCheckpoint includes a new argument strategy to determine how the dataset should be saved during the generation process.

By default, the strategy is set to disk, and the dataset is saved to disk as we saw in the previous example. It now also supports the hf-hub strategy, which pushes the dataset to the HuggingFace Hub every freq iterations; everything else remains the same. Let's see an example:

from distilabel.dataset import DatasetCheckpoint

dataset_checkpoint = DatasetCheckpoint(
    strategy="hf-hub",
    save_frequency=1,
    extra_kwargs={
        "repo_id": "username/dataset-name"
    }
)

new_ds = pipe.generate(
    dataset=dataset,
    num_generations=1,
    checkpoint_strategy=dataset_checkpoint,
)

To ensure the dataset can actually be saved once the generation process starts, the DatasetCheckpoint checks that a HuggingFace token is available: either set the HF_API_TOKEN environment variable, or pass your token under the token key of the extra_kwargs argument.
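The token check described above could look roughly like the following. This is a sketch under stated assumptions: resolve_hf_token is a hypothetical function, and the precedence (an explicit token in extra_kwargs wins over the environment variable) is an assumption made for the illustration, not documented behaviour.

```python
import os
from typing import Any, Dict, Optional


def resolve_hf_token(extra_kwargs: Optional[Dict[str, Any]] = None) -> str:
    """Return a token from `extra_kwargs["token"]` or from HF_API_TOKEN, else fail early."""
    # Assumed precedence: explicit argument first, environment variable second.
    token = (extra_kwargs or {}).get("token") or os.getenv("HF_API_TOKEN")
    if not token:
        raise ValueError(
            "No HuggingFace token found: set HF_API_TOKEN or pass "
            "extra_kwargs={'token': ...}."
        )
    return token


# Placeholder token value; failing before generation starts avoids losing work later.
token = resolve_hf_token({"repo_id": "username/dataset-name", "token": "hf_placeholder"})
```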

Take a look at the API reference at DatasetCheckpoint.

pipeline

Since dataset creation follows recurring patterns, distilabel provides the pipeline function, a shortcut that takes the necessary parameters and creates a Pipeline for you.

In the code snippet below, we use the pipeline function to craft a pipeline tailored for a preference task, specifically focusing on text-quality as the subtask. If we don't initially provide a labeller LLM, we can specify the subtask we want our pipeline to address. By default, this corresponds to UltraFeedbackTask. It's mandatory to specify the generator of our choice; however, the labeller defaults to gpt-3.5-turbo. Optional parameters required for OpenAILLM can also be passed as optional keyword arguments.

import os

from distilabel.llm import InferenceEndpointsLLM
from distilabel.pipeline import pipeline
from distilabel.tasks import TextGenerationTask

pipe = pipeline(
    "preference",
    "text-quality",
    generator=InferenceEndpointsLLM(
        endpoint_name_or_model_id=endpoint_name,
        endpoint_namespace=endpoint_namespace,
        token=os.getenv("HF_TOKEN"),  # hf_...
        task=TextGenerationTask(),
        max_new_tokens=512,
        do_sample=True,
        prompt_format="notus",
    ),
    max_new_tokens=256,
    num_threads=2,
    api_key=os.getenv("OPENAI_API_KEY", None),
    temperature=0.0,
)

For the dataset, we'll begin with three rows from HuggingFaceH4/instruction-dataset. We'll request two generations with checkpoints enabled to safeguard the data in the event of any failures, which is the default behavior.

from datasets import load_dataset

instruction_dataset = (
    load_dataset("HuggingFaceH4/instruction-dataset", split="test[:3]")
    .remove_columns(["completion", "meta"])
    .rename_column("prompt", "input")
)

pipe_dataset = pipe.generate(
    instruction_dataset,
    num_generations=2,
    batch_size=1,
    checkpoint_strategy=True,
    display_progress_bar=True,
)

Finally, let's see one of the examples from the dataset:

print(pipe_dataset["input"][-1])
# Create a 3 turn conversation between a customer and a grocery store clerk - that is, 3 per person. Then tell me what they talked about.

print(pipe_dataset["generations"][-1][-1])
# Customer: Hi there, I'm looking for some fresh berries. Do you have any raspberries or blueberries in stock?

# Grocery Store Clerk: Yes, we have both raspberries and blueberries in stock today. Would you like me to grab some for you or can you find them yourself?

# Customer: I'd like your help getting some berries. Can you also suggest which variety is sweeter? Raspberries or blueberries?

# Grocery Store Clerk: Raspberries and blueberries both have distinct flavors. Raspberries are more tart and a little sweeter whereas blueberries tend to be a little sweeter and have a milder taste. It ultimately depends on your personal preference. Let me grab some of each for you to try at home and see which one you like better.

# Customer: That sounds like a great plan. How often do you receive deliveries? Do you have some new varieties of berries arriving soon?

# Grocery Store Clerk: We receive deliveries twice a week, on Wednesdays and Sundays. We also have a rotation of different varieties of berries throughout the season, so keep an eye out for new arrivals. Thanks for shopping with us, can I help you with anything else today?

# Customer: No, that's all for now. I'm always happy to support your local store.

# turn 1: berries, fresh produce availability, customer preference
# turn 2: product recommendations based on taste and personal preference, availability
# turn 3: store acknowledgment, shopping gratitude, loyalty and repeat business expectation.

print(pipe_dataset["rating"][-1][-1])
# 5.0

print(pipe_dataset["rationale"][-1][-1])
# The text accurately follows the given instructions and provides a conversation between a customer and a grocery store clerk. The information provided is correct, informative, and aligned with the user's intent. There are no hallucinations or misleading details.

The API reference can be found here: pipeline

Argilla integration

A CustomDataset generated entirely by AI models may require some additional human processing. To facilitate human feedback, the dataset can be uploaded to Argilla. This involves logging into an Argilla instance, converting the dataset to the required format with CustomDataset.to_argilla(), and then calling push_to_argilla on the resulting dataset. The conversion automatically adds out-of-the-box filtering and search parameters: semantic search vectors, and metrics stored as MetadataProperties. Some metrics are inferred from the Task outcome, and some text-descriptives metrics are added as well. These can be used directly within the Argilla UI to help you find the most relevant examples. Let's briefly introduce the parameters we may find:

  • dataset_columns: The names of the columns in the dataset to be used for vectors and metadata. By default, it is set to None meaning the first 5 fields from input and output columns will be taken.
  • vector_strategy: The strategy used to generate the semantic search vectors. By default, it is set to True which initializes a standard SentenceTransformersExtractor() that computes vectors for all fields in the dataset using TaylorAI/bge-micro-v2. Alternatively, you can pass a SentenceTransformersExtractor by importing it from argilla.client.feedback.integrations.sentencetransformers.
  • metric_strategy: The strategy used to create the metrics. By default, it is set to True which initializes a standard TextDescriptivesExtractor() that computes metrics for all fields in the dataset using a default spaCy model. Alternatively, you can pass a TextDescriptivesExtractor by importing it from argilla.client.feedback.integrations.textdescriptives.
import argilla as rg
from argilla.client.feedback.integrations.sentencetransformers import (
    SentenceTransformersExtractor,
)
from argilla.client.feedback.integrations.textdescriptives import (
    TextDescriptivesExtractor,
)

rg.init(api_key="<YOUR_ARGILLA_API_KEY>", api_url="<YOUR_ARGILLA_API_URL>")

# with default `vector_strategy` and `metric_strategy`
rg_dataset = pipe_dataset.to_argilla()
rg_dataset.push_to_argilla(name="preference-dataset", workspace="admin")

# with a custom `vector_strategy`
vector_strategy = SentenceTransformersExtractor(model="TaylorAI/bge-micro-v2")
rg_dataset = pipe_dataset.to_argilla(vector_strategy=vector_strategy)

# with a custom `metric_strategy`
metric_strategy = TextDescriptivesExtractor(model="en_core_web_sm")
rg_dataset = pipe_dataset.to_argilla(metric_strategy=metric_strategy)

# without `vector_strategy` and `metric_strategy`
rg_dataset = pipe_dataset.to_argilla(metric_strategy=None, vector_strategy=None)

Prepare datasets for fine-tuning

The preference datasets generated by distilabel out of the box contain all the raw information generated by the Pipeline, but some processing is necessary to prepare the dataset for alignment or instruction fine-tuning, such as DPO (for now, only DPO is covered).

distilabel offers helper functions to prepare the CustomDataset for DPO. The current definition works for datasets labelled using PreferenceTask, and prepares them by binarizing the data. Go to the following section for an introduction to dataset binarization.

By default, ties (rows in which the chosen and rejected responses have the same rating) are removed from the dataset, as expected for fine-tuning, but they can be kept if you want to analyse them. Take a look at prepare_dataset in distilabel.utils for more information.
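The effect of keeping or dropping ties can be sketched on plain dictionaries. drop_ties is a hypothetical helper that mimics the idea behind the keep_ties flag; it is not the prepare_dataset implementation, and the rows are made-up examples.

```python
from typing import Any, Dict, List


def drop_ties(rows: List[Dict[str, Any]], keep_ties: bool = False) -> List[Dict[str, Any]]:
    """Remove rows whose chosen and rejected ratings are equal, unless `keep_ties` is set."""
    if keep_ties:
        return list(rows)
    return [row for row in rows if row["rating_chosen"] != row["rating_rejected"]]


rows = [
    {"prompt": "p1", "rating_chosen": 9.0, "rating_rejected": 7.0},
    {"prompt": "p2", "rating_chosen": 8.0, "rating_rejected": 8.0},  # a tie
]

without_ties = drop_ties(rows)
with_ties = drop_ties(rows, keep_ties=True)
```

A tied pair carries no preference signal, which is why such rows are usually dropped before preference fine-tuning.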

Binarization

from datasets import load_dataset
from distilabel.tasks import JudgeLMTask
from distilabel.utils import prepare_dataset

dataset = load_dataset("argilla/distilabel-intel-orca-dpo-pairs", split="train")
dataset.task = JudgeLMTask()
dataset_binarized_random = prepare_dataset(dataset, strategy="random", keep_ties=True)
# >>> len(dataset)
# 12859
# >>> len(dataset_binarized_random)
# 12817
dataset_binarized_random = prepare_dataset(dataset, strategy="random", keep_ties=False)
# >>> len(dataset_binarized_random)
# 8850

# Same steps with strategy="worst"
from datasets import load_dataset
from distilabel.tasks import JudgeLMTask
from distilabel.utils import prepare_dataset

dataset = load_dataset("argilla/distilabel-intel-orca-dpo-pairs", split="train")
dataset.task = JudgeLMTask()
dataset_binarized_worst = prepare_dataset(dataset, strategy="worst", keep_ties=True)
# >>> len(dataset)
# 12859
# >>> len(dataset_binarized_worst)
# 12817
dataset_binarized_worst = prepare_dataset(dataset, strategy="worst", keep_ties=False)
# >>> len(dataset_binarized_worst)
# 8850

What's binarization?

In the context of preference datasets (datasets for LLM instruction-tuning) one can come up with datasets formatted following the UltraFeedback format (the same format one obtains from a Pipeline that labels a dataset with a PreferenceTask), where for a given instruction we can have multiple completions according to one or more models, rated either by humans or other LLMs.

From distilabel, we would obtain from a labelling Pipeline a dataset with the following format:

| input | generations | rating |
| --- | --- | --- |
| Generate an approximately fifteen-word sentence that... | [Midsummer House is a moderately..., Sure! Here's a sentence that...] | [9.0, 7.0] |

Each column represents the following:

  • input: Input for the LLM to generate text.

  • generations: List of generations from the LLM (maybe an LLMPool with different models).

  • rating: A list with the rating of each generation, obtained by an LLM using one of the PreferenceTasks, like JudgeLMTask or UltraFeedbackTask.

This dataset format contains all the raw information, but in order to use it in the common frameworks, the expected format is usually a prompt, a chosen and a rejected response to align the model with those preferences.

We would want the following dataset format for fine-tuning:

| prompt | chosen | rejected |
| --- | --- | --- |
| Generate an approximately fifteen-word sentence that... | [{'content': 'Generate an approximately...', 'role': 'user'}, {'content': 'Midsummer House is a moderately...', 'role': 'assistant'}] | [{'content': 'Generate an approximately...', 'role': 'user'}, {'content': ' Sure! Here\'s a sentence that...', 'role': 'assistant'}] |

Take a look at this explanation for the binarization of UltraFeedback done to train Notus-7B-v1.

What each column represents:

  • prompt: Instruction given to the model.

  • chosen: Response chosen following the OpenAI format.

  • rejected: Response rejected following the OpenAI format.

Refer to OpenAI's chat format for more information on the chosen/rejected format.

This dataset processing is called binarization. In the context of distilabel, this transformation (dataset preparation) is done by prepare_dataset (from distilabel.utils), and because the generated datasets contain additional information, the result can also include the following additional columns:

| prompt | chosen | rejected | rating_chosen | rating_rejected | chosen_model | rejected_model |
| --- | --- | --- | --- | --- | --- | --- |
| Generate an approximately fifteen-word sentence that... | [{'content': 'Generate an approximately...', 'role': 'user'}, {'content': 'Midsummer House is a moderately...', 'role': 'assistant'}] | [{'content': 'Generate an approximately...', 'role': 'user'}, {'content': ' Sure! Here\'s a sentence that...', 'role': 'assistant'}] | 9 | 7 | | |

  • rating_chosen: Rating of the chosen response.

  • rating_rejected: Rating of the rejected response.

  • chosen_model: (Optional, only returned if the dataset contains it, otherwise it's a null string like here) The model used to generate the chosen response.

  • rejected_model: (Optional, only returned if the dataset contains it, otherwise it's a null string like here) The model used to generate the rejected response.

  • chosen_rationale: (Optional, only returned if the dataset contains the rationale, otherwise it's not added) The rationale behind the rating of the chosen response.

  • rejected_rationale: (Optional, only returned if the dataset contains the rationale, otherwise it's not added) The rationale behind the rating of the rejected response.
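The transformation from the raw row to the binarized row can be sketched in plain Python. The sketch assumes that the highest-rated generation becomes chosen, and that the rejected response is the lowest-rated remaining one for the "worst" strategy or a random remaining one for the "random" strategy. binarize_row and to_messages are hypothetical helpers; prepare_dataset may differ in its details.

```python
import random
from typing import Any, Dict, List, Optional


def to_messages(prompt: str, response: str) -> List[Dict[str, str]]:
    """Wrap a prompt/response pair in the chat-message layout shown above."""
    return [
        {"content": prompt, "role": "user"},
        {"content": response, "role": "assistant"},
    ]


def binarize_row(
    row: Dict[str, Any], strategy: str = "random", rng: Optional[random.Random] = None
) -> Dict[str, Any]:
    """Turn one rated row into a prompt/chosen/rejected record."""
    generations, ratings = row["generations"], row["rating"]
    if len(generations) < 2 or len(generations) != len(ratings):
        raise ValueError("need at least two generations, each with a rating")

    chosen_idx = max(range(len(ratings)), key=ratings.__getitem__)
    others = [i for i in range(len(ratings)) if i != chosen_idx]
    if strategy == "worst":
        rejected_idx = min(others, key=ratings.__getitem__)
    elif strategy == "random":
        rejected_idx = (rng or random.Random()).choice(others)
    else:
        raise ValueError(f"unknown strategy: {strategy!r}")

    return {
        "prompt": row["input"],
        "chosen": to_messages(row["input"], generations[chosen_idx]),
        "rejected": to_messages(row["input"], generations[rejected_idx]),
        "rating_chosen": ratings[chosen_idx],
        "rating_rejected": ratings[rejected_idx],
    }


row = {
    "input": "Generate an approximately fifteen-word sentence that...",
    "generations": ["Midsummer House is a moderately...", "Sure! Here's a sentence that..."],
    "rating": [9.0, 7.0],
}
binarized = binarize_row(row, strategy="worst")
```

With only two generations per row, both strategies select the same rejected response; they only diverge when three or more generations are available.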

Need more information? Take a look at argilla/ultrafeedback-binarized-preferences to get an idea of how openbmb/UltraFeedback can be binarized to prepare it for DPO.

CustomDataset in HuggingFace hub

Since distilabel 0.5.0, when a CustomDataset is pushed to the HuggingFace Hub, the Task that comes with it is pushed too. The CustomDataset.push_to_hub method uploads the Task by default:

dataset.push_to_hub("path/to/hub", split="train")
# ...
# 7:30:20 INFO     [PID: 52657] Pushing task to the hub...                                            dataset.py:247

This is also useful when downloading datasets from the Hub that include a Task: by default, if there is a Task in the repository it is downloaded as well, and no error is raised if that is not possible. Just import load_dataset from distilabel.dataset instead of using datasets.load_dataset.

from distilabel.dataset import load_dataset

dataset: "CustomDataset" = load_dataset("argilla/distilabel-sample-evol-instruct", split="train")
print(dataset.task)
# EvolInstructTask(system_prompt="", task_description=...)

A sample task can be seen here.