DBSCAN

Bases: GlobalStep

DBSCAN (Density-Based Spatial Clustering of Applications with Noise) finds core samples in regions of high density and expands clusters from them. This algorithm is good for data which contains clusters of similar density.

This is a GlobalStep that clusters the embeddings using the DBSCAN algorithm from sklearn. Visit the TextClustering step for an example of use. The trained model is saved as an artifact when creating a distiset and pushing it to the Hugging Face Hub.
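Since the fitted clusterer is persisted with joblib, it can be reloaded later for inspection. A minimal sketch, assuming the distiset artifacts have already been downloaded locally (the path below is hypothetical and depends on where the artifacts were saved):

```python
import joblib

# Hypothetical local path to the downloaded "DBSCAN_model" artifact
with open("artifacts/DBSCAN_model/DBSCAN.joblib", "rb") as f:
    clusterer = joblib.load(f)

# The fitted estimator keeps the labels assigned during training (-1 marks noise)
print(set(clusterer.labels_))
```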

Input columns
  • projection (List[float]): Vector representation of the text to cluster, normally the output from the UMAP step.
Output columns
  • cluster_label (int): Integer representing the label of a given cluster. -1 means it wasn't clustered.
Categories
  • clustering
  • text-classification
References
  • DBSCAN demo of sklearn: https://scikit-learn.org/stable/auto_examples/cluster/plot_dbscan.html#demo-of-dbscan-clustering-algorithm
  • sklearn dbscan: https://scikit-learn.org/stable/modules/clustering.html#dbscan

Attributes:
  • eps: The maximum distance between two samples for one to be considered as in the neighborhood of the other. This is not a maximum bound on the distances of points within a cluster. This is the most important DBSCAN parameter to choose appropriately for your data set and distance function.
  • min_samples: The number of samples (or total weight) in a neighborhood for a point to be considered as a core point. This includes the point itself. If min_samples is set to a higher value, DBSCAN will find denser clusters, whereas if it is set to a lower value, the found clusters will be more sparse.
  • metric: The metric to use when calculating distance between instances in a feature array. If metric is a string or callable, it must be one of the options allowed by sklearn.metrics.pairwise_distances for its metric parameter.
  • n_jobs: The number of parallel jobs to run.

Runtime parameters
  • eps: The maximum distance between two samples for one to be considered as in the neighborhood of the other. This is not a maximum bound on the distances of points within a cluster. This is the most important DBSCAN parameter to choose appropriately for your data set and distance function.
  • min_samples: The number of samples (or total weight) in a neighborhood for a point to be considered as a core point. This includes the point itself. If min_samples is set to a higher value, DBSCAN will find denser clusters, whereas if it is set to a lower value, the found clusters will be more sparse.
  • metric: The metric to use when calculating distance between instances in a feature array. If metric is a string or callable, it must be one of the options allowed by sklearn.metrics.pairwise_distances for its metric parameter.
  • n_jobs: The number of parallel jobs to run.
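Examples:

Preview the clustering behavior with sklearn alone. This is a minimal sketch: the step forwards these same parameters to sklearn.cluster.DBSCAN, so this mirrors what it does to the projection column; min_samples is lowered here only so the toy input forms a cluster:

```python
import numpy as np
from sklearn.cluster import DBSCAN as SKDBSCAN

# Toy projections, e.g. what the UMAP step would output with n_components=2
projections = np.array([[0.0, 0.0], [0.1, 0.1], [0.15, 0.05], [5.0, 5.0]])

# Same defaults the step forwards to sklearn, except `min_samples`
clusterer = SKDBSCAN(eps=0.3, min_samples=2, metric="euclidean", n_jobs=8)
labels = clusterer.fit(projections).labels_
print(labels)  # -1 marks rows that weren't assigned to any cluster
```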
Source code in src/distilabel/steps/clustering/dbscan.py
class DBSCAN(GlobalStep):
    r"""DBSCAN (Density-Based Spatial Clustering of Applications with Noise) finds core
    samples in regions of high density and expands clusters from them. This algorithm
    is good for data which contains clusters of similar density.

    This is a `GlobalStep` that clusters the embeddings using the DBSCAN algorithm
    from `sklearn`. Visit `TextClustering` step for an example of use.
    The trained model is saved as an artifact when creating a distiset
    and pushing it to the Hugging Face Hub.

    Input columns:
        - projection (`List[float]`): Vector representation of the text to cluster,
            normally the output from the `UMAP` step.

    Output columns:
        - cluster_label (`int`): Integer representing the label of a given cluster. -1
            means it wasn't clustered.

    Categories:
        - clustering
        - text-classification

    References:
        - [`DBSCAN demo of sklearn`](https://scikit-learn.org/stable/auto_examples/cluster/plot_dbscan.html#demo-of-dbscan-clustering-algorithm)
        - [`sklearn dbscan`](https://scikit-learn.org/stable/modules/clustering.html#dbscan)

    Attributes:
        - eps: The maximum distance between two samples for one to be considered as in the
            neighborhood of the other. This is not a maximum bound on the distances of
            points within a cluster. This is the most important DBSCAN parameter to
            choose appropriately for your data set and distance function.
        - min_samples: The number of samples (or total weight) in a neighborhood for a point
            to be considered as a core point. This includes the point itself. If `min_samples`
            is set to a higher value, DBSCAN will find denser clusters, whereas if it is set
            to a lower value, the found clusters will be more sparse.
        - metric: The metric to use when calculating distance between instances in a feature
            array. If metric is a string or callable, it must be one of the options allowed
            by `sklearn.metrics.pairwise_distances` for its metric parameter.
        - n_jobs: The number of parallel jobs to run.

    Runtime parameters:
        - `eps`: The maximum distance between two samples for one to be considered as in the
            neighborhood of the other. This is not a maximum bound on the distances of
            points within a cluster. This is the most important DBSCAN parameter to
            choose appropriately for your data set and distance function.
        - `min_samples`: The number of samples (or total weight) in a neighborhood for a point
            to be considered as a core point. This includes the point itself. If `min_samples`
            is set to a higher value, DBSCAN will find denser clusters, whereas if it is set
            to a lower value, the found clusters will be more sparse.
        - `metric`: The metric to use when calculating distance between instances in a feature
            array. If metric is a string or callable, it must be one of the options allowed
            by `sklearn.metrics.pairwise_distances` for its metric parameter.
        - `n_jobs`: The number of parallel jobs to run.
    """

    eps: Optional[RuntimeParameter[float]] = Field(
        default=0.3,
        description=(
            "The maximum distance between two samples for one to be considered "
            "as in the neighborhood of the other. This is not a maximum bound "
            "on the distances of points within a cluster. This is the most "
            "important DBSCAN parameter to choose appropriately for your data set "
            "and distance function."
        ),
    )
    min_samples: Optional[RuntimeParameter[int]] = Field(
        default=30,
        description=(
            "The number of samples (or total weight) in a neighborhood for a point to "
            "be considered as a core point. This includes the point itself. If "
            "`min_samples` is set to a higher value, DBSCAN will find denser clusters, "
            "whereas if it is set to a lower value, the found clusters will be more "
            "sparse."
        ),
    )
    metric: Optional[RuntimeParameter[str]] = Field(
        default="euclidean",
        description=(
            "The metric to use when calculating distance between instances in a "
            "feature array. If metric is a string or callable, it must be one of "
            "the options allowed by `sklearn.metrics.pairwise_distances` for "
            "its metric parameter."
        ),
    )
    n_jobs: Optional[RuntimeParameter[int]] = Field(
        default=8, description="The number of parallel jobs to run."
    )

    _clusterer: Optional["_DBSCAN"] = PrivateAttr(None)

    def load(self) -> None:
        super().load()
        if importlib.util.find_spec("sklearn") is None:
            raise ImportError(
                "`sklearn` package is not installed. Please install it using `pip install scikit-learn`."
            )
        from sklearn.cluster import DBSCAN as _DBSCAN

        self._clusterer = _DBSCAN(
            eps=self.eps,
            min_samples=self.min_samples,
            metric=self.metric,
            n_jobs=self.n_jobs,
        )

    def unload(self) -> None:
        self._clusterer = None

    @property
    def inputs(self) -> List[str]:
        return ["projection"]

    @property
    def outputs(self) -> List[str]:
        return ["cluster_label"]

    def _save_model(self, model: Any) -> None:
        import joblib

        def save_model(path):
            with open(str(path / "DBSCAN.joblib"), "wb") as f:
                joblib.dump(model, f)

        self.save_artifact(
            name="DBSCAN_model",
            write_function=lambda path: save_model(path),
            metadata={
                "eps": self.eps,
                "min_samples": self.min_samples,
                "metric": self.metric,
            },
        )

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        projections = np.array([input["projection"] for input in inputs])

        self._logger.info("🏋️‍♀️ Start training DBSCAN...")
        fitted_clusterer = self._clusterer.fit(projections)
        cluster_labels = fitted_clusterer.labels_
        # Sets the cluster labels for each input, -1 means it wasn't clustered
        for input, cluster_label in zip(inputs, cluster_labels):
            input["cluster_label"] = cluster_label
        self._logger.info(f"DBSCAN labels assigned: {len(set(cluster_labels))}")
        self._save_model(fitted_clusterer)
        yield inputs

TextClustering

Bases: TextClassification, GlobalTask

Task that clusters a set of texts and generates summary labels for each cluster.

This is a GlobalTask that inherits from TextClassification, which means that all the attributes of that class are available here. Also, in this case we deal with all the inputs at once, instead of using batches. The input_batch_size is used here to send the examples to the LLM in batches (a subtle difference with the more common Task definitions).

The task looks in each cluster for a given number of representative examples (the number is set by the samples_per_cluster attribute) and sends them to the LLM to obtain a label (or labels) that represents the cluster. The labels are then assigned to each text in the cluster. The clusters and projections used in the step are assumed to be obtained from the UMAP + DBSCAN steps, but could be generated by similar steps, as long as they represent the same concepts. This step runs a pipeline like the one in this repository: https://github.com/huggingface/text-clustering

Input columns
  • text (str): The reference text we want to obtain labels for.
  • projection (List[float]): Vector representation of the text to cluster, normally the output from the UMAP step.
  • cluster_label (int): Integer representing the label of a given cluster. -1 means it wasn't clustered.
Output columns
  • summary_label (str): The label or list of labels for the text.
  • model_name (str): The name of the model used to generate the label/s.
Categories
  • clustering
  • text-classification
References
  • text-clustering repository: https://github.com/huggingface/text-clustering

Attributes:
  • savefig: Whether to generate and save a figure with the clustering of the texts.
  • samples_per_cluster: The number of examples to use in the LLM as a sample of the cluster.

Examples:

Generate labels for a set of texts using clustering:

```python
from datasets import load_dataset

from distilabel.llms import InferenceEndpointsLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import UMAP, DBSCAN, TextClustering, make_generator_step

ds_name = "argilla-warehouse/personahub-fineweb-edu-4-clustering-100k"

with Pipeline(name="Text clustering dataset") as pipeline:
    batch_size = 500

    ds = load_dataset(ds_name, split="train").select(range(10000))
    loader = make_generator_step(ds, batch_size=batch_size, repo_id=ds_name)

    umap = UMAP(n_components=2, metric="cosine")
    dbscan = DBSCAN(eps=0.3, min_samples=30)

    text_clustering = TextClustering(
        llm=InferenceEndpointsLLM(
            model_id="meta-llama/Meta-Llama-3.1-70B-Instruct",
            tokenizer_id="meta-llama/Meta-Llama-3.1-70B-Instruct",
        ),
        n=3,  # 3 labels per example
        query_title="Examples of Personas",
        samples_per_cluster=10,
        context=(
            "Describe the main themes, topics, or categories that could describe the "
            "following types of personas. All the examples of personas must share "
            "the same set of labels."
        ),
        default_label="None",
        savefig=True,
        input_batch_size=8,
        input_mappings={"text": "persona"},
        use_default_structured_output=True,
    )

    loader >> umap >> dbscan >> text_clustering
```
Source code in src/distilabel/steps/clustering/text_clustering.py
class TextClustering(TextClassification, GlobalTask):
    """Task that clusters a set of texts and generates summary labels for each cluster.

    This is a `GlobalTask` that inherits from `TextClassification`, this means that all
    the attributes from that class are available here. Also, in this case we deal
    with all the inputs at once, instead of using batches. The `input_batch_size` is
    used here to send the examples to the LLM in batches (a subtle difference with the
    more common `Task` definitions).
    The task looks in each cluster for a given number of representative examples (the number
    is set by the `samples_per_cluster` attribute), and sends them to the LLM to get a label/s
    that represent the cluster. The labels are then assigned to each text in the cluster.
    The clusters and projections used in the step, are assumed to be obtained from the `UMAP`
    + `DBSCAN` steps, but could be generated for similar steps, as long as they represent the
    same concepts.
    This step runs a pipeline like the one in this repository:
    https://github.com/huggingface/text-clustering

    Input columns:
        - text (`str`): The reference text we want to obtain labels for.
        - projection (`List[float]`): Vector representation of the text to cluster,
            normally the output from the `UMAP` step.
        - cluster_label (`int`): Integer representing the label of a given cluster. -1
            means it wasn't clustered.

    Output columns:
        - summary_label (`str`): The label or list of labels for the text.
        - model_name (`str`): The name of the model used to generate the label/s.

    Categories:
        - clustering
        - text-classification

    References:
        - [`text-clustering repository`](https://github.com/huggingface/text-clustering)

    Attributes:
        - savefig: Whether to generate and save a figure with the clustering of the texts.
        - samples_per_cluster: The number of examples to use in the LLM as a sample of the cluster.

    Examples:
        Generate labels for a set of texts using clustering:

        ```python
        from distilabel.llms import InferenceEndpointsLLM
        from distilabel.steps import UMAP, DBSCAN, TextClustering
        from distilabel.pipeline import Pipeline

        ds_name = "argilla-warehouse/personahub-fineweb-edu-4-clustering-100k"

        with Pipeline(name="Text clustering dataset") as pipeline:
            batch_size = 500

            ds = load_dataset(ds_name, split="train").select(range(10000))
            loader = make_generator_step(ds, batch_size=batch_size, repo_id=ds_name)

            umap = UMAP(n_components=2, metric="cosine")
            dbscan = DBSCAN(eps=0.3, min_samples=30)

            text_clustering = TextClustering(
                llm=InferenceEndpointsLLM(
                    model_id="meta-llama/Meta-Llama-3.1-70B-Instruct",
                    tokenizer_id="meta-llama/Meta-Llama-3.1-70B-Instruct",
                ),
                n=3,  # 3 labels per example
                query_title="Examples of Personas",
                samples_per_cluster=10,
                context=(
                    "Describe the main themes, topics, or categories that could describe the "
                    "following types of personas. All the examples of personas must share "
                    "the same set of labels."
                ),
                default_label="None",
                savefig=True,
                input_batch_size=8,
                input_mappings={"text": "persona"},
                use_default_structured_output=True,
            )

            loader >> umap >> dbscan >> text_clustering
        ```
    """

    savefig: Optional[RuntimeParameter[bool]] = Field(
        default=True,
        description="Whether to generate and save a figure with the clustering of the texts.",
    )
    samples_per_cluster: int = Field(
        default=10,
        description="The number of examples to use in the LLM as a sample of the cluster.",
    )

    @property
    def inputs(self) -> List[str]:
        """The input for the task are the same as those for `TextClassification` plus
        the `projection` and `cluster_label` columns (which can be obtained from
        UMAP + DBSCAN steps).
        """
        return super().inputs + ["projection", "cluster_label"]

    @property
    def outputs(self) -> List[str]:
        """The output for the task is the `summary_label` and the `model_name`."""
        return ["summary_label", "model_name"]

    def load(self) -> None:
        super().load()
        if self.savefig and (importlib.util.find_spec("matplotlib") is None):
            raise ImportError(
                "`matplotlib` package is not installed. Please install it using `pip install matplotlib`."
            )

    def _save_figure(
        self,
        data: pd.DataFrame,
        cluster_centers: Dict[str, Tuple[float, float]],
        cluster_summaries: Dict[int, str],
    ) -> None:
        """Saves the figure starting from the dataframe, using matplotlib.

        Args:
            data: pd.DataFrame with the columns 'X', 'Y' and 'labels' representing
                the projections and the label of each text respectively.
            cluster_centers: Dictionary mapping from each label the center of a cluster,
                to help with the placement of the annotations.
            cluster_summaries: The summaries of the clusters, obtained from the LLM.
        """
        import matplotlib.pyplot as plt

        fig, ax = plt.subplots(figsize=(12, 8), dpi=300)
        unique_labels = data["labels"].unique()
        # Map of colors for each label (-1 is black)
        colormap = dict(
            zip(unique_labels, plt.cm.Spectral(np.linspace(0, 1, len(unique_labels))))
        )
        colormap[-1] = np.array([0, 0, 0, 0])
        data["color"] = data["labels"].map(colormap)

        data.plot(
            kind="scatter",
            x="X",
            y="Y",
            c="color",
            s=0.75,
            alpha=0.8,
            linewidth=0.4,
            ax=ax,
            colorbar=False,
        )

        for label in cluster_summaries.keys():
            if label == -1:
                continue
            summary = str(cluster_summaries[label])  # These are obtained from the LLM
            position = cluster_centers[label]
            t = ax.text(
                position[0],
                position[1],
                summary,
                horizontalalignment="center",
                verticalalignment="center",
                fontsize=4,
            )
            t.set_bbox(
                {
                    "facecolor": "white",
                    "alpha": 0.9,
                    "linewidth": 0,
                    "boxstyle": "square,pad=0.1",
                }
            )

        ax.set_axis_off()
        # Save the plot as an artifact of the step
        self.save_artifact(
            name="Text clusters",
            write_function=lambda path: fig.savefig(path / "figure_clustering.png"),
            metadata={"type": "image", "library": "matplotlib"},
        )
        plt.close()

    def _create_figure(
        self,
        inputs: StepInput,
        label2docs: Dict[int, List[int]],
        cluster_summaries: Dict[int, str],
    ) -> None:
        """Creates a figure of the clustered texts and save it as an artifact.

        Args:
            inputs: The inputs of the step, as we will extract information from them again.
            label2docs: Map from each label to the list of document indices that belong to that cluster.
            cluster_summaries: The summaries of the clusters, obtained from the LLM.
        """
        self._logger.info("🖼️ Creating figure for the clusters...")

        labels = []
        projections = []
        id2cluster = {}
        for i, input in enumerate(inputs):
            label = input["cluster_label"]
            id2cluster[i] = label
            labels.append(label)
            projections.append(input["projection"])

        projections = np.array(projections)

        # Contains the placement of the cluster centers in the figure
        cluster_centers: Dict[str, Tuple[float, float]] = {}
        for label in label2docs.keys():
            x = np.mean([projections[doc, 0] for doc in label2docs[label]])
            y = np.mean([projections[doc, 1] for doc in label2docs[label]])
            cluster_centers[label] = (x, y)

        df = pd.DataFrame(
            data={
                "X": projections[:, 0],
                "Y": projections[:, 1],
                "labels": labels,
            }
        )

        self._save_figure(
            df, cluster_centers=cluster_centers, cluster_summaries=cluster_summaries
        )

    def _prepare_input_texts(
        self,
        inputs: StepInput,
        label2docs: Dict[int, List[int]],
        unique_labels: int,
    ) -> List[Dict[str, Union[str, int]]]:
        """Prepares a batch of inputs to send to the LLM, with the examples of each cluster.

        Args:
            inputs: Inputs from the step.
            label2docs: Map from each label to the list of documents (texts) that
                belong to that cluster.
            unique_labels: The number of unique cluster labels (the noise label -1 excluded).

        Returns:
            The input texts to send to the LLM, with the examples of each cluster
            prepared to be used in the prompt, and an additional key to store the
            labels (that will be needed to find the data after the batches are
            returned from the LLM).
        """
        input_texts = []
        for label in range(unique_labels):  # The label -1 is implicitly excluded
            # Get the ids but remove possible duplicates, which could happen with bigger probability
            # the bigger the number of examples requested, and the smaller the subset of examples
            ids = set(
                np.random.choice(label2docs[label], size=self.samples_per_cluster)
            )  # Grab the number of examples
            examples = [inputs[i]["text"] for i in ids]
            input_text = {
                "text": "\n\n".join(
                    [f"Example {i}:\n{t}" for i, t in enumerate(examples, start=1)]
                ),
                "__LABEL": label,
            }
            input_texts.append(input_text)
        return input_texts

    def process(self, inputs: StepInput) -> "StepOutput":
        labels = [input["cluster_label"] for input in inputs]
        # -1 because -1 is the label for the unclassified
        unique_labels = len(set(labels)) - 1
        # This will be the output of the LLM, the set of labels for each cluster
        cluster_summaries: Dict[int, str] = {-1: self.default_label}

        # Map from label to list of documents, will use them to select examples from each cluster
        label2docs = defaultdict(list)
        for i, label in enumerate(labels):
            label2docs[label].append(i)

        input_texts = self._prepare_input_texts(inputs, label2docs, unique_labels)

        # Send the texts in batches to the LLM, and get the labels for each cluster
        for i, batched_inputs in enumerate(batched(input_texts, self.input_batch_size)):
            self._logger.info(f"📦 Processing internal batch of inputs {i}...")
            results = super().process(batched_inputs)
            for result in next(results):  # Extract the elements from the generator
                cluster_summaries[result["__LABEL"]] = result["labels"]

        # Assign the labels to each text
        for input in inputs:
            input["summary_label"] = json.dumps(
                cluster_summaries[input["cluster_label"]]
            )

        if self.savefig:
            self._create_figure(inputs, label2docs, cluster_summaries)

        yield inputs
inputs: List[str] property

The inputs for the task are the same as those for TextClassification plus the projection and cluster_label columns (which can be obtained from the UMAP + DBSCAN steps).

outputs: List[str] property

The outputs for the task are the summary_label and the model_name.

_save_figure(data, cluster_centers, cluster_summaries)

Saves the figure starting from the dataframe, using matplotlib.

Parameters:
  • data (DataFrame), required: pd.DataFrame with the columns 'X', 'Y' and 'labels' representing the projections and the label of each text respectively.
  • cluster_centers (Dict[str, Tuple[float, float]]), required: Dictionary mapping each label to the center of its cluster, to help with the placement of the annotations.
  • cluster_summaries (Dict[int, str]), required: The summaries of the clusters, obtained from the LLM.
Source code in src/distilabel/steps/clustering/text_clustering.py
def _save_figure(
    self,
    data: pd.DataFrame,
    cluster_centers: Dict[str, Tuple[float, float]],
    cluster_summaries: Dict[int, str],
) -> None:
    """Saves the figure starting from the dataframe, using matplotlib.

    Args:
        data: pd.DataFrame with the columns 'X', 'Y' and 'labels' representing
            the projections and the label of each text respectively.
        cluster_centers: Dictionary mapping from each label the center of a cluster,
            to help with the placement of the annotations.
        cluster_summaries: The summaries of the clusters, obtained from the LLM.
    """
    import matplotlib.pyplot as plt

    fig, ax = plt.subplots(figsize=(12, 8), dpi=300)
    unique_labels = data["labels"].unique()
    # Map of colors for each label (-1 is black)
    colormap = dict(
        zip(unique_labels, plt.cm.Spectral(np.linspace(0, 1, len(unique_labels))))
    )
    colormap[-1] = np.array([0, 0, 0, 0])
    data["color"] = data["labels"].map(colormap)

    data.plot(
        kind="scatter",
        x="X",
        y="Y",
        c="color",
        s=0.75,
        alpha=0.8,
        linewidth=0.4,
        ax=ax,
        colorbar=False,
    )

    for label in cluster_summaries.keys():
        if label == -1:
            continue
        summary = str(cluster_summaries[label])  # These are obtained from the LLM
        position = cluster_centers[label]
        t = ax.text(
            position[0],
            position[1],
            summary,
            horizontalalignment="center",
            verticalalignment="center",
            fontsize=4,
        )
        t.set_bbox(
            {
                "facecolor": "white",
                "alpha": 0.9,
                "linewidth": 0,
                "boxstyle": "square,pad=0.1",
            }
        )

    ax.set_axis_off()
    # Save the plot as an artifact of the step
    self.save_artifact(
        name="Text clusters",
        write_function=lambda path: fig.savefig(path / "figure_clustering.png"),
        metadata={"type": "image", "library": "matplotlib"},
    )
    plt.close()
_create_figure(inputs, label2docs, cluster_summaries)

Creates a figure of the clustered texts and saves it as an artifact.

Parameters:
  • inputs (StepInput), required: The inputs of the step, as we will extract information from them again.
  • label2docs (Dict[int, List[int]]), required: Map from each label to the list of document indices that belong to that cluster.
  • cluster_summaries (Dict[int, str]), required: The summaries of the clusters, obtained from the LLM.
Source code in src/distilabel/steps/clustering/text_clustering.py
def _create_figure(
    self,
    inputs: StepInput,
    label2docs: Dict[int, List[int]],
    cluster_summaries: Dict[int, str],
) -> None:
    """Creates a figure of the clustered texts and save it as an artifact.

    Args:
        inputs: The inputs of the step, as we will extract information from them again.
        label2docs: Map from each label to the list of document indices that belong to that cluster.
        cluster_summaries: The summaries of the clusters, obtained from the LLM.
    """
    self._logger.info("🖼️ Creating figure for the clusters...")

    labels = []
    projections = []
    id2cluster = {}
    for i, input in enumerate(inputs):
        label = input["cluster_label"]
        id2cluster[i] = label
        labels.append(label)
        projections.append(input["projection"])

    projections = np.array(projections)

    # Contains the placement of the cluster centers in the figure
    cluster_centers: Dict[str, Tuple[float, float]] = {}
    for label in label2docs.keys():
        x = np.mean([projections[doc, 0] for doc in label2docs[label]])
        y = np.mean([projections[doc, 1] for doc in label2docs[label]])
        cluster_centers[label] = (x, y)

    df = pd.DataFrame(
        data={
            "X": projections[:, 0],
            "Y": projections[:, 1],
            "labels": labels,
        }
    )

    self._save_figure(
        df, cluster_centers=cluster_centers, cluster_summaries=cluster_summaries
    )
_prepare_input_texts(inputs, label2docs, unique_labels)

Prepares a batch of inputs to send to the LLM, with the examples of each cluster.

Parameters:
  • inputs (StepInput), required: Inputs from the step.
  • label2docs (Dict[int, List[int]]), required: Map from each label to the list of documents (texts) that belong to that cluster.
  • unique_labels (int), required: The number of unique cluster labels (the noise label -1 excluded).

Returns:

List[Dict[str, Union[str, int]]]: The input texts to send to the LLM, with the examples of each cluster prepared to be used in the prompt, and an additional key to store the labels (that will be needed to find the data after the batches are returned from the LLM).
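The sampling logic can be illustrated in isolation with a small standalone sketch (toy data, not the step itself): document indices are grouped per cluster label, then up to samples_per_cluster of them are drawn per cluster, with duplicates removed:

```python
from collections import defaultdict

import numpy as np

# Toy cluster labels for 6 documents; -1 is noise and is skipped
texts = ["a", "b", "c", "d", "e", "f"]
labels = [0, 0, 0, 1, 1, -1]

label2docs = defaultdict(list)
for i, label in enumerate(labels):
    label2docs[label].append(i)

samples_per_cluster = 2
for label in range(len(set(labels)) - 1):  # -1 implicitly excluded, as in the step
    ids = set(np.random.choice(label2docs[label], size=samples_per_cluster))
    print(label, [texts[i] for i in ids])
```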

Source code in src/distilabel/steps/clustering/text_clustering.py
def _prepare_input_texts(
    self,
    inputs: StepInput,
    label2docs: Dict[int, List[int]],
    unique_labels: int,
) -> List[Dict[str, Union[str, int]]]:
    """Prepares a batch of inputs to send to the LLM, with the examples of each cluster.

    Args:
        inputs: Inputs from the step.
        label2docs: Map from each label to the list of documents (texts) that
            belong to that cluster.
        unique_labels: The number of unique cluster labels (the noise label -1 excluded).

    Returns:
        The input texts to send to the LLM, with the examples of each cluster
        prepared to be used in the prompt, and an additional key to store the
        labels (that will be needed to find the data after the batches are
        returned from the LLM).
    """
    input_texts = []
    for label in range(unique_labels):  # The label -1 is implicitly excluded
        # Get the ids but remove possible duplicates, which could happen with bigger probability
        # the bigger the number of examples requested, and the smaller the subset of examples
        ids = set(
            np.random.choice(label2docs[label], size=self.samples_per_cluster)
        )  # Grab the number of examples
        examples = [inputs[i]["text"] for i in ids]
        input_text = {
            "text": "\n\n".join(
                [f"Example {i}:\n{t}" for i, t in enumerate(examples, start=1)]
            ),
            "__LABEL": label,
        }
        input_texts.append(input_text)
    return input_texts

UMAP

Bases: GlobalStep

UMAP is a general purpose manifold learning and dimension reduction algorithm.

This is a GlobalStep that reduces the dimensionality of the embeddings using the UMAP algorithm. Visit the TextClustering step for an example of use. The trained model is saved as an artifact when creating a distiset and pushing it to the Hugging Face Hub.

Input columns
  • embedding (List[float]): The original embeddings whose dimensionality we want to reduce.
Output columns
  • projection (List[float]): Embedding reduced to the number of components specified; the size of the new embeddings is determined by n_components.
Categories
  • clustering
  • text-classification
References
  • UMAP repository: https://github.com/lmcinnes/umap/tree/master
  • UMAP documentation: https://umap-learn.readthedocs.io/en/latest/

Attributes:
  • n_components: The dimension of the space to embed into. This defaults to 2 to provide easy visualization (that's probably what you want), but can reasonably be set to any integer value in the range 2 to 100.
  • metric: The metric to use to compute distances in high dimensional space. Visit UMAP's documentation for more information. Defaults to euclidean.
  • n_jobs: The number of parallel jobs to run. Defaults to 8.
  • random_state: The random state to use for the UMAP algorithm.

Runtime parameters
  • n_components: The dimension of the space to embed into. This defaults to 2 to provide easy visualization (that's probably what you want), but can reasonably be set to any integer value in the range 2 to 100.
  • metric: The metric to use to compute distances in high dimensional space. Visit UMAP's documentation for more information. Defaults to euclidean.
  • n_jobs: The number of parallel jobs to run. Defaults to 8.
  • random_state: The random state to use for the UMAP algorithm.
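The step forwards its parameters directly to umap-learn, so its behavior can be previewed with that library alone; a minimal sketch, assuming umap-learn is installed (note that setting random_state makes the fit deterministic, at the cost of umap-learn overriding n_jobs to 1):

```python
import numpy as np
from umap import UMAP as _UMAP  # requires `umap-learn`

rng = np.random.default_rng(0)
embeddings = rng.normal(size=(100, 16))  # (n_samples, n_features)

# Same parameters the step forwards to umap-learn
mapper = _UMAP(
    n_components=2, metric="euclidean", n_jobs=8, random_state=42
).fit(embeddings)
print(mapper.embedding_.shape)  # (100, 2): one projection per row
```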
Citations
```
@misc{mcinnes2020umapuniformmanifoldapproximation,
    title={UMAP: Uniform Manifold Approximation and Projection for Dimension Reduction},
    author={Leland McInnes and John Healy and James Melville},
    year={2020},
    eprint={1802.03426},
    archivePrefix={arXiv},
    primaryClass={stat.ML},
    url={https://arxiv.org/abs/1802.03426},
}
```
Source code in src/distilabel/steps/clustering/umap.py
class UMAP(GlobalStep):
    r"""UMAP is a general purpose manifold learning and dimension reduction algorithm.

    This is a `GlobalStep` that reduces the dimensionality of the embeddings using the UMAP algorithm. Visit
    the `TextClustering` step for an example of use. The trained model is saved as an artifact
    when creating a distiset and pushing it to the Hugging Face Hub.

    Input columns:
        - embedding (`List[float]`): The original embeddings whose dimensionality we want to reduce.

    Output columns:
        - projection (`List[float]`): Embedding reduced to the number of components specified;
            the size of the new embeddings is determined by `n_components`.

    Categories:
        - clustering
        - text-classification

    References:
        - [`UMAP repository`](https://github.com/lmcinnes/umap/tree/master)
        - [`UMAP documentation`](https://umap-learn.readthedocs.io/en/latest/)

    Attributes:
        - n_components: The dimension of the space to embed into. This defaults to 2 to
            provide easy visualization (that's probably what you want), but can
            reasonably be set to any integer value in the range 2 to 100.
        - metric: The metric to use to compute distances in high dimensional space.
            Visit UMAP's documentation for more information. Defaults to `euclidean`.
        - n_jobs: The number of parallel jobs to run. Defaults to `8`.
        - random_state: The random state to use for the UMAP algorithm.

    Runtime parameters:
        - `n_components`: The dimension of the space to embed into. This defaults to 2 to
            provide easy visualization (that's probably what you want), but can
            reasonably be set to any integer value in the range 2 to 100.
        - `metric`: The metric to use to compute distances in high dimensional space.
            Visit UMAP's documentation for more information. Defaults to `euclidean`.
        - `n_jobs`: The number of parallel jobs to run. Defaults to `8`.
        - `random_state`: The random state to use for the UMAP algorithm.

    Citations:
        ```
        @misc{mcinnes2020umapuniformmanifoldapproximation,
            title={UMAP: Uniform Manifold Approximation and Projection for Dimension Reduction},
            author={Leland McInnes and John Healy and James Melville},
            year={2020},
            eprint={1802.03426},
            archivePrefix={arXiv},
            primaryClass={stat.ML},
            url={https://arxiv.org/abs/1802.03426},
        }
        ```
    """

    n_components: Optional[RuntimeParameter[int]] = Field(
        default=2,
        description=(
            "The dimension of the space to embed into. This defaults to 2 to "
            "provide easy visualization, but can reasonably be set to any "
            "integer value in the range 2 to 100."
        ),
    )
    metric: Optional[RuntimeParameter[str]] = Field(
        default="euclidean",
        description=(
            "The metric to use to compute distances in high dimensional space. "
            "Visit UMAP's documentation for more information."
        ),
    )
    n_jobs: Optional[RuntimeParameter[int]] = Field(
        default=8, description="The number of parallel jobs to run."
    )
    random_state: Optional[RuntimeParameter[int]] = Field(
        default=None, description="The random state to use for the UMAP algorithm."
    )

    _umap: Optional["_UMAP"] = PrivateAttr(None)

    def load(self) -> None:
        super().load()
        if importlib.util.find_spec("umap") is None:
            raise ImportError(
                "`umap` package is not installed. Please install it using `pip install umap-learn`."
            )
        from umap import UMAP as _UMAP

        self._umap = _UMAP(
            n_components=self.n_components,
            metric=self.metric,
            n_jobs=self.n_jobs,
            random_state=self.random_state,
        )

    def unload(self) -> None:
        self._umap = None

    @property
    def inputs(self) -> List[str]:
        return ["embedding"]

    @property
    def outputs(self) -> List[str]:
        return ["projection"]

    def _save_model(self, model: Any) -> None:
        import joblib

        def save_model(path):
            with open(str(path / "UMAP.joblib"), "wb") as f:
                joblib.dump(model, f)

        self.save_artifact(
            name="UMAP_model",
            write_function=lambda path: save_model(path),
            metadata={
                "n_components": self.n_components,
                "metric": self.metric,
            },
        )

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        # Shape of the embeddings is (n_samples, n_features)
        embeddings = np.array([input["embedding"] for input in inputs])

        self._logger.info("🏋️‍♀️ Start UMAP training...")
        mapper = self._umap.fit(embeddings)
        # Shape of the projection will be (n_samples, n_components)
        for input, projection in zip(inputs, mapper.embedding_):
            input["projection"] = projection

        self._save_model(mapper)
        yield inputs

CombineOutputs

Bases: Step

Combine the outputs of several upstream steps.

CombineOutputs is a Step that takes the outputs of several upstream steps and combines them to generate a new dictionary with all keys/columns of the upstream steps' outputs.

Input columns
  • dynamic (based on the upstream Steps): All the columns of the upstream steps outputs.
Output columns
  • dynamic (based on the upstream Steps): All the columns of the upstream steps outputs.
Categories
  • columns

Examples:

Combine dictionaries of a dataset:

```python
from distilabel.steps import CombineOutputs

combine_outputs = CombineOutputs()
combine_outputs.load()

result = next(
    combine_outputs.process(
        [{"a": 1, "b": 2}, {"a": 3, "b": 4}],
        [{"c": 5, "d": 6}, {"c": 7, "d": 8}],
    )
)
# [
#   {"a": 1, "b": 2, "c": 5, "d": 6},
#   {"a": 3, "b": 4, "c": 7, "d": 8},
# ]
```

Combine upstream steps outputs in a pipeline:

```python
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineOutputs

with Pipeline() as pipeline:
    step_1 = ...
    step_2 = ...
    step_3 = ...
    combine = CombineOutputs()

    [step_1, step_2, step_3] >> combine
```
Source code in src/distilabel/steps/columns/combine.py
class CombineOutputs(Step):
    """Combine the outputs of several upstream steps.

    `CombineOutputs` is a `Step` that takes the outputs of several upstream steps and combines
    them to generate a new dictionary with all keys/columns of the upstream steps outputs.

    Input columns:
        - dynamic (based on the upstream `Step`s): All the columns of the upstream steps outputs.

    Output columns:
        - dynamic (based on the upstream `Step`s): All the columns of the upstream steps outputs.

    Categories:
        - columns

    Examples:

        Combine dictionaries of a dataset:

        ```python
        from distilabel.steps import CombineOutputs

        combine_outputs = CombineOutputs()
        combine_outputs.load()

        result = next(
            combine_outputs.process(
                [{"a": 1, "b": 2}, {"a": 3, "b": 4}],
                [{"c": 5, "d": 6}, {"c": 7, "d": 8}],
            )
        )
        # [
        #   {"a": 1, "b": 2, "c": 5, "d": 6},
        #   {"a": 3, "b": 4, "c": 7, "d": 8},
        # ]
        ```

        Combine upstream steps outputs in a pipeline:

        ```python
        from distilabel.pipeline import Pipeline
        from distilabel.steps import CombineOutputs

        with Pipeline() as pipeline:
            step_1 = ...
            step_2 = ...
            step_3 = ...
            combine = CombineOutputs()

            [step_1, step_2, step_3] >> combine
        ```
    """

    def process(self, *inputs: StepInput) -> "StepOutput":
        combined_outputs = []
        for output_dicts in zip(*inputs):
            combined_dict = {}
            for output_dict in output_dicts:
                combined_dict.update(
                    {
                        k: v
                        for k, v in output_dict.items()
                        if k != DISTILABEL_METADATA_KEY
                    }
                )

            if any(
                DISTILABEL_METADATA_KEY in output_dict for output_dict in output_dicts
            ):
                combined_dict[DISTILABEL_METADATA_KEY] = merge_distilabel_metadata(
                    *output_dicts
                )
            combined_outputs.append(combined_dict)

        yield combined_outputs

DeitaFiltering

Bases: GlobalStep

Filter dataset rows using DEITA filtering strategy.

Filter the dataset based on the DEITA score and the cosine distance between the embeddings. It's an implementation of the filtering step from the paper 'What Makes Good Data for Alignment? A Comprehensive Study of Automatic Data Selection in Instruction Tuning'.

Attributes:
  • data_budget (RuntimeParameter[int]): The desired size of the dataset after filtering.
  • diversity_threshold (RuntimeParameter[float]): If a row has a cosine distance with respect to its nearest neighbor greater than this value, it will be included in the filtered dataset. Defaults to 0.9.
  • normalize_embeddings (RuntimeParameter[bool]): Whether to normalize the embeddings before computing the cosine distance. Defaults to True.
  • distance_metric (RuntimeParameter[Literal["cosine", "manhattan"]]): The distance metric to use, either "cosine" or "manhattan". Defaults to "cosine".

Runtime parameters
  • data_budget: The desired size of the dataset after filtering.
  • diversity_threshold: If a row has a cosine distance with respect to its nearest neighbor greater than this value, it will be included in the filtered dataset.
Input columns
  • evol_instruction_score (float): The score of the instruction generated by ComplexityScorer step.
  • evol_response_score (float): The score of the response generated by QualityScorer step.
  • embedding (List[float]): The embedding generated for the conversation of the instruction-response pair using GenerateEmbeddings step.
Output columns
  • deita_score (float): The DEITA score for the instruction-response pair.
  • deita_score_computed_with (List[str]): The scores used to compute the DEITA score.
  • nearest_neighbor_distance (float): The cosine distance between the embeddings of the instruction-response pair.
Categories
  • filtering
References
  • What Makes Good Data for Alignment? A Comprehensive Study of Automatic Data Selection in Instruction Tuning: https://arxiv.org/abs/2312.15685

Examples:

Filter the dataset based on the DEITA score and the cosine distance between the embeddings:

```python
from distilabel.steps import DeitaFiltering

deita_filtering = DeitaFiltering(data_budget=1)

deita_filtering.load()

result = next(
    deita_filtering.process(
        [
            {
                "evol_instruction_score": 0.5,
                "evol_response_score": 0.5,
                "embedding": [-8.12729941, -5.24642847, -6.34003029],
            },
            {
                "evol_instruction_score": 0.6,
                "evol_response_score": 0.6,
                "embedding": [2.99329242, 0.7800932, 0.7799726],
            },
            {
                "evol_instruction_score": 0.7,
                "evol_response_score": 0.7,
                "embedding": [10.29041806, 14.33088073, 13.00557506],
            },
        ],
    )
)
# >>> result
# [{'evol_instruction_score': 0.5, 'evol_response_score': 0.5, 'embedding': [-8.12729941, -5.24642847, -6.34003029], 'deita_score': 0.25, 'deita_score_computed_with': ['evol_instruction_score', 'evol_response_score'], 'nearest_neighbor_distance': 1.9042812683723933}]
```
Citations
```
@misc{liu2024makesgooddataalignment,
    title={What Makes Good Data for Alignment? A Comprehensive Study of Automatic Data Selection in Instruction Tuning},
    author={Wei Liu and Weihao Zeng and Keqing He and Yong Jiang and Junxian He},
    year={2024},
    eprint={2312.15685},
    archivePrefix={arXiv},
    primaryClass={cs.CL},
    url={https://arxiv.org/abs/2312.15685},
}
```
Source code in src/distilabel/steps/deita.py
class DeitaFiltering(GlobalStep):
    """Filter dataset rows using DEITA filtering strategy.

    Filter the dataset based on the DEITA score and the cosine distance between the embeddings.
    It's an implementation of the filtering step from the paper 'What Makes Good Data
    for Alignment? A Comprehensive Study of Automatic Data Selection in Instruction Tuning'.

    Attributes:
        data_budget: The desired size of the dataset after filtering.
        diversity_threshold: If a row has a cosine distance with respect to its nearest
            neighbor greater than this value, it will be included in the filtered dataset.
            Defaults to `0.9`.
        normalize_embeddings: Whether to normalize the embeddings before computing the cosine
            distance. Defaults to `True`.

    Runtime parameters:
        - `data_budget`: The desired size of the dataset after filtering.
        - `diversity_threshold`: If a row has a cosine distance with respect to its nearest
            neighbor greater than this value, it will be included in the filtered dataset.

    Input columns:
        - evol_instruction_score (`float`): The score of the instruction generated by
            `ComplexityScorer` step.
        - evol_response_score (`float`): The score of the response generated by
            `QualityScorer` step.
        - embedding (`List[float]`): The embedding generated for the conversation of the
            instruction-response pair using `GenerateEmbeddings` step.

    Output columns:
        - deita_score (`float`): The DEITA score for the instruction-response pair.
        - deita_score_computed_with (`List[str]`): The scores used to compute the DEITA
            score.
        - nearest_neighbor_distance (`float`): The cosine distance between the embeddings
            of the instruction-response pair.

    Categories:
        - filtering

    References:
        - [`What Makes Good Data for Alignment? A Comprehensive Study of Automatic Data Selection in Instruction Tuning`](https://arxiv.org/abs/2312.15685)

    Examples:
        Filter the dataset based on the DEITA score and the cosine distance between the embeddings:

        ```python
        from distilabel.steps import DeitaFiltering

        deita_filtering = DeitaFiltering(data_budget=1)

        deita_filtering.load()

        result = next(
            deita_filtering.process(
                [
                    {
                        "evol_instruction_score": 0.5,
                        "evol_response_score": 0.5,
                        "embedding": [-8.12729941, -5.24642847, -6.34003029],
                    },
                    {
                        "evol_instruction_score": 0.6,
                        "evol_response_score": 0.6,
                        "embedding": [2.99329242, 0.7800932, 0.7799726],
                    },
                    {
                        "evol_instruction_score": 0.7,
                        "evol_response_score": 0.7,
                        "embedding": [10.29041806, 14.33088073, 13.00557506],
                    },
                ],
            )
        )
        # >>> result
        # [{'evol_instruction_score': 0.5, 'evol_response_score': 0.5, 'embedding': [-8.12729941, -5.24642847, -6.34003029], 'deita_score': 0.25, 'deita_score_computed_with': ['evol_instruction_score', 'evol_response_score'], 'nearest_neighbor_distance': 1.9042812683723933}]
        ```

    Citations:
        ```
        @misc{liu2024makesgooddataalignment,
            title={What Makes Good Data for Alignment? A Comprehensive Study of Automatic Data Selection in Instruction Tuning},
            author={Wei Liu and Weihao Zeng and Keqing He and Yong Jiang and Junxian He},
            year={2024},
            eprint={2312.15685},
            archivePrefix={arXiv},
            primaryClass={cs.CL},
            url={https://arxiv.org/abs/2312.15685},
        }
        ```
    """

    data_budget: RuntimeParameter[int] = Field(
        default=None, description="The desired size of the dataset after filtering."
    )
    diversity_threshold: RuntimeParameter[float] = Field(
        default=0.9,
        description="If a row has a cosine distance with respect to it's nearest neighbor"
        " greater than this value, it will be included in the filtered dataset.",
    )
    normalize_embeddings: RuntimeParameter[bool] = Field(
        default=True,
        description="Whether to normalize the embeddings before computing the cosine distance.",
    )
    distance_metric: RuntimeParameter[Literal["cosine", "manhattan"]] = Field(
        default="cosine",
        description="The distance metric to use. Currently only 'cosine' is supported.",
    )

    @property
    def inputs(self) -> List[str]:
        return ["evol_instruction_score", "evol_response_score", "embedding"]

    @property
    def outputs(self) -> List[str]:
        return ["deita_score", "nearest_neighbor_distance", "deita_score_computed_with"]

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        """Filter the dataset based on the DEITA score and the cosine distance between the
        embeddings.

        Args:
            inputs: The input data.

        Returns:
            The filtered dataset.
        """
        inputs = self._compute_deita_score(inputs)
        inputs = self._compute_nearest_neighbor(inputs)
        inputs.sort(key=lambda x: x["deita_score"], reverse=True)

        selected_rows = []
        for input in inputs:
            if len(selected_rows) >= self.data_budget:  # type: ignore
                break
            if input["nearest_neighbor_distance"] >= self.diversity_threshold:
                selected_rows.append(input)
        yield selected_rows

    def _compute_deita_score(self, inputs: StepInput) -> StepInput:
        """Computes the DEITA score for each instruction-response pair. The DEITA score is
        the product of the instruction score and the response score.

        Args:
            inputs: The input data.

        Returns:
            The input data with the DEITA score computed.
        """
        for input_ in inputs:
            evol_instruction_score = input_.get("evol_instruction_score")
            evol_response_score = input_.get("evol_response_score")

            if evol_instruction_score and evol_response_score:
                deita_score = evol_instruction_score * evol_response_score
                score_computed_with = ["evol_instruction_score", "evol_response_score"]
            elif evol_instruction_score:
                self._logger.warning(
                    "Response score is missing for the instruction-response pair. Using"
                    " instruction score as DEITA score."
                )
                deita_score = evol_instruction_score
                score_computed_with = ["evol_instruction_score"]
            elif evol_response_score:
                self._logger.warning(
                    "Instruction score is missing for the instruction-response pair. Using"
                    " response score as DEITA score."
                )
                deita_score = evol_response_score
                score_computed_with = ["evol_response_score"]
            else:
                self._logger.warning(
                    "Instruction and response scores are missing for the instruction-response"
                    " pair. Setting DEITA score to 0."
                )
                deita_score = 0
                score_computed_with = []

            input_.update(
                {
                    "deita_score": deita_score,
                    "deita_score_computed_with": score_computed_with,
                }
            )
        return inputs

    def _compute_nearest_neighbor(self, inputs: StepInput) -> StepInput:
        """Computes the cosine distance between the embeddings of the instruction-response
        pairs and the nearest neighbor.

        Args:
            inputs: The input data.

        Returns:
            The input data with the cosine distance computed.
        """
        embeddings = np.array([input["embedding"] for input in inputs])
        if self.normalize_embeddings:
            embeddings = self._normalize_embeddings(embeddings)
        self._logger.info("📏 Computing nearest neighbor distance...")

        if self.distance_metric == "cosine":
            self._logger.info("📏 Using cosine distance.")
            distances = self._cosine_distance(embeddings)
        else:
            self._logger.info("📏 Using manhattan distance.")
            distances = self._manhattan_distance(embeddings)

        for distance, input in zip(distances, inputs):
            input["nearest_neighbor_distance"] = distance
        return inputs

    def _normalize_embeddings(self, embeddings: np.ndarray) -> np.ndarray:
        """Normalize the embeddings.

        Args:
            embeddings: The embeddings to normalize.

        Returns:
            The normalized embeddings.
        """
        self._logger.info("⚖️ Normalizing embeddings...")
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        return embeddings / norms

    def _cosine_distance(self, embeddings: np.array) -> np.array:  # type: ignore
        """Computes the cosine distance between the embeddings.

        Args:
            embeddings: The embeddings.

        Returns:
            The cosine distance between the embeddings.
        """
        cosine_similarity = np.dot(embeddings, embeddings.T)
        cosine_distance = 1 - cosine_similarity
        # Ignore self-distance
        np.fill_diagonal(cosine_distance, np.inf)
        return np.min(cosine_distance, axis=1)

    def _manhattan_distance(self, embeddings: np.array) -> np.array:  # type: ignore
        """Computes the manhattan distance between the embeddings.

        Args:
            embeddings: The embeddings.

        Returns:
            The manhattan distance between the embeddings.
        """
        manhattan_distance = np.abs(embeddings[:, None] - embeddings).sum(-1)
        # Ignore self-distance
        np.fill_diagonal(manhattan_distance, np.inf)
        return np.min(manhattan_distance, axis=1)
process(inputs)

Filter the dataset based on the DEITA score and the cosine distance between the embeddings.

Parameters:
  • inputs (StepInput), required: The input data.

Returns:

StepOutput: The filtered dataset.

Source code in src/distilabel/steps/deita.py
def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
    """Filter the dataset based on the DEITA score and the cosine distance between the
    embeddings.

    Args:
        inputs: The input data.

    Returns:
        The filtered dataset.
    """
    inputs = self._compute_deita_score(inputs)
    inputs = self._compute_nearest_neighbor(inputs)
    inputs.sort(key=lambda x: x["deita_score"], reverse=True)

    selected_rows = []
    for input in inputs:
        if len(selected_rows) >= self.data_budget:  # type: ignore
            break
        if input["nearest_neighbor_distance"] >= self.diversity_threshold:
            selected_rows.append(input)
    yield selected_rows
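
For a quick end-to-end picture, here is a minimal usage sketch of the step this method belongs to (assuming it is the DeitaFiltering step exported from distilabel.steps, with the data_budget and diversity_threshold parameters used above, and input rows that already carry the evol scores and an embedding column):

```python
from distilabel.steps import DeitaFiltering

deita_filtering = DeitaFiltering(data_budget=1, diversity_threshold=0.9)
deita_filtering.load()

result = next(
    deita_filtering.process(
        [
            {
                "evol_instruction_score": 0.7,
                "evol_response_score": 0.8,
                "embedding": [-8.1, -5.2, -6.3],
            },
            {
                "evol_instruction_score": 0.6,
                "evol_response_score": 0.6,
                "embedding": [2.8, -0.6, 1.2],
            },
        ]
    )
)
# Rows are sorted by `deita_score` (0.7 * 0.8 = 0.56 ranks first) and kept
# while the `data_budget` allows and the nearest neighbour distance stays
# above `diversity_threshold`.
```
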
_compute_deita_score(inputs)

Computes the DEITA score for each instruction-response pair. The DEITA score is the product of the instruction score and the response score.

Parameters:

Name Type Description Default
inputs StepInput

The input data.

required

Returns:

Type Description
StepInput

The input data with the DEITA score computed.

Source code in src/distilabel/steps/deita.py
def _compute_deita_score(self, inputs: StepInput) -> StepInput:
    """Computes the DEITA score for each instruction-response pair. The DEITA score is
    the product of the instruction score and the response score.

    Args:
        inputs: The input data.

    Returns:
        The input data with the DEITA score computed.
    """
    for input_ in inputs:
        evol_instruction_score = input_.get("evol_instruction_score")
        evol_response_score = input_.get("evol_response_score")

        # NOTE: a score of `0` or `None` is treated as missing
        if evol_instruction_score and evol_response_score:
            deita_score = evol_instruction_score * evol_response_score
            score_computed_with = ["evol_instruction_score", "evol_response_score"]
        elif evol_instruction_score:
            self._logger.warning(
                "Response score is missing for the instruction-response pair. Using"
                " instruction score as DEITA score."
            )
            deita_score = evol_instruction_score
            score_computed_with = ["evol_instruction_score"]
        elif evol_response_score:
            self._logger.warning(
                "Instruction score is missing for the instruction-response pair. Using"
                " response score as DEITA score."
            )
            deita_score = evol_response_score
            score_computed_with = ["evol_response_score"]
        else:
            self._logger.warning(
                "Instruction and response scores are missing for the instruction-response"
                " pair. Setting DEITA score to 0."
            )
            deita_score = 0
            score_computed_with = []

        input_.update(
            {
                "deita_score": deita_score,
                "deita_score_computed_with": score_computed_with,
            }
        )
    return inputs
_compute_nearest_neighbor(inputs)

Computes the cosine distance between the embeddings of the instruction-response pairs and the nearest neighbor.

Parameters:

Name Type Description Default
inputs StepInput

The input data.

required

Returns:

Type Description
StepInput

The input data with the cosine distance computed.

Source code in src/distilabel/steps/deita.py
def _compute_nearest_neighbor(self, inputs: StepInput) -> StepInput:
    """Computes the cosine distance between the embeddings of the instruction-response
    pairs and the nearest neighbor.

    Args:
        inputs: The input data.

    Returns:
        The input data with the cosine distance computed.
    """
    embeddings = np.array([input["embedding"] for input in inputs])
    if self.normalize_embeddings:
        embeddings = self._normalize_embeddings(embeddings)
    self._logger.info("📏 Computing nearest neighbor distance...")

    if self.distance_metric == "cosine":
        self._logger.info("📏 Using cosine distance.")
        distances = self._cosine_distance(embeddings)
    else:
        self._logger.info("📏 Using manhattan distance.")
        distances = self._manhattan_distance(embeddings)

    for distance, input in zip(distances, inputs):
        input["nearest_neighbor_distance"] = distance
    return inputs
_normalize_embeddings(embeddings)

Normalize the embeddings.

Parameters:

Name Type Description Default
embeddings ndarray

The embeddings to normalize.

required

Returns:

Type Description
ndarray

The normalized embeddings.

Source code in src/distilabel/steps/deita.py
def _normalize_embeddings(self, embeddings: np.ndarray) -> np.ndarray:
    """Normalize the embeddings.

    Args:
        embeddings: The embeddings to normalize.

    Returns:
        The normalized embeddings.
    """
    self._logger.info("⚖️ Normalizing embeddings...")
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    return embeddings / norms
_cosine_distance(embeddings)

Computes the cosine distance between the embeddings.

Parameters:

Name Type Description Default
embeddings array

The embeddings.

required

Returns:

Type Description
array

The cosine distance between the embeddings.

Source code in src/distilabel/steps/deita.py
def _cosine_distance(self, embeddings: np.array) -> np.array:  # type: ignore
    """Computes the cosine distance between the embeddings.

    Args:
        embeddings: The embeddings.

    Returns:
        The cosine distance between the embeddings.
    """
    # Assumes the embeddings are L2-normalized, so the dot product equals
    # the cosine similarity.
    cosine_similarity = np.dot(embeddings, embeddings.T)
    cosine_distance = 1 - cosine_similarity
    # Ignore self-distance
    np.fill_diagonal(cosine_distance, np.inf)
    return np.min(cosine_distance, axis=1)
_manhattan_distance(embeddings)

Computes the manhattan distance between the embeddings.

Parameters:

Name Type Description Default
embeddings array

The embeddings.

required

Returns:

Type Description
array

The manhattan distance between the embeddings.

Source code in src/distilabel/steps/deita.py
def _manhattan_distance(self, embeddings: np.array) -> np.array:  # type: ignore
    """Computes the manhattan distance between the embeddings.

    Args:
        embeddings: The embeddings.

    Returns:
        The manhattan distance between the embeddings.
    """
    manhattan_distance = np.abs(embeddings[:, None] - embeddings).sum(-1)
    # Ignore self-distance
    np.fill_diagonal(manhattan_distance, np.inf)
    return np.min(manhattan_distance, axis=1)
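
To make the distance computations above concrete, here is a self-contained numpy sketch (toy embeddings invented for illustration) that reproduces the nearest-neighbour cosine distance:

```python
import numpy as np

embeddings = np.array([[1.0, 0.0], [0.0, 1.0], [0.8, 0.6]])
# L2-normalize so that the dot product equals the cosine similarity
embeddings /= np.linalg.norm(embeddings, axis=1, keepdims=True)

cosine_distance = 1 - embeddings @ embeddings.T
np.fill_diagonal(cosine_distance, np.inf)  # ignore self-distance
print(np.min(cosine_distance, axis=1))
# [0.2 0.4 0.2] -> rows 0 and 2 are each other's nearest neighbours
```
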

EmbeddingGeneration

Bases: Step

Generate embeddings using an Embeddings model.

EmbeddingGeneration is a Step that uses an Embeddings model to generate sentence embeddings for the provided input texts.

Attributes:

Name Type Description
embeddings Embeddings

the Embeddings model used to generate the sentence embeddings.

Input columns
  • text (str): The text for which the sentence embedding has to be generated.
Output columns
  • embedding (List[Union[float, int]]): the generated sentence embedding.
  • model_name (str): the name of the model used to generate the embedding.
Categories
  • embedding

Examples:

Generate sentence embeddings with Sentence Transformers:

```python
from distilabel.embeddings import SentenceTransformerEmbeddings
from distilabel.steps import EmbeddingGeneration

embedding_generation = EmbeddingGeneration(
    embeddings=SentenceTransformerEmbeddings(
        model="mixedbread-ai/mxbai-embed-large-v1",
    )
)

embedding_generation.load()

result = next(embedding_generation.process([{"text": "Hello, how are you?"}]))
# [{'text': 'Hello, how are you?', 'embedding': [0.06209656596183777, -0.015797119587659836, ...]}]
```
Source code in src/distilabel/steps/embeddings/embedding_generation.py
class EmbeddingGeneration(Step):
    """Generate embeddings using an `Embeddings` model.

    `EmbeddingGeneration` is a `Step` that uses an `Embeddings` model to generate sentence
    embeddings for the provided input texts.

    Attributes:
        embeddings: the `Embeddings` model used to generate the sentence embeddings.

    Input columns:
        - text (`str`): The text for which the sentence embedding has to be generated.

    Output columns:
        - embedding (`List[Union[float, int]]`): the generated sentence embedding.
        - model_name (`str`): the name of the model used to generate the embedding.

    Categories:
        - embedding

    Examples:
        Generate sentence embeddings with Sentence Transformers:

        ```python
        from distilabel.embeddings import SentenceTransformerEmbeddings
        from distilabel.steps import EmbeddingGeneration

        embedding_generation = EmbeddingGeneration(
            embeddings=SentenceTransformerEmbeddings(
                model="mixedbread-ai/mxbai-embed-large-v1",
            )
        )

        embedding_generation.load()

        result = next(embedding_generation.process([{"text": "Hello, how are you?"}]))
        # [{'text': 'Hello, how are you?', 'embedding': [0.06209656596183777, -0.015797119587659836, ...]}]
        ```

    """

    embeddings: Embeddings

    @property
    def inputs(self) -> "StepColumns":
        return ["text"]

    @property
    def outputs(self) -> "StepColumns":
        return ["embedding", "model_name"]

    def load(self) -> None:
        """Loads the `Embeddings` model."""
        super().load()

        self.embeddings.load()

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        embeddings = self.embeddings.encode(inputs=[input["text"] for input in inputs])
        for input, embedding in zip(inputs, embeddings):
            input["embedding"] = embedding
            input["model_name"] = self.embeddings.model_name
        yield inputs

    def unload(self) -> None:
        super().unload()
        self.embeddings.unload()
load()

Loads the Embeddings model.

Source code in src/distilabel/steps/embeddings/embedding_generation.py
def load(self) -> None:
    """Loads the `Embeddings` model."""
    super().load()

    self.embeddings.load()

FaissNearestNeighbour

Bases: GlobalStep

Create a faiss index to get the nearest neighbours.

FaissNearestNeighbour is a GlobalStep that creates a faiss index using the Hugging Face datasets library integration, and then gets the nearest neighbours and the scores or distance of the nearest neighbours for each input row.

Attributes:

Name Type Description
device Optional[RuntimeParameter[Union[int, List[int]]]]

the CUDA device ID or a list of IDs to be used. If a negative integer, it will use all the available GPUs. Defaults to None.

string_factory Optional[RuntimeParameter[str]]

the name of the factory to be used to build the faiss index. Available string factories can be checked here: https://github.com/facebookresearch/faiss/wiki/Faiss-indexes. Defaults to None.

metric_type Optional[RuntimeParameter[int]]

the metric to be used to measure the distance between the points. It's an integer, and the recommended way to pass it is importing faiss and then passing one of faiss.METRIC_x variables. Defaults to None.

k Optional[RuntimeParameter[int]]

the number of nearest neighbours to search for each input row. Defaults to 1.

search_batch_size Optional[RuntimeParameter[int]]

the number of rows to include in a search batch. The value can be adjusted to maximize the resource usage or to avoid OOM issues. Defaults to 50.

train_size Optional[RuntimeParameter[int]]

If the index needs a training step, specifies how many vectors will be used to train the index.

Runtime parameters
  • device: the CUDA device ID or a list of IDs to be used. If a negative integer, it will use all the available GPUs. Defaults to None.
  • string_factory: the name of the factory to be used to build the faiss index. Available string factories can be checked here: https://github.com/facebookresearch/faiss/wiki/Faiss-indexes. Defaults to None.
  • metric_type: the metric to be used to measure the distance between the points. It's an integer, and the recommended way to pass it is importing faiss and then passing one of faiss.METRIC_x variables. Defaults to None.
  • k: the number of nearest neighbours to search for each input row. Defaults to 1.
  • search_batch_size: the number of rows to include in a search batch. The value can be adjusted to maximize the resource usage or to avoid OOM issues. Defaults to 50.
  • train_size: If the index needs a training step, specifies how many vectors will be used to train the index.
Input columns
  • embedding (List[Union[float, int]]): a sentence embedding.
Output columns
  • nn_indices (List[int]): a list containing the indices of the k nearest neighbours in the inputs for the row.
  • nn_scores (List[float]): a list containing the score or distance to each k nearest neighbour in the inputs.
Categories
  • embedding
References
  • The Faiss library: https://arxiv.org/abs/2401.08281

Examples:

Generating embeddings and getting the nearest neighbours:

```python
from distilabel.embeddings.sentence_transformers import SentenceTransformerEmbeddings
from distilabel.pipeline import Pipeline
from distilabel.steps import EmbeddingGeneration, FaissNearestNeighbour, LoadDataFromHub

with Pipeline(name="hello") as pipeline:
    load_data = LoadDataFromHub(output_mappings={"prompt": "text"})

    embeddings = EmbeddingGeneration(
        embeddings=SentenceTransformerEmbeddings(
            model="mixedbread-ai/mxbai-embed-large-v1"
        )
    )

    nearest_neighbours = FaissNearestNeighbour()

    load_data >> embeddings >> nearest_neighbours

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            load_data.name: {
                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                "split": "test",
            },
        },
        use_cache=False,
    )
```
Citations
```
@misc{douze2024faisslibrary,
    title={The Faiss library},
    author={Matthijs Douze and Alexandr Guzhva and Chengqi Deng and Jeff Johnson and Gergely Szilvasy and Pierre-Emmanuel Mazaré and Maria Lomeli and Lucas Hosseini and Hervé Jégou},
    year={2024},
    eprint={2401.08281},
    archivePrefix={arXiv},
    primaryClass={cs.LG},
    url={https://arxiv.org/abs/2401.08281},
}
```
Source code in src/distilabel/steps/embeddings/nearest_neighbour.py
class FaissNearestNeighbour(GlobalStep):
    """Create a `faiss` index to get the nearest neighbours.

    `FaissNearestNeighbour` is a `GlobalStep` that creates a `faiss` index using the Hugging
    Face `datasets` library integration, and then gets the nearest neighbours and the scores
    or distance of the nearest neighbours for each input row.

    Attributes:
        device: the CUDA device ID or a list of IDs to be used. If a negative integer, it
            will use all the available GPUs. Defaults to `None`.
        string_factory: the name of the factory to be used to build the `faiss` index.
            Available string factories can be checked here: https://github.com/facebookresearch/faiss/wiki/Faiss-indexes.
            Defaults to `None`.
        metric_type: the metric to be used to measure the distance between the points. It's
            an integer, and the recommended way to pass it is importing `faiss` and then passing
            one of `faiss.METRIC_x` variables. Defaults to `None`.
        k: the number of nearest neighbours to search for each input row. Defaults to `1`.
        search_batch_size: the number of rows to include in a search batch. The value can
            be adjusted to maximize the resource usage or to avoid OOM issues. Defaults
            to `50`.
        train_size: If the index needs a training step, specifies how many vectors will be
            used to train the index.

    Runtime parameters:
        - `device`: the CUDA device ID or a list of IDs to be used. If a negative integer,
            it will use all the available GPUs. Defaults to `None`.
        - `string_factory`: the name of the factory to be used to build the `faiss` index.
            Available string factories can be checked here: https://github.com/facebookresearch/faiss/wiki/Faiss-indexes.
            Defaults to `None`.
        - `metric_type`: the metric to be used to measure the distance between the points.
            It's an integer, and the recommended way to pass it is importing `faiss` and then
            passing one of `faiss.METRIC_x` variables. Defaults to `None`.
        - `k`: the number of nearest neighbours to search for each input row. Defaults to `1`.
        - `search_batch_size`: the number of rows to include in a search batch. The value
            can be adjusted to maximize the resource usage or to avoid OOM issues. Defaults
            to `50`.
        - `train_size`: If the index needs a training step, specifies how many vectors will
            be used to train the index.

    Input columns:
        - embedding (`List[Union[float, int]]`): a sentence embedding.

    Output columns:
        - nn_indices (`List[int]`): a list containing the indices of the `k` nearest neighbours
            in the inputs for the row.
        - nn_scores (`List[float]`): a list containing the score or distance to each `k`
            nearest neighbour in the inputs.

    Categories:
        - embedding

    References:
        - [`The Faiss library`](https://arxiv.org/abs/2401.08281)

    Examples:
        Generating embeddings and getting the nearest neighbours:

        ```python
        from distilabel.embeddings.sentence_transformers import SentenceTransformerEmbeddings
        from distilabel.pipeline import Pipeline
        from distilabel.steps import EmbeddingGeneration, FaissNearestNeighbour, LoadDataFromHub

        with Pipeline(name="hello") as pipeline:
            load_data = LoadDataFromHub(output_mappings={"prompt": "text"})

            embeddings = EmbeddingGeneration(
                embeddings=SentenceTransformerEmbeddings(
                    model="mixedbread-ai/mxbai-embed-large-v1"
                )
            )

            nearest_neighbours = FaissNearestNeighbour()

            load_data >> embeddings >> nearest_neighbours

        if __name__ == "__main__":
            distiset = pipeline.run(
                parameters={
                    load_data.name: {
                        "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                        "split": "test",
                    },
                },
                use_cache=False,
            )
        ```

    Citations:
        ```
        @misc{douze2024faisslibrary,
            title={The Faiss library},
            author={Matthijs Douze and Alexandr Guzhva and Chengqi Deng and Jeff Johnson and Gergely Szilvasy and Pierre-Emmanuel Mazaré and Maria Lomeli and Lucas Hosseini and Hervé Jégou},
            year={2024},
            eprint={2401.08281},
            archivePrefix={arXiv},
            primaryClass={cs.LG},
            url={https://arxiv.org/abs/2401.08281},
        }
        ```
    """

    device: Optional[RuntimeParameter[Union[int, List[int]]]] = Field(
        default=None,
        description="The CUDA device ID or a list of IDs to be used. If negative integer,"
        " it will use all the available GPUs.",
    )
    string_factory: Optional[RuntimeParameter[str]] = Field(
        default=None,
        description="The name of the factory to be used to build the `faiss` index."
        "Available string factories can be checked here: https://github.com/facebookresearch/faiss/wiki/Faiss-indexes.",
    )
    metric_type: Optional[RuntimeParameter[int]] = Field(
        default=None,
        description="The metric to be used to measure the distance between the points. It's"
        " an integer and the recommend way to pass it is importing `faiss` and thenpassing"
        " one of `faiss.METRIC_x` variables.",
    )
    k: Optional[RuntimeParameter[int]] = Field(
        default=1,
        description="The number of nearest neighbours to search for each input row.",
    )
    search_batch_size: Optional[RuntimeParameter[int]] = Field(
        default=50,
        description="The number of rows to include in a search batch. The value can be adjusted"
        " to maximize the resources usage or to avoid OOM issues.",
    )
    train_size: Optional[RuntimeParameter[int]] = Field(
        default=None,
        description="If the index needs a training step, specifies how many vectors will be used to train the index.",
    )

    def load(self) -> None:
        super().load()

        if importlib.util.find_spec("faiss") is None:
            raise ImportError(
                "`faiss` package is not installed. Please install it using `pip install"
                " faiss-cpu` or `pip install faiss-gpu`."
            )

    @property
    def inputs(self) -> List[str]:
        return ["embedding"]

    @property
    def outputs(self) -> List[str]:
        return ["nn_indices", "nn_scores"]

    def _build_index(self, inputs: List[Dict[str, Any]]) -> Dataset:
        """Builds a `faiss` index using `datasets` integration.

        Args:
            inputs: a list of dictionaries.

        Returns:
            The built `datasets.Dataset` with its `faiss` index.
        """
        dataset = Dataset.from_list(inputs)
        if self.train_size is not None and self.string_factory:
            self._logger.info("🏋️‍♀️ Starting Faiss index training...")
        dataset.add_faiss_index(
            column="embedding",
            device=self.device,  # type: ignore
            string_factory=self.string_factory,
            metric_type=self.metric_type,
            train_size=self.train_size,
        )
        return dataset

    def _save_index(self, dataset: Dataset) -> None:
        """Save the generated Faiss index as an artifact of the step.

        Args:
            dataset: the dataset with the `faiss` index built.
        """
        self.save_artifact(
            name="faiss_index",
            write_function=lambda path: dataset.save_faiss_index(
                index_name="embedding", file=path / "index.faiss"
            ),
            metadata={
                "num_rows": len(dataset),
                "embedding_dim": len(dataset[0]["embedding"]),
            },
        )

    def _search(self, dataset: Dataset) -> Dataset:
        """Search the top `k` nearest neighbours for each row in the dataset.

        Args:
            dataset: the dataset with the `faiss` index built.

        Returns:
            The updated dataset containing the top `k` nearest neighbours for each row,
            as well as the score or distance.
        """

        def add_search_results(examples: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
            queries = np.array(examples["embedding"])
            results = dataset.search_batch(
                index_name="embedding",
                queries=queries,
                k=self.k + 1,  # type: ignore
            )
            # The first match of each query is the row itself, so it's dropped
            examples["nn_indices"] = [indices[1:] for indices in results.total_indices]
            examples["nn_scores"] = [scores[1:] for scores in results.total_scores]
            return examples

        return dataset.map(
            add_search_results, batched=True, batch_size=self.search_batch_size
        )

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        dataset = self._build_index(inputs)
        dataset_with_search_results = self._search(dataset)
        self._save_index(dataset)
        yield dataset_with_search_results.to_list()
_build_index(inputs)

Builds a faiss index using datasets integration.

Parameters:

Name Type Description Default
inputs List[Dict[str, Any]]

a list of dictionaries.

required

Returns:

Type Description
Dataset

The built datasets.Dataset with its faiss index.

Source code in src/distilabel/steps/embeddings/nearest_neighbour.py
def _build_index(self, inputs: List[Dict[str, Any]]) -> Dataset:
    """Builds a `faiss` index using `datasets` integration.

    Args:
        inputs: a list of dictionaries.

    Returns:
        The built `datasets.Dataset` with its `faiss` index.
    """
    dataset = Dataset.from_list(inputs)
    if self.train_size is not None and self.string_factory:
        self._logger.info("🏋️‍♀️ Starting Faiss index training...")
    dataset.add_faiss_index(
        column="embedding",
        device=self.device,  # type: ignore
        string_factory=self.string_factory,
        metric_type=self.metric_type,
        train_size=self.train_size,
    )
    return dataset
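
For reference, the same datasets integration can be exercised outside the step (a minimal sketch with random toy vectors; with no string_factory the defaults build a flat L2 index, so no training step is needed):

```python
import numpy as np
from datasets import Dataset

dataset = Dataset.from_list(
    [{"embedding": np.random.rand(8).tolist()} for _ in range(16)]
)
dataset.add_faiss_index(column="embedding")  # flat index, no training needed

query = np.random.rand(8).astype(np.float32)
scores, examples = dataset.get_nearest_examples("embedding", query, k=3)
print(scores)  # distances of the 3 nearest rows to the query
```
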
_save_index(dataset)

Save the generated Faiss index as an artifact of the step.

Parameters:

Name Type Description Default
dataset Dataset

the dataset with the faiss index built.

required
Source code in src/distilabel/steps/embeddings/nearest_neighbour.py
def _save_index(self, dataset: Dataset) -> None:
    """Save the generated Faiss index as an artifact of the step.

    Args:
        dataset: the dataset with the `faiss` index built.
    """
    self.save_artifact(
        name="faiss_index",
        write_function=lambda path: dataset.save_faiss_index(
            index_name="embedding", file=path / "index.faiss"
        ),
        metadata={
            "num_rows": len(dataset),
            "embedding_dim": len(dataset[0]["embedding"]),
        },
    )
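
Because the index is written with save_faiss_index, it can later be reloaded onto a dataset that contains the same embedding column (a sketch assuming the saved artifact has been downloaded locally as index.faiss):

```python
from datasets import Dataset

# Hypothetical rows; the real dataset must match the one the index was built on
dataset = Dataset.from_list([{"embedding": [0.1, 0.2, 0.3]} for _ in range(4)])
dataset.load_faiss_index(index_name="embedding", file="index.faiss")
```
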

_search(dataset)

Search the top k nearest neighbours for each row in the dataset.

Parameters:

Name Type Description Default
dataset Dataset

the dataset with the faiss index built.

required

Returns:

Type Description
Dataset

The updated dataset containing the top k nearest neighbours for each row, as well as the score or distance.

Source code in src/distilabel/steps/embeddings/nearest_neighbour.py
def _search(self, dataset: Dataset) -> Dataset:
    """Search the top `k` nearest neighbours for each row in the dataset.

    Args:
        dataset: the dataset with the `faiss` index built.

    Returns:
        The updated dataset containing the top `k` nearest neighbours for each row,
        as well as the score or distance.
    """

    def add_search_results(examples: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
        queries = np.array(examples["embedding"])
        results = dataset.search_batch(
            index_name="embedding",
            queries=queries,
            k=self.k + 1,  # type: ignore
        )
        # The first match of each query is the row itself, so it's dropped
        examples["nn_indices"] = [indices[1:] for indices in results.total_indices]
        examples["nn_scores"] = [scores[1:] for scores in results.total_scores]
        return examples

    return dataset.map(
        add_search_results, batched=True, batch_size=self.search_batch_size
    )

EmbeddingDedup

Bases: GlobalStep

Deduplicates text using embeddings.

EmbeddingDedup is a Step that detects near-duplicates in datasets, using embeddings to compare the similarity between the texts. The typical workflow with this step would include having a dataset with embeddings precomputed, and then, using the nn_indices and nn_scores (possibly obtained from FaissNearestNeighbour), determining which texts are duplicates.

Attributes:

Name Type Description
threshold Optional[RuntimeParameter[float]]

the threshold to consider 2 examples as duplicates. It's dependent on the type of index that was used to generate the embeddings. For example, if the embeddings were generated using cosine similarity, a threshold of 0.9 would make all the texts with a cosine similarity above the value duplicates. Higher values detect fewer duplicates in such an index, but that should be taken into account when building it. Defaults to 0.9.

Runtime Parameters
  • threshold: the threshold to consider 2 examples as duplicates.
Input columns
  • nn_indices (List[int]): a list containing the indices of the k nearest neighbours in the inputs for the row.
  • nn_scores (List[float]): a list containing the score or distance to each k nearest neighbour in the inputs.
Output columns
  • keep_row_after_embedding_filtering (bool): boolean indicating whether the piece of text is not a duplicate, i.e. whether this text should be kept.
Categories
  • filtering

Examples:

Deduplicate a list of texts using embedding information:

```python
from distilabel.pipeline import Pipeline
from distilabel.steps import EmbeddingDedup
from distilabel.steps import LoadDataFromDicts

with Pipeline() as pipeline:
    batch_size = 3  # the example dataset below has 3 rows
    data = LoadDataFromDicts(
        data=[
            {
                "persona": "A chemistry student or academic researcher interested in inorganic or physical chemistry, likely at an advanced undergraduate or graduate level, studying acid-base interactions and chemical bonding.",
                "embedding": [
                    0.018477669046149742,
                    -0.03748236608841726,
                    0.001919870620352492,
                    0.024918478063770535,
                    0.02348063521315178,
                    0.0038251285566308375,
                    -0.01723884983037716,
                    0.02881971942372201,
                ],
                "nn_indices": [0, 1],
                "nn_scores": [
                    0.9164746999740601,
                    0.782106876373291,
                ],
            },
            {
                "persona": "A music teacher or instructor focused on theoretical and practical piano lessons.",
                "embedding": [
                    -0.0023464179614082125,
                    -0.07325472251663565,
                    -0.06058678419516501,
                    -0.02100326928586996,
                    -0.013462744792362657,
                    0.027368447064244242,
                    -0.003916070100455717,
                    0.01243614518480423,
                ],
                "nn_indices": [0, 2],
                "nn_scores": [
                    0.7552462220191956,
                    0.7261884808540344,
                ],
            },
            {
                "persona": "A classical guitar teacher or instructor, likely with experience teaching beginners, who focuses on breaking down complex music notation into understandable steps for their students.",
                "embedding": [
                    -0.01630817942328242,
                    -0.023760151552345232,
                    -0.014249650090627883,
                    -0.005713686451446624,
                    -0.016033059279131567,
                    0.0071440908501058786,
                    -0.05691099643425161,
                    0.01597412704817784,
                ],
                "nn_indices": [1, 2],
                "nn_scores": [
                    0.8107735514640808,
                    0.7172299027442932,
                ],
            },
        ],
        batch_size=batch_size,
    )
    # In general you should do something like this before the deduplication step, to obtain the
    # `nn_indices` and `nn_scores`. In this case the embeddings are already normalized, so there's
    # no need for it.
    # nn = FaissNearestNeighbour(
    #     k=30,
    #     metric_type=faiss.METRIC_INNER_PRODUCT,
    #     search_batch_size=50,
    #     train_size=len(dataset),              # The number of embeddings to use for training
    #     string_factory="IVF300_HNSW32,Flat"   # To use an index (optional, maybe required for big datasets)
    # )
    # Read more about the `string_factory` here:
    # https://github.com/facebookresearch/faiss/wiki/Guidelines-to-choose-an-index

    embedding_dedup = EmbeddingDedup(
        threshold=0.8,
        input_batch_size=batch_size,
    )

    data >> embedding_dedup

if __name__ == "__main__":
    distiset = pipeline.run(use_cache=False)
    ds = distiset["default"]["train"]
    # Filter out the duplicates
    ds_dedup = ds.filter(lambda x: x["keep_row_after_embedding_filtering"])
```
Source code in src/distilabel/steps/filtering/embedding.py
class EmbeddingDedup(GlobalStep):
    """Deduplicates text using embeddings.

    `EmbeddingDedup` is a Step that detects near-duplicates in datasets, using
    embeddings to compare the similarity between the texts. The typical workflow with this step
    would include having a dataset with embeddings precomputed, and then, using the
    `nn_indices` and `nn_scores` (possibly obtained from `FaissNearestNeighbour`), determining
    which texts are duplicates.

    Attributes:
        threshold: the threshold to consider 2 examples as duplicates.
            It's dependent on the type of index that was used to generate the embeddings.
            For example, if the embeddings were generated using cosine similarity, a threshold
            of `0.9` would make all the texts with a cosine similarity above the value
            duplicates. Higher values detect fewer duplicates in such an index, but that should
            be taken into account when building it. Defaults to `0.9`.

    Runtime Parameters:
        - `threshold`: the threshold to consider 2 examples as duplicates.

    Input columns:
        - nn_indices (`List[int]`): a list containing the indices of the `k` nearest neighbours
            in the inputs for the row.
        - nn_scores (`List[float]`): a list containing the score or distance to each `k`
            nearest neighbour in the inputs.

    Output columns:
        - keep_row_after_embedding_filtering (`bool`): boolean indicating whether the piece of
            `text` is not a duplicate, i.e. whether this text should be kept.

    Categories:
        - filtering

    Examples:

        Deduplicate a list of texts using embedding information:

        ```python
        from distilabel.pipeline import Pipeline
        from distilabel.steps import EmbeddingDedup
        from distilabel.steps import LoadDataFromDicts

        with Pipeline() as pipeline:
            batch_size = 3  # the example dataset below has 3 rows
            data = LoadDataFromDicts(
                data=[
                    {
                        "persona": "A chemistry student or academic researcher interested in inorganic or physical chemistry, likely at an advanced undergraduate or graduate level, studying acid-base interactions and chemical bonding.",
                        "embedding": [
                            0.018477669046149742,
                            -0.03748236608841726,
                            0.001919870620352492,
                            0.024918478063770535,
                            0.02348063521315178,
                            0.0038251285566308375,
                            -0.01723884983037716,
                            0.02881971942372201,
                        ],
                        "nn_indices": [0, 1],
                        "nn_scores": [
                            0.9164746999740601,
                            0.782106876373291,
                        ],
                    },
                    {
                        "persona": "A music teacher or instructor focused on theoretical and practical piano lessons.",
                        "embedding": [
                            -0.0023464179614082125,
                            -0.07325472251663565,
                            -0.06058678419516501,
                            -0.02100326928586996,
                            -0.013462744792362657,
                            0.027368447064244242,
                            -0.003916070100455717,
                            0.01243614518480423,
                        ],
                        "nn_indices": [0, 2],
                        "nn_scores": [
                            0.7552462220191956,
                            0.7261884808540344,
                        ],
                    },
                    {
                        "persona": "A classical guitar teacher or instructor, likely with experience teaching beginners, who focuses on breaking down complex music notation into understandable steps for their students.",
                        "embedding": [
                            -0.01630817942328242,
                            -0.023760151552345232,
                            -0.014249650090627883,
                            -0.005713686451446624,
                            -0.016033059279131567,
                            0.0071440908501058786,
                            -0.05691099643425161,
                            0.01597412704817784,
                        ],
                        "nn_indices": [1, 2],
                        "nn_scores": [
                            0.8107735514640808,
                            0.7172299027442932,
                        ],
                    },
                ],
                batch_size=batch_size,
            )
            # In general you should do something like this before the deduplication step, to obtain the
            # `nn_indices` and `nn_scores`. In this case the embeddings are already normalized, so there's
            # no need for it.
            # nn = FaissNearestNeighbour(
            #     k=30,
            #     metric_type=faiss.METRIC_INNER_PRODUCT,
            #     search_batch_size=50,
            #     train_size=len(dataset),              # The number of embeddings to use for training
            #     string_factory="IVF300_HNSW32,Flat"   # To use an index (optional, maybe required for big datasets)
            # )
            # Read more about the `string_factory` here:
            # https://github.com/facebookresearch/faiss/wiki/Guidelines-to-choose-an-index

            embedding_dedup = EmbeddingDedup(
                threshold=0.8,
                input_batch_size=batch_size,
            )

            data >> embedding_dedup

        if __name__ == "__main__":
            distiset = pipeline.run(use_cache=False)
            ds = distiset["default"]["train"]
            # Filter out the duplicates
            ds_dedup = ds.filter(lambda x: x["keep_row_after_embedding_filtering"])
        ```
    """

    threshold: Optional[RuntimeParameter[float]] = Field(
        default=0.9,
        description="The threshold to consider 2 examples as duplicates. It's dependent "
        "on the type of index that was used to generate the embeddings. For example, if "
        "the embeddings were generated using cosine similarity, a threshold of `0.9` "
        "would make all the texts with a cosine similarity above the value duplicates. "
        "Higher values detect less duplicates in such an index, but that should be "
        "taken into account when building it.",
    )

    @property
    def inputs(self) -> List[str]:
        return ["nn_scores", "nn_indices"]

    @property
    def outputs(self) -> List[str]:
        return ["keep_row_after_embedding_filtering"]

    @override
    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        rows_to_remove = set()

        for input in track(inputs, description="Running Embedding deduplication..."):
            input["keep_row_after_embedding_filtering"] = True
            indices_scores = np.array(input["nn_scores"]) > self.threshold
            indices = np.array(input["nn_indices"])[indices_scores]
            if len(indices) > 0:  # If there are any rows found over the threshold
                rows_to_remove.update(list(indices))

        # Mark the rows detected as duplicates so they can be filtered out downstream
        for idx in rows_to_remove:
            inputs[idx]["keep_row_after_embedding_filtering"] = False

        yield inputs
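
The thresholding logic above, shown in isolation (toy values invented for illustration):

```python
import numpy as np

row = {"nn_indices": [0, 2], "nn_scores": [0.95, 0.70]}
threshold = 0.9

over_threshold = np.array(row["nn_scores"]) > threshold
duplicates = np.array(row["nn_indices"])[over_threshold]
print(duplicates)
# [0] -> row 0 would get `keep_row_after_embedding_filtering` set to False
```
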

MinHashDedup

Bases: Step

Deduplicates text using MinHash and MinHashLSH.

MinHashDedup is a Step that detects near-duplicates in datasets. The idea roughly translates to the following steps:

  1. Tokenize the text into words or ngrams.
  2. Create a MinHash for each text.
  3. Store the MinHashes in a MinHashLSH.
  4. Check if the MinHash is already in the LSH; if so, it is a duplicate.

Attributes:

Name Type Description
num_perm int

the number of permutations to use. Defaults to 128.

seed int

the seed to use for the MinHash. Keep in mind that this seed must be the same across any steps whose MinHashes need to be compared. Defaults to 1.

tokenizer Literal['words', 'ngrams']

the tokenizer to use. Available ones are words or ngrams. If words is selected, it tokenizes the text into words using nltk's word tokenizer. ngrams computes the ngrams of size n. Defaults to words.

n Optional[int]

the size of the ngrams to use. Only relevant if tokenizer="ngrams". Defaults to 5.

threshold float

the threshold to consider two MinHashes as duplicates. Values closer to 0 detect more duplicates. Defaults to 0.9.

storage Literal['dict', 'disk']

the storage to use for the LSH. Can be dict to store the index in memory, or disk. Keep in mind, disk is an experimental feature not defined in datasketch that is based on DiskCache's Index class. It should work as a dict backed by disk, but depending on the system it can be slower. Defaults to dict.

Input columns
  • text (str): the texts to be filtered.
Output columns
  • keep_row_after_minhash_filtering (bool): boolean indicating whether the piece of text is not a duplicate, i.e. whether this text should be kept.
Categories
  • filtering
References
  • datasketch documentation: https://ekzhu.github.io/datasketch/lsh.html
  • Identifying and Filtering Near-Duplicate Documents: https://cs.brown.edu/courses/cs253/papers/nearduplicate.pdf
  • Diskcache's Index: https://grantjenks.com/docs/diskcache/api.html#diskcache.Index

Examples:

Deduplicate a list of texts using MinHash and MinHashLSH:

```python
from distilabel.pipeline import Pipeline
from distilabel.steps import MinHashDedup
from distilabel.steps import LoadDataFromDicts

with Pipeline() as pipeline:
    ds_size = 1000
    batch_size = 500  # Bigger batch sizes work better for this step
    data = LoadDataFromDicts(
        data=[
            {"text": "This is a test document."},
            {"text": "This document is a test."},
            {"text": "Test document for duplication."},
            {"text": "Document for duplication test."},
            {"text": "This is another unique document."},
        ]
        * (ds_size // 5),
        batch_size=batch_size,
    )
    minhash_dedup = MinHashDedup(
        tokenizer="words",
        threshold=0.9,      # lower values will increase the number of duplicates
        storage="dict",     # or "disk" for bigger datasets
    )

    data >> minhash_dedup

if __name__ == "__main__":
    distiset = pipeline.run(use_cache=False)
    ds = distiset["default"]["train"]
    # Filter out the duplicates
    ds_dedup = ds.filter(lambda x: x["keep_row_after_minhash_filtering"])
```
Source code in src/distilabel/steps/filtering/minhash.py
class MinHashDedup(Step):
    """Deduplicates text using `MinHash` and `MinHashLSH`.

    `MinHashDedup` is a Step that detects near-duplicates in datasets. The idea roughly translates
    to the following steps:
    1. Tokenize the text into words or ngrams.
    2. Create a `MinHash` for each text.
    3. Store the `MinHashes` in a `MinHashLSH`.
    4. Check if the `MinHash` is already in the `LSH`, if so, it is a duplicate.

    Attributes:
        num_perm: the number of permutations to use. Defaults to `128`.
        seed: the seed to use for the MinHash. Keep in mind that this seed must be the
            same across any steps whose MinHashes need to be compared. Defaults to `1`.
        tokenizer: the tokenizer to use. Available ones are `words` or `ngrams`.
            If `words` is selected, it tokenizes the text into words using nltk's
            word tokenizer. `ngrams` computes the ngrams of size `n`. Defaults to `words`.
        n: the size of the ngrams to use. Only relevant if `tokenizer="ngrams"`. Defaults to `5`.
        threshold: the threshold to consider two MinHashes as duplicates.
            Values closer to 0 detect more duplicates. Defaults to `0.9`.
        storage: the storage to use for the LSH. Can be `dict` to store the index
            in memory, or `disk`. Keep in mind, `disk` is an experimental feature
            not defined in `datasketch` that is based on DiskCache's `Index` class.
            It should work as a `dict` backed by disk, but depending on the system
            it can be slower. Defaults to `dict`.

    Input columns:
        - text (`str`): the texts to be filtered.

    Output columns:
        - keep_row_after_minhash_filtering (`bool`): boolean indicating whether the piece of
            `text` is not a duplicate, i.e. whether this text should be kept.

    Categories:
        - filtering

    References:
        - [`datasketch documentation`](https://ekzhu.github.io/datasketch/lsh.html)
        - [Identifying and Filtering Near-Duplicate Documents](https://cs.brown.edu/courses/cs253/papers/nearduplicate.pdf)
        - [Diskcache's Index](https://grantjenks.com/docs/diskcache/api.html#diskcache.Index)

    Examples:

        Deduplicate a list of texts using MinHash and MinHashLSH:

        ```python
        from distilabel.pipeline import Pipeline
        from distilabel.steps import MinHashDedup
        from distilabel.steps import LoadDataFromDicts

        with Pipeline() as pipeline:
            ds_size = 1000
            batch_size = 500  # Bigger batch sizes work better for this step
            data = LoadDataFromDicts(
                data=[
                    {"text": "This is a test document."},
                    {"text": "This document is a test."},
                    {"text": "Test document for duplication."},
                    {"text": "Document for duplication test."},
                    {"text": "This is another unique document."},
                ]
                * (ds_size // 5),
                batch_size=batch_size,
            )
            minhash_dedup = MinHashDedup(
                tokenizer="words",
                threshold=0.9,      # lower values will increase the number of duplicates
                storage="dict",     # or "disk" for bigger datasets
            )

            data >> minhash_dedup

        if __name__ == "__main__":
            distiset = pipeline.run(use_cache=False)
            ds = distiset["default"]["train"]
            # Filter out the duplicates
            ds_dedup = ds.filter(lambda x: x["keep_row_after_minhash_filtering"])
        ```
    """

    num_perm: int = 128
    seed: int = 1
    tokenizer: Literal["words", "ngrams"] = "words"
    n: Optional[int] = 5
    threshold: float = 0.9
    storage: Literal["dict", "disk"] = "dict"

    _hasher: Union["MinHash", None] = PrivateAttr(None)
    _tokenizer: Union[Callable, None] = PrivateAttr(None)
    _lsh: Union["MinHashLSH", None] = PrivateAttr(None)

    def load(self) -> None:
        super().load()
        if not importlib.import_module("datasketch"):
            raise ImportError(
                "`datasketch` is needed to deduplicate with MinHash, but is not installed. "
                "Please install it using `pip install datasketch`."
            )
        from datasketch import MinHash

        from distilabel.steps.filtering._datasketch import MinHashLSH

        self._hasher = MinHash.bulk
        self._lsh = MinHashLSH(
            num_perm=self.num_perm,
            threshold=self.threshold,
            storage_config={"type": self.storage},
        )

        if self.tokenizer == "words":
            if not importlib.import_module("nltk"):
                raise ImportError(
                    "`nltk` is needed to tokenize based on words, but is not installed. "
                    "Please install it using `pip install nltk`. Then run `nltk.download('punkt_tab')`."
                )
            self._tokenizer = tokenized_on_words
        else:
            self._tokenizer = partial(tokenize_on_ngrams, n=self.n)

    def unload(self) -> None:
        super().unload()
        # In case of LSH being stored in disk, we need to close the file.
        if self.storage == "disk":
            self._lsh.close()

    @property
    def inputs(self) -> List[str]:
        return ["text"]

    @property
    def outputs(self) -> List[str]:
        return ["keep_row_after_minhash_filtering"]

    def process(self, inputs: StepInput) -> "StepOutput":
        tokenized_texts = []
        for input in inputs:
            tokenized_texts.append(self._tokenizer([input[self.inputs[0]]])[0])

        minhashes = self._hasher(
            tokenized_texts, num_perm=self.num_perm, seed=self.seed
        )

        for input, minhash in zip(inputs, minhashes):
            # Check if the text is already in the LSH index
            if self._lsh.query(minhash):
                input["keep_row_after_minhash_filtering"] = False
            else:
                self._lsh.insert(str(uuid.uuid4()), minhash)
                input["keep_row_after_minhash_filtering"] = True

        yield inputs
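
Stripped of the step machinery, the underlying datasketch flow looks roughly like this (a minimal sketch using naive whitespace tokenization instead of the nltk-based helper):

```python
from datasketch import MinHash, MinHashLSH

texts = [
    "This is a test document.",
    "This is a test document.",
    "Something completely different.",
]
# Naive tokenization; tokens must be bytes for the default hash function
tokenized = [{word.encode("utf-8") for word in text.lower().split()} for text in texts]

lsh = MinHashLSH(num_perm=128, threshold=0.9)
minhashes = MinHash.bulk(tokenized, num_perm=128)

keep = []
for i, minhash in enumerate(minhashes):
    if lsh.query(minhash):  # a near-duplicate is already indexed
        keep.append(False)
    else:
        lsh.insert(str(i), minhash)
        keep.append(True)
print(keep)  # [True, False, True]
```
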

ConversationTemplate

Bases: Step

Generate a conversation template from an instruction and a response.

Input columns
  • instruction (str): The instruction to be used in the conversation.
  • response (str): The response to be used in the conversation.
Output columns
  • conversation (ChatType): The conversation template.
Categories
  • format
  • chat
  • template

Examples:

Create a conversation from an instruction and a response:

```python
from distilabel.steps import ConversationTemplate

conv_template = ConversationTemplate()
conv_template.load()

result = next(
    conv_template.process(
        [
            {
                "instruction": "Hello",
                "response": "Hi",
            }
        ],
    )
)
# >>> result
# [{'instruction': 'Hello', 'response': 'Hi', 'conversation': [{'role': 'user', 'content': 'Hello'}, {'role': 'assistant', 'content': 'Hi'}]}]
```
Source code in src/distilabel/steps/formatting/conversation.py
class ConversationTemplate(Step):
    """Generate a conversation template from an instruction and a response.

    Input columns:
        - instruction (`str`): The instruction to be used in the conversation.
        - response (`str`): The response to be used in the conversation.

    Output columns:
        - conversation (`ChatType`): The conversation template.

    Categories:
        - format
        - chat
        - template

    Examples:
        Create a conversation from an instruction and a response:

        ```python
        from distilabel.steps import ConversationTemplate

        conv_template = ConversationTemplate()
        conv_template.load()

        result = next(
            conv_template.process(
                [
                    {
                        "instruction": "Hello",
                        "response": "Hi",
                    }
                ],
            )
        )
        # >>> result
        # [{'instruction': 'Hello', 'response': 'Hi', 'conversation': [{'role': 'user', 'content': 'Hello'}, {'role': 'assistant', 'content': 'Hi'}]}]
        ```
    """

    @property
    def inputs(self) -> "StepColumns":
        """The instruction and response."""
        return ["instruction", "response"]

    @property
    def outputs(self) -> "StepColumns":
        """The conversation template."""
        return ["conversation"]

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        """Generate a conversation template from an instruction and a response.

        Args:
            inputs: The input data.

        Yields:
            The input data with the conversation template.
        """
        for input in inputs:
            input["conversation"] = [
                {"role": "user", "content": input["instruction"]},
                {"role": "assistant", "content": input["response"]},
            ]
        yield inputs
inputs: StepColumns property

The instruction and response.

outputs: StepColumns property

The conversation template.

process(inputs)

Generate a conversation template from an instruction and a response.

Parameters:

Name Type Description Default
inputs StepInput

The input data.

required

Yields:

Type Description
StepOutput

The input data with the conversation template.

Source code in src/distilabel/steps/formatting/conversation.py
def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
    """Generate a conversation template from an instruction and a response.

    Args:
        inputs: The input data.

    Yields:
        The input data with the conversation template.
    """
    for input in inputs:
        input["conversation"] = [
            {"role": "user", "content": input["instruction"]},
            {"role": "assistant", "content": input["response"]},
        ]
    yield inputs
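
Since the next step below, FormatChatGenerationDPO, expects a messages column, the generic output_mappings argument that every Step accepts can rename the generated column on the fly (a usage sketch):

```python
from distilabel.steps import ConversationTemplate

conv_template = ConversationTemplate(output_mappings={"conversation": "messages"})
conv_template.load()

result = next(conv_template.process([{"instruction": "Hello", "response": "Hi"}]))
# The conversation is now available under the "messages" key.
```
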

FormatChatGenerationDPO

Bases: Step

Format the output of a combination of a ChatGeneration + a preference task for Direct Preference Optimization (DPO).

FormatChatGenerationDPO is a Step that formats the output of the combination of a ChatGeneration task with a preference Task, i.e. a task generating ratings such as UltraFeedback, following the standard formatting from frameworks such as axolotl or alignment-handbook, so that the ratings are used to rank the existing generations and provide the chosen and rejected generations.

Note

The messages column should contain at least one message from the user, the generations column should contain at least two generations, and the ratings column should contain the same number of ratings as generations.

Input columns
  • messages (List[Dict[str, str]]): The conversation messages.
  • generations (List[str]): The generations produced by the LLM.
  • generation_models (List[str], optional): The model names used to generate the generations, only available if the model_name from the ChatGeneration task/s is combined into a single column named this way, otherwise, it will be ignored.
  • ratings (List[float]): The ratings for each of the generations, produced by a preference task such as UltraFeedback.
Output columns
  • prompt (str): The user message used to generate the generations with the LLM.
  • prompt_id (str): The SHA256 hash of the prompt.
  • chosen (List[Dict[str, str]]): The chosen generation based on the ratings.
  • chosen_model (str, optional): The model name used to generate the chosen generation, if the generation_models are available.
  • chosen_rating (float): The rating of the chosen generation.
  • rejected (List[Dict[str, str]]): The rejected generation based on the ratings.
  • rejected_model (str, optional): The model name used to generate the rejected generation, if the generation_models are available.
  • rejected_rating (float): The rating of the rejected generation.
Categories
  • format
  • chat-generation
  • preference
  • messages
  • generations

Examples:

Format your dataset for DPO fine tuning:

```python
from distilabel.steps import FormatChatGenerationDPO

format_dpo = FormatChatGenerationDPO()
format_dpo.load()

# NOTE: "generation_models" can be added optionally.
result = next(
    format_dpo.process(
        [
            {
                "messages": [{"role": "user", "content": "What's 2+2?"}],
                "generations": ["4", "5", "6"],
                "ratings": [1, 0, -1],
            }
        ]
    )
)
# >>> result
# [
#     {
#         'messages': [{'role': 'user', 'content': "What's 2+2?"}],
#         'generations': ['4', '5', '6'],
#         'ratings': [1, 0, -1],
#         'prompt': "What's 2+2?",
#         'prompt_id': '7762ecf17ad41479767061a8f4a7bfa3b63d371672af5180872f9b82b4cd4e29',
#         'chosen': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '4'}],
#         'chosen_rating': 1,
#         'rejected': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '6'}],
#         'rejected_rating': -1
#     }
# ]
```
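
As the output columns state, prompt_id is the SHA256 hash of the prompt, which can be verified with the standard library (assuming the hash is computed over the UTF-8 encoded prompt string):

```python
import hashlib

prompt_id = hashlib.sha256("What's 2+2?".encode("utf-8")).hexdigest()
print(prompt_id)  # should match the `prompt_id` shown in the example above
```
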
Source code in src/distilabel/steps/formatting/dpo.py
class FormatChatGenerationDPO(Step):
    """Format the output of a combination of a `ChatGeneration` + a preference task for Direct Preference Optimization (DPO).

    `FormatChatGenerationDPO` is a `Step` that formats the output of the combination of a `ChatGeneration`
    task with a preference `Task`, i.e. a task generating `ratings` such as `UltraFeedback`, following the
    standard formatting from frameworks such as `axolotl` or `alignment-handbook`, so that the `ratings`
    are used to rank the existing generations and provide the `chosen` and `rejected` generations.

    Note:
        The `messages` column should contain at least one message from the user, the `generations`
        column should contain at least two generations, and the `ratings` column should contain the
        same number of ratings as generations.

    Input columns:
        - messages (`List[Dict[str, str]]`): The conversation messages.
        - generations (`List[str]`): The generations produced by the `LLM`.
        - generation_models (`List[str]`, optional): The model names used to generate the `generations`,
            only available if the `model_name` from the `ChatGeneration` task/s is combined into a single
            column named this way, otherwise, it will be ignored.
        - ratings (`List[float]`): The ratings for each of the `generations`, produced by a preference
            task such as `UltraFeedback`.

    Output columns:
        - prompt (`str`): The user message used to generate the `generations` with the `LLM`.
        - prompt_id (`str`): The `SHA256` hash of the `prompt`.
        - chosen (`List[Dict[str, str]]`): The `chosen` generation based on the `ratings`.
        - chosen_model (`str`, optional): The model name used to generate the `chosen` generation,
            if the `generation_models` are available.
        - chosen_rating (`float`): The rating of the `chosen` generation.
        - rejected (`List[Dict[str, str]]`): The `rejected` generation based on the `ratings`.
        - rejected_model (`str`, optional): The model name used to generate the `rejected` generation,
            if the `generation_models` are available.
        - rejected_rating (`float`): The rating of the `rejected` generation.

    Categories:
        - format
        - chat-generation
        - preference
        - messages
        - generations

    Examples:
        Format your dataset for DPO fine tuning:

        ```python
        from distilabel.steps import FormatChatGenerationDPO

        format_dpo = FormatChatGenerationDPO()
        format_dpo.load()

        # NOTE: "generation_models" can be added optionally.
        result = next(
            format_dpo.process(
                [
                    {
                        "messages": [{"role": "user", "content": "What's 2+2?"}],
                        "generations": ["4", "5", "6"],
                        "ratings": [1, 0, -1],
                    }
                ]
            )
        )
        # >>> result
        # [
        #     {
        #         'messages': [{'role': 'user', 'content': "What's 2+2?"}],
        #         'generations': ['4', '5', '6'],
        #         'ratings': [1, 0, -1],
        #         'prompt': "What's 2+2?",
        #         'prompt_id': '7762ecf17ad41479767061a8f4a7bfa3b63d371672af5180872f9b82b4cd4e29',
        #         'chosen': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '4'}],
        #         'chosen_rating': 1,
        #         'rejected': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '6'}],
        #         'rejected_rating': -1
        #     }
        # ]
        ```
    """

    @property
    def inputs(self) -> "StepColumns":
        """List of inputs required by the `Step`, which in this case are: `messages`, `generations`,
        and `ratings`."""
        return ["messages", "generations", "ratings"]

    @property
    def optional_inputs(self) -> List[str]:
        """List of optional inputs, which are not required by the `Step` but used if available,
        which in this case is: `generation_models`."""
        return ["generation_models"]

    @property
    def outputs(self) -> "StepColumns":
        """List of outputs generated by the `Step`, which are: `prompt`, `prompt_id`, `chosen`,
        `chosen_model`, `chosen_rating`, `rejected`, `rejected_model`, `rejected_rating`. Both
        the `chosen_model` and `rejected_model` being optional and only used if `generation_models`
        is available.

        Reference:
            - Format inspired in https://huggingface.co/datasets/HuggingFaceH4/ultrachat_200k
        """
        return [
            "prompt",
            "prompt_id",
            "chosen",
            "chosen_model",
            "chosen_rating",
            "rejected",
            "rejected_model",
            "rejected_rating",
        ]

    def process(self, *inputs: StepInput) -> "StepOutput":  # type: ignore
        """The `process` method formats the received `StepInput` or list of `StepInput`
        according to the DPO formatting standard.

        Args:
            *inputs: A list of `StepInput` to be combined.

        Yields:
            A `StepOutput` with batches of formatted `StepInput` following the DPO standard.
        """
        for input in inputs:
            for item in input:
                item["prompt"] = next(
                    (
                        turn["content"]
                        for turn in item["messages"]
                        if turn["role"] == "user"
                    ),
                    None,
                )
                item["prompt_id"] = hashlib.sha256(
                    item["prompt"].encode("utf-8")  # type: ignore
                ).hexdigest()

                chosen_idx = max(enumerate(item["ratings"]), key=lambda x: x[1])[0]
                item["chosen"] = item["messages"] + [
                    {
                        "role": "assistant",
                        "content": item["generations"][chosen_idx],
                    }
                ]
                if "generation_models" in item:
                    item["chosen_model"] = item["generation_models"][chosen_idx]
                item["chosen_rating"] = item["ratings"][chosen_idx]

                rejected_idx = min(enumerate(item["ratings"]), key=lambda x: x[1])[0]
                item["rejected"] = item["messages"] + [
                    {
                        "role": "assistant",
                        "content": item["generations"][rejected_idx],
                    }
                ]
                if "generation_models" in item:
                    item["rejected_model"] = item["generation_models"][rejected_idx]
                item["rejected_rating"] = item["ratings"][rejected_idx]

            yield input
inputs: StepColumns property

List of inputs required by the Step, which in this case are: messages, generations, and ratings.

optional_inputs: List[str] property

List of optional inputs, which are not required by the Step but used if available, which in this case is: generation_models.

outputs: StepColumns property

List of outputs generated by the Step, which are: prompt, prompt_id, chosen, chosen_model, chosen_rating, rejected, rejected_model, rejected_rating. Both the chosen_model and rejected_model being optional and only used if generation_models is available.

Reference
  • Format inspired in https://huggingface.co/datasets/HuggingFaceH4/ultrachat_200k
process(*inputs)

The process method formats the received StepInput or list of StepInput according to the DPO formatting standard.

Parameters:

Name Type Description Default
*inputs StepInput

A list of StepInput to be combined.

()

Yields:

Type Description
StepOutput

A StepOutput with batches of formatted StepInput following the DPO standard.

Source code in src/distilabel/steps/formatting/dpo.py
def process(self, *inputs: StepInput) -> "StepOutput":  # type: ignore
    """The `process` method formats the received `StepInput` or list of `StepInput`
    according to the DPO formatting standard.

    Args:
        *inputs: A list of `StepInput` to be combined.

    Yields:
        A `StepOutput` with batches of formatted `StepInput` following the DPO standard.
    """
    for input in inputs:
        for item in input:
            item["prompt"] = next(
                (
                    turn["content"]
                    for turn in item["messages"]
                    if turn["role"] == "user"
                ),
                None,
            )
            item["prompt_id"] = hashlib.sha256(
                item["prompt"].encode("utf-8")  # type: ignore
            ).hexdigest()

            chosen_idx = max(enumerate(item["ratings"]), key=lambda x: x[1])[0]
            item["chosen"] = item["messages"] + [
                {
                    "role": "assistant",
                    "content": item["generations"][chosen_idx],
                }
            ]
            if "generation_models" in item:
                item["chosen_model"] = item["generation_models"][chosen_idx]
            item["chosen_rating"] = item["ratings"][chosen_idx]

            rejected_idx = min(enumerate(item["ratings"]), key=lambda x: x[1])[0]
            item["rejected"] = item["messages"] + [
                {
                    "role": "assistant",
                    "content": item["generations"][rejected_idx],
                }
            ]
            if "generation_models" in item:
                item["rejected_model"] = item["generation_models"][rejected_idx]
            item["rejected_rating"] = item["ratings"][rejected_idx]

        yield input
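
Note that `max(enumerate(...))` and `min(enumerate(...))` in the source above resolve ties by index: if several generations share the top (or bottom) rating, the earliest one in the list is selected. A minimal sketch of that behavior, with made-up ratings:

```python
ratings = [1, 1, -1, -1]

# Same selection logic as in `process`: ties resolve to the lowest index.
chosen_idx = max(enumerate(ratings), key=lambda x: x[1])[0]    # -> 0, not 1
rejected_idx = min(enumerate(ratings), key=lambda x: x[1])[0]  # -> 2, not 3
```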

FormatTextGenerationDPO

Bases: Step

Format the output of your LLMs for Direct Preference Optimization (DPO).

FormatTextGenerationDPO is a Step that formats the output of the combination of a TextGeneration task with a preference Task, i.e. a task generating ratings, so that those are used to rank the existing generations and provide the chosen and rejected generations based on the ratings. Use this step to transform the output of a combination of a TextGeneration + a preference task such as UltraFeedback following the standard formatting from frameworks such as axolotl or alignment-handbook.

Note

The generations column should contain at least two generations, and the ratings column should contain the same number of ratings as generations.

Input columns
  • system_prompt (str, optional): The system prompt used within the LLM to generate the generations, if available.
  • instruction (str): The instruction used to generate the generations with the LLM.
  • generations (List[str]): The generations produced by the LLM.
  • generation_models (List[str], optional): The model names used to generate the generations, only available if the model_name from the TextGeneration task/s is combined into a single column named this way, otherwise, it will be ignored.
  • ratings (List[float]): The ratings for each of the generations, produced by a preference task such as UltraFeedback.
Output columns
  • prompt (str): The instruction used to generate the generations with the LLM.
  • prompt_id (str): The SHA256 hash of the prompt.
  • chosen (List[Dict[str, str]]): The chosen generation based on the ratings.
  • chosen_model (str, optional): The model name used to generate the chosen generation, if the generation_models are available.
  • chosen_rating (float): The rating of the chosen generation.
  • rejected (List[Dict[str, str]]): The rejected generation based on the ratings.
  • rejected_model (str, optional): The model name used to generate the rejected generation, if the generation_models are available.
  • rejected_rating (float): The rating of the rejected generation.
Categories
  • format
  • text-generation
  • preference
  • instruction
  • generations

Examples:

Format your dataset for DPO fine tuning:

from distilabel.steps import FormatTextGenerationDPO

format_dpo = FormatTextGenerationDPO()
format_dpo.load()

# NOTE: Both "system_prompt" and "generation_models" can be added optionally.
result = next(
    format_dpo.process(
        [
            {
                "instruction": "What's 2+2?",
                "generations": ["4", "5", "6"],
                "ratings": [1, 0, -1],
            }
        ]
    )
)
# >>> result
# [
#    {   'instruction': "What's 2+2?",
#        'generations': ['4', '5', '6'],
#        'ratings': [1, 0, -1],
#        'prompt': "What's 2+2?",
#        'prompt_id': '7762ecf17ad41479767061a8f4a7bfa3b63d371672af5180872f9b82b4cd4e29',
#        'chosen': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '4'}],
#        'chosen_rating': 1,
#        'rejected': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '6'}],
#        'rejected_rating': -1
#    }
# ]
Source code in src/distilabel/steps/formatting/dpo.py
class FormatTextGenerationDPO(Step):
    """Format the output of your LLMs for Direct Preference Optimization (DPO).

    `FormatTextGenerationDPO` is a `Step` that formats the output of the combination of a `TextGeneration`
    task with a preference `Task`, i.e. a task generating `ratings`, so that those are used to rank the
    existing generations and provide the `chosen` and `rejected` generations based on the `ratings`.
    Use this step to transform the output of a combination of a `TextGeneration` + a preference task such as
    `UltraFeedback` following the standard formatting from frameworks such as `axolotl` or `alignment-handbook`.

    Note:
        The `generations` column should contain at least two generations, and the `ratings` column
        should contain the same number of ratings as generations.

    Input columns:
        - system_prompt (`str`, optional): The system prompt used within the `LLM` to generate the
            `generations`, if available.
        - instruction (`str`): The instruction used to generate the `generations` with the `LLM`.
        - generations (`List[str]`): The generations produced by the `LLM`.
        - generation_models (`List[str]`, optional): The model names used to generate the `generations`,
            only available if the `model_name` from the `TextGeneration` task/s is combined into a single
            column named this way, otherwise, it will be ignored.
        - ratings (`List[float]`): The ratings for each of the `generations`, produced by a preference
            task such as `UltraFeedback`.

    Output columns:
        - prompt (`str`): The instruction used to generate the `generations` with the `LLM`.
        - prompt_id (`str`): The `SHA256` hash of the `prompt`.
        - chosen (`List[Dict[str, str]]`): The `chosen` generation based on the `ratings`.
        - chosen_model (`str`, optional): The model name used to generate the `chosen` generation,
            if the `generation_models` are available.
        - chosen_rating (`float`): The rating of the `chosen` generation.
        - rejected (`List[Dict[str, str]]`): The `rejected` generation based on the `ratings`.
        - rejected_model (`str`, optional): The model name used to generate the `rejected` generation,
            if the `generation_models` are available.
        - rejected_rating (`float`): The rating of the `rejected` generation.

    Categories:
        - format
        - text-generation
        - preference
        - instruction
        - generations

    Examples:
        Format your dataset for DPO fine tuning:

        ```python
        from distilabel.steps import FormatTextGenerationDPO

        format_dpo = FormatTextGenerationDPO()
        format_dpo.load()

        # NOTE: Both "system_prompt" and "generation_models" can be added optionally.
        result = next(
            format_dpo.process(
                [
                    {
                        "instruction": "What's 2+2?",
                        "generations": ["4", "5", "6"],
                        "ratings": [1, 0, -1],
                    }
                ]
            )
        )
        # >>> result
        # [
        #    {   'instruction': "What's 2+2?",
        #        'generations': ['4', '5', '6'],
        #        'ratings': [1, 0, -1],
        #        'prompt': "What's 2+2?",
        #        'prompt_id': '7762ecf17ad41479767061a8f4a7bfa3b63d371672af5180872f9b82b4cd4e29',
        #        'chosen': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '4'}],
        #        'chosen_rating': 1,
        #        'rejected': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '6'}],
        #        'rejected_rating': -1
        #    }
        # ]
        ```
    """

    @property
    def inputs(self) -> "StepColumns":
        """List of inputs required by the `Step`, which in this case are: `instruction`, `generations`,
        and `ratings`."""
        return {
            "system_prompt": False,
            "instruction": True,
            "generations": True,
            "generation_models": False,
            "ratings": True,
        }

    @property
    def optional_inputs(self) -> List[str]:
        """List of optional inputs, which are not required by the `Step` but used if available,
        which in this case are: `system_prompt`, and `generation_models`."""
        return ["system_prompt", "generation_models"]

    @property
    def outputs(self) -> "StepColumns":
        """List of outputs generated by the `Step`, which are: `prompt`, `prompt_id`, `chosen`,
        `chosen_model`, `chosen_rating`, `rejected`, `rejected_model`, `rejected_rating`. Both
        the `chosen_model` and `rejected_model` being optional and only used if `generation_models`
        is available.

        Reference:
            - Format inspired in https://huggingface.co/datasets/HuggingFaceH4/ultrachat_200k
        """
        return [
            "prompt",
            "prompt_id",
            "chosen",
            "chosen_model",
            "chosen_rating",
            "rejected",
            "rejected_model",
            "rejected_rating",
        ]

    def process(self, *inputs: StepInput) -> "StepOutput":  # type: ignore
        """The `process` method formats the received `StepInput` or list of `StepInput`
        according to the DPO formatting standard.

        Args:
            *inputs: A list of `StepInput` to be combined.

        Yields:
            A `StepOutput` with batches of formatted `StepInput` following the DPO standard.
        """
        for input in inputs:
            for item in input:
                messages = [
                    {"role": "user", "content": item["instruction"]},  # type: ignore
                ]
                if (
                    "system_prompt" in item
                    and isinstance(item["system_prompt"], str)  # type: ignore
                    and len(item["system_prompt"]) > 0  # type: ignore
                ):
                    messages.insert(
                        0,
                        {"role": "system", "content": item["system_prompt"]},  # type: ignore
                    )

                item["prompt"] = item["instruction"]
                item["prompt_id"] = hashlib.sha256(
                    item["prompt"].encode("utf-8")  # type: ignore
                ).hexdigest()

                chosen_idx = max(enumerate(item["ratings"]), key=lambda x: x[1])[0]
                item["chosen"] = messages + [
                    {
                        "role": "assistant",
                        "content": item["generations"][chosen_idx],
                    }
                ]
                if "generation_models" in item:
                    item["chosen_model"] = item["generation_models"][chosen_idx]
                item["chosen_rating"] = item["ratings"][chosen_idx]

                rejected_idx = min(enumerate(item["ratings"]), key=lambda x: x[1])[0]
                item["rejected"] = messages + [
                    {
                        "role": "assistant",
                        "content": item["generations"][rejected_idx],
                    }
                ]
                if "generation_models" in item:
                    item["rejected_model"] = item["generation_models"][rejected_idx]
                item["rejected_rating"] = item["ratings"][rejected_idx]

            yield input
inputs: StepColumns property

List of inputs required by the Step, which in this case are: instruction, generations, and ratings.

optional_inputs: List[str] property

List of optional inputs, which are not required by the Step but used if available, which in this case are: system_prompt, and generation_models.

outputs: StepColumns property

List of outputs generated by the Step, which are: prompt, prompt_id, chosen, chosen_model, chosen_rating, rejected, rejected_model, rejected_rating. Both the chosen_model and rejected_model being optional and only used if generation_models is available.

Reference
  • Format inspired in https://huggingface.co/datasets/HuggingFaceH4/ultrachat_200k
process(*inputs)

The process method formats the received StepInput or list of StepInput according to the DPO formatting standard.

Parameters:

Name Type Description Default
*inputs StepInput

A list of StepInput to be combined.

()

Yields:

Type Description
StepOutput

A StepOutput with batches of formatted StepInput following the DPO standard.

Source code in src/distilabel/steps/formatting/dpo.py
def process(self, *inputs: StepInput) -> "StepOutput":  # type: ignore
    """The `process` method formats the received `StepInput` or list of `StepInput`
    according to the DPO formatting standard.

    Args:
        *inputs: A list of `StepInput` to be combined.

    Yields:
        A `StepOutput` with batches of formatted `StepInput` following the DPO standard.
    """
    for input in inputs:
        for item in input:
            messages = [
                {"role": "user", "content": item["instruction"]},  # type: ignore
            ]
            if (
                "system_prompt" in item
                and isinstance(item["system_prompt"], str)  # type: ignore
                and len(item["system_prompt"]) > 0  # type: ignore
            ):
                messages.insert(
                    0,
                    {"role": "system", "content": item["system_prompt"]},  # type: ignore
                )

            item["prompt"] = item["instruction"]
            item["prompt_id"] = hashlib.sha256(
                item["prompt"].encode("utf-8")  # type: ignore
            ).hexdigest()

            chosen_idx = max(enumerate(item["ratings"]), key=lambda x: x[1])[0]
            item["chosen"] = messages + [
                {
                    "role": "assistant",
                    "content": item["generations"][chosen_idx],
                }
            ]
            if "generation_models" in item:
                item["chosen_model"] = item["generation_models"][chosen_idx]
            item["chosen_rating"] = item["ratings"][chosen_idx]

            rejected_idx = min(enumerate(item["ratings"]), key=lambda x: x[1])[0]
            item["rejected"] = messages + [
                {
                    "role": "assistant",
                    "content": item["generations"][rejected_idx],
                }
            ]
            if "generation_models" in item:
                item["rejected_model"] = item["generation_models"][rejected_idx]
            item["rejected_rating"] = item["ratings"][rejected_idx]

        yield input
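
When the optional `system_prompt` and `generation_models` columns are present, the step prepends the system message to both the chosen and rejected conversations and fills in chosen_model / rejected_model. A hedged sketch of that path (the model names here are invented for illustration):

```python
from distilabel.steps import FormatTextGenerationDPO

format_dpo = FormatTextGenerationDPO()
format_dpo.load()

result = next(
    format_dpo.process(
        [
            {
                "system_prompt": "You are a helpful assistant.",
                "instruction": "What's 2+2?",
                "generations": ["4", "5", "6"],
                "generation_models": ["model-a", "model-b", "model-c"],  # hypothetical names
                "ratings": [1, 0, -1],
            }
        ]
    )
)
# The `chosen` conversation now starts with the system message, and the
# model columns are filled from `generation_models`:
# result[0]["chosen"][0] == {'role': 'system', 'content': 'You are a helpful assistant.'}
# result[0]["chosen_model"] == 'model-a'
# result[0]["rejected_model"] == 'model-c'
```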

FormatChatGenerationSFT

Bases: Step

Format the output of a ChatGeneration task for Supervised Fine-Tuning (SFT).

FormatChatGenerationSFT is a Step that formats the output of a ChatGeneration task for Supervised Fine-Tuning (SFT) following the standard formatting from frameworks such as axolotl or alignment-handbook. The output of the ChatGeneration task is formatted into a chat-like conversation by appending the generation as the final assistant message to the existing messages, while the prompt is taken from the first user message in the conversation.

Input columns
  • messages (List[Dict[str, str]]): The conversation messages.
  • generation (str): The generation produced by the LLM.
Output columns
  • prompt (str): The first user message used to generate the generation with the LLM.
  • prompt_id (str): The SHA256 hash of the prompt.
  • messages (List[Dict[str, str]]): The chat-like conversation with the instruction as the user message and the generation as the assistant message.
Categories
  • format
  • chat-generation
  • instruction
  • generation

Examples:

Format your dataset for SFT:

from distilabel.steps import FormatChatGenerationSFT

format_sft = FormatChatGenerationSFT()
format_sft.load()

# NOTE: "system_prompt" can be added optionally.
result = next(
    format_sft.process(
        [
            {
                "messages": [{"role": "user", "content": "What's 2+2?"}],
                "generation": "4"
            }
        ]
    )
)
# >>> result
# [
#     {
#         'messages': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '4'}],
#         'generation': '4',
#         'prompt': "What's 2+2?",
#         'prompt_id': '7762ecf17ad41479767061a8f4a7bfa3b63d371672af5180872f9b82b4cd4e29',
#     }
# ]
Source code in src/distilabel/steps/formatting/sft.py
class FormatChatGenerationSFT(Step):
    """Format the output of a `ChatGeneration` task for Supervised Fine-Tuning (SFT).

    `FormatChatGenerationSFT` is a `Step` that formats the output of a `ChatGeneration` task for
    Supervised Fine-Tuning (SFT) following the standard formatting from frameworks such as `axolotl`
    or `alignment-handbook`. The output of the `ChatGeneration` task is formatted into a chat-like
    conversation by appending the `generation` as the final assistant message to the existing
    `messages`, while the `prompt` is taken from the first user message in the conversation.

    Input columns:
        - messages (`List[Dict[str, str]]`): The conversation messages.
        - generation (`str`): The generation produced by the `LLM`.

    Output columns:
        - prompt (`str`): The first user message used to generate the `generation` with the `LLM`.
        - prompt_id (`str`): The `SHA256` hash of the `prompt`.
        - messages (`List[Dict[str, str]]`): The chat-like conversation with the `instruction` as
            the user message and the `generation` as the assistant message.

    Categories:
        - format
        - chat-generation
        - instruction
        - generation

    Examples:
        Format your dataset for SFT:

        ```python
        from distilabel.steps import FormatChatGenerationSFT

        format_sft = FormatChatGenerationSFT()
        format_sft.load()

        # NOTE: "system_prompt" can be added optionally.
        result = next(
            format_sft.process(
                [
                    {
                        "messages": [{"role": "user", "content": "What's 2+2?"}],
                        "generation": "4"
                    }
                ]
            )
        )
        # >>> result
        # [
        #     {
        #         'messages': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '4'}],
        #         'generation': '4',
        #         'prompt': "What's 2+2?",
        #         'prompt_id': '7762ecf17ad41479767061a8f4a7bfa3b63d371672af5180872f9b82b4cd4e29',
        #     }
        # ]
        ```
    """

    @property
    def inputs(self) -> "StepColumns":
        """List of inputs required by the `Step`, which in this case are: `instruction`, and `generation`."""
        return ["messages", "generation"]

    @property
    def outputs(self) -> "StepColumns":
        """List of outputs generated by the `Step`, which are: `prompt`, `prompt_id`, `messages`.

        Reference:
            - Format inspired in https://huggingface.co/datasets/HuggingFaceH4/ultrachat_200k
        """
        return ["prompt", "prompt_id", "messages"]

    def process(self, *inputs: StepInput) -> "StepOutput":  # type: ignore
        """The `process` method formats the received `StepInput` or list of `StepInput`
        according to the SFT formatting standard.

        Args:
            *inputs: A list of `StepInput` to be combined.

        Yields:
            A `StepOutput` with batches of formatted `StepInput` following the SFT standard.
        """
        for input in inputs:
            for item in input:
                item["prompt"] = next(
                    (
                        turn["content"]
                        for turn in item["messages"]
                        if turn["role"] == "user"
                    ),
                    None,
                )

                item["prompt_id"] = hashlib.sha256(
                    item["prompt"].encode("utf-8")  # type: ignore
                ).hexdigest()

                item["messages"] = item["messages"] + [
                    {"role": "assistant", "content": item["generation"]},  # type: ignore
                ]
            yield input
inputs: StepColumns property

List of inputs required by the Step, which in this case are: messages, and generation.

outputs: StepColumns property

List of outputs generated by the Step, which are: prompt, prompt_id, messages.

Reference
  • Format inspired in https://huggingface.co/datasets/HuggingFaceH4/ultrachat_200k
process(*inputs)

The process method formats the received StepInput or list of StepInput according to the SFT formatting standard.

Parameters:

Name Type Description Default
*inputs StepInput

A list of StepInput to be combined.

()

Yields:

Type Description
StepOutput

A StepOutput with batches of formatted StepInput following the SFT standard.

Source code in src/distilabel/steps/formatting/sft.py
def process(self, *inputs: StepInput) -> "StepOutput":  # type: ignore
    """The `process` method formats the received `StepInput` or list of `StepInput`
    according to the SFT formatting standard.

    Args:
        *inputs: A list of `StepInput` to be combined.

    Yields:
        A `StepOutput` with batches of formatted `StepInput` following the SFT standard.
    """
    for input in inputs:
        for item in input:
            item["prompt"] = next(
                (
                    turn["content"]
                    for turn in item["messages"]
                    if turn["role"] == "user"
                ),
                None,
            )

            item["prompt_id"] = hashlib.sha256(
                item["prompt"].encode("utf-8")  # type: ignore
            ).hexdigest()

            item["messages"] = item["messages"] + [
                {"role": "assistant", "content": item["generation"]},  # type: ignore
            ]
        yield input
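
Because prompt is taken from the first user turn, the step also handles conversations that begin with a system message or span multiple turns. A minimal sketch, assuming a conversation that starts with a system message:

```python
from distilabel.steps import FormatChatGenerationSFT

format_sft = FormatChatGenerationSFT()
format_sft.load()

result = next(
    format_sft.process(
        [
            {
                "messages": [
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": "What's 2+2?"},
                ],
                "generation": "4",
            }
        ]
    )
)
# `prompt` is the content of the first user turn (not the system message),
# and the generation is appended as the final assistant turn:
# result[0]["prompt"] == "What's 2+2?"
# result[0]["messages"][-1] == {'role': 'assistant', 'content': '4'}
```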

FormatTextGenerationSFT

Bases: Step

Format the output of a TextGeneration task for Supervised Fine-Tuning (SFT).

FormatTextGenerationSFT is a Step that formats the output of a TextGeneration task for Supervised Fine-Tuning (SFT) following the standard formatting from frameworks such as axolotl or alignment-handbook. The output of the TextGeneration task is formatted into a chat-like conversation with the instruction as the user message and the generation as the assistant message. Optionally, if the system_prompt is available, it is included as the first message in the conversation.

Input columns
  • system_prompt (str, optional): The system prompt used within the LLM to generate the generation, if available.
  • instruction (str): The instruction used to generate the generation with the LLM.
  • generation (str): The generation produced by the LLM.
Output columns
  • prompt (str): The instruction used to generate the generation with the LLM.
  • prompt_id (str): The SHA256 hash of the prompt.
  • messages (List[Dict[str, str]]): The chat-like conversation with the instruction as the user message and the generation as the assistant message.
Categories
  • format
  • text-generation
  • instruction
  • generation

Examples:

Format your dataset for SFT fine tuning:

from distilabel.steps import FormatTextGenerationSFT

format_sft = FormatTextGenerationSFT()
format_sft.load()

# NOTE: "system_prompt" can be added optionally.
result = next(
    format_sft.process(
        [
            {
                "instruction": "What's 2+2?",
                "generation": "4"
            }
        ]
    )
)
# >>> result
# [
#     {
#         'instruction': "What's 2+2?",
#         'generation': '4',
#         'prompt': "What's 2+2?",
#         'prompt_id': '7762ecf17ad41479767061a8f4a7bfa3b63d371672af5180872f9b82b4cd4e29',
#         'messages': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '4'}]
#     }
# ]
Source code in src/distilabel/steps/formatting/sft.py
class FormatTextGenerationSFT(Step):
    """Format the output of a `TextGeneration` task for Supervised Fine-Tuning (SFT).

    `FormatTextGenerationSFT` is a `Step` that formats the output of a `TextGeneration` task for
    Supervised Fine-Tuning (SFT) following the standard formatting from frameworks such as `axolotl`
    or `alignment-handbook`. The output of the `TextGeneration` task is formatted into a chat-like
    conversation with the `instruction` as the user message and the `generation` as the assistant
    message. Optionally, if the `system_prompt` is available, it is included as the first message
    in the conversation.

    Input columns:
        - system_prompt (`str`, optional): The system prompt used within the `LLM` to generate the
            `generation`, if available.
        - instruction (`str`): The instruction used to generate the `generation` with the `LLM`.
        - generation (`str`): The generation produced by the `LLM`.

    Output columns:
        - prompt (`str`): The instruction used to generate the `generation` with the `LLM`.
        - prompt_id (`str`): The `SHA256` hash of the `prompt`.
        - messages (`List[Dict[str, str]]`): The chat-like conversation with the `instruction` as
            the user message and the `generation` as the assistant message.

    Categories:
        - format
        - text-generation
        - instruction
        - generation

    Examples:
        Format your dataset for SFT fine tuning:

        ```python
        from distilabel.steps import FormatTextGenerationSFT

        format_sft = FormatTextGenerationSFT()
        format_sft.load()

        # NOTE: "system_prompt" can be added optionally.
        result = next(
            format_sft.process(
                [
                    {
                        "instruction": "What's 2+2?",
                        "generation": "4"
                    }
                ]
            )
        )
        # >>> result
        # [
        #     {
        #         'instruction': "What's 2+2?",
        #         'generation': '4',
        #         'prompt': "What's 2+2?",
        #         'prompt_id': '7762ecf17ad41479767061a8f4a7bfa3b63d371672af5180872f9b82b4cd4e29',
        #         'messages': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '4'}]
        #     }
        # ]
        ```
    """

    @property
    def inputs(self) -> "StepColumns":
        """List of inputs required by the `Step`, which in this case are: `instruction`, and `generation`."""
        return {
            "system_prompt": False,
            "instruction": True,
            "generation": True,
        }

    @property
    def optional_inputs(self) -> List[str]:
        """List of optional inputs, which are not required by the `Step` but used if available,
        which in this case is: `system_prompt`."""
        return ["system_prompt"]

    @property
    def outputs(self) -> "StepColumns":
        """List of outputs generated by the `Step`, which are: `prompt`, `prompt_id`, `messages`.

        Reference:
            - Format inspired in https://huggingface.co/datasets/HuggingFaceH4/ultrachat_200k
        """
        return ["prompt", "prompt_id", "messages"]

    def process(self, *inputs: StepInput) -> "StepOutput":  # type: ignore
        """The `process` method formats the received `StepInput` or list of `StepInput`
        according to the SFT formatting standard.

        Args:
            *inputs: A list of `StepInput` to be combined.

        Yields:
            A `StepOutput` with batches of formatted `StepInput` following the SFT standard.
        """
        for input in inputs:
            for item in input:
                item["prompt"] = item["instruction"]

                item["prompt_id"] = hashlib.sha256(
                    item["prompt"].encode("utf-8")  # type: ignore
                ).hexdigest()

                item["messages"] = [
                    {"role": "user", "content": item["instruction"]},  # type: ignore
                    {"role": "assistant", "content": item["generation"]},  # type: ignore
                ]
                if (
                    "system_prompt" in item
                    and isinstance(item["system_prompt"], str)  # type: ignore
                    and len(item["system_prompt"]) > 0  # type: ignore
                ):
                    item["messages"].insert(
                        0,
                        {"role": "system", "content": item["system_prompt"]},  # type: ignore
                    )

            yield input
inputs: StepColumns property

List of inputs required by the Step, which in this case are: instruction, and generation.

optional_inputs: List[str] property

List of optional inputs, which are not required by the Step but used if available, which in this case is: system_prompt.

outputs: StepColumns property

List of outputs generated by the Step, which are: prompt, prompt_id, messages.

Reference
  • Format inspired in https://huggingface.co/datasets/HuggingFaceH4/ultrachat_200k
process(*inputs)

The process method formats the received StepInput or list of StepInput according to the SFT formatting standard.

Parameters:

Name Type Description Default
*inputs StepInput

A list of StepInput to be combined.

()

Yields:

Type Description
StepOutput

A StepOutput with batches of formatted StepInput following the SFT standard.

Source code in src/distilabel/steps/formatting/sft.py
def process(self, *inputs: StepInput) -> "StepOutput":  # type: ignore
    """The `process` method formats the received `StepInput` or list of `StepInput`
    according to the SFT formatting standard.

    Args:
        *inputs: A list of `StepInput` to be combined.

    Yields:
        A `StepOutput` with batches of formatted `StepInput` following the SFT standard.
    """
    for input in inputs:
        for item in input:
            item["prompt"] = item["instruction"]

            item["prompt_id"] = hashlib.sha256(
                item["prompt"].encode("utf-8")  # type: ignore
            ).hexdigest()

            item["messages"] = [
                {"role": "user", "content": item["instruction"]},  # type: ignore
                {"role": "assistant", "content": item["generation"]},  # type: ignore
            ]
            if (
                "system_prompt" in item
                and isinstance(item["system_prompt"], str)  # type: ignore
                and len(item["system_prompt"]) > 0  # type: ignore
            ):
                item["messages"].insert(
                    0,
                    {"role": "system", "content": item["system_prompt"]},  # type: ignore
                )

        yield input
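
If the optional system_prompt column is present and non-empty, it is inserted as the first message of the resulting conversation; a short sketch of that path:

```python
from distilabel.steps import FormatTextGenerationSFT

format_sft = FormatTextGenerationSFT()
format_sft.load()

result = next(
    format_sft.process(
        [
            {
                "system_prompt": "You are a helpful assistant.",
                "instruction": "What's 2+2?",
                "generation": "4",
            }
        ]
    )
)
# The resulting conversation has three messages: system, user, assistant.
# result[0]["messages"][0] == {'role': 'system', 'content': 'You are a helpful assistant.'}
```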

LoadDataFromDicts

Bases: GeneratorStep

Loads a dataset from a list of dictionaries.

GeneratorStep that loads a dataset from a list of dictionaries and yields it in batches.

Attributes:

Name Type Description
data List[Dict[str, Any]]

The list of dictionaries to load the data from.

Runtime parameters
  • batch_size: The batch size to use when processing the data.
Output columns
  • dynamic (based on the keys found in the first dictionary of the list): The columns of the dataset.
Categories
  • load

Examples:

Load data from a list of dictionaries:

from distilabel.steps import LoadDataFromDicts

loader = LoadDataFromDicts(
    data=[{"instruction": "What are 2+2?"}] * 5,
    batch_size=2
)
loader.load()

result = next(loader.process())
# >>> result
# ([{'instruction': 'What are 2+2?'}, {'instruction': 'What are 2+2?'}], False)
Source code in src/distilabel/steps/generators/data.py
class LoadDataFromDicts(GeneratorStep):
    """Loads a dataset from a list of dictionaries.

    `GeneratorStep` that loads a dataset from a list of dictionaries and yields it in
    batches.

    Attributes:
        data: The list of dictionaries to load the data from.

    Runtime parameters:
        - `batch_size`: The batch size to use when processing the data.

    Output columns:
        - dynamic (based on the keys found in the first dictionary of the list): The columns
            of the dataset.

    Categories:
        - load

    Examples:
        Load data from a list of dictionaries:

        ```python
        from distilabel.steps import LoadDataFromDicts

        loader = LoadDataFromDicts(
            data=[{"instruction": "What are 2+2?"}] * 5,
            batch_size=2
        )
        loader.load()

        result = next(loader.process())
        # >>> result
        # ([{'instruction': 'What are 2+2?'}, {'instruction': 'What are 2+2?'}], False)
        ```
    """

    data: List[Dict[str, Any]] = Field(default_factory=list, exclude=True)

    @override
    def process(self, offset: int = 0) -> "GeneratorStepOutput":  # type: ignore
        """Yields batches from a list of dictionaries.

        Args:
            offset: The offset to start the generation from. Defaults to `0`.

        Yields:
            A list of Python dictionaries as read from the inputs (propagated in batches)
            and a flag indicating whether the yield batch is the last one.
        """
        if offset:
            self.data = self.data[offset:]

        while self.data:
            batch = self.data[: self.batch_size]
            self.data = self.data[self.batch_size :]
            yield (
                batch,
                len(self.data) == 0,
            )

    @property
    def outputs(self) -> List[str]:
        """Returns a list of strings with the names of the columns that the step will generate."""
        return list(self.data[0].keys())
outputs: List[str] property

Returns a list of strings with the names of the columns that the step will generate.

process(offset=0)

Yields batches from a list of dictionaries.

Parameters:

Name Type Description Default
offset int

The offset to start the generation from. Defaults to 0.

0

Yields:

Type Description
GeneratorStepOutput

A list of Python dictionaries as read from the inputs (propagated in batches)

GeneratorStepOutput

and a flag indicating whether the yield batch is the last one.

Source code in src/distilabel/steps/generators/data.py
@override
def process(self, offset: int = 0) -> "GeneratorStepOutput":  # type: ignore
    """Yields batches from a list of dictionaries.

    Args:
        offset: The offset to start the generation from. Defaults to `0`.

    Yields:
        A list of Python dictionaries as read from the inputs (propagated in batches)
        and a flag indicating whether the yield batch is the last one.
    """
    if offset:
        self.data = self.data[offset:]

    while self.data:
        batch = self.data[: self.batch_size]
        self.data = self.data[self.batch_size :]
        yield (
            batch,
            len(self.data) == 0,
        )
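
The offset argument skips that many rows before batching (which is how a pipeline can resume mid-dataset), and the boolean flag in each yielded tuple marks the final batch. A small sketch of both behaviors, using an invented idx column:

```python
from distilabel.steps import LoadDataFromDicts

loader = LoadDataFromDicts(
    data=[{"idx": i} for i in range(5)],  # hypothetical toy dataset
    batch_size=2,
)
loader.load()

for batch, is_last in loader.process(offset=1):
    print(batch, is_last)
# [{'idx': 1}, {'idx': 2}] False
# [{'idx': 3}, {'idx': 4}] True
```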

RewardModelScore

Bases: Step, CudaDevicePlacementMixin

Assign a score to a response using a Reward Model.

RewardModelScore is a Step that, using a Reward Model (RM) loaded with transformers, assigns a score to a response generated for an instruction, or to a multi-turn conversation.

Attributes:

Name Type Description
model str

the model Hugging Face Hub repo id or a path to a directory containing the model weights and configuration files.

revision str

if model refers to a Hugging Face Hub repository, then the revision (e.g. a branch name or a commit id) to use. Defaults to "main".

torch_dtype str

the torch dtype to use for the model e.g. "float16", "float32", etc. Defaults to "auto".

trust_remote_code bool

whether to allow fetching and executing remote code fetched from the repository in the Hub. Defaults to False.

device_map Union[str, Dict[str, Any], None]

a dictionary mapping each layer of the model to a device, or a mode like "sequential" or "auto". Defaults to None.

token Union[SecretStr, None]

the Hugging Face Hub token that will be used to authenticate to the Hugging Face Hub. If not provided, the HF_TOKEN environment or huggingface_hub package local configuration will be used. Defaults to None.

truncation bool

whether to truncate sequences at the maximum length. Defaults to False.

max_length Union[int, None]

maximum length to use for padding or truncation. Defaults to None.

Input columns
  • instruction (str, optional): the instruction used to generate a response. If provided, then response must be provided too.
  • response (str, optional): the response generated for instruction. If provided, then instruction must be provided too.
  • conversation (ChatType, optional): a multi-turn conversation. If not provided, then instruction and response columns must be provided.
Output columns
  • score (float): the score given by the reward model for the instruction-response pair or the conversation.
Categories
  • scorer

Examples:

Assigning a score to an instruction-response pair:

from distilabel.steps import RewardModelScore

step = RewardModelScore(
    model="RLHFlow/ArmoRM-Llama3-8B-v0.1", device_map="auto", trust_remote_code=True
)

step.load()

result = next(
    step.process(
        inputs=[
            {
                "instruction": "How much is 2+2?",
                "response": "The output of 2+2 is 4",
            },
            {"instruction": "How much is 2+2?", "response": "4"},
        ]
    )
)
# [
#   {'instruction': 'How much is 2+2?', 'response': 'The output of 2+2 is 4', 'score': 0.11690367758274078},
#   {'instruction': 'How much is 2+2?', 'response': '4', 'score': 0.10300665348768234}
# ]

Assigning a score to a multi-turn conversation:

from distilabel.steps import RewardModelScore

step = RewardModelScore(
    model="RLHFlow/ArmoRM-Llama3-8B-v0.1", device_map="auto", trust_remote_code=True
)

step.load()

result = next(
    step.process(
        inputs=[
            {
                "conversation": [
                    {"role": "user", "content": "How much is 2+2?"},
                    {"role": "assistant", "content": "The output of 2+2 is 4"},
                ],
            },
            {
                "conversation": [
                    {"role": "user", "content": "How much is 2+2?"},
                    {"role": "assistant", "content": "4"},
                ],
            },
        ]
    )
)
# [
#   {'conversation': [{'role': 'user', 'content': 'How much is 2+2?'}, {'role': 'assistant', 'content': 'The output of 2+2 is 4'}], 'score': 0.11690367758274078},
#   {'conversation': [{'role': 'user', 'content': 'How much is 2+2?'}, {'role': 'assistant', 'content': '4'}], 'score': 0.10300665348768234}
# ]
Source code in src/distilabel/steps/reward_model.py
class RewardModelScore(Step, CudaDevicePlacementMixin):
    """Assign a score to a response using a Reward Model.

    `RewardModelScore` is a `Step` that, using a Reward Model (RM) loaded with `transformers`,
    assigns a score to a response generated for an instruction, or to a multi-turn
    conversation.

    Attributes:
        model: the model Hugging Face Hub repo id or a path to a directory containing the
            model weights and configuration files.
        revision: if `model` refers to a Hugging Face Hub repository, then the revision
            (e.g. a branch name or a commit id) to use. Defaults to `"main"`.
        torch_dtype: the torch dtype to use for the model e.g. "float16", "float32", etc.
            Defaults to `"auto"`.
        trust_remote_code: whether to allow fetching and executing remote code fetched
            from the repository in the Hub. Defaults to `False`.
        device_map: a dictionary mapping each layer of the model to a device, or a mode like `"sequential"` or `"auto"`. Defaults to `None`.
        token: the Hugging Face Hub token that will be used to authenticate to the Hugging
            Face Hub. If not provided, the `HF_TOKEN` environment or `huggingface_hub` package
            local configuration will be used. Defaults to `None`.
        truncation: whether to truncate sequences at the maximum length. Defaults to `False`.
        max_length: maximum length to use for padding or truncation. Defaults to `None`.

    Input columns:
        - instruction (`str`, optional): the instruction used to generate a `response`.
            If provided, then `response` must be provided too.
        - response (`str`, optional): the response generated for `instruction`. If provided,
            then `instruction` must be provided too.
        - conversation (`ChatType`, optional): a multi-turn conversation. If not provided,
            then `instruction` and `response` columns must be provided.

    Output columns:
        - score (`float`): the score given by the reward model for the instruction-response
            pair or the conversation.

    Categories:
        - scorer

    Examples:
        Assigning a score to an instruction-response pair:

        ```python
        from distilabel.steps import RewardModelScore

        step = RewardModelScore(
            model="RLHFlow/ArmoRM-Llama3-8B-v0.1", device_map="auto", trust_remote_code=True
        )

        step.load()

        result = next(
            step.process(
                inputs=[
                    {
                        "instruction": "How much is 2+2?",
                        "response": "The output of 2+2 is 4",
                    },
                    {"instruction": "How much is 2+2?", "response": "4"},
                ]
            )
        )
        # [
        #   {'instruction': 'How much is 2+2?', 'response': 'The output of 2+2 is 4', 'score': 0.11690367758274078},
        #   {'instruction': 'How much is 2+2?', 'response': '4', 'score': 0.10300665348768234}
        # ]
        ```

        Assigning a score to a multi-turn conversation:

        ```python
        from distilabel.steps import RewardModelScore

        step = RewardModelScore(
            model="RLHFlow/ArmoRM-Llama3-8B-v0.1", device_map="auto", trust_remote_code=True
        )

        step.load()

        result = next(
            step.process(
                inputs=[
                    {
                        "conversation": [
                            {"role": "user", "content": "How much is 2+2?"},
                            {"role": "assistant", "content": "The output of 2+2 is 4"},
                        ],
                    },
                    {
                        "conversation": [
                            {"role": "user", "content": "How much is 2+2?"},
                            {"role": "assistant", "content": "4"},
                        ],
                    },
                ]
            )
        )
        # [
        #   {'conversation': [{'role': 'user', 'content': 'How much is 2+2?'}, {'role': 'assistant', 'content': 'The output of 2+2 is 4'}], 'score': 0.11690367758274078},
        #   {'conversation': [{'role': 'user', 'content': 'How much is 2+2?'}, {'role': 'assistant', 'content': '4'}], 'score': 0.10300665348768234}
        # ]
        ```
    """

    model: str
    revision: str = "main"
    torch_dtype: str = "auto"
    trust_remote_code: bool = False
    device_map: Union[str, Dict[str, Any], None] = None
    token: Union[SecretStr, None] = Field(
        default_factory=lambda: os.getenv(HF_TOKEN_ENV_VAR), description=""
    )
    truncation: bool = False
    max_length: Union[int, None] = None

    _model: Union["PreTrainedModel", None] = PrivateAttr(None)
    _tokenizer: Union["PreTrainedTokenizer", None] = PrivateAttr(None)

    def load(self) -> None:
        super().load()

        if self.device_map in ["cuda", "auto"]:
            CudaDevicePlacementMixin.load(self)

        try:
            from transformers import AutoModelForSequenceClassification, AutoTokenizer
        except ImportError as e:
            raise ImportError(
                "`transformers` is not installed. Please install it using `pip install transformers`."
            ) from e

        token = self.token.get_secret_value() if self.token is not None else self.token

        self._model = AutoModelForSequenceClassification.from_pretrained(
            self.model,
            revision=self.revision,
            torch_dtype=self.torch_dtype,
            trust_remote_code=self.trust_remote_code,
            device_map=self.device_map,
            token=token,
        )
        self._tokenizer = AutoTokenizer.from_pretrained(
            self.model,
            revision=self.revision,
            torch_dtype=self.torch_dtype,
            trust_remote_code=self.trust_remote_code,
            token=token,
        )

    @property
    def inputs(self) -> "StepColumns":
        """Either `response` and `instruction`, or a `conversation` columns."""
        return {
            "response": False,
            "instruction": False,
            "conversation": False,
        }

    @property
    def outputs(self) -> "StepColumns":
        """The `score` given by the reward model."""
        return ["score"]

    def _prepare_conversation(self, input: Dict[str, Any]) -> "ChatType":
        if "instruction" in input and "response" in input:
            return [
                {"role": "user", "content": input["instruction"]},
                {"role": "assistant", "content": input["response"]},
            ]

        return input["conversation"]

    def _prepare_inputs(self, inputs: List[Dict[str, Any]]) -> "torch.Tensor":
        return self._tokenizer.apply_chat_template(  # type: ignore
            [self._prepare_conversation(input) for input in inputs],  # type: ignore
            return_tensors="pt",
            padding=True,
            truncation=self.truncation,
            max_length=self.max_length,
        ).to(self._model.device)  # type: ignore

    def _inference(self, inputs: List[Dict[str, Any]]) -> List[float]:
        import torch

        input_ids = self._prepare_inputs(inputs)
        with torch.no_grad():
            output = self._model(input_ids)  # type: ignore
            logits = output.logits
            if logits.shape == (2, 1):
                logits = logits.squeeze(-1)
            return logits.tolist()

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        scores = self._inference(inputs)
        for input, score in zip(inputs, scores):
            input["score"] = score
        yield inputs

    def unload(self) -> None:
        if self.device_map in ["cuda", "auto"]:
            CudaDevicePlacementMixin.unload(self)
        super().unload()
inputs: StepColumns property

Either the response and instruction columns, or a conversation column.

outputs: StepColumns property

The score given by the reward model.
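
For long conversations it may be safer to bound the input length: truncation and max_length are forwarded to the tokenizer when preparing the inputs. A hedged configuration sketch (the value 2048 is an arbitrary choice for illustration):

```python
from distilabel.steps import RewardModelScore

step = RewardModelScore(
    model="RLHFlow/ArmoRM-Llama3-8B-v0.1",
    device_map="auto",
    trust_remote_code=True,
    truncation=True,  # truncate sequences that exceed `max_length`
    max_length=2048,  # arbitrary bound chosen for illustration
)
```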

TruncateTextColumn

Bases: Step

Truncate a row using a tokenizer or the number of characters.

TruncateTextColumn is a Step that truncates a row according to the max length. If the tokenizer is provided, the row will be truncated using the tokenizer, with max_length interpreted as a maximum number of tokens; otherwise max_length is interpreted as a maximum number of characters. The TruncateTextColumn step is useful when one wants to truncate a row to a certain length, to avoid later errors in the model due to the length.

Attributes:

Name Type Description
column str

the column to truncate. Defaults to "text".

max_length int

the maximum length to use for truncation. If a tokenizer is given, corresponds to the number of tokens, otherwise corresponds to the number of characters. Defaults to 8192.

tokenizer Optional[str]

the name of the tokenizer to use. If provided, the row will be truncated using the tokenizer. Defaults to None.

Input columns
  • dynamic (determined by column attribute): The column to be truncated, defaults to "text".
Output columns
  • dynamic (determined by column attribute): The truncated column.
Categories
  • text-manipulation

Examples:

Truncating a row to a given number of tokens:

from distilabel.steps import TruncateTextColumn

trunc = TruncateTextColumn(
    tokenizer="meta-llama/Meta-Llama-3.1-70B-Instruct",
    max_length=4,
    column="text"
)

trunc.load()

result = next(
    trunc.process(
        [
            {"text": "This is a sample text that is longer than 10 characters"}
        ]
    )
)
# result
# [{'text': 'This is a sample'}]

Truncating a row to a given number of characters:

from distilabel.steps import TruncateTextColumn

trunc = TruncateTextColumn(max_length=10)

trunc.load()

result = next(
    trunc.process(
        [
            {"text": "This is a sample text that is longer than 10 characters"}
        ]
    )
)
# result
# [{'text': 'This is a '}]
Source code in src/distilabel/steps/truncate.py
class TruncateTextColumn(Step):
    """Truncate a row using a tokenizer or the number of characters.

    `TruncateTextColumn` is a `Step` that truncates a row according to the max length. If
    the `tokenizer` is provided, the row will be truncated using the tokenizer, with
    `max_length` interpreted as a maximum number of tokens; otherwise `max_length` is
    interpreted as a maximum number of characters. The `TruncateTextColumn` step is useful
    when one wants to truncate a row to a certain length, to avoid later errors in the
    model due to the length.

    Attributes:
        column: the column to truncate. Defaults to `"text"`.
        max_length: the maximum length to use for truncation.
            If a `tokenizer` is given, corresponds to the number of tokens,
            otherwise corresponds to the number of characters. Defaults to `8192`.
        tokenizer: the name of the tokenizer to use. If provided, the row will be
            truncated using the tokenizer. Defaults to `None`.

    Input columns:
        - dynamic (determined by `column` attribute): The columns to be truncated, defaults to "text".

    Output columns:
        - dynamic (determined by `column` attribute): The truncated column.

    Categories:
        - text-manipulation

    Examples:
        Truncating a row to a given number of tokens:

        ```python
        from distilabel.steps import TruncateTextColumn

        trunc = TruncateTextColumn(
            tokenizer="meta-llama/Meta-Llama-3.1-70B-Instruct",
            max_length=4,
            column="text"
        )

        trunc.load()

        result = next(
            trunc.process(
                [
                    {"text": "This is a sample text that is longer than 10 characters"}
                ]
            )
        )
        # result
        # [{'text': 'This is a sample'}]
        ```

        Truncating a row to a given number of characters:

        ```python
        from distilabel.steps import TruncateTextColumn

        trunc = TruncateTextColumn(max_length=10)

        trunc.load()

        result = next(
            trunc.process(
                [
                    {"text": "This is a sample text that is longer than 10 characters"}
                ]
            )
        )
        # result
        # [{'text': 'This is a '}]
        ```
    """

    column: str = "text"
    max_length: int = 8192
    tokenizer: Optional[str] = None
    _truncator: Optional[Callable[[str], str]] = None
    _tokenizer: Optional[Any] = None

    def load(self):
        super().load()
        if self.tokenizer:
            if not importlib.util.find_spec("transformers"):
                raise ImportError(
                    "`transformers` is needed to tokenize, but is not installed. "
                    "Please install it using `pip install transformers`."
                )

            from transformers import AutoTokenizer

            self._tokenizer = AutoTokenizer.from_pretrained(self.tokenizer)
            self._truncator = self._truncate_with_tokenizer
        else:
            self._truncator = self._truncate_with_length

    @property
    def inputs(self) -> List[str]:
        return [self.column]

    @property
    def outputs(self) -> List[str]:
        return self.inputs

    def _truncate_with_length(self, text: str) -> str:
        """Truncates the text according to the number of characters."""
        return text[: self.max_length]

    def _truncate_with_tokenizer(self, text: str) -> str:
        """Truncates the text according to the number of characters using the tokenizer."""
        return self._tokenizer.decode(
            self._tokenizer.encode(
                text,
                add_special_tokens=False,
                max_length=self.max_length,
                truncation=True,
            )
        )

    @override
    def process(self, inputs: StepInput) -> "StepOutput":
        for input in inputs:
            input[self.column] = self._truncator(input[self.column])
        yield inputs
_truncate_with_length(text)

Truncates the text according to the number of characters.

Source code in src/distilabel/steps/truncate.py
def _truncate_with_length(self, text: str) -> str:
    """Truncates the text according to the number of characters."""
    return text[: self.max_length]
_truncate_with_tokenizer(text)

Truncates the text according to the number of tokens using the tokenizer.

Source code in src/distilabel/steps/truncate.py
def _truncate_with_tokenizer(self, text: str) -> str:
    """Truncates the text according to the number of characters using the tokenizer."""
    return self._tokenizer.decode(
        self._tokenizer.encode(
            text,
            add_special_tokens=False,
            max_length=self.max_length,
            truncation=True,
        )
    )
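
Note that tokenizer-based truncation is an encode/decode round trip, so the truncated text may not match the original characters exactly: detokenization can normalize casing or whitespace depending on the tokenizer. A minimal sketch of that round trip, assuming transformers is installed and using "bert-base-uncased" purely as an illustrative model:

```python
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")  # illustrative model

text = "This is a sample text that is longer than 10 characters"
token_ids = tokenizer.encode(
    text,
    add_special_tokens=False,  # keep only content tokens, mirroring the step
    max_length=4,              # keep at most 4 tokens
    truncation=True,
)
print(tokenizer.decode(token_ids))
# "this is a sample" -- note the lowercasing introduced by this particular tokenizer
```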