Source code for invenio_celery.ext

# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015-2018 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Celery application for Invenio."""

from __future__ import absolute_import, print_function

import time
import warnings

import pkg_resources
from celery.signals import import_modules
from flask_celeryext import FlaskCeleryExt

from . import config


[docs]class InvenioCelery(object): """Invenio celery extension.""" def __init__(self, app=None, **kwargs): """Extension initialization.""" self.celery = None if app: self.init_app(app, **kwargs)
[docs] def init_app(self, app, entry_point_group='invenio_celery.tasks', **kwargs): """Initialize application object.""" self.init_config(app) self.celery = FlaskCeleryExt(app).celery self.entry_point_group = entry_point_group app.extensions['invenio-celery'] = self
[docs] def load_entry_points(self): """Load tasks from entry points.""" if self.entry_point_group: task_packages = {} for item in pkg_resources.iter_entry_points( group=self.entry_point_group): # Celery 4.2 requires autodiscover to be called with # related_name for Python 2.7. try: pkg, related_name = item.module_name.rsplit('.', 1) except ValueError: warnings.warn( 'The celery task module "{}" was not loaded. ' 'Defining modules in bare Python modules is no longer ' 'supported due to Celery v4.2 constraints. Please ' 'move the module into a Python package.'.format( item.module_name ), RuntimeWarning ) continue if related_name not in task_packages: task_packages[related_name] = [] task_packages[related_name].append(pkg) if task_packages: for related_name, packages in task_packages.items(): self.celery.autodiscover_tasks( packages, related_name=related_name, force=True )
[docs] def init_config(self, app): """Initialize configuration.""" for k in dir(config): if k.startswith('CELERY_') or k.startswith('BROKER_'): app.config.setdefault(k, getattr(config, k))
[docs] def get_queues(self): """Return a list of current active Celery queues.""" res = self.celery.control.inspect().active_queues() or dict() return [result.get('name') for host in res.values() for result in host]
[docs] def disable_queue(self, name): """Disable given Celery queue.""" self.celery.control.cancel_consumer(name)
[docs] def enable_queue(self, name): """Enable given Celery queue.""" self.celery.control.add_consumer(name)
[docs] def get_active_tasks(self): """Return a list of UUIDs of active tasks.""" current_tasks = self.celery.control.inspect().active() or dict() return [ task.get('id') for host in current_tasks.values() for task in host]
[docs] def suspend_queues(self, active_queues, sleep_time=10.0): """Suspend Celery queues and wait for running tasks to complete.""" for queue in active_queues: self.disable_queue(queue) while self.get_active_tasks(): time.sleep(sleep_time)
[docs]@import_modules.connect() def celery_module_imports(sender, signal=None, **kwargs): """Load shared celery tasks.""" app = getattr(sender, 'flask_app', None) if app: app.extensions['invenio-celery'].load_entry_points()