Columns

This section documents the built-in steps for common column operations on batches.

expand

ExpandColumns

Bases: Step

Expand columns that contain lists into multiple rows.

ExpandColumns is a Step that takes a list of columns and expands them into multiple rows. The new rows will have the same data as the original row, except for the expanded column, which will contain a single item from the original list.

Attributes:

Name Type Description
columns Union[Dict[str, str], List[str]]

A dictionary that maps the column to be expanded to the new column name or a list of columns to be expanded. If a list is provided, the new column name will be the same as the column name.

Input columns
  • dynamic (determined by columns attribute): The columns to be expanded into multiple rows.
Output columns
  • dynamic (determined by columns attribute): The expanded columns.
Categories
  • columns

Examples:

Expand the selected columns into multiple rows:

from distilabel.steps import ExpandColumns

expand_columns = ExpandColumns(
    columns=["generation"],
)
expand_columns.load()

result = next(
    expand_columns.process(
        [
            {
                "instruction": "instruction 1",
                "generation": ["generation 1", "generation 2"]}
        ],
    )
)
# >>> result
# [{'instruction': 'instruction 1', 'generation': 'generation 1'}, {'instruction': 'instruction 1', 'generation': 'generation 2'}]
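
The expansion can also be followed outside a pipeline. The sketch below is a standalone re-implementation of the `_expand_columns` logic from the source on this page, not the library API; `expand_row` is a hypothetical helper name. Under that assumption it illustrates two behaviours: with a dictionary, the new column receives the single items while the original list column stays in the row, and several columns of equal length are expanded in parallel (item by item) and not as a Cartesian product. Lists of unequal length are not covered here.

```python
from itertools import zip_longest
from typing import Any, Dict, List


def expand_row(row: Dict[str, Any], columns: Dict[str, str]) -> List[Dict[str, Any]]:
    """Hypothetical helper mirroring `ExpandColumns._expand_columns`."""
    expanded_rows: List[Dict[str, Any]] = []
    for expand_column, new_column in columns.items():
        data = row.get(expand_column)
        rows = []
        # On the first pass `expanded_rows` is empty, so every item is paired
        # with the original row (the fill value).
        for item, expanded in zip_longest(data, expanded_rows, fillvalue=row):
            rows.append({**expanded, new_column: item})
        expanded_rows = rows
    return expanded_rows


# Renaming: the list column is kept, `response` holds one item per row.
renamed = expand_row(
    {"instruction": "i1", "generation": ["g1", "g2"]},
    {"generation": "response"},
)
print(renamed)

# Two columns of equal length are walked in lockstep.
parallel = expand_row(
    {"question": ["q1", "q2"], "answer": ["a1", "a2"]},
    {"question": "question", "answer": "answer"},
)
print(parallel)
# [{'question': 'q1', 'answer': 'a1'}, {'question': 'q2', 'answer': 'a2'}]
```
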
Source code in src/distilabel/steps/columns/expand.py
class ExpandColumns(Step):
    """Expand columns that contain lists into multiple rows.

    `ExpandColumns` is a `Step` that takes a list of columns and expands them into multiple
    rows. The new rows will have the same data as the original row, except for the expanded
    column, which will contain a single item from the original list.

    Attributes:
        columns: A dictionary that maps the column to be expanded to the new column name
            or a list of columns to be expanded. If a list is provided, the new column name
            will be the same as the column name.

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to be expanded into
            multiple rows.

    Output columns:
        - dynamic (determined by `columns` attribute):  The expanded columns.

    Categories:
        - columns

    Examples:
        Expand the selected columns into multiple rows:

        ```python
        from distilabel.steps import ExpandColumns

        expand_columns = ExpandColumns(
            columns=["generation"],
        )
        expand_columns.load()

        result = next(
            expand_columns.process(
                [
                    {
                        "instruction": "instruction 1",
                        "generation": ["generation 1", "generation 2"]}
                ],
            )
        )
        # >>> result
        # [{'instruction': 'instruction 1', 'generation': 'generation 1'}, {'instruction': 'instruction 1', 'generation': 'generation 2'}]
        ```
    """

    columns: Union[Dict[str, str], List[str]]

    @field_validator("columns")
    @classmethod
    def always_dict(cls, value: Union[Dict[str, str], List[str]]) -> Dict[str, str]:
        """Ensure that the columns are always a dictionary.

        Args:
            value: The columns to be expanded.

        Returns:
            The columns to be expanded as a dictionary.
        """
        if isinstance(value, list):
            return {col: col for col in value}

        return value

    @property
    def inputs(self) -> "StepColumns":
        """The columns to be expanded."""
        return list(self.columns.keys())

    @property
    def outputs(self) -> "StepColumns":
        """The expanded columns."""
        return [
            new_column if new_column else expand_column
            for expand_column, new_column in self.columns.items()
        ]

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        """Expand the columns in the input data.

        Args:
            inputs: The input data.

        Yields:
            The expanded rows.
        """
        yield [row for input in inputs for row in self._expand_columns(input)]

    def _expand_columns(self, input: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Expand the columns in the input data.

        Args:
            input: The input data.

        Returns:
            The expanded rows.
        """
        expanded_rows = []
        for expand_column, new_column in self.columns.items():  # type: ignore
            data = input.get(expand_column)
            rows = []
            for item, expanded in zip_longest(*[data, expanded_rows], fillvalue=input):
                rows.append({**expanded, new_column: item})
            expanded_rows = rows
        return expanded_rows
inputs: StepColumns property

The columns to be expanded.

outputs: StepColumns property

The expanded columns.

always_dict(value) classmethod

Ensure that the columns are always a dictionary.

Parameters:

Name Type Description Default
value Union[Dict[str, str], List[str]]

The columns to be expanded.

required

Returns:

Type Description
Dict[str, str]

The columns to be expanded as a dictionary.

Source code in src/distilabel/steps/columns/expand.py
@field_validator("columns")
@classmethod
def always_dict(cls, value: Union[Dict[str, str], List[str]]) -> Dict[str, str]:
    """Ensure that the columns are always a dictionary.

    Args:
        value: The columns to be expanded.

    Returns:
        The columns to be expanded as a dictionary.
    """
    if isinstance(value, list):
        return {col: col for col in value}

    return value
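
As a quick illustration of what the validator does, the sketch below applies the same normalisation as a plain function, without pydantic. `to_column_mapping` is a hypothetical name used only for this example.

```python
from typing import Dict, List, Union


def to_column_mapping(value: Union[Dict[str, str], List[str]]) -> Dict[str, str]:
    """Hypothetical stand-in for the `always_dict` validator body."""
    if isinstance(value, list):
        # A list means "keep the same name": map each column to itself.
        return {col: col for col in value}
    return value


from_list = to_column_mapping(["generation"])
from_dict = to_column_mapping({"generation": "response"})
print(from_list)  # {'generation': 'generation'}
print(from_dict)  # {'generation': 'response'}
```
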
process(inputs)

Expand the columns in the input data.

Parameters:

Name Type Description Default
inputs StepInput

The input data.

required

Yields:

Type Description
StepOutput

The expanded rows.

Source code in src/distilabel/steps/columns/expand.py
def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
    """Expand the columns in the input data.

    Args:
        inputs: The input data.

    Yields:
        The expanded rows.
    """
    yield [row for input in inputs for row in self._expand_columns(input)]

keep

KeepColumns

Bases: Step

Keeps selected columns in the dataset.

KeepColumns is a Step whose process method keeps only the columns listed in the columns attribute. The columns attribute also determines the values of the inputs and outputs properties.

Note

The order in which the columns are provided matters, because the output columns follow that order. This is useful before pushing either a datasets.Dataset via the PushToHub step or a distilabel.Distiset returned by Pipeline.run.

Attributes:

Name Type Description
columns List[str]

List of strings with the names of the columns to keep.

Input columns
  • dynamic (determined by columns attribute): The columns to keep.
Output columns
  • dynamic (determined by columns attribute): The columns that were kept.
Categories
  • columns

Examples:

Select the columns to keep:

from distilabel.steps import KeepColumns

keep_columns = KeepColumns(
    columns=["instruction", "generation"],
)
keep_columns.load()

result = next(
    keep_columns.process(
        [{"instruction": "What's the brightest color?", "generation": "white", "model_name": "my_model"}],
    )
)
# >>> result
# [{'instruction': "What's the brightest color?", 'generation': 'white'}]
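
The note on ordering can be checked with a small standalone sketch. It mirrors the dictionary comprehension used in `process` in the source on this page; `keep_columns_in_rows` is a hypothetical helper, not part of the library. It also shows that, with this logic, a column that is absent from a row raises a `KeyError`.

```python
from typing import Any, Dict, List


def keep_columns_in_rows(
    rows: List[Dict[str, Any]], columns: List[str]
) -> List[Dict[str, Any]]:
    """Hypothetical helper mirroring the selection done in `KeepColumns.process`."""
    return [{col: row[col] for col in columns} for row in rows]


rows = [
    {
        "instruction": "What's the brightest color?",
        "generation": "white",
        "model_name": "my_model",
    }
]

# The output keys follow the order of `columns`, not the order in the row.
reordered = keep_columns_in_rows(rows, ["generation", "instruction"])
print(list(reordered[0]))  # ['generation', 'instruction']

# A column that is missing from the row is reported as a KeyError.
missing = None
try:
    keep_columns_in_rows(rows, ["instruction", "score"])
except KeyError as err:
    missing = err.args[0]
print(missing)  # score
```
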
Source code in src/distilabel/steps/columns/keep.py
class KeepColumns(Step):
    """Keeps selected columns in the dataset.

    `KeepColumns` is a `Step` whose `process` method keeps only the columns listed in the
    `columns` attribute. The `columns` attribute also determines the values of the `inputs`
    and `outputs` properties.

    Note:
        The order in which the columns are provided matters, because the output columns follow
        that order. This is useful before pushing either a `datasets.Dataset` via the `PushToHub`
        step or a `distilabel.Distiset` returned by `Pipeline.run`.

    Attributes:
        columns: List of strings with the names of the columns to keep.

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to keep.

    Output columns:
        - dynamic (determined by `columns` attribute): The columns that were kept.

    Categories:
        - columns

    Examples:
        Select the columns to keep:

        ```python
        from distilabel.steps import KeepColumns

        keep_columns = KeepColumns(
            columns=["instruction", "generation"],
        )
        keep_columns.load()

        result = next(
            keep_columns.process(
                [{"instruction": "What's the brightest color?", "generation": "white", "model_name": "my_model"}],
            )
        )
        # >>> result
        # [{'instruction': "What's the brightest color?", 'generation': 'white'}]
        ```
    """

    columns: List[str]

    @property
    def inputs(self) -> "StepColumns":
        """The inputs for the task are the column names in `columns`."""
        return self.columns

    @property
    def outputs(self) -> "StepColumns":
        """The outputs for the task are the column names in `columns`."""
        return self.columns

    @override
    def process(self, *inputs: StepInput) -> "StepOutput":
        """The `process` method keeps only the columns specified in the `columns` attribute.

        Args:
            *inputs: A list of dictionaries with the input data.

        Yields:
            A list of dictionaries with the output data.
        """
        for input in inputs:
            outputs = []
            for item in input:
                outputs.append({col: item[col] for col in self.columns})
            yield outputs
inputs: StepColumns property

The inputs for the task are the column names in columns.

outputs: StepColumns property

The outputs for the task are the column names in columns.

process(*inputs)

The process method keeps only the columns specified in the columns attribute.

Parameters:

Name Type Description Default
*inputs StepInput

A list of dictionaries with the input data.

()

Yields:

Type Description
StepOutput

A list of dictionaries with the output data.

Source code in src/distilabel/steps/columns/keep.py
@override
def process(self, *inputs: StepInput) -> "StepOutput":
    """The `process` method keeps only the columns specified in the `columns` attribute.

    Args:
        *inputs: A list of dictionaries with the input data.

    Yields:
        A list of dictionaries with the output data.
    """
    for input in inputs:
        outputs = []
        for item in input:
            outputs.append({col: item[col] for col in self.columns})
        yield outputs

merge

MergeColumns

Bases: Step

Merge columns from a row.

MergeColumns is a Step whose process method calls the merge_columns function to combine columns of each row in a StepInput. It provides two attributes, columns and output_column, to specify the columns to merge and the name of the resulting column.

This step is useful when, for example, one Task generates instructions and a second Task multiplies them synthetically, which leaves the instructions split across two columns. MergeColumns merges them into a single column that can be used for further processing.

Attributes:

Name Type Description
columns List[str]

List of strings with the names of the columns to merge.

output_column Optional[str]

Name of the output column. Defaults to None, in which case the output column is named merged_column.

Input columns
  • dynamic (determined by columns attribute): The columns to merge.
Output columns
  • dynamic (determined by columns and output_column attributes): The columns that were merged.
Categories
  • columns

Examples:

Combine columns in rows of a dataset:

from distilabel.steps import MergeColumns

combiner = MergeColumns(
    columns=["queries", "multiple_queries"],
    output_column="queries",
)
combiner.load()

result = next(
    combiner.process(
        [
            {
                "queries": "How are you?",
                "multiple_queries": ["What's up?", "Everything ok?"]
            }
        ],
    )
)
# >>> result
# [{'queries': ['How are you?', "What's up?", 'Everything ok?']}]
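
The sketch below is a standalone approximation of what the step does to a single row, based on the source on this page; `merge_row` is a hypothetical helper, not the library function. It shows that scalar values are wrapped before concatenation, that columns outside `columns` pass through unchanged, and that the step falls back to the name `merged_column` when `output_column` is not set.

```python
from typing import Any, Dict, List, Optional


def merge_row(
    row: Dict[str, Any], columns: List[str], output_column: Optional[str] = None
) -> Dict[str, Any]:
    """Hypothetical helper combining the step's default name with the merge logic."""
    new_column = output_column if output_column else "merged_column"
    result = row.copy()  # shallow copy, so the caller's row keeps its keys
    combined: List[Any] = []
    for key in columns:
        value = result.pop(key)
        # Scalars are wrapped so that lists and single values concatenate.
        combined += value if isinstance(value, list) else [value]
    result[new_column] = combined
    return result


row = {
    "queries": "How are you?",
    "multiple_queries": ["What's up?", "Everything ok?"],
    "topic": "greetings",
}
merged = merge_row(row, ["queries", "multiple_queries"])
print(merged)
# {'topic': 'greetings', 'merged_column': ['How are you?', "What's up?", 'Everything ok?']}
```
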
Source code in src/distilabel/steps/columns/merge.py
class MergeColumns(Step):
    """Merge columns from a row.

    `MergeColumns` is a `Step` that implements the `process` method that calls the `merge_columns`
    function to handle and combine columns in a `StepInput`. `MergeColumns` provides two attributes
    `columns` and `output_column` to specify the columns to merge and the resulting output column.

    This step is useful when, for example, one `Task` generates instructions and a second
    `Task` multiplies them synthetically, which leaves the instructions split across two
    columns. `MergeColumns` merges them into a single column that can be used for further
    processing.

    Attributes:
        columns: List of strings with the names of the columns to merge.
        output_column: Name of the output column. Defaults to `None`, in which case
            `merged_column` is used.

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to merge.

    Output columns:
        - dynamic (determined by `columns` and `output_column` attributes): The columns
            that were merged.

    Categories:
        - columns

    Examples:
        Combine columns in rows of a dataset:

        ```python
        from distilabel.steps import MergeColumns

        combiner = MergeColumns(
            columns=["queries", "multiple_queries"],
            output_column="queries",
        )
        combiner.load()

        result = next(
            combiner.process(
                [
                    {
                        "queries": "How are you?",
                        "multiple_queries": ["What's up?", "Everything ok?"]
                    }
                ],
            )
        )
        # >>> result
        # [{'queries': ['How are you?', "What's up?", 'Everything ok?']}]
        ```
    """

    columns: List[str]
    output_column: Optional[str] = None

    @property
    def inputs(self) -> "StepColumns":
        return self.columns

    @property
    def outputs(self) -> "StepColumns":
        return [self.output_column] if self.output_column else ["merged_column"]

    @override
    def process(self, inputs: StepInput) -> "StepOutput":
        combined = []
        for input in inputs:
            combined.append(
                merge_columns(
                    input,
                    columns=self.columns,
                    new_column=self.outputs[0],
                )
            )
        yield combined

group

GroupColumns

Bases: Step

Combines columns from a list of StepInput.

GroupColumns is a Step whose process method calls the group_columns function to combine a list of StepInput. It provides two attributes, columns and output_columns, which specify the columns to group and the names of the output columns and determine the inputs and outputs properties, respectively.

Attributes:

Name Type Description
columns List[str]

List of strings with the names of the columns to group.

output_columns Optional[List[str]]

Optional list of strings with the names of the output columns.

Input columns
  • dynamic (determined by columns attribute): The columns to group.
Output columns
  • dynamic (determined by columns and output_columns attributes): The columns that were grouped.
Categories
  • columns

Examples:

Group columns of a dataset:

```python
from distilabel.steps import GroupColumns

group_columns = GroupColumns(
    name="group_columns",
    columns=["generation", "model_name"],
)
group_columns.load()

result = next(
    group_columns.process(
        [{"generation": "AI generated text"}, {"model_name": "my_model"}],
        [{"generation": "Other generated text", "model_name": "my_model"}]
    )
)
# >>> result
# [{'grouped_generation': ['AI generated text', 'Other generated text'], 'grouped_model_name': ['my_model']}]
```

Specify the name of the output columns:

```python
from distilabel.steps import GroupColumns

group_columns = GroupColumns(
    name="group_columns",
    columns=["generation", "model_name"],
    output_columns=["generations", "generation_models"]
)
group_columns.load()

result = next(
    group_columns.process(
        [{"generation": "AI generated text"}, {"model_name": "my_model"}],
        [{"generation": "Other generated text", "model_name": "my_model"}]
    )
)
# >>> result
# [{'generations': ['AI generated text', 'Other generated text'], 'generation_models': ['my_model']}]
```
Source code in src/distilabel/steps/columns/group.py
class GroupColumns(Step):
    """Combines columns from a list of `StepInput`.

    `GroupColumns` is a `Step` whose `process` method calls the `group_columns` function
    to combine a list of `StepInput`. It provides two attributes, `columns` and
    `output_columns`, which specify the columns to group and the names of the output columns
    and determine the `inputs` and `outputs` properties, respectively.

    Attributes:
        columns: List of strings with the names of the columns to group.
        output_columns: Optional list of strings with the names of the output columns.

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to group.

    Output columns:
        - dynamic (determined by `columns` and `output_columns` attributes): The columns
            that were grouped.

    Categories:
        - columns

    Examples:

        Group columns of a dataset:

        ```python
        from distilabel.steps import GroupColumns

        group_columns = GroupColumns(
            name="group_columns",
            columns=["generation", "model_name"],
        )
        group_columns.load()

        result = next(
            group_columns.process(
                [{"generation": "AI generated text"}, {"model_name": "my_model"}],
                [{"generation": "Other generated text", "model_name": "my_model"}]
            )
        )
        # >>> result
        # [{'grouped_generation': ['AI generated text', 'Other generated text'], 'grouped_model_name': ['my_model']}]
        ```

        Specify the name of the output columns:

        ```python
        from distilabel.steps import GroupColumns

        group_columns = GroupColumns(
            name="group_columns",
            columns=["generation", "model_name"],
            output_columns=["generations", "generation_models"]
        )
        group_columns.load()

        result = next(
            group_columns.process(
                [{"generation": "AI generated text"}, {"model_name": "my_model"}],
                [{"generation": "Other generated text", "model_name": "my_model"}]
            )
        )
        # >>> result
        # [{'generations': ['AI generated text', 'Other generated text'], 'generation_models': ['my_model']}]
        ```
    """

    columns: List[str]
    output_columns: Optional[List[str]] = None

    @property
    def inputs(self) -> "StepColumns":
        """The inputs for the task are the column names in `columns`."""
        return self.columns

    @property
    def outputs(self) -> "StepColumns":
        """The outputs for the task are the column names in `output_columns` or
        `grouped_{column}` for each column in `columns`."""
        return (
            self.output_columns
            if self.output_columns is not None
            else [f"grouped_{column}" for column in self.columns]
        )

    @override
    def process(self, *inputs: StepInput) -> "StepOutput":
        """The `process` method calls the `group_columns` function to handle and combine a list of `StepInput`.

        Args:
            *inputs: A list of `StepInput` to be combined.

        Yields:
            A `StepOutput` with the combined `StepInput` using the `group_columns` function.
        """
        yield group_columns(
            *inputs,
            group_columns=self.inputs,
            output_group_columns=self.outputs,
        )
inputs: StepColumns property

The inputs for the task are the column names in columns.

outputs: StepColumns property

The outputs for the task are the column names in output_columns or grouped_{column} for each column in columns.

process(*inputs)

The process method calls the group_columns function to handle and combine a list of StepInput.

Parameters:

Name Type Description Default
*inputs StepInput

A list of StepInput to be combined.

()

Yields:

Type Description
StepOutput

A StepOutput with the combined StepInput using the group_columns function.

Source code in src/distilabel/steps/columns/group.py
@override
def process(self, *inputs: StepInput) -> "StepOutput":
    """The `process` method calls the `group_columns` function to handle and combine a list of `StepInput`.

    Args:
        *inputs: A list of `StepInput` to be combined.

    Yields:
        A `StepOutput` with the combined `StepInput` using the `group_columns` function.
    """
    yield group_columns(
        *inputs,
        group_columns=self.inputs,
        output_group_columns=self.outputs,
    )

CombineColumns

Bases: GroupColumns

CombineColumns is deprecated and will be removed in version 1.5.0, use GroupColumns instead.

Source code in src/distilabel/steps/columns/group.py
class CombineColumns(GroupColumns):
    """`CombineColumns` is deprecated and will be removed in version 1.5.0, use `GroupColumns` instead."""

    def __init__(self, **data: Any) -> None:
        warnings.warn(
            "`CombineColumns` is deprecated and will be removed in version 1.5.0, use `GroupColumns` instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return super().__init__(**data)
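
To show how such a deprecation surfaces for callers, the sketch below reproduces the same subclass-and-warn pattern with two hypothetical classes, `NewStep` and `OldStep`, that stand in for `GroupColumns` and `CombineColumns`. It only uses the standard `warnings` module and makes no claim about how distilabel itself is configured to display warnings.

```python
import warnings
from typing import Any


class NewStep:
    """Hypothetical replacement class."""

    def __init__(self, **data: Any) -> None:
        self.data = data


class OldStep(NewStep):
    """Hypothetical deprecated alias that forwards everything to `NewStep`."""

    def __init__(self, **data: Any) -> None:
        warnings.warn(
            "`OldStep` is deprecated, use `NewStep` instead.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller's line
        )
        super().__init__(**data)


# Record warnings explicitly so the deprecation is visible in this example.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    step = OldStep(columns=["generation"])

print(caught[0].category.__name__)  # DeprecationWarning
print(isinstance(step, NewStep))    # True
```

Because the deprecated class only adds the warning, existing code keeps working until the alias is removed.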

utils

merge_distilabel_metadata(*output_dicts)

Merge the DISTILABEL_METADATA_KEY from multiple output dictionaries.

Parameters:

Name Type Description Default
*output_dicts Dict[str, Any]

Variable number of dictionaries containing distilabel metadata.

()

Returns:

Type Description
Dict[str, Any]

A merged dictionary containing all the distilabel metadata from the input dictionaries.

Source code in src/distilabel/steps/columns/utils.py
def merge_distilabel_metadata(*output_dicts: Dict[str, Any]) -> Dict[str, Any]:
    """
    Merge the `DISTILABEL_METADATA_KEY` from multiple output dictionaries.

    Args:
        *output_dicts: Variable number of dictionaries containing distilabel metadata.

    Returns:
        A merged dictionary containing all the distilabel metadata from the input dictionaries.
    """
    merged_metadata = defaultdict(list)

    for output_dict in output_dicts:
        metadata = output_dict.get(DISTILABEL_METADATA_KEY, {})
        for key, value in metadata.items():
            merged_metadata[key].append(value)

    final_metadata = {}
    for key, value_list in merged_metadata.items():
        if len(value_list) == 1:
            final_metadata[key] = value_list[0]
        else:
            final_metadata[key] = value_list

    return final_metadata
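
A short standalone sketch of the merging rule follows. It re-implements the logic shown above so that it runs without distilabel; `merge_metadata` is a hypothetical name and the value of `METADATA_KEY` is an assumption made for this example. Keys seen once keep their single value, keys seen several times become a list, and dictionaries without metadata are ignored.

```python
from collections import defaultdict
from typing import Any, Dict

# Assumed value for illustration; the library defines DISTILABEL_METADATA_KEY itself.
METADATA_KEY = "distilabel_metadata"


def merge_metadata(*output_dicts: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical re-implementation of the merging rule."""
    collected = defaultdict(list)
    for output_dict in output_dicts:
        for key, value in output_dict.get(METADATA_KEY, {}).items():
            collected[key].append(value)
    # Unwrap single values, keep lists where a key appeared more than once.
    return {
        key: values[0] if len(values) == 1 else values
        for key, values in collected.items()
    }


merged_metadata = merge_metadata(
    {METADATA_KEY: {"raw_output": "a", "model": "m1"}},
    {METADATA_KEY: {"raw_output": "b"}},
    {"generation": "no metadata here"},
)
print(merged_metadata)  # {'raw_output': ['a', 'b'], 'model': 'm1'}
```
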

group_columns(*inputs, group_columns, output_group_columns=None)

Groups multiple lists of dictionaries into a single list of dictionaries on the specified group_columns. If output_group_columns are provided, the grouped columns are also renamed accordingly.

Parameters:

Name Type Description Default
inputs StepInput

list of dictionaries to combine.

()
group_columns List[str]

list of keys to merge on.

required
output_group_columns Optional[List[str]]

list of keys to rename the merge keys to. Defaults to None.

None

Returns:

Type Description
StepInput

A list of dictionaries where the values of the group_columns are combined into a list and renamed to output_group_columns.

Source code in src/distilabel/steps/columns/utils.py
def group_columns(
    *inputs: "StepInput",
    group_columns: List[str],
    output_group_columns: Optional[List[str]] = None,
) -> "StepInput":
    """Groups multiple lists of dictionaries into a single list of dictionaries on the
    specified `group_columns`. If `output_group_columns` are provided, the grouped
    columns are also renamed accordingly.

    Args:
        inputs: list of dictionaries to combine.
        group_columns: list of keys to merge on.
        output_group_columns: list of keys to rename the merge keys to. Defaults to `None`.

    Returns:
        A list of dictionaries where the values of the `group_columns` are combined into a
        list and renamed to `output_group_columns`.
    """
    if output_group_columns is not None and len(output_group_columns) != len(
        group_columns
    ):
        raise ValueError(
            "The length of `output_group_columns` must be the same as the length of `group_columns`."
        )
    if output_group_columns is None:
        output_group_columns = [f"grouped_{key}" for key in group_columns]
    group_columns_dict = dict(zip(group_columns, output_group_columns))

    result = []
    # Use zip to iterate over lists based on their index
    for dicts_at_index in zip(*inputs):
        combined_dict = {}
        metadata_dicts = []
        # Iterate over dicts at the same index
        for d in dicts_at_index:
            # Extract metadata for merging
            if DISTILABEL_METADATA_KEY in d:
                metadata_dicts.append(
                    {DISTILABEL_METADATA_KEY: d[DISTILABEL_METADATA_KEY]}
                )
            # Iterate over key-value pairs in each dict
            for key, value in d.items():
                if key == DISTILABEL_METADATA_KEY:
                    continue
                # If the key is one of the group columns, append the value to the existing list
                if key in group_columns_dict.keys():
                    combined_dict.setdefault(group_columns_dict[key], []).append(value)
                # If the key is not one of the group columns, create a new key-value pair
                else:
                    combined_dict[key] = value

        if metadata_dicts:
            combined_dict[DISTILABEL_METADATA_KEY] = merge_distilabel_metadata(
                *metadata_dicts
            )

        result.append(combined_dict)
    return result
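
The sketch below is a simplified standalone version of the grouping loop above, with the metadata handling left out; `group_rows` is a hypothetical name. Under those assumptions it illustrates two consequences of iterating with `zip`: rows are combined by position, and the result is as long as the shortest input. Columns that are not grouped keep the value from the last input that contains them.

```python
from typing import Any, Dict, List, Optional


def group_rows(
    *inputs: List[Dict[str, Any]],
    group_columns: List[str],
    output_group_columns: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Hypothetical, metadata-free re-implementation of the grouping loop."""
    if output_group_columns is None:
        output_group_columns = [f"grouped_{key}" for key in group_columns]
    if len(output_group_columns) != len(group_columns):
        raise ValueError("`output_group_columns` and `group_columns` differ in length.")
    renames = dict(zip(group_columns, output_group_columns))

    result = []
    for dicts_at_index in zip(*inputs):  # stops at the shortest input
        combined: Dict[str, Any] = {}
        for d in dicts_at_index:
            for key, value in d.items():
                if key in renames:
                    combined.setdefault(renames[key], []).append(value)
                else:
                    combined[key] = value  # later inputs overwrite earlier ones
        result.append(combined)
    return result


branch_a = [
    {"instruction": "i1", "generation": "a1"},
    {"instruction": "i2", "generation": "a2"},
]
branch_b = [{"instruction": "i1", "generation": "b1"}]

grouped = group_rows(branch_a, branch_b, group_columns=["generation"])
print(grouped)  # [{'instruction': 'i1', 'grouped_generation': ['a1', 'b1']}]
```
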

merge_columns(row, columns, new_column='combined_key')

Merge columns in a dictionary into a single column on the specified new_column.

Parameters:

Name Type Description Default
row Dict[str, Any]

Dictionary corresponding to a row in a dataset.

required
columns List[str]

List of keys to merge.

required
new_column str

Name of the new key created.

'combined_key'

Returns:

Type Description
Dict[str, Any]

Dictionary with the new merged key.

Source code in src/distilabel/steps/columns/utils.py
def merge_columns(
    row: Dict[str, Any], columns: List[str], new_column: str = "combined_key"
) -> Dict[str, Any]:
    """Merge columns in a dictionary into a single column on the specified `new_column`.

    Args:
        row: Dictionary corresponding to a row in a dataset.
        columns: List of keys to merge.
        new_column: Name of the new key created.

    Returns:
        Dictionary with the new merged key.
    """
    result = row.copy()  # preserve the original dictionary
    combined = []
    for key in columns:
        to_combine = result.pop(key)
        if not isinstance(to_combine, list):
            to_combine = [to_combine]
        combined += to_combine
    result[new_column] = combined
    return result