Is there a simple way to track the overall progress of a joblib.Parallel execution?


I have a long-running execution composed of thousands of jobs, which I want to track and record in a database. To do that, I need Parallel to execute a callback whenever it finishes a task, reporting how many jobs are left.


I've accomplished a similar task before with Python's stdlib multiprocessing.Pool, by launching a thread that records the number of pending jobs in Pool's job list.


Looking at the code, Parallel inherits from Pool, so I thought I could pull off the same trick, but it doesn't seem to use that list, and I haven't been able to figure out any other way to "read" its internal status.


5 solutions



The documentation you linked to states that Parallel has an optional progress meter. It's implemented by using the callback keyword argument provided by multiprocessing.Pool.apply_async:

# This is inside a dispatch function
job = self._pool.apply_async(SafeFunction(func), args,
            kwargs, callback=CallBack(self.n_dispatched, self))
self.n_dispatched += 1


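class CallBack(object):
    """ Callback used by parallel: it is used for progress reporting, and
        to add data to be processed
    """
    def __init__(self, index, parallel):
        self.parallel = parallel
        self.index = index

    def __call__(self, out):
        # Report progress using the index assigned at dispatch time
        self.parallel.print_progress(self.index)
        if self.parallel._original_iterable:
            # More input remains, so queue the next job
            self.parallel.dispatch_next()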

And here's print_progress:


def print_progress(self, index):
    elapsed_time = time.time() - self._start_time

    # This is heuristic code to print only 'verbose' times a messages
    # The challenge is that we may not know the queue length
    if self._original_iterable:
        if _verbosity_filter(index, self.verbose):
            return
        self._print('Done %3i jobs       | elapsed: %s',
                    (index + 1,
                     short_format_time(elapsed_time),
                    ))
    else:
        # We are finished dispatching
        queue_length = self.n_dispatched
        # We always display the first loop
        if not index == 0:
            # Display depending on the number of remaining items
            # A message as soon as we finish dispatching, cursor is 0
            cursor = (queue_length - index + 1
                      - self._pre_dispatch_amount)
            frequency = (queue_length // self.verbose) + 1
            is_last_item = (index + 1 == queue_length)
            if (is_last_item or cursor % frequency):
                return
        remaining_time = (elapsed_time / (index + 1) *
                    (self.n_dispatched - index - 1.))
        self._print('Done %3i out of %3i | elapsed: %s remaining: %s',
                    (index + 1,
                     queue_length,
                     short_format_time(elapsed_time),
                     short_format_time(remaining_time),
                    ))

The way they implement this is kind of weird, to be honest: it seems to assume that tasks will always complete in the order they were started. The index variable passed to print_progress is just the value of self.n_dispatched at the time the job was dispatched. So the first job launched will always report an index of 0, even if, say, the third job finished first. It also means they don't actually keep track of the number of completed jobs, so there's no instance variable for you to monitor.
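To illustrate the difference between dispatch order and completion order, here is a minimal standalone sketch using only the standard library. It is not joblib's code: it uses multiprocessing.pool.ThreadPool (which offers the same apply_async callback argument as Pool), and CompletionCounter, slow_square and run_demo are made-up names for this example. The counter is bumped when each callback fires, so it reflects real completions rather than the index assigned at dispatch.

```python
import threading
import time
from multiprocessing.pool import ThreadPool


class CompletionCounter:
    """Hypothetical helper: counts finished jobs and logs their dispatch index."""

    def __init__(self):
        self._lock = threading.Lock()
        self.completed = 0
        self.finish_order = []  # dispatch indices, in the order jobs finished

    def make_callback(self, index):
        def callback(result):
            # Pool callbacks run in a helper thread of the parent process,
            # so shared state is guarded with a lock.
            with self._lock:
                self.completed += 1
                self.finish_order.append(index)
        return callback


def slow_square(x, delay):
    time.sleep(delay)
    return x * x


def run_demo():
    counter = CompletionCounter()
    delays = [0.3, 0.0, 0.0]  # job 0 is dispatched first but finishes last
    with ThreadPool(3) as pool:
        jobs = [
            pool.apply_async(slow_square, (i, d),
                             callback=counter.make_callback(i))
            for i, d in enumerate(delays)
        ]
        results = [job.get() for job in jobs]
    return counter, results


if __name__ == "__main__":
    counter, results = run_demo()
    print(results)
    print(counter.completed, counter.finish_order)
```

Here the job with dispatch index 0 is the last to finish, yet the counter still reports the true number of completed jobs at every step.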


I think your best bet is to make your own CallBack class, and monkey patch Parallel:

from math import sqrt
from collections import defaultdict
from joblib import Parallel, delayed

class CallBack(object):
    completed = defaultdict(int)

    def __init__(self, index, parallel):
        self.index = index
        self.parallel = parallel

    def __call__(self, index):
        CallBack.completed[self.parallel] += 1
        print("done with {}".format(CallBack.completed[self.parallel]))
        if self.parallel._original_iterable:
            # Keep feeding the pool, as the original callback does
            self.parallel.dispatch_next()

import joblib.parallel
joblib.parallel.CallBack = CallBack

if __name__ == "__main__":
    print(Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10)))



done with 1
done with 2
done with 3
done with 4
done with 5
done with 6
done with 7
done with 8
done with 9
done with 10
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

That way, your callback gets called whenever a job completes, rather than the default one.
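Since the goal in the question is to record the number of remaining jobs in a database, the counting done in __call__ could be handed off to something like the sketch below. This is only an illustration under stated assumptions: ProgressTracker, job_done and the progress table are made-up names, sqlite3 stands in for whatever database is actually used, and the total number of jobs is assumed to be known up front.

```python
import sqlite3
import threading


class ProgressTracker:
    """Hypothetical helper: records how many jobs remain after each completion."""

    def __init__(self, total, connection):
        self.total = total
        self.completed = 0
        self._lock = threading.Lock()
        self._conn = connection
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS progress "
            "(completed INTEGER, remaining INTEGER)"
        )

    def job_done(self):
        """Call once per finished job, e.g. from the callback's __call__."""
        with self._lock:
            self.completed += 1
            remaining = self.total - self.completed
            self._conn.execute(
                "INSERT INTO progress (completed, remaining) VALUES (?, ?)",
                (self.completed, remaining),
            )
            self._conn.commit()
        return remaining


def demo():
    # check_same_thread=False because callbacks may fire from a helper thread
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    tracker = ProgressTracker(total=3, connection=conn)
    for _ in range(3):
        tracker.job_done()
    return conn.execute(
        "SELECT completed, remaining FROM progress ORDER BY completed"
    ).fetchall()


if __name__ == "__main__":
    print(demo())
```

The lock matters because completion callbacks may run in a thread other than the one that created the tracker.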


