
Base

BasePipeline

Bases: _Serializable

Base class for a distilabel pipeline.

Attributes:

name: The name of the pipeline.
description: A description of the pipeline.
dag: The DAG instance that represents the pipeline.
_cache_dir: The directory where the pipeline will be cached.
_logger: The logger instance that will be used by the pipeline.
_batch_manager (Optional[_BatchManager]): The batch manager that will manage the batches
    received from the steps while running the pipeline.

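A minimal usage sketch (illustrative, not from the source): it assumes a concrete
subclass such as Pipeline from distilabel.pipeline and elides the step definitions.

from distilabel.pipeline import Pipeline  # assumed concrete subclass of BasePipeline

with Pipeline(name="my-pipeline", description="toy example") as pipeline:
    # Steps instantiated inside this block are registered on `pipeline`
    # through the global pipeline manager set by __enter__/__exit__, and
    # connecting them adds edges to the underlying DAG.
    ...

# Runtime parameters are applied and the DAG validated when calling run()
# (see the run() section below).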
Source code in src/distilabel/pipeline/base.py
class BasePipeline(_Serializable):
    """Base class for a `distilabel` pipeline.

    Attributes:
        name: The name of the pipeline.
        description: A description of the pipeline.
        dag: The `DAG` instance that represents the pipeline.
        _cache_dir: The directory where the pipeline will be cached.
        _logger: The logger instance that will be used by the pipeline.
        _batch_manager: The batch manager that will manage the batches received from the
            steps while running the pipeline.
    """

    def __init__(
        self,
        name: str,
        description: Optional[str] = None,
        cache_dir: Optional["PathLike"] = None,
    ) -> None:
        """Initialize the `BasePipeline` instance.

        Args:
            name: The name of the pipeline.
            description: A description of the pipeline. Defaults to `None`.
            cache_dir: A directory where the pipeline will be cached. Defaults to `None`.
        """
        self.name = name
        self.description = description
        self.dag = DAG()

        if cache_dir:
            self._cache_dir = Path(cache_dir)
        elif env_cache_dir := os.getenv("DISTILABEL_CACHE_DIR"):
            self._cache_dir = Path(env_cache_dir)
        else:
            self._cache_dir = BASE_CACHE_DIR

        self._logger = logging.getLogger("distilabel.pipeline")

        # It's set to None here, will be created in the call to run
        self._batch_manager: Optional["_BatchManager"] = None

    def __enter__(self) -> Self:
        """Set the global pipeline instance when entering a pipeline context."""
        _GlobalPipelineManager.set_pipeline(self)
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Unset the global pipeline instance when exiting a pipeline context."""
        _GlobalPipelineManager.set_pipeline(None)

    def _create_signature(self) -> str:
        """Makes a signature (hash) of a pipeline, using the step ids and the adjacency between them.

        The main use is to find the pipeline in the cache folder.

        Returns:
            str: Signature of the pipeline.
        """
        hasher = hashlib.sha1()

        steps_info = []
        pipeline_dump = self.dump()["pipeline"]
        for step in pipeline_dump["steps"]:
            step_info = step["name"]
            for argument, value in sorted(step["step"].items()):
                if (
                    (argument == TYPE_INFO_KEY)
                    or (argument == "llm")
                    or (value is None)
                ):
                    # NOTE: Should we include the LLM info at this stage??
                    continue

                if isinstance(value, dict):
                    # input_mappings/output_mappings
                    step_info += "-".join(
                        [f"{str(k)}-{str(v)}" for k, v in value.items()]
                    )
                elif isinstance(value, (list, tuple)):
                    # runtime_parameters_info
                    step_info += "-".join([str(v) for v in value])
                elif isinstance(value, (int, str, float)):
                    # batch_size/name
                    step_info += str(value)
                else:
                    raise ValueError(
                        f"Field '{argument}' in step '{step['name']}' has type {type(value)}, explicitly cast the type to 'str'."
                    )

            steps_info.append(step_info)

        connections_info = [
            f"{c['from']}-{'-'.join(c['to'])}" for c in pipeline_dump["connections"]
        ]
        hasher.update(",".join(steps_info + connections_info).encode())

        return hasher.hexdigest()

    def run(
        self,
        parameters: Optional[Dict[str, Dict[str, Any]]] = None,
        use_cache: bool = True,
    ) -> "Distiset":  # type: ignore
        """Run the pipeline. It will set the runtime parameters for the steps and validate
        the pipeline.

        This method should be extended by the specific pipeline implementation,
        adding the logic to run the pipeline.

        Args:
            parameters: A dictionary with the step name as the key and a dictionary with
                the runtime parameters for the step as the value. Defaults to `None`.
            use_cache: Whether to use the cache from previous pipeline runs. Defaults to
                `True`.

        Returns:
            The `Distiset` created by the pipeline.
        """
        if use_cache:
            self._load_from_cache()
        self._set_runtime_parameters(parameters or {})
        self.dag.validate()

    def get_runtime_parameters_info(self) -> Dict[str, List[Dict[str, Any]]]:
        """Get the runtime parameters for the steps in the pipeline.

        Returns:
            A dictionary with the step name as the key and a list of dictionaries with
            the parameter name and the parameter info as the value.
        """
        runtime_parameters = {}
        for step_name in self.dag:
            step: "_Step" = self.dag.get_step(step_name)["step"]
            runtime_parameters[step_name] = step.get_runtime_parameters_info()
        return runtime_parameters

    def _add_step(self, step: "_Step") -> None:
        """Add a step to the pipeline.

        Args:
            step: The step to be added to the pipeline.
        """
        self.dag.add_step(step)

    def _add_edge(self, from_step: str, to_step: str) -> None:
        """Add an edge between two steps in the pipeline.

        Args:
            from_step: The name of the step that will generate the input for `to_step`.
            to_step: The name of the step that will receive the input from `from_step`.
        """
        self.dag.add_edge(from_step, to_step)

    def _set_runtime_parameters(self, parameters: Dict[str, Dict[str, Any]]) -> None:
        """Set the runtime parameters for the steps in the pipeline.

        Args:
            parameters: A dictionary with the step name as the key and a dictionary with
                the parameter name as the key and the parameter value as the value.
        """
        for step_name, step_parameters in parameters.items():
            step: "_Step" = self.dag.get_step(step_name)["step"]
            step.set_runtime_parameters(step_parameters)

    def _model_dump(self, obj: Any, **kwargs: Any) -> Dict[str, Any]:
        """Dumps the DAG content to a dict.

        Args:
            obj (Any): Unused, just kept to match the signature of the parent method.
            kwargs (Any): Unused, just kept to match the signature of the parent method.

        Returns:
            Dict[str, Any]: Internal representation of the DAG from networkx in a serializable format.
        """
        return self.dag.dump()

    def dump(self, **kwargs: Any) -> Dict[str, Any]:
        return {
            "distilabel": {"version": __version__},
            "pipeline": {
                "name": self.name,
                "description": self.description,
                **super().dump(),
            },
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> Self:
        """Create a Pipeline from a dict containing the serialized data.

        Note:
            It's intended for internal use.

        Args:
            data (Dict[str, Any]): Dictionary containing the serialized data from a Pipeline.

        Returns:
            BasePipeline: Pipeline recreated from the dictionary info.
        """
        name = data["pipeline"]["name"]
        description = data["pipeline"].get("description")
        with cls(name=name, description=description) as pipe:
            pipe.dag = DAG.from_dict(data["pipeline"])
        return pipe

    @property
    def _cache_location(self) -> CacheLocation:
        """Dictionary containing the the object that will stored and the location,
        whether it is a filename or a folder.

        Returns:
            Path: Filenames where the pipeline content will be serialized.
        """
        folder = self._cache_dir / self._create_signature()
        return {
            "pipeline": folder / "pipeline.yaml",
            "batch_manager": folder / "batch_manager.json",
            "data": folder / "data",
        }

    def _cache(self) -> None:
        """Saves the `BasePipeline` using the `_cache_filename`."""
        self.save(
            path=self._cache_location["pipeline"],
            format=self._cache_location["pipeline"].suffix.replace(".", ""),
        )
        if self._batch_manager is not None:
            self._batch_manager.save(
                self._cache_location["batch_manager"],
                format=self._cache_location["batch_manager"].suffix.replace(".", ""),
            )
        self._logger.debug("Pipeline and batch manager saved to cache.")

    def _load_from_cache(self) -> None:
        """Will try to load the `BasePipeline` from the cache dir if found, updating
        the internal `DAG` and `_BatchManager`.
        """
        cache_loc = self._cache_location
        if cache_loc["pipeline"].exists():
            # Refresh the DAG to avoid errors when it's created within a context manager
            # (it will check the steps aren't already defined for the DAG).
            self.dag = DAG()
            new_class = self.from_yaml(cache_loc["pipeline"])
            # Update the internal dag and batch_manager
            self.dag.G = new_class.dag.G
            if cache_loc["batch_manager"].exists():
                self._batch_manager = _BatchManager.from_json(
                    cache_loc["batch_manager"]
                )
            self._logger.info("💾 Load pipeline from cache")

__enter__()

Set the global pipeline instance when entering a pipeline context.

Source code in src/distilabel/pipeline/base.py
def __enter__(self) -> Self:
    """Set the global pipeline instance when entering a pipeline context."""
    _GlobalPipelineManager.set_pipeline(self)
    return self

__exit__(exc_type, exc_value, traceback)

Unset the global pipeline instance when exiting a pipeline context.

Source code in src/distilabel/pipeline/base.py
def __exit__(self, exc_type, exc_value, traceback) -> None:
    """Unset the global pipeline instance when exiting a pipeline context."""
    _GlobalPipelineManager.set_pipeline(None)

__init__(name, description=None, cache_dir=None)

Initialize the BasePipeline instance.

Parameters:

name (str, required): The name of the pipeline.
description (Optional[str], default: None): A description of the pipeline.
cache_dir (Optional[PathLike], default: None): A directory where the pipeline will be cached.
Source code in src/distilabel/pipeline/base.py
def __init__(
    self,
    name: str,
    description: Optional[str] = None,
    cache_dir: Optional["PathLike"] = None,
) -> None:
    """Initialize the `BasePipeline` instance.

    Args:
        name: The name of the pipeline.
        description: A description of the pipeline. Defaults to `None`.
        cache_dir: A directory where the pipeline will be cached. Defaults to `None`.
    """
    self.name = name
    self.description = description
    self.dag = DAG()

    if cache_dir:
        self._cache_dir = Path(cache_dir)
    elif env_cache_dir := os.getenv("DISTILABEL_CACHE_DIR"):
        self._cache_dir = Path(env_cache_dir)
    else:
        self._cache_dir = BASE_CACHE_DIR

    self._logger = logging.getLogger("distilabel.pipeline")

    # It's set to None here, will be created in the call to run
    self._batch_manager: Optional["_BatchManager"] = None
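
As the code above shows, the cache directory is resolved with a fixed precedence: an
explicit cache_dir argument, then the DISTILABEL_CACHE_DIR environment variable, then the
library default BASE_CACHE_DIR. A short sketch of that precedence, assuming a concrete
Pipeline subclass:

import os

from distilabel.pipeline import Pipeline  # assumed concrete subclass of BasePipeline

# 1. An explicit argument always wins.
explicit = Pipeline(name="explicit-cache", cache_dir="/tmp/distilabel-cache")

# 2. Otherwise the DISTILABEL_CACHE_DIR environment variable is used, if set.
os.environ["DISTILABEL_CACHE_DIR"] = "/tmp/distilabel-env-cache"
from_env = Pipeline(name="env-cache")

# 3. Otherwise the library default BASE_CACHE_DIR is used.
del os.environ["DISTILABEL_CACHE_DIR"]
default = Pipeline(name="default-cache")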

from_dict(data) classmethod

Create a Pipeline from a dict containing the serialized data.

Note

It's intended for internal use.

Parameters:

data (Dict[str, Any], required): Dictionary containing the serialized data from a Pipeline.

Returns:

BasePipeline (Self): Pipeline recreated from the dictionary info.

Source code in src/distilabel/pipeline/base.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> Self:
    """Create a Pipeline from a dict containing the serialized data.

    Note:
        It's intended for internal use.

    Args:
        data (Dict[str, Any]): Dictionary containing the serialized data from a Pipeline.

    Returns:
        BasePipeline: Pipeline recreated from the dictionary info.
    """
    name = data["pipeline"]["name"]
    description = data["pipeline"].get("description")
    with cls(name=name, description=description) as pipe:
        pipe.dag = DAG.from_dict(data["pipeline"])
    return pipe
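
A round-trip sketch (illustrative): dump() produces the layout that from_dict expects, a
distilabel key with the version and a pipeline key with the name, description and
serialized DAG, so a pipeline can be recreated from its own serialization. It assumes a
concrete Pipeline subclass whose constructor matches BasePipeline's:

from distilabel.pipeline import Pipeline  # assumed concrete subclass of BasePipeline

with Pipeline(name="my-pipeline", description="round-trip example") as pipeline:
    ...  # steps would be defined and connected here

data = pipeline.dump()
# data["distilabel"]["version"]   -> installed distilabel version
# data["pipeline"]["name"]        -> "my-pipeline"
# data["pipeline"]["description"] -> "round-trip example"

restored = Pipeline.from_dict(data)  # rebuilds the DAG from data["pipeline"]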

get_runtime_parameters_info()

Get the runtime parameters for the steps in the pipeline.

Returns:

Dict[str, List[Dict[str, Any]]]: A dictionary with the step name as the key and a list of
    dictionaries with the parameter name and the parameter info as the value.

Source code in src/distilabel/pipeline/base.py
def get_runtime_parameters_info(self) -> Dict[str, List[Dict[str, Any]]]:
    """Get the runtime parameters for the steps in the pipeline.

    Returns:
        A dictionary with the step name as the key and a list of dictionaries with
        the parameter name and the parameter info as the value.
    """
    runtime_parameters = {}
    for step_name in self.dag:
        step: "_Step" = self.dag.get_step(step_name)["step"]
        runtime_parameters[step_name] = step.get_runtime_parameters_info()
    return runtime_parameters
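
Given a pipeline built as in the earlier sketches, the result maps each step name to the
list returned by that step's own get_runtime_parameters_info(). An illustrative way to
inspect it:

info = pipeline.get_runtime_parameters_info()
for step_name, params in info.items():
    for param in params:
        # Each entry is a dict describing one runtime parameter; the exact keys
        # depend on the step definition (typically a name, whether it is optional,
        # and a description).
        print(step_name, param)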

run(parameters=None, use_cache=True)

Run the pipeline. It will set the runtime parameters for the steps and validate the pipeline.

This method should be extended by the specific pipeline implementation, adding the logic to run the pipeline.

Parameters:

parameters (Optional[Dict[str, Dict[str, Any]]], default: None): A dictionary with the step
    name as the key and a dictionary with the runtime parameters for the step as the value.
use_cache (bool, default: True): Whether to use the cache from previous pipeline runs.

Returns:

Distiset: The Distiset created by the pipeline.

Source code in src/distilabel/pipeline/base.py
def run(
    self,
    parameters: Optional[Dict[str, Dict[str, Any]]] = None,
    use_cache: bool = True,
) -> "Distiset":  # type: ignore
    """Run the pipeline. It will set the runtime parameters for the steps and validate
    the pipeline.

    This method should be extended by the specific pipeline implementation,
    adding the logic to run the pipeline.

    Args:
        parameters: A dictionary with the step name as the key and a dictionary with
            the runtime parameters for the step as the value. Defaults to `None`.
        use_cache: Whether to use the cache from previous pipeline runs. Defaults to
            `True`.

    Returns:
        The `Distiset` created by the pipeline.
    """
    if use_cache:
        self._load_from_cache()
    self._set_runtime_parameters(parameters or {})
    self.dag.validate()
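
The parameters argument is keyed first by step name and then by runtime parameter name.
Note that in this base class run() only loads the cache, applies those parameters and
validates the DAG; concrete subclasses extend it to execute the steps and return the
Distiset. An illustrative call (the step and parameter names are placeholders):

distiset = pipeline.run(
    parameters={
        "load_data": {"batch_size": 32},    # hypothetical generator step
        "my_task": {"num_generations": 2},  # hypothetical task step
    },
    use_cache=False,  # ignore any previously cached state for this pipeline
)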

CacheLocation

Bases: TypedDict

Dictionary to store the filenames and directories of a cached pipeline.

Source code in src/distilabel/pipeline/base.py
class CacheLocation(TypedDict):
    """Dictionary to store the filenames and directories of a cached pipeline."""

    pipeline: Path
    batch_manager: Path
    data: Path
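
Putting CacheLocation and _cache_location together: the cached artifacts of a pipeline
live under a folder named after the pipeline's signature inside _cache_dir. A sketch of
what the property resolves to (the property is internal; shown here only for illustration):

loc = pipeline._cache_location  # a CacheLocation TypedDict
# loc["pipeline"]      -> <cache_dir>/<signature>/pipeline.yaml
# loc["batch_manager"] -> <cache_dir>/<signature>/batch_manager.json
# loc["data"]          -> <cache_dir>/<signature>/data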