Dispatcher Service: ping and services did not respect assigned poller groups (#10259)

* Dispatcher Service: alerting and ping did not respect assigned poller groups

* Fix issue (missing comma)
add log entry for mysql errors
This commit is contained in:
Tony Murray 2019-05-25 19:08:04 -05:00 committed by GitHub
parent 5c39c41891
commit aff2ac49e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -10,7 +10,7 @@ import LibreNMS
class QueueManager:
def __init__(self, config, lock_manager, type_desc, work_function, auto_start=True):
def __init__(self, config, lock_manager, type_desc, uses_groups=False, auto_start=True):
"""
This class manages a queue of jobs and can be used to submit jobs to the queue with post_work()
and process jobs in that queue in worker threads using the work_function
@ -26,6 +26,7 @@ class QueueManager:
:param auto_start: automatically start worker threads
"""
self.type = type_desc
self.uses_groups = uses_groups
self.config = config
self.performance = LibreNMS.PerformanceCounter()
@ -34,7 +35,6 @@ class QueueManager:
self._queue_create_lock = threading.Lock()
self._lm = lock_manager
self._work_function = work_function
self._stop_event = threading.Event()
info("Groups: {}".format(self.config.group))
@ -44,7 +44,7 @@ class QueueManager:
if auto_start:
self.start()
def _service_worker(self, work_func, queue_id):
def _service_worker(self, queue_id):
debug("Worker started {}".format(threading.current_thread().getName()))
while not self._stop_event.is_set():
debug("Worker {} checking queue {} ({}) for work".format(threading.current_thread().getName(), queue_id,
@ -60,10 +60,7 @@ class QueueManager:
debug("Queues: {}".format(self._queues))
target_desc = "{} ({})".format(device_id if device_id else '',
queue_id) if queue_id else device_id
if work_func:
work_func(device_id)
else:
self.do_work(device_id, queue_id)
self.do_work(device_id, queue_id)
runtime = t.delta()
info("Completed {} run for {} in {:.2f}s".format(self.type, target_desc, runtime))
@ -93,7 +90,7 @@ class QueueManager:
"""
workers = self.get_poller_config().workers
groups = self.config.group if hasattr(self.config.group, "__iter__") else [self.config.group]
if self.type == "discovery" or self.type == "poller":
if self.uses_groups:
for group in groups:
group_workers = max(int(workers / len(groups)), 1)
for i in range(group_workers):
@ -109,7 +106,7 @@ class QueueManager:
def spawn_worker(self, thread_name, group):
pt = threading.Thread(target=self._service_worker, name=thread_name,
args=(self._work_function, group))
args=(group,))
pt.daemon = True
self._threads.append(pt)
pt.start()
@ -211,19 +208,17 @@ class QueueManager:
class TimedQueueManager(QueueManager):
def __init__(self, config, lock_manager, type_desc, work_function=None, dispatch_function=None, auto_start=True):
def __init__(self, config, lock_manager, type_desc, uses_groups=False, auto_start=True):
"""
A queue manager that periodically dispatches work to the queue
The times are normalized like they started at 0:00
:param config: LibreNMS.ServiceConfig reference to the service config object
:param type_desc: description for this queue manager type
:param work_function: function that will be called to perform the task
:param dispatch_function: function that will be called when the timer is up, should call post_work()
:param uses_groups: If this queue respects assigned groups or there is only one group
:param auto_start: automatically start worker threads
"""
dispatch_function = dispatch_function if dispatch_function else self.do_dispatch
QueueManager.__init__(self, config, lock_manager, type_desc, work_function, auto_start)
self.timer = LibreNMS.RecurringTimer(self.get_poller_config().frequency, dispatch_function)
QueueManager.__init__(self, config, lock_manager, type_desc, uses_groups, auto_start)
self.timer = LibreNMS.RecurringTimer(self.get_poller_config().frequency, self.do_dispatch)
def start_dispatch(self):
"""
@ -249,15 +244,14 @@ class TimedQueueManager(QueueManager):
class BillingQueueManager(TimedQueueManager):
def __init__(self, config, lock_manager, auto_start=True):
def __init__(self, config, lock_manager):
"""
A TimedQueueManager with two timers dispatching poll billing and calculate billing to the same work queue
:param config: LibreNMS.ServiceConfig reference to the service config object
:param lock_manager: the single instance of lock manager
:param auto_start: automatically start worker threads
"""
TimedQueueManager.__init__(self, config, lock_manager, 'billing', None, None, auto_start)
TimedQueueManager.__init__(self, config, lock_manager, 'billing')
self.calculate_timer = LibreNMS.RecurringTimer(self.get_poller_config().calculate,
self.dispatch_calculate_billing, 'calculate_billing_timer')
@ -291,15 +285,14 @@ class BillingQueueManager(TimedQueueManager):
class PingQueueManager(TimedQueueManager):
def __init__(self, config, lock_manager, auto_start=True):
def __init__(self, config, lock_manager):
"""
A TimedQueueManager to manage dispatch and workers for Ping
:param config: LibreNMS.ServiceConfig reference to the service config object
:param lock_manager: the single instance of lock manager
:param auto_start: automatically start worker threads
"""
TimedQueueManager.__init__(self, config, lock_manager, 'ping', auto_start=auto_start)
TimedQueueManager.__init__(self, config, lock_manager, 'ping', True)
self._db = LibreNMS.DB(self.config)
def do_dispatch(self):
@ -307,8 +300,8 @@ class PingQueueManager(TimedQueueManager):
groups = self._db.query("SELECT DISTINCT (`poller_group`) FROM `devices`")
for group in groups:
self.post_work('', group[0])
except pymysql.err.Error:
pass
except pymysql.err.Error as e:
critical("DB Exception ({})".format(e))
def do_work(self, context, group):
if self.lock(group, 'group', timeout=self.config.ping.frequency):
@ -320,15 +313,14 @@ class PingQueueManager(TimedQueueManager):
class ServicesQueueManager(TimedQueueManager):
def __init__(self, config, lock_manager, auto_start=True):
def __init__(self, config, lock_manager):
"""
A TimedQueueManager to manage dispatch and workers for Services
:param config: LibreNMS.ServiceConfig reference to the service config object
:param lock_manager: the single instance of lock manager
:param auto_start: automatically start worker threads
"""
TimedQueueManager.__init__(self, config, lock_manager, 'services', auto_start=auto_start)
TimedQueueManager.__init__(self, config, lock_manager, 'services', True)
self._db = LibreNMS.DB(self.config)
def do_dispatch(self):
@ -337,8 +329,8 @@ class ServicesQueueManager(TimedQueueManager):
" LEFT JOIN `devices` USING (`device_id`) WHERE `disabled`=0")
for device in devices:
self.post_work(device[0], device[1])
except pymysql.err.Error:
pass
except pymysql.err.Error as e:
critical("DB Exception ({})".format(e))
def do_work(self, device_id, group):
if self.lock(device_id, timeout=self.config.services.frequency):
@ -355,15 +347,14 @@ class ServicesQueueManager(TimedQueueManager):
class AlertQueueManager(TimedQueueManager):
def __init__(self, config, lock_manager, auto_start=True):
def __init__(self, config, lock_manager):
"""
A TimedQueueManager to manage dispatch and workers for Alerts
:param config: LibreNMS.ServiceConfig reference to the service config object
:param lock_manager: the single instance of lock manager
:param auto_start: automatically start worker threads
"""
TimedQueueManager.__init__(self, config, lock_manager, 'alerting', auto_start=auto_start)
TimedQueueManager.__init__(self, config, lock_manager, 'alerting')
self._db = LibreNMS.DB(self.config)
def do_dispatch(self):
@ -381,15 +372,14 @@ class AlertQueueManager(TimedQueueManager):
class PollerQueueManager(QueueManager):
def __init__(self, config, lock_manager, auto_start=True):
def __init__(self, config, lock_manager):
"""
A TimedQueueManager to manage dispatch and workers for Alerts
:param config: LibreNMS.ServiceConfig reference to the service config object
:param lock_manager: the single instance of lock manager
:param auto_start: automatically start worker threads
"""
QueueManager.__init__(self, config, lock_manager, 'poller', None, auto_start=auto_start)
QueueManager.__init__(self, config, lock_manager, 'poller', True)
def do_work(self, device_id, group):
if self.lock(device_id, timeout=self.config.poller.frequency):
@ -414,15 +404,14 @@ class PollerQueueManager(QueueManager):
class DiscoveryQueueManager(TimedQueueManager):
def __init__(self, config, lock_manager, auto_start=True):
def __init__(self, config, lock_manager):
"""
A TimedQueueManager to manage dispatch and workers for Alerts
:param config: LibreNMS.ServiceConfig reference to the service config object
:param lock_manager: the single instance of lock manager
:param auto_start: automatically start worker threads
"""
TimedQueueManager.__init__(self, config, lock_manager, 'discovery', None, auto_start=auto_start)
TimedQueueManager.__init__(self, config, lock_manager, 'discovery', True)
self._db = LibreNMS.DB(self.config)
def do_dispatch(self):
@ -430,8 +419,8 @@ class DiscoveryQueueManager(TimedQueueManager):
devices = self._db.query("SELECT `device_id`, `poller_group` FROM `devices` WHERE `disabled`=0")
for device in devices:
self.post_work(device[0], device[1])
except pymysql.err.Error:
pass
except pymysql.err.Error as e:
critical("DB Exception ({})".format(e))
def do_work(self, device_id, group):
if self.lock(device_id, timeout=LibreNMS.normalize_wait(self.config.discovery.frequency)):