diff --git a/LibreNMS/queuemanager.py b/LibreNMS/queuemanager.py index 979ae6d5a0..4d06be54fc 100644 --- a/LibreNMS/queuemanager.py +++ b/LibreNMS/queuemanager.py @@ -10,7 +10,7 @@ import LibreNMS class QueueManager: - def __init__(self, config, lock_manager, type_desc, work_function, auto_start=True): + def __init__(self, config, lock_manager, type_desc, uses_groups=False, auto_start=True): """ This class manages a queue of jobs and can be used to submit jobs to the queue with post_work() and process jobs in that queue in worker threads using the work_function @@ -26,6 +26,7 @@ class QueueManager: :param auto_start: automatically start worker threads """ self.type = type_desc + self.uses_groups = uses_groups self.config = config self.performance = LibreNMS.PerformanceCounter() @@ -34,7 +35,6 @@ class QueueManager: self._queue_create_lock = threading.Lock() self._lm = lock_manager - self._work_function = work_function self._stop_event = threading.Event() info("Groups: {}".format(self.config.group)) @@ -44,7 +44,7 @@ class QueueManager: if auto_start: self.start() - def _service_worker(self, work_func, queue_id): + def _service_worker(self, queue_id): debug("Worker started {}".format(threading.current_thread().getName())) while not self._stop_event.is_set(): debug("Worker {} checking queue {} ({}) for work".format(threading.current_thread().getName(), queue_id, @@ -60,10 +60,7 @@ class QueueManager: debug("Queues: {}".format(self._queues)) target_desc = "{} ({})".format(device_id if device_id else '', queue_id) if queue_id else device_id - if work_func: - work_func(device_id) - else: - self.do_work(device_id, queue_id) + self.do_work(device_id, queue_id) runtime = t.delta() info("Completed {} run for {} in {:.2f}s".format(self.type, target_desc, runtime)) @@ -93,7 +90,7 @@ class QueueManager: """ workers = self.get_poller_config().workers groups = self.config.group if hasattr(self.config.group, "__iter__") else [self.config.group] - if self.type == "discovery" or self.type == "poller": + if self.uses_groups: for group in groups: group_workers = max(int(workers / len(groups)), 1) for i in range(group_workers): @@ -109,7 +106,7 @@ class QueueManager: def spawn_worker(self, thread_name, group): pt = threading.Thread(target=self._service_worker, name=thread_name, - args=(self._work_function, group)) + args=(group,)) pt.daemon = True self._threads.append(pt) pt.start() @@ -211,19 +208,17 @@ class QueueManager: class TimedQueueManager(QueueManager): - def __init__(self, config, lock_manager, type_desc, work_function=None, dispatch_function=None, auto_start=True): + def __init__(self, config, lock_manager, type_desc, uses_groups=False, auto_start=True): """ A queue manager that periodically dispatches work to the queue The times are normalized like they started at 0:00 :param config: LibreNMS.ServiceConfig reference to the service config object :param type_desc: description for this queue manager type - :param work_function: function that will be called to perform the task - :param dispatch_function: function that will be called when the timer is up, should call post_work() + :param uses_groups: If this queue respects assigned groups or there is only one group :param auto_start: automatically start worker threads """ - dispatch_function = dispatch_function if dispatch_function else self.do_dispatch - QueueManager.__init__(self, config, lock_manager, type_desc, work_function, auto_start) - self.timer = LibreNMS.RecurringTimer(self.get_poller_config().frequency, dispatch_function) + QueueManager.__init__(self, config, lock_manager, type_desc, uses_groups, auto_start) + self.timer = LibreNMS.RecurringTimer(self.get_poller_config().frequency, self.do_dispatch) def start_dispatch(self): """ @@ -249,15 +244,14 @@ class TimedQueueManager(QueueManager): class BillingQueueManager(TimedQueueManager): - def __init__(self, config, lock_manager, auto_start=True): + def __init__(self, config, lock_manager): """ A TimedQueueManager with two timers dispatching poll billing and calculate billing to the same work queue :param config: LibreNMS.ServiceConfig reference to the service config object :param lock_manager: the single instance of lock manager - :param auto_start: automatically start worker threads """ - TimedQueueManager.__init__(self, config, lock_manager, 'billing', None, None, auto_start) + TimedQueueManager.__init__(self, config, lock_manager, 'billing') self.calculate_timer = LibreNMS.RecurringTimer(self.get_poller_config().calculate, self.dispatch_calculate_billing, 'calculate_billing_timer') @@ -291,15 +285,14 @@ class BillingQueueManager(TimedQueueManager): class PingQueueManager(TimedQueueManager): - def __init__(self, config, lock_manager, auto_start=True): + def __init__(self, config, lock_manager): """ A TimedQueueManager to manage dispatch and workers for Ping :param config: LibreNMS.ServiceConfig reference to the service config object :param lock_manager: the single instance of lock manager - :param auto_start: automatically start worker threads """ - TimedQueueManager.__init__(self, config, lock_manager, 'ping', auto_start=auto_start) + TimedQueueManager.__init__(self, config, lock_manager, 'ping', True) self._db = LibreNMS.DB(self.config) def do_dispatch(self): @@ -307,8 +300,8 @@ class PingQueueManager(TimedQueueManager): groups = self._db.query("SELECT DISTINCT (`poller_group`) FROM `devices`") for group in groups: self.post_work('', group[0]) - except pymysql.err.Error: - pass + except pymysql.err.Error as e: + critical("DB Exception ({})".format(e)) def do_work(self, context, group): if self.lock(group, 'group', timeout=self.config.ping.frequency): @@ -320,15 +313,14 @@ class PingQueueManager(TimedQueueManager): class ServicesQueueManager(TimedQueueManager): - def __init__(self, config, lock_manager, auto_start=True): + def __init__(self, config, lock_manager): """ A TimedQueueManager to manage dispatch and workers for Services :param config: LibreNMS.ServiceConfig reference to the service config object :param lock_manager: the single instance of lock manager - :param auto_start: automatically start worker threads """ - TimedQueueManager.__init__(self, config, lock_manager, 'services', auto_start=auto_start) + TimedQueueManager.__init__(self, config, lock_manager, 'services', True) self._db = LibreNMS.DB(self.config) def do_dispatch(self): @@ -337,8 +329,8 @@ class ServicesQueueManager(TimedQueueManager): " LEFT JOIN `devices` USING (`device_id`) WHERE `disabled`=0") for device in devices: self.post_work(device[0], device[1]) - except pymysql.err.Error: - pass + except pymysql.err.Error as e: + critical("DB Exception ({})".format(e)) def do_work(self, device_id, group): if self.lock(device_id, timeout=self.config.services.frequency): @@ -355,15 +347,14 @@ class ServicesQueueManager(TimedQueueManager): class AlertQueueManager(TimedQueueManager): - def __init__(self, config, lock_manager, auto_start=True): + def __init__(self, config, lock_manager): """ A TimedQueueManager to manage dispatch and workers for Alerts :param config: LibreNMS.ServiceConfig reference to the service config object :param lock_manager: the single instance of lock manager - :param auto_start: automatically start worker threads """ - TimedQueueManager.__init__(self, config, lock_manager, 'alerting', auto_start=auto_start) + TimedQueueManager.__init__(self, config, lock_manager, 'alerting') self._db = LibreNMS.DB(self.config) def do_dispatch(self): @@ -381,15 +372,14 @@ class AlertQueueManager(TimedQueueManager): class PollerQueueManager(QueueManager): - def __init__(self, config, lock_manager, auto_start=True): + def __init__(self, config, lock_manager): """ A TimedQueueManager to manage dispatch and workers for Alerts :param config: LibreNMS.ServiceConfig reference to the service config object :param lock_manager: the single instance of lock manager - :param auto_start: automatically start worker threads """ - QueueManager.__init__(self, config, lock_manager, 'poller', None, auto_start=auto_start) + QueueManager.__init__(self, config, lock_manager, 'poller', True) def do_work(self, device_id, group): if self.lock(device_id, timeout=self.config.poller.frequency): @@ -414,15 +404,14 @@ class PollerQueueManager(QueueManager): class DiscoveryQueueManager(TimedQueueManager): - def __init__(self, config, lock_manager, auto_start=True): + def __init__(self, config, lock_manager): """ A TimedQueueManager to manage dispatch and workers for Alerts :param config: LibreNMS.ServiceConfig reference to the service config object :param lock_manager: the single instance of lock manager - :param auto_start: automatically start worker threads """ - TimedQueueManager.__init__(self, config, lock_manager, 'discovery', None, auto_start=auto_start) + TimedQueueManager.__init__(self, config, lock_manager, 'discovery', True) self._db = LibreNMS.DB(self.config) def do_dispatch(self): @@ -430,8 +419,8 @@ class DiscoveryQueueManager(TimedQueueManager): devices = self._db.query("SELECT `device_id`, `poller_group` FROM `devices` WHERE `disabled`=0") for device in devices: self.post_work(device[0], device[1]) - except pymysql.err.Error: - pass + except pymysql.err.Error as e: + critical("DB Exception ({})".format(e)) def do_work(self, device_id, group): if self.lock(device_id, timeout=LibreNMS.normalize_wait(self.config.discovery.frequency)):