
Base

BasePipeline

Bases: _Serializable

Base class for a distilabel pipeline.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `name` | `str` | The name of the pipeline. |
| `description` | `Optional[str]` | A description of the pipeline. |
| `dag` | `DAG` | The `DAG` instance that represents the pipeline. |
| `_cache_dir` | `Path` | The directory where the pipeline will be cached. |
| `_logger` | `Logger` | The logger instance that will be used by the pipeline. |
| `_batch_manager` | `Optional[_BatchManager]` | The batch manager that will manage the batches received from the steps while running the pipeline. |
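A `BasePipeline` is not used directly; a concrete subclass such as `distilabel.pipeline.Pipeline` is instantiated instead. A minimal sketch of what the attributes above look like in practice (assuming distilabel is installed):

```python
from distilabel.pipeline import Pipeline  # concrete subclass of BasePipeline

pipeline = Pipeline(name="demo", description="A demo pipeline")

print(pipeline.name)         # "demo"
print(pipeline.description)  # "A demo pipeline"
print(pipeline.dag)          # the (initially empty) DAG of steps
```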

Source code in src/distilabel/pipeline/base.py
class BasePipeline(_Serializable):
    """Base class for a `distilabel` pipeline.

    Attributes:
        name: The name of the pipeline.
        description: A description of the pipeline.
        dag: The `DAG` instance that represents the pipeline.
        _cache_dir: The directory where the pipeline will be cached.
        _logger: The logger instance that will be used by the pipeline.
        _batch_manager: The batch manager that will manage the batches received from the
            steps while running the pipeline.
    """

    def __init__(
        self,
        name: str,
        description: Optional[str] = None,
        cache_dir: Optional["PathLike"] = None,
        enable_metadata: bool = False,
    ) -> None:
        """Initialize the `BasePipeline` instance.

        Args:
            name: The name of the pipeline.
            description: A description of the pipeline. Defaults to `None`.
            cache_dir: A directory where the pipeline will be cached. Defaults to `None`.
            enable_metadata: Whether to include the distilabel metadata column for the pipeline
                in the final `Distiset`. It contains metadata used by distilabel, for example
                the raw outputs of the `LLM` without processing would be here, inside `raw_output_...`
                field. Defaults to `False`.
        """
        self.name = name
        self.description = description
        self._enable_metadata = enable_metadata
        self.dag = DAG()

        if cache_dir:
            self._cache_dir = Path(cache_dir)
        elif env_cache_dir := os.getenv("DISTILABEL_CACHE_DIR"):
            self._cache_dir = Path(env_cache_dir)
        else:
            self._cache_dir = BASE_CACHE_DIR

        self._logger = logging.getLogger("distilabel.pipeline")

        # Set to None here; it will be created in the call to run
        self._batch_manager: Optional["_BatchManager"] = None
        self._dry_run: bool = False

    def __enter__(self) -> Self:
        """Set the global pipeline instance when entering a pipeline context."""
        _GlobalPipelineManager.set_pipeline(self)
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Unset the global pipeline instance when exiting a pipeline context."""
        _GlobalPipelineManager.set_pipeline(None)

    def _create_signature(self) -> str:
        """Makes a signature (hash) of a pipeline, using the step ids and the adjacency between them.

        The main use is to find the pipeline in the cache folder.

        Returns:
            str: Hexadecimal SHA1 signature of the pipeline.
        """
        hasher = hashlib.sha1()

        steps_info = []
        pipeline_dump = self.dump()["pipeline"]

        for step in pipeline_dump["steps"]:
            step_info = step["name"]
            for argument, value in sorted(step[STEP_ATTR_NAME].items()):
                if (argument == TYPE_INFO_KEY) or (value is None):
                    continue

                if isinstance(value, dict):
                    # input_mappings/output_mappings
                    step_info += "-".join(
                        [f"{str(k)}-{str(v)}" for k, v in value.items()]
                    )
                elif isinstance(value, (list, tuple)):
                    # runtime_parameters_info
                    step_info += "-".join([str(v) for v in value])
                elif isinstance(value, (int, str, float)):
                    # batch_size/name
                    step_info += str(value)
                else:
                    raise ValueError(
                        f"Field '{argument}' in step '{step['name']}' has type {type(value)}, explicitly cast the type to 'str'."
                    )

            steps_info.append(step_info)

        connections_info = [
            f"{c['from']}-{'-'.join(c['to'])}" for c in pipeline_dump["connections"]
        ]

        routing_batch_functions_info = []
        for function in pipeline_dump["routing_batch_functions"]:
            step = function["step"]
            routing_batch_function: "RoutingBatchFunction" = self.dag.get_step(step)[
                ROUTING_BATCH_FUNCTION_ATTR_NAME
            ]
            if type_info := routing_batch_function._get_type_info():
                step += f"-{type_info}"
            routing_batch_functions_info.append(step)

        hasher.update(
            ",".join(
                steps_info + connections_info + routing_batch_functions_info
            ).encode()
        )

        return hasher.hexdigest()

    def run(
        self,
        parameters: Optional[Dict[str, Dict[str, Any]]] = None,
        use_cache: bool = True,
    ) -> "Distiset":  # type: ignore
        """Run the pipeline. It will set the runtime parameters for the steps and validate
        the pipeline.

        This method should be extended by the specific pipeline implementation,
        adding the logic to run the pipeline.

        Args:
            parameters: A dictionary with the step name as the key and a dictionary with
                the runtime parameters for the step as the value. Defaults to `None`.
            use_cache: Whether to use the cache from previous pipeline runs. Defaults to
                `True`.

        Returns:
            The `Distiset` created by the pipeline.
        """
        if use_cache:
            self._load_from_cache()
        self._set_runtime_parameters(parameters or {})
        self.dag.validate()

    def dry_run(
        self,
        parameters: Optional[Dict[str, Dict[str, Any]]] = None,
        batch_size: int = 1,
    ) -> "Distiset":
        """Do a dry run to test the pipeline runs as expected.

        Running a `Pipeline` in dry run mode will set all the `batch_size` of generator steps
        to the specified batch_size, and run just with a single batch, effectively
        running the whole pipeline with a single example. The cache will be set to False.

        Args:
            parameters: The same parameters variable from `BasePipeline.run`. Defaults to None.
                Will be passed to the parent method, but with the batch_size of the generator steps
                fixed to 1.
            batch_size: The batch size to test the pipeline. Defaults to 1.

        Returns:
            Will return the `Distiset` as the main run method would do.
        """
        self._dry_run = True

        # Force the `batch_size` of every generator step so the pipeline runs
        # on a single, small batch. `parameters` may be `None`, so normalize it.
        parameters = parameters or {}
        for step_name in self.dag:
            step = self.dag.get_step(step_name)[STEP_ATTR_NAME]
            if step.is_generator:
                parameters.setdefault(step_name, {})["batch_size"] = batch_size

        distiset = self.run(parameters, use_cache=False)

        self._dry_run = False
        return distiset

    def get_runtime_parameters_info(self) -> Dict[str, List[Dict[str, Any]]]:
        """Get the runtime parameters for the steps in the pipeline.

        Returns:
            A dictionary with the step name as the key and a list of dictionaries with
            the parameter name and the parameter info as the value.
        """
        runtime_parameters = {}
        for step_name in self.dag:
            step: "_Step" = self.dag.get_step(step_name)[STEP_ATTR_NAME]
            runtime_parameters[step_name] = step.get_runtime_parameters_info()
        return runtime_parameters

    def _add_step(self, step: "_Step") -> None:
        """Add a step to the pipeline.

        Args:
            step: The step to be added to the pipeline.
        """
        self.dag.add_step(step)

    def _add_edge(self, from_step: str, to_step: str) -> None:
        """Add an edge between two steps in the pipeline.

        Args:
            from_step: The name of the step that will generate the input for `to_step`.
            to_step: The name of the step that will receive the input from `from_step`.
        """
        self.dag.add_edge(from_step, to_step)

        # Check if `from_step` has a `routing_batch_function`. If it does, then mark
        # `to_step` as a step that will receive a routed batch.
        node = self.dag.get_step(from_step)  # type: ignore
        routing_batch_function = node.get(ROUTING_BATCH_FUNCTION_ATTR_NAME, None)
        self.dag.set_step_attr(
            name=to_step,
            attr=RECEIVES_ROUTED_BATCHES_ATTR_NAME,
            value=routing_batch_function is not None,
        )

    def _add_routing_batch_function(
        self, step_name: str, routing_batch_function: "RoutingBatchFunction"
    ) -> None:
        """Add a routing batch function to a step.

        Args:
            step_name: The name of the step that will receive the routed batch.
            routing_batch_function: The function that will route the batch to the step.
        """
        self.dag.set_step_attr(
            name=step_name,
            attr=ROUTING_BATCH_FUNCTION_ATTR_NAME,
            value=routing_batch_function,
        )

    def _set_runtime_parameters(self, parameters: Dict[str, Dict[str, Any]]) -> None:
        """Set the runtime parameters for the steps in the pipeline.

        Args:
            parameters: A dictionary with the step name as the key and a dictionary with
                the parameter name as the key and the parameter value as the value.
        """
        step_names = set(self.dag.G)
        for step_name, step_parameters in parameters.items():
            if step_name not in step_names:
                self._logger.warning(
                    f"❓ Step '{step_name}' provided in `Pipeline.run(parameters={{...}})` not found in the pipeline."
                    f" Available steps are: {step_names}."
                )
            else:
                step: "_Step" = self.dag.get_step(step_name)[STEP_ATTR_NAME]
                step.set_runtime_parameters(step_parameters)

    def _model_dump(self, obj: Any, **kwargs: Any) -> Dict[str, Any]:
        """Dumps the DAG content to a dict.

        Args:
            obj (Any): Unused, just kept to match the signature of the parent method.
            kwargs (Any): Unused, just kept to match the signature of the parent method.

        Returns:
            Dict[str, Any]: Internal representation of the DAG from networkx in a serializable format.
        """
        return self.dag.dump()

    def dump(self, **kwargs: Any) -> Dict[str, Any]:
        return {
            "distilabel": {"version": __version__},
            "pipeline": {
                "name": self.name,
                "description": self.description,
                **super().dump(),
            },
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> Self:
        """Create a Pipeline from a dict containing the serialized data.

        Note:
            It's intended for internal use.

        Args:
            data (Dict[str, Any]): Dictionary containing the serialized data from a Pipeline.

        Returns:
            BasePipeline: Pipeline recreated from the dictionary info.
        """
        name = data["pipeline"]["name"]
        description = data["pipeline"].get("description")
        with cls(name=name, description=description) as pipe:
            pipe.dag = DAG.from_dict(data["pipeline"])
        return pipe

    @property
    def _cache_location(self) -> _CacheLocation:
        """Dictionary containing the the object that will stored and the location,
        whether it is a filename or a folder.

        Returns:
            Path: Filenames where the pipeline content will be serialized.
        """
        folder = self._cache_dir / self.name / self._create_signature()
        return {
            "pipeline": folder / "pipeline.yaml",
            "batch_manager": folder / "batch_manager.json",
            "data": folder / "data",
            "log_file": folder / "pipeline.log",
        }

    def _cache(self) -> None:
        """Saves the `BasePipeline` using the `_cache_filename`."""
        self.save(
            path=self._cache_location["pipeline"],
            format=self._cache_location["pipeline"].suffix.replace(".", ""),  # type: ignore
        )
        if self._batch_manager is not None:
            self._batch_manager.save(
                self._cache_location["batch_manager"],
                format=self._cache_location["batch_manager"].suffix.replace(".", ""),  # type: ignore
            )
        self._logger.debug("Pipeline and batch manager saved to cache.")

    def _load_from_cache(self) -> None:
        """Will try to load the `BasePipeline` from the cache dir if found, updating
        the internal `DAG` and `_BatchManager`.
        """
        cache_loc = self._cache_location
        if cache_loc["pipeline"].exists():
            if cache_loc["batch_manager"].exists():
                self._batch_manager = _BatchManager.from_json(
                    cache_loc["batch_manager"]
                )
            self._logger.info("💾 Load pipeline from cache")

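The cache folder of a pipeline is keyed by its signature (see `_create_signature` and `_cache_location` above), so two identical pipelines resolve to the same location on disk. A minimal sketch; `_create_signature` is private and called here only for illustration:

```python
from distilabel.pipeline import Pipeline  # concrete subclass of BasePipeline

pipeline = Pipeline(name="cache-demo")

# The cache folder is `<cache_dir>/<pipeline name>/<signature>`, where the
# signature is a SHA1 hash over step attributes and DAG connections.
signature = pipeline._create_signature()
print(pipeline._cache_dir / pipeline.name / signature)
```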
__enter__()

Set the global pipeline instance when entering a pipeline context.

Source code in src/distilabel/pipeline/base.py
def __enter__(self) -> Self:
    """Set the global pipeline instance when entering a pipeline context."""
    _GlobalPipelineManager.set_pipeline(self)
    return self

__exit__(exc_type, exc_value, traceback)

Unset the global pipeline instance when exiting a pipeline context.

Source code in src/distilabel/pipeline/base.py
def __exit__(self, exc_type, exc_value, traceback) -> None:
    """Unset the global pipeline instance when exiting a pipeline context."""
    _GlobalPipelineManager.set_pipeline(None)
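Together, `__enter__` and `__exit__` make a pipeline usable as a context manager: steps instantiated inside the `with` block are registered on the active pipeline via `_GlobalPipelineManager`. A minimal sketch:

```python
from distilabel.pipeline import Pipeline  # concrete subclass of BasePipeline

with Pipeline(name="demo") as pipeline:
    # Steps created here are added to `pipeline.dag`, since this pipeline
    # is set as the current global pipeline for the duration of the block.
    ...

# On exit the global pipeline is unset again.
```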

__init__(name, description=None, cache_dir=None, enable_metadata=False)

Initialize the BasePipeline instance.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | The name of the pipeline. | *required* |
| `description` | `Optional[str]` | A description of the pipeline. | `None` |
| `cache_dir` | `Optional[PathLike]` | A directory where the pipeline will be cached. | `None` |
| `enable_metadata` | `bool` | Whether to include the distilabel metadata column for the pipeline in the final `Distiset`. It contains metadata used by distilabel, for example the raw outputs of the `LLM` without processing, stored in the `raw_output_...` field. | `False` |
Source code in src/distilabel/pipeline/base.py
def __init__(
    self,
    name: str,
    description: Optional[str] = None,
    cache_dir: Optional["PathLike"] = None,
    enable_metadata: bool = False,
) -> None:
    """Initialize the `BasePipeline` instance.

    Args:
        name: The name of the pipeline.
        description: A description of the pipeline. Defaults to `None`.
        cache_dir: A directory where the pipeline will be cached. Defaults to `None`.
        enable_metadata: Whether to include the distilabel metadata column for the pipeline
            in the final `Distiset`. It contains metadata used by distilabel, for example
            the raw outputs of the `LLM` without processing would be here, inside `raw_output_...`
            field. Defaults to `False`.
    """
    self.name = name
    self.description = description
    self._enable_metadata = enable_metadata
    self.dag = DAG()

    if cache_dir:
        self._cache_dir = Path(cache_dir)
    elif env_cache_dir := os.getenv("DISTILABEL_CACHE_DIR"):
        self._cache_dir = Path(env_cache_dir)
    else:
        self._cache_dir = BASE_CACHE_DIR

    self._logger = logging.getLogger("distilabel.pipeline")

    # Set to None here; it will be created in the call to run
    self._batch_manager: Optional["_BatchManager"] = None
    self._dry_run: bool = False
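The cache directory is resolved with a fixed precedence: the explicit `cache_dir` argument, then the `DISTILABEL_CACHE_DIR` environment variable, then the library default (`BASE_CACHE_DIR`). A small sketch with hypothetical paths:

```python
import os

from distilabel.pipeline import Pipeline  # concrete subclass of BasePipeline

# With no explicit argument, the environment variable wins over the default.
os.environ["DISTILABEL_CACHE_DIR"] = "/tmp/distilabel-cache"
pipeline = Pipeline(name="demo", enable_metadata=True)
print(pipeline._cache_dir)  # /tmp/distilabel-cache

# An explicit `cache_dir` takes precedence over the environment variable.
pipeline = Pipeline(name="demo", cache_dir="/data/cache")
print(pipeline._cache_dir)  # /data/cache
```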

dry_run(parameters=None, batch_size=1)

Do a dry run to test the pipeline runs as expected.

Running a `Pipeline` in dry run mode will set the `batch_size` of every generator step to the specified `batch_size` and run with just a single batch, effectively running the whole pipeline on a single example. Caching is disabled.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `parameters` | `Optional[Dict[str, Dict[str, Any]]]` | The same `parameters` variable as in `BasePipeline.run`. Passed to the parent method, but with the `batch_size` of the generator steps fixed to `batch_size`. | `None` |
| `batch_size` | `int` | The batch size used to test the pipeline. | `1` |

Returns:

| Type | Description |
| --- | --- |
| `Distiset` | The `Distiset` as the main `run` method would return it. |

Source code in src/distilabel/pipeline/base.py
def dry_run(
    self,
    parameters: Optional[Dict[str, Dict[str, Any]]] = None,
    batch_size: int = 1,
) -> "Distiset":
    """Do a dry run to test the pipeline runs as expected.

    Running a `Pipeline` in dry run mode will set all the `batch_size` of generator steps
    to the specified batch_size, and run just with a single batch, effectively
    running the whole pipeline with a single example. The cache will be set to False.

    Args:
        parameters: The same parameters variable from `BasePipeline.run`. Defaults to None.
            Will be passed to the parent method, but with the batch_size of the generator steps
            fixed to 1.
        batch_size: The batch size to test the pipeline. Defaults to 1.

    Returns:
        Will return the `Distiset` as the main run method would do.
    """
    self._dry_run = True

    # Force the `batch_size` of every generator step so the pipeline runs
    # on a single, small batch. `parameters` may be `None`, so normalize it.
    parameters = parameters or {}
    for step_name in self.dag:
        step = self.dag.get_step(step_name)[STEP_ATTR_NAME]
        if step.is_generator:
            parameters.setdefault(step_name, {})["batch_size"] = batch_size

    distiset = self.run(parameters, use_cache=False)

    self._dry_run = False
    return distiset
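A quick smoke test of an already built pipeline, here with no extra parameters:

```python
# Runs the whole pipeline once on a single batch of one example, with
# caching disabled and generator batch sizes forced to `batch_size`.
distiset = pipeline.dry_run(batch_size=1)
```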

from_dict(data) classmethod

Create a Pipeline from a dict containing the serialized data.

Note

It's intended for internal use.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `Dict[str, Any]` | Dictionary containing the serialized data from a `Pipeline`. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `BasePipeline` | `Self` | Pipeline recreated from the dictionary info. |

Source code in src/distilabel/pipeline/base.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> Self:
    """Create a Pipeline from a dict containing the serialized data.

    Note:
        It's intended for internal use.

    Args:
        data (Dict[str, Any]): Dictionary containing the serialized data from a Pipeline.

    Returns:
        BasePipeline: Pipeline recreated from the dictionary info.
    """
    name = data["pipeline"]["name"]
    description = data["pipeline"].get("description")
    with cls(name=name, description=description) as pipe:
        pipe.dag = DAG.from_dict(data["pipeline"])
    return pipe
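A round trip through the serialized form, assuming an existing `pipeline` instance:

```python
# `dump()` produces the same structure that `from_dict` consumes, so a
# pipeline can be rebuilt from its serialized representation.
data = pipeline.dump()
restored = type(pipeline).from_dict(data)
assert restored.name == pipeline.name
```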

get_runtime_parameters_info()

Get the runtime parameters for the steps in the pipeline.

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, List[Dict[str, Any]]]` | A dictionary with the step name as the key and a list of dictionaries with the parameter name and the parameter info as the value. |

Source code in src/distilabel/pipeline/base.py
def get_runtime_parameters_info(self) -> Dict[str, List[Dict[str, Any]]]:
    """Get the runtime parameters for the steps in the pipeline.

    Returns:
        A dictionary with the step name as the key and a list of dictionaries with
        the parameter name and the parameter info as the value.
    """
    runtime_parameters = {}
    for step_name in self.dag:
        step: "_Step" = self.dag.get_step(step_name)[STEP_ATTR_NAME]
        runtime_parameters[step_name] = step.get_runtime_parameters_info()
    return runtime_parameters
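This is useful for discovering what can be passed through `run(parameters=...)` before launching a pipeline. A small sketch, assuming an existing `pipeline`:

```python
# Each entry maps a step name to the list of runtime parameters that the
# step exposes; the exact keys in each info dict depend on the step.
for step_name, params_info in pipeline.get_runtime_parameters_info().items():
    print(step_name)
    for info in params_info:
        print("  ", info)
```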

run(parameters=None, use_cache=True)

Run the pipeline. It will set the runtime parameters for the steps and validate the pipeline.

This method should be extended by the specific pipeline implementation, adding the logic to run the pipeline.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `parameters` | `Optional[Dict[str, Dict[str, Any]]]` | A dictionary with the step name as the key and a dictionary with the runtime parameters for the step as the value. | `None` |
| `use_cache` | `bool` | Whether to use the cache from previous pipeline runs. | `True` |

Returns:

| Type | Description |
| --- | --- |
| `Distiset` | The `Distiset` created by the pipeline. |

Source code in src/distilabel/pipeline/base.py
def run(
    self,
    parameters: Optional[Dict[str, Dict[str, Any]]] = None,
    use_cache: bool = True,
) -> "Distiset":  # type: ignore
    """Run the pipeline. It will set the runtime parameters for the steps and validate
    the pipeline.

    This method should be extended by the specific pipeline implementation,
    adding the logic to run the pipeline.

    Args:
        parameters: A dictionary with the step name as the key and a dictionary with
            the runtime parameters for the step as the value. Defaults to `None`.
        use_cache: Whether to use the cache from previous pipeline runs. Defaults to
            `True`.

    Returns:
        The `Distiset` created by the pipeline.
    """
    if use_cache:
        self._load_from_cache()
    self._set_runtime_parameters(parameters or {})
    self.dag.validate()
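An illustrative call, with hypothetical step names (`load_data`, `text_generation`) standing in for the steps of a real pipeline:

```python
# Runtime parameters are keyed by step name; each value is a dict mapping
# parameter names to values for that step.
distiset = pipeline.run(
    parameters={
        "load_data": {"batch_size": 32},
        "text_generation": {"llm": {"generation_kwargs": {"temperature": 0.7}}},
    },
    use_cache=True,
)
```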