Columns

This section contains the steps for common column operations that can be applied to batches.

expand

ExpandColumns

Bases: Step

Expand columns that contain lists into multiple rows.

ExpandColumns is a Step that takes a list of columns and expands them into multiple rows. The new rows will have the same data as the original row, except for the expanded column, which will contain a single item from the original list.

Attributes:

  • columns (Union[Dict[str, str], List[str]]): A dictionary that maps each column to be expanded to its new column name, or a list of columns to be expanded. If a list is provided, each new column keeps the name of the original column.
  • encoded (Union[bool, List[str]]): Whether the columns are JSON-encoded lists. If this value is set to True, all the columns will be decoded before expanding. Alternatively, a list of column names can be provided to decode only those columns. In this case, the column names must be a subset of the columns selected for expansion.
  • split_statistics (bool): Whether the statistics in the distilabel_metadata column should be split across the expanded rows. When the expanded columns contain lists of strings parsed from the output of an LLM, the token counts in the statistics_{step_name} entry of the distilabel_metadata column should be split to avoid counting them multiple times if the data is aggregated afterwards. For example, if a task generates a list of N instructions and each of them goes to a different row, the statistics should be divided by N. In such a case, set this value to True.

Input columns
  • dynamic (determined by columns attribute): The columns to be expanded into multiple rows.
Output columns
  • dynamic (determined by columns attribute): The expanded columns.
Categories
  • columns

Examples:

Expand the selected columns into multiple rows:

from distilabel.steps import ExpandColumns

expand_columns = ExpandColumns(
    columns=["generation"],
)
expand_columns.load()

result = next(
    expand_columns.process(
        [
            {
                "instruction": "instruction 1",
                "generation": ["generation 1", "generation 2"]}
        ],
    )
)
# >>> result
# [{'instruction': 'instruction 1', 'generation': 'generation 1'}, {'instruction': 'instruction 1', 'generation': 'generation 2'}]
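
The expansion itself can be illustrated without distilabel. The following is a minimal standalone sketch modeled on the `_expand_columns` logic in the source listing; the function name `expand_row` is hypothetical and the sketch ignores metadata handling. It also shows what the dictionary form of `columns` does in this logic: the item is written under the new name while the original list column stays in the row.

```python
from itertools import zip_longest
from typing import Any, Dict, List


def expand_row(row: Dict[str, Any], columns: Dict[str, str]) -> List[Dict[str, Any]]:
    """Expand each list-valued column of `row` into one row per item.

    `columns` maps the column to expand to the name of the output column.
    Hypothetical helper for illustration, not part of distilabel.
    """
    expanded_rows: List[Dict[str, Any]] = []
    for expand_column, new_column in columns.items():
        data = row[expand_column]
        rows = []
        # For the first column `expanded_rows` is empty, so every item is paired
        # with the original row; later columns are paired item by item with the
        # rows built so far.
        for item, expanded in zip_longest(data, expanded_rows, fillvalue=row):
            rows.append({**expanded, new_column: item})
        expanded_rows = rows
    return expanded_rows


row = {"instruction": "instruction 1", "generation": ["generation 1", "generation 2"]}

# List form of `columns`: the expanded column keeps its name.
same_name = expand_row(row, {"generation": "generation"})

# Dictionary form: each item goes to "response"; "generation" still holds the list.
renamed = expand_row(row, {"generation": "response"})
```

In this sketch, `same_name` matches the result shown above, while each row in `renamed` carries both the untouched `generation` list and a single `response` item.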

Expand the selected columns which are JSON encoded into multiple rows:

from distilabel.steps import ExpandColumns

expand_columns = ExpandColumns(
    columns=["generation"],
    encoded=True,  # It can also be a list of columns that are encoded, i.e. ["generation"]
)
expand_columns.load()

result = next(
    expand_columns.process(
        [
            {
                "instruction": "instruction 1",
                "generation": '["generation 1", "generation 2"]'}
        ],
    )
)
# >>> result
# [{'instruction': 'instruction 1', 'generation': 'generation 1'}, {'instruction': 'instruction 1', 'generation': 'generation 2'}]
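
Decoding happens before expansion and amounts to a `json.loads` call per encoded column. A minimal sketch of that step, assuming a hypothetical helper `decode_columns` (unlike the library's `process` method, it returns a copy instead of updating the row in place):

```python
import json
from typing import Any, Dict, List


def decode_columns(row: Dict[str, Any], encoded: List[str]) -> Dict[str, Any]:
    """Return a copy of `row` with the JSON-encoded columns parsed.

    Hypothetical helper for illustration, not part of distilabel.
    """
    decoded = dict(row)
    for column in encoded:
        decoded[column] = json.loads(row[column])
    return decoded


row = {"instruction": "instruction 1", "generation": '["generation 1", "generation 2"]'}
decoded = decode_columns(row, encoded=["generation"])
```

After this step `decoded["generation"]` is a regular Python list, so it can be expanded like any other list column.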

Expand the selected columns and split the statistics in the distilabel_metadata column:

from distilabel.steps import ExpandColumns

expand_columns = ExpandColumns(
    columns=["generation"],
    split_statistics=True,
)
expand_columns.load()

result = next(
    expand_columns.process(
        [
            {
                "instruction": "instruction 1",
                "generation": ["generation 1", "generation 2"],
                "distilabel_metadata": {
                    "statistics_generation": {
                        "input_tokens": [12],
                        "output_tokens": [12],
                    },
                },
            }
        ],
    )
)
# >>> result
# [{'instruction': 'instruction 1', 'generation': 'generation 1', 'distilabel_metadata': {'statistics_generation': {'input_tokens': [6], 'output_tokens': [6]}}}, {'instruction': 'instruction 1', 'generation': 'generation 2', 'distilabel_metadata': {'statistics_generation': {'input_tokens': [6], 'output_tokens': [6]}}}]
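
The arithmetic behind the split can be sketched in isolation. The helpers below are hypothetical (`split_tokens` and `split_statistics` are not distilabel functions); they assume the floor division used in the source listing and show that a count that is not divisible by N loses its remainder once the rows are summed again.

```python
from typing import Dict, List, Union

TokenCount = Union[int, List[int]]


def split_tokens(tokens: TokenCount, n: int) -> List[int]:
    """Divide token counts by `n` with floor division, always returning a list."""
    values = tokens if isinstance(tokens, list) else [tokens]
    return [value // n for value in values]


def split_statistics(stats: Dict[str, TokenCount], n: int) -> Dict[str, List[int]]:
    """Split the input and output token counts of one statistics entry."""
    return {
        "input_tokens": split_tokens(stats["input_tokens"], n),
        "output_tokens": split_tokens(stats["output_tokens"], n),
    }


# 12 tokens over 2 rows: each row gets 6 and the total is preserved.
even = split_statistics({"input_tokens": [12], "output_tokens": [12]}, n=2)

# 13 tokens over 2 rows: each row gets 6, so 1 token is lost when aggregating.
uneven = split_statistics({"input_tokens": [13], "output_tokens": 7}, n=2)
lost_input_tokens = 13 - 2 * uneven["input_tokens"][0]
```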
Source code in src/distilabel/steps/columns/expand.py
class ExpandColumns(Step):
    """Expand columns that contain lists into multiple rows.

    `ExpandColumns` is a `Step` that takes a list of columns and expands them into multiple
    rows. The new rows will have the same data as the original row, except for the expanded
    column, which will contain a single item from the original list.

    Attributes:
        columns: A dictionary that maps the column to be expanded to the new column name
            or a list of columns to be expanded. If a list is provided, the new column name
            will be the same as the column name.
        encoded: Whether the columns are JSON-encoded lists. If this value is set to True,
            all the columns will be decoded before expanding. Alternatively, a list of
            column names can be provided to decode only those columns. In this case, the
            column names must be a subset of the columns selected for expansion.
        split_statistics: Whether the statistics in the `distilabel_metadata` column
            should be split across the expanded rows.
            When the expanded columns contain lists of strings parsed from the output of
            an LLM, the token counts in the `statistics_{step_name}` entry of the
            `distilabel_metadata` column should be split to avoid counting them multiple
            times if the data is aggregated afterwards. For example, if a task generates
            a list of N instructions and each of them goes to a different row, the
            statistics should be divided by N.
            In such a case, set this value to True.

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to be expanded into
            multiple rows.

    Output columns:
        - dynamic (determined by `columns` attribute):  The expanded columns.

    Categories:
        - columns

    Examples:
        Expand the selected columns into multiple rows:

        ```python
        from distilabel.steps import ExpandColumns

        expand_columns = ExpandColumns(
            columns=["generation"],
        )
        expand_columns.load()

        result = next(
            expand_columns.process(
                [
                    {
                        "instruction": "instruction 1",
                        "generation": ["generation 1", "generation 2"]}
                ],
            )
        )
        # >>> result
        # [{'instruction': 'instruction 1', 'generation': 'generation 1'}, {'instruction': 'instruction 1', 'generation': 'generation 2'}]
        ```

        Expand the selected columns which are JSON encoded into multiple rows:

        ```python
        from distilabel.steps import ExpandColumns

        expand_columns = ExpandColumns(
            columns=["generation"],
            encoded=True,  # It can also be a list of columns that are encoded, i.e. ["generation"]
        )
        expand_columns.load()

        result = next(
            expand_columns.process(
                [
                    {
                        "instruction": "instruction 1",
                        "generation": '["generation 1", "generation 2"]'}
                ],
            )
        )
        # >>> result
        # [{'instruction': 'instruction 1', 'generation': 'generation 1'}, {'instruction': 'instruction 1', 'generation': 'generation 2'}]
        ```

        Expand the selected columns and split the statistics in the `distilabel_metadata` column:

        ```python
        from distilabel.steps import ExpandColumns

        expand_columns = ExpandColumns(
            columns=["generation"],
            split_statistics=True,
        )
        expand_columns.load()

        result = next(
            expand_columns.process(
                [
                    {
                        "instruction": "instruction 1",
                        "generation": ["generation 1", "generation 2"],
                        "distilabel_metadata": {
                            "statistics_generation": {
                                "input_tokens": [12],
                                "output_tokens": [12],
                            },
                        },
                    }
                ],
            )
        )
        # >>> result
        # [{'instruction': 'instruction 1', 'generation': 'generation 1', 'distilabel_metadata': {'statistics_generation': {'input_tokens': [6], 'output_tokens': [6]}}}, {'instruction': 'instruction 1', 'generation': 'generation 2', 'distilabel_metadata': {'statistics_generation': {'input_tokens': [6], 'output_tokens': [6]}}}]
        ```
    """

    columns: Union[Dict[str, str], List[str]]
    encoded: Union[bool, List[str]] = False
    split_statistics: bool = False

    @field_validator("columns")
    @classmethod
    def always_dict(cls, value: Union[Dict[str, str], List[str]]) -> Dict[str, str]:
        """Ensure that the columns are always a dictionary.

        Args:
            value: The columns to be expanded.

        Returns:
            The columns to be expanded as a dictionary.
        """
        if isinstance(value, list):
            return {col: col for col in value}

        return value

    @model_validator(mode="after")
    def is_subset(self) -> Self:
        """Ensure the "encoded" column names are a subset of the "columns" selected.

        Returns:
            The "encoded" attribute updated to work internally.
        """
        if isinstance(self.encoded, list):
            if not set(self.encoded).issubset(set(self.columns.keys())):
                raise ValueError(
                    "The 'encoded' columns must be a subset of the 'columns' selected for expansion."
                )
        if isinstance(self.encoded, bool):
            self.encoded = list(self.columns.keys()) if self.encoded else []
        return self

    @property
    def inputs(self) -> "StepColumns":
        """The columns to be expanded."""
        return list(self.columns.keys())

    @property
    def outputs(self) -> "StepColumns":
        """The expanded columns."""
        return [
            new_column if new_column else expand_column
            for expand_column, new_column in self.columns.items()
        ]

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        """Expand the columns in the input data.

        Args:
            inputs: The input data.

        Yields:
            The expanded rows.
        """
        if self.encoded:
            for input in inputs:
                for column in self.encoded:
                    input[column] = json.loads(input[column])

        yield [row for input in inputs for row in self._expand_columns(input)]

    def _expand_columns(self, input: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Expand the columns in the input data.

        Args:
            input: The input data.

        Returns:
            The expanded rows.
        """
        metadata_visited = False
        expanded_rows = []
        # Update the columns here to avoid doing the validation on the `inputs`, as the
        # `distilabel_metadata` is not defined on Pipeline creation on the DAG.
        columns = self.columns
        if self.split_statistics:
            columns["distilabel_metadata"] = "distilabel_metadata"

        for expand_column, new_column in columns.items():  # type: ignore
            data = input.get(expand_column)
            input, metadata_visited = self._split_metadata(
                input, len(data), metadata_visited
            )

            rows = []
            for item, expanded in zip_longest(*[data, expanded_rows], fillvalue=input):
                rows.append({**expanded, new_column: item})
            expanded_rows = rows
        return expanded_rows

    def _split_metadata(
        self, input: Dict[str, Any], n: int, metadata_visited: bool = False
    ) -> "Tuple[Dict[str, Any], bool]":
        """Helper method to split the statistics in the `distilabel_metadata` column.

        Args:
            input: The input data.
            n: Number of parts to split the tokens into (if we have 12 tokens and want
                3 rows of 4 tokens each, n==3).
            metadata_visited: Whether the metadata was already split, to prevent updating
                the data more than once.

        Returns:
            The input with the `distilabel_metadata` updated, and the `metadata_visited` flag.
        """
        # - If we want to split the statistics, we need to ensure that the metadata is present.
        # - Metadata can only be visited once per row to avoid successive splitting.
        # TODO: When the token count is not divisible by n, floor division drops the
        # remainder; we have to fix it.
        if (
            self.split_statistics
            and (metadata := input.get("distilabel_metadata", {}))
            and not metadata_visited
        ):
            for k, v in metadata.items():
                if (
                    not v
                ):  # In case it wasn't found in the metadata for some error, skip it
                    continue
                if k.startswith("statistics_") and (
                    "input_tokens" in v and "output_tokens" in v
                ):
                    # For num_generations>1 we assume all the tokens should be divided by n
                    # TODO: The tokens should always come as a list, but there can
                    # be differences
                    if isinstance(v["input_tokens"], list):
                        input_tokens = [value // n for value in v["input_tokens"]]
                    else:
                        input_tokens = [v["input_tokens"] // n]
                    if isinstance(v["output_tokens"], list):
                        output_tokens = [value // n for value in v["output_tokens"]]
                    else:
                        output_tokens = [v["output_tokens"] // n]

                    input["distilabel_metadata"][k] = {
                        "input_tokens": input_tokens,
                        "output_tokens": output_tokens,
                    }
                metadata_visited = True
            # Once the metadata is updated, create a list out of it so that the
            # following section expands it like any other column.
            if isinstance(input["distilabel_metadata"], dict):
                input["distilabel_metadata"] = [input["distilabel_metadata"]] * n
        return input, metadata_visited
inputs property

The columns to be expanded.

outputs property

The expanded columns.

always_dict(value) classmethod

Ensure that the columns are always a dictionary.

Parameters:

  • value (Union[Dict[str, str], List[str]], required): The columns to be expanded.

Returns:

  • Dict[str, str]: The columns to be expanded as a dictionary.

Source code in src/distilabel/steps/columns/expand.py
@field_validator("columns")
@classmethod
def always_dict(cls, value: Union[Dict[str, str], List[str]]) -> Dict[str, str]:
    """Ensure that the columns are always a dictionary.

    Args:
        value: The columns to be expanded.

    Returns:
        The columns to be expanded as a dictionary.
    """
    if isinstance(value, list):
        return {col: col for col in value}

    return value
is_subset()

Ensure the "encoded" column names are a subset of the "columns" selected.

Returns:

  • Self: The "encoded" attribute updated to work internally.

Source code in src/distilabel/steps/columns/expand.py
@model_validator(mode="after")
def is_subset(self) -> Self:
    """Ensure the "encoded" column names are a subset of the "columns" selected.

    Returns:
        The "encoded" attribute updated to work internally.
    """
    if isinstance(self.encoded, list):
        if not set(self.encoded).issubset(set(self.columns.keys())):
            raise ValueError(
                "The 'encoded' columns must be a subset of the 'columns' selected for expansion."
            )
    if isinstance(self.encoded, bool):
        self.encoded = list(self.columns.keys()) if self.encoded else []
    return self
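
Taken together, the two validators normalize the configuration before any row is processed. The standalone sketch below mimics that normalization without pydantic; the function name `normalize_config` is an assumption for illustration only.

```python
from typing import Dict, List, Tuple, Union


def normalize_config(
    columns: Union[Dict[str, str], List[str]],
    encoded: Union[bool, List[str]] = False,
) -> Tuple[Dict[str, str], List[str]]:
    """Return `columns` as a dict and `encoded` as a list of column names.

    Hypothetical helper for illustration, not part of distilabel.
    """
    if isinstance(columns, list):
        columns = {column: column for column in columns}
    if isinstance(encoded, bool):
        # True means every selected column is encoded; False means none is.
        encoded = list(columns) if encoded else []
    elif not set(encoded).issubset(columns):
        raise ValueError(
            "The 'encoded' columns must be a subset of the 'columns' selected for expansion."
        )
    return columns, encoded


all_encoded = normalize_config(["a", "b"], encoded=True)
some_encoded = normalize_config({"a": "a_item", "b": "b_item"}, encoded=["a"])

# A name outside the selected columns is rejected.
try:
    normalize_config(["a"], encoded=["missing"])
    error = None
except ValueError as exc:
    error = str(exc)
```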
process(inputs)

Expand the columns in the input data.

Parameters:

  • inputs (StepInput, required): The input data.

Yields:

  • StepOutput: The expanded rows.

Source code in src/distilabel/steps/columns/expand.py
def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
    """Expand the columns in the input data.

    Args:
        inputs: The input data.

    Yields:
        The expanded rows.
    """
    if self.encoded:
        for input in inputs:
            for column in self.encoded:
                input[column] = json.loads(input[column])

    yield [row for input in inputs for row in self._expand_columns(input)]

keep

KeepColumns

Bases: Step

Keeps selected columns in the dataset.

KeepColumns is a Step whose process method keeps only the columns specified in the columns attribute. The columns attribute also overrides the default values of the inputs and outputs properties.

Note

The order in which the columns are provided matters, as the output columns follow that order. This is useful before pushing either a datasets.Dataset via the PushToHub step or a distilabel.Distiset via the Pipeline.run output variable.

Attributes:

  • columns (List[str]): List of strings with the names of the columns to keep.

Input columns
  • dynamic (determined by columns attribute): The columns to keep.
Output columns
  • dynamic (determined by columns attribute): The columns that were kept.
Categories
  • columns

Examples:

Select the columns to keep:

from distilabel.steps import KeepColumns

keep_columns = KeepColumns(
    columns=["instruction", "generation"],
)
keep_columns.load()

result = next(
    keep_columns.process(
        [{"instruction": "What's the brightest color?", "generation": "white", "model_name": "my_model"}],
    )
)
# >>> result
# [{'instruction': "What's the brightest color?", 'generation': 'white'}]
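
The ordering guarantee follows from how each output row is built: the keys are inserted in the order of `columns`, not in the order of the input row. A minimal sketch of that selection, using a hypothetical standalone function `keep_columns` modeled on the `process` method in the source listing:

```python
from typing import Any, Dict, List


def keep_columns(rows: List[Dict[str, Any]], columns: List[str]) -> List[Dict[str, Any]]:
    """Keep only `columns` in each row, in the order given by `columns`.

    Hypothetical helper for illustration, not part of distilabel.
    """
    return [{column: row[column] for column in columns} for row in rows]


rows = [
    {
        "model_name": "my_model",
        "generation": "white",
        "instruction": "What's the brightest color?",
    }
]

# The key order follows the `columns` argument, not the input row.
kept = keep_columns(rows, ["instruction", "generation"])

# A column that is absent from a row raises a KeyError in this sketch.
try:
    keep_columns(rows, ["instruction", "missing"])
except KeyError as exc:
    missing_column = exc.args[0]
```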
Source code in src/distilabel/steps/columns/keep.py
class KeepColumns(Step):
    """Keeps selected columns in the dataset.

    `KeepColumns` is a `Step` whose `process` method keeps only the columns specified
    in the `columns` attribute. The `columns` attribute also overrides the default values
    of the `inputs` and `outputs` properties.

    Note:
        The order in which the columns are provided matters, as the output columns follow
        that order. This is useful before pushing either a `datasets.Dataset` via
        the `PushToHub` step or a `distilabel.Distiset` via the `Pipeline.run` output variable.

    Attributes:
        columns: List of strings with the names of the columns to keep.

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to keep.

    Output columns:
        - dynamic (determined by `columns` attribute): The columns that were kept.

    Categories:
        - columns

    Examples:
        Select the columns to keep:

        ```python
        from distilabel.steps import KeepColumns

        keep_columns = KeepColumns(
            columns=["instruction", "generation"],
        )
        keep_columns.load()

        result = next(
            keep_columns.process(
                [{"instruction": "What's the brightest color?", "generation": "white", "model_name": "my_model"}],
            )
        )
        # >>> result
        # [{'instruction': "What's the brightest color?", 'generation': 'white'}]
        ```
    """

    columns: List[str]

    @property
    def inputs(self) -> "StepColumns":
        """The inputs for the task are the column names in `columns`."""
        return self.columns

    @property
    def outputs(self) -> "StepColumns":
        """The outputs for the task are the column names in `columns`."""
        return self.columns

    @override
    def process(self, *inputs: StepInput) -> "StepOutput":
        """The `process` method keeps only the columns specified in the `columns` attribute.

        Args:
            *inputs: A list of dictionaries with the input data.

        Yields:
            A list of dictionaries with the output data.
        """
        for input in inputs:
            outputs = []
            for item in input:
                outputs.append({col: item[col] for col in self.columns})
            yield outputs
inputs property

The inputs for the task are the column names in columns.

outputs property

The outputs for the task are the column names in columns.

process(*inputs)

The process method keeps only the columns specified in the columns attribute.

Parameters:

  • *inputs (StepInput, default ()): A list of dictionaries with the input data.

Yields:

  • StepOutput: A list of dictionaries with the output data.

Source code in src/distilabel/steps/columns/keep.py
@override
def process(self, *inputs: StepInput) -> "StepOutput":
    """The `process` method keeps only the columns specified in the `columns` attribute.

    Args:
        *inputs: A list of dictionaries with the input data.

    Yields:
        A list of dictionaries with the output data.
    """
    for input in inputs:
        outputs = []
        for item in input:
            outputs.append({col: item[col] for col in self.columns})
        yield outputs

merge

MergeColumns

Bases: Step

Merge columns from a row.

MergeColumns is a Step whose process method calls the merge_columns function to combine columns in a StepInput. It provides two attributes, columns and output_column, to specify the columns to merge and the name of the resulting column.

This step can be useful if, for example, you have a Task that generates instructions and you want more examples of them. In such a case, you could use another Task to multiply your instructions synthetically, which would yield two separate columns. Using MergeColumns you can merge them and use them as a single column in your dataset for further processing.

Attributes:

  • columns (List[str]): List of strings with the names of the columns to merge.
  • output_column (Optional[str]): Name of the output column. Defaults to merged_column when not provided.

Input columns
  • dynamic (determined by columns attribute): The columns to merge.
Output columns
  • dynamic (determined by columns and output_column attributes): The columns that were merged.
Categories
  • columns

Examples:

Combine columns in rows of a dataset:

from distilabel.steps import MergeColumns

combiner = MergeColumns(
    columns=["queries", "multiple_queries"],
    output_column="queries",
)
combiner.load()

result = next(
    combiner.process(
        [
            {
                "queries": "How are you?",
                "multiple_queries": ["What's up?", "Everything ok?"]
            }
        ],
    )
)
# >>> result
# [{'queries': ['How are you?', "What's up?", 'Everything ok?']}]
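
The `merge_columns` helper is not listed on this page, so the following is only a sketch of behavior consistent with the example above. It assumes that values that are not lists are wrapped into one, that lists are concatenated in the order of `columns`, and that the source columns are removed from the row. The name `merge_row` is hypothetical.

```python
from typing import Any, Dict, List


def merge_row(
    row: Dict[str, Any], columns: List[str], new_column: str = "merged_column"
) -> Dict[str, Any]:
    """Merge the values of `columns` into a single list stored in `new_column`.

    Assumed behavior for illustration, not the library's `merge_columns`.
    """
    result = dict(row)  # work on a copy so the input row is left untouched
    combined: List[Any] = []
    for column in columns:
        value = result.pop(column)
        combined.extend(value if isinstance(value, list) else [value])
    result[new_column] = combined
    return result


row = {"queries": "How are you?", "multiple_queries": ["What's up?", "Everything ok?"]}
merged = merge_row(row, columns=["queries", "multiple_queries"], new_column="queries")

# Without an explicit name, the sketch falls back to "merged_column".
default_name = merge_row({"a": 1, "b": [2]}, columns=["a", "b"])
```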
Source code in src/distilabel/steps/columns/merge.py
class MergeColumns(Step):
    """Merge columns from a row.

    `MergeColumns` is a `Step` whose `process` method calls the `merge_columns` function
    to combine columns in a `StepInput`. It provides two attributes, `columns` and
    `output_column`, to specify the columns to merge and the name of the resulting column.

    This step can be useful if, for example, you have a `Task` that generates instructions
    and you want more examples of them. In such a case, you could use another `Task`
    to multiply your instructions synthetically, which would yield two separate columns.
    Using `MergeColumns` you can merge them and use them as a single column in your dataset for
    further processing.

    Attributes:
        columns: List of strings with the names of the columns to merge.
        output_column: Name of the output column. Defaults to `merged_column` when not provided.

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to merge.

    Output columns:
        - dynamic (determined by `columns` and `output_column` attributes): The columns
            that were merged.

    Categories:
        - columns

    Examples:
        Combine columns in rows of a dataset:

        ```python
        from distilabel.steps import MergeColumns

        combiner = MergeColumns(
            columns=["queries", "multiple_queries"],
            output_column="queries",
        )
        combiner.load()

        result = next(
            combiner.process(
                [
                    {
                        "queries": "How are you?",
                        "multiple_queries": ["What's up?", "Everything ok?"]
                    }
                ],
            )
        )
        # >>> result
        # [{'queries': ['How are you?', "What's up?", 'Everything ok?']}]
        ```
    """

    columns: List[str]
    output_column: Optional[str] = None

    @property
    def inputs(self) -> "StepColumns":
        return self.columns

    @property
    def outputs(self) -> "StepColumns":
        return [self.output_column] if self.output_column else ["merged_column"]

    @override
    def process(self, inputs: StepInput) -> "StepOutput":
        combined = []
        for input in inputs:
            combined.append(
                merge_columns(
                    input,
                    columns=self.columns,
                    new_column=self.outputs[0],
                )
            )
        yield combined

group

GroupColumns

Bases: Step

Combines columns from a list of StepInput.

GroupColumns is a Step whose process method calls the group_columns function to handle and combine a list of StepInput. It also provides two attributes, columns and output_columns, to specify the columns to group and the output columns, which override the default values of the inputs and outputs properties, respectively.

Attributes:

  • columns (List[str]): List of strings with the names of the columns to group.
  • output_columns (Optional[List[str]]): Optional list of strings with the names of the output columns.

Input columns
  • dynamic (determined by columns attribute): The columns to group.
Output columns
  • dynamic (determined by columns and output_columns attributes): The columns that were grouped.
Categories
  • columns

Examples:

Group columns of a dataset:

```python
from distilabel.steps import GroupColumns

group_columns = GroupColumns(
    name="group_columns",
    columns=["generation", "model_name"],
)
group_columns.load()

result = next(
    group_columns.process(
        [{"generation": "AI generated text"}, {"model_name": "my_model"}],
        [{"generation": "Other generated text", "model_name": "my_model"}]
    )
)
# >>> result
# [{'grouped_generation': ['AI generated text', 'Other generated text'], 'grouped_model_name': ['my_model']}]
```

Specify the name of the output columns:

```python
from distilabel.steps import GroupColumns

group_columns = GroupColumns(
    name="group_columns",
    columns=["generation", "model_name"],
    output_columns=["generations", "generation_models"]
)
group_columns.load()

result = next(
    group_columns.process(
        [{"generation": "AI generated text"}, {"model_name": "my_model"}],
        [{"generation": "Other generated text", "model_name": "my_model"}]
    )
)
# >>> result
# [{'generations': ['AI generated text', 'Other generated text'], 'generation_models': ['my_model']}]
```
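
The grouping helper itself is not listed on this page, so the following is a sketch under stated assumptions rather than the library's implementation. It assumes that rows from the different inputs are combined by position, that the values of the grouped columns are collected into lists, and that other keys are copied as they are. The default `grouped_{column}` names follow the `outputs` property in the source listing; the function name `group_rows` is hypothetical.

```python
from typing import Any, Dict, List, Optional


def group_rows(
    *inputs: List[Dict[str, Any]],
    columns: List[str],
    output_columns: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Combine rows found at the same position in each input batch.

    Assumed behavior for illustration, not the library's grouping function.
    """
    if output_columns is None:
        output_columns = [f"grouped_{column}" for column in columns]
    names = dict(zip(columns, output_columns))

    grouped = []
    # `zip` stops at the shortest batch, so extra rows in longer batches are ignored.
    for rows_at_index in zip(*inputs):
        combined: Dict[str, Any] = {}
        for row in rows_at_index:
            for key, value in row.items():
                if key in names:
                    combined.setdefault(names[key], []).append(value)
                else:
                    combined[key] = value
        grouped.append(combined)
    return grouped


batch_a = [{"generation": "AI generated text"}, {"model_name": "my_model"}]
batch_b = [{"generation": "Other generated text", "model_name": "my_model"}]

default_names = group_rows(batch_a, batch_b, columns=["generation", "model_name"])
custom_names = group_rows(
    batch_a,
    batch_b,
    columns=["generation", "model_name"],
    output_columns=["generations", "generation_models"],
)
```

In this sketch the second row of `batch_a` has no counterpart in `batch_b`, so only one combined row is produced.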
Source code in src/distilabel/steps/columns/group.py
class GroupColumns(Step):
    """Combines columns from a list of `StepInput`.

    `GroupColumns` is a `Step` whose `process` method calls the `group_columns` function
    to handle and combine a list of `StepInput`. It also provides two attributes,
    `columns` and `output_columns`, to specify the columns to group and the output columns,
    which override the default values of the `inputs` and `outputs` properties, respectively.

    Attributes:
        columns: List of strings with the names of the columns to group.
        output_columns: Optional list of strings with the names of the output columns.

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to group.

    Output columns:
        - dynamic (determined by `columns` and `output_columns` attributes): The columns
            that were grouped.

    Categories:
        - columns

    Examples:

        Group columns of a dataset:

        ```python
        from distilabel.steps import GroupColumns

        group_columns = GroupColumns(
            name="group_columns",
            columns=["generation", "model_name"],
        )
        group_columns.load()

        result = next(
            group_columns.process(
                [{"generation": "AI generated text"}, {"model_name": "my_model"}],
                [{"generation": "Other generated text", "model_name": "my_model"}]
            )
        )
        # >>> result
        # [{'grouped_generation': ['AI generated text', 'Other generated text'], 'grouped_model_name': ['my_model']}]
        ```

        Specify the name of the output columns:

        ```python
        from distilabel.steps import GroupColumns

        group_columns = GroupColumns(
            name="group_columns",
            columns=["generation", "model_name"],
            output_columns=["generations", "generation_models"]
        )
        group_columns.load()

        result = next(
            group_columns.process(
                [{"generation": "AI generated text"}, {"model_name": "my_model"}],
                [{"generation": "Other generated text", "model_name": "my_model"}]
            )
        )
        # >>> result
        # [{'generations': ['AI generated text', 'Other generated text'], 'generation_models': ['my_model']}]
        ```
    """

    columns: List[str]
    output_columns: Optional[List[str]] = None

    @property
    def inputs(self) -> List[str]:
        """The inputs for the task are the column names in `columns`."""
        return self.columns

    @property
    def outputs(self) -> List[str]:
        """The outputs for the task are the column names in `output_columns` or
        `grouped_{column}` for each column in `columns`."""
        return (
            self.output_columns
            if self.output_columns is not None
            else [f"grouped_{column}" for column in self.columns]
        )

    @override
    def process(self, *inputs: StepInput) -> "StepOutput":
        """The `process` method calls the `group_columns` function to handle and combine a list of `StepInput`.

        Args:
            *inputs: A list of `StepInput` to be combined.

        Yields:
            A `StepOutput` with the combined `StepInput` using the `group_columns` function.
        """
        yield group_columns(
            *inputs,
            group_columns=self.inputs,
            output_group_columns=self.outputs,
        )
inputs property

The inputs for the task are the column names in columns.

outputs property

The outputs for the task are the column names in output_columns or grouped_{column} for each column in columns.

process(*inputs)

The process method calls the group_columns function to handle and combine a list of StepInput.

Parameters:

  • *inputs (StepInput, default ()): A list of StepInput to be combined.

Yields:

  • StepOutput: A StepOutput with the combined StepInput using the group_columns function.

Source code in src/distilabel/steps/columns/group.py
@override
def process(self, *inputs: StepInput) -> "StepOutput":
    """The `process` method calls the `group_columns` function to combine the given `StepInput` batches.

    Args:
        *inputs: A list of `StepInput` to be combined.

    Yields:
        A `StepOutput` with the combined `StepInput`, as returned by the `group_columns` function.
    """
    yield group_columns(
        *inputs,
        group_columns=self.inputs,
        output_group_columns=self.outputs,
    )

utils

merge_distilabel_metadata(*output_dicts)

Merge the metadata stored under DISTILABEL_METADATA_KEY in multiple output dictionaries. The value under that key can be either a dictionary of metadata keys or a list of such dictionaries.

Parameters:

Name Type Description Default
*output_dicts Dict[str, Any]

Variable number of dictionaries or lists containing distilabel metadata.

()

Returns:

Type Description
Union[Dict[str, Any], List[Dict[str, Any]]]

A merged dictionary or list containing all the distilabel metadata.

Source code in src/distilabel/steps/columns/utils.py
def merge_distilabel_metadata(
    *output_dicts: Dict[str, Any],
) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
    """
    Merge the `DISTILABEL_METADATA_KEY` from multiple output dictionaries. `DISTILABEL_METADATA_KEY`
    can be either a dictionary containing metadata keys or a list containing dictionaries
    of metadata keys.

    Args:
        *output_dicts: Variable number of dictionaries or lists containing distilabel metadata.

    Returns:
        A merged dictionary or list containing all the distilabel metadata.
    """
    merged_metadata = defaultdict(list)

    for output_dict in output_dicts:
        metadata = output_dict.get(DISTILABEL_METADATA_KEY, {})
        # If the metadata is a `list`, it holds one metadata dictionary per generation
        # (`num_generations`), which happens when `group_generations=True`
        if isinstance(metadata, list):
            if not isinstance(merged_metadata, list):
                merged_metadata = []
            merged_metadata.extend(metadata)
        else:
            for key, value in metadata.items():
                merged_metadata[key].append(value)

    if isinstance(merged_metadata, list):
        return merged_metadata

    final_metadata = {}
    for key, value_list in merged_metadata.items():
        if len(value_list) == 1:
            final_metadata[key] = value_list[0]
        else:
            final_metadata[key] = value_list

    return final_metadata
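
A short sketch of the two merge paths may help. The function below, `merge_metadata_sketch`, is a condensed local copy of the listing above so that the snippet runs without distilabel installed. The value assigned to `DISTILABEL_METADATA_KEY` is an assumption based on the `distilabel_metadata` column name; the real constant is defined by the library.

```python
from collections import defaultdict
from typing import Any, Dict, List, Union

# Assumption: illustrative value; the real constant is defined in distilabel.
DISTILABEL_METADATA_KEY = "distilabel_metadata"


def merge_metadata_sketch(
    *output_dicts: Dict[str, Any],
) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
    """Condensed local copy of the merging logic, for illustration only."""
    merged: Any = defaultdict(list)
    for output_dict in output_dicts:
        metadata = output_dict.get(DISTILABEL_METADATA_KEY, {})
        if isinstance(metadata, list):
            # List metadata: switch to a flat list and concatenate.
            if not isinstance(merged, list):
                merged = []
            merged.extend(metadata)
        else:
            # Dict metadata: collect the values seen for each key.
            for key, value in metadata.items():
                merged[key].append(value)
    if isinstance(merged, list):
        return merged
    # Keys seen once keep their single value; repeated keys keep the list.
    return {k: v[0] if len(v) == 1 else v for k, v in merged.items()}


dict_result = merge_metadata_sketch(
    {DISTILABEL_METADATA_KEY: {"raw_output": "a", "step": "gen"}},
    {DISTILABEL_METADATA_KEY: {"raw_output": "b"}},
)
list_result = merge_metadata_sketch(
    {DISTILABEL_METADATA_KEY: [{"raw_output": "a"}]},
    {DISTILABEL_METADATA_KEY: [{"raw_output": "b"}]},
)
print(dict_result)  # {'raw_output': ['a', 'b'], 'step': 'gen'}
print(list_result)  # [{'raw_output': 'a'}, {'raw_output': 'b'}]
```

The metadata keys `raw_output` and `step` are made up for the example. Note that a key appearing in only one input stays a scalar, while a key appearing in several inputs becomes a list.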

group_columns(*inputs, group_columns, output_group_columns=None)

Groups multiple lists of dictionaries into a single list of dictionaries on the specified group_columns. If output_group_columns is provided, the grouped columns are renamed to those names; otherwise each one is named grouped_{column}.

Parameters:

Name Type Description Default
inputs StepInput

list of dictionaries to combine.

()
group_columns List[str]

list of keys to merge on.

required
output_group_columns Optional[List[str]]

List of new names for the grouped keys, matched to group_columns by position. Defaults to None, in which case each key is renamed to grouped_{key}.

None

Returns:

Type Description
StepInput

A list of dictionaries where the values of the group_columns are combined into a list and renamed to output_group_columns.

Source code in src/distilabel/steps/columns/utils.py
def group_columns(
    *inputs: "StepInput",
    group_columns: List[str],
    output_group_columns: Optional[List[str]] = None,
) -> "StepInput":
    """Groups multiple lists of dictionaries into a single list of dictionaries on the
    specified `group_columns`. If `output_group_columns` is provided, the grouped columns
    are renamed to those names; otherwise each one is named `grouped_{column}`.

    Args:
        inputs: list of dictionaries to combine.
        group_columns: list of keys to merge on.
        output_group_columns: list of keys to rename the merge keys to. Defaults to `None`.

    Returns:
        A list of dictionaries where the values of the `group_columns` are combined into a
        list and renamed to `output_group_columns`.
    """
    if output_group_columns is not None and len(output_group_columns) != len(
        group_columns
    ):
        raise ValueError(
            "The length of `output_group_columns` must be the same as the length of `group_columns`."
        )
    if output_group_columns is None:
        output_group_columns = [f"grouped_{key}" for key in group_columns]
    group_columns_dict = dict(zip(group_columns, output_group_columns))

    result = []
    # Use zip to iterate over lists based on their index
    for dicts_at_index in zip(*inputs):
        combined_dict = {}
        metadata_dicts = []
        # Iterate over dicts at the same index
        for d in dicts_at_index:
            # Extract metadata for merging
            if DISTILABEL_METADATA_KEY in d:
                metadata_dicts.append(
                    {DISTILABEL_METADATA_KEY: d[DISTILABEL_METADATA_KEY]}
                )
            # Iterate over key-value pairs in each dict
            for key, value in d.items():
                if key == DISTILABEL_METADATA_KEY:
                    continue
                # If the key is one of the `group_columns`, append the value to the list under its output name
                if key in group_columns_dict:
                    combined_dict.setdefault(group_columns_dict[key], []).append(value)
                # Otherwise, copy the key-value pair as-is (later inputs overwrite earlier ones)
                else:
                    combined_dict[key] = value

        if metadata_dicts:
            combined_dict[DISTILABEL_METADATA_KEY] = merge_distilabel_metadata(
                *metadata_dicts
            )

        result.append(combined_dict)
    return result
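
The row-wise grouping can be illustrated with a reduced sketch. `group_rows_sketch` below is a hypothetical, simplified copy of the listing above that omits the metadata handling, so that it runs without distilabel installed. The column names and row contents are invented for the example.

```python
from typing import Any, Dict, List, Optional


def group_rows_sketch(
    *inputs: List[Dict[str, Any]],
    group_columns: List[str],
    output_group_columns: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Simplified copy of the grouping logic (no metadata merging)."""
    if output_group_columns is None:
        output_group_columns = [f"grouped_{key}" for key in group_columns]
    if len(output_group_columns) != len(group_columns):
        raise ValueError("Both column lists must have the same length.")
    rename = dict(zip(group_columns, output_group_columns))

    result = []
    # zip pairs up the rows found at the same index of every input.
    for rows_at_index in zip(*inputs):
        combined: Dict[str, Any] = {}
        for row in rows_at_index:
            for key, value in row.items():
                if key in rename:
                    # Grouped column: collect values under the output name.
                    combined.setdefault(rename[key], []).append(value)
                else:
                    # Any other column is copied; later inputs overwrite.
                    combined[key] = value
        result.append(combined)
    return result


branch_a = [{"instruction": "Say hi", "generation": "Hi!"}]
branch_b = [{"instruction": "Say hi", "generation": "Hello there."}]
grouped = group_rows_sketch(branch_a, branch_b, group_columns=["generation"])
print(grouped)
# [{'instruction': 'Say hi', 'grouped_generation': ['Hi!', 'Hello there.']}]
```

Because the rows are paired with `zip`, inputs of different lengths are truncated to the shortest one, so each input is expected to carry the same number of rows.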

merge_columns(row, columns, new_column='combined_key')

Merge columns in a dictionary into a single column on the specified new_column.

Parameters:

Name Type Description Default
row Dict[str, Any]

Dictionary corresponding to a row in a dataset.

required
columns List[str]

List of keys to merge.

required
new_column str

Name of the new key created.

'combined_key'

Returns:

Type Description
Dict[str, Any]

Dictionary with the new merged key.

Source code in src/distilabel/steps/columns/utils.py
def merge_columns(
    row: Dict[str, Any], columns: List[str], new_column: str = "combined_key"
) -> Dict[str, Any]:
    """Merge columns in a dictionary into a single column on the specified `new_column`.

    Args:
        row: Dictionary corresponding to a row in a dataset.
        columns: List of keys to merge.
        new_column: Name of the new key created.

    Returns:
        Dictionary with the new merged key.
    """
    result = row.copy()  # preserve the original dictionary
    combined = []
    for key in columns:
        to_combine = result.pop(key)
        if not isinstance(to_combine, list):
            to_combine = [to_combine]
        combined += to_combine
    result[new_column] = combined
    return result
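
As a quick illustration of the behaviour, the sketch below uses `merge_columns_sketch`, a condensed local copy of the listing above so that it runs without distilabel installed. The row contents and the `candidates` column name are invented for the example.

```python
from typing import Any, Dict, List


def merge_columns_sketch(
    row: Dict[str, Any], columns: List[str], new_column: str = "combined_key"
) -> Dict[str, Any]:
    """Condensed local copy of the merging logic, for illustration only."""
    result = row.copy()  # shallow copy, so the caller's row keeps its keys
    combined: List[Any] = []
    for key in columns:
        value = result.pop(key)
        # Scalars are wrapped so that lists and single values concatenate.
        combined += value if isinstance(value, list) else [value]
    result[new_column] = combined
    return result


row = {"prompt": "Hi", "queries": ["a", "b"], "answer": "c"}
merged = merge_columns_sketch(row, ["queries", "answer"], new_column="candidates")
print(merged)  # {'prompt': 'Hi', 'candidates': ['a', 'b', 'c']}
```

The merged columns are removed from the returned dictionary, and a name listed in `columns` that is missing from the row raises a `KeyError`, since `dict.pop` is called without a default.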