Skip to content

futures

when_all_complete(futures)

Returns a Future that will be completed when all the provided futures are completed, and it will contain the results of the futures.

Parameters:

Name Type Description Default
futures List[Future]

the Futures to wait for.

required

Returns:

Name Type Description
Future Future[T]

the Future that will be completed when all the provided futures are completed, and it will contain the results of the futures.

Source code in src/distilabel/utils/futures.py
def when_all_complete(futures: List[Future[T]]) -> Future[T]:
    """Returns a `Future` that will be completed when all the provided `futures` are
    completed, and it will contain the results of the `futures`.

    Args:
        futures (List[Future]): the `Future`s to wait for.

    Returns:
        Future: the `Future` that will be completed when all the provided `futures` are
            completed, and it will contain the results of the `futures`.
    """
    all_done_future = Future()
    results = [None] * len(futures)

    def check_all_done(future: Future) -> None:
        # This is done to preserve the order of the results with respect to the order
        # of the futures.
        index = futures.index(future)
        results[index] = future.result()[0]

        _, not_done = wait(futures, return_when="FIRST_COMPLETED")
        if len(not_done) == 0:
            all_done_future.set_result(results)

    for future in futures:
        future.add_done_callback(check_all_done)

    return all_done_future