Columns

This section contains the existing steps for common column operations that can be applied to batches.

CombineColumns

Bases: Step

Combines columns from a list of StepInput.

CombineColumns is a Step whose process method calls the combine_dicts function to combine a list of StepInput. It also provides two attributes, columns and output_columns, which specify the columns to merge and the names of the output columns. These override the default values of the inputs and outputs properties, respectively.

Attributes:

  • columns (List[str]): List of strings with the names of the columns to merge.
  • output_columns (Optional[List[str]]): Optional list of strings with the names of the output columns.

Input columns
  • dynamic (determined by columns attribute): The columns to merge.
Output columns
  • dynamic (determined by columns and output_columns attributes): The columns that were merged.
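
The merge can be pictured with a small standalone sketch. The body of `combine_dicts` is not shown on this page, so `combine_rows_sketch` below is a hypothetical stand-in, not the library function. It assumes that rows are matched by position across inputs, that each merged column is gathered into a list with one item per input, and that all other columns are copied through unchanged. Only the `merged_{column}` default naming is taken from the source listing.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def combine_rows_sketch(
    *inputs: List[Row],
    merge_keys: List[str],
    output_merge_keys: Optional[List[str]] = None,
) -> List[Row]:
    """Hypothetical stand-in for `combine_dicts` (its real body is not shown here)."""
    # Default output names follow the `outputs` property: merged_{column}.
    if output_merge_keys is None:
        output_merge_keys = [f"merged_{key}" for key in merge_keys]
    renames = dict(zip(merge_keys, output_merge_keys))

    combined: List[Row] = []
    # Walk the inputs in lockstep: the i-th row of every input forms one output row.
    for rows in zip(*inputs):
        out: Row = {}
        for row in rows:
            for key, value in row.items():
                if key in renames:
                    # ASSUMPTION: merged columns become a list, one item per input.
                    out.setdefault(renames[key], []).append(value)
                else:
                    # ASSUMPTION: other columns are copied through (last value wins).
                    out[key] = value
        combined.append(out)
    return combined


batch_a = [{"instruction": "What is 2 + 2?", "generation": "4"}]
batch_b = [{"instruction": "What is 2 + 2?", "generation": "four"}]

result = combine_rows_sketch(
    batch_a,
    batch_b,
    merge_keys=["generation"],
    output_merge_keys=["generations"],
)
default_named = combine_rows_sketch(batch_a, batch_b, merge_keys=["generation"])
```

Under these assumptions, `result` holds one row whose `generations` column lists both answers, and `default_named` stores the same list under `merged_generation`.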
Source code in src/distilabel/steps/combine.py
class CombineColumns(Step):
    """Combines columns from a list of `StepInput`.

    `CombineColumns` is a `Step` that implements the `process` method that calls the `combine_dicts`
    function to handle and combine a list of `StepInput`. Also `CombineColumns` provides two attributes
    `columns` and `output_columns` to specify the columns to merge and the output columns
    which will override the default value for the properties `inputs` and `outputs`, respectively.

    Attributes:
        columns: List of strings with the names of the columns to merge.
        output_columns: Optional list of strings with the names of the output columns.

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to merge.

    Output columns:
        - dynamic (determined by `columns` and `output_columns` attributes): The columns
            that were merged.
    """

    columns: List[str]
    output_columns: Optional[List[str]] = None

    @property
    def inputs(self) -> List[str]:
        """The inputs for the task are the column names in `columns`."""
        return self.columns

    @property
    def outputs(self) -> List[str]:
        """The outputs for the task are the column names in `output_columns` or
        `merged_{column}` for each column in `columns`."""
        return (
            self.output_columns
            if self.output_columns is not None
            else [f"merged_{column}" for column in self.columns]
        )

    @override
    def process(self, *inputs: StepInput) -> "StepOutput":
        """The `process` method calls the `combine_dicts` function to handle and combine a list of `StepInput`.

        Args:
            *inputs: A list of `StepInput` to be combined.

        Yields:
            A `StepOutput` with the combined `StepInput` using the `combine_dicts` function.
        """
        yield combine_dicts(
            *inputs,
            merge_keys=self.inputs,
            output_merge_keys=self.outputs,
        )

inputs: List[str] property

The inputs for the task are the column names in columns.

outputs: List[str] property

The outputs for the task are the column names in output_columns or merged_{column} for each column in columns.

process(*inputs)

The process method calls the combine_dicts function to handle and combine a list of StepInput.

Parameters:

  • *inputs (StepInput, default: ()): A list of StepInput to be combined.

Yields:

  • StepOutput: A StepOutput with the combined StepInput using the combine_dicts function.

Source code in src/distilabel/steps/combine.py
@override
def process(self, *inputs: StepInput) -> "StepOutput":
    """The `process` method calls the `combine_dicts` function to handle and combine a list of `StepInput`.

    Args:
        *inputs: A list of `StepInput` to be combined.

    Yields:
        A `StepOutput` with the combined `StepInput` using the `combine_dicts` function.
    """
    yield combine_dicts(
        *inputs,
        merge_keys=self.inputs,
        output_merge_keys=self.outputs,
    )

ExpandColumns

Bases: Step

Expand columns that contain lists into multiple rows.

ExpandColumns is a Step that takes a list of columns and expands them into multiple rows. The new rows will have the same data as the original row, except for the expanded column, which will contain a single item from the original list.

Attributes:

  • columns (Union[Dict[str, str], List[str]]): A dictionary that maps the column to be expanded to the new column name or a list of columns to be expanded. If a list is provided, the new column name will be the same as the column name.

Input columns
  • dynamic (determined by columns attribute): The columns to be expanded into multiple rows.
Output columns
  • dynamic (determined by columns attribute): The expanded columns.
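
As an illustration, the sketch below copies the `_expand_columns` logic from the class source into a standalone function, so it runs without distilabel. The name `expand_row` and the sample row are made up for the example. It contrasts keeping the column name with mapping it to a new one.

```python
from itertools import zip_longest
from typing import Any, Dict, List


def expand_row(row: Dict[str, Any], columns: Dict[str, str]) -> List[Dict[str, Any]]:
    """Standalone copy of the `_expand_columns` logic from the class source."""
    expanded_rows: List[Dict[str, Any]] = []
    for expand_column, new_column in columns.items():
        data = row.get(expand_column)
        rows = []
        # For the first column `expanded_rows` is empty, so every item is paired
        # with the original row (the fill value) and yields one new row.
        for item, expanded in zip_longest(data, expanded_rows, fillvalue=row):
            rows.append({**expanded, new_column: item})
        expanded_rows = rows
    return expanded_rows


row = {"instruction": "Name a color.", "generations": ["red", "blue"]}

# Same name: the list in `generations` is replaced by a single item per row.
same_name = expand_row(row, {"generations": "generations"})

# New name: each item goes to `generation`; the original list column is kept.
renamed = expand_row(row, {"generations": "generation"})
```

Note that when the new column name differs from the original, this logic leaves the original list column in every expanded row.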
Source code in src/distilabel/steps/expand.py
class ExpandColumns(Step):
    """Expand columns that contain lists into multiple rows.

    `ExpandColumns` is a `Step` that takes a list of columns and expands them into multiple
    rows. The new rows will have the same data as the original row, except for the expanded
    column, which will contain a single item from the original list.

    Attributes:
        columns: A dictionary that maps the column to be expanded to the new column name
            or a list of columns to be expanded. If a list is provided, the new column name
            will be the same as the column name.

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to be expanded into
            multiple rows.

    Output columns:
        - dynamic (determined by `columns` attribute):  The expanded columns.
    """

    columns: Union[Dict[str, str], List[str]]

    @field_validator("columns")
    @classmethod
    def always_dict(cls, value: Union[Dict[str, str], List[str]]) -> Dict[str, str]:
        """Ensure that the columns are always a dictionary.

        Args:
            value: The columns to be expanded.

        Returns:
            The columns to be expanded as a dictionary.
        """
        if isinstance(value, list):
            return {col: col for col in value}

        return value

    @property
    def inputs(self) -> List[str]:
        """The columns to be expanded."""
        return list(self.columns.keys())

    @property
    def outputs(self) -> List[str]:
        """The expanded columns."""
        return [
            new_column if new_column else expand_column
            for expand_column, new_column in self.columns.items()
        ]

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        """Expand the columns in the input data.

        Args:
            inputs: The input data.

        Yields:
            The expanded rows.
        """
        yield [row for input in inputs for row in self._expand_columns(input)]

    def _expand_columns(self, input: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Expand the columns in the input data.

        Args:
            input: The input data.

        Returns:
            The expanded rows.
        """
        expanded_rows = []
        for expand_column, new_column in self.columns.items():  # type: ignore
            data = input.get(expand_column)
            rows = []
            for item, expanded in zip_longest(*[data, expanded_rows], fillvalue=input):
                rows.append({**expanded, new_column: item})
            expanded_rows = rows
        return expanded_rows

inputs: List[str] property

The columns to be expanded.

outputs: List[str] property

The expanded columns.

always_dict(value) classmethod

Ensure that the columns are always a dictionary.

Parameters:

  • value (Union[Dict[str, str], List[str]], required): The columns to be expanded.

Returns:

  • Dict[str, str]: The columns to be expanded as a dictionary.
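
The normalization can be tried in isolation. The sketch below repeats the body of `always_dict` as a plain function, without the pydantic `field_validator` wiring. The name `normalize_columns` is hypothetical.

```python
from typing import Dict, List, Union


def normalize_columns(value: Union[Dict[str, str], List[str]]) -> Dict[str, str]:
    """Plain-function version of the `always_dict` body."""
    if isinstance(value, list):
        # A list means "keep the same name": map every column to itself.
        return {col: col for col in value}
    # A dictionary is already in the target shape and is returned unchanged.
    return value


from_list = normalize_columns(["generations", "scores"])
from_dict = normalize_columns({"generations": "generation"})
```

After this step the rest of the class can treat `columns` as a dictionary in both cases, which is why `inputs` can call `self.columns.keys()`.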

Source code in src/distilabel/steps/expand.py
@field_validator("columns")
@classmethod
def always_dict(cls, value: Union[Dict[str, str], List[str]]) -> Dict[str, str]:
    """Ensure that the columns are always a dictionary.

    Args:
        value: The columns to be expanded.

    Returns:
        The columns to be expanded as a dictionary.
    """
    if isinstance(value, list):
        return {col: col for col in value}

    return value

process(inputs)

Expand the columns in the input data.

Parameters:

  • inputs (StepInput, required): The input data.

Yields:

  • StepOutput: The expanded rows.
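
When several columns are listed, the `zip_longest` call in the source pairs their items by position rather than building every combination. The sketch below re-implements `process` and `_expand_columns` as one standalone function over a whole batch. The name `expand_batch` and the sample data are made up for the example, and the two list columns have equal lengths in each row.

```python
from itertools import zip_longest
from typing import Any, Dict, List

Row = Dict[str, Any]


def expand_batch(batch: List[Row], columns: List[str]) -> List[Row]:
    """Standalone re-implementation of the expansion logic for a whole batch."""
    mapping = {col: col for col in columns}  # list form: names stay the same
    flat: List[Row] = []
    for row in batch:
        expanded: List[Row] = []
        for source, target in mapping.items():
            # Items of this column are paired by position with the rows built
            # so far. The original row is the fill value when no partial row
            # exists yet.
            paired = zip_longest(row.get(source), expanded, fillvalue=row)
            expanded = [{**partial, target: item} for item, partial in paired]
        flat.extend(expanded)  # all expanded rows end up in one flat list
    return flat


batch = [
    {"id": 1, "question": ["q1", "q2"], "answer": ["a1", "a2"]},
    {"id": 2, "question": ["q3"], "answer": ["a3"]},
]

flat = expand_batch(batch, ["question", "answer"])
```

Here the first input row produces two output rows, (`q1`, `a1`) and (`q2`, `a2`), not four. Lists of unequal length within one row are padded with the fill value, so keeping the expanded columns the same length is the safer choice.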

Source code in src/distilabel/steps/expand.py
def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
    """Expand the columns in the input data.

    Args:
        inputs: The input data.

    Yields:
        The expanded rows.
    """
    yield [row for input in inputs for row in self._expand_columns(input)]

KeepColumns

Bases: Step

Keeps selected columns in the dataset.

KeepColumns is a Step whose process method keeps only the columns specified in the columns attribute. The columns attribute also overrides the default values of the inputs and outputs properties.

Note

The order in which the columns are provided matters, because the output columns follow that order. This is useful before pushing either a datasets.Dataset via the PushToHub step or a distilabel.Distiset obtained as the output of Pipeline.run.

Attributes:

  • columns (List[str]): List of strings with the names of the columns to keep.

Input columns
  • dynamic (determined by columns attribute): The columns to keep.
Output columns
  • dynamic (determined by columns attribute): The columns that were kept.
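
A minimal sketch of the filtering and ordering, using the same dictionary comprehension as the `process` method but as a standalone function. The name `keep_columns` and the sample batch are made up for the example.

```python
from typing import Any, Dict, List


def keep_columns(batch: List[Dict[str, Any]], columns: List[str]) -> List[Dict[str, Any]]:
    """Keep only `columns` in every row, in the order the names are given."""
    return [{col: item[col] for col in columns} for item in batch]


batch = [
    {
        "instruction": "Say hi.",
        "generation": "hi",
        "model_name": "some-model",
        "raw_output": "hi</s>",
    }
]

kept = keep_columns(batch, ["model_name", "instruction", "generation"])
column_order = list(kept[0].keys())
```

Because Python dictionaries preserve insertion order, `column_order` follows the order of the `columns` argument rather than the order of the keys in the input row.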
Source code in src/distilabel/steps/keep.py
class KeepColumns(Step):
    """Keeps selected columns in the dataset.

    `KeepColumns` is a `Step` that implements the `process` method that keeps only the columns
    specified in the `columns` attribute. Also `KeepColumns` provides an attribute `columns` to
    specify the columns to keep which will override the default value for the properties `inputs`
    and `outputs`.

    Note:
        The order in which the columns are provided is important, as the output will be sorted
        using the provided order, which is useful before pushing either a `datasets.Dataset` via
        the `PushToHub` step or a `distilabel.Distiset` via the `Pipeline.run` output variable.

    Attributes:
        columns: List of strings with the names of the columns to keep.

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to keep.

    Output columns:
        - dynamic (determined by `columns` attribute): The columns that were kept.
    """

    columns: List[str]

    @property
    def inputs(self) -> List[str]:
        """The inputs for the task are the column names in `columns`."""
        return self.columns

    @property
    def outputs(self) -> List[str]:
        """The outputs for the task are the column names in `columns`."""
        return self.columns

    @override
    def process(self, *inputs: StepInput) -> "StepOutput":
        """The `process` method keeps only the columns specified in the `columns` attribute.

        Args:
            *inputs: A list of dictionaries with the input data.

        Yields:
            A list of dictionaries with the output data.
        """
        for input in inputs:
            outputs = []
            for item in input:
                outputs.append({col: item[col] for col in self.columns})
            yield outputs

inputs: List[str] property

The inputs for the task are the column names in columns.

outputs: List[str] property

The outputs for the task are the column names in columns.

process(*inputs)

The process method keeps only the columns specified in the columns attribute.

Parameters:

  • *inputs (StepInput, default: ()): A list of dictionaries with the input data.

Yields:

  • StepOutput: A list of dictionaries with the output data.
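
One consequence of indexing each row with `item[col]`, as the source does, is that a row lacking one of the requested columns raises a `KeyError`. The sketch below reproduces that with a standalone function and adds a hypothetical helper, `missing_columns`, which is not part of distilabel, for checking a batch up front.

```python
from typing import Any, Dict, List


def keep_columns(batch: List[Dict[str, Any]], columns: List[str]) -> List[Dict[str, Any]]:
    """Same comprehension as `process`: plain indexing, no default for absent keys."""
    return [{col: item[col] for col in columns} for item in batch]


def missing_columns(batch: List[Dict[str, Any]], columns: List[str]) -> List[str]:
    """Hypothetical pre-check: requested columns that are absent from some row."""
    return [col for col in columns if any(col not in item for item in batch)]


batch = [{"instruction": "Say hi."}]
wanted = ["instruction", "generation"]

absent = missing_columns(batch, wanted)

try:
    keep_columns(batch, wanted)
    failed_on = None
except KeyError as err:
    # The KeyError carries the name of the first column that could not be found.
    failed_on = err.args[0]
```

Running such a check before the step makes the failure easier to diagnose than a bare `KeyError` raised from inside a pipeline.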

Source code in src/distilabel/steps/keep.py
@override
def process(self, *inputs: StepInput) -> "StepOutput":
    """The `process` method keeps only the columns specified in the `columns` attribute.

    Args:
        *inputs: A list of dictionaries with the input data.

    Yields:
        A list of dictionaries with the output data.
    """
    for input in inputs:
        outputs = []
        for item in input:
            outputs.append({col: item[col] for col in self.columns})
        yield outputs