
Generator Steps

LoadDataFromDicts

Bases: GeneratorStep

A generator step that loads a dataset from a list of dictionaries.

This step loads the dataset and yields the data in batches as it is read from the list of dictionaries.

Attributes:

  • data (List[Dict[str, Any]]): The list of dictionaries to load the data from.

Runtime parameters
  • batch_size: The batch size to use when processing the data.
Output columns

Dynamic, based on the keys found in the first dictionary of the list.
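
A minimal usage sketch (the pipeline name, step name, and example rows below are illustrative, and the import path assumes the class is re-exported from `distilabel.steps`):

from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromDicts

with Pipeline(name="demo-pipeline") as pipeline:
    loader = LoadDataFromDicts(
        name="load_data",
        data=[
            {"instruction": "Tell me a joke.", "topic": "humor"},
            {"instruction": "Summarize this text.", "topic": "nlp"},
        ],
        batch_size=1,
    )

The `data` attribute is set at construction time, while `batch_size` is a runtime parameter and can also be supplied when the pipeline is run.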

Source code in src/distilabel/steps/generators/data.py
class LoadDataFromDicts(GeneratorStep):
    """A generator step that loads a dataset from a list of dictionaries.

    This step loads the dataset and yields the data in batches as it is read from the list of dictionaries.

    Attributes:
        data: The list of dictionaries to load the data from.

    Runtime parameters:
        - `batch_size`: The batch size to use when processing the data.

    Output columns:
        Dynamic, based on the keys found in the first dictionary of the list
    """

    data: List[Dict[str, Any]]

    @override
    def process(self, offset: int = 0) -> "GeneratorStepOutput":  # type: ignore
        """Yields batches from a list of dictionaries.

        Args:
            offset: The offset to start the generation from. Defaults to `0`.

        Yields:
            A list of Python dictionaries as read from the inputs (propagated in batches)
            and a flag indicating whether the yielded batch is the last one.
        """
        if offset:
            self.data = self.data[offset:]

        while self.data:
            batch = self.data[: self.batch_size]
            self.data = self.data[self.batch_size :]
            yield (
                batch,
                len(self.data) == 0,
            )

    @property
    def outputs(self) -> List[str]:
        """Returns a list of strings with the names of the columns that the step will generate."""
        return list(self.data[0].keys())

outputs: List[str] property

Returns a list of strings with the names of the columns that the step will generate.
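
Because only the first dictionary is inspected, keys that appear only in later rows are not reported as columns. A small hypothetical illustration (built inside a pipeline context, as in the sketch above):

loader = LoadDataFromDicts(name="load", data=[{"a": 1}, {"a": 2, "b": 3}])
print(loader.outputs)  # ["a"] -- the extra "b" key is not reported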

process(offset=0)

Yields batches from a list of dictionaries.

Parameters:

  • offset (int, default: 0): The offset to start the generation from.

Yields:

  • GeneratorStepOutput: A list of Python dictionaries as read from the inputs (propagated in batches) and a flag indicating whether the yielded batch is the last one.
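
To see the batching behavior in isolation, `process` can be driven by hand. Note that it consumes `self.data` as it iterates, so `outputs` should be read before exhausting the generator. Continuing the hypothetical two-row loader above (`batch_size=1`):

for batch, is_last in loader.process(offset=0):
    print(len(batch), is_last)
# 1 False
# 1 True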


LoadHubDataset

Bases: GeneratorStep

A generator step that loads a dataset from the Hugging Face Hub using the datasets library.

This step will load the dataset in streaming mode, which means that it will not load the entire dataset into memory at once. Instead, it will load the dataset in chunks and yield the transformed data as it is loaded from the Hugging Face Hub.

Attributes:

  • repo_id (RuntimeParameter[str]): The Hugging Face Hub repository ID of the dataset to load.
  • split (RuntimeParameter[str]): The split of the dataset to load.
  • config (Optional[RuntimeParameter[str]]): The configuration of the dataset to load. This is optional and only needed if the dataset has multiple configurations.

Runtime parameters
  • batch_size: The batch size to use when processing the data.
  • repo_id: The Hugging Face Hub repository ID of the dataset to load.
  • split: The split of the dataset to load. Defaults to 'train'.
  • config: The configuration of the dataset to load. This is optional and only needed if the dataset has multiple configurations.

Output columns

Dynamic, based on the dataset being loaded.
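
A minimal usage sketch (the repository ID, step name, and pipeline name below are illustrative, and the import path assumes the class is re-exported from `distilabel.steps`):

from distilabel.pipeline import Pipeline
from distilabel.steps import LoadHubDataset

with Pipeline(name="demo-pipeline") as pipeline:
    loader = LoadHubDataset(
        name="load_hub",
        repo_id="HuggingFaceH4/instruction-dataset",  # illustrative repo_id
        split="test",
        batch_size=100,
    )

Since `repo_id`, `split` and `config` are runtime parameters, they can alternatively be supplied when the pipeline is run, e.g. `pipeline.run(parameters={"load_hub": {"split": "train"}})` (the step name key is illustrative).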

Source code in src/distilabel/steps/generators/huggingface.py
class LoadHubDataset(GeneratorStep):
    """A generator step that loads a dataset from the Hugging Face Hub using the `datasets`
    library.

    This step will load the dataset in streaming mode, which means that it will not load the
    entire dataset into memory at once. Instead, it will load the dataset in chunks and yield
    the transformed data as it is loaded from the Hugging Face Hub.

    Attributes:
        repo_id: The Hugging Face Hub repository ID of the dataset to load.
        split: The split of the dataset to load.
        config: The configuration of the dataset to load. This is optional and only needed
            if the dataset has multiple configurations.

    Runtime parameters:
        - `batch_size`: The batch size to use when processing the data.
        - `repo_id`: The Hugging Face Hub repository ID of the dataset to load.
        - `split`: The split of the dataset to load. Defaults to 'train'.
        - `config`: The configuration of the dataset to load. This is optional and only
            needed if the dataset has multiple configurations.

    Output columns
        - dynamic, based on the dataset being loaded
    """

    repo_id: RuntimeParameter[str] = Field(
        default=None,
        description="The Hugging Face Hub repository ID of the dataset to load.",
    )
    split: RuntimeParameter[str] = Field(
        default="train",
        description="The split of the dataset to load. Defaults to 'train'.",
    )
    config: Optional[RuntimeParameter[str]] = Field(
        default=None,
        description="The configuration of the dataset to load. This is optional and only"
        " needed if the dataset has multiple configurations.",
    )

    _dataset: Union[IterableDataset, None] = PrivateAttr(...)

    def load(self) -> None:
        """Load the dataset from the Hugging Face Hub"""
        super().load()

        self._dataset = load_dataset(
            self.repo_id,  # type: ignore
            self.config,
            split=self.split,
            streaming=True,
        )

    def process(self, offset: int = 0) -> "GeneratorStepOutput":
        """Yields batches from the loaded dataset from the Hugging Face Hub.

        Args:
            offset: The offset to start yielding the data from. Will be used during the
                caching process to help skip already processed data.

        Yields:
            A tuple containing a batch of rows and a boolean indicating if the batch is
            the last one.
        """
        num_examples = self._get_dataset_num_examples()
        num_returned_rows = 0
        for batch_num, batch in enumerate(
            self._dataset.iter(batch_size=self.batch_size)  # type: ignore
        ):
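            # Skip batches that come before `offset` (used when resuming from cache).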
            if batch_num * self.batch_size < offset:
                continue
            transformed_batch = self._transform_batch(batch)
            batch_size = len(transformed_batch)
            num_returned_rows += batch_size
            yield transformed_batch, num_returned_rows == num_examples

    @property
    def outputs(self) -> List[str]:
        """The columns that will be generated by this step, based on the datasets loaded
        from the Hugging Face Hub.

        Returns:
            The columns that will be generated by this step.
        """
        return self._get_dataset_columns()

    def _transform_batch(self, batch: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Transform a batch of data from the Hugging Face Hub into a list of rows.

        Args:
            batch: The batch of data from the Hugging Face Hub.

        Returns:
            A list of rows, where each row is a dictionary of column names and values.
        """
        length = len(next(iter(batch.values())))
        rows = []
        for i in range(length):
            rows.append({col: values[i] for col, values in batch.items()})
        return rows

    def _get_dataset_num_examples(self) -> int:
        """Get the number of examples in the dataset, based on the `split` and `config`
        runtime parameters provided.

        Returns:
            The number of examples in the dataset.
        """
        dataset_info = self._get_dataset_info()
        split = self.split
        if self.config:
            return dataset_info["splits"][split]["num_examples"]
        return dataset_info["default"]["splits"][split]["num_examples"]

    def _get_dataset_columns(self) -> List[str]:
        """Get the columns of the dataset, based on the `config` runtime parameter provided.

        Returns:
            The columns of the dataset.
        """
        dataset_info = self._get_dataset_info()
        if self.config:
            return list(dataset_info["features"].keys())
        return list(dataset_info["default"]["features"].keys())

    def _get_dataset_info(self) -> Dict[str, Any]:
        """Calls the Datasets Server API from Hugging Face to obtain the dataset information.

        Returns:
            The dataset information.
        """
        repo_id = self.repo_id
        config = self.config
        return _get_hf_dataset_info(repo_id, config)
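
`_transform_batch` converts the columnar batches that `IterableDataset.iter` yields (one list of values per column) into row dictionaries. A standalone sketch of the same transformation:

batch = {"instruction": ["a", "b"], "topic": ["x", "y"]}  # columnar batch
length = len(next(iter(batch.values())))  # all columns share the same length
rows = [{col: values[i] for col, values in batch.items()} for i in range(length)]
# rows == [{"instruction": "a", "topic": "x"}, {"instruction": "b", "topic": "y"}]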

outputs: List[str] property

The columns that will be generated by this step, based on the datasets loaded from the Hugging Face Hub.

Returns:

  • List[str]: The columns that will be generated by this step.

load()

Load the dataset from the Hugging Face Hub


process(offset=0)

Yields batches from the loaded dataset from the Hugging Face Hub.

Parameters:

  • offset (int, default: 0): The offset to start yielding the data from. Will be used during the caching process to help skip already processed data.

Yields:

  • GeneratorStepOutput: A tuple containing a batch of rows and a boolean indicating if the batch is the last one.
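
Because the skip condition compares `batch_num * batch_size` against `offset`, resumption happens at batch boundaries; in practice `offset` is a multiple of the batch size, since batches are yielded whole. A small sketch of the skipping logic with illustrative numbers:

batch_size, offset = 100, 200  # illustrative values
for batch_num in range(4):
    action = "skipped" if batch_num * batch_size < offset else "yielded"
    print(f"batch {batch_num}: {action}")
# batch 0: skipped, batch 1: skipped, batch 2: yielded, batch 3: yielded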
