From bfa200f3f7263a3c8e4f33cd7615067702945b58 Mon Sep 17 00:00:00 2001 From: Orsiris de Jong Date: Mon, 27 Sep 2021 21:24:25 +0200 Subject: [PATCH] Full Python code fusion / refactor and hardening 2nd edition (#13188) * New service/discovery/poller wrapper * Convert old wrapper scripts to bootstrap loaders for wrapper.py * Move wrapper.py to LibreNMS module directory * Reformat files * File reformatting * bootstrap files reformatting * Fusion service and wrapper database connections and get_config_data functions * Moved subprocess calls to command_runner * LibreNMS library and __init__ fusion * Reformat files * Normalize logging use * Reformatting code * Fix missing argument for error log * Fix refactor typo in DBConfig class * Add default timeout for config.php data fetching * distributed discovery should finish with a timestamp instead of an epoch * Fix docstring inside dict prevents service key to work * Fix poller insert statement * Fix service wrapper typo * Update docstring since we changed function behavior * Normalize SQL statements * Convert optparse to argparse * Revert discovery thread number * Handle debug logging * Fix file option typo * Reformat code * Add credits to source package * Rename logs depending on the wrapper type * Cap max logfile size to 10MB * Reformat code * Add exception for Redis < 5.0 * Make sure we always log something from service * Fix bogus description * Add an error message on missing config file * Improve error message when .env file cannot be loaded * Improve wrapper logging * Fix cron run may fail when environment path is not set * Add missing -wrapper suffix for logs * Conform to prior naming scheme * Linter fix * Add inline copy of command_runner * Another linter fix * Raise exception after logging * Updated inline command_runner * Add command_runner to requirements * I guess I love linter fixes ;) * Don't spawn more threads than devices * Fix typo in log call * Add exit codes to log on error, add command line to debug log * Add thread name to error message * Log errors in end message for easier debugging * Typo fix * In love of linting --- LibreNMS/__init__.py | 220 ++++++++++-- LibreNMS/command_runner.py | 640 +++++++++++++++++++++++++++++++++++ LibreNMS/library.py | 174 ---------- LibreNMS/queuemanager.py | 153 +++++---- LibreNMS/service.py | 159 ++++----- LibreNMS/wrapper.py | 669 +++++++++++++++++++++++++++++++++++++ discovery-wrapper.py | 481 +++----------------------- librenms-service.py | 5 +- poller-wrapper.py | 513 +++------------------------- requirements.txt | 1 + services-wrapper.py | 485 +++------------------------ 11 files changed, 1831 insertions(+), 1669 deletions(-) create mode 100644 LibreNMS/command_runner.py delete mode 100644 LibreNMS/library.py create mode 100644 LibreNMS/wrapper.py diff --git a/LibreNMS/__init__.py b/LibreNMS/__init__.py index 28701ba344..e4cbb4b5e8 100644 --- a/LibreNMS/__init__.py +++ b/LibreNMS/__init__.py @@ -1,15 +1,17 @@ +import json +import logging import os -import subprocess +import sys +import tempfile import threading import timeit from collections import deque - -from logging import critical, info, debug, exception +from logging.handlers import RotatingFileHandler from math import ceil from queue import Queue from time import time -from .service import Service, ServiceConfig +from .command_runner import command_runner from .queuemanager import ( QueueManager, TimedQueueManager, @@ -20,6 +22,161 @@ from .queuemanager import ( PollerQueueManager, DiscoveryQueueManager, ) +from .service import Service, ServiceConfig + +# Hard limit script execution time so we don't get to "hang" +DEFAULT_SCRIPT_TIMEOUT = 3600 +MAX_LOGFILE_SIZE = (1024 ** 2) * 10 # 10 Megabytes max log files + +logger = logging.getLogger(__name__) + +# Logging functions ######################################################## +# Original logger functions from ofunctions.logger_utils package + +FORMATTER = logging.Formatter("%(asctime)s :: %(levelname)s :: %(message)s") + + +def logger_get_console_handler(): + try: + console_handler = logging.StreamHandler(sys.stdout) + except OSError as exc: + print("Cannot log to stdout, trying stderr. Message %s" % exc) + try: + console_handler = logging.StreamHandler(sys.stderr) + console_handler.setFormatter(FORMATTER) + return console_handler + except OSError as exc: + print("Cannot log to stderr neither. Message %s" % exc) + return False + else: + console_handler.setFormatter(FORMATTER) + return console_handler + + +def logger_get_file_handler(log_file): + err_output = None + try: + file_handler = RotatingFileHandler( + log_file, + mode="a", + encoding="utf-8", + maxBytes=MAX_LOGFILE_SIZE, + backupCount=3, + ) + except OSError as exc: + try: + print( + "Cannot create logfile. Trying to obtain temporary log file.\nMessage: %s" + % exc + ) + err_output = str(exc) + temp_log_file = tempfile.gettempdir() + os.sep + __name__ + ".log" + print("Trying temporary log file in " + temp_log_file) + file_handler = RotatingFileHandler( + temp_log_file, + mode="a", + encoding="utf-8", + maxBytes=MAX_LOGFILE_SIZE, + backupCount=1, + ) + file_handler.setFormatter(FORMATTER) + err_output += "\nUsing [%s]" % temp_log_file + return file_handler, err_output + except OSError as exc: + print( + "Cannot create temporary log file either. Will not log to file. Message: %s" + % exc + ) + return False + else: + file_handler.setFormatter(FORMATTER) + return file_handler, err_output + + +def logger_get_logger(log_file=None, temp_log_file=None, debug=False): + # If a name is given to getLogger, than modules can't log to the root logger + _logger = logging.getLogger() + if debug is True: + _logger.setLevel(logging.DEBUG) + else: + _logger.setLevel(logging.INFO) + console_handler = logger_get_console_handler() + if console_handler: + _logger.addHandler(console_handler) + if log_file is not None: + file_handler, err_output = logger_get_file_handler(log_file) + if file_handler: + _logger.addHandler(file_handler) + _logger.propagate = False + if err_output is not None: + print(err_output) + _logger.warning( + "Failed to use log file [%s], %s.", log_file, err_output + ) + if temp_log_file is not None: + if os.path.isfile(temp_log_file): + try: + os.remove(temp_log_file) + except OSError: + logger.warning("Cannot remove temp log file [%s]." % temp_log_file) + file_handler, err_output = logger_get_file_handler(temp_log_file) + if file_handler: + _logger.addHandler(file_handler) + _logger.propagate = False + if err_output is not None: + print(err_output) + _logger.warning( + "Failed to use log file [%s], %s.", log_file, err_output + ) + return _logger + + +# Generic functions ######################################################## + + +def check_for_file(file): + try: + with open(file) as file: + pass + except IOError as exc: + logger.error("File '%s' is not readable" % file) + logger.debug("Traceback:", exc_info=True) + sys.exit(2) + + +# Config functions ######################################################### + + +def get_config_data(base_dir): + check_for_file(os.path.join(base_dir, ".env")) + + try: + import dotenv + + env_path = "{}/.env".format(base_dir) + logger.info("Attempting to load .env from '%s'", env_path) + dotenv.load_dotenv(dotenv_path=env_path, verbose=True) + + if not os.getenv("NODE_ID"): + logger.critical(".env does not contain a valid NODE_ID setting.") + + except ImportError as exc: + logger.critical( + 'Could not import "%s" - Please check that the poller user can read the file, and that composer install has been run recently\nAdditional info: %s' + % (env_path, exc) + ) + logger.debug("Traceback:", exc_info=True) + + config_cmd = ["/usr/bin/env", "php", "%s/config_to_json.php" % base_dir] + try: + exit_code, output = command_runner(config_cmd, timeout=300) + if exit_code == 0: + return json.loads(output) + raise EnvironmentError + except Exception as exc: + logger.critical("ERROR: Could not execute command [%s]: %s" % (config_cmd, exc)) + logger.debug("Traceback:", exc_info=True) + return None def normalize_wait(seconds): @@ -28,8 +185,9 @@ def normalize_wait(seconds): def call_script(script, args=()): """ - Run a LibreNMS script. Captures all output and throws an exception if a non-zero - status is returned. Blocks parent signals (like SIGINT and SIGTERM). + Run a LibreNMS script. Captures all output returns exit code. + Blocks parent signals (like SIGINT and SIGTERM). + Kills script if it takes too long :param script: the name of the executable relative to the base directory :param args: a tuple of arguments to send to the command :returns the output of the command @@ -42,14 +200,10 @@ def call_script(script, args=()): base_dir = os.path.realpath(os.path.dirname(__file__) + "/..") cmd = base + ("{}/{}".format(base_dir, script),) + tuple(map(str, args)) - debug("Running {}".format(cmd)) + logger.debug("Running {}".format(cmd)) # preexec_fn=os.setsid here keeps process signals from propagating (close_fds=True is default) - return subprocess.check_call( - cmd, - stdout=subprocess.DEVNULL, - stderr=subprocess.STDOUT, - preexec_fn=os.setsid, - close_fds=True, + return command_runner( + cmd, preexec_fn=os.setsid, close_fds=True, timeout=DEFAULT_SCRIPT_TIMEOUT ) @@ -70,15 +224,15 @@ class DB: import pymysql pymysql.install_as_MySQLdb() - info("Using pure python SQL client") + logger.info("Using pure python SQL client") except ImportError: - info("Using other SQL client") + logger.info("Using other SQL client") try: import MySQLdb except ImportError: - critical("ERROR: missing a mysql python module") - critical( + logger.critical("ERROR: missing a mysql python module") + logger.critical( "Install either 'PyMySQL' or 'mysqlclient' from your OS software repository or from PyPI" ) raise @@ -99,7 +253,7 @@ class DB: conn.ping(True) self._db[threading.get_ident()] = conn except Exception as e: - critical("ERROR: Could not connect to MySQL database! {}".format(e)) + logger.critical("ERROR: Could not connect to MySQL database! {}".format(e)) raise def db_conn(self): @@ -128,7 +282,7 @@ class DB: cursor.close() return cursor except Exception as e: - critical("DB Connection exception {}".format(e)) + logger.critical("DB Connection exception {}".format(e)) self.close() raise @@ -167,7 +321,7 @@ class RecurringTimer: class Lock: - """ Base lock class this is not thread safe""" + """Base lock class this is not thread safe""" def __init__(self): self._locks = {} # store a tuple (owner, expiration) @@ -210,7 +364,7 @@ class Lock: return False def print_locks(self): - debug(self._locks) + logger.debug(self._locks) class ThreadingLock(Lock): @@ -269,7 +423,7 @@ class RedisLock(Lock): self._redis = redis.Redis(**kwargs) self._redis.ping() self._namespace = namespace - info( + logger.info( "Created redis lock manager with socket_timeout of {}s".format( redis_kwargs["socket_timeout"] ) @@ -296,7 +450,7 @@ class RedisLock(Lock): non_existing = not (allow_owner_relock and self._redis.get(key) == owner) return self._redis.set(key, owner, ex=int(expiration), nx=non_existing) except redis.exceptions.ResponseError as e: - exception( + logger.critical( "Unable to obtain lock, local state: name: %s, owner: %s, expiration: %s, allow_owner_relock: %s", name, owner, @@ -351,7 +505,7 @@ class RedisUniqueQueue(object): self._redis = redis.Redis(**kwargs) self._redis.ping() self.key = "{}:{}".format(namespace, name) - info( + logger.info( "Created redis queue with socket_timeout of {}s".format( redis_kwargs["socket_timeout"] ) @@ -371,10 +525,20 @@ class RedisUniqueQueue(object): self._redis.zadd(self.key, {item: time()}, nx=True) def get(self, block=True, timeout=None): - if block: - item = self._redis.bzpopmin(self.key, timeout=timeout) - else: - item = self._redis.zpopmin(self.key) + try: + if block: + item = self._redis.bzpopmin(self.key, timeout=timeout) + else: + item = self._redis.zpopmin(self.key) + # Unfortunately we cannot use _redis.exceptions.ResponseError Exception here + # Since it would trigger another exception in queuemanager + except Exception as e: + logger.critical( + "BZPOPMIN/ZPOPMIN command failed: {}\nNote that redis >= 5.0 is required.".format( + e + ) + ) + raise if item: item = item[1] diff --git a/LibreNMS/command_runner.py b/LibreNMS/command_runner.py new file mode 100644 index 0000000000..e93ce7d0d7 --- /dev/null +++ b/LibreNMS/command_runner.py @@ -0,0 +1,640 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +# +# This file is part of command_runner module + +""" +command_runner is a quick tool to launch commands from Python, get exit code +and output, and handle most errors that may happen + +Versioning semantics: + Major version: backward compatibility breaking changes + Minor version: New functionality + Patch version: Backwards compatible bug fixes + +""" + +__intname__ = "command_runner" +__author__ = "Orsiris de Jong" +__copyright__ = "Copyright (C) 2015-2021 Orsiris de Jong" +__licence__ = "BSD 3 Clause" +__version__ = "1.2.1" +__build__ = "2021090901" + +import io +import os +import shlex +import subprocess +import sys +from datetime import datetime +from logging import getLogger +from time import sleep + +try: + import psutil +except ImportError: + # Don't bother with an error since we need command_runner to work without dependencies + pass +try: + import signal +except ImportError: + pass + +# Python 2.7 compat fixes (queue was Queue) +try: + import queue +except ImportError: + import Queue as queue +import threading + +# Python 2.7 compat fixes (missing typing and FileNotFoundError) +try: + from typing import Union, Optional, List, Tuple, NoReturn, Any +except ImportError: + pass +try: + FileNotFoundError +except NameError: + # pylint: disable=W0622 (redefined-builtin) + FileNotFoundError = IOError +try: + TimeoutExpired = subprocess.TimeoutExpired +except AttributeError: + + class TimeoutExpired(BaseException): + """ + Basic redeclaration when subprocess.TimeoutExpired does not exist, python <= 3.3 + """ + + def __init__(self, cmd, timeout, output=None, stderr=None): + self.cmd = cmd + self.timeout = timeout + self.output = output + self.stderr = stderr + + def __str__(self): + return "Command '%s' timed out after %s seconds" % (self.cmd, self.timeout) + + @property + def stdout(self): + return self.output + + @stdout.setter + def stdout(self, value): + # There's no obvious reason to set this, but allow it anyway so + # .stdout is a transparent alias for .output + self.output = value + + +class KbdInterruptGetOutput(BaseException): + """ + Make sure we get the current output when KeyboardInterrupt is made + """ + + def __init__(self, output): + self._output = output + + @property + def output(self): + return self._output + + +logger = getLogger(__intname__) +PIPE = subprocess.PIPE +MIN_RESOLUTION = 0.05 # Minimal sleep time between polling, reduces CPU usage + + +def kill_childs_mod( + pid=None, # type: int + itself=False, # type: bool + soft_kill=False, # type: bool +): + # type: (...) -> bool + """ + Inline version of ofunctions.kill_childs that has no hard dependency on psutil + + Kills all childs of pid (current pid can be obtained with os.getpid()) + If no pid given current pid is taken + Good idea when using multiprocessing, is to call with atexit.register(ofunctions.kill_childs, os.getpid(),) + + Beware: MS Windows does not maintain a process tree, so child dependencies are computed on the fly + Knowing this, orphaned processes (where parent process died) cannot be found and killed this way + + Prefer using process.send_signal() in favor of process.kill() to avoid race conditions when PID was reused too fast + + :param pid: Which pid tree we'll kill + :param itself: Should parent be killed too ? + """ + sig = None + + ### BEGIN COMMAND_RUNNER MOD + if "psutil" not in sys.modules: + logger.error( + "No psutil module present. Can only kill direct pids, not child subtree." + ) + if "signal" not in sys.modules: + logger.error( + "No signal module present. Using direct psutil kill API which might have race conditions when PID is reused too fast." + ) + else: + """ + Extract from Python3 doc + On Windows, signal() can only be called with SIGABRT, SIGFPE, SIGILL, SIGINT, SIGSEGV, SIGTERM, or SIGBREAK. + A ValueError will be raised in any other case. Note that not all systems define the same set of signal names; + an AttributeError will be raised if a signal name is not defined as SIG* module level constant. + """ + try: + if not soft_kill and hasattr(signal, "SIGKILL"): + # Don't bother to make pylint go crazy on Windows + # pylint: disable=E1101 + sig = signal.SIGKILL + else: + sig = signal.SIGTERM + except NameError: + sig = None + ### END COMMAND_RUNNER MOD + + def _process_killer( + process, # type: Union[subprocess.Popen, psutil.Process] + sig, # type: signal.valid_signals + soft_kill, # type: bool + ): + # (...) -> None + """ + Simple abstract process killer that works with signals in order to avoid reused PID race conditions + and can prefers using terminate than kill + """ + if sig: + try: + process.send_signal(sig) + # psutil.NoSuchProcess might not be available, let's be broad + # pylint: disable=W0703 + except Exception: + pass + else: + if soft_kill: + process.terminate() + else: + process.kill() + + try: + current_process = psutil.Process(pid if pid is not None else os.getpid()) + # psutil.NoSuchProcess might not be available, let's be broad + # pylint: disable=W0703 + except Exception: + if itself: + os.kill( + pid, 15 + ) # 15 being signal.SIGTERM or SIGKILL depending on the platform + return False + + for child in current_process.children(recursive=True): + _process_killer(child, sig, soft_kill) + + if itself: + _process_killer(current_process, sig, soft_kill) + return True + + +def command_runner( + command, # type: Union[str, List[str]] + valid_exit_codes=None, # type: Optional[List[int]] + timeout=3600, # type: Optional[int] + shell=False, # type: bool + encoding=None, # type: Optional[str] + stdout=None, # type: Union[int, str] + stderr=None, # type: Union[int, str] + windows_no_window=False, # type: bool + live_output=False, # type: bool + method="monitor", # type: str + **kwargs # type: Any +): + # type: (...) -> Tuple[Optional[int], str] + """ + Unix & Windows compatible subprocess wrapper that handles output encoding and timeouts + Newer Python check_output already handles encoding and timeouts, but this one is retro-compatible + It is still recommended to set cp437 for windows and utf-8 for unix + + Also allows a list of various valid exit codes (ie no error when exit code = arbitrary int) + + command should be a list of strings, eg ['ping', '127.0.0.1', '-c 2'] + command can also be a single string, ex 'ping 127.0.0.1 -c 2' if shell=True or if os is Windows + + Accepts all of subprocess.popen arguments + + Whenever we can, we need to avoid shell=True in order to preserve better security + Avoiding shell=True involves passing absolute paths to executables since we don't have shell PATH environment + + When no stdout option is given, we'll get output into the returned (exit_code, output) tuple + When stdout = filename or stderr = filename, we'll write output to the given file + + live_output will poll the process for output and show it on screen (output may be non reliable, don't use it if + your program depends on the commands' stdout output) + + windows_no_window will disable visible window (MS Windows platform only) + + Returns a tuple (exit_code, output) + """ + + # Choose default encoding when none set + # cp437 encoding assures we catch most special characters from cmd.exe + if not encoding: + encoding = "cp437" if os.name == "nt" else "utf-8" + + # Fix when unix command was given as single string + # This is more secure than setting shell=True + if os.name == "posix" and shell is False and isinstance(command, str): + command = shlex.split(command) + + # Set default values for kwargs + errors = kwargs.pop( + "errors", "backslashreplace" + ) # Don't let encoding issues make you mad + universal_newlines = kwargs.pop("universal_newlines", False) + creationflags = kwargs.pop("creationflags", 0) + # subprocess.CREATE_NO_WINDOW was added in Python 3.7 for Windows OS only + if ( + windows_no_window + and sys.version_info[0] >= 3 + and sys.version_info[1] >= 7 + and os.name == "nt" + ): + # Disable the following pylint error since the code also runs on nt platform, but + # triggers an error on Unix + # pylint: disable=E1101 + creationflags = creationflags | subprocess.CREATE_NO_WINDOW + close_fds = kwargs.pop("close_fds", "posix" in sys.builtin_module_names) + + # Default buffer size. line buffer (1) is deprecated in Python 3.7+ + bufsize = kwargs.pop("bufsize", 16384) + + # Decide whether we write to output variable only (stdout=None), to output variable and stdout (stdout=PIPE) + # or to output variable and to file (stdout='path/to/file') + if stdout is None: + _stdout = PIPE + stdout_to_file = False + elif isinstance(stdout, str): + # We will send anything to file + _stdout = open(stdout, "wb") + stdout_to_file = True + else: + # We will send anything to given stdout pipe + _stdout = stdout + stdout_to_file = False + + # The only situation where we don't add stderr to stdout is if a specific target file was given + if isinstance(stderr, str): + _stderr = open(stderr, "wb") + stderr_to_file = True + else: + _stderr = subprocess.STDOUT + stderr_to_file = False + + def to_encoding( + process_output, # type: Union[str, bytes] + encoding, # type: str + errors, # type: str + ): + # type: (...) -> str + """ + Convert bytes output to string and handles conversion errors + """ + # Compatibility for earlier Python versions where Popen has no 'encoding' nor 'errors' arguments + if isinstance(process_output, bytes): + try: + process_output = process_output.decode(encoding, errors=errors) + except TypeError: + try: + # handle TypeError: don't know how to handle UnicodeDecodeError in error callback + process_output = process_output.decode(encoding, errors="ignore") + except (ValueError, TypeError): + # What happens when str cannot be concatenated + logger.debug("Output cannot be captured {}".format(process_output)) + return process_output + + def _read_pipe( + stream, # type: io.StringIO + output_queue, # type: queue.Queue + ): + # type: (...) -> None + """ + will read from subprocess.PIPE + Must be threaded since readline() might be blocking on Windows GUI apps + + Partly based on https://stackoverflow.com/a/4896288/2635443 + """ + + # WARNING: Depending on the stream type (binary or text), the sentinel character + # needs to be of the same type, or the iterator won't have an end + + # We also need to check that stream has readline, in case we're writing to files instead of PIPE + if hasattr(stream, "readline"): + sentinel_char = "" if hasattr(stream, "encoding") else b"" + for line in iter(stream.readline, sentinel_char): + output_queue.put(line) + output_queue.put(None) + stream.close() + + def _poll_process( + process, # type: Union[subprocess.Popen[str], subprocess.Popen] + timeout, # type: int + encoding, # type: str + errors, # type: str + ): + # type: (...) -> Tuple[Optional[int], str] + """ + Process stdout/stderr output polling is only used in live output mode + since it takes more resources than using communicate() + + Reads from process output pipe until: + - Timeout is reached, in which case we'll terminate the process + - Process ends by itself + + Returns an encoded string of the pipe output + """ + + begin_time = datetime.now() + output = "" + output_queue = queue.Queue() + + def __check_timeout( + begin_time, # type: datetime.timestamp + timeout, # type: int + ): + # type: (...) -> None + """ + Simple subfunction to check whether timeout is reached + Since we check this alot, we put it into a function + """ + + if timeout and (datetime.now() - begin_time).total_seconds() > timeout: + kill_childs_mod(process.pid, itself=True, soft_kill=False) + raise TimeoutExpired(process, timeout, output) + + try: + read_thread = threading.Thread( + target=_read_pipe, args=(process.stdout, output_queue) + ) + read_thread.daemon = True # thread dies with the program + read_thread.start() + + while True: + try: + line = output_queue.get(timeout=MIN_RESOLUTION) + except queue.Empty: + __check_timeout(begin_time, timeout) + else: + if line is None: + break + else: + line = to_encoding(line, encoding, errors) + if live_output: + sys.stdout.write(line) + output += line + __check_timeout(begin_time, timeout) + + # Make sure we wait for the process to terminate, even after + # output_queue has finished sending data, so we catch the exit code + while process.poll() is None: + __check_timeout(begin_time, timeout) + # Additional timeout check to make sure we don't return an exit code from processes + # that were killed because of timeout + __check_timeout(begin_time, timeout) + exit_code = process.poll() + return exit_code, output + + except KeyboardInterrupt: + raise KbdInterruptGetOutput(output) + + def _timeout_check_thread( + process, # type: Union[subprocess.Popen[str], subprocess.Popen] + timeout, # type: int + timeout_queue, # type: queue.Queue + ): + # type: (...) -> None + + """ + Since elder python versions don't have timeout, we need to manually check the timeout for a process + """ + + begin_time = datetime.now() + while True: + if timeout and (datetime.now() - begin_time).total_seconds() > timeout: + kill_childs_mod(process.pid, itself=True, soft_kill=False) + timeout_queue.put(True) + break + if process.poll() is not None: + break + sleep(MIN_RESOLUTION) + + def _monitor_process( + process, # type: Union[subprocess.Popen[str], subprocess.Popen] + timeout, # type: int + encoding, # type: str + errors, # type: str + ): + # type: (...) -> Tuple[Optional[int], str] + """ + Create a thread in order to enforce timeout + Get stdout output and return it + """ + + # Shared mutable objects have proven to have race conditions with PyPy 3.7 (mutable object + # is changed in thread, but outer monitor function has still old mutable object state) + # Strangely, this happened only sometimes on github actions/ubuntu 20.04.3 & pypy 3.7 + # Let's create a queue to get the timeout thread response on a deterministic way + timeout_queue = queue.Queue() + is_timeout = False + + thread = threading.Thread( + target=_timeout_check_thread, + args=(process, timeout, timeout_queue), + ) + thread.setDaemon(True) + thread.start() + + process_output = None + stdout = None + + try: + # Don't use process.wait() since it may deadlock on old Python versions + # Also it won't allow communicate() to get incomplete output on timeouts + while process.poll() is None: + sleep(MIN_RESOLUTION) + try: + is_timeout = timeout_queue.get_nowait() + except queue.Empty: + pass + else: + break + # We still need to use process.communicate() in this loop so we don't get stuck + # with poll() is not None even after process is finished + try: + stdout, _ = process.communicate() + # ValueError is raised on closed IO file + except (TimeoutExpired, ValueError): + pass + exit_code = process.poll() + + try: + stdout, _ = process.communicate() + except (TimeoutExpired, ValueError): + pass + process_output = to_encoding(stdout, encoding, errors) + + # On PyPy 3.7 only, we can have a race condition where we try to read the queue before + # the thread could write to it, failing to register a timeout. + # This workaround prevents reading the queue while the thread is still alive + while thread.is_alive(): + sleep(MIN_RESOLUTION) + + try: + is_timeout = timeout_queue.get_nowait() + except queue.Empty: + pass + if is_timeout: + raise TimeoutExpired(process, timeout, process_output) + return exit_code, process_output + except KeyboardInterrupt: + raise KbdInterruptGetOutput(process_output) + + try: + # Finally, we won't use encoding & errors arguments for Popen + # since it would defeat the idea of binary pipe reading in live mode + + # Python >= 3.3 has SubProcessError(TimeoutExpired) class + # Python >= 3.6 has encoding & error arguments + # universal_newlines=True makes netstat command fail under windows + # timeout does not work under Python 2.7 with subprocess32 < 3.5 + # decoder may be cp437 or unicode_escape for dos commands or utf-8 for powershell + # Disabling pylint error for the same reason as above + # pylint: disable=E1123 + if sys.version_info >= (3, 6): + process = subprocess.Popen( + command, + stdout=_stdout, + stderr=_stderr, + shell=shell, + universal_newlines=universal_newlines, + encoding=encoding, + errors=errors, + creationflags=creationflags, + bufsize=bufsize, # 1 = line buffered + close_fds=close_fds, + **kwargs + ) + else: + process = subprocess.Popen( + command, + stdout=_stdout, + stderr=_stderr, + shell=shell, + universal_newlines=universal_newlines, + creationflags=creationflags, + bufsize=bufsize, + close_fds=close_fds, + **kwargs + ) + + try: + if method == "poller" or live_output: + exit_code, output = _poll_process(process, timeout, encoding, errors) + else: + exit_code, output = _monitor_process(process, timeout, encoding, errors) + except KbdInterruptGetOutput as exc: + exit_code = -252 + output = "KeyboardInterrupted. Partial output\n{}".format(exc.output) + try: + kill_childs_mod(process.pid, itself=True, soft_kill=False) + except AttributeError: + pass + if stdout_to_file: + _stdout.write(output.encode(encoding, errors=errors)) + + logger.debug( + 'Command "{}" returned with exit code "{}". Command output was:'.format( + command, exit_code + ) + ) + except subprocess.CalledProcessError as exc: + exit_code = exc.returncode + try: + output = exc.output + except AttributeError: + output = "command_runner: Could not obtain output from command." + if exit_code in valid_exit_codes if valid_exit_codes is not None else [0]: + logger.debug( + 'Command "{}" returned with exit code "{}". Command output was:'.format( + command, exit_code + ) + ) + logger.error( + 'Command "{}" failed with exit code "{}". Command output was:'.format( + command, exc.returncode + ) + ) + logger.error(output) + except FileNotFoundError as exc: + logger.error('Command "{}" failed, file not found: {}'.format(command, exc)) + exit_code, output = -253, exc.__str__() + # On python 2.7, OSError is also raised when file is not found (no FileNotFoundError) + # pylint: disable=W0705 (duplicate-except) + except (OSError, IOError) as exc: + logger.error('Command "{}" failed because of OS: {}'.format(command, exc)) + exit_code, output = -253, exc.__str__() + except TimeoutExpired as exc: + message = 'Timeout {} seconds expired for command "{}" execution. Original output was: {}'.format( + timeout, command, exc.output + ) + logger.error(message) + if stdout_to_file: + _stdout.write(message.encode(encoding, errors=errors)) + exit_code, output = ( + -254, + 'Timeout of {} seconds expired for command "{}" execution. Original output was: {}'.format( + timeout, command, exc.output + ), + ) + # We need to be able to catch a broad exception + # pylint: disable=W0703 + except Exception as exc: + logger.error( + 'Command "{}" failed for unknown reasons: {}'.format(command, exc), + exc_info=True, + ) + logger.debug("Error:", exc_info=True) + exit_code, output = -255, exc.__str__() + finally: + if stdout_to_file: + _stdout.close() + if stderr_to_file: + _stderr.close() + + logger.debug(output) + + return exit_code, output + + +def deferred_command(command, defer_time=300): + # type: (str, int) -> None + """ + This is basically an ugly hack to launch commands which are detached from parent process + Especially useful to launch an auto update/deletion of a running executable after a given amount of + seconds after it finished + """ + # Use ping as a standard timer in shell since it's present on virtually *any* system + if os.name == "nt": + deferrer = "ping 127.0.0.1 -n {} > NUL & ".format(defer_time) + else: + deferrer = "ping 127.0.0.1 -c {} > /dev/null && ".format(defer_time) + + # We'll create a independent shell process that will not be attached to any stdio interface + # Our command shall be a single string since shell=True + subprocess.Popen( + deferrer + command, + shell=True, + stdin=None, + stdout=None, + stderr=None, + close_fds=True, + ) diff --git a/LibreNMS/library.py b/LibreNMS/library.py deleted file mode 100644 index e0916201db..0000000000 --- a/LibreNMS/library.py +++ /dev/null @@ -1,174 +0,0 @@ -#! /usr/bin/env python3 -# -*- coding: utf-8 -*- - -import sys -import os -import logging -import tempfile -import subprocess -import threading -import time -from logging.handlers import RotatingFileHandler - -try: - import MySQLdb -except ImportError: - try: - import pymysql - - pymysql.install_as_MySQLdb() - import MySQLdb - except ImportError as exc: - print("ERROR: missing the mysql python module please run:") - print("pip install -r requirements.txt") - print("ERROR: %s" % exc) - sys.exit(2) - -logger = logging.getLogger(__name__) - -# Logging functions ######################################################## - -FORMATTER = logging.Formatter("%(asctime)s :: %(levelname)s :: %(message)s") - - -def logger_get_console_handler(): - try: - console_handler = logging.StreamHandler(sys.stdout) - except OSError as exc: - print("Cannot log to stdout, trying stderr. Message %s" % exc) - try: - console_handler = logging.StreamHandler(sys.stderr) - console_handler.setFormatter(FORMATTER) - return console_handler - except OSError as exc: - print("Cannot log to stderr neither. Message %s" % exc) - return False - else: - console_handler.setFormatter(FORMATTER) - return console_handler - - -def logger_get_file_handler(log_file): - err_output = None - try: - file_handler = RotatingFileHandler( - log_file, mode="a", encoding="utf-8", maxBytes=1024000, backupCount=3 - ) - except OSError as exc: - try: - print( - "Cannot create logfile. Trying to obtain temporary log file.\nMessage: %s" - % exc - ) - err_output = str(exc) - temp_log_file = tempfile.gettempdir() + os.sep + __name__ + ".log" - print("Trying temporary log file in " + temp_log_file) - file_handler = RotatingFileHandler( - temp_log_file, - mode="a", - encoding="utf-8", - maxBytes=1000000, - backupCount=1, - ) - file_handler.setFormatter(FORMATTER) - err_output += "\nUsing [%s]" % temp_log_file - return file_handler, err_output - except OSError as exc: - print( - "Cannot create temporary log file either. Will not log to file. Message: %s" - % exc - ) - return False - else: - file_handler.setFormatter(FORMATTER) - return file_handler, err_output - - -def logger_get_logger(log_file=None, temp_log_file=None, debug=False): - # If a name is given to getLogger, than modules can't log to the root logger - _logger = logging.getLogger() - if debug is True: - _logger.setLevel(logging.DEBUG) - else: - _logger.setLevel(logging.INFO) - console_handler = logger_get_console_handler() - if console_handler: - _logger.addHandler(console_handler) - if log_file is not None: - file_handler, err_output = logger_get_file_handler(log_file) - if file_handler: - _logger.addHandler(file_handler) - _logger.propagate = False - if err_output is not None: - print(err_output) - _logger.warning( - "Failed to use log file [%s], %s.", log_file, err_output - ) - if temp_log_file is not None: - if os.path.isfile(temp_log_file): - try: - os.remove(temp_log_file) - except OSError: - logger.warning("Cannot remove temp log file [%s]." % temp_log_file) - file_handler, err_output = logger_get_file_handler(temp_log_file) - if file_handler: - _logger.addHandler(file_handler) - _logger.propagate = False - if err_output is not None: - print(err_output) - _logger.warning( - "Failed to use log file [%s], %s.", log_file, err_output - ) - return _logger - - -# Generic functions ######################################################## - - -def check_for_file(file): - try: - with open(file) as f: - pass - except IOError as exc: - logger.error("Oh dear... %s does not seem readable" % file) - logger.debug("ERROR:", exc_info=True) - sys.exit(2) - - -# Config functions ######################################################### - - -def get_config_data(install_dir): - config_cmd = ["/usr/bin/env", "php", "%s/config_to_json.php" % install_dir] - try: - proc = subprocess.Popen( - config_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE - ) - return proc.communicate()[0].decode() - except Exception as e: - print("ERROR: Could not execute: %s" % config_cmd) - print(e) - sys.exit(2) - - -# Database functions ####################################################### - - -def db_open(db_socket, db_server, db_port, db_username, db_password, db_dbname): - try: - options = dict( - host=db_server, - port=int(db_port), - user=db_username, - passwd=db_password, - db=db_dbname, - ) - - if db_socket: - options["unix_socket"] = db_socket - - return MySQLdb.connect(**options) - except Exception as dbexc: - print("ERROR: Could not connect to MySQL database!") - print("ERROR: %s" % dbexc) - sys.exit(2) diff --git a/LibreNMS/queuemanager.py b/LibreNMS/queuemanager.py index d5baf97ce5..934212ee77 100644 --- a/LibreNMS/queuemanager.py +++ b/LibreNMS/queuemanager.py @@ -1,14 +1,16 @@ +import logging import pymysql -import subprocess import threading import traceback -from logging import debug, info, error, critical, warning from queue import Empty from subprocess import CalledProcessError import LibreNMS +logger = logging.getLogger(__name__) + + class QueueManager: def __init__( self, config, lock_manager, type_desc, uses_groups=False, auto_start=True @@ -39,8 +41,8 @@ class QueueManager: self._stop_event = threading.Event() - info("Groups: {}".format(self.config.group)) - info( + logger.info("Groups: {}".format(self.config.group)) + logger.info( "{} QueueManager created: {} workers, {}s frequency".format( self.type.title(), self.get_poller_config().workers, @@ -52,9 +54,9 @@ class QueueManager: self.start() def _service_worker(self, queue_id): - debug("Worker started {}".format(threading.current_thread().getName())) + logger.debug("Worker started {}".format(threading.current_thread().getName())) while not self._stop_event.is_set(): - debug( + logger.debug( "Worker {} checking queue {} ({}) for work".format( threading.current_thread().getName(), queue_id, @@ -68,13 +70,13 @@ class QueueManager: if ( device_id is not None ): # None returned by redis after timeout when empty - debug( + logger.debug( "Worker {} ({}) got work {} ".format( threading.current_thread().getName(), queue_id, device_id ) ) with LibreNMS.TimeitContext.start() as t: - debug("Queues: {}".format(self._queues)) + logger.debug("Queues: {}".format(self._queues)) target_desc = ( "{} ({})".format(device_id if device_id else "", queue_id) if queue_id @@ -83,7 +85,7 @@ class QueueManager: self.do_work(device_id, queue_id) runtime = t.delta() - info( + logger.info( "Completed {} run for {} in {:.2f}s".format( self.type, target_desc, runtime ) @@ -92,13 +94,13 @@ class QueueManager: except Empty: pass # ignore empty queue exception from subprocess.Queue except CalledProcessError as e: - error( + logger.error( "{} poller script error! {} returned {}: {}".format( self.type.title(), e.cmd, e.returncode, e.output ) ) except Exception as e: - error("{} poller exception! {}".format(self.type.title(), e)) + logger.error("{} poller exception! {}".format(self.type.title(), e)) traceback.print_exc() def post_work(self, payload, queue_id): @@ -108,7 +110,7 @@ class QueueManager: :param queue_id: which queue to post to, 0 is the default """ self.get_queue(queue_id).put(payload) - debug( + logger.debug( "Posted work for {} to {}:{} queue size: {}".format( payload, self.type, queue_id, self.get_queue(queue_id).qsize() ) @@ -131,7 +133,7 @@ class QueueManager: thread_name = "{}_{}-{}".format(self.type.title(), group, i + 1) self.spawn_worker(thread_name, group) - debug( + logger.debug( "Started {} {} threads for group {}".format( group_workers, self.type, group ) @@ -196,7 +198,7 @@ class QueueManager: :param group: :return: """ - info("Creating queue {}".format(self.queue_name(queue_type, group))) + logger.info("Creating queue {}".format(self.queue_name(queue_type, group))) try: return LibreNMS.RedisUniqueQueue( self.queue_name(queue_type, group), @@ -213,15 +215,19 @@ class QueueManager: except ImportError: if self.config.distributed: - critical("ERROR: Redis connection required for distributed polling") - critical( + logger.critical( + "ERROR: Redis connection required for distributed polling" + ) + logger.critical( "Please install redis-py, either through your os software repository or from PyPI" ) exit(2) except Exception as e: if self.config.distributed: - critical("ERROR: Redis connection required for distributed polling") - critical("Could not connect to Redis. {}".format(e)) + logger.critical( + "ERROR: Redis connection required for distributed polling" + ) + logger.critical("Could not connect to Redis. {}".format(e)) exit(2) return LibreNMS.UniqueQueue() @@ -341,11 +347,19 @@ class BillingQueueManager(TimedQueueManager): def do_work(self, run_type, group): if run_type == "poll": - info("Polling billing") - LibreNMS.call_script("poll-billing.php") + logger.info("Polling billing") + exit_code, output = LibreNMS.call_script("poll-billing.php") + if exit_code != 0: + logger.warning( + "Error {} in Polling billing:\n{}".format(exit_code, output) + ) else: # run_type == 'calculate' - info("Calculating billing") - LibreNMS.call_script("billing-calculate.php") + logger.info("Calculating billing") + exit_code, output = LibreNMS.call_script("billing-calculate.php") + if exit_code != 0: + logger.warning( + "Error {} in Calculating billing:\n{}".format(exit_code, output) + ) class PingQueueManager(TimedQueueManager): @@ -365,13 +379,19 @@ class PingQueueManager(TimedQueueManager): for group in groups: self.post_work("", group[0]) except pymysql.err.Error as e: - critical("DB Exception ({})".format(e)) + logger.critical("DB Exception ({})".format(e)) def do_work(self, context, group): if self.lock(group, "group", timeout=self.config.ping.frequency): try: - info("Running fast ping") - LibreNMS.call_script("ping.php", ("-g", group)) + logger.info("Running fast ping") + exit_code, output = LibreNMS.call_script("ping.php", ("-g", group)) + if exit_code != 0: + logger.warning( + "Running fast ping for {} failed with error code {}: {}".format( + group, exit_code, output + ) + ) finally: self.unlock(group, "group") @@ -396,16 +416,19 @@ class ServicesQueueManager(TimedQueueManager): for device in devices: self.post_work(device[0], device[1]) except pymysql.err.Error as e: - critical("DB Exception ({})".format(e)) + logger.critical("DB Exception ({})".format(e)) def do_work(self, device_id, group): if self.lock(device_id, timeout=self.config.services.frequency): - try: - info("Checking services on device {}".format(device_id)) - LibreNMS.call_script("check-services.php", ("-h", device_id)) - except subprocess.CalledProcessError as e: - if e.returncode == 5: - info( + logger.info("Checking services on device {}".format(device_id)) + exit_code, output = LibreNMS.call_script( + "check-services.php", ("-h", device_id) + ) + if exit_code == 0: + self.unlock(device_id) + else: + if exit_code == 5: + logger.info( "Device {} is down, cannot poll service, waiting {}s for retry".format( device_id, self.config.down_retry ) @@ -413,8 +436,12 @@ class ServicesQueueManager(TimedQueueManager): self.lock( device_id, allow_relock=True, timeout=self.config.down_retry ) - else: - self.unlock(device_id) + else: + logger.warning( + "Unknown error while checking services on device {} with exit code {}: {}".format( + device_id, exit_code, output + ) + ) class AlertQueueManager(TimedQueueManager): @@ -432,14 +459,13 @@ class AlertQueueManager(TimedQueueManager): self.post_work("alerts", 0) def do_work(self, device_id, group): - try: - info("Checking alerts") - LibreNMS.call_script("alerts.php") - except subprocess.CalledProcessError as e: - if e.returncode == 1: - warning("There was an error issuing alerts: {}".format(e.output)) + logger.info("Checking alerts") + exit_code, output = LibreNMS.call_script("alerts.php") + if exit_code != 0: + if exit_code == 1: + logger.warning("There was an error issuing alerts: {}".format(output)) else: - raise + raise CalledProcessError class PollerQueueManager(QueueManager): @@ -454,13 +480,14 @@ class PollerQueueManager(QueueManager): def do_work(self, device_id, group): if self.lock(device_id, timeout=self.config.poller.frequency): - info("Polling device {}".format(device_id)) + logger.info("Polling device {}".format(device_id)) - try: - LibreNMS.call_script("poller.php", ("-h", device_id)) - except subprocess.CalledProcessError as e: - if e.returncode == 6: - warning( + exit_code, output = LibreNMS.call_script("poller.php", ("-h", device_id)) + if exit_code == 0: + self.unlock(device_id) + else: + if exit_code == 6: + logger.warning( "Polling device {} unreachable, waiting {}s for retry".format( device_id, self.config.down_retry ) @@ -470,12 +497,14 @@ class PollerQueueManager(QueueManager): device_id, allow_relock=True, timeout=self.config.down_retry ) else: - error("Polling device {} failed! {}".format(device_id, e)) + logger.error( + "Polling device {} failed with exit code {}: {}".format( + device_id, exit_code, output + ) + ) self.unlock(device_id) - else: - self.unlock(device_id) else: - debug("Tried to poll {}, but it is locked".format(device_id)) + logger.debug("Tried to poll {}, but it is locked".format(device_id)) class DiscoveryQueueManager(TimedQueueManager): @@ -497,18 +526,19 @@ class DiscoveryQueueManager(TimedQueueManager): for device in devices: self.post_work(device[0], device[1]) except pymysql.err.Error as e: - critical("DB Exception ({})".format(e)) + logger.critical("DB Exception ({})".format(e)) def do_work(self, device_id, group): if self.lock( device_id, timeout=LibreNMS.normalize_wait(self.config.discovery.frequency) ): - try: - info("Discovering device {}".format(device_id)) - LibreNMS.call_script("discovery.php", ("-h", device_id)) - except subprocess.CalledProcessError as e: - if e.returncode == 5: - info( + logger.info("Discovering device {}".format(device_id)) + exit_code, output = LibreNMS.call_script("discovery.php", ("-h", device_id)) + if exit_code == 0: + self.unlock(device_id) + else: + if exit_code == 5: + logger.info( "Device {} is down, cannot discover, waiting {}s for retry".format( device_id, self.config.down_retry ) @@ -517,6 +547,9 @@ class DiscoveryQueueManager(TimedQueueManager): device_id, allow_relock=True, timeout=self.config.down_retry ) else: + logger.error( + "Discovering device {} failed with exit code {}: {}".format( + device_id, exit_code, output + ) + ) self.unlock(device_id) - else: - self.unlock(device_id) diff --git a/LibreNMS/service.py b/LibreNMS/service.py index 215b4cff5a..73f02bc21d 100644 --- a/LibreNMS/service.py +++ b/LibreNMS/service.py @@ -1,10 +1,7 @@ import LibreNMS - -import json import logging import os import pymysql -import subprocess import threading import sys import time @@ -16,7 +13,6 @@ except ImportError: from datetime import timedelta from datetime import datetime -from logging import debug, info, warning, error, critical, exception from platform import python_version from time import sleep from socket import gethostname @@ -28,6 +24,8 @@ try: except ImportError: pass +logger = logging.getLogger(__name__) + class ServiceConfig: def __init__(self): @@ -99,7 +97,7 @@ class ServiceConfig: watchdog_logfile = "logs/librenms.log" def populate(self): - config = self._get_config_data() + config = LibreNMS.get_config_data(self.BASE_DIR) # populate config variables self.node_id = os.getenv("NODE_ID") @@ -232,7 +230,7 @@ class ServiceConfig: try: logging.getLogger().setLevel(self.log_level) except ValueError: - error( + logger.error( "Unknown log level {}, must be one of 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'".format( self.log_level ) @@ -301,39 +299,7 @@ class ServiceConfig: if settings["watchdog_log"] is not None: self.watchdog_logfile = settings["watchdog_log"] except pymysql.err.Error: - warning("Unable to load poller (%s) config", self.node_id) - - def _get_config_data(self): - try: - import dotenv - - env_path = "{}/.env".format(self.BASE_DIR) - info("Attempting to load .env from '%s'", env_path) - dotenv.load_dotenv(dotenv_path=env_path, verbose=True) - - if not os.getenv("NODE_ID"): - raise ImportError(".env does not contain a valid NODE_ID setting.") - - except ImportError as e: - exception( - "Could not import .env - check that the poller user can read the file, and that composer install has been run recently" - ) - sys.exit(3) - - config_cmd = [ - "/usr/bin/env", - "php", - "{}/config_to_json.php".format(self.BASE_DIR), - "2>&1", - ] - try: - return json.loads(subprocess.check_output(config_cmd).decode()) - except subprocess.CalledProcessError as e: - error( - "ERROR: Could not load or parse configuration! {}: {}".format( - subprocess.list2cmdline(e.cmd), e.output.decode() - ) - ) + logger.warning("Unable to load poller (%s) config", self.node_id) @staticmethod def parse_group(g): @@ -347,7 +313,7 @@ class ServiceConfig: except ValueError: pass - error("Could not parse group string, defaulting to 0") + logger.error("Could not parse group string, defaulting to 0") return [0] @@ -382,7 +348,7 @@ class Service: self.config.poller.frequency, self.log_performance_stats, "performance" ) if self.config.watchdog_enabled: - info( + logger.info( "Starting watchdog timer for log file: {}".format( self.config.watchdog_logfile ) @@ -391,7 +357,7 @@ class Service: self.config.poller.frequency, self.logfile_watchdog, "watchdog" ) else: - info("Watchdog is disabled.") + logger.info("Watchdog is disabled.") self.systemd_watchdog_timer = LibreNMS.RecurringTimer( 10, self.systemd_watchdog, "systemd-watchdog" ) @@ -401,14 +367,16 @@ class Service: return time.time() - self.start_time def attach_signals(self): - info("Attaching signal handlers on thread %s", threading.current_thread().name) + logger.info( + "Attaching signal handlers on thread %s", threading.current_thread().name + ) signal(SIGTERM, self.terminate) # capture sigterm and exit gracefully signal(SIGQUIT, self.terminate) # capture sigquit and exit gracefully signal(SIGINT, self.terminate) # capture sigint and exit gracefully signal(SIGHUP, self.reload) # capture sighup and restart gracefully if "psutil" not in sys.modules: - warning("psutil is not available, polling gap possible") + logger.warning("psutil is not available, polling gap possible") else: signal(SIGCHLD, self.reap) # capture sigchld and reap the process @@ -427,7 +395,7 @@ class Service: if status == psutil.STATUS_ZOMBIE: pid = p.pid r = os.waitpid(p.pid, os.WNOHANG) - warning( + logger.warning( 'Reaped long running job "%s" in state %s with PID %d - job returned %d', cmd, status, @@ -439,7 +407,7 @@ class Service: continue def start(self): - debug("Performing startup checks...") + logger.debug("Performing startup checks...") if self.config.single_instance: self.check_single_instance() # don't allow more than one service at a time @@ -448,7 +416,7 @@ class Service: raise RuntimeWarning("Not allowed to start Poller twice") self._started = True - debug("Starting up queue managers...") + logger.debug("Starting up queue managers...") # initialize and start the worker pools self.poller_manager = LibreNMS.PollerQueueManager(self.config, self._lm) @@ -478,8 +446,8 @@ class Service: if self.config.watchdog_enabled: self.watchdog_timer.start() - info("LibreNMS Service: {} started!".format(self.config.unique_name)) - info( + logger.info("LibreNMS Service: {} started!".format(self.config.unique_name)) + logger.info( "Poller group {}. Using Python {} and {} locks and queues".format( "0 (default)" if self.config.group == [0] else self.config.group, python_version(), @@ -487,19 +455,19 @@ class Service: ) ) if self.config.update_enabled: - info( + logger.info( "Maintenance tasks will be run every {}".format( timedelta(seconds=self.config.update_frequency) ) ) else: - warning("Maintenance tasks are disabled.") + logger.warning("Maintenance tasks are disabled.") # Main dispatcher loop try: while not self.terminate_flag: if self.reload_flag: - info("Picked up reload flag, calling the reload process") + logger.info("Picked up reload flag, calling the reload process") self.restart() if self.reap_flag: @@ -509,7 +477,9 @@ class Service: master_lock = self._acquire_master() if master_lock: if not self.is_master: - info("{} is now the master dispatcher".format(self.config.name)) + logger.info( + "{} is now the master dispatcher".format(self.config.name) + ) self.is_master = True self.start_dispatch_timers() @@ -525,7 +495,7 @@ class Service: self.dispatch_immediate_discovery(device_id, group) else: if self.is_master: - info( + logger.info( "{} is no longer the master dispatcher".format( self.config.name ) @@ -536,7 +506,7 @@ class Service: except KeyboardInterrupt: pass - info("Dispatch loop terminated") + logger.info("Dispatch loop terminated") self.shutdown() def _acquire_master(self): @@ -565,7 +535,7 @@ class Service: if elapsed > ( self.config.poller.frequency - self.config.master_resolution ): - debug( + logger.debug( "Dispatching polling for device {}, time since last poll {:.2f}s".format( device_id, elapsed ) @@ -602,7 +572,7 @@ class Service: except pymysql.err.Error: self.db_failures += 1 if self.db_failures > self.config.max_db_failures: - warning( + logger.warning( "Too many DB failures ({}), attempting to release master".format( self.db_failures ) @@ -622,23 +592,22 @@ class Service: wait = 5 max_runtime = 86100 max_tries = int(max_runtime / wait) - info("Waiting for schema lock") + logger.info("Waiting for schema lock") while not self._lm.lock("schema-update", self.config.unique_name, max_runtime): attempt += 1 if attempt >= max_tries: # don't get stuck indefinitely - warning("Reached max wait for other pollers to update, updating now") + logger.warning( + "Reached max wait for other pollers to update, updating now" + ) break sleep(wait) - info("Running maintenance tasks") - try: - output = LibreNMS.call_script("daily.sh") - info("Maintenance tasks complete\n{}".format(output)) - except subprocess.CalledProcessError as e: - error( - "Error in daily.sh:\n" - + (e.output.decode() if e.output is not None else "No output") - ) + logger.info("Running maintenance tasks") + exit_code, output = LibreNMS.call_script("daily.sh") + if exit_code == 0: + logger.info("Maintenance tasks complete\n{}".format(output)) + else: + logger.error("Error {} in daily.sh:\n{}".format(exit_code, output)) self._lm.unlock("schema-update", self.config.unique_name) @@ -665,15 +634,19 @@ class Service: ) except ImportError: if self.config.distributed: - critical("ERROR: Redis connection required for distributed polling") - critical( + logger.critical( + "ERROR: Redis connection required for distributed polling" + ) + logger.critical( "Please install redis-py, either through your os software repository or from PyPI" ) self.exit(2) except Exception as e: if self.config.distributed: - critical("ERROR: Redis connection required for distributed polling") - critical("Could not connect to Redis. {}".format(e)) + logger.critical( + "ERROR: Redis connection required for distributed polling" + ) + logger.critical("Could not connect to Redis. {}".format(e)) self.exit(2) return LibreNMS.ThreadingLock() @@ -684,14 +657,16 @@ class Service: Has the effect of reloading the python files from disk. """ if sys.version_info < (3, 4, 0): - warning("Skipping restart as running under an incompatible interpreter") - warning("Please restart manually") + logger.warning( + "Skipping restart as running under an incompatible interpreter" + ) + logger.warning("Please restart manually") return - info("Restarting service... ") + logger.info("Restarting service... ") if "psutil" not in sys.modules: - warning("psutil is not available, polling gap possible") + logger.warning("psutil is not available, polling gap possible") self._stop_managers_and_wait() else: self._stop_managers() @@ -715,7 +690,9 @@ class Service: :param signalnum: UNIX signal number :param flag: Flags accompanying signal """ - info("Received signal on thread %s, handling", threading.current_thread().name) + logger.info( + "Received signal on thread %s, handling", threading.current_thread().name + ) self.reload_flag = True def terminate(self, signalnum=None, flag=None): @@ -724,7 +701,9 @@ class Service: :param signalnum: UNIX signal number :param flag: Flags accompanying signal """ - info("Received signal on thread %s, handling", threading.current_thread().name) + logger.info( + "Received signal on thread %s, handling", threading.current_thread().name + ) self.terminate_flag = True def shutdown(self, signalnum=None, flag=None): @@ -733,7 +712,7 @@ class Service: :param signalnum: UNIX signal number :param flag: Flags accompanying signal """ - info("Shutting down, waiting for running jobs to complete...") + logger.info("Shutting down, waiting for running jobs to complete...") self.stop_dispatch_timers() self._release_master() @@ -747,7 +726,9 @@ class Service: self._stop_managers_and_wait() # try to release master lock - info("Shutdown of %s/%s complete", os.getpid(), threading.current_thread().name) + logger.info( + "Shutdown of %s/%s complete", os.getpid(), threading.current_thread().name + ) self.exit(0) def start_dispatch_timers(self): @@ -802,11 +783,11 @@ class Service: try: fcntl.lockf(self._fp, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: - warning("Another instance is already running, quitting.") + logger.warning("Another instance is already running, quitting.") self.exit(2) def log_performance_stats(self): - info("Counting up time spent polling") + logger.info("Counting up time spent polling") try: # Report on the poller instance as a whole @@ -851,8 +832,9 @@ class Service: ) ) except pymysql.err.Error: - exception( - "Unable to log performance statistics - is the database still online?" + logger.critical( + "Unable to log performance statistics - is the database still online?", + exc_info=True, ) def systemd_watchdog(self): @@ -867,18 +849,19 @@ class Service: self.config.watchdog_logfile ) except FileNotFoundError as e: - error("Log file not found! {}".format(e)) + logger.error("Log file not found! {}".format(e)) return if logfile_mdiff > self.config.poller.frequency: - critical( + logger.critical( "BARK! Log file older than {}s, restarting service!".format( self.config.poller.frequency - ) + ), + exc_info=True, ) self.restart() else: - info("Log file updated {}s ago".format(int(logfile_mdiff))) + logger.info("Log file updated {}s ago".format(int(logfile_mdiff))) def exit(self, code=0): sys.stdout.flush() diff --git a/LibreNMS/wrapper.py b/LibreNMS/wrapper.py new file mode 100644 index 0000000000..8ca8321bda --- /dev/null +++ b/LibreNMS/wrapper.py @@ -0,0 +1,669 @@ +#! /usr/bin/env python3 +""" + wrapper A small tool which wraps services, discovery and poller php scripts + in order to run them as threads with Queue and workers + + Authors: Orsiris de Jong + Neil Lathwood + Job Snijders + + Distributed poller code (c) 2015, GPLv3, Daniel Preussker + All code parts that belong to Daniel are enclosed in EOC comments + + Date: Sep 2021 + + Usage: This program accepts three command line arguments + - the number of threads (defaults to 1 for discovery / service, and 16 for poller) + - the wrapper type (service, discovery or poller) + - optional debug boolean + + + Ubuntu Linux: apt-get install python-mysqldb + FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean + RHEL 7: yum install MySQL-python + RHEL 8: dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient + + Tested on: Python 3.6.8 / PHP 7.2.11 / CentOS 8 / AlmaLinux 8.4 + + License: This program is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General + Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program. If not, see https://www.gnu.org/licenses/. + + LICENSE.txt contains a copy of the full GPLv3 licensing conditions. +""" + +import logging +import os +import queue +import sys +import threading +import time +import uuid +from argparse import ArgumentParser + +import LibreNMS +from LibreNMS.command_runner import command_runner + + +logger = logging.getLogger(__name__) + +# Timeout in seconds for any poller / service / discovery action per device +# Should be higher than stepping which defaults to 300 +PER_DEVICE_TIMEOUT = 900 + +# 5 = no new discovered devices, 6 = unreachable device +VALID_EXIT_CODES = [0, 5, 6] + + +DISTRIBUTED_POLLING = False # Is overriden by config.php +REAL_DURATION = 0 +DISCOVERED_DEVICES_COUNT = 0 +PER_DEVICE_DURATION = {} +ERRORS = 0 + +MEMC = None +IS_NODE = None +STEPPING = None +MASTER_TAG = None +NODES_TAG = None +TIME_TAG = "" + +""" +Per wrapper type configuration +All time related variables are in seconds +""" +wrappers = { + "service": { + "executable": "check-services.php", + "table_name": "services", + "memc_touch_time": 10, + "stepping": 300, + "nodes_stepping": 300, + "total_exec_time": 300, + }, + "discovery": { + "executable": "discovery.php", + "table_name": "devices", + "memc_touch_time": 30, + "stepping": 300, + "nodes_stepping": 3600, + "total_exec_time": 21600, + }, + "poller": { + "executable": "poller.php", + "table_name": "devices", + "memc_touch_time": 10, + "stepping": 300, + "nodes_stepping": 300, + "total_exec_time": 300, + }, +} + +""" + Threading helper functions +""" + + +# << None + """ + Actual code that runs various php scripts, in single node mode or distributed poller mode + """ + + global MEMC + global IS_NODE + global DISTRIBUTED_POLLING + global MASTER_TAG + global NODES_TAG + global TIME_TAG + global STEPPING + + # Setup wrapper dependent variables + STEPPING = wrappers[wrapper_type]["stepping"] + if wrapper_type == "poller": + if "rrd" in config and "step" in config["rrd"]: + STEPPING = config["rrd"]["step"] + TIME_TAG = "." + str(get_time_tag(STEPPING)) + + MASTER_TAG = "{}.master{}".format(wrapper_type, TIME_TAG) + NODES_TAG = "{}.nodes{}".format(wrapper_type, TIME_TAG) + + # << amount_of_devices: + amount_of_workers = amount_of_devices + + logger.info( + "starting the {} check at {} with {} threads for {} devices".format( + wrapper_type, + time.strftime("%Y-%m-%d %H:%M:%S"), + amount_of_workers, + amount_of_devices, + ) + ) + + for device_id in devices_list: + poll_queue.put(device_id) + + for _ in range(amount_of_workers): + worker = threading.Thread( + target=poll_worker, + kwargs={ + "poll_queue": poll_queue, + "print_queue": print_queue, + "config": config, + "log_dir": log_dir, + "wrapper_type": wrapper_type, + "debug": _debug, + }, + ) + worker.setDaemon(True) + worker.start() + + pworker = threading.Thread( + target=print_worker, + kwargs={"print_queue": print_queue, "wrapper_type": wrapper_type}, + ) + pworker.setDaemon(True) + pworker.start() + + try: + poll_queue.join() + print_queue.join() + except (KeyboardInterrupt, SystemExit): + raise + + total_time = int(time.time() - s_time) + + end_msg = "{}-wrapper checked {} devices in {} seconds with {} workers with {} errors".format( + wrapper_type, DISCOVERED_DEVICES_COUNT, total_time, amount_of_workers, ERRORS + ) + if ERRORS == 0: + logger.info(end_msg) + else: + logger.error(end_msg) + + # << 0: + try: + time.sleep(1) + nodes = MEMC.get(NODES_TAG) + except: + pass + logger.info("Clearing Locks for {}".format(NODES_TAG)) + x = minlocks + while x <= maxlocks: + MEMC.delete("{}.device.{}".format(wrapper_type, x)) + x = x + 1 + logger.info("{} Locks Cleared".format(x)) + logger.info("Clearing Nodes") + MEMC.delete(MASTER_TAG) + MEMC.delete(NODES_TAG) + else: + MEMC.decr(NODES_TAG) + logger.info("Finished {}.".format(time.strftime("%Y-%m-%d %H:%M:%S"))) + # EOC + + # Update poller statistics + if wrapper_type == "poller": + query = "UPDATE pollers SET last_polled=NOW(), devices='{}', time_taken='{}' WHERE poller_name='{}'".format( + DISCOVERED_DEVICES_COUNT, total_time, config["distributed_poller_name"] + ) + cursor = db_connection.query(query) + if cursor.rowcount < 1: + query = "INSERT INTO pollers SET poller_name='{}', last_polled=NOW(), devices='{}', time_taken='{}'".format( + config["distributed_poller_name"], DISCOVERED_DEVICES_COUNT, total_time + ) + db_connection.query(query) + + db_connection.close() + + if total_time > wrappers[wrapper_type]["total_exec_time"]: + logger.warning( + "the process took more than {} seconds to finish, you need faster hardware or more threads".format( + wrappers[wrapper_type]["total_exec_time"] + ) + ) + logger.warning( + "in sequential style service checks the elapsed time would have been: {} seconds".format( + REAL_DURATION + ) + ) + show_stopper = False + for device in PER_DEVICE_DURATION: + if PER_DEVICE_DURATION[device] > wrappers[wrapper_type]["nodes_stepping"]: + logger.warning( + "device {} is taking too long: {} seconds".format( + device, PER_DEVICE_DURATION[device] + ) + ) + show_stopper = True + if show_stopper: + logger.error( + "Some devices are taking more than {} seconds, the script cannot recommend you what to do.".format( + wrappers[wrapper_type]["nodes_stepping"] + ) + ) + else: + recommend = int(total_time / STEPPING * amount_of_workers + 1) + logger.warning( + "Consider setting a minimum of {} threads. (This does not constitute professional advice!)".format( + recommend + ) + ) + sys.exit(2) + + +if __name__ == "__main__": + parser = ArgumentParser( + prog="wrapper.py", + usage="usage: %(prog)s [options] \n" + "wrapper_type = 'service', 'poller' or 'disccovery'" + "workers defaults to 1 for service and discovery, and 16 for poller " + "(Do not set too high, or you will get an OOM)", + description="Spawn multiple librenms php processes in parallel.", + ) + parser.add_argument( + "-d", + "--debug", + action="store_true", + default=False, + help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.", + ) + + parser.add_argument( + dest="wrapper", + default=None, + help="Execute wrapper for 'service', 'poller' or 'discovery'", + ) + parser.add_argument( + dest="threads", action="store_true", default=None, help="Number of workers" + ) + + args = parser.parse_args() + + debug = args.debug + wrapper_type = args.wrapper + amount_of_workers = args.threads + + if wrapper_type not in ["service", "discovery", "poller"]: + parser.error("Invalid wrapper type '{}'".format(wrapper_type)) + sys.exit(4) + + config = LibreNMS.get_config_data( + os.path.dirname(os.path.dirname(os.path.realpath(__file__))) + ) + log_dir = config["log_dir"] + log_file = os.path.join(log_dir, wrapper_type + ".log") + logger = LibreNMS.logger_get_logger(log_file, debug=debug) + + try: + amount_of_workers = int(amount_of_workers) + except (IndexError, ValueError, TypeError): + amount_of_workers = ( + 16 if wrapper_type == "poller" else 1 + ) # Defaults to 1 for service/discovery, 16 for poller + logger.warning( + "Bogus number of workers given. Using default number ({}) of workers.".format( + amount_of_workers + ) + ) + + wrapper(wrapper_type, amount_of_workers, config, log_dir, _debug=debug) diff --git a/discovery-wrapper.py b/discovery-wrapper.py index 1ae8c2d725..bb9920f9e9 100755 --- a/discovery-wrapper.py +++ b/discovery-wrapper.py @@ -1,436 +1,63 @@ #! /usr/bin/env python3 """ - discovery-wrapper A small tool which wraps around discovery and tries to - guide the discovery process with a more modern approach with a - Queue and workers. - - Based on the original version of poller-wrapper.py by Job Snijders - - Author: Neil Lathwood - Orsiris de Jong - Date: Oct 2019 - - Usage: This program accepts one command line argument: the number of threads - that should run simultaneously. If no argument is given it will assume - a default of 1 thread. - - Ubuntu Linux: apt-get install python-mysqldb - FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean - RHEL 7: yum install MySQL-python - RHEL 8: dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient - - Tested on: Python 3.6.8 / PHP 7.2.11 / CentOS 8 - - License: This program is free software: you can redistribute it and/or modify it - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - This program is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General - Public License for more details. - - You should have received a copy of the GNU General Public License along - with this program. If not, see https://www.gnu.org/licenses/. - - LICENSE.txt contains a copy of the full GPLv3 licensing conditions. +This is a Bootstrap script for wrapper.py, in order to retain compatibility with earlier LibreNMS setups """ -import LibreNMS.library as LNMS +import os +import sys +import logging +from argparse import ArgumentParser + +import LibreNMS +import LibreNMS.wrapper as wrapper + +WRAPPER_TYPE = "discovery" +DEFAULT_WORKERS = 1 + +""" + Take the amount of threads we want to run in parallel from the commandline + if None are given or the argument was garbage, fall back to default +""" +usage = ( + "usage: %(prog)s [options] (Default: {}" + "(Do not set too high, or you will get an OOM)".format(DEFAULT_WORKERS) +) +description = "Spawn multiple discovery.php processes in parallel." +parser = ArgumentParser(usage=usage, description=description) +parser.add_argument(dest="amount_of_workers", default=DEFAULT_WORKERS) +parser.add_argument( + "-d", + "--debug", + dest="debug", + action="store_true", + default=False, + help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.", +) +args = parser.parse_args() + +config = LibreNMS.get_config_data(os.path.dirname(os.path.realpath(__file__))) +if not config: + logger = logging.getLogger(__name__) + logger.critical("Could not run {} wrapper. Missing config".format(WRAPPER_TYPE)) + sys.exit(1) +log_dir = config["log_dir"] +log_file = os.path.join(log_dir, WRAPPER_TYPE + "_wrapper.log") +logger = LibreNMS.logger_get_logger(log_file, debug=args.debug) try: - - import json - import os - import queue - import subprocess - import sys - import threading - import time - from optparse import OptionParser - -except ImportError as exc: - print("ERROR: missing one or more of the following python modules:") - print("threading, queue, sys, subprocess, time, os, json") - print("ERROR: %s" % exc) - sys.exit(2) - -APP_NAME = "discovery_wrapper" -LOG_FILE = "logs/" + APP_NAME + ".log" -_DEBUG = False -distdisco = False -real_duration = 0 -discovered_devices = 0 - -# (c) 2015, GPLv3, Daniel Preussker << << <<> %s/discover_device_%s.log" % (log_dir, device_id) - if debug - else ">> /dev/null" - ) - command = "/usr/bin/env php %s -h %s %s 2>&1" % ( - discovery_path, - device_id, - output, - ) - # TODO: Replace with command_runner - subprocess.check_call(command, shell=True) - - elapsed_time = int(time.time() - start_time) - print_queue.put( - [threading.current_thread().name, device_id, elapsed_time] - ) - except (KeyboardInterrupt, SystemExit): - raise - except: - pass - poll_queue.task_done() - - -if __name__ == "__main__": - logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG) - - install_dir = os.path.dirname(os.path.realpath(__file__)) - LNMS.check_for_file(install_dir + "/.env") - config = json.loads(LNMS.get_config_data(install_dir)) - - discovery_path = config["install_dir"] + "/discovery.php" - log_dir = config["log_dir"] - - # (c) 2015, GPLv3, Daniel Preussker << << << << 0: - try: - time.sleep(1) - nodes = memc.get("discovery.nodes") - except: - pass - print("Clearing Locks") - x = minlocks - while x <= maxlocks: - memc.delete("discovery.device." + str(x)) - x = x + 1 - print("%s Locks Cleared" % x) - print("Clearing Nodes") - memc.delete("discovery.master") - memc.delete("discovery.nodes") - else: - memc.decr("discovery.nodes") - print("Finished %s." % time.time()) - # EOC6 - - show_stopper = False - - if total_time > 21600: - print( - "WARNING: the process took more than 6 hours to finish, you need faster hardware or more threads" - ) - print( - "INFO: in sequential style discovery the elapsed time would have been: %s seconds" - % real_duration - ) - for device in per_device_duration: - if per_device_duration[device] > 3600: - print( - "WARNING: device %s is taking too long: %s seconds" - % (device, per_device_duration[device]) - ) - show_stopper = True - if show_stopper: - print( - "ERROR: Some devices are taking more than 3600 seconds, the script cannot recommend you what to do." - ) - else: - recommend = int(total_time / 300.0 * amount_of_workers + 1) - print( - "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" - % recommend - ) - - sys.exit(2) +wrapper.wrapper( + WRAPPER_TYPE, + amount_of_workers=amount_of_workers, + config=config, + log_dir=log_dir, + _debug=args.debug, +) diff --git a/librenms-service.py b/librenms-service.py index 0be34c5d60..83f52dcb7c 100755 --- a/librenms-service.py +++ b/librenms-service.py @@ -43,9 +43,10 @@ if __name__ == "__main__": if args.verbose: logging.getLogger().setLevel(logging.INFO) - - if args.debug: + elif args.debug: logging.getLogger().setLevel(logging.DEBUG) + else: + logging.getLogger().setLevel(logging.WARNING) info("Configuring LibreNMS service") try: diff --git a/poller-wrapper.py b/poller-wrapper.py index 78e46c75fc..85f3a1f240 100755 --- a/poller-wrapper.py +++ b/poller-wrapper.py @@ -1,468 +1,63 @@ #! /usr/bin/env python3 """ - poller-wrapper A small tool which wraps around the poller and tries to - guide the polling process with a more modern approach with a - Queue and workers - - Authors: Job Snijders - Orsiris de Jong - Date: Oct 2019 - - Usage: This program accepts one command line argument: the number of threads - that should run simultaneously. If no argument is given it will assume - a default of 16 threads. - - Ubuntu Linux: apt-get install python-mysqldb - FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean - RHEL 7: yum install MySQL-python - RHEL 8: dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient - - Tested on: Python 3.6.8 / PHP 7.2.11 / CentOS 8.0 - - License: To the extent possible under law, Job Snijders has waived all - copyright and related or neighboring rights to this script. - This script has been put into the Public Domain. This work is - published from: The Netherlands. +This is a Bootstrap script for wrapper.py, in order to retain compatibility with earlier LibreNMS setups """ -import LibreNMS.library as LNMS +import os +import sys +import logging +from argparse import ArgumentParser + +import LibreNMS +import LibreNMS.wrapper as wrapper + +WRAPPER_TYPE = "poller" +DEFAULT_WORKERS = 16 + +""" + Take the amount of threads we want to run in parallel from the commandline + if None are given or the argument was garbage, fall back to default +""" +usage = ( + "usage: %(prog)s [options] (Default: {}" + "(Do not set too high, or you will get an OOM)".format(DEFAULT_WORKERS) +) +description = "Spawn multiple poller.php processes in parallel." +parser = ArgumentParser(usage=usage, description=description) +parser.add_argument(dest="amount_of_workers", default=DEFAULT_WORKERS) +parser.add_argument( + "-d", + "--debug", + dest="debug", + action="store_true", + default=False, + help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.", +) +args = parser.parse_args() + +config = LibreNMS.get_config_data(os.path.dirname(os.path.realpath(__file__))) +if not config: + logger = logging.getLogger(__name__) + logger.critical("Could not run {} wrapper. Missing config".format(WRAPPER_TYPE)) + sys.exit(1) +log_dir = config["log_dir"] +log_file = os.path.join(log_dir, WRAPPER_TYPE + "_wrapper.log") +logger = LibreNMS.logger_get_logger(log_file, debug=args.debug) try: - - import json - import os - import queue - import subprocess - import sys - import threading - import time - from optparse import OptionParser - -except ImportError as exc: - print("ERROR: missing one or more of the following python modules:") - print("threading, queue, sys, subprocess, time, os, json") - print("ERROR: %s" % exc) - sys.exit(2) - - -APP_NAME = "poller_wrapper" -LOG_FILE = "logs/" + APP_NAME + ".log" -_DEBUG = False -distpoll = False -real_duration = 0 -polled_devices = 0 - -""" - Threading helper functions -""" -# (c) 2015, GPLv3, Daniel Preussker << << <<> %s/poll_device_%s.log" % (log_dir, device_id) - if debug - else ">> /dev/null" - ) - command = "/usr/bin/env php %s -h %s %s 2>&1" % ( - poller_path, - device_id, - output, - ) - # TODO: replace with command_runner - subprocess.check_call(command, shell=True) - - elapsed_time = int(time.time() - start_time) - print_queue.put( - [threading.current_thread().name, device_id, elapsed_time] - ) - except (KeyboardInterrupt, SystemExit): - raise - except: - pass - poll_queue.task_done() - - -if __name__ == "__main__": - logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG) - - install_dir = os.path.dirname(os.path.realpath(__file__)) - LNMS.check_for_file(install_dir + "/.env") - config = json.loads(LNMS.get_config_data(install_dir)) - - poller_path = config["install_dir"] + "/poller.php" - log_dir = config["log_dir"] - - if "rrd" in config and "step" in config["rrd"]: - step = config["rrd"]["step"] - else: - step = 300 - - # (c) 2015, GPLv3, Daniel Preussker << << << << 0: - try: - time.sleep(1) - nodes = memc.get(nodes_tag) - except: - pass - print("Clearing Locks for %s" % time_tag) - x = minlocks - while x <= maxlocks: - res = memc.delete("poller.device.%s.%s" % (x, time_tag)) - x += 1 - print("%s Locks Cleared" % x) - print("Clearing Nodes") - memc.delete(master_tag) - memc.delete(nodes_tag) - else: - memc.decr(nodes_tag) - print("Finished %.3fs after interval start." % (time.time() - int(time_tag))) - # EOC6 - - show_stopper = False - - db = LNMS.db_open( - config["db_socket"], - config["db_host"], - config["db_port"], - config["db_user"], - config["db_pass"], - config["db_name"], - ) - cursor = db.cursor() - query = ( - "update pollers set last_polled=NOW(), devices='%d', time_taken='%d' where poller_name='%s'" - % (polled_devices, total_time, config["distributed_poller_name"]) - ) - response = cursor.execute(query) - if response == 1: - db.commit() - else: - query = ( - "insert into pollers set poller_name='%s', last_polled=NOW(), devices='%d', time_taken='%d'" - % (config["distributed_poller_name"], polled_devices, total_time) - ) - cursor.execute(query) - db.commit() - db.close() - - if total_time > step: - print( - "WARNING: the process took more than %s seconds to finish, you need faster hardware or more threads" - % step - ) - print( - "INFO: in sequential style polling the elapsed time would have been: %s seconds" - % real_duration - ) - for device in per_device_duration: - if per_device_duration[device] > step: - print( - "WARNING: device %s is taking too long: %s seconds" - % (device, per_device_duration[device]) - ) - show_stopper = True - if show_stopper: - print( - "ERROR: Some devices are taking more than %s seconds, the script cannot recommend you what to do." - % step - ) - else: - recommend = int(total_time / step * amount_of_workers + 1) - print( - "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" - % recommend - ) - - sys.exit(2) +wrapper.wrapper( + WRAPPER_TYPE, + amount_of_workers=amount_of_workers, + config=config, + log_dir=log_dir, + _debug=args.debug, +) diff --git a/requirements.txt b/requirements.txt index b5d86ebea7..e7bc9a58db 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ python-dotenv redis>=3.0 setuptools psutil +command_runner>=1.2.1 diff --git a/services-wrapper.py b/services-wrapper.py index 5a4f1c08da..cdeb04a7ee 100755 --- a/services-wrapper.py +++ b/services-wrapper.py @@ -1,440 +1,63 @@ #! /usr/bin/env python3 """ - services-wrapper A small tool which wraps around check-services.php and tries to - guide the services process with a more modern approach with a - Queue and workers. - - Based on the original version of poller-wrapper.py by Job Snijders - - Author: Neil Lathwood - Orsiris de Jong - Date: Oct 2019 - - Usage: This program accepts one command line argument: the number of threads - that should run simultaneously. If no argument is given it will assume - a default of 1 thread. - - Ubuntu Linux: apt-get install python-mysqldb - FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean - RHEL 7: yum install MySQL-python - RHEL 8: dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient - - Tested on: Python 3.6.8 / PHP 7.2.11 / CentOS 8 - - License: This program is free software: you can redistribute it and/or modify it - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - This program is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General - Public License for more details. - - You should have received a copy of the GNU General Public License along - with this program. If not, see https://www.gnu.org/licenses/. - - LICENSE.txt contains a copy of the full GPLv3 licensing conditions. +This is a Bootstrap script for wrapper.py, in order to retain compatibility with earlier LibreNMS setups """ -import LibreNMS.library as LNMS +import os +import sys +import logging +from argparse import ArgumentParser + +import LibreNMS +import LibreNMS.wrapper as wrapper + +WRAPPER_TYPE = "service" +DEFAULT_WORKERS = 1 + +""" + Take the amount of threads we want to run in parallel from the commandline + if None are given or the argument was garbage, fall back to default +""" +usage = ( + "usage: %(prog)s [options] (Default: {}" + "(Do not set too high, or you will get an OOM)".format(DEFAULT_WORKERS) +) +description = "Spawn multiple check-services.php processes in parallel." +parser = ArgumentParser(usage=usage, description=description) +parser.add_argument(dest="amount_of_workers", default=DEFAULT_WORKERS) +parser.add_argument( + "-d", + "--debug", + dest="debug", + action="store_true", + default=False, + help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.", +) +args = parser.parse_args() + +config = LibreNMS.get_config_data(os.path.dirname(os.path.realpath(__file__))) +if not config: + logger = logging.getLogger(__name__) + logger.critical("Could not run {} wrapper. Missing config".format(WRAPPER_TYPE)) + sys.exit(1) +log_dir = config["log_dir"] +log_file = os.path.join(log_dir, WRAPPER_TYPE + "_wrapper.log") +logger = LibreNMS.logger_get_logger(log_file, debug=args.debug) try: - - import json - import os - import queue - import subprocess - import sys - import threading - import time - from optparse import OptionParser - -except ImportError as exc: - print("ERROR: missing one or more of the following python modules:") - print("threading, queue, sys, subprocess, time, os, json") - print("ERROR: %s" % exc) - sys.exit(2) - - -APP_NAME = "services_wrapper" -LOG_FILE = "logs/" + APP_NAME + ".log" -_DEBUG = False -servicedisco = False -real_duration = 0 -service_devices = 0 - -""" - Threading helper functions -""" -# (c) 2015, GPLv3, Daniel Preussker << << <<> %s/services_device_%s.log" % (log_dir, device_id) - if debug - else ">> /dev/null" - ) - # TODO replace with command_runner - command = "/usr/bin/env php %s -h %s %s 2>&1" % ( - service_path, - device_id, - output, - ) - subprocess.check_call(command, shell=True) - - elapsed_time = int(time.time() - start_time) - print_queue.put( - [threading.current_thread().name, device_id, elapsed_time] - ) - except (KeyboardInterrupt, SystemExit): - raise - except: - pass - poll_queue.task_done() - - -if __name__ == "__main__": - logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG) - - install_dir = os.path.dirname(os.path.realpath(__file__)) - LNMS.check_for_file(install_dir + "/.env") - config = json.loads(LNMS.get_config_data(install_dir)) - - service_path = config["install_dir"] + "/check-services.php" - log_dir = config["log_dir"] - - # (c) 2015, GPLv3, Daniel Preussker << << << << 0: - try: - time.sleep(1) - nodes = memc.get("service.nodes") - except: - pass - print("Clearing Locks") - x = minlocks - while x <= maxlocks: - memc.delete("service.device." + str(x)) - x = x + 1 - print("%s Locks Cleared" % x) - print("Clearing Nodes") - memc.delete("service.master") - memc.delete("service.nodes") - else: - memc.decr("service.nodes") - print("Finished %s." % time.time()) - # EOC6 - - show_stopper = False - - if total_time > 300: - print( - "WARNING: the process took more than 5 minutes to finish, you need faster hardware or more threads" - ) - print( - "INFO: in sequential style service checks the elapsed time would have been: %s seconds" - % real_duration - ) - for device in per_device_duration: - if per_device_duration[device] > 300: - print( - "WARNING: device %s is taking too long: %s seconds" - % (device, per_device_duration[device]) - ) - show_stopper = True - if show_stopper: - print( - "ERROR: Some devices are taking more than 300 seconds, the script cannot recommend you what to do." - ) - else: - recommend = int(total_time / 300.0 * amount_of_workers + 1) - print( - "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" - % recommend - ) - - sys.exit(2) +wrapper.wrapper( + WRAPPER_TYPE, + amount_of_workers=amount_of_workers, + config=config, + log_dir=log_dir, + _debug=args.debug, +)