librenms/poller-service.py

287 lines
11 KiB
Python
Raw Normal View History

2015-07-06 14:52:32 +00:00
#! /usr/bin/env python
2015-07-06 14:47:18 +00:00
"""
poller-service A service to wrap SNMP polling. It will poll up to $threads devices at a time, and will not re-poll
2015-07-07 18:33:33 +00:00
devices that have been polled within the last $poll_frequency seconds. It will prioritize devices based
on the last time polled. If resources are sufficient, this service should poll every device every
2015-07-07 17:02:13 +00:00
$poll_frequency seconds, but should gracefully degrade if resources are insufficient, polling devices as
2015-07-16 16:11:45 +00:00
frequently as possible. This service is based on Job Snijders' poller-wrapper.py.
2015-07-06 14:47:18 +00:00
Author: Clint Armstrong <clint@clintarmstrong.net>
Date: July 2015
2015-07-16 16:11:45 +00:00
License: BSD 2-Clause
2015-07-06 14:47:18 +00:00
"""
import json
import os
import subprocess
import sys
import threading
import time
import MySQLdb
2015-07-07 15:20:09 +00:00
import logging
import logging.handlers
2015-07-06 16:00:21 +00:00
from datetime import datetime, timedelta
2015-07-06 14:47:18 +00:00
2015-07-07 15:20:09 +00:00
# Paths are resolved relative to this script, so the service can be started
# from any working directory.
install_dir = os.path.dirname(os.path.realpath(__file__))
config_file = install_dir + '/config.php'

# All output goes to the local syslog socket, tagged with the service name.
formatter = logging.Formatter('poller-service: %(message)s')
handler = logging.handlers.SysLogHandler(address='/dev/log')
handler.setFormatter(formatter)

# Start at DEBUG; the configured level is applied once config.php is loaded.
log = logging.getLogger('poller-service')
log.setLevel(logging.DEBUG)
log.addHandler(handler)

