Pipeline Utils

combine_dicts(*inputs, merge_keys, output_merge_keys=None)

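Combines multiple lists of dictionaries into a single list of dictionaries, pairing the input dictionaries by position. For each key in merge_keys, the values from the paired dictionaries are collected into a list; keys outside merge_keys are copied as-is, with later inputs overwriting earlier ones. The merged keys are renamed to output_merge_keys if provided, and to merged_<key> otherwise.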

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| inputs | StepInput | lists of dictionaries to combine. | () |
| merge_keys | List[str] | list of keys to merge on. | required |
| output_merge_keys | Optional[List[str]] | list of keys to rename the merge keys to. Defaults to None. | None |

Returns:

| Type | Description |
| --- | --- |
| StepInput | A list of dictionaries where the values of the merge_keys are combined into a list and renamed to output_merge_keys. |
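
The behaviour described above can be sketched with a small runnable example. To keep the snippet independent of an installed distilabel, it uses a local stand-in named `combine_dicts_sketch` (a hypothetical name) that mirrors the source listing further down this page. The `Rows` alias assumes that `StepInput` behaves like a list of dictionaries, and the sample rows are invented for illustration.

```python
from typing import Any, Dict, List, Optional

Rows = List[Dict[str, Any]]  # assumption: StepInput behaves like a list of dicts


def combine_dicts_sketch(
    *inputs: Rows,
    merge_keys: List[str],
    output_merge_keys: Optional[List[str]] = None,
) -> Rows:
    """Local stand-in mirroring the documented behaviour of combine_dicts."""
    if output_merge_keys is not None and len(output_merge_keys) != len(merge_keys):
        raise ValueError("output_merge_keys and merge_keys must have the same length")
    if output_merge_keys is None:
        output_merge_keys = [f"merged_{key}" for key in merge_keys]
    renamed = dict(zip(merge_keys, output_merge_keys))

    result: Rows = []
    # Pair up the dictionaries that sit at the same index in every input list
    for dicts_at_index in zip(*inputs):
        combined: Dict[str, Any] = {}
        for d in dicts_at_index:
            for key, value in d.items():
                if key in renamed:
                    # Merge keys: collect the values into a list under the new name
                    combined.setdefault(renamed[key], []).append(value)
                else:
                    # Other keys: plain assignment, later inputs win
                    combined[key] = value
        result.append(combined)
    return result


# Hypothetical outputs of two upstream steps for the same two instructions
step_a = [
    {"instruction": "What is 2+2?", "generation": "4"},
    {"instruction": "Capital of France?", "generation": "Paris"},
]
step_b = [
    {"instruction": "What is 2+2?", "generation": "Four"},
    {"instruction": "Capital of France?", "generation": "It is Paris."},
]

default_names = combine_dicts_sketch(step_a, step_b, merge_keys=["generation"])
renamed_keys = combine_dicts_sketch(
    step_a, step_b, merge_keys=["generation"], output_merge_keys=["generations"]
)

print(default_names[0])
# {'instruction': 'What is 2+2?', 'merged_generation': ['4', 'Four']}
print(renamed_keys[1])
# {'instruction': 'Capital of France?', 'generations': ['Paris', 'It is Paris.']}
```

Without output_merge_keys, the merged column is named `merged_generation`; passing `output_merge_keys=["generations"]` only changes the name of that column, not its contents.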

Source code in src/distilabel/pipeline/utils.py
def combine_dicts(
    *inputs: StepInput,
    merge_keys: List[str],
    output_merge_keys: Optional[List[str]] = None,
) -> StepInput:
    """Combines multiple list of dictionaries into a single list of dictionaries on the
    specified `merge_keys`. If `output_merge_keys` are provided, then it will also rename
    `merge_keys`.

    Args:
        inputs: list of dictionaries to combine.
        merge_keys: list of keys to merge on.
        output_merge_keys: list of keys to rename the merge keys to. Defaults to `None`.

    Returns:
        A list of dictionaries where the values of the `merge_keys` are combined into a
        list and renamed to `output_merge_keys`.
    """
    if output_merge_keys is not None and len(output_merge_keys) != len(merge_keys):
        raise ValueError(
            "The length of output_merge_keys must be the same as the length of merge_keys"
        )
    if output_merge_keys is None:
        output_merge_keys = [f"merged_{key}" for key in merge_keys]
    merge_keys_dict = dict(zip(merge_keys, output_merge_keys))

    result = []
    # Use zip to iterate over lists based on their index
    for dicts_at_index in zip(*inputs):
        combined_dict = {}
        # Iterate over dicts at the same index
        for d in dicts_at_index:
            # Iterate over key-value pairs in each dict
            for key, value in d.items():
                # If the key is in the merge_keys, append the value to the existing list
                if key in merge_keys_dict.keys():
                    combined_dict.setdefault(merge_keys_dict[key], []).append(value)
                # If the key is not in the merge_keys, create a new key-value pair
                else:
                    combined_dict[key] = value
        result.append(combined_dict)
    return result
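
Two consequences of the listing above are easy to miss. Because the rows are paired with `zip`, the result is truncated to the shortest input. Keys outside merge_keys are plain assignments, so a later input silently overwrites an earlier one. The sketch below reproduces both effects with plain Python only; the sample data and the `"text"` merge key are invented for illustration.

```python
# Hypothetical inputs: the second list is one row shorter than the first
first = [{"id": 1, "text": "a"}, {"id": 2, "text": "b"}]
second = [{"id": 10, "text": "c"}]

# zip stops at the shortest input, so only one pair of rows is produced
pairs = list(zip(first, second))
print(len(pairs))  # 1

# Same inner loop as the listing, with "text" standing in for a merge key
combined = {}
for d in pairs[0]:
    for key, value in d.items():
        if key == "text":
            # Merge key: values accumulate in a list
            combined.setdefault("merged_text", []).append(value)
        else:
            # Non-merge key: the value from the later input replaces the earlier one
            combined[key] = value

print(combined)  # {'id': 10, 'merged_text': ['a', 'c']}
```

If the inputs may differ in length, or if non-merge keys may disagree between inputs, it is worth checking for that before calling the function, since neither case raises an error.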