#! /usr/bin/env python3 """ wrapper A small tool which wraps services, discovery and poller php scripts in order to run them as threads with Queue and workers Authors: Orsiris de Jong Neil Lathwood Job Snijders Distributed poller code (c) 2015, GPLv3, Daniel Preussker All code parts that belong to Daniel are enclosed in EOC comments Date: Sep 2021 Usage: This program accepts three command line arguments - the number of threads (defaults to 1 for discovery / service, and 16 for poller) - the wrapper type (service, discovery or poller) - optional debug boolean Ubuntu Linux: apt-get install python-mysqldb FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean RHEL 7: yum install MySQL-python RHEL 8: dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient Tested on: Python 3.6.8 / PHP 7.2.11 / CentOS 8 / AlmaLinux 8.4 License: This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see https://www.gnu.org/licenses/. LICENSE.txt contains a copy of the full GPLv3 licensing conditions. """ import logging import os import queue import re import sys import threading import time import uuid from argparse import ArgumentParser import LibreNMS from LibreNMS.command_runner import command_runner from LibreNMS.config import DBConfig logger = logging.getLogger(__name__) # Timeout in seconds for any poller / service / discovery action per device # Should be higher than stepping which defaults to 300 PER_DEVICE_TIMEOUT = 900 # 5 = no new discovered devices, 6 = unreachable device VALID_EXIT_CODES = [0, 5, 6] DISTRIBUTED_POLLING = False # Is overriden by config.php REAL_DURATION = 0 DISCOVERED_DEVICES_COUNT = 0 PER_DEVICE_DURATION = {} ERRORS = 0 MEMC = None IS_NODE = None STEPPING = None MASTER_TAG = None NODES_TAG = None TIME_TAG = "" """ Per wrapper type configuration All time related variables are in seconds """ wrappers = { "service": { "executable": "check-services.php", "option": "-h", "table_name": "services", "memc_touch_time": 10, "stepping": 300, "nodes_stepping": 300, "total_exec_time": 300, }, "discovery": { "executable": "discovery.php", "option": "-h", "table_name": "devices", "memc_touch_time": 30, "stepping": 300, "nodes_stepping": 3600, "total_exec_time": 21600, }, "poller": { "executable": "lnms", "option": "device:poll -q", "table_name": "devices", "memc_touch_time": 10, "stepping": 300, "nodes_stepping": 300, "total_exec_time": 300, }, } """ Threading helper functions """ # << None """ Actual code that runs various php scripts, in single node mode or distributed poller mode """ global MEMC global IS_NODE global DISTRIBUTED_POLLING global MASTER_TAG global NODES_TAG global TIME_TAG global STEPPING # Setup wrapper dependent variables STEPPING = wrappers[wrapper_type]["stepping"] if wrapper_type == "poller": if "rrd" in config and "step" in config["rrd"]: STEPPING = config["rrd"]["step"] TIME_TAG = "." + str(get_time_tag(STEPPING)) MASTER_TAG = "{}.master{}".format(wrapper_type, TIME_TAG) NODES_TAG = "{}.nodes{}".format(wrapper_type, TIME_TAG) # << amount_of_devices: amount_of_workers = amount_of_devices logger.info( "starting the {} check at {} with {} threads for {} devices".format( wrapper_type, time.strftime("%Y-%m-%d %H:%M:%S"), amount_of_workers, amount_of_devices, ) ) for device_id in devices_list: poll_queue.put(device_id) for _ in range(amount_of_workers): worker = threading.Thread( target=poll_worker, kwargs={ "poll_queue": poll_queue, "print_queue": print_queue, "config": config, "log_dir": log_dir, "wrapper_type": wrapper_type, "debug": _debug, "modules": kwargs.get("modules", ""), }, ) worker.setDaemon(True) worker.start() pworker = threading.Thread( target=print_worker, kwargs={"print_queue": print_queue, "wrapper_type": wrapper_type}, ) pworker.setDaemon(True) pworker.start() try: poll_queue.join() print_queue.join() except (KeyboardInterrupt, SystemExit): raise total_time = int(time.time() - s_time) end_msg = "{}-wrapper checked {} devices in {} seconds with {} workers with {} errors".format( wrapper_type, DISCOVERED_DEVICES_COUNT, total_time, amount_of_workers, ERRORS ) if ERRORS == 0: logger.info(end_msg) else: logger.error(end_msg) # << 0: try: time.sleep(1) nodes = MEMC.get(NODES_TAG) except: pass logger.info("Clearing Locks for {}".format(NODES_TAG)) x = minlocks while x <= maxlocks: MEMC.delete("{}.device.{}".format(wrapper_type, x)) x = x + 1 logger.info("{} Locks Cleared".format(x)) logger.info("Clearing Nodes") MEMC.delete(MASTER_TAG) MEMC.delete(NODES_TAG) else: MEMC.decr(NODES_TAG) logger.info("Finished {}.".format(time.strftime("%Y-%m-%d %H:%M:%S"))) # EOC # Update poller statistics if wrapper_type == "poller": query = "UPDATE pollers SET last_polled=NOW(), devices='{}', time_taken='{}' WHERE poller_name='{}'".format( DISCOVERED_DEVICES_COUNT, total_time, config["distributed_poller_name"] ) cursor = db_connection.query(query) if cursor.rowcount < 1: query = "INSERT INTO pollers SET poller_name='{}', last_polled=NOW(), devices='{}', time_taken='{}'".format( config["distributed_poller_name"], DISCOVERED_DEVICES_COUNT, total_time ) db_connection.query(query) db_connection.close() if total_time > wrappers[wrapper_type]["total_exec_time"]: logger.warning( "the process took more than {} seconds to finish, you need faster hardware or more threads".format( wrappers[wrapper_type]["total_exec_time"] ) ) logger.warning( "in sequential style service checks the elapsed time would have been: {} seconds".format( REAL_DURATION ) ) show_stopper = False for device in PER_DEVICE_DURATION: if PER_DEVICE_DURATION[device] > wrappers[wrapper_type]["nodes_stepping"]: logger.warning( "device {} is taking too long: {} seconds".format( device, PER_DEVICE_DURATION[device] ) ) show_stopper = True if show_stopper: logger.error( "Some devices are taking more than {} seconds, the script cannot recommend you what to do.".format( wrappers[wrapper_type]["nodes_stepping"] ) ) else: recommend = int(total_time / STEPPING * amount_of_workers + 1) logger.warning( "Consider setting a minimum of {} threads. (This does not constitute professional advice!)".format( recommend ) ) sys.exit(2) if __name__ == "__main__": parser = ArgumentParser( prog="wrapper.py", usage="usage: %(prog)s [options] \n" "wrapper_type = 'service', 'poller' or 'disccovery'" "workers defaults to 1 for service and discovery, and 16 for poller " "(Do not set too high, or you will get an OOM)", description="Spawn multiple librenms php processes in parallel.", ) parser.add_argument( "-d", "--debug", action="store_true", default=False, help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.", ) parser.add_argument( "-m", "--modules", default="", help="Enable passing of a module string, modules are separated by comma", ) parser.add_argument( dest="wrapper", default=None, help="Execute wrapper for 'service', 'poller' or 'discovery'", ) parser.add_argument( dest="threads", action="store_true", default=None, help="Number of workers" ) args = parser.parse_args() debug = args.debug modules = args.modules or "" wrapper_type = args.wrapper amount_of_workers = args.threads if wrapper_type not in ["service", "discovery", "poller"]: parser.error("Invalid wrapper type '{}'".format(wrapper_type)) sys.exit(4) config = LibreNMS.get_config_data( os.path.dirname(os.path.dirname(os.path.realpath(__file__))) ) log_dir = config["log_dir"] log_file = os.path.join(log_dir, wrapper_type + ".log") logger = LibreNMS.logger_get_logger(log_file, debug=debug) try: amount_of_workers = int(amount_of_workers) except (IndexError, ValueError, TypeError): amount_of_workers = ( 16 if wrapper_type == "poller" else 1 ) # Defaults to 1 for service/discovery, 16 for poller logger.warning( "Bogus number of workers given. Using default number ({}) of workers.".format( amount_of_workers ) ) if wrapper_type in ["discovery", "poller"]: modules_validated = modules else: modules_validated = "" # ignore module parameter wrapper( wrapper_type, amount_of_workers, config, log_dir, _debug=debug, modules=modules_validated, )