log.info('INFO: Starting poller-service')
2015-07-06 19:10:48 +00:00
2015-07-07 18:33:33 +00:00
2015-07-06 14:47:18 +00:00
def get_config_data():
    """Return the LibreNMS configuration as JSON text.

    Runs ``config_to_json.php`` from the install directory through the PHP
    CLI and returns what it wrote to stdout; the caller parses it with
    ``json.loads``.  Exits the process with status 2 if the command cannot
    be started at all.
    """
    config_cmd = ['/usr/bin/env', 'php', '%s/config_to_json.php' % install_dir]
    try:
        proc = subprocess.Popen(config_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
    except OSError:
        # Popen raises OSError when the executable itself cannot be run.
        log.critical("ERROR: Could not execute: %s" % config_cmd)
        sys.exit(2)
    output = proc.communicate()[0]
    if proc.returncode != 0:
        # e.g. `env` could not find php.  Record the status so the real cause
        # is visible; the caller reports the resulting parse failure.
        log.warning("WARNING: %s exited with status %s" % (config_cmd, proc.returncode))
    return output
# Fail fast with a clear message if config.php is missing or unreadable,
# before trying to run PHP against it.
try:
    with open(config_file):
        pass
except IOError:
    log.critical("ERROR: Oh dear... %s does not seem readable" % config_file)
    sys.exit(2)

try:
    config = json.loads(get_config_data())
except Exception:
    # `Exception`, not a bare except: a SystemExit raised inside
    # get_config_data() (after it logged its own error) propagates unchanged
    # instead of being re-reported as a parse failure.
    log.critical("ERROR: Could not load or parse configuration, are PATHs correct?")
    sys.exit(2)

# Switch from the bootstrap DEBUG level to the configured one (default INFO).
configured_level = config.get('poller_service_loglevel', 'INFO')
try:
    loglevel = logging.getLevelName(configured_level.upper())
except AttributeError:
    # Non-string value in the config (e.g. a number or null).
    loglevel = None
if not isinstance(loglevel, int):
    # getLevelName() returns a 'Level X' string for unknown names, so report
    # the value the user actually configured.
    log.warning('ERROR: {} is not a valid log level'.format(configured_level))
    loglevel = logging.getLevelName('INFO')
log.setLevel(loglevel)
2015-07-07 15:29:04 +00:00
2015-07-06 14:47:18 +00:00
# PHP entry points run by the worker threads.
poller_path = config['install_dir'] + '/poller.php'
discover_path = config['install_dir'] + '/discovery.php'


def _parse_db_host(value):
    """Split a LibreNMS db_host setting into ``(server, port)``.

    Accepted forms: ``'unix:<path>'`` (returned unchanged), ``'host'``,
    ``'host:port'``, ``'[ipv6]'`` and ``'[ipv6]:port'``.  An unbracketed
    value with more than one colon is taken to be a bare IPv6 address with
    no port.  A port of 0 means "not specified".
    """
    if value[:5].lower() == 'unix:':
        return value, 0
    if value.startswith('['):
        address, _, rest = value[1:].partition(']')
        return address, (int(rest[1:]) if rest.startswith(':') else 0)
    if value.count(':') == 1:
        # Exactly one colon: everything after it is the port.
        server, port_text = value.split(':')
        return server, int(port_text)
    return value, 0


db_username = config['db_user']
db_password = config['db_pass']
db_server, db_port = _parse_db_host(config['db_host'])
db_dbname = config['db_name']
2015-07-06 16:00:21 +00:00
2015-07-06 14:47:18 +00:00
def _int_option(key, default):
    """Read a positive integer service option from the LibreNMS config.

    Returns *default* when the key is absent, when its value cannot be
    converted to an int (a warning is logged), or when it is zero or
    negative (0 has always meant "use the default" for these options).
    """
    try:
        value = int(config[key])
    except KeyError:
        return default
    except (TypeError, ValueError):
        log.warning('ERROR: {} is not a valid value for {}, using {}'.format(config[key], key, default))
        return default
    return value if value > 0 else default


# Upper bound on threading.active_count() before the main loop waits for a
# free slot (the count includes the main thread).
amount_of_workers = _int_option('poller_service_workers', 16)
# Target seconds between polls of the same device.
poll_frequency = _int_option('poller_service_poll_frequency', 300)
# Target seconds between discovery runs of the same device.
discover_frequency = _int_option('poller_service_discover_frequency', 21600)
# A device is not attempted again until this many seconds after its last attempt.
down_retry = _int_option('poller_service_down_retry', 60)
2015-07-06 18:51:03 +00:00
2015-07-06 14:47:18 +00:00
# A 'unix:<path>' db_host selects a local socket.  Otherwise connect over
# TCP, passing the port only when one was given (db_port 0 = unspecified).
connect_args = {'user': db_username, 'passwd': db_password, 'db': db_dbname}
if db_server[:5].lower() == 'unix:':
    connect_args['unix_socket'] = db_server[5:]
else:
    connect_args['host'] = db_server
    if db_port != 0:
        connect_args['port'] = db_port

try:
    db = MySQLdb.connect(**connect_args)
    # Each statement takes effect immediately, without an explicit commit.
    db.autocommit(True)
    # This single cursor (one MySQL session) is shared by the main loop and
    # the named-lock helpers.
    cursor = db.cursor()
except MySQLdb.Error as e:
    log.critical("ERROR: Could not connect to MySQL database! ({})".format(e))
    sys.exit(2)
2015-07-06 19:10:48 +00:00
2015-07-07 17:02:13 +00:00
def poll_worker(device_id, action):
    """Poll or discover one device by running the LibreNMS PHP script.

    Runs ``discovery.php`` when *action* is ``'discovery'`` and
    ``poller.php`` otherwise, passing ``-h <device_id>`` and discarding all
    output.  Used as a thread target: failures are logged rather than
    raised, so one bad device never kills the service.

    :param device_id: device_id of the device to process.
    :param action: ``'poll'`` or ``'discovery'``; selects the script and is
        used in log messages.
    """
    path = discover_path if action == 'discovery' else poller_path
    # An argument list instead of a shell string: nothing is shell-parsed.
    command = ['/usr/bin/env', 'php', path, '-h', str(device_id)]
    start_time = time.time()
    try:
        with open(os.devnull, 'w') as devnull:
            subprocess.check_call(command, stdout=devnull, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        log.warning('WARNING: worker {} of device {} exited with status {}'.format(action, device_id, e.returncode))
        return
    except Exception as e:
        # Best effort: record the failure instead of silently dropping it.
        log.error('ERROR: worker {} of device {} failed: {}'.format(action, device_id, e))
        return
    elapsed_time = int(time.time() - start_time)
    if elapsed_time < 300:
        log.debug("DEBUG: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time))
    else:
        log.warning("WARNING: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time))
2015-07-06 16:00:21 +00:00
2015-07-06 19:10:48 +00:00
2015-07-06 18:51:03 +00:00
def lockFree(lock):
    """Return True if the MySQL named lock *lock* is currently free.

    Runs IS_FREE_LOCK() on the shared module-level cursor.  Any result other
    than 1 (including NULL) is reported as not free.  The lock name is passed
    as a query parameter, so quotes in it cannot break the SQL.
    """
    cursor.execute("SELECT IS_FREE_LOCK(%s)", (lock,))
    return cursor.fetchall()[0][0] == 1
2015-07-06 16:00:21 +00:00
2015-07-06 19:10:48 +00:00
2015-07-06 18:51:03 +00:00
def getLock(lock):
    """Try to acquire the MySQL named lock *lock* without waiting.

    Runs GET_LOCK(name, 0) on the shared module-level cursor and returns True
    only if the lock was obtained (result 1); 0 or NULL yields False.  The
    lock name is passed as a query parameter.
    """
    cursor.execute("SELECT GET_LOCK(%s, 0)", (lock,))
    return cursor.fetchall()[0][0] == 1
2015-07-06 19:10:48 +00:00
2015-07-06 18:51:03 +00:00
def releaseLock(lock):
    """Release the MySQL named lock *lock* held by this connection.

    Runs RELEASE_LOCK() on the shared module-level cursor and returns True
    only if the lock was released (result 1); 0 or NULL yields False.  The
    lock name is passed as a query parameter.
    """
    cursor.execute("SELECT RELEASE_LOCK(%s)", (lock,))
    return cursor.fetchall()[0][0] == 1
2015-07-07 18:33:33 +00:00
def sleep_until(timestamp):
    """Block the calling thread until the naive local datetime *timestamp*.

    Returns immediately if *timestamp* is not in the future.  The whole
    remaining duration is used, including days and fractions of a second;
    ``timedelta.seconds`` alone would drop both.
    """
    remaining = (timestamp - datetime.now()).total_seconds()
    if remaining > 0:
        time.sleep(remaining)
# Optional restriction to this poller's group(s).  The configured value is
# interpolated verbatim into the SQL below.
if 'distributed_poller_group' in config:
    poller_group = 'and poller_group IN({}) '.format(str(config['distributed_poller_group']))
else:
    poller_group = ''

# For every enabled device, estimate when its next poll and next discovery
# should start: the start of the previous run (last_* minus its recorded
# duration) plus the configured interval.  Sorting on next_poll aims for each
# device to finish a poll within the interval.  Devices whose "polling." or
# "queued." named lock is held are excluded.  So are devices attempted
# within the last down_retry seconds.
_dev_query_template = ('SELECT device_id, status, '
                       'DATE_ADD( '
                       ' DATE_SUB( '
                       ' last_polled, '
                       ' INTERVAL last_polled_timetaken SECOND '
                       ' ), '
                       ' INTERVAL {0} SECOND) AS next_poll, '
                       'DATE_ADD( '
                       ' DATE_SUB( '
                       ' last_discovered, '
                       ' INTERVAL last_discovered_timetaken SECOND '
                       ' ), '
                       ' INTERVAL {1} SECOND) AS next_discovery '
                       'FROM devices WHERE '
                       'disabled = 0 '
                       'AND IS_FREE_LOCK(CONCAT("polling.", device_id)) '
                       'AND IS_FREE_LOCK(CONCAT("queued.", device_id)) '
                       'AND ( last_poll_attempted < DATE_SUB(NOW(), INTERVAL {2} SECOND ) '
                       ' OR last_poll_attempted IS NULL ) '
                       '{3} '
                       'ORDER BY next_poll asc ')
dev_query = _dev_query_template.format(poll_frequency, discover_frequency, down_retry, poller_group)
# Records this poller's throughput in the pollers table once a minute.
# Values are passed as query parameters, not formatted into the SQL.
poller_update_query = ('INSERT INTO pollers(poller_name, '
                       ' last_polled, '
                       ' devices, '
                       ' time_taken) '
                       ' values(%s, NOW(), %s, %s) '
                       'ON DUPLICATE KEY UPDATE '
                       ' last_polled=values(last_polled), '
                       ' devices=values(devices), '
                       ' time_taken=values(time_taken) ')

threads = 0
next_update = datetime.now() + timedelta(minutes=1)
devices_scanned = 0
while True:
    cur_threads = threading.active_count()
    if cur_threads != threads:
        threads = cur_threads
        log.debug('DEBUG: {} threads currently active'.format(threads))

    # Once a minute, report how many devices were started since the last report.
    if next_update < datetime.now():
        seconds_taken = (datetime.now() - (next_update - timedelta(minutes=1))).seconds
        try:
            cursor.execute(poller_update_query,
                           (config['distributed_poller_name'].strip(), devices_scanned, seconds_taken))
        except MySQLdb.Error:
            log.critical('ERROR: MySQL query error. Is your schema up to date?')
            sys.exit(2)
        cursor.fetchall()
        log.info('INFO: {} devices scanned in the last {} seconds'.format(devices_scanned, seconds_taken))
        devices_scanned = 0
        next_update = datetime.now() + timedelta(minutes=1)

    # Wait for a free worker slot (active_count() includes this main thread).
    while threading.active_count() >= amount_of_workers:
        time.sleep(.5)

    try:
        cursor.execute(dev_query)
    except MySQLdb.Error:
        log.critical('ERROR: MySQL query error. Is your schema up to date?')
        sys.exit(2)
    devices = cursor.fetchall()

    # Queue lock still held for the device just handed to a worker, if any.
    held_queue_lock = None
    for device_id, status, next_poll, next_discovery in devices:
        queue_lock = 'queued.{}'.format(device_id)
        # Take the queue lock so other pollers skip this device.  If this
        # fails, another poller already has the device queued.
        if not getLock(queue_lock):
            time.sleep(.5)
            continue
        # The query result may be stale: re-check the polling lock now that
        # the queue lock is held.
        if not lockFree('polling.{}'.format(device_id)):
            releaseLock(queue_lock)
            time.sleep(.5)
            continue

        if next_poll and next_poll > datetime.now():
            log.debug('DEBUG: Sleeping until {0} before polling {1}'.format(next_poll, device_id))
            sleep_until(next_poll)
        action = 'poll'
        if (not next_discovery or next_discovery < datetime.now()) and status == 1:
            action = 'discovery'
        log.debug('DEBUG: Starting {} of device {}'.format(action, device_id))
        devices_scanned += 1
        cursor.execute('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = %s', (device_id,))
        cursor.fetchall()
        t = threading.Thread(target=poll_worker, args=[device_id, action])
        t.start()
        held_queue_lock = queue_lock
        # One device per query: break out and query again.
        break

    # Always pause before re-querying; a tight loop with no pause causes the
    # service to be killed by init.
    time.sleep(.5)
    # Release the queue lock explicitly.  Since MySQL 5.7.5, GET_LOCK() no
    # longer implicitly releases a session's other named locks.  A lock left
    # held would make the IS_FREE_LOCK("queued.<id>") filter in dev_query
    # skip the device forever.
    if held_queue_lock is not None:
        releaseLock(held_queue_lock)