Is there a simple way to track the overall progress of a joblib.Parallel execution?

I have a long-running execution composed of thousands of jobs, which I want to track and record in a database. To do that, I need Parallel to execute a callback whenever it finishes a task, reporting how many jobs remain.

I've accomplished a similar task before with Python's stdlib multiprocessing.Pool, by launching a thread that records the number of pending jobs in Pool's job list.
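
For reference, the monitoring-thread idea can be sketched with public APIs only. This is a rough illustration, not the exact code I used: it polls `AsyncResult.ready()` instead of reading Pool's internal job list, it uses `multiprocessing.pool.ThreadPool` (same interface as Pool) so it runs anywhere, and `work`, `monitor` and `run_with_monitor` are made-up names.

```python
import threading
import time
from multiprocessing.pool import ThreadPool


def work(x):
    # Stand-in for a real job.
    time.sleep(0.01)
    return x * x


def monitor(results, samples, interval=0.005):
    """Periodically record how many jobs are still pending."""
    while True:
        pending = sum(1 for r in results if not r.ready())
        samples.append(pending)
        if pending == 0:
            break
        time.sleep(interval)


def run_with_monitor(n_jobs=20):
    samples = []
    with ThreadPool(4) as pool:
        results = [pool.apply_async(work, (i,)) for i in range(n_jobs)]
        watcher = threading.Thread(target=monitor, args=(results, samples))
        watcher.start()
        values = [r.get() for r in results]
        watcher.join(timeout=5)
    return values, samples


if __name__ == "__main__":
    values, samples = run_with_monitor()
    print("pending counts seen by the monitor:", samples)
```

In a real setup the `samples.append(...)` line is where the database write would go.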

Looking at the code, Parallel inherits from Pool, so I thought I could pull off the same trick, but it doesn't seem to use that list, and I haven't been able to figure out any other way to "read" its internal status.

5 solutions

#1


8

The documentation you linked to states that Parallel has an optional progress meter. It's implemented by using the callback keyword argument provided by multiprocessing.Pool.apply_async:

# This is inside a dispatch function
self._lock.acquire()
job = self._pool.apply_async(SafeFunction(func), args,
            kwargs, callback=CallBack(self.n_dispatched, self))
self._jobs.append(job)
self.n_dispatched += 1

...

class CallBack(object):
    """ Callback used by parallel: it is used for progress reporting, and
        to add data to be processed
    """
    def __init__(self, index, parallel):
        self.parallel = parallel
        self.index = index

    def __call__(self, out):
        self.parallel.print_progress(self.index)
        if self.parallel._original_iterable:
            self.parallel.dispatch_next()
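
To see the underlying mechanism without joblib, here is a minimal sketch of the `callback` argument of `apply_async` being used to count finished jobs. It assumes `multiprocessing.pool.ThreadPool` as a stand-in for Pool (same interface); `CompletionCounter`, `square` and `run_counted` are illustrative names, not part of joblib or the stdlib.

```python
import threading
from multiprocessing.pool import ThreadPool


def square(x):
    return x * x


class CompletionCounter:
    """Counts finished jobs; the pool calls it once per successful result."""

    def __init__(self, total):
        self.total = total
        self.completed = 0
        self._lock = threading.Lock()

    def __call__(self, result):
        # The pool passes the job's return value; only the count matters here.
        with self._lock:
            self.completed += 1

    @property
    def remaining(self):
        return self.total - self.completed


def run_counted(n_jobs=10):
    counter = CompletionCounter(n_jobs)
    with ThreadPool(2) as pool:
        jobs = [pool.apply_async(square, (i,), callback=counter)
                for i in range(n_jobs)]
        results = [job.get() for job in jobs]
        pool.close()
        pool.join()
    return results, counter


if __name__ == "__main__":
    results, counter = run_counted()
    print(counter.completed, "done,", counter.remaining, "remaining")
```

Note that the callback only fires for jobs that succeed; failures go to the separate `error_callback` argument.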

And here's print_progress:

def print_progress(self, index):
    elapsed_time = time.time() - self._start_time

    # This is heuristic code to print only 'verbose' times a messages
    # The challenge is that we may not know the queue length
    if self._original_iterable:
        if _verbosity_filter(index, self.verbose):
            return
        self._print('Done %3i jobs       | elapsed: %s',
                    (index + 1,
                     short_format_time(elapsed_time),
                    ))
    else:
        # We are finished dispatching
        queue_length = self.n_dispatched
        # We always display the first loop
        if not index == 0:
            # Display depending on the number of remaining items
            # A message as soon as we finish dispatching, cursor is 0
            cursor = (queue_length - index + 1
                      - self._pre_dispatch_amount)
            frequency = (queue_length // self.verbose) + 1
            is_last_item = (index + 1 == queue_length)
            if (is_last_item or cursor % frequency):
                return
        remaining_time = (elapsed_time / (index + 1) *
                    (self.n_dispatched - index - 1.))
        self._print('Done %3i out of %3i | elapsed: %s remaining: %s',
                    (index + 1,
                     queue_length,
                     short_format_time(elapsed_time),
                     short_format_time(remaining_time),
                    ))

The way they implement this is kind of weird, to be honest: it seems to assume that tasks will always complete in the order they were started. The index variable passed to print_progress is just the value of self.n_dispatched at the time the job was dispatched. So the first job launched will always finish with an index of 0, even if, say, the third job finished first. It also means they don't actually keep track of the number of completed jobs, so there's no instance variable for you to monitor.
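
A small sketch of why the dispatch index is not a completion count. It is independent of joblib and uses `multiprocessing.pool.ThreadPool` with an artificially slow first job; `job`, `IndexRecorder` and `completion_order` are made-up names, and the sleep duration is arbitrary.

```python
import time
from multiprocessing.pool import ThreadPool


def job(index):
    # Job 0 is deliberately slow; the others return immediately.
    if index == 0:
        time.sleep(0.3)
    return index


class IndexRecorder:
    """Callback that remembers its dispatch index, like joblib's CallBack."""

    def __init__(self, index, order):
        self.index = index
        self.order = order

    def __call__(self, out):
        # Callbacks run one at a time in the pool's result-handling thread.
        self.order.append(self.index)


def completion_order(n_jobs=3):
    order = []
    with ThreadPool(2) as pool:
        jobs = [pool.apply_async(job, (i,), callback=IndexRecorder(i, order))
                for i in range(n_jobs)]
        for j in jobs:
            j.get()
    return order


if __name__ == "__main__":
    # The first dispatched job (index 0) is expected to be reported last.
    print(completion_order())
```

So a message like "Done 1 jobs" derived from index 0 can show up after several other jobs have already finished.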

I think your best bet is to make your own CallBack class and monkey patch Parallel:

from math import sqrt
from collections import defaultdict
from joblib import Parallel, delayed

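class CallBack(object):
    # Completed-job count per Parallel instance, shared by all callbacks.
    completed = defaultdict(int)

    def __init__(self, index, parallel):
        self.index = index
        self.parallel = parallel

    def __call__(self, out):
        # `out` is the job's result; it isn't needed for counting.
        CallBack.completed[self.parallel] += 1
        print("done with {}".format(CallBack.completed[self.parallel]))
        # Same as the original CallBack: dispatch more work if any is left.
        if self.parallel._original_iterable:
            self.parallel.dispatch_next()

# Monkey patch: replace joblib's CallBack with the one above.
import joblib.parallel
joblib.parallel.CallBack = CallBack

if __name__ == "__main__":
    print(Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10)))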

Output:

done with 1
done with 2
done with 3
done with 4
done with 5
done with 6
done with 7
done with 8
done with 9
done with 10
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

That way, your callback is called whenever a job completes, instead of the default one.
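
Since the goal was to record progress in a database, the callback body is where that write would go. Below is a rough sketch of the idea, not tied to joblib: it assumes an in-memory `sqlite3` database and a hypothetical `progress` table, and it drives the callback through `multiprocessing.pool.ThreadPool` rather than Parallel. `DbProgress`, `square` and `run_with_db` are illustrative names.

```python
import sqlite3
import threading
from multiprocessing.pool import ThreadPool


def square(x):
    return x * x


class DbProgress:
    """Writes one row per finished job with the number of jobs remaining."""

    def __init__(self, conn, total):
        self.conn = conn
        self.total = total
        self.done = 0
        self._lock = threading.Lock()
        conn.execute(
            "CREATE TABLE IF NOT EXISTS progress (done INTEGER, remaining INTEGER)"
        )

    def __call__(self, out):
        # Runs in the pool's result-handling thread, hence the lock.
        with self._lock:
            self.done += 1
            self.conn.execute(
                "INSERT INTO progress (done, remaining) VALUES (?, ?)",
                (self.done, self.total - self.done),
            )
            self.conn.commit()


def run_with_db(n_jobs=10):
    # check_same_thread=False: the callback runs outside the creating thread.
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    progress = DbProgress(conn, n_jobs)
    with ThreadPool(2) as pool:
        jobs = [pool.apply_async(square, (i,), callback=progress)
                for i in range(n_jobs)]
        results = [j.get() for j in jobs]
    rows = conn.execute(
        "SELECT done, remaining FROM progress ORDER BY done"
    ).fetchall()
    conn.close()
    return results, rows


if __name__ == "__main__":
    results, rows = run_with_db()
    print(rows[-1])  # last row: all jobs done, none remaining
```

With thousands of jobs you would probably want to write every N completions rather than on every single one.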
