Columns

This section lists the built-in steps for common column operations on batches.

ExpandColumns

Bases: Step

Expand columns that contain lists into multiple rows.

ExpandColumns is a Step that expands the columns named in its columns attribute into multiple rows. Each new row has the same data as the original row, except for the expanded column, which contains a single item from the original list.

Attributes:

Name Type Description
columns Union[Dict[str, str], List[str]]

A dictionary that maps the column to be expanded to the new column name or a list of columns to be expanded. If a list is provided, the new column name will be the same as the column name.

Input columns
  • dynamic (determined by columns attribute): The columns to be expanded into multiple rows.
Output columns
  • dynamic (determined by columns attribute): The expanded columns.

Examples:

Expand the selected columns into multiple rows:

```python
from distilabel.steps import ExpandColumns

expand_columns = ExpandColumns(
    columns=["generation"],
)
expand_columns.load()

result = next(
    expand_columns.process(
        [
            {
                "instruction": "instruction 1",
                "generation": ["generation 1", "generation 2"],
            }
        ],
    )
)
# >>> result
# [{'instruction': 'instruction 1', 'generation': 'generation 1'}, {'instruction': 'instruction 1', 'generation': 'generation 2'}]
```
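
Renaming and multi-column expansion can be sketched without distilabel. The block below is a minimal stand-alone sketch that mirrors the loop in _expand_columns from the source listing; expand_row is a hypothetical helper name used only for illustration, and the sketch assumes every expanded column holds a list of the same length.

```python
from itertools import zip_longest
from typing import Any, Dict, List


def expand_row(row: Dict[str, Any], columns: Dict[str, str]) -> List[Dict[str, Any]]:
    """Hypothetical stand-in mirroring the loop in ExpandColumns._expand_columns."""
    expanded_rows: List[Dict[str, Any]] = []
    for expand_column, new_column in columns.items():
        data = row.get(expand_column)
        rows = []
        # On the first pass expanded_rows is empty, so every item is paired
        # with the original row (the fill value).
        for item, expanded in zip_longest(data, expanded_rows, fillvalue=row):
            rows.append({**expanded, new_column: item})
        expanded_rows = rows
    return expanded_rows


# Renaming: each item is written to "text"; the original list stays under "generation".
renamed = expand_row(
    {"instruction": "i1", "generation": ["g1", "g2"]},
    {"generation": "text"},
)

# Two columns of equal length are expanded position by position, not as a product.
paired = expand_row(
    {"instruction": "i1", "generation": ["g1", "g2"], "score": [0.1, 0.9]},
    {"generation": "generation", "score": "score"},
)
```

In this sketch a renamed column leaves the original list in place, so a later step would need to drop it if it is not wanted.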
Source code in src/distilabel/steps/columns/expand.py
class ExpandColumns(Step):
    """Expand columns that contain lists into multiple rows.

    `ExpandColumns` is a `Step` that takes a list of columns and expands them into multiple
    rows. The new rows will have the same data as the original row, except for the expanded
    column, which will contain a single item from the original list.

    Attributes:
        columns: A dictionary that maps the column to be expanded to the new column name
            or a list of columns to be expanded. If a list is provided, the new column name
            will be the same as the column name.

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to be expanded into
            multiple rows.

    Output columns:
        - dynamic (determined by `columns` attribute):  The expanded columns.

    Examples:

        Expand the selected columns into multiple rows:

        ```python
        from distilabel.steps import ExpandColumns

        expand_columns = ExpandColumns(
            columns=["generation"],
        )
        expand_columns.load()

        result = next(
            expand_columns.process(
                [
                    {
                        "instruction": "instruction 1",
                        "generation": ["generation 1", "generation 2"],
                    }
                ],
            )
        )
        # >>> result
        # [{'instruction': 'instruction 1', 'generation': 'generation 1'}, {'instruction': 'instruction 1', 'generation': 'generation 2'}]
        ```
    """

    columns: Union[Dict[str, str], List[str]]

    inputs: List[str] = Field(
        default_factory=list, description=["The columns to be expanded."]
    )
    outputs: List[str] = Field(
        default_factory=list, description=["The expanded columns."]
    )

    @override
    def model_post_init(self, __context: Any) -> None:
        """Override this method to perform additional initialization after `__init__` and `model_construct`.
        This is useful if you want to do some validation that requires the entire model to be initialized.
        """
        super().model_post_init(__context)

        self.inputs = list(self.columns.keys())
        self.outputs = [
            new_column if new_column else expand_column
            for expand_column, new_column in self.columns.items()
        ]

    @field_validator("columns")
    @classmethod
    def always_dict(cls, value: Union[Dict[str, str], List[str]]) -> Dict[str, str]:
        """Ensure that the columns are always a dictionary.

        Args:
            value: The columns to be expanded.

        Returns:
            The columns to be expanded as a dictionary.
        """
        if isinstance(value, list):
            return {col: col for col in value}

        return value

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        """Expand the columns in the input data.

        Args:
            inputs: The input data.

        Yields:
            The expanded rows.
        """
        yield [row for input in inputs for row in self._expand_columns(input)]

    def _expand_columns(self, input: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Expand the columns in the input data.

        Args:
            input: The input data.

        Returns:
            The expanded rows.
        """
        expanded_rows = []
        for expand_column, new_column in self.columns.items():  # type: ignore
            data = input.get(expand_column)
            rows = []
            for item, expanded in zip_longest(*[data, expanded_rows], fillvalue=input):
                rows.append({**expanded, new_column: item})
            expanded_rows = rows
        return expanded_rows

always_dict(value) classmethod

Ensure that the columns are always a dictionary.

Parameters:

Name Type Description Default
value Union[Dict[str, str], List[str]]

The columns to be expanded.

required

Returns:

Type Description
Dict[str, str]

The columns to be expanded as a dictionary.

Source code in src/distilabel/steps/columns/expand.py
@field_validator("columns")
@classmethod
def always_dict(cls, value: Union[Dict[str, str], List[str]]) -> Dict[str, str]:
    """Ensure that the columns are always a dictionary.

    Args:
        value: The columns to be expanded.

    Returns:
        The columns to be expanded as a dictionary.
    """
    if isinstance(value, list):
        return {col: col for col in value}

    return value
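
The effect of this validator can be illustrated with a plain function. This is a minimal sketch, not the library code path: normalize_columns is a hypothetical name, and it skips the pydantic machinery that runs the real validator.

```python
from typing import Dict, List, Union


def normalize_columns(value: Union[Dict[str, str], List[str]]) -> Dict[str, str]:
    """Hypothetical plain-function version of the always_dict validator."""
    if isinstance(value, list):
        # A list keeps each column name unchanged.
        return {col: col for col in value}
    return value


from_list = normalize_columns(["generation", "score"])
from_dict = normalize_columns({"generation": "text"})
```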

model_post_init(__context)

Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.

Source code in src/distilabel/steps/columns/expand.py
@override
def model_post_init(self, __context: Any) -> None:
    """Override this method to perform additional initialization after `__init__` and `model_construct`.
    This is useful if you want to do some validation that requires the entire model to be initialized.
    """
    super().model_post_init(__context)

    self.inputs = list(self.columns.keys())
    self.outputs = [
        new_column if new_column else expand_column
        for expand_column, new_column in self.columns.items()
    ]

process(inputs)

Expand the columns in the input data.

Parameters:

Name Type Description Default
inputs StepInput

The input data.

required

Yields:

Type Description
StepOutput

The expanded rows.

Source code in src/distilabel/steps/columns/expand.py
def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
    """Expand the columns in the input data.

    Args:
        inputs: The input data.

    Yields:
        The expanded rows.
    """
    yield [row for input in inputs for row in self._expand_columns(input)]

KeepColumns

Bases: Step

Keeps selected columns in the dataset.

KeepColumns is a Step whose process method keeps only the columns listed in the columns attribute. The columns attribute also overrides the default values of the inputs and outputs properties.

Note

The order in which the columns are provided matters: the output columns follow that order, which is useful before pushing either a datasets.Dataset via the PushToHub step or a distilabel.Distiset via the Pipeline.run output variable.

Attributes:

Name Type Description
columns List[str]

List of strings with the names of the columns to keep.

Input columns
  • dynamic (determined by columns attribute): The columns to keep.
Output columns
  • dynamic (determined by columns attribute): The columns that were kept.

Examples:

Select the columns to keep:

```python
from distilabel.steps import KeepColumns

keep_columns = KeepColumns(
    columns=["instruction", "generation"],
)
keep_columns.load()

result = next(
    keep_columns.process(
        [{"instruction": "What's the brightest color?", "generation": "white", "model_name": "my_model"}],
    )
)
# >>> result
# [{'instruction': "What's the brightest color?", 'generation': 'white'}]
```
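
The ordering note can be checked with a small stand-alone sketch. It mirrors the dictionary comprehension in the process method shown in the source listing; keep_columns here is a hypothetical free function, not part of distilabel.

```python
from typing import Any, Dict, List


def keep_columns(rows: List[Dict[str, Any]], columns: List[str]) -> List[Dict[str, Any]]:
    """Hypothetical stand-in for the per-batch loop in KeepColumns.process."""
    return [{col: row[col] for col in columns} for row in rows]


rows = [
    {
        "model_name": "my_model",
        "generation": "white",
        "instruction": "What's the brightest color?",
    }
]

kept = keep_columns(rows, ["instruction", "generation"])
key_order = list(kept[0].keys())  # follows `columns`, not the input row

# A column that is listed but absent from a row raises KeyError.
try:
    keep_columns(rows, ["instruction", "missing"])
    missing_raises = False
except KeyError:
    missing_raises = True
```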
Source code in src/distilabel/steps/columns/keep.py
class KeepColumns(Step):
    """Keeps selected columns in the dataset.

    `KeepColumns` is a `Step` that implements the `process` method that keeps only the columns
    specified in the `columns` attribute. Also `KeepColumns` provides an attribute `columns` to
    specify the columns to keep which will override the default value for the properties `inputs`
    and `outputs`.

    Note:
        The order in which the columns are provided is important, as the output will be sorted
        using the provided order, which is useful before pushing either a `datasets.Dataset` via
        the `PushToHub` step or a `distilabel.Distiset` via the `Pipeline.run` output variable.

    Attributes:
        columns: List of strings with the names of the columns to keep.

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to keep.

    Output columns:
        - dynamic (determined by `columns` attribute): The columns that were kept.

    Examples:

        Select the columns to keep:

        ```python
        from distilabel.steps import KeepColumns

        keep_columns = KeepColumns(
            columns=["instruction", "generation"],
        )
        keep_columns.load()

        result = next(
            keep_columns.process(
                [{"instruction": "What's the brightest color?", "generation": "white", "model_name": "my_model"}],
            )
        )
        # >>> result
        # [{'instruction': "What's the brightest color?", 'generation': 'white'}]
        ```
    """

    columns: List[str]

    inputs: List[str] = Field(
        default_factory=list,
        description="The inputs for the task are the column names in 'columns'.",
    )
    outputs: List[str] = Field(
        default_factory=list,
        description="The outputs for the task are the column names in 'columns'.",
    )

    @override
    def model_post_init(self, __context: Any) -> None:
        """Override this method to perform additional initialization after `__init__` and `model_construct`.
        This is useful if you want to do some validation that requires the entire model to be initialized.
        """
        super().model_post_init(__context)

        self.inputs = self.columns
        self.outputs = self.columns

    @override
    def process(self, *inputs: StepInput) -> "StepOutput":
        """The `process` method keeps only the columns specified in the `columns` attribute.

        Args:
            *inputs: A list of dictionaries with the input data.

        Yields:
            A list of dictionaries with the output data.
        """
        for input in inputs:
            outputs = []
            for item in input:
                outputs.append({col: item[col] for col in self.columns})
            yield outputs

model_post_init(__context)

Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.

Source code in src/distilabel/steps/columns/keep.py
@override
def model_post_init(self, __context: Any) -> None:
    """Override this method to perform additional initialization after `__init__` and `model_construct`.
    This is useful if you want to do some validation that requires the entire model to be initialized.
    """
    super().model_post_init(__context)

    self.inputs = self.columns
    self.outputs = self.columns

process(*inputs)

The process method keeps only the columns specified in the columns attribute.

Parameters:

Name Type Description Default
*inputs StepInput

A list of dictionaries with the input data.

()

Yields:

Type Description
StepOutput

A list of dictionaries with the output data.

Source code in src/distilabel/steps/columns/keep.py
@override
def process(self, *inputs: StepInput) -> "StepOutput":
    """The `process` method keeps only the columns specified in the `columns` attribute.

    Args:
        *inputs: A list of dictionaries with the input data.

    Yields:
        A list of dictionaries with the output data.
    """
    for input in inputs:
        outputs = []
        for item in input:
            outputs.append({col: item[col] for col in self.columns})
        yield outputs

MergeColumns

Bases: Step

Merge columns from a row.

MergeColumns is a Step whose process method calls the merge_columns function to combine columns in a StepInput. It provides two attributes, columns and output_column, to specify the columns to merge and the resulting output column.

This step is useful if, for example, you have a Task that generates instructions and you want more examples of them. In that case you could use another Task to multiply your instructions synthetically, which would yield two separate columns. With MergeColumns you can merge them into a single column in your dataset for further processing.

Attributes:

Name Type Description
columns List[str]

List of strings with the names of the columns to merge.

output_column Optional[str]

Name of the output column. If not set, merged_column is used.

Input columns
  • dynamic (determined by columns attribute): The columns to merge.
Output columns
  • dynamic (determined by columns and output_column attributes): The columns that were merged.

Examples:

Combine columns in rows of a dataset:

```python
from distilabel.steps import MergeColumns

combiner = MergeColumns(
    columns=["queries", "multiple_queries"],
    output_column="queries",
)
combiner.load()

result = next(
    combiner.process(
        [
            {
                "queries": "How are you?",
                "multiple_queries": ["What's up?", "Everything ok?"]
            }
        ],
    )
)
# >>> result
# [{'queries': ['How are you?', "What's up?", 'Everything ok?']}]
```
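
The merge_columns helper is not listed on this page, so the following is only a sketch of what the example above implies. merge_row is a hypothetical stand-in; wrapping scalar values in a list and dropping the source columns are assumptions read off the example output, while the merged_column default comes from model_post_init in the source listing.

```python
from typing import Any, Dict, List


def merge_row(
    row: Dict[str, Any], columns: List[str], new_column: str = "merged_column"
) -> Dict[str, Any]:
    """Hypothetical stand-in for merge_columns, inferred from the example above."""
    result = dict(row)  # leave the caller's row untouched
    merged: List[Any] = []
    for col in columns:
        value = result.pop(col)
        # Assumption: scalars are wrapped so they can be concatenated with lists.
        merged += value if isinstance(value, list) else [value]
    result[new_column] = merged
    return result


row = {"queries": "How are you?", "multiple_queries": ["What's up?", "Everything ok?"]}

merged_default = merge_row(row, ["queries", "multiple_queries"])
merged_named = merge_row(row, ["queries", "multiple_queries"], new_column="queries")
```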
Source code in src/distilabel/steps/columns/merge.py
class MergeColumns(Step):
    """Merge columns from a row.

    `MergeColumns` is a `Step` that implements the `process` method that calls the `merge_columns`
    function to handle and combine columns in a `StepInput`. `MergeColumns` provides two attributes
    `columns` and `output_column` to specify the columns to merge and the resulting output column.

    This step is useful if, for example, you have a `Task` that generates instructions and you
    want more examples of them. In that case you could use another `Task` to multiply your
    instructions synthetically, which would yield two separate columns. With `MergeColumns`
    you can merge them into a single column in your dataset for further processing.

    Attributes:
        columns: List of strings with the names of the columns to merge.
        output_column: str name of the output column

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to merge.

    Output columns:
        - dynamic (determined by `columns` and `output_column` attributes): The columns
            that were merged.

    Examples:

        Combine columns in rows of a dataset:

        ```python
        from distilabel.steps import MergeColumns

        combiner = MergeColumns(
            columns=["queries", "multiple_queries"],
            output_column="queries",
        )
        combiner.load()

        result = next(
            combiner.process(
                [
                    {
                        "queries": "How are you?",
                        "multiple_queries": ["What's up?", "Everything ok?"]
                    }
                ],
            )
        )
        # >>> result
        # [{'queries': ['How are you?', "What's up?", 'Everything ok?']}]
        ```
    """

    columns: List[str]
    output_column: Optional[str] = None

    inputs: List[str] = Field(
        default_factory=list,
        description="The inputs for the task are the column names in 'columns'.",
    )
    outputs: List[str] = Field(
        default_factory=list,
        description="The outputs for the task are the column names in 'columns'.",
    )

    @override
    def model_post_init(self, __context: Any) -> None:
        """Override this method to perform additional initialization after `__init__` and `model_construct`.
        This is useful if you want to do some validation that requires the entire model to be initialized.
        """
        super().model_post_init(__context)

        self.inputs = self.columns
        self.outputs = [self.output_column] if self.output_column else ["merged_column"]

    @override
    def process(self, inputs: StepInput) -> "StepOutput":
        combined = []
        for input in inputs:
            combined.append(
                merge_columns(
                    input,
                    columns=self.columns,
                    new_column=self.outputs[0],
                )
            )
        yield combined

model_post_init(__context)

Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.

Source code in src/distilabel/steps/columns/merge.py
@override
def model_post_init(self, __context: Any) -> None:
    """Override this method to perform additional initialization after `__init__` and `model_construct`.
    This is useful if you want to do some validation that requires the entire model to be initialized.
    """
    super().model_post_init(__context)

    self.inputs = self.columns
    self.outputs = [self.output_column] if self.output_column else ["merged_column"]

GroupColumns

Bases: Step

Combines columns from a list of StepInput.

GroupColumns is a Step whose process method calls the group_columns function to combine a list of StepInput. It provides two attributes, columns and output_columns, which specify the columns to group and the names of the output columns, and which override the default values of the inputs and outputs properties, respectively.

Attributes:

Name Type Description
columns List[str]

List of strings with the names of the columns to group.

output_columns Optional[List[str]]

Optional list of strings with the names of the output columns.

Input columns
  • dynamic (determined by columns attribute): The columns to group.
Output columns
  • dynamic (determined by columns and output_columns attributes): The columns that were grouped.

Examples:

Combine columns of a dataset:

```python
from distilabel.steps import GroupColumns

group_columns = GroupColumns(
    name="group_columns",
    columns=["generation", "model_name"],
)
group_columns.load()

result = next(
    group_columns.process(
        [{"generation": "AI generated text"}, {"model_name": "my_model"}],
        [{"generation": "Other generated text", "model_name": "my_model"}]
    )
)
# >>> result
# [{'grouped_generation': ['AI generated text', 'Other generated text'], 'grouped_model_name': ['my_model']}]
```

Specify the name of the output columns:

```python
from distilabel.steps import GroupColumns

group_columns = GroupColumns(
    name="group_columns",
    columns=["generation", "model_name"],
    output_columns=["generations", "generation_models"]
)
group_columns.load()

result = next(
    group_columns.process(
        [{"generation": "AI generated text"}, {"model_name": "my_model"}],
        [{"generation": "Other generated text", "model_name": "my_model"}]
    )
)
# >>> result
# [{'generations': ['AI generated text', 'Other generated text'], 'generation_models': ['my_model']}]
```
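
The grouping helper itself is not listed on this page. As a rough sketch of what the examples above imply, rows at the same index across the input batches are combined and the values of the grouped columns are collected into lists. group_rows is a hypothetical stand-in, and copying the non-grouped columns unchanged is an assumption.

```python
from typing import Any, Dict, List


def group_rows(
    *inputs: List[Dict[str, Any]],
    columns: List[str],
    output_columns: List[str],
) -> List[Dict[str, Any]]:
    """Hypothetical stand-in for the grouping helper, inferred from the examples above."""
    names = dict(zip(columns, output_columns))
    result = []
    # Rows at the same index across the batches are combined into one row.
    for rows_at_index in zip(*inputs):
        combined: Dict[str, Any] = {}
        for row in rows_at_index:
            for key, value in row.items():
                if key in names:
                    combined.setdefault(names[key], []).append(value)
                else:
                    # Assumption: columns outside `columns` are copied as-is.
                    combined[key] = value
        result.append(combined)
    return result


batch_a = [{"generation": "AI generated text"}, {"model_name": "my_model"}]
batch_b = [{"generation": "Other generated text", "model_name": "my_model"}]

grouped = group_rows(
    batch_a,
    batch_b,
    columns=["generation", "model_name"],
    output_columns=["generations", "generation_models"],
)
```

Because the sketch uses zip, the second row of batch_a has no counterpart in batch_b and does not appear in the result, which matches the single row in the example output.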
Source code in src/distilabel/steps/columns/group.py
class GroupColumns(Step):
    """Combines columns from a list of `StepInput`.

    `GroupColumns` is a `Step` that implements the `process` method that calls the `group_columns`
    function to handle and combine a list of `StepInput`. Also `GroupColumns` provides two attributes
    `columns` and `output_columns` to specify the columns to group and the output columns
    which will override the default value for the properties `inputs` and `outputs`, respectively.

    Attributes:
        columns: List of strings with the names of the columns to group.
        output_columns: Optional list of strings with the names of the output columns.

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to group.

    Output columns:
        - dynamic (determined by `columns` and `output_columns` attributes): The columns
            that were grouped.

    Examples:

        Combine columns of a dataset:

        ```python
        from distilabel.steps import GroupColumns

        group_columns = GroupColumns(
            name="group_columns",
            columns=["generation", "model_name"],
        )
        group_columns.load()

        result = next(
            group_columns.process(
                [{"generation": "AI generated text"}, {"model_name": "my_model"}],
                [{"generation": "Other generated text", "model_name": "my_model"}]
            )
        )
        # >>> result
        # [{'grouped_generation': ['AI generated text', 'Other generated text'], 'grouped_model_name': ['my_model']}]
        ```

        Specify the name of the output columns:

        ```python
        from distilabel.steps import GroupColumns

        group_columns = GroupColumns(
            name="group_columns",
            columns=["generation", "model_name"],
            output_columns=["generations", "generation_models"]
        )
        group_columns.load()

        result = next(
            group_columns.process(
                [{"generation": "AI generated text"}, {"model_name": "my_model"}],
                [{"generation": "Other generated text", "model_name": "my_model"}]
            )
        )
        # >>> result
        # [{'generations': ['AI generated text', 'Other generated text'], 'generation_models': ['my_model']}]
        ```
    """

    columns: List[str]
    output_columns: Optional[List[str]] = None

    inputs: List[str] = Field(
        default_factory=list,
        description="The inputs for the task are the column names in 'columns'.",
    )
    outputs: List[str] = Field(
        default_factory=list,
        description="The outputs for the task are the column names in 'output_columns' or 'grouped_{column}' for each column in 'columns'.",
    )

    @override
    def model_post_init(self, __context: Any) -> None:
        """Override this method to perform additional initialization after `__init__` and `model_construct`.
        This is useful if you want to do some validation that requires the entire model to be initialized.
        """
        super().model_post_init(__context)

        self.inputs = self.columns
        self.outputs = (
            self.output_columns
            if self.output_columns is not None
            else [f"grouped_{column}" for column in self.columns]
        )

    @override
    def process(self, *inputs: StepInput) -> "StepOutput":
        """The `process` method calls the `group_columns` function to handle and combine a list of `StepInput`.

        Args:
            *inputs: A list of `StepInput` to be combined.

        Yields:
            A `StepOutput` with the combined `StepInput` using the `group_columns` function.
        """
        yield group_columns(
            *inputs,
            group_columns=self.inputs,
            output_group_columns=self.outputs,
        )

model_post_init(__context)

Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.

Source code in src/distilabel/steps/columns/group.py
@override
def model_post_init(self, __context: Any) -> None:
    """Override this method to perform additional initialization after `__init__` and `model_construct`.
    This is useful if you want to do some validation that requires the entire model to be initialized.
    """
    super().model_post_init(__context)

    self.inputs = self.columns
    self.outputs = (
        self.output_columns
        if self.output_columns is not None
        else [f"grouped_{column}" for column in self.columns]
    )
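
The naming rule above can be shown in isolation. This is a minimal sketch of the same conditional; resolve_outputs is a hypothetical helper that does not exist in distilabel.

```python
from typing import List, Optional


def resolve_outputs(
    columns: List[str], output_columns: Optional[List[str]] = None
) -> List[str]:
    """Hypothetical helper mirroring how model_post_init sets outputs."""
    if output_columns is not None:
        return output_columns
    # Default: prefix every grouped column with "grouped_".
    return [f"grouped_{column}" for column in columns]


default_names = resolve_outputs(["generation", "model_name"])
custom_names = resolve_outputs(
    ["generation", "model_name"], ["generations", "generation_models"]
)
```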

process(*inputs)

The process method calls the group_columns function to handle and combine a list of StepInput.

Parameters:

Name Type Description Default
*inputs StepInput

A list of StepInput to be combined.

()

Yields:

Type Description
StepOutput

A StepOutput with the combined StepInput using the group_columns function.

Source code in src/distilabel/steps/columns/group.py
@override
def process(self, *inputs: StepInput) -> "StepOutput":
    """The `process` method calls the `group_columns` function to handle and combine a list of `StepInput`.

    Args:
        *inputs: A list of `StepInput` to be combined.

    Yields:
        A `StepOutput` with the combined `StepInput` using the `group_columns` function.
    """
    yield group_columns(
        *inputs,
        group_columns=self.inputs,
        output_group_columns=self.outputs,
    )