mirror of
https://github.com/librenms/librenms.git
synced 2024-09-21 10:28:13 +00:00
723600751c
* Dispatcher option to log output -o --log-output Log output into various files in the log directory wire up -d option to be passed into scheduled commands Caution, can fill your disk. * style fixes * more silly style fixes (and a typo accidentally added) * final lint maybe? * more lint... * believe it or not, more lint
923 lines
35 KiB
Python
923 lines
35 KiB
Python
import logging
|
|
import os
|
|
import sys
|
|
import threading
|
|
import time
|
|
|
|
import pymysql # pylint: disable=import-error
|
|
|
|
import LibreNMS
|
|
from LibreNMS.config import DBConfig
|
|
|
|
try:
|
|
import psutil
|
|
except ImportError:
|
|
pass
|
|
|
|
from datetime import timedelta
|
|
from datetime import datetime
|
|
from platform import python_version
|
|
from time import sleep
|
|
from socket import gethostname
|
|
from signal import signal, SIGTERM, SIGQUIT, SIGINT, SIGHUP, SIGCHLD
|
|
from uuid import uuid1
|
|
|
|
try:
|
|
from systemd.daemon import notify
|
|
except ImportError:
|
|
pass
|
|
|
|
try:
|
|
from redis.exceptions import ConnectionError as RedisConnectionError
|
|
except ImportError:
|
|
|
|
class RedisConnectionError(Exception):
|
|
pass
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ServiceConfig(DBConfig):
    """
    Holds all configuration for the LibreNMS dispatcher service.

    Class attributes supply the defaults.  populate() overlays values from
    config.php (via LibreNMS.get_config_data) and environment variables, and
    load_poller_config() overlays per-node settings from the poller_cluster
    database table.
    """

    def __init__(self):
        """
        Stores all of the configuration variables for the LibreNMS service in a common object

        Starts with defaults, but can be populated with variables from config.php by calling populate()
        """
        # uuid1 gives a per-process identifier used to build unique_name
        self._uuid = str(uuid1())
        self.set_name(gethostname())

    def set_name(self, name):
        """Set this node's poller name and derive unique_name (name + process uuid)."""
        if name:
            self.name = name.strip()
            self.unique_name = "{}-{}".format(self.name, self._uuid)

    class PollerConfig:
        """Settings for one worker type: worker count, run frequency, and an optional calculate interval (billing only)."""

        def __init__(self, workers, frequency, calculate=None):
            self.enabled = True
            self.workers = workers
            self.frequency = frequency
            self.calculate = calculate

    # config variables with defaults
    BASE_DIR = os.path.abspath(
        os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)
    )

    node_id = None
    name = None
    unique_name = None
    single_instance = True
    distributed = False
    group = 0

    debug = False
    log_level = 20
    max_db_failures = 5

    # NOTE: these PollerConfig instances are class attributes shared by every
    # ServiceConfig instance; populate() mutates them in place.
    alerting = PollerConfig(1, 60)
    poller = PollerConfig(24, 300)
    services = PollerConfig(8, 300)
    discovery = PollerConfig(16, 21600)
    billing = PollerConfig(2, 300, 60)
    ping = PollerConfig(1, 60)
    down_retry = 60
    update_enabled = True
    update_frequency = 86400

    master_resolution = 1
    master_timeout = 10

    redis_host = "localhost"
    redis_port = 6379
    redis_db = 0
    redis_user = None
    redis_pass = None
    redis_socket = None
    redis_sentinel = None
    redis_sentinel_user = None
    redis_sentinel_pass = None
    redis_sentinel_service = None
    redis_timeout = 60

    log_output = False
    logdir = "logs"

    watchdog_enabled = False
    watchdog_logfile = "logs/librenms.log"

    def populate(self):
        """
        Overlay the defaults with values from config.php and the environment.

        Precedence: environment variables (NODE_ID, REDIS_*, DB_*) override
        config.php values, which override the class-level defaults.  Also
        adjusts the root logger level from the configured log level.
        """
        config = LibreNMS.get_config_data(self.BASE_DIR)

        # populate config variables
        self.node_id = os.getenv("NODE_ID")
        self.set_name(config.get("distributed_poller_name", None))
        self.distributed = config.get("distributed_poller", ServiceConfig.distributed)
        self.group = ServiceConfig.parse_group(
            config.get("distributed_poller_group", ServiceConfig.group)
        )

        # backward compatible options (older poller_service_* names)
        self.master_timeout = config.get(
            "service_master_timeout", ServiceConfig.master_timeout
        )
        self.poller.workers = config.get(
            "poller_service_workers", ServiceConfig.poller.workers
        )
        self.poller.frequency = config.get(
            "poller_service_poll_frequency", ServiceConfig.poller.frequency
        )
        self.discovery.frequency = config.get(
            "poller_service_discover_frequency", ServiceConfig.discovery.frequency
        )
        self.down_retry = config.get(
            "poller_service_down_retry", ServiceConfig.down_retry
        )
        self.log_level = config.get("poller_service_loglevel", ServiceConfig.log_level)

        # new options (service_* names override the legacy values above)
        self.poller.enabled = config.get("service_poller_enabled", True)  # unused
        self.poller.workers = config.get(
            "service_poller_workers", ServiceConfig.poller.workers
        )
        self.poller.frequency = config.get(
            "service_poller_frequency", ServiceConfig.poller.frequency
        )
        self.discovery.enabled = config.get("service_discovery_enabled", True)  # unused
        self.discovery.workers = config.get(
            "service_discovery_workers", ServiceConfig.discovery.workers
        )
        self.discovery.frequency = config.get(
            "service_discovery_frequency", ServiceConfig.discovery.frequency
        )
        self.services.enabled = config.get("service_services_enabled", True)
        self.services.workers = config.get(
            "service_services_workers", ServiceConfig.services.workers
        )
        self.services.frequency = config.get(
            "service_services_frequency", ServiceConfig.services.frequency
        )
        self.billing.enabled = config.get("service_billing_enabled", True)
        self.billing.frequency = config.get(
            "service_billing_frequency", ServiceConfig.billing.frequency
        )
        self.billing.calculate = config.get(
            "service_billing_calculate_frequency", ServiceConfig.billing.calculate
        )
        self.alerting.enabled = config.get("service_alerting_enabled", True)
        self.alerting.frequency = config.get(
            "service_alerting_frequency", ServiceConfig.alerting.frequency
        )
        self.ping.enabled = config.get("service_ping_enabled", False)
        self.ping.frequency = config.get("ping_rrd_step", ServiceConfig.ping.frequency)
        self.down_retry = config.get(
            "service_poller_down_retry", ServiceConfig.down_retry
        )
        self.log_level = config.get("service_loglevel", ServiceConfig.log_level)
        self.update_enabled = config.get(
            "service_update_enabled", ServiceConfig.update_enabled
        )
        self.update_frequency = config.get(
            "service_update_frequency", ServiceConfig.update_frequency
        )

        self.redis_host = os.getenv(
            "REDIS_HOST", config.get("redis_host", ServiceConfig.redis_host)
        )
        # cast to int: os.getenv returns a str, and redis expects an integer
        # db index (matches the existing handling of REDIS_PORT/DB_PORT)
        self.redis_db = int(
            os.getenv("REDIS_DB", config.get("redis_db", ServiceConfig.redis_db))
        )
        self.redis_user = os.getenv(
            "REDIS_USERNAME", config.get("redis_user", ServiceConfig.redis_user)
        )
        self.redis_pass = os.getenv(
            "REDIS_PASSWORD", config.get("redis_pass", ServiceConfig.redis_pass)
        )
        self.redis_port = int(
            os.getenv("REDIS_PORT", config.get("redis_port", ServiceConfig.redis_port))
        )
        self.redis_socket = os.getenv(
            "REDIS_SOCKET", config.get("redis_socket", ServiceConfig.redis_socket)
        )
        self.redis_sentinel = os.getenv(
            "REDIS_SENTINEL", config.get("redis_sentinel", ServiceConfig.redis_sentinel)
        )
        self.redis_sentinel_user = os.getenv(
            "REDIS_SENTINEL_USERNAME",
            config.get("redis_sentinel_user", ServiceConfig.redis_sentinel_user),
        )
        self.redis_sentinel_pass = os.getenv(
            "REDIS_SENTINEL_PASSWORD",
            config.get("redis_sentinel_pass", ServiceConfig.redis_sentinel_pass),
        )
        self.redis_sentinel_service = os.getenv(
            "REDIS_SENTINEL_SERVICE",
            config.get("redis_sentinel_service", ServiceConfig.redis_sentinel_service),
        )
        # default timeout follows the alerting frequency unless it is 0
        self.redis_timeout = int(
            os.getenv(
                "REDIS_TIMEOUT",
                self.alerting.frequency
                if self.alerting.frequency != 0
                else self.redis_timeout,
            )
        )

        self.db_host = os.getenv(
            "DB_HOST", config.get("db_host", ServiceConfig.db_host)
        )
        self.db_name = os.getenv(
            "DB_DATABASE", config.get("db_name", ServiceConfig.db_name)
        )
        self.db_pass = os.getenv(
            "DB_PASSWORD", config.get("db_pass", ServiceConfig.db_pass)
        )
        self.db_port = int(
            os.getenv("DB_PORT", config.get("db_port", ServiceConfig.db_port))
        )
        self.db_socket = os.getenv(
            "DB_SOCKET", config.get("db_socket", ServiceConfig.db_socket)
        )
        self.db_user = os.getenv(
            "DB_USERNAME", config.get("db_user", ServiceConfig.db_user)
        )
        self.db_sslmode = os.getenv(
            "DB_SSLMODE", config.get("db_sslmode", ServiceConfig.db_sslmode)
        )
        self.db_ssl_ca = os.getenv(
            "MYSQL_ATTR_SSL_CA", config.get("db_ssl_ca", ServiceConfig.db_ssl_ca)
        )

        self.watchdog_enabled = config.get(
            "service_watchdog_enabled", ServiceConfig.watchdog_enabled
        )
        self.logdir = config.get("log_dir", ServiceConfig.BASE_DIR + "/logs")
        self.watchdog_logfile = config.get("log_file", self.logdir + "/librenms.log")

        # set convenient debug variable
        self.debug = logging.getLogger().isEnabledFor(logging.DEBUG)

        if not self.debug and self.log_level:
            try:
                logging.getLogger().setLevel(self.log_level)
            except ValueError:
                logger.error(
                    "Unknown log level {}, must be one of 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'".format(
                        self.log_level
                    )
                )
                logging.getLogger().setLevel(logging.INFO)

    def load_poller_config(self, db):
        """
        Overlay settings from this node's poller_cluster DB row, if present.

        Any NULL column leaves the current (config.php / default) value in
        place.  DB errors are logged and ignored so the service can still
        start with file-based configuration.
        """
        try:
            settings = {}
            cursor = db.query(
                "SELECT * FROM `poller_cluster` WHERE `node_id`=%s", self.node_id
            )
            if cursor.rowcount == 0:
                return

            # build a column-name -> value dict from the single row
            for index, setting in enumerate(cursor.fetchone()):
                name = cursor.description[index][0]
                settings[name] = setting

            if settings["poller_name"] is not None:
                self.set_name(settings["poller_name"])
            if settings["poller_groups"] is not None:
                self.group = ServiceConfig.parse_group(settings["poller_groups"])
            if settings["poller_enabled"] is not None:
                self.poller.enabled = settings["poller_enabled"]
            if settings["poller_frequency"] is not None:
                self.poller.frequency = settings["poller_frequency"]
            if settings["poller_workers"] is not None:
                self.poller.workers = settings["poller_workers"]
            if settings["poller_down_retry"] is not None:
                self.down_retry = settings["poller_down_retry"]
            if settings["discovery_enabled"] is not None:
                self.discovery.enabled = settings["discovery_enabled"]
            if settings["discovery_frequency"] is not None:
                self.discovery.frequency = settings["discovery_frequency"]
            if settings["discovery_workers"] is not None:
                self.discovery.workers = settings["discovery_workers"]
            if settings["services_enabled"] is not None:
                self.services.enabled = settings["services_enabled"]
            if settings["services_frequency"] is not None:
                self.services.frequency = settings["services_frequency"]
            if settings["services_workers"] is not None:
                self.services.workers = settings["services_workers"]
            if settings["billing_enabled"] is not None:
                self.billing.enabled = settings["billing_enabled"]
            if settings["billing_frequency"] is not None:
                self.billing.frequency = settings["billing_frequency"]
            if settings["billing_calculate_frequency"] is not None:
                self.billing.calculate = settings["billing_calculate_frequency"]
            if settings["alerting_enabled"] is not None:
                self.alerting.enabled = settings["alerting_enabled"]
            if settings["alerting_frequency"] is not None:
                self.alerting.frequency = settings["alerting_frequency"]
            if settings["ping_enabled"] is not None:
                self.ping.enabled = settings["ping_enabled"]
            if settings["ping_frequency"] is not None:
                self.ping.frequency = settings["ping_frequency"]
            if settings["update_enabled"] is not None:
                self.update_enabled = settings["update_enabled"]
            if settings["update_frequency"] is not None:
                self.update_frequency = settings["update_frequency"]
            if settings["loglevel"] is not None:
                self.log_level = settings["loglevel"]
            if settings["watchdog_enabled"] is not None:
                self.watchdog_enabled = settings["watchdog_enabled"]
            if settings["watchdog_log"] is not None:
                self.watchdog_logfile = settings["watchdog_log"]
        except pymysql.err.Error:
            logger.warning("Unable to load poller (%s) config", self.node_id)

    @staticmethod
    def parse_group(g):
        """
        Normalize a poller-group setting into a list of ints.

        Accepts None (default group 0), a single int, or a comma separated
        string such as "0,2".  Unparsable input logs an error and falls
        back to [0].
        """
        if g is None:
            return [0]
        elif isinstance(g, int):
            return [g]
        elif isinstance(g, str):
            try:
                # int() tolerates surrounding whitespace, so "1, 2" parses
                return [int(x) for x in set(g.split(","))]
            except ValueError:
                pass

        logger.error("Could not parse group string, defaulting to 0")
        return [0]
|
|
|
|
|
|
class Service:
    """
    The LibreNMS dispatcher service.

    Elects a master node via a distributed lock, dispatches polling,
    discovery, services, alerting, billing and ping work into queues, and
    supervises recurring maintenance, stats and watchdog timers.
    """

    config = ServiceConfig()
    _fp = False  # lock-file handle kept open for the process lifetime
    _started = False
    start_time = 0
    queue_managers = {}  # worker_type name -> QueueManager
    poller_manager = None
    discovery_manager = None
    last_poll = {}  # device_id -> timestamp of last dispatched poll (debug only)
    reap_flag = False
    terminate_flag = False
    reload_flag = False
    db_failures = 0  # consecutive DB errors; release master when exceeded

    def __init__(self):
        """Load configuration (file, env, DB), attach signals and create timers."""
        self.start_time = time.time()
        self.config.populate()
        self._db = LibreNMS.DB(self.config)
        self.config.load_poller_config(self._db)

        threading.current_thread().name = self.config.name  # rename main thread
        self.attach_signals()

        # lock manager for cross-node coordination (Redis or local threading)
        self._lm = self.create_lock_manager()
        self.daily_timer = LibreNMS.RecurringTimer(
            self.config.update_frequency, self.run_maintenance, "maintenance"
        )
        self.stats_timer = LibreNMS.RecurringTimer(
            self.config.poller.frequency, self.log_performance_stats, "performance"
        )
        if self.config.watchdog_enabled:
            logger.info(
                "Starting watchdog timer for log file: {}".format(
                    self.config.watchdog_logfile
                )
            )
            self.watchdog_timer = LibreNMS.RecurringTimer(
                self.config.poller.frequency, self.logfile_watchdog, "watchdog"
            )
        else:
            logger.info("Watchdog is disabled.")
        self.systemd_watchdog_timer = LibreNMS.RecurringTimer(
            10, self.systemd_watchdog, "systemd-watchdog"
        )
        self.is_master = False

    def service_age(self):
        """Return the number of seconds this service has been running."""
        return time.time() - self.start_time

    def attach_signals(self):
        """Install handlers: TERM/QUIT/INT terminate, HUP reloads, CHLD reaps."""
        logger.debug(
            "Attaching signal handlers on thread %s", threading.current_thread().name
        )
        signal(SIGTERM, self.terminate)  # capture sigterm and exit gracefully
        signal(SIGQUIT, self.terminate)  # capture sigquit and exit gracefully
        signal(SIGINT, self.terminate)  # capture sigint and exit gracefully
        signal(SIGHUP, self.reload)  # capture sighup and restart gracefully

        # SIGCHLD reaping needs psutil to inspect child processes
        if "psutil" not in sys.modules:
            logger.warning("psutil is not available, polling gap possible")
        else:
            signal(SIGCHLD, self.reap)  # capture sigchld and reap the process

    def reap_psutil(self):
        """
        A process from a previous invocation is trying to report its status
        """
        # Speed things up by only looking at direct zombie children
        for p in psutil.Process().children(recursive=False):
            try:
                cmd = (
                    p.cmdline()
                )  # cmdline is uncached, so needs to go here to avoid NoSuchProcess
                status = p.status()

                if status == psutil.STATUS_ZOMBIE:
                    pid = p.pid
                    r = os.waitpid(p.pid, os.WNOHANG)
                    logger.warning(
                        'Reaped long running job "%s" in state %s with PID %d - job returned %d',
                        cmd,
                        status,
                        r[0],
                        r[1],
                    )
            except (OSError, psutil.NoSuchProcess):
                # process was already reaped
                continue

    def start(self):
        """
        Start the queue managers and timers, then run the main dispatch loop.

        The loop repeatedly tries to acquire the master lock; the master
        dispatches immediate polling/discovery for due devices, while
        non-masters idle.  Raises RuntimeWarning if called twice.
        """
        logger.debug("Performing startup checks...")

        if self.config.single_instance:
            self.check_single_instance()  # don't allow more than one service at a time

        if self._started:
            raise RuntimeWarning("Not allowed to start Poller twice")
        self._started = True

        logger.debug("Starting up queue managers...")

        # initialize and start the worker pools
        self.poller_manager = LibreNMS.PollerQueueManager(self.config, self._lm)
        self.queue_managers["poller"] = self.poller_manager
        self.discovery_manager = LibreNMS.DiscoveryQueueManager(self.config, self._lm)
        self.queue_managers["discovery"] = self.discovery_manager
        self.queue_managers["alerting"] = LibreNMS.AlertQueueManager(
            self.config, self._lm
        )
        self.queue_managers["services"] = LibreNMS.ServicesQueueManager(
            self.config, self._lm
        )
        self.queue_managers["billing"] = LibreNMS.BillingQueueManager(
            self.config, self._lm
        )
        self.queue_managers["ping"] = LibreNMS.PingQueueManager(self.config, self._lm)

        if self.config.update_enabled:
            self.daily_timer.start()
        self.stats_timer.start()
        self.systemd_watchdog_timer.start()
        if self.config.watchdog_enabled:
            self.watchdog_timer.start()

        logger.info("LibreNMS Service: {} started!".format(self.config.unique_name))
        logger.info(
            "Poller group {}. Using Python {} and {} locks and queues".format(
                "0 (default)" if self.config.group == [0] else self.config.group,
                python_version(),
                "redis" if isinstance(self._lm, LibreNMS.RedisLock) else "internal",
            )
        )
        logger.info(
            "Queue Workers: Discovery={} Poller={} Services={} Alerting={} Billing={} Ping={}".format(
                self.config.discovery.workers
                if self.config.discovery.enabled
                else "disabled",
                self.config.poller.workers
                if self.config.poller.enabled
                else "disabled",
                self.config.services.workers
                if self.config.services.enabled
                else "disabled",
                "enabled" if self.config.alerting.enabled else "disabled",
                "enabled" if self.config.billing.enabled else "disabled",
                "enabled" if self.config.ping.enabled else "disabled",
            )
        )

        if self.config.update_enabled:
            logger.info(
                "Maintenance tasks will be run every {}".format(
                    timedelta(seconds=self.config.update_frequency)
                )
            )
        else:
            logger.warning("Maintenance tasks are disabled.")

        # Main dispatcher loop
        try:
            while not self.terminate_flag:
                if self.reload_flag:
                    logger.info("Picked up reload flag, calling the reload process")
                    self.restart()

                if self.reap_flag:
                    self.reap_flag = False
                    self.reap_psutil()

                master_lock = self._acquire_master()
                if master_lock:
                    if not self.is_master:
                        logger.info(
                            "{} is now the master dispatcher".format(self.config.name)
                        )
                        self.is_master = True
                        self.start_dispatch_timers()

                    # dispatch devices that are due for poll/discovery now
                    devices = self.fetch_immediate_device_list()
                    for device in devices:
                        device_id = device[0]
                        group = device[1]

                        if device[2]:  # polling
                            self.dispatch_immediate_polling(device_id, group)

                        if device[3]:  # discovery
                            self.dispatch_immediate_discovery(device_id, group)
                else:
                    if self.is_master:
                        logger.info(
                            "{} is no longer the master dispatcher".format(
                                self.config.name
                            )
                        )
                        self.stop_dispatch_timers()
                    self.is_master = False  # no longer master
                sleep(self.config.master_resolution)
        except KeyboardInterrupt:
            pass

        logger.info("Dispatch loop terminated")
        self.shutdown()

    def _acquire_master(self):
        """Try to take (or refresh) the master lock; returns truthy on success."""
        return self._lm.lock(
            "dispatch.master", self.config.unique_name, self.config.master_timeout, True
        )

    def _release_master(self):
        """Release the master lock held under this node's unique name."""
        self._lm.unlock("dispatch.master", self.config.unique_name)

    # ------------ Discovery ------------
    def dispatch_immediate_discovery(self, device_id, group):
        """Queue discovery for a device unless it is already locked/in-flight."""
        if not self.discovery_manager.is_locked(device_id):
            self.discovery_manager.post_work(device_id, group)

    # ------------ Polling ------------
    def dispatch_immediate_polling(self, device_id, group):
        """Queue polling for a device unless locked; in debug mode log poll gaps."""
        if not self.poller_manager.is_locked(device_id):
            self.poller_manager.post_work(device_id, group)

            if self.config.debug:
                cur_time = time.time()
                elapsed = cur_time - self.last_poll.get(device_id, cur_time)
                self.last_poll[device_id] = cur_time
                # arbitrary limit to reduce spam
                if elapsed > (
                    self.config.poller.frequency - self.config.master_resolution
                ):
                    logger.debug(
                        "Dispatching polling for device {}, time since last poll {:.2f}s".format(
                            device_id, elapsed
                        )
                    )

    def fetch_immediate_device_list(self):
        """
        Return rows of (device_id, poller_group, poll, discover) for devices
        due for polling and/or discovery, longest-running devices first.

        On DB error, count the failure and release the master lock after
        config.max_db_failures consecutive errors so another node can take
        over; returns [] in that case.
        """
        try:
            # subtract 1s so devices are picked up slightly before they are due
            poller_find_time = self.config.poller.frequency - 1
            discovery_find_time = self.config.discovery.frequency - 1

            result = self._db.query(
                """SELECT `device_id`,
                  `poller_group`,
                  COALESCE(`last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL COALESCE(`last_polled_timetaken`, 0) SECOND), 1) AS `poll`,
                  IF(status=0, 0, IF (%s < `last_discovered_timetaken` * 1.25, 0, COALESCE(`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL COALESCE(`last_discovered_timetaken`, 0) SECOND), 1))) AS `discover`
                  FROM `devices`
                  WHERE `disabled` = 0 AND (
                    `last_polled` IS NULL OR
                    `last_discovered` IS NULL OR
                    `last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL COALESCE(`last_polled_timetaken`, 0) SECOND) OR
                    `last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL COALESCE(`last_discovered_timetaken`, 0) SECOND)
                  )
                  ORDER BY `last_polled_timetaken` DESC""",
                (
                    poller_find_time,
                    self.service_age(),
                    discovery_find_time,
                    poller_find_time,
                    discovery_find_time,
                ),
            )
            self.db_failures = 0
            return result
        except pymysql.err.Error:
            self.db_failures += 1
            if self.db_failures > self.config.max_db_failures:
                logger.warning(
                    "Too many DB failures ({}), attempting to release master".format(
                        self.db_failures
                    )
                )
                self._release_master()
                sleep(
                    self.config.master_resolution
                )  # sleep to give another node a chance to acquire
            return []

    def run_maintenance(self):
        """
        Runs update and cleanup tasks by calling daily.sh. Reloads the python script after the update.
        Sets a schema-update lock so no distributed pollers will update until the schema has been updated.
        """
        attempt = 0
        wait = 5
        max_runtime = 86100
        max_tries = int(max_runtime / wait)
        logger.info("Waiting for schema lock")
        while not self._lm.lock("schema-update", self.config.unique_name, max_runtime):
            attempt += 1
            if attempt >= max_tries:  # don't get stuck indefinitely
                logger.warning(
                    "Reached max wait for other pollers to update, updating now"
                )
                break
            sleep(wait)

        logger.info("Running maintenance tasks")
        exit_code, output = LibreNMS.call_script("daily.sh")
        if exit_code == 0:
            logger.info("Maintenance tasks complete\n{}".format(output))
        else:
            logger.error("Error {} in daily.sh:\n{}".format(exit_code, output))

        self._lm.unlock("schema-update", self.config.unique_name)

        # re-exec to pick up any updated python code
        self.restart()

    def create_lock_manager(self):
        """
        Create a new LockManager. Tries to create a Redis LockManager, but falls
        back to python's internal threading lock implementation.
        Exits if distributing poller is enabled and a Redis LockManager cannot be created.
        :return: Instance of LockManager
        """
        try:
            return LibreNMS.RedisLock(
                sentinel_kwargs={
                    "username": self.config.redis_sentinel_user,
                    "password": self.config.redis_sentinel_pass,
                    "socket_timeout": self.config.redis_timeout,
                    "unix_socket_path": self.config.redis_socket,
                },
                namespace="librenms.lock",
                host=self.config.redis_host,
                port=self.config.redis_port,
                db=self.config.redis_db,
                username=self.config.redis_user,
                password=self.config.redis_pass,
                unix_socket_path=self.config.redis_socket,
                sentinel=self.config.redis_sentinel,
                sentinel_service=self.config.redis_sentinel_service,
                socket_timeout=self.config.redis_timeout,
            )
        except ImportError:
            # redis-py is not installed at all
            if self.config.distributed:
                logger.critical(
                    "ERROR: Redis connection required for distributed polling"
                )
                logger.critical(
                    "Please install redis-py, either through your os software repository or from PyPI"
                )
                self.exit(2)
        except Exception as e:
            # redis-py is installed but the connection failed
            if self.config.distributed:
                logger.critical(
                    "ERROR: Redis connection required for distributed polling"
                )
                logger.critical(
                    "Lock manager could not connect to Redis. {}: {}".format(
                        type(e).__name__, e
                    )
                )
                self.exit(2)

        # non-distributed fallback: in-process locking only
        return LibreNMS.ThreadingLock()

    def restart(self):
        """
        Stop then recreate this entire process by re-calling the original script.
        Has the effect of reloading the python files from disk.
        """
        if sys.version_info < (3, 4, 0):
            # os.execl file descriptor handling differs before 3.4 (PEP 446)
            logger.warning(
                "Skipping restart as running under an incompatible interpreter"
            )
            logger.warning("Please restart manually")
            return

        logger.info("Restarting service... ")

        if "psutil" not in sys.modules:
            logger.warning("psutil is not available, polling gap possible")
            self._stop_managers_and_wait()
        else:
            self._stop_managers()
        self._release_master()

        # replace the current process image with a fresh interpreter
        python = sys.executable
        sys.stdout.flush()
        os.execl(python, python, *sys.argv)

    def reap(self, signalnum=None, flag=None):
        """
        Handle SIGCHLD: set the reap flag so the main loop reaps zombie children
        :param signalnum: UNIX signal number
        :param flag: Flags accompanying signal
        """
        self.reap_flag = True

    def reload(self, signalnum=None, flag=None):
        """
        Handle a set the reload flag to begin a clean restart
        :param signalnum: UNIX signal number
        :param flag: Flags accompanying signal
        """
        logger.info(
            "Received signal on thread %s, handling", threading.current_thread().name
        )
        self.reload_flag = True

    def terminate(self, signalnum=None, flag=None):
        """
        Handle a set the terminate flag to begin a clean shutdown
        :param signalnum: UNIX signal number
        :param flag: Flags accompanying signal
        """
        logger.info(
            "Received signal on thread %s, handling", threading.current_thread().name
        )
        self.terminate_flag = True

    def shutdown(self, signalnum=None, flag=None):
        """
        Stop and exit, waiting for all child processes to exit.
        :param signalnum: UNIX signal number
        :param flag: Flags accompanying signal
        """
        logger.info("Shutting down, waiting for running jobs to complete...")

        self.stop_dispatch_timers()
        self._release_master()

        self.daily_timer.stop()
        self.stats_timer.stop()
        self.systemd_watchdog_timer.stop()
        if self.config.watchdog_enabled:
            self.watchdog_timer.stop()

        self._stop_managers_and_wait()

        # try to release master lock
        logger.info(
            "Shutdown of %s/%s complete", os.getpid(), threading.current_thread().name
        )
        self.exit(0)

    def start_dispatch_timers(self):
        """
        Start all dispatch timers and begin pushing events into queues.
        This should only be started when we are the master dispatcher.
        """
        for manager in self.queue_managers.values():
            try:
                manager.start_dispatch()
            except AttributeError:
                # not every manager implements start_dispatch
                pass

    def stop_dispatch_timers(self):
        """
        Stop all dispatch timers, this should be called when we are no longer the master dispatcher.
        """
        for manager in self.queue_managers.values():
            try:
                manager.stop_dispatch()
            except AttributeError:
                # not every manager implements stop_dispatch
                pass

    def _stop_managers(self):
        """Signal all queue managers to stop without waiting for them."""
        for manager in self.queue_managers.values():
            manager.stop()

    def _stop_managers_and_wait(self):
        """
        Stop all QueueManagers, and wait for their processing threads to complete.
        We send the stop signal to all QueueManagers first, then wait for them to finish.
        """
        self._stop_managers()

        for manager in self.queue_managers.values():
            manager.stop_and_wait()

    def check_single_instance(self):
        """
        Check that there is only one instance of the service running on this computer.
        We do this by creating a file in the base directory (.lock.service) if it doesn't exist and
        obtaining an exclusive lock on that file.
        """
        lock_file = "{}/{}".format(self.config.BASE_DIR, ".lock.service")

        import fcntl

        self._fp = open(
            lock_file, "w"
        )  # keep a reference so the file handle isn't garbage collected
        self._fp.flush()
        try:
            fcntl.lockf(self._fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            logger.warning("Another instance is already running, quitting.")
            self.exit(2)

    def log_performance_stats(self):
        """Record this node's status and per-queue stats into the poller_cluster tables."""
        logger.info("Counting up time spent polling")

        try:
            # Report on the poller instance as a whole
            self._db.query(
                "INSERT INTO poller_cluster(node_id, poller_name, poller_version, poller_groups, last_report, master) "
                'values("{0}", "{1}", "{2}", "{3}", NOW(), {4}) '
                'ON DUPLICATE KEY UPDATE poller_version="{2}", poller_groups="{3}", last_report=NOW(), master={4}; '.format(
                    self.config.node_id,
                    self.config.name,
                    "librenms-service",
                    ",".join(str(i) for i in self.config.group),
                    1 if self.is_master else 0,
                )
            )

            # Find our ID
            self._db.query(
                'SELECT id INTO @parent_poller_id FROM poller_cluster WHERE node_id="{0}"; '.format(
                    self.config.node_id
                )
            )

            for worker_type, manager in self.queue_managers.items():
                worker_seconds, devices = manager.performance.reset()

                # Record the queue state
                self._db.query(
                    "INSERT INTO poller_cluster_stats(parent_poller, poller_type, depth, devices, worker_seconds, workers, frequency) "
                    'values(@parent_poller_id, "{0}", {1}, {2}, {3}, {4}, {5}) '
                    "ON DUPLICATE KEY UPDATE depth={1}, devices={2}, worker_seconds={3}, workers={4}, frequency={5}; ".format(
                        worker_type,
                        sum(
                            [
                                manager.get_queue(group).qsize()
                                for group in self.config.group
                            ]
                        ),
                        devices,
                        worker_seconds,
                        getattr(self.config, worker_type).workers,
                        getattr(self.config, worker_type).frequency,
                    )
                )
        except (pymysql.err.Error, ConnectionResetError, RedisConnectionError):
            logger.critical(
                "Unable to log performance statistics - is the database still online?",
                exc_info=True,
            )

    def systemd_watchdog(self):
        """Send a keep-alive to systemd's watchdog, if systemd.daemon is loaded."""
        if "systemd.daemon" in sys.modules:
            notify("WATCHDOG=1")

    def logfile_watchdog(self):
        """Restart the service if the watchdog log file has not been written to within one poll period."""

        try:
            # check that logfile has been written to within last poll period
            logfile_mdiff = datetime.now().timestamp() - os.path.getmtime(
                self.config.watchdog_logfile
            )
        except FileNotFoundError as e:
            logger.error("Log file not found! {}".format(e))
            return

        if logfile_mdiff > self.config.poller.frequency:
            logger.critical(
                "BARK! Log file older than {}s, restarting service!".format(
                    self.config.poller.frequency
                ),
                exc_info=True,
            )
            self.restart()
        else:
            logger.info("Log file updated {}s ago".format(int(logfile_mdiff)))

    def exit(self, code=0):
        """Flush stdout and exit the process with the given status code."""
        sys.stdout.flush()
        sys.exit(code)