Pipeline

This section contains the API reference for the distilabel pipelines. For an example of how to use the pipelines, see the Tutorial - Pipeline.
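
The snippet below is a minimal usage sketch, assuming distilabel >= 1.0 and that LoadDataFromDicts is available under distilabel.steps (adapt the step names and data to your own setup); see the tutorial for a complete, end-to-end example.

from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromDicts  # assumed to be available in your version

with Pipeline(name="my-pipeline", description="A toy example") as pipeline:
    # Steps instantiated inside the `with` block are registered in this pipeline.
    load_data = LoadDataFromDicts(
        name="load_data",
        data=[{"instruction": "What is distilabel?"}],
    )
    # Further steps would be connected here, e.g. `load_data.connect(next_step)`.

if __name__ == "__main__":
    # Returns a `Distiset` with the data produced by the leaf steps.
    distiset = pipeline.run(use_cache=False)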

BasePipeline

Bases: _Serializable

Base class for a distilabel pipeline.

Attributes:

name: The name of the pipeline.
description: A description of the pipeline.
dag: The DAG instance that represents the pipeline.
_cache_dir: The directory where the pipeline will be cached.
_logger: The logger instance that will be used by the pipeline.
_batch_manager (Optional[_BatchManager]): The batch manager that will manage the batches received from the steps while running the pipeline.

Source code in src/distilabel/pipeline/base.py
class BasePipeline(_Serializable):
    """Base class for a `distilabel` pipeline.

    Attributes:
        name: The name of the pipeline.
        description: A description of the pipeline.
        dag: The `DAG` instance that represents the pipeline.
        _cache_dir: The directory where the pipeline will be cached.
        _logger: The logger instance that will be used by the pipeline.
        _batch_manager: The batch manager that will manage the batches received from the
            steps while running the pipeline.
    """

    def __init__(
        self,
        name: str,
        description: Optional[str] = None,
        cache_dir: Optional["PathLike"] = None,
        enable_metadata: bool = False,
    ) -> None:
        """Initialize the `BasePipeline` instance.

        Args:
            name: The name of the pipeline.
            description: A description of the pipeline. Defaults to `None`.
            cache_dir: A directory where the pipeline will be cached. Defaults to `None`.
            enable_metadata: Whether to include the distilabel metadata column for the pipeline
                in the final `Distiset`. It contains metadata used by distilabel, for example
                the raw outputs of the `LLM` without processing would be here, inside `raw_output_...`
                field. Defaults to `False`.
        """
        self.name = name
        self.description = description
        self._enable_metadata = enable_metadata
        self.dag = DAG()

        if cache_dir:
            self._cache_dir = Path(cache_dir)
        elif env_cache_dir := os.getenv("DISTILABEL_CACHE_DIR"):
            self._cache_dir = Path(env_cache_dir)
        else:
            self._cache_dir = BASE_CACHE_DIR

        self._logger = logging.getLogger("distilabel.pipeline")

        # It's set to None here, will be created in the call to run
        self._batch_manager: Optional["_BatchManager"] = None
        self._dry_run: bool = False

    def __enter__(self) -> Self:
        """Set the global pipeline instance when entering a pipeline context."""
        _GlobalPipelineManager.set_pipeline(self)
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Unset the global pipeline instance when exiting a pipeline context."""
        _GlobalPipelineManager.set_pipeline(None)

    def _create_signature(self) -> str:
        """Makes a signature (hash) of a pipeline, using the step ids and the adjacency between them.

        The main use is to find the pipeline in the cache folder.

        Returns:
            str: Signature of the pipeline.
        """
        hasher = hashlib.sha1()

        steps_info = []
        pipeline_dump = self.dump()["pipeline"]

        for step in pipeline_dump["steps"]:
            step_info = step["name"]
            for argument, value in sorted(step[STEP_ATTR_NAME].items()):
                if (argument == TYPE_INFO_KEY) or (value is None):
                    continue

                if isinstance(value, dict):
                    # input_mappings/output_mappings
                    step_info += "-".join(
                        [f"{str(k)}-{str(v)}" for k, v in value.items()]
                    )
                elif isinstance(value, (list, tuple)):
                    # runtime_parameters_info
                    step_info += "-".join([str(v) for v in value])
                elif isinstance(value, (int, str, float)):
                    # batch_size/name
                    step_info += str(value)
                else:
                    raise ValueError(
                        f"Field '{argument}' in step '{step['name']}' has type {type(value)}, explicitly cast the type to 'str'."
                    )

            steps_info.append(step_info)

        connections_info = [
            f"{c['from']}-{'-'.join(c['to'])}" for c in pipeline_dump["connections"]
        ]

        routing_batch_functions_info = []
        for function in pipeline_dump["routing_batch_functions"]:
            step = function["step"]
            routing_batch_function: "RoutingBatchFunction" = self.dag.get_step(step)[
                ROUTING_BATCH_FUNCTION_ATTR_NAME
            ]
            if type_info := routing_batch_function._get_type_info():
                step += f"-{type_info}"
            routing_batch_functions_info.append(step)

        hasher.update(
            ",".join(
                steps_info + connections_info + routing_batch_functions_info
            ).encode()
        )

        return hasher.hexdigest()

    def run(
        self,
        parameters: Optional[Dict[str, Dict[str, Any]]] = None,
        use_cache: bool = True,
    ) -> "Distiset":  # type: ignore
        """Run the pipeline. It will set the runtime parameters for the steps and validate
        the pipeline.

        This method should be extended by the specific pipeline implementation,
        adding the logic to run the pipeline.

        Args:
            parameters: A dictionary with the step name as the key and a dictionary with
                the runtime parameters for the step as the value. Defaults to `None`.
            use_cache: Whether to use the cache from previous pipeline runs. Defaults to
                `True`.

        Returns:
            The `Distiset` created by the pipeline.
        """
        if use_cache:
            self._load_from_cache()
        self._set_runtime_parameters(parameters or {})
        self.dag.validate()

    def dry_run(
        self,
        parameters: Optional[Dict[str, Dict[str, Any]]] = None,
        batch_size: int = 1,
    ) -> "Distiset":
        """Do a dry run to test the pipeline runs as expected.

        Running a `Pipeline` in dry run mode will set all the `batch_size` of generator steps
        to the specified batch_size, and run just with a single batch, effectively
        running the whole pipeline with a single example. The cache will be set to False.

        Args:
            parameters: The same parameters variable from `BasePipeline.run`. Defaults to None.
                Will be passed to the parent method, but with the batch_size of the generator steps
                fixed to the given batch_size.
            batch_size: The batch size to test the pipeline. Defaults to 1.

        Returns:
            Will return the `Distiset` as the main run method would do.
        """
        self._dry_run = True

        for step_name in self.dag:
            step = self.dag.get_step(step_name)[STEP_ATTR_NAME]

            if step.is_generator:
                if not parameters:
                    parameters = {}
                parameters[step_name] = {"batch_size": batch_size}

        distiset = self.run(parameters=parameters, use_cache=False)

        self._dry_run = False
        return distiset

    def get_runtime_parameters_info(self) -> Dict[str, List[Dict[str, Any]]]:
        """Get the runtime parameters for the steps in the pipeline.

        Returns:
            A dictionary with the step name as the key and a list of dictionaries with
            the parameter name and the parameter info as the value.
        """
        runtime_parameters = {}
        for step_name in self.dag:
            step: "_Step" = self.dag.get_step(step_name)[STEP_ATTR_NAME]
            runtime_parameters[step_name] = step.get_runtime_parameters_info()
        return runtime_parameters

    def _add_step(self, step: "_Step") -> None:
        """Add a step to the pipeline.

        Args:
            step: The step to be added to the pipeline.
        """
        self.dag.add_step(step)

    def _add_edge(self, from_step: str, to_step: str) -> None:
        """Add an edge between two steps in the pipeline.

        Args:
            from_step: The name of the step that will generate the input for `to_step`.
            to_step: The name of the step that will receive the input from `from_step`.
        """
        self.dag.add_edge(from_step, to_step)

        # Check if `from_step` has a `routing_batch_function`. If it does, then mark
        # `to_step` as a step that will receive a routed batch.
        node = self.dag.get_step(from_step)  # type: ignore
        routing_batch_function = node.get(ROUTING_BATCH_FUNCTION_ATTR_NAME, None)
        self.dag.set_step_attr(
            name=to_step,
            attr=RECEIVES_ROUTED_BATCHES_ATTR_NAME,
            value=routing_batch_function is not None,
        )

    def _add_routing_batch_function(
        self, step_name: str, routing_batch_function: "RoutingBatchFunction"
    ) -> None:
        """Add a routing batch function to a step.

        Args:
            step_name: The name of the step that will receive the routed batch.
            routing_batch_function: The function that will route the batch to the step.
        """
        self.dag.set_step_attr(
            name=step_name,
            attr=ROUTING_BATCH_FUNCTION_ATTR_NAME,
            value=routing_batch_function,
        )

    def _set_runtime_parameters(self, parameters: Dict[str, Dict[str, Any]]) -> None:
        """Set the runtime parameters for the steps in the pipeline.

        Args:
            parameters: A dictionary with the step name as the key and a dictionary with
            the parameter name as the key and the parameter value as the value.
        """
        step_names = set(self.dag.G)
        for step_name, step_parameters in parameters.items():
            if step_name not in step_names:
                self._logger.warning(
                    f"❓ Step '{step_name}' provided in `Pipeline.run(parameters={{...}})` not found in the pipeline."
                    f" Available steps are: {step_names}."
                )
            else:
                step: "_Step" = self.dag.get_step(step_name)[STEP_ATTR_NAME]
                step.set_runtime_parameters(step_parameters)

    def _model_dump(self, obj: Any, **kwargs: Any) -> Dict[str, Any]:
        """Dumps the DAG content to a dict.

        Args:
            obj (Any): Unused, just kept to match the signature of the parent method.
            kwargs (Any): Unused, just kept to match the signature of the parent method.

        Returns:
            Dict[str, Any]: Internal representation of the DAG from networkx in a serializable format.
        """
        return self.dag.dump()

    def dump(self, **kwargs: Any) -> Dict[str, Any]:
        return {
            "distilabel": {"version": __version__},
            "pipeline": {
                "name": self.name,
                "description": self.description,
                **super().dump(),
            },
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> Self:
        """Create a Pipeline from a dict containing the serialized data.

        Note:
            It's intended for internal use.

        Args:
            data (Dict[str, Any]): Dictionary containing the serialized data from a Pipeline.

        Returns:
            BasePipeline: Pipeline recreated from the dictionary info.
        """
        name = data["pipeline"]["name"]
        description = data["pipeline"].get("description")
        with cls(name=name, description=description) as pipe:
            pipe.dag = DAG.from_dict(data["pipeline"])
        return pipe

    @property
    def _cache_location(self) -> _CacheLocation:
        """Dictionary containing the the object that will stored and the location,
        whether it is a filename or a folder.

        Returns:
            _CacheLocation: Filenames and folders where the pipeline content will be serialized.
        """
        folder = self._cache_dir / self.name / self._create_signature()
        return {
            "pipeline": folder / "pipeline.yaml",
            "batch_manager": folder / "batch_manager.json",
            "data": folder / "data",
            "log_file": folder / "pipeline.log",
        }

    def _cache(self) -> None:
        """Saves the `BasePipeline` using the `_cache_filename`."""
        self.save(
            path=self._cache_location["pipeline"],
            format=self._cache_location["pipeline"].suffix.replace(".", ""),  # type: ignore
        )
        if self._batch_manager is not None:
            self._batch_manager.save(
                self._cache_location["batch_manager"],
                format=self._cache_location["batch_manager"].suffix.replace(".", ""),  # type: ignore
            )
        self._logger.debug("Pipeline and batch manager saved to cache.")

    def _load_from_cache(self) -> None:
        """Will try to load the `BasePipeline` from the cache dir if found, updating
        the internal `DAG` and `_BatchManager`.
        """
        cache_loc = self._cache_location
        if cache_loc["pipeline"].exists():
            if cache_loc["batch_manager"].exists():
                self._batch_manager = _BatchManager.from_json(
                    cache_loc["batch_manager"]
                )
            self._logger.info("💾 Load pipeline from cache")

__enter__()

Set the global pipeline instance when entering a pipeline context.

Source code in src/distilabel/pipeline/base.py
def __enter__(self) -> Self:
    """Set the global pipeline instance when entering a pipeline context."""
    _GlobalPipelineManager.set_pipeline(self)
    return self

__exit__(exc_type, exc_value, traceback)

Unset the global pipeline instance when exiting a pipeline context.

Source code in src/distilabel/pipeline/base.py
def __exit__(self, exc_type, exc_value, traceback) -> None:
    """Unset the global pipeline instance when exiting a pipeline context."""
    _GlobalPipelineManager.set_pipeline(None)

__init__(name, description=None, cache_dir=None, enable_metadata=False)

Initialize the BasePipeline instance.

Parameters:

name (str, required): The name of the pipeline.
description (Optional[str], default None): A description of the pipeline.
cache_dir (Optional[PathLike], default None): A directory where the pipeline will be cached.
enable_metadata (bool, default False): Whether to include the distilabel metadata column for the pipeline in the final Distiset. It contains metadata used by distilabel, for example the raw, unprocessed outputs of the LLM inside the raw_output_... field.
Source code in src/distilabel/pipeline/base.py
def __init__(
    self,
    name: str,
    description: Optional[str] = None,
    cache_dir: Optional["PathLike"] = None,
    enable_metadata: bool = False,
) -> None:
    """Initialize the `BasePipeline` instance.

    Args:
        name: The name of the pipeline.
        description: A description of the pipeline. Defaults to `None`.
        cache_dir: A directory where the pipeline will be cached. Defaults to `None`.
        enable_metadata: Whether to include the distilabel metadata column for the pipeline
            in the final `Distiset`. It contains metadata used by distilabel, for example
            the raw outputs of the `LLM` without processing would be here, inside `raw_output_...`
            field. Defaults to `False`.
    """
    self.name = name
    self.description = description
    self._enable_metadata = enable_metadata
    self.dag = DAG()

    if cache_dir:
        self._cache_dir = Path(cache_dir)
    elif env_cache_dir := os.getenv("DISTILABEL_CACHE_DIR"):
        self._cache_dir = Path(env_cache_dir)
    else:
        self._cache_dir = BASE_CACHE_DIR

    self._logger = logging.getLogger("distilabel.pipeline")

    # It's set to None here, will be created in the call to run
    self._batch_manager: Optional["_BatchManager"] = None
    self._dry_run: bool = False
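
The resolution order for the cache directory can be seen in the constructor above: an explicit cache_dir wins, then the DISTILABEL_CACHE_DIR environment variable, then the library default. A small sketch (the paths below are hypothetical):

import os
from distilabel.pipeline import Pipeline

# With no argument, the `DISTILABEL_CACHE_DIR` environment variable is used if set.
os.environ["DISTILABEL_CACHE_DIR"] = "/tmp/distilabel-cache"
env_pipeline = Pipeline(name="env-cached-pipeline")

# An explicit `cache_dir` argument takes precedence over the environment variable.
explicit_pipeline = Pipeline(
    name="explicitly-cached-pipeline",
    cache_dir="./my-cache",
    enable_metadata=True,  # keep the distilabel metadata column in the final Distiset
)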

dry_run(parameters=None, batch_size=1)

Do a dry run to test the pipeline runs as expected.

Running a Pipeline in dry run mode will set the batch_size of all generator steps to the specified batch_size and run just a single batch, effectively running the whole pipeline with a single example. The cache will be disabled.

Parameters:

parameters (Optional[Dict[str, Dict[str, Any]]], default None): The same parameters variable from BasePipeline.run. Will be passed to the parent method, but with the batch_size of the generator steps fixed to the given batch_size.
batch_size (int, default 1): The batch size used to test the pipeline.

Returns:

Distiset: The Distiset, as the main run method would return.

Source code in src/distilabel/pipeline/base.py
def dry_run(
    self,
    parameters: Optional[Dict[str, Dict[str, Any]]] = None,
    batch_size: int = 1,
) -> "Distiset":
    """Do a dry run to test the pipeline runs as expected.

    Running a `Pipeline` in dry run mode will set all the `batch_size` of generator steps
    to the specified batch_size, and run just with a single batch, effectively
    running the whole pipeline with a single example. The cache will be set to False.

    Args:
        parameters: The same parameters variable from `BasePipeline.run`. Defaults to None.
            Will be passed to the parent method, but with the batch_size of the generator steps
            fixed to the given batch_size.
        batch_size: The batch size to test the pipeline. Defaults to 1.

    Returns:
        Will return the `Distiset` as the main run method would do.
    """
    self._dry_run = True

    for step_name in self.dag:
        step = self.dag.get_step(step_name)[STEP_ATTR_NAME]

        if step.is_generator:
            if not parameters:
                parameters = {}
            parameters[step_name] = {"batch_size": batch_size}

    distiset = self.run(parameters=parameters, use_cache=False)

    self._dry_run = False
    return distiset
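
A short sketch of a dry run, assuming a pipeline built as in the earlier example; every generator step produces a single batch of the given size, and the cache is disabled:

# Runs the whole pipeline with a single batch of one example per generator step.
distiset = pipeline.dry_run(batch_size=1)

# Runtime parameters for other steps can still be passed through `parameters`, e.g.
# `pipeline.dry_run(parameters={"some_step": {"some_runtime_parameter": value}})`
# (the step and parameter names here are hypothetical).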

from_dict(data) classmethod

Create a Pipeline from a dict containing the serialized data.

Note

It's intended for internal use.

Parameters:

data (Dict[str, Any], required): Dictionary containing the serialized data from a Pipeline.

Returns:

BasePipeline (Self): Pipeline recreated from the dictionary info.

Source code in src/distilabel/pipeline/base.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> Self:
    """Create a Pipeline from a dict containing the serialized data.

    Note:
        It's intended for internal use.

    Args:
        data (Dict[str, Any]): Dictionary containing the serialized data from a Pipeline.

    Returns:
        BasePipeline: Pipeline recreated from the dictionary info.
    """
    name = data["pipeline"]["name"]
    description = data["pipeline"].get("description")
    with cls(name=name, description=description) as pipe:
        pipe.dag = DAG.from_dict(data["pipeline"])
    return pipe
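
Although from_dict is noted as intended for internal use, the sketch below illustrates the round trip it enables together with dump (shown earlier in this reference); treat it as an illustration rather than a recommended public workflow:

from distilabel.pipeline import Pipeline

# Serialize the pipeline to a plain dictionary and rebuild it from that dictionary.
serialized = pipeline.dump()
restored = Pipeline.from_dict(serialized)
assert restored.name == pipeline.name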

get_runtime_parameters_info()

Get the runtime parameters for the steps in the pipeline.

Returns:

Dict[str, List[Dict[str, Any]]]: A dictionary with the step name as the key and a list of dictionaries with the parameter name and the parameter info as the value.

Source code in src/distilabel/pipeline/base.py
def get_runtime_parameters_info(self) -> Dict[str, List[Dict[str, Any]]]:
    """Get the runtime parameters for the steps in the pipeline.

    Returns:
        A dictionary with the step name as the key and a list of dictionaries with
        the parameter name and the parameter info as the value.
    """
    runtime_parameters = {}
    for step_name in self.dag:
        step: "_Step" = self.dag.get_step(step_name)[STEP_ATTR_NAME]
        runtime_parameters[step_name] = step.get_runtime_parameters_info()
    return runtime_parameters
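
The returned structure can be used to discover which runtime parameters each step accepts before calling run. A sketch (the exact keys inside each info dictionary, such as "name" and "description", may vary by version):

for step_name, params_info in pipeline.get_runtime_parameters_info().items():
    for param_info in params_info:
        # Each entry describes one runtime parameter of the step.
        print(step_name, param_info.get("name"), param_info.get("description"))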

run(parameters=None, use_cache=True)

Run the pipeline. It will set the runtime parameters for the steps and validate the pipeline.

This method should be extended by the specific pipeline implementation, adding the logic to run the pipeline.

Parameters:

parameters (Optional[Dict[str, Dict[str, Any]]], default None): A dictionary with the step name as the key and a dictionary with the runtime parameters for the step as the value.
use_cache (bool, default True): Whether to use the cache from previous pipeline runs.

Returns:

Distiset: The Distiset created by the pipeline.

Source code in src/distilabel/pipeline/base.py
def run(
    self,
    parameters: Optional[Dict[str, Dict[str, Any]]] = None,
    use_cache: bool = True,
) -> "Distiset":  # type: ignore
    """Run the pipeline. It will set the runtime parameters for the steps and validate
    the pipeline.

    This method should be extended by the specific pipeline implementation,
    adding the logic to run the pipeline.

    Args:
        parameters: A dictionary with the step name as the key and a dictionary with
            the runtime parameters for the step as the value. Defaults to `None`.
        use_cache: Whether to use the cache from previous pipeline runs. Defaults to
            `True`.

    Returns:
        The `Distiset` created by the pipeline.
    """
    if use_cache:
        self._load_from_cache()
    self._set_runtime_parameters(parameters or {})
    self.dag.validate()
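
The parameters argument maps each step name to a dictionary of runtime parameter values for that step. A hedged sketch (the step names and nested parameter names below are hypothetical):

distiset = pipeline.run(
    parameters={
        "load_data": {"batch_size": 64},
        "text_generation": {"llm": {"generation_kwargs": {"temperature": 0.7}}},
    },
    use_cache=True,  # resume from a previous run of the same pipeline if possible
)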

Pipeline

Bases: BasePipeline

Local pipeline implementation using multiprocessing.

Source code in src/distilabel/pipeline/local.py
class Pipeline(BasePipeline):
    """Local pipeline implementation using `multiprocessing`."""

    def run(
        self,
        parameters: Optional[Dict[str, Dict[str, Any]]] = None,
        use_cache: bool = True,
    ) -> "Distiset":
        """Runs the pipeline.

        Args:
            parameters: A dictionary with the step name as the key and a dictionary with
                the runtime parameters for the step as the value. Defaults to `None`.
            use_cache: Whether to use the cache from previous pipeline runs. Defaults to
                `True`.

        Returns:
            The `Distiset` created by the pipeline.

        Raises:
            RuntimeError: If the pipeline fails to load all the steps.
        """
        log_queue = mp.Queue()
        # We must place the runtime parameters before calling setup_logging to ensure consistency
        super().run(parameters, use_cache)
        setup_logging(log_queue, filename=str(self._cache_location["log_file"]))  # type: ignore
        self._logger = logging.getLogger("distilabel.pipeline.local")

        if self._dry_run:
            # This message is placed here to ensure we are using the already setup logger.
            self._logger.info("🌵 Dry run mode")

        if self._batch_manager is None:
            self._batch_manager = _BatchManager.from_dag(self.dag)

        # If the batch manager is not able to generate batches, that means that the loaded
        # `_BatchManager` from cache didn't have any remaining batches to process i.e.
        # the previous pipeline execution was completed successfully.
        if not self._batch_manager.can_generate():
            self._logger.info(
                "💾 Loaded batch manager from cache doesn't have any remaining data. Returning"
                " `Distiset` from cache data..."
            )
            stop_logging()
            return create_distiset(
                self._cache_location["data"],
                pipeline_path=self._cache_location["pipeline"],
                log_filename_path=self._cache_location["log_file"],
                enable_metadata=self._enable_metadata,
            )

        buffer_data_path = self._cache_location["data"]
        self._logger.info(f"📝 Pipeline data will be written to '{buffer_data_path}'")
        write_buffer = _WriteBuffer(buffer_data_path, self.dag.leaf_steps)

        num_processes = len(self.dag)
        ctx = mp.get_context()  # type: ignore
        with ctx.Manager() as manager, ctx.Pool(
            num_processes,
            initializer=_init_worker,
            initargs=(log_queue,),
        ) as pool:
            self.output_queue: "Queue[Any]" = manager.Queue()
            self.shared_info = self._create_shared_info_dict(manager)
            self._handle_keyboard_interrupt(manager=manager, pool=pool)

            # Run the steps using the pool of processes
            self._run_steps_in_loop(pool, manager, self.output_queue, self.shared_info)

            # Wait for all the steps to be loaded correctly
            if not self._all_steps_loaded():
                write_buffer.close()
                self._batch_manager = None
                stop_logging()
                raise RuntimeError(
                    "Failed to load all the steps. Could not run pipeline."
                ) from _SUBPROCESS_EXCEPTION

            # Send the "first" batches to the steps so the batches start flowing through
            # the input queues and output queue
            self._request_initial_batches()

            # Start a loop to receive the output batches from the steps
            self._run_output_queue_loop_in_thread(write_buffer)

            # Send `None` to steps `input_queue`s just in case some step is still waiting
            self._notify_steps_to_stop()

            pool.close()
            pool.join()

        write_buffer.close()
        distiset = create_distiset(
            self._cache_location["data"],
            pipeline_path=self._cache_location["pipeline"],
            log_filename_path=self._cache_location["log_file"],
            enable_metadata=self._enable_metadata,
        )
        stop_logging()
        return distiset

    def _run_output_queue_loop_in_thread(self, write_buffer: "_WriteBuffer") -> None:
        """Runs the output queue loop in a separate thread to receive the output batches
        from the steps. This is done to avoid the signal handler to block the loop, which
        would prevent the pipeline from stopping correctly.

        Args:
            write_buffer: The write buffer to write the data from the leaf steps to disk.
        """
        thread = threading.Thread(target=self._output_queue_loop, args=(write_buffer,))
        thread.start()
        thread.join()

    def _notify_steps_to_stop(self) -> None:
        """Notifies the steps to stop their infinite running loop by sending `None` to
        their input queues."""
        for step_name in self.dag:
            if input_queue := self.dag.get_step(step_name).get(INPUT_QUEUE_ATTR_NAME):
                input_queue.put(None)

    def _output_queue_loop(self, write_buffer: "_WriteBuffer") -> None:
        """Loop to receive the output batches from the steps and manage the flow of the
        batches through the pipeline.

        Args:
            write_buffer: The write buffer to write the data from the leaf steps to disk.
        """
        while self._batch_manager.can_generate() and not _STOP_CALLED:  # type: ignore
            self._logger.debug("Waiting for output batch from step...")
            if (batch := self.output_queue.get()) is None:
                self._logger.debug("Received `None` from output queue. Breaking loop.")
                break

            if batch.step_name in self.dag.leaf_steps:
                write_buffer.add_batch(batch)

            # If `_STOP_CALLED` was set to `True` while waiting for the output queue, then
            # we need to handle the stop of the pipeline and break the loop to avoid
            # propagating the batches through the pipeline and making the stop process
            # slower.
            if _STOP_CALLED:
                self._handle_batch_on_stop(batch)
                break

            self._logger.debug(
                f"Received batch with seq_no {batch.seq_no} from step '{batch.step_name}'"
                f" from output queue: {batch}"
            )

            self._manage_batch_flow(batch)

        if _STOP_CALLED:
            self._handle_stop(write_buffer)

    def _manage_batch_flow(self, batch: "_Batch") -> None:
        """Checks if the step that generated the batch has more data in its buffer to
        generate a new batch. If there's data, then a new batch is sent to the step. If
        the step has no data in its buffer, then the predecessors generator steps are
        requested to send a new batch.

        Args:
            batch: The batch that was processed.
        """
        assert self._batch_manager, "Batch manager is not set"

        # Make sure to send the `LAST_BATCH_SENT_FLAG` to the predecessors of the convergence
        # step if the batch is the last one, so they stop their processing loop even if
        # they haven't received the last batch because of the routing function.
        if self._is_convergence_step(batch.step_name) and batch.last_batch:
            for step_name in self.dag.get_step_predecessors(batch.step_name):
                self._send_last_batch_flag_to_step(step_name)

        route_to, routed = self._get_successors(batch)

        # Keep track of the steps that the batch was routed to
        if routed:
            batch.batch_routed_to = route_to

        self._register_batch(batch)

        step = self._get_step_from_batch(batch)

        # Add the batch to the successors input buffers
        for successor in route_to:
            # Copy batch to avoid modifying the same reference in the batch manager
            batch_to_add = batch.copy() if len(route_to) > 1 else batch

            self._batch_manager.add_batch(successor, batch_to_add)

            # Check if the step is a generator and if there are successors that need data
            # from this step. This usually happens when the generator `batch_size` is smaller
            # than the `input_batch_size` of the successor steps.
            if (
                step.is_generator
                and step.name in self._batch_manager.step_empty_buffers(successor)
            ):
                last_batch_sent = self._batch_manager.get_last_batch_sent(step.name)
                self._send_batch_to_step(last_batch_sent.next_batch())  # type: ignore

            # If successor step has enough data in its buffer to create a new batch, then
            # send the batch to the step.
            if new_batch := self._batch_manager.get_batch(successor):
                self._send_batch_to_step(new_batch)

        if step.is_generator:
            return

        # Step ("this", the one from which the batch was received) has enough data on its
        # buffers to create a new batch
        if new_batch := self._batch_manager.get_batch(step.name):  # type: ignore
            self._send_batch_to_step(new_batch)
        else:
            self._request_more_batches_if_needed(step)

        self._cache()

    def _register_batch(self, batch: "_Batch") -> None:
        """Registers a batch in the batch manager.

        Args:
            batch: The batch to register.
        """
        self._batch_manager.register_batch(batch)  # type: ignore
        self._logger.debug(
            f"Batch {batch.seq_no} from step '{batch.step_name}' registered in batch"
            " manager"
        )

    def _get_successors(self, batch: "_Batch") -> Tuple[List[str], bool]:
        """Gets the successors and the successors to which the batch has to be routed.

        Args:
            batch: The batch to which the successors will be determined.

        Returns:
            The successors to route the batch to and whether the batch was routed using
            a routing function.
        """
        node = self.dag.get_step(batch.step_name)
        step: "Step" = node[STEP_ATTR_NAME]
        successors = list(self.dag.get_step_successors(step.name))  # type: ignore
        route_to = successors

        # Check if the step has a routing function to send the batch to specific steps
        if routing_batch_function := node.get(ROUTING_BATCH_FUNCTION_ATTR_NAME):
            route_to = routing_batch_function(batch, successors)
            successors_str = ", ".join(f"'{successor}'" for successor in route_to)
            self._logger.info(
                f"🚏 Using '{step.name}' routing function to send batch {batch.seq_no} to steps: {successors_str}"
            )

        return route_to, route_to != successors

    def _get_step_from_batch(self, batch: "_Batch") -> "Step":
        """Gets the `Step` instance from a batch.

        Args:
            batch: The batch to get the step from.

        Returns:
            The `Step` instance.
        """
        return self.dag.get_step(batch.step_name)[STEP_ATTR_NAME]

    def _request_more_batches_if_needed(self, step: "Step") -> None:
        """Request more batches to the predecessors steps of `step` if needed.

        Args:
            step: The step of which it has to be checked if more batches are needed from
                its predecessors.
        """
        empty_buffers = self._batch_manager.step_empty_buffers(step.name)  # type: ignore
        for previous_step_name in empty_buffers:
            if previous_step_name not in self.dag.root_steps:
                continue

            last_batch = self._batch_manager.get_last_batch_sent(previous_step_name)  # type: ignore
            if last_batch is None:
                continue

            self._logger.debug(
                f"Step '{step.name}' input buffer for step '{previous_step_name}' is"
                " empty. Requesting new batch..."
            )
            self._send_batch_to_step(last_batch.next_batch())

    def _handle_stop(self, write_buffer: "_WriteBuffer") -> None:
        """Handles the stop of the pipeline execution, which will stop the steps from
        processing more batches and wait for the output queue to be empty, to not lose
        any data that was already processed by the steps before the stop was called.

        Args:
            write_buffer: The write buffer to write the data from the leaf steps to disk.
        """
        self._logger.debug("Handling stop of the pipeline execution...")

        # Add the remaining batches in the input queues back to the batch manager
        for step_name in self.dag:
            node = self.dag.get_step(step_name)
            step: "_Step" = node[STEP_ATTR_NAME]
            if step.is_generator:
                continue
            if input_queue := node.get(INPUT_QUEUE_ATTR_NAME):
                while not input_queue.empty():
                    batch = input_queue.get()
                    if batch is None:
                        continue
                    self._batch_manager.add_batch(  # type: ignore
                        to_step=step_name, batch=batch, prepend=True
                    )
                    self._logger.debug(
                        f"Adding batch back to the batch manager: {batch}"
                    )
                input_queue.put(None)

        # Wait for the input queue to be empty, which means that all the steps finished
        # processing the batches that were sent before the stop flag.
        for step_name in self.dag:
            self._wait_step_input_queue_empty(step_name)

        # Consume the output queue until it's empty to not lose any data that was already
        # processed by the steps before stop was called.
        while not self.output_queue.empty():
            batch = self.output_queue.get()
            if batch is None:
                continue

            if batch.step_name in self.dag.leaf_steps:
                write_buffer.add_batch(batch)

            self._handle_batch_on_stop(batch)

        self._cache()

    def _handle_batch_on_stop(self, batch: "_Batch") -> None:
        """Handles a batch that was received from the output queue when the pipeline was
        stopped. It will add and register the batch in the batch manager.

        Args:
            batch: The batch to handle.
        """
        self._batch_manager.register_batch(batch)  # type: ignore
        step: "Step" = self.dag.get_step(batch.step_name)[STEP_ATTR_NAME]
        for successor in self.dag.get_step_successors(step.name):  # type: ignore
            self._batch_manager.add_batch(successor, batch)  # type: ignore

    def _wait_step_input_queue_empty(self, step_name: str) -> Union["Queue[Any]", None]:
        """Waits for the input queue of a step to be empty.

        Args:
            step_name: The name of the step.

        Returns:
            The input queue of the step if it's not loaded or finished, `None` otherwise.
        """
        if self._check_step_not_loaded_or_finished(step_name):
            return None

        if input_queue := self.dag.get_step(step_name).get(INPUT_QUEUE_ATTR_NAME):
            while input_queue.qsize() != 0:
                pass
            return input_queue

    def _create_shared_info_dict(self, manager: "SyncManager") -> "DictProxy[str, Any]":
        """Creates the shared information dictionary to be used by the processes.

        Args:
            manager: The manager to create the shared information.

        Returns:
            The shared information dictionary.
        """
        # TODO: not very important, but we could use a different lock for each matter
        return manager.dict(
            **{
                _STEPS_LOADED_KEY: manager.list(),
                _STEPS_LOADED_LOCK_KEY: manager.Lock(),
                _CUDA_LLM_DEVICE_PLACEMENT_KEY: manager.dict(**{}),
                _CUDA_LLM_DEVICE_PLACEMENT_LOCK_KEY: manager.Lock(),
            }
        )

    def _all_steps_loaded(self) -> bool:
        """Waits for all the steps to load.

        Returns:
            `True` if all the steps have been loaded correctly, `False` otherwise.
        """

        def _update_all_steps_loaded(steps_loaded: List[str]) -> None:
            with _STEPS_LOADED_LOCK:
                _STEPS_LOADED.update(steps_loaded)

        self._logger.info("⏳ Waiting for all the steps to load...")
        previous_message = None
        while not _STOP_CALLED:
            with self.shared_info[_STEPS_LOADED_LOCK_KEY]:
                steps_loaded = self.shared_info[_STEPS_LOADED_KEY]
                num_steps_loaded = (
                    len(steps_loaded)
                    if steps_loaded != [_STEPS_LOADED_ERROR_CODE]
                    else 0
                )
                self._logger.debug(f"Steps loaded: {steps_loaded}")

                message = f"⏳ Steps loaded: {num_steps_loaded}/{len(self.dag)}"
                if num_steps_loaded > 0 and message != previous_message:
                    self._logger.info(message)
                    previous_message = message

                if num_steps_loaded == len(self.dag):
                    self._logger.info("✅ All the steps have been loaded!")
                    _update_all_steps_loaded(steps_loaded)
                    return True

                if steps_loaded == [_STEPS_LOADED_ERROR_CODE]:
                    self._logger.error("❌ Failed to load all the steps")
                    _update_all_steps_loaded(steps_loaded)
                    return False

            time.sleep(2.5)

        return not _STOP_CALLED

    def _request_initial_batches(self) -> None:
        """Requests the initial batches to the generator steps."""
        assert self._batch_manager, "Batch manager is not set"

        for step in self._batch_manager._steps.values():
            if batch := step.get_batch():
                self._logger.debug(
                    f"Sending initial batch to '{step.step_name}' step: {batch}"
                )
                self._send_batch_to_step(batch)

        for step_name in self.dag.root_steps:
            seq_no = 0
            if last_batch := self._batch_manager.get_last_batch(step_name):
                seq_no = last_batch.seq_no + 1
            batch = _Batch(seq_no=seq_no, step_name=step_name, last_batch=self._dry_run)
            self._logger.debug(
                f"Requesting initial batch to '{step_name}' generator step: {batch}"
            )
            self._send_batch_to_step(batch)

    def _send_batch_to_step(self, batch: "_Batch") -> None:
        """Sends a batch to the input queue of a step.

        Args:
            batch: The batch to send.
        """
        self._logger.debug(
            f"Setting batch {batch.seq_no} as last batch sent to '{batch.step_name}': {batch}"
        )
        self._batch_manager.set_last_batch_sent(batch)  # type: ignore

        self._logger.debug(
            f"Sending batch {batch.seq_no} to step '{batch.step_name}': {batch}"
        )
        input_queue = self.dag.get_step(batch.step_name)[INPUT_QUEUE_ATTR_NAME]
        input_queue.put(batch)

    def _is_convergence_step(self, step_name: str) -> bool:
        """Checks if a step is a convergence step.

        Args:
            step_name: The name of the step.

        Returns:
            `True` if the step is a convergence step, `False` otherwise.
        """
        return bool(self.dag.get_step(step_name).get(CONVERGENCE_STEP_ATTR_NAME))

    def _send_last_batch_flag_to_step(self, step_name: str) -> None:
        """Sends the `LAST_BATCH_SENT_FLAG` to a step to stop processing batches.

        Args:
            step_name: The name of the step.
        """
        batch = self._batch_manager.get_last_batch_sent(step_name)  # type: ignore
        if batch and batch.last_batch:
            return

        self._logger.debug(
            f"Sending `LAST_BATCH_SENT_FLAG` to '{step_name}' step to stop processing"
            " batches..."
        )
        input_queue = self.dag.get_step(step_name)[INPUT_QUEUE_ATTR_NAME]
        input_queue.put(LAST_BATCH_SENT_FLAG)
        self._batch_manager.set_last_batch_flag_sent_to(step_name)  # type: ignore

    def _run_steps_in_loop(
        self,
        pool: "Pool",
        manager: "SyncManager",
        output_queue: "Queue[_Batch]",
        shared_info: "DictProxy[str, Any]",
    ) -> None:
        """Using the `pool`, runs the steps in the DAG in an infinite loop waiting for
        input batches and sending the output batches to the `output_queue`.

        Each `Step` is wrapped in a `_ProcessWrapper`, which will handle the lifecycle of
        the `Step` and the communication with the `input_queue` and `output_queue`. The
        `_ProcessWrapper.run` method is the target function of the process.

        Args:
            pool: The pool of processes.
            manager: The manager to create the queues.
            output_queue: The queue to send the output batches.
            shared_info: The shared information between the processes.
        """
        for step_name in self.dag:
            step: "Step" = self.dag.get_step(step_name)[STEP_ATTR_NAME]
            input_queue = manager.Queue()
            self.dag.set_step_attr(step.name, INPUT_QUEUE_ATTR_NAME, input_queue)

            # Set `pipeline` to `None` as in some Python environments the pipeline is not
            # picklable and it will raise an error when trying to send the step to the process.
            # `TypeError: cannot pickle 'code' object`
            step.pipeline = None

            process_wrapper = _ProcessWrapper(
                step=step,
                input_queue=input_queue,
                output_queue=output_queue,
                shared_info=shared_info,
                dry_run=self._dry_run,
            )

            pool.apply_async(
                process_wrapper.run,
                callback=self._finished_callback,
                error_callback=self._error_callback,
            )  # type: ignore

    def _error_callback(self, e: BaseException) -> None:
        """Error callback that will be called when an error occurs in a `Step` process.

        Args:
            e: The exception raised by the process.
        """
        global _SUBPROCESS_EXCEPTION

        # First we check that the exception is a `_ProcessWrapperException`, otherwise, we
        # print it out and stop the pipeline, since some errors may be unhandled
        if not isinstance(e, _ProcessWrapperException):
            self._logger.error(f"❌ Failed with an unhandled exception: {e}")
            self._stop()
            return

        if e.is_load_error:
            self._logger.error(f"❌ Failed to load step '{e.step.name}': {e.message}")
            with self.shared_info[_STEPS_LOADED_LOCK_KEY]:
                self.shared_info[_STEPS_LOADED_KEY] = [_STEPS_LOADED_ERROR_CODE]
            _SUBPROCESS_EXCEPTION = e.subprocess_exception
            _SUBPROCESS_EXCEPTION.__traceback__ = tblib.Traceback.from_string(
                e.formatted_traceback
            ).as_traceback()
            return

        # If the step is global, is not in the last trophic level and has no successors,
        # then we can ignore the error and continue executing the pipeline
        if (
            e.step.is_global
            and not self.dag.step_in_last_trophic_level(e.step.name)
            and list(self.dag.get_step_successors(e.step.name)) == []
        ):
            self._logger.error(
                f"✋ An error occurred when running global step '{e.step.name}' with no"
                " successors and not in the last trophic level. Pipeline execution can"
                f" continue. Error will be ignored."
            )
            self._logger.error(f"Subprocess traceback:\n\n{e.formatted_traceback}")
            return

        # Global step with successors failed
        self._logger.error(f"An error occurred in global step '{e.step.name}'")
        self._logger.error(f"Subprocess traceback:\n\n{e.formatted_traceback}")
        self._cache()
        self._stop()

    def _finished_callback(self, step_name: str) -> None:
        """Callback that will be called when a `Step` process finishes.

        Args:
            step_name: The name of the step that finished.
        """
        with _STEPS_FINISHED_LOCK:
            _STEPS_FINISHED.add(step_name)

    def _check_step_not_loaded_or_finished(self, step_name: str) -> bool:
        """Checks if a step is not loaded or already finished.

        Args:
            step_name: The name of the step.

        Returns:
            `True` if the step is not loaded or already finished, `False` otherwise.
        """
        with _STEPS_LOADED_LOCK:
            if step_name not in _STEPS_LOADED:
                return True

        with _STEPS_FINISHED_LOCK:
            if step_name in _STEPS_FINISHED:
                return True

        return False

    def _stop(
        self, manager: Optional["SyncManager"] = None, pool: Optional["Pool"] = None
    ) -> None:
        """Stops the pipeline execution. It will first send `None` to the input queues
        of all the steps and then wait until the output queue is empty i.e. all the steps
        finished processing the batches that were sent before the stop flag. Then it will
        send `None` to the output queue to notify the pipeline to stop."""

        global _STOP_CALLED

        with _STOP_CALLED_LOCK:
            if _STOP_CALLED:
                global _STOP_CALLS
                _STOP_CALLS += 1
                # if _STOP_CALLS == 1:
                #     self._logger.warning(
                #         "🛑 Stop has already been called. Ignoring subsequent calls and waiting"
                #         " for the pipeline to finish..."
                #     )
                if _STOP_CALLS == 1:
                    self._logger.warning(
                        "🛑 Press again to force the pipeline to stop."
                    )
                elif _STOP_CALLS > 1:
                    self._logger.warning("🛑 Forcing pipeline interruption.")
                    import gc
                    import sys

                    if manager:
                        manager.shutdown()

                    if pool:
                        pool.close()
                        pool.terminate()

                    gc.collect()

                    sys.exit(1)

                return
            _STOP_CALLED = True

        self._logger.debug(f"Steps loaded before calling `stop`: {_STEPS_LOADED}")
        self._logger.info(
            "🛑 Stopping pipeline. Waiting for steps to finish processing batches..."
        )
        self._logger.debug("Sending `None` to the output queue to notify stop...")
        self.output_queue.put(None)

    def _handle_keyboard_interrupt(
        self, manager: Optional["SyncManager"] = None, pool: Optional["Pool"] = None
    ) -> None:
        """Handles KeyboardInterrupt signal sent during the Pipeline.run method.

        It will try to call self._stop (if the pipeline hasn't started yet, it won't
        have any effect), and if the pool is already started, it will close it before
        exiting the program.
        """

        def signal_handler(signumber: int, frame: Any) -> None:
            self._stop(manager=manager, pool=pool)

        signal.signal(signal.SIGINT, signal_handler)

run(parameters=None, use_cache=True)

Runs the pipeline.

Parameters:

parameters (Optional[Dict[str, Dict[str, Any]]], default None): A dictionary with the step name as the key and a dictionary with the runtime parameters for the step as the value.
use_cache (bool, default True): Whether to use the cache from previous pipeline runs.

Returns:

Distiset: The Distiset created by the pipeline.

Raises:

RuntimeError: If the pipeline fails to load all the steps.

Source code in src/distilabel/pipeline/local.py
def run(
    self,
    parameters: Optional[Dict[str, Dict[str, Any]]] = None,
    use_cache: bool = True,
) -> "Distiset":
    """Runs the pipeline.

    Args:
        parameters: A dictionary with the step name as the key and a dictionary with
            the runtime parameters for the step as the value. Defaults to `None`.
        use_cache: Whether to use the cache from previous pipeline runs. Defaults to
            `True`.

    Returns:
        The `Distiset` created by the pipeline.

    Raises:
        RuntimeError: If the pipeline fails to load all the steps.
    """
    log_queue = mp.Queue()
    # We must place the runtime parameters before calling setup_logging to ensure consistency
    super().run(parameters, use_cache)
    setup_logging(log_queue, filename=str(self._cache_location["log_file"]))  # type: ignore
    self._logger = logging.getLogger("distilabel.pipeline.local")

    if self._dry_run:
        # This message is placed here to ensure we are using the already setup logger.
        self._logger.info("🌵 Dry run mode")

    if self._batch_manager is None:
        self._batch_manager = _BatchManager.from_dag(self.dag)

    # If the batch manager is not able to generate batches, that means that the loaded
    # `_BatchManager` from cache didn't have any remaining batches to process i.e.
    # the previous pipeline execution was completed successfully.
    if not self._batch_manager.can_generate():
        self._logger.info(
            "💾 Loaded batch manager from cache doesn't have any remaining data. Returning"
            " `Distiset` from cache data..."
        )
        stop_logging()
        return create_distiset(
            self._cache_location["data"],
            pipeline_path=self._cache_location["pipeline"],
            log_filename_path=self._cache_location["log_file"],
            enable_metadata=self._enable_metadata,
        )

    buffer_data_path = self._cache_location["data"]
    self._logger.info(f"📝 Pipeline data will be written to '{buffer_data_path}'")
    write_buffer = _WriteBuffer(buffer_data_path, self.dag.leaf_steps)

    num_processes = len(self.dag)
    ctx = mp.get_context()  # type: ignore
    with ctx.Manager() as manager, ctx.Pool(
        num_processes,
        initializer=_init_worker,
        initargs=(log_queue,),
    ) as pool:
        self.output_queue: "Queue[Any]" = manager.Queue()
        self.shared_info = self._create_shared_info_dict(manager)
        self._handle_keyboard_interrupt(manager=manager, pool=pool)

        # Run the steps using the pool of processes
        self._run_steps_in_loop(pool, manager, self.output_queue, self.shared_info)

        # Wait for all the steps to be loaded correctly
        if not self._all_steps_loaded():
            write_buffer.close()
            self._batch_manager = None
            stop_logging()
            raise RuntimeError(
                "Failed to load all the steps. Could not run pipeline."
            ) from _SUBPROCESS_EXCEPTION

        # Send the "first" batches to the steps so the batches start flowing through
        # the input queues and output queue
        self._request_initial_batches()

        # Start a loop to receive the output batches from the steps
        self._run_output_queue_loop_in_thread(write_buffer)

        # Send `None` to steps `input_queue`s just in case some step is still waiting
        self._notify_steps_to_stop()

        pool.close()
        pool.join()

    write_buffer.close()
    distiset = create_distiset(
        self._cache_location["data"],
        pipeline_path=self._cache_location["pipeline"],
        log_filename_path=self._cache_location["log_file"],
        enable_metadata=self._enable_metadata,
    )
    stop_logging()
    return distiset
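
A final sketch of running the local, multiprocessing-based Pipeline; the if __name__ == "__main__" guard matters because child processes are spawned on some platforms, and the step name in parameters is hypothetical:

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={"load_data": {"batch_size": 32}},
        use_cache=False,  # force a fresh run instead of resuming from the cache
    )
    # The returned `Distiset` can then be inspected locally or shared, e.g. pushed
    # to the Hugging Face Hub if that method is available in your version:
    # distiset.push_to_hub("username/my-distiset")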