nova中periodic task的实现
16lz
2021-01-22
作为个人学习笔记分享,有任何问题欢迎交流!
本文主要分析nova中periodic task的实现过程,周期性任务有如跟踪资源的变化,虚拟机状态的变化等。它的实现使用的python中的metaclass,要了解metaclass可以参考:http://jianpx.iteye.com/blog/908121,该文对metaclass做了较为全面的说明。总而言之,metaclass是一种可以控制类创建过程的机制,实现原理是重写type的__new()__ 或 __init()__函数。此外,还想说的一点就是:metaclass与mixin的区别是,metaclass可以动态的改变类或实例的属性或方法,而mixin只能通过__bases__属性来动态的改变其基类(个人理解)
1 periodic task实现原理
主要实现的代码在/nova/openstack/common/periodic_task.py中文件中,其中包括三个部分:
1.1def periodic_task(*args, **kwargs)
该函数实现的是一个装饰器,详情可以参考:http://blog.csdn.net/epugv/article/details/42612261
主要用来说明被装饰的函数需要周期性的执行,如nova/compute/manager.py中:
@periodic_task.periodic_task
def update_available_resource(self, context)
该装饰器有两种使用方法,在注释中已说明:
def periodic_task(*args, **kwargs):
"""Decorator to indicate that a method is a periodic task.
This decorator can be used in two ways:
1. Without arguments '@periodic_task', this will be run on the default
interval of 60 seconds.
2. With arguments:
@periodic_task(spacing=N [, run_immediately=[True|False]])
this will be run on approximately every N seconds. If this number is
negative the periodic task will be disabled. If the run_immediately
argument is provided and has a value of 'True', the first run of the
task will be shortly after task scheduler starts. If
run_immediately is omitted or set to 'False', the first time the
task runs will be approximately N seconds after the task scheduler
starts.
"""
def decorator(f):
# Test for old style invocation
if 'ticks_between_runs' in kwargs:
raise InvalidPeriodicTaskArg(arg='ticks_between_runs')
# Control if run at all
f._periodic_task = True #这里设置f的_periodic_task属性为True, f就是被装饰的函数,下面类似的设置了disable/enable, last_run等值
f._periodic_external_ok = kwargs.pop('external_process_ok', False) if f._periodic_external_ok and not CONF.run_external_periodic_tasks: f._periodic_enabled = False else: f._periodic_enabled = kwargs.pop('enabled', True) # Control frequency f._periodic_spacing = kwargs.pop('spacing', 0) f._periodic_immediate = kwargs.pop('run_immediately', False) if f._periodic_immediate: f._periodic_last_run = None else: f._periodic_last_run = time.time() return f if kwargs: return decorator else: return decorator(args[0])
1.2 元类
class _PeriodicTasksMeta(type):
def __init__(cls, names, bases, dict_):
#cls: 将要被创建的类; names: 类的名字; bases: 基类; dict_:类的属性,是一个字典,主要保存类(如ComputeManager)的方法</span>
"""Metaclass that allows us to collect decorated periodic tasks.""" super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)用于收集被1.1中装饰器所装饰的函数,实现如下所示:
for value in cls.__dict__.values():
if getattr(value, '_periodic_task', False):#如果方法的_periodic_task为True(在1.1中设为了True), 则将该方法加入到periodic task列表中
task = value
name = task.__name__
if task._periodic_spacing < 0:
LOG.info(_LI('Skipping periodic task %(task)s because '
'its interval is negative'),
{'task': name})
continue
if not task._periodic_enabled:
LOG.info(_LI('Skipping periodic task %(task)s because '
'it is disabled'),
{'task': name})
continue
# A periodic spacing of zero indicates that this task should
# be run on the default interval to avoid running too
# frequently.
if task._periodic_spacing == 0:
task._periodic_spacing = DEFAULT_INTERVAL
cls._periodic_tasks.append((name, task))#将该方法加入到periodic task列表中
cls._periodic_spacing[name] = task._periodic_spacing
1.3 需要创建的类PeriodicTasks,该类主要用来启动周期性任务
当nova-compute这个进程开始启动时,nova/cmd/compute.py: server = service.Service.create(binary='nova-compute',topic=CONF.compute_topic,
db_allowed=CONF.conductor.use_local)
Service.create()是调用了nova/service.py:Service.create(),该函数就会导入ComputeManager这个类 继承关系如下:nova/openstack/commom/periodic_task.py:PeriodicTasks<----------nova/manager.py:Manager<-----------nova/compute/manager.py:ComputeManager
@six.add_metaclass(_PeriodicTasksMeta)
class PeriodicTasks(object):
#six.add_metaclass相当于这里的__metaclass__=_PeriodicTasks def __init__(self): super(PeriodicTasks, self).__init__() self._periodic_last_run = {} for name, task in self._periodic_tasks: self._periodic_last_run[name] = task._periodic_last_run
def run_periodic_tasks(self, context, raise_on_error=False): """Tasks to be run at a periodic interval.""" idle_for = DEFAULT_INTERVAL#默认60s for task_name, task in self._periodic_tasks: full_task_name = '.'.join([self.__class__.__name__, task_name]) spacing = self._periodic_spacing[task_name]#间隔 last_run = self._periodic_last_run[task_name]#最近一次执行的时间 # If a periodic task is _nearly_ due, then we'll run it early idle_for = min(idle_for, spacing) if last_run is not None: delta = last_run + spacing - time.time() if delta > 0.2: idle_for = min(idle_for, delta) continue LOG.debug("Running periodic task %(full_task_name)s", {"full_task_name": full_task_name}) self._periodic_last_run[task_name] = time.time() try: task(self, context)#执行被装饰的函数 except Exception as e: if raise_on_error: raise LOG.exception(_LE("Error during %(full_task_name)s: %(e)s"), {"full_task_name": full_task_name, "e": e}) time.sleep(0) return idle_for
2 periodic task的启动
periodic task的启动在nova/service.py:Service.start()中:if self.periodic_enable:因为nova/service.py:Service(service.Service),所以是tg来自nova/openstack/common/service.py中,tg实际上就是threadgroup,主要用来管理定时器Timer和greenthread
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
else:
initial_delay = None
self.tg.add_dynamic_timer(self.periodic_tasks,#这是nova/manager.py:Manager的periodic_tasks(),其又将会调用PeriodicTasks类的run_periodic_tasks()
initial_delay=initial_delay,
periodic_interval_max=
self.periodic_interval_max)
def add_dynamic_timer(self, callback, initial_delay=None,timer.start()的实现如下,使用到了eventlet的event: event.Event()可使任意数量的协程(coroutine)等待(wait函数)一个事件(event),send()可以触发wait()并可以传入变量,如下面的send(e.retvalue),wait()等待send,并返回send带的变量,如这里的e.retvalue,但是send()只能调用一次,下次在调用需重新创建Event。
periodic_interval_max=None, *args, **kwargs):
timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs)#定时器就是循环调用callback了
timer.start(initial_delay=initial_delay,
periodic_interval_max=periodic_interval_max)
self.timers.append(timer)
def start(self, initial_delay=None, periodic_interval_max=None):
self._running = True
done = event.Event()
def _inner():
if initial_delay:
greenthread.sleep(initial_delay)
try:
while self._running:
idle = self.f(*self.args, **self.kw)
if not self._running:
break
if periodic_interval_max is not None:
idle = min(idle, periodic_interval_max)
LOG.debug('Dynamic looping call sleeping for %.02f '
'seconds', idle)
greenthread.sleep(idle)#等待
except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)#这个send到哪个wait()还有待研究 :)
except Exception:
LOG.exception(_LE('in dynamic looping call'))
done.send_exception(*sys.exc_info())
return
else:
done.send(True)
self.done = done
greenthread.spawn(_inner)#启动一个绿色线程
return self.done
更多相关文章
- python中range()函数的用法--转载
- Python3入门(六)——函数式编程
- 更简单的方法来启用详细日志记录
- Python安装模块(numpy等)问题的两种解决办法——常规方法和Anacond
- Python执行系统命令:使用subprocess的Popen函数
- python传递列表作为函数参数
- python的内置函数
- python魔法方法、构造函数、序列与映射、迭代器、生成器
- Python 字典 pop() 方法