Skip to content

futures

when_all_complete(futures, callback=None)

Returns a Future that will be completed when all the provided futures are completed, and it will contain the results of the futures.

Parameters:

Name Type Description Default
futures List[Future]

the Futures to wait for.

required

Returns:

Name Type Description
Future Future[List[T]]

the Future that will be completed when all the provided futures are completed, and it will contain the results of the futures.

Source code in src/distilabel/utils/futures.py
def when_all_complete(
    futures: List[Future[T]], callback: Optional[Callable[[List[T]], List[T]]] = None
) -> Future[List[T]]:
    """Returns a `Future` that will be completed when all the provided `futures` are
    completed, and it will contain the results of the `futures`.

    Args:
        futures (List[Future]): the `Future`s to wait for.

    Returns:
        Future: the `Future` that will be completed when all the provided `futures` are
            completed, and it will contain the results of the `futures`.
    """
    all_done_future = Future()
    results: List[T] = [None] * len(futures)  # type: ignore

    def check_all_done(future: Future) -> None:
        # This is done to preserve the order of the results with respect to the order
        # of the futures.
        index = futures.index(future)
        results[index] = future.result()[0]

        _, not_done = wait(futures, return_when="FIRST_COMPLETED")
        if len(not_done) == 0:
            final_results = results
            if callback is not None:
                final_results = callback(results)
            all_done_future.set_result(final_results)

    for future in futures:
        future.add_done_callback(check_all_done)

    return all_done_future