Examples¶
This section contains different example pipelines that showcase different tasks, maybe you can take inspiration from them.
llama.cpp with outlines
¶
Generate RPG characters following a pydantic.BaseModel
with outlines
in distilabel
.
See example
This script makes use of LlamaCppLLM
and the structured output capabilities thanks to outlines
to generate RPG characters that adhere to a JSON schema.
It makes use of a local model which can be downloaded using curl (explained in the script itself), and can be exchanged with other LLMs
like vLLM
.
# Copyright 2023-present, Argilla, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
from pathlib import Path
from distilabel.llms import LlamaCppLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromDicts
from distilabel.steps.tasks import TextGeneration
from pydantic import BaseModel, StringConstraints, conint
from typing_extensions import Annotated
class Weapon(str, Enum):
sword = "sword"
axe = "axe"
mace = "mace"
spear = "spear"
bow = "bow"
crossbow = "crossbow"
class Armor(str, Enum):
leather = "leather"
chainmail = "chainmail"
plate = "plate"
mithril = "mithril"
class Character(BaseModel):
name: Annotated[str, StringConstraints(max_length=30)]
age: conint(gt=1, lt=3000)
armor: Armor
weapon: Weapon
# Download the model with
# curl -L -o ~/Downloads/openhermes-2.5-mistral-7b.Q4_K_M.gguf https://huggingface.co/TheBloke/OpenHermes-2.5-Mistral-7B-GGUF/resolve/main/openhermes-2.5-mistral-7b.Q4_K_M.gguf
model_path = "Downloads/openhermes-2.5-mistral-7b.Q4_K_M.gguf"
with Pipeline("RPG-characters") as pipeline:
system_prompt = (
"You are a leading role play gamer. You have seen thousands of different characters and their attributes."
" Please return a JSON object with common attributes of an RPG character."
)
load_dataset = LoadDataFromDicts(
name="load_instructions",
data=[
{
"system_prompt": system_prompt,
"instruction": f"Give me a character description for a {char}",
}
for char in ["dwarf", "elf", "human", "ork"]
],
)
llm = LlamaCppLLM(
model_path=str(Path.home() / model_path), # type: ignore
n_gpu_layers=-1,
n_ctx=1024,
structured_output={"format": "json", "schema": Character},
)
# Change to vLLM as such:
# llm = vLLM(
# model="teknium/OpenHermes-2.5-Mistral-7B",
# extra_kwargs={"tensor_parallel_size": 1},
# structured_output={"format": "json", "schema": Character},
# )
text_generation = TextGeneration(
name="text_generation_rpg",
llm=llm,
input_batch_size=8,
output_mappings={"model_name": "generation_model"},
)
load_dataset >> text_generation
if __name__ == "__main__":
distiset = pipeline.run(
parameters={
text_generation.name: {
"llm": {"generation_kwargs": {"max_new_tokens": 256}}
}
},
use_cache=False,
)
for num, character in enumerate(distiset["default"]["train"]["generation"]):
print(f"Character: {num}")
print(character)
# Character: 0
# {
# "name": "Gimli",
# "age": 42,
# "armor": "plate",
# "weapon": "axe" }
# Character: 1
# {"name":"Gaelen","age":600,"armor":"leather","weapon":"bow"}
# Character: 2
# {"name": "John Smith","age": 35,"armor": "leather","weapon": "sword"}
# Character: 3
# { "name": "Grug", "age": 35, "armor": "leather", "weapon": "axe"}
MistralAI with instructor
¶
Answer instructions with knowledge graphs defined as pydantic.BaseModel
objects using instructor
in distilabel
.
See example
This script makes use of MistralLLM
and the structured output capabilities thanks to instructor
to generate knowledge graphs from complex topics.
This example is translated from this awesome example from instructor
cookbook.
# Copyright 2023-present, Argilla, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from distilabel.llms import MistralLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromDicts
from distilabel.steps.tasks import TextGeneration
from pydantic import BaseModel, Field
class Node(BaseModel):
id: int
label: str
color: str
class Edge(BaseModel):
source: int
target: int
label: str
color: str = "black"
class KnowledgeGraph(BaseModel):
nodes: List[Node] = Field(..., default_factory=list)
edges: List[Edge] = Field(..., default_factory=list)
with Pipeline(
name="Knowledge-Graphs",
description=(
"Generate knowledge graphs to answer questions, this type of dataset can be used to "
"steer a model to answer questions with a knowledge graph."
),
) as pipeline:
sample_questions = [
"Teach me about quantum mechanics",
"Who is who in The Simpsons family?",
"Tell me about the evolution of programming languages",
]
load_dataset = LoadDataFromDicts(
name="load_instructions",
data=[
{
"system_prompt": "You are a knowledge graph expert generator. Help me understand by describing everything as a detailed knowledge graph.",
"instruction": f"{question}",
}
for question in sample_questions
],
)
text_generation = TextGeneration(
name="knowledge_graph_generation",
llm=MistralLLM(
model="open-mixtral-8x22b", structured_output={"schema": KnowledgeGraph}
),
input_batch_size=8,
output_mappings={"model_name": "generation_model"},
)
load_dataset >> text_generation
if __name__ == "__main__":
distiset = pipeline.run(
parameters={
text_generation.name: {
"llm": {"generation_kwargs": {"max_new_tokens": 2048}}
}
},
use_cache=False,
)
distiset.push_to_hub("distilabel-internal-testing/knowledge_graphs")
Visualizing the graphs
Want to see how to visualize the graphs? You can test it using the following script. Generate some samples on your own and take a look:
Note
This example uses graphviz to render the graph, you can install with pip
in the following way:
Benchmarking with distilabel
: Arena Hard¶
Benchmark LLMs with distilabel
: reproducing the Arena Hard benchmark.
See example
The script below first defines both the ArenaHard
and the ArenaHardResults
tasks, so as to generate responses for a given collection of prompts/questions with up to two LLMs, and then calculate the results as per the original implementation, respectively. Additionally, the second part of the example builds a Pipeline
to run the generation on top of the prompts with InferenceEndpointsLLM
while streaming the rest of the generations from a pre-computed set of GPT-4 generations, and then evaluate one against the other with OpenAILLM
generating an alternate response, a comparison between the responses, and a result as A>>B, A>B, B>A, B>>A, or tie.
To run this example you will first need to install the Arena Hard optional dependencies, being pandas
, scikit-learn
, and numpy
.
# Copyright 2023-present, Argilla, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from typing import Any, Dict, List, Optional, Union
from distilabel.steps import GlobalStep, StepInput
from distilabel.steps.tasks.base import Task
from distilabel.steps.tasks.typing import ChatType
from distilabel.steps.typing import StepOutput
from typing_extensions import override
class ArenaHard(Task):
"""Evaluates two assistant responses using an LLM as judge.
This `Task` is based on the "From Live Data to High-Quality Benchmarks: The
Arena-Hard Pipeline" paper that presents Arena Hard, which is a benchmark for
instruction-tuned LLMs that contains 500 challenging user queries. GPT-4 is used
as the judge to compare the model responses against a baseline model, which defaults
to `gpt-4-0314`.
Note:
Arena-Hard-Auto has the highest correlation and separability to Chatbot Arena
among popular open-ended LLM benchmarks.
Input columns:
- instruction (`str`): The instruction to evaluate the responses.
- generations (`List[str]`): The responses generated by two, and only two, LLMs.
Output columns:
- evaluation (`str`): The evaluation of the responses generated by the LLMs.
- score (`str`): The score extracted from the evaluation.
- model_name (`str`): The model name used to generate the evaluation.
Categories:
- benchmark
References:
- [From Live Data to High-Quality Benchmarks: The Arena-Hard Pipeline](https://lmsys.org/blog/2024-04-19-arena-hard/)
- [`arena-hard-auto`](https://github.com/lm-sys/arena-hard-auto/tree/main)
Examples:
Evaluate two assistant responses for a given instruction using Arean Hard prompts:
```python
from distilabel.pipeline import Pipeline
from distilabel.steps import GroupColumns, LoadDataFromDicts
from distilabel.steps.tasks import ArenaHard, TextGeneration
with Pipeline() as pipeline:
load_data = LoadDataFromDicts(
data=[{"instruction": "What is the capital of France?"}],
)
text_generation_a = TextGeneration(
llm=..., # LLM instance
output_mappings={"model_name": "generation_model"},
)
text_generation_b = TextGeneration(
llm=..., # LLM instance
output_mappings={"model_name": "generation_model"},
)
combine = GroupColumns(
columns=["generation", "generation_model"],
output_columns=["generations", "generation_models"],
)
arena_hard = ArenaHard(
llm=..., # LLM instance
)
load_data >> [text_generation_a, text_generation_b] >> combine >> arena_hard
```
"""
@property
def inputs(self) -> List[str]:
"""The inputs required by this task are the `instruction` and the `generations`,
which are the responses generated by two, and only two, LLMs."""
return ["instruction", "generations"]
def format_input(self, input: Dict[str, Any]) -> ChatType:
"""This method formats the input data as a `ChatType` using the prompt defined
by the Arena Hard benchmark, which consists on a `system_prompt` plus a template
for the user first message that contains the `instruction` and both `generations`.
"""
return [
{
"role": "system",
"content": "Please act as an impartial judge and evaluate the quality of the responses provided by two AI assistants to the user prompt displayed below. You will be given assistant A's answer and assistant B's answer. Your job is to evaluate which assistant's answer is better.\n\nBegin your evaluation by generating your own answer to the prompt. You must provide your answers before judging any answers.\n\nWhen evaluating the assistants' answers, compare both assistants' answers with your answer. You must identify and correct any mistakes or inaccurate information.\n\nThen consider if the assistant's answers are helpful, relevant, and concise. Helpful means the answer correctly responds to the prompt or follows the instructions. Note when user prompt has any ambiguity or more than one interpretation, it is more helpful and appropriate to ask for clarifications or more information from the user than providing an answer based on assumptions. Relevant means all parts of the response closely connect or are appropriate to what is being asked. Concise means the response is clear and not verbose or excessive.\n\nThen consider the creativity and novelty of the assistant's answers when needed. Finally, identify any missing important information in the assistants' answers that would be beneficial to include when responding to the user prompt.\n\nAfter providing your explanation, you must output only one of the following choices as your final verdict with a label:\n\n1. Assistant A is significantly better: [[A>>B]]\n2. Assistant A is slightly better: [[A>B]]\n3. Tie, relatively the same: [[A=B]]\n4. Assistant B is slightly better: [[B>A]]\n5. Assistant B is significantly better: [[B>>A]]\n\nExample output: \"My final verdict is tie: [[A=B]]\".",
},
{
"role": "user",
"content": f"<|User Prompt|>\n{input['instruction']}\n\n<|The Start of Assistant A's Answer|>\n{input['generations'][0]}\n<|The End of Assistant A's Answer|>\n\n<|The Start of Assistant B's Answer|>\n{input['generations'][1]}\n<|The End of Assistant B's Answer|>",
},
]
@property
def outputs(self) -> List[str]:
"""The outputs generated by this task are the `evaluation`, the `score` and
the `model_name` (which is automatically injected within the `process` method
of the parent task)."""
return ["evaluation", "score", "model_name"]
def format_output(
self,
output: Union[str, None],
input: Union[Dict[str, Any], None] = None,
) -> Dict[str, Any]:
"""This method formats the output generated by the LLM as a Python dictionary
containing the `evaluation` which is the raw output generated by the LLM (consisting
of the judge LLM alternate generation for the given instruction, plus an explanation
on the evaluation of the given responses; plus the `score` extracted from the output.
Args:
output: the raw output of the LLM.
input: the input to the task. Is provided in case it needs to be used to enrich
the output if needed.
Returns:
A dict with the keys `evaluation` with the raw output which contains the LLM
evaluation and the extracted `score` if possible.
"""
if output is None:
return {"evaluation": None, "score": None}
pattern = re.compile(r"\[\[([AB<>=]+)\]\]")
match = pattern.search(output)
if match is None:
return {"evaluation": output, "score": None}
return {"evaluation": output, "score": match.group(1)}
class ArenaHardResults(GlobalStep):
"""Process Arena Hard results to calculate the ELO scores.
This `Step` is based on the "From Live Data to High-Quality Benchmarks: The
Arena-Hard Pipeline" paper that presents Arena Hard, which is a benchmark for
instruction-tuned LLMs that contains 500 challenging user queries. This step is
a `GlobalStep` that should run right after the `ArenaHard` task to calculate the
ELO scores for the evaluated models.
Note:
Arena-Hard-Auto has the highest correlation and separability to Chatbot Arena
among popular open-ended LLM benchmarks.
Input columns:
- evaluation (`str`): The evaluation of the responses generated by the LLMs.
- score (`str`): The score extracted from the evaluation.
References:
- [From Live Data to High-Quality Benchmarks: The Arena-Hard Pipeline](https://lmsys.org/blog/2024-04-19-arena-hard/)
- [`arena-hard-auto`](https://github.com/lm-sys/arena-hard-auto/tree/main)
Examples:
Rate the ELO scores for two assistant responses for a given an evaluation / comparison between both using Arean Hard prompts:
```python
from distilabel.pipeline import Pipeline
from distilabel.steps import GroupColumns, LoadDataFromDicts
from distilabel.steps.tasks import ArenaHard, TextGeneration
with Pipeline() as pipeline:
load_data = LoadDataFromDicts(
data=[{"instruction": "What is the capital of France?"}],
)
text_generation_a = TextGeneration(
llm=..., # LLM instance
output_mappings={"model_name": "generation_model"},
)
text_generation_b = TextGeneration(
llm=..., # LLM instance
output_mappings={"model_name": "generation_model"},
)
combine = GroupColumns(
columns=["generation", "generation_model"],
output_columns=["generations", "generation_models"],
)
arena_hard = ArenaHard(
llm=..., # LLM instance
)
arena_hard_results = ArenaHardResults(
custom_model_column="generation_models",
custom_weights={"A>B": 1, "A>>B": 3, "B>A": 1, "B>>A": 3},
)
load_data >> [text_generation_a, text_generation_b] >> combine >> arena_hard >> arena_hard_results
```
"""
custom_model_column: Optional[str] = None
custom_weights: Dict[str, int] = {"A>B": 1, "A>>B": 3, "B>A": 1, "B>>A": 3}
def load(self) -> None:
"""Ensures that the required dependencies are installed."""
super().load()
try:
import numpy as np # noqa: F401
import pandas as pd # noqa: F401
from sklearn.linear_model import LogisticRegression # noqa: F401
except ImportError as e:
raise ImportError(
"In order to run `ArenaHardResults`, the `arena-hard` extra dependencies"
" must be installed i.e. `numpy`, `pandas`, and `scikit-learn`.\n"
"Please install the dependencies by running `pip install distilabel[arena-hard]`."
) from e
# TODO: the `evaluation` is not really required as an input, so it could be removed, since
# only `score` is used / required
@property
def inputs(self) -> List[str]:
"""The inputs required by this step are the `evaluation` and the `score` generated
by the `ArenaHard` task. Since this step does use the identifiers `model_a` and `model_b`,
optionally one can set `custom_model_column` to use the model names if existing within
the input data, ideally this value should be `model_name` if connected from the `ArenaHard`
step."""
columns = ["evaluation", "score"]
if self.custom_model_column:
columns.append(self.custom_model_column)
return columns
@override
def process(self, inputs: StepInput) -> StepOutput: # type: ignore
"""This method processes the inputs generated by the `ArenaHard` task to calculate the
win rates for each of the models to evaluate. Since this step inherits from the `GlobalStep`,
it will wait for all the input batches to be processed, and then the output will be yielded in
case there's a follow up step, since this step won't modify the received inputs.
Args:
inputs: A list of Python dictionaries with the inputs of the task.
Yields:
A list of Python dictionaries with the outputs of the task.
References:
- https://github.com/lm-sys/arena-hard-auto/blob/main/show_result.py
"""
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
models = ["A", "B"]
if self.custom_model_column:
models = inputs[0][self.custom_model_column]
# TODO: the battles are only calculated for the first game, even though the official
# implementation also covers the possibility of a second game (not within the released
# dataset yet)
battles = pd.DataFrame()
for input in inputs:
output = {
# TODO: "question_id": input["question_id"],
"model_a": models[0],
"model_b": models[1],
}
if input["score"] in ["A>B", "A>>B"]:
output["winner"] = models[0]
rows = [output] * self.custom_weights[input["score"]]
elif input["score"] in ["B>A", "B>>A"]:
output["winner"] = models[1]
rows = [output] * self.custom_weights[input["score"]]
elif input["score"] == "A=B":
output["winner"] = "tie"
rows = [output]
else:
continue
battles = pd.concat([battles, pd.DataFrame(rows)])
models = pd.concat([battles["model_a"], battles["model_b"]]).unique()
models = pd.Series(np.arange(len(models)), index=models)
battles = pd.concat([battles, battles], ignore_index=True)
p = len(models.index)
n = battles.shape[0]
X = np.zeros([n, p])
X[np.arange(n), models[battles["model_a"]]] = +np.log(10)
X[np.arange(n), models[battles["model_b"]]] = -np.log(10)
Y = np.zeros(n)
Y[battles["winner"] == "model_a"] = 1.0
tie_idx = battles["winner"] == "tie"
tie_idx[len(tie_idx) // 2 :] = False
Y[tie_idx] = 1.0
lr = LogisticRegression(fit_intercept=False, penalty=None, tol=1e-8) # type: ignore
lr.fit(X, Y)
# The ELO scores are calculated assuming that the reference is `gpt-4-0314`
# with an starting ELO of 1000, so that the evaluated models are compared with
# `gtp-4-0314` only if it's available within the models
elo_scores = 400 * lr.coef_[0] + 1000
# TODO: we could parametrize the reference / anchor model, but left as is to be faithful to the
# original implementation
if "gpt-4-0314" in models.index:
elo_scores += 1000 - elo_scores[models["gpt-4-0314"]]
output = pd.Series(elo_scores, index=models.index).sort_values(ascending=False)
self._logger.info(f"Arena Hard ELO: {output}")
# Here only so that if follow up steps are connected the inputs are preserved,
# since this step doesn't modify nor generate new inputs
yield inputs
if __name__ == "__main__":
import json
from distilabel.llms import InferenceEndpointsLLM, OpenAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import (
GroupColumns,
KeepColumns,
LoadDataFromHub,
StepInput,
step,
)
from distilabel.steps.tasks import TextGeneration
from distilabel.steps.typing import StepOutput
@step(inputs=["turns"], outputs=["system_prompt", "instruction"])
def PrepareForTextGeneration(*inputs: StepInput) -> StepOutput:
for input in inputs:
for item in input:
item["system_prompt"] = "You are a helpful assistant."
item["instruction"] = item["turns"][0]["content"]
yield input
@step(
inputs=["question_id"],
outputs=["generation", "generation_model"],
step_type="global",
)
def LoadReference(*inputs: StepInput) -> StepOutput:
# File downloaded from https://raw.githubusercontent.com/lm-sys/arena-hard-auto/e0a8ea1df42c1df76451a6cd04b14e31ff992b87/data/arena-hard-v0.1/model_answer/gpt-4-0314.jsonl
lines = open("gpt-4-0314.jsonl", mode="r").readlines()
for input in inputs:
for item in input:
for line in lines:
data = json.loads(line)
if data["question_id"] == item["question_id"]:
item["generation"] = data["choices"][0]["turns"][0]["content"]
item["generation_model"] = data["model_id"]
break
yield input
with Pipeline(name="arena-hard-v0.1") as pipeline:
load_dataset = LoadDataFromHub(
name="load_dataset",
repo_id="alvarobartt/lmsys-arena-hard-v0.1",
split="test",
num_examples=5,
)
load_reference = LoadReference(name="load_reference")
prepare = PrepareForTextGeneration(name="prepare")
text_generation_cohere = TextGeneration(
name="text_generation_cohere",
llm=InferenceEndpointsLLM(
model_id="CohereForAI/c4ai-command-r-plus",
tokenizer_id="CohereForAI/c4ai-command-r-plus",
),
use_system_prompt=True,
input_batch_size=10,
output_mappings={"model_name": "generation_model"},
)
combine_columns = GroupColumns(
name="combine_columns",
columns=["generation", "generation_model"],
output_columns=["generations", "generation_models"],
)
arena_hard = ArenaHard(
name="arena_hard",
llm=OpenAILLM(model="gpt-4-1106-preview"),
output_mappings={"model_name": "evaluation_model"},
)
keep_columns = KeepColumns(
name="keep_columns",
columns=[
"question_id",
"category",
"cluster",
"system_prompt",
"instruction",
"generations",
"generation_models",
"evaluation",
"score",
"evaluation_model",
],
)
win_rates = ArenaHardResults(
name="win_rates", custom_model_column="generation_models"
)
load_dataset >> load_reference # type: ignore
load_dataset >> prepare >> text_generation_cohere # type: ignore
( # type: ignore
[load_reference, text_generation_cohere]
>> combine_columns
>> arena_hard
>> keep_columns
>> win_rates
)
distiset = pipeline.run(
parameters={ # type: ignore
text_generation_cohere.name: {
"llm": {
"generation_kwargs": {
"temperature": 0.7,
"max_new_tokens": 4096,
"stop_sequences": ["<EOS_TOKEN>", "<|END_OF_TURN_TOKEN|>"],
}
}
},
arena_hard.name: {
"llm": {
"generation_kwargs": {
"temperature": 0.0,
"max_new_tokens": 4096,
}
}
},
},
)
if distiset is not None:
distiset.push_to_hub("arena-hard-results")