Skip to content

Extra

LoadDataFromDicts

Bases: GeneratorStep

Loads a dataset from a list of dictionaries.

GeneratorStep that loads a dataset from a list of dictionaries and yields it in batches.

Attributes:

Name Type Description
data List[Dict[str, Any]]

The list of dictionaries to load the data from.

Runtime parameters
  • batch_size: The batch size to use when processing the data.
Output columns
  • dynamic (based on the keys found on the first dictionary of the list): The columns of the dataset.
Categories
  • load

Examples:

Load data from a list of dictionaries:

```python
from distilabel.steps import LoadDataFromDicts

loader = LoadDataFromDicts(
    data=[{"instruction": "What are 2+2?"}] * 5,
    batch_size=2
)
loader.load()

result = next(loader.process())
# >>> result
# ([{'instruction': 'What are 2+2?'}, {'instruction': 'What are 2+2?'}], False)
```
Source code in src/distilabel/steps/generators/data.py
class LoadDataFromDicts(GeneratorStep):
    """Loads a dataset from a list of dictionaries.

    `GeneratorStep` that loads a dataset from a list of dictionaries and yields it in
    batches.

    Attributes:
        data: The list of dictionaries to load the data from.

    Runtime parameters:
        - `batch_size`: The batch size to use when processing the data.

    Output columns:
        - dynamic (based on the keys found on the first dictionary of the list): The columns
            of the dataset.

    Categories:
        - load

    Examples:

        Load data from a list of dictionaries:

        ```python
        from distilabel.steps import LoadDataFromDicts

        loader = LoadDataFromDicts(
            data=[{"instruction": "What are 2+2?"}] * 5,
            batch_size=2
        )
        loader.load()

        result = next(loader.process())
        # >>> result
        # ([{'instruction': 'What are 2+2?'}, {'instruction': 'What are 2+2?'}], False)
        ```
    """

    data: List[Dict[str, Any]]

    @override
    def process(self, offset: int = 0) -> "GeneratorStepOutput":  # type: ignore
        """Yields batches from a list of dictionaries.

        Args:
            offset: The offset to start the generation from. Defaults to `0`.

        Yields:
            A list of Python dictionaries as read from the inputs (propagated in batches)
            and a flag indicating whether the yield batch is the last one.
        """
        if offset:
            self.data = self.data[offset:]

        while self.data:
            batch = self.data[: self.batch_size]
            self.data = self.data[self.batch_size :]
            yield (
                batch,
                True if len(self.data) == 0 else False,
            )

    @property
    def outputs(self) -> List[str]:
        """Returns a list of strings with the names of the columns that the step will generate."""
        return list(self.data[0].keys())

outputs: List[str] property

Returns a list of strings with the names of the columns that the step will generate.

process(offset=0)

Yields batches from a list of dictionaries.

Parameters:

Name Type Description Default
offset int

The offset to start the generation from. Defaults to 0.

0

Yields:

Type Description
GeneratorStepOutput

A list of Python dictionaries as read from the inputs (propagated in batches)

GeneratorStepOutput

and a flag indicating whether the yield batch is the last one.

Source code in src/distilabel/steps/generators/data.py
@override
def process(self, offset: int = 0) -> "GeneratorStepOutput":  # type: ignore
    """Yields batches from a list of dictionaries.

    Args:
        offset: The offset to start the generation from. Defaults to `0`.

    Yields:
        A list of Python dictionaries as read from the inputs (propagated in batches)
        and a flag indicating whether the yield batch is the last one.
    """
    if offset:
        self.data = self.data[offset:]

    while self.data:
        batch = self.data[: self.batch_size]
        self.data = self.data[self.batch_size :]
        yield (
            batch,
            True if len(self.data) == 0 else False,
        )

DeitaFiltering

Bases: GlobalStep

Filter dataset rows using DEITA filtering strategy.

Filter the dataset based on the DEITA score and the cosine distance between the embeddings. It's an implementation of the filtering step from the paper 'What Makes Good Data for Alignment? A Comprehensive Study of Automatic Data Selection in Instruction Tuning'.

Attributes:

Name Type Description
data_budget RuntimeParameter[int]

The desired size of the dataset after filtering.

diversity_threshold RuntimeParameter[float]

If a row has a cosine distance with respect to it's nearest neighbor greater than this value, it will be included in the filtered dataset. Defaults to 0.9.

normalize_embeddings RuntimeParameter[bool]

Whether to normalize the embeddings before computing the cosine distance. Defaults to True.

Runtime parameters
  • data_budget: The desired size of the dataset after filtering.
  • diversity_threshold: If a row has a cosine distance with respect to it's nearest neighbor greater than this value, it will be included in the filtered dataset.
Input columns
  • evol_instruction_score (float): The score of the instruction generated by ComplexityScorer step.
  • evol_response_score (float): The score of the response generated by QualityScorer step.
  • embedding (List[float]): The embedding generated for the conversation of the instruction-response pair using GenerateEmbeddings step.
Output columns
  • deita_score (float): The DEITA score for the instruction-response pair.
  • deita_score_computed_with (List[str]): The scores used to compute the DEITA score.
  • nearest_neighbor_distance (float): The cosine distance between the embeddings of the instruction-response pair.
Categories
  • filtering
References

Examples:

Filter the dataset based on the DEITA score and the cosine distance between the embeddings:

```python
from distilabel.steps import DeitaFiltering

deita_filtering = DeitaFiltering(data_budget=1)

deita_filtering.load()

result = next(
    deita_filtering.process(
        [
            {
                "evol_instruction_score": 0.5,
                "evol_response_score": 0.5,
                "embedding": [-8.12729941, -5.24642847, -6.34003029],
            },
            {
                "evol_instruction_score": 0.6,
                "evol_response_score": 0.6,
                "embedding": [2.99329242, 0.7800932, 0.7799726],
            },
            {
                "evol_instruction_score": 0.7,
                "evol_response_score": 0.7,
                "embedding": [10.29041806, 14.33088073, 13.00557506],
            },
        ],
    )
)
# >>> result
# [{'evol_instruction_score': 0.5, 'evol_response_score': 0.5, 'embedding': [-8.12729941, -5.24642847, -6.34003029], 'deita_score': 0.25, 'deita_score_computed_with': ['evol_instruction_score', 'evol_response_score'], 'nearest_neighbor_distance': 1.9042812683723933}]
```
Source code in src/distilabel/steps/deita.py
class DeitaFiltering(GlobalStep):
    """Filter dataset rows using DEITA filtering strategy.

    Filter the dataset based on the DEITA score and the cosine distance between the embeddings.
    It's an implementation of the filtering step from the paper 'What Makes Good Data
    for Alignment? A Comprehensive Study of Automatic Data Selection in Instruction Tuning'.

    Attributes:
        data_budget: The desired size of the dataset after filtering.
        diversity_threshold: If a row has a cosine distance with respect to it's nearest
            neighbor greater than this value, it will be included in the filtered dataset.
            Defaults to `0.9`.
        normalize_embeddings: Whether to normalize the embeddings before computing the cosine
            distance. Defaults to `True`.

    Runtime parameters:
        - `data_budget`: The desired size of the dataset after filtering.
        - `diversity_threshold`: If a row has a cosine distance with respect to it's nearest
            neighbor greater than this value, it will be included in the filtered dataset.

    Input columns:
        - evol_instruction_score (`float`): The score of the instruction generated by
            `ComplexityScorer` step.
        - evol_response_score (`float`): The score of the response generated by
            `QualityScorer` step.
        - embedding (`List[float]`): The embedding generated for the conversation of the
            instruction-response pair using `GenerateEmbeddings` step.

    Output columns:
        - deita_score (`float`): The DEITA score for the instruction-response pair.
        - deita_score_computed_with (`List[str]`): The scores used to compute the DEITA
            score.
        - nearest_neighbor_distance (`float`): The cosine distance between the embeddings
            of the instruction-response pair.

    Categories:
        - filtering

    References:
        - [`What Makes Good Data for Alignment? A Comprehensive Study of Automatic Data Selection in Instruction Tuning`](https://arxiv.org/abs/2312.15685)

    Examples:

        Filter the dataset based on the DEITA score and the cosine distance between the embeddings:

        ```python
        from distilabel.steps import DeitaFiltering

        deita_filtering = DeitaFiltering(data_budget=1)

        deita_filtering.load()

        result = next(
            deita_filtering.process(
                [
                    {
                        "evol_instruction_score": 0.5,
                        "evol_response_score": 0.5,
                        "embedding": [-8.12729941, -5.24642847, -6.34003029],
                    },
                    {
                        "evol_instruction_score": 0.6,
                        "evol_response_score": 0.6,
                        "embedding": [2.99329242, 0.7800932, 0.7799726],
                    },
                    {
                        "evol_instruction_score": 0.7,
                        "evol_response_score": 0.7,
                        "embedding": [10.29041806, 14.33088073, 13.00557506],
                    },
                ],
            )
        )
        # >>> result
        # [{'evol_instruction_score': 0.5, 'evol_response_score': 0.5, 'embedding': [-8.12729941, -5.24642847, -6.34003029], 'deita_score': 0.25, 'deita_score_computed_with': ['evol_instruction_score', 'evol_response_score'], 'nearest_neighbor_distance': 1.9042812683723933}]
        ```
    """

    data_budget: RuntimeParameter[int] = Field(
        default=None, description="The desired size of the dataset after filtering."
    )
    diversity_threshold: RuntimeParameter[float] = Field(
        default=0.9,
        description="If a row has a cosine distance with respect to it's nearest neighbor"
        " greater than this value, it will be included in the filtered dataset.",
    )
    normalize_embeddings: RuntimeParameter[bool] = Field(
        default=True,
        description="Whether to normalize the embeddings before computing the cosine distance.",
    )
    distance_metric: RuntimeParameter[Literal["cosine", "manhattan"]] = Field(
        default="cosine",
        description="The distance metric to use. Currently only 'cosine' is supported.",
    )

    @property
    def inputs(self) -> List[str]:
        return ["evol_instruction_score", "evol_response_score", "embedding"]

    @property
    def outputs(self) -> List[str]:
        return ["deita_score", "nearest_neighbor_distance", "deita_score_computed_with"]

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        """Filter the dataset based on the DEITA score and the cosine distance between the
        embeddings.

        Args:
            inputs: The input data.

        Returns:
            The filtered dataset.
        """
        inputs = self._compute_deita_score(inputs)
        inputs = self._compute_nearest_neighbor(inputs)
        inputs.sort(key=lambda x: x["deita_score"], reverse=True)

        selected_rows = []
        for input in inputs:
            if len(selected_rows) >= self.data_budget:  # type: ignore
                break
            if input["nearest_neighbor_distance"] >= self.diversity_threshold:
                selected_rows.append(input)
        yield selected_rows

    def _compute_deita_score(self, inputs: StepInput) -> StepInput:
        """Computes the DEITA score for each instruction-response pair. The DEITA score is
        the product of the instruction score and the response score.

        Args:
            inputs: The input data.

        Returns:
            The input data with the DEITA score computed.
        """
        for input_ in inputs:
            evol_instruction_score = input_.get("evol_instruction_score")
            evol_response_score = input_.get("evol_response_score")

            if evol_instruction_score and evol_response_score:
                deita_score = evol_instruction_score * evol_response_score
                score_computed_with = ["evol_instruction_score", "evol_response_score"]
            elif evol_instruction_score:
                self._logger.warning(
                    "Response score is missing for the instruction-response pair. Using"
                    " instruction score as DEITA score."
                )
                deita_score = evol_instruction_score
                score_computed_with = ["evol_instruction_score"]
            elif evol_response_score:
                self._logger.warning(
                    "Instruction score is missing for the instruction-response pair. Using"
                    " response score as DEITA score."
                )
                deita_score = evol_response_score
                score_computed_with = ["evol_response_score"]
            else:
                self._logger.warning(
                    "Instruction and response scores are missing for the instruction-response"
                    " pair. Setting DEITA score to 0."
                )
                deita_score = 0
                score_computed_with = []

            input_.update(
                {
                    "deita_score": deita_score,
                    "deita_score_computed_with": score_computed_with,
                }
            )
        return inputs

    def _compute_nearest_neighbor(self, inputs: StepInput) -> StepInput:
        """Computes the cosine distance between the embeddings of the instruction-response
        pairs and the nearest neighbor.

        Args:
            inputs: The input data.

        Returns:
            The input data with the cosine distance computed.
        """
        embeddings = np.array([input["embedding"] for input in inputs])
        if self.normalize_embeddings:
            embeddings = self._normalize_embeddings(embeddings)
        self._logger.info("📏 Computing nearest neighbor distance...")

        if self.distance_metric == "cosine":
            self._logger.info("📏 Using cosine distance.")
            distances = self._cosine_distance(embeddings)
        else:
            self._logger.info("📏 Using manhattan distance.")
            distances = self._manhattan_distance(embeddings)

        for distance, input in zip(distances, inputs):
            input["nearest_neighbor_distance"] = distance
        return inputs

    def _normalize_embeddings(self, embeddings: np.ndarray) -> np.ndarray:
        """Normalize the embeddings.

        Args:
            embeddings: The embeddings to normalize.

        Returns:
            The normalized embeddings.
        """
        self._logger.info("⚖️ Normalizing embeddings...")
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        return embeddings / norms

    def _cosine_distance(self, embeddings: np.array) -> np.array:  # type: ignore
        """Computes the cosine distance between the embeddings.

        Args:
            embeddings: The embeddings.

        Returns:
            The cosine distance between the embeddings.
        """
        cosine_similarity = np.dot(embeddings, embeddings.T)
        cosine_distance = 1 - cosine_similarity
        # Ignore self-distance
        np.fill_diagonal(cosine_distance, np.inf)
        return np.min(cosine_distance, axis=1)

    def _manhattan_distance(self, embeddings: np.array) -> np.array:  # type: ignore
        """Computes the manhattan distance between the embeddings.

        Args:
            embeddings: The embeddings.

        Returns:
            The manhattan distance between the embeddings.
        """
        manhattan_distance = np.abs(embeddings[:, None] - embeddings).sum(-1)
        # Ignore self-distance
        np.fill_diagonal(manhattan_distance, np.inf)
        return np.min(manhattan_distance, axis=1)

process(inputs)

Filter the dataset based on the DEITA score and the cosine distance between the embeddings.

Parameters:

Name Type Description Default
inputs StepInput

The input data.

required

Returns:

Type Description
StepOutput

The filtered dataset.

Source code in src/distilabel/steps/deita.py
def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
    """Filter the dataset based on the DEITA score and the cosine distance between the
    embeddings.

    Args:
        inputs: The input data.

    Returns:
        The filtered dataset.
    """
    inputs = self._compute_deita_score(inputs)
    inputs = self._compute_nearest_neighbor(inputs)
    inputs.sort(key=lambda x: x["deita_score"], reverse=True)

    selected_rows = []
    for input in inputs:
        if len(selected_rows) >= self.data_budget:  # type: ignore
            break
        if input["nearest_neighbor_distance"] >= self.diversity_threshold:
            selected_rows.append(input)
    yield selected_rows

GeneratorStepOutput = Iterator[Tuple[List[Dict[str, Any]], bool]] module-attribute

GeneratorStepOutput is an alias of the typing Iterator[Tuple[List[Dict[str, Any]], bool]]

StepOutput = Iterator[List[Dict[str, Any]]] module-attribute

StepOutput is an alias of the typing Iterator[List[Dict[str, Any]]]