librenms/LibreNMS/queuemanager.py
Tony Murray 0ba76e6d62 New python service for poller, discovery + more (#8455)
Currently has a file-handle leak related to the self-update process; left running, it will eventually run out of handles.

We need to either fix that or rip out self-update and leave updating to cron or a similar scheduler.


DO NOT DELETE THIS TEXT

#### Please note

> Please read this information carefully. You can run `./scripts/pre-commit.php` to check your code before submitting.

- [x] Have you followed our [code guidelines?](http://docs.librenms.org/Developing/Code-Guidelines/)

#### Testers

If you would like to test this pull request, please run `./scripts/github-apply <pr_id>`, e.g. `./scripts/github-apply 5926`.
2018-06-30 12:19:49 +01:00

241 lines
9.4 KiB
Python

import random
import threading
import traceback
from logging import debug, info, error, critical
from multiprocessing import Queue
from subprocess import CalledProcessError
import sys
import LibreNMS
if sys.version_info[0] < 3:
from Queue import Empty
else:
from queue import Empty
class QueueManager:
    def __init__(self, config, type_desc, work_function, auto_start=True):
        """
        This class manages a queue of jobs and can be used to submit jobs to the queue with post_work()
        and process jobs in that queue in worker threads using the work_function
        This will attempt to use redis to create a queue, but fall back to an internal queue.
        If you are using redis, you can have multiple QueueManagers working on the same queue
        You can start or stop the worker threads with start(), stop(), and stop_and_wait()

        :param config: LibreNMS.ServiceConfig reference to the service config object
        :param type_desc: description for this queue manager type
        :param work_function: function that will be called to perform the task
        :param auto_start: automatically start worker threads
        """
        self.type = type_desc
        self.config = config
        self._threads = []
        self._queues = {}
        self._queue_create_lock = threading.Lock()
        self._work_function = work_function
        self._stop_event = threading.Event()

        info("Groups: {}".format(self.config.group))
        info("{} QueueManager created: {} workers, {}s frequency"
             .format(self.type.title(), self.get_poller_config().workers, self.get_poller_config().frequency))

        if auto_start:
            self.start()

    def _service_worker(self, work_func, queue_id):
        """
        Body of each worker thread.

        Repeatedly takes a payload from the queue for queue_id and passes it to work_func
        until the stop event is set. Errors raised while processing are logged and the
        loop continues, so a single failing job does not kill the worker.

        :param work_func: callable invoked with each payload (falsy payloads are skipped)
        :param queue_id: queue/group id passed to get_queue()
        """
        while not self._stop_event.is_set():
            try:
                # cannot break blocking request with redis-py, so timeout :(
                # The timeout also means the stop event is re-checked at least every ~3s while idle.
                device_id = self.get_queue(queue_id).get(True, 3)

                if device_id:  # None returned by redis after timeout when empty
                    debug("Queues: {}".format(self._queues))
                    work_func(device_id)
            except Empty:
                pass  # timeout on the internal (multiprocessing) queue; loop to re-check the stop event
            except CalledProcessError as e:
                error('{} poller script error! {} returned {}: {}'
                      .format(self.type.title(), e.cmd, e.returncode, e.output))
            except Exception as e:
                error('{} poller exception! {}'.format(self.type.title(), e))
                traceback.print_exc()

    def post_work(self, payload, queue_id):
        """
        Post work to the the queue group.

        :param payload: string payload to deliver to the worker
        :param queue_id: which queue to post to, 0 is the default
        """
        work_queue = self.get_queue(queue_id)
        work_queue.put(payload)

        # The size is only used for the debug message below. multiprocessing.Queue.qsize()
        # raises NotImplementedError on some platforms (e.g. macOS), which must not make
        # posting fail after the payload has already been enqueued.
        try:
            size = work_queue.qsize()
        except NotImplementedError:
            size = 'unknown'

        debug("Posted work for {} to {}:{} queue size: {}"
              .format(payload, self.type, queue_id, size))

    def start(self):
        """
        Start worker threads.

        The stop event is cleared first so that start() also works after stop() or
        stop_and_wait() (as restart() does); otherwise the new threads would see the
        event still set and exit immediately.

        For 'poller' and 'discovery' the configured worker count is split evenly across
        the configured groups (at least one worker per group). Other types get a single
        worker on queue 0.
        """
        self._stop_event.clear()

        workers = self.get_poller_config().workers
        groups = self.config.group if hasattr(self.config.group, "__iter__") else [self.config.group]

        if self.type in ("discovery", "poller"):
            # Loop-invariant; guarded so an empty group list spawns nothing instead of dividing by zero.
            group_workers = max(int(workers / len(groups)), 1) if groups else 0
            for group in groups:
                for i in range(group_workers):
                    thread_name = "{}_{}-{}".format(self.type.title(), group, i + 1)
                    self.spawn_worker(thread_name, group)

                debug("Started {} {} threads for group {}".format(group_workers, self.type, group))
        else:
            self.spawn_worker(self.type.title(), 0)

    def spawn_worker(self, thread_name, group):
        """
        Start one daemon worker thread servicing the queue for the given group.

        :param thread_name: name given to the thread
        :param group: queue/group id the worker reads from
        """
        pt = threading.Thread(target=self._service_worker, name=thread_name,
                              args=(self._work_function, group))
        pt.daemon = True
        self._threads.append(pt)
        pt.start()

    def restart(self):
        """
        Stop the worker threads and wait for them to finish. Then start them again.
        """
        self.stop_and_wait()
        self.start()

    def stop(self):
        """
        Stop the worker threads, does not wait for them to finish.
        """
        self._stop_event.set()

    def stop_and_wait(self):
        """
        Stop the worker threads and wait for them to finish.
        """
        self.stop()  # make sure this has been called so we don't block forever
        for t in self._threads:
            t.join()

        del self._threads[:]

    def get_poller_config(self):
        """
        Returns the LibreNMS.PollerConfig for this QueueManager

        :return: LibreNMS.PollerConfig
        """
        return getattr(self.config, self.type)

    def get_queue(self, group):
        """
        Return the queue for this manager's type and the given group, creating it on first use.

        Creation is guarded by a lock and the membership test is repeated inside it, so
        concurrent workers asking for the same new queue only create it once.

        :param group: int queue/group id
        :return: the queue object (a LibreNMS.RedisQueue or a multiprocessing.Queue)
        :raises ValueError: if the type/group combination is invalid (see queue_name)
        """
        name = self.queue_name(self.type, group)

        if name not in self._queues:
            with self._queue_create_lock:
                if name not in self._queues:
                    self._queues[name] = self._create_queue(self.type, group)

        return self._queues[name]

    def _create_queue(self, queue_type, group):
        """
        Create a queue (not thread safe; get_queue() calls this while holding its lock).

        Tries a LibreNMS.RedisQueue first. If redis-py cannot be imported or the Redis queue
        cannot be created, this exits with status 2 in distributed mode (Redis is required
        there); otherwise it falls back to an in-process multiprocessing.Queue.

        :param queue_type: queue manager type, e.g. 'poller'
        :param group: int group id
        :return: the new queue object
        """
        name = self.queue_name(queue_type, group)
        info("Creating queue {}".format(name))
        try:
            return LibreNMS.RedisQueue(name,
                                       namespace='librenms.queue',
                                       host=self.config.redis_host,
                                       port=self.config.redis_port,
                                       db=self.config.redis_db,
                                       password=self.config.redis_pass,
                                       unix_socket_path=self.config.redis_socket
                                       )
        except ImportError:
            if self.config.distributed:
                critical("ERROR: Redis connection required for distributed polling")
                critical("Please install redis-py, either through your os software repository or from PyPI")
                # sys.exit rather than the site-module exit(), which is not guaranteed to exist.
                sys.exit(2)
        except Exception as e:
            if self.config.distributed:
                critical("ERROR: Redis connection required for distributed polling")
                critical("Could not connect to Redis. {}".format(e))
                sys.exit(2)

        return Queue()

    @staticmethod
    def queue_name(queue_type, group):
        """
        Build the queue name "<queue_type>:<group>".

        :param queue_type: non-empty queue type
        :param group: group id; must be exactly an int
        :return: the queue name string
        :raises ValueError: if queue_type is empty or group is not an int
        """
        if queue_type and type(group) == int:
            return "{}:{}".format(queue_type, group)
        else:
            raise ValueError("Refusing to create improperly scoped queue - parameters were invalid or not set")
class TimedQueueManager(QueueManager):
    """
    QueueManager that additionally owns a LibreNMS.RecurringTimer which periodically
    calls a dispatch function (expected to enqueue work via post_work()).

    NOTE(review): QueueManager.restart() calls stop(), which here also stops the dispatch
    timer, while start() only restarts worker threads. Confirm whether restarting should
    also restart dispatch.
    """

    def __init__(self, config, type_desc, work_function, dispatch_function, auto_start=True):
        """
        Build a queue manager whose queue is fed on a timer.
        The times are normalized like they started at 0:00
        The timer is created here but not started; call start_dispatch() to start it.

        :param config: LibreNMS.ServiceConfig reference to the service config object
        :param type_desc: description for this queue manager type
        :param work_function: function that will be called to perform the task
        :param dispatch_function: function that will be called when the timer is up, should call post_work()
        :param auto_start: automatically start worker threads
        """
        QueueManager.__init__(self, config, type_desc, work_function, auto_start)
        # The timer period is the 'frequency' of this type's poller config.
        interval = self.get_poller_config().frequency
        self.timer = LibreNMS.RecurringTimer(interval, dispatch_function)

    def start_dispatch(self):
        """
        Begin periodic dispatching (never done automatically by __init__).
        """
        self.timer.start()

    def stop_dispatch(self):
        """
        Halt periodic dispatching.
        """
        self.timer.stop()

    def stop(self):
        """
        Signal both the dispatch timer and the worker threads to stop, without waiting.
        """
        # Stop feeding the queue first, then tell the workers to finish.
        self.stop_dispatch()
        QueueManager.stop(self)
class BillingQueueManager(TimedQueueManager):
    """
    The 'billing' TimedQueueManager: besides the inherited poll-billing timer it owns a
    second timer for calculate billing, and both timers feed the same work queue.
    """

    def __init__(self, config, work_function, poll_dispatch_function, calculate_dispatch_function,
                 auto_start=True):
        """
        Build the billing queue manager with its two dispatch timers (neither is started here).

        :param config: LibreNMS.ServiceConfig reference to the service config object
        :param work_function: function that will be called to perform the task
        :param poll_dispatch_function: function that will be called when the timer is up, should call post_work()
        :param calculate_dispatch_function: function that will be called when the timer is up, should call post_work()
        :param auto_start: automatically start worker threads
        """
        TimedQueueManager.__init__(self, config, 'billing', work_function, poll_dispatch_function, auto_start)
        # The calculate timer period is the 'calculate' setting of the billing poller config.
        calculate_interval = self.get_poller_config().calculate
        self.calculate_timer = LibreNMS.RecurringTimer(calculate_interval, calculate_dispatch_function,
                                                       'calculate_billing_timer')

    def start_dispatch(self):
        """
        Start both dispatch timers (calculate first, then poll); not called automatically on init.
        """
        self.calculate_timer.start()
        TimedQueueManager.start_dispatch(self)

    def stop_dispatch(self):
        """
        Stop both dispatch timers (calculate first, then poll).
        """
        self.calculate_timer.stop()
        TimedQueueManager.stop_dispatch(self)