Merge pull request #1838 from clinta/poller-service2

poller-service with all locking in Python
This commit is contained in:
Daniel Preussker 2015-09-24 19:22:50 +00:00
commit 11a754546d
2 changed files with 161 additions and 102 deletions

View File

@ -4,7 +4,7 @@
The Poller service is an alternative to polling and discovery cron jobs and provides support for distributed polling without memcache. It is multi-threaded and runs continuously discovering and polling devices with the oldest data attempting to honor the polling frequency configured in `config.php`. This service replaces all the required cron jobs except for `/opt/librenms/daily.sh` and `/opt/librenms/alerts.php`.
Configure the maximum number of threads for the service in `$config['poller_service_workers']`. Configure the minimum desired polling frequency in `$config['poller_service_poll_frequency']` and the minimum desired discovery frequency in `$config['poller_service_discover_frequency']`. The service will not poll or discover devices which have data newer than this this configured age in seconds. Configure how frequently the service will attempt to poll devices which are down in `$config['poller_service_down_retry']`.
Configure the maximum number of threads for the service in `$config['poller_service_workers']`. Configure the minimum desired polling frequency in `$config['poller_service_poll_frequency']` and the minimum desired discovery frequency in `$config['poller_service_discover_frequency']`. The service will not poll or discover devices which have data newer than this this configured age in seconds. Configure how frequently the service will attempt to poll devices which are down in `$config['poller_service_down_retry']`. If you have enough pollers that the worker threads run out of work, the service will query looking for devices every `$config['poller_service_retry_query']` seconds.
The poller service is designed to gracefully degrade. If not all devices can be polled within the configured frequency, the service will continuously poll devices refreshing as frequently as possible using the configured number of threads.
@ -18,6 +18,8 @@ $config['poller_service_workers'] = 16;
$config['poller_service_poll_frequency'] = 300;
$config['poller_service_discover_frequency'] = 21600;
$config['poller_service_down_retry'] = 60;
$config['poller_service_retry_query'] = 1;
$config['poller_service_single_connection'] = false;
```
## Distributed Polling
@ -26,6 +28,9 @@ Distributed polling is possible, and uses the same configuration options as are
## Multi-Master MySQL considerations
Because locks are not replicated in Multi-Master MySQL configurations, if you are using such a configuration, you will need to make sure that all pollers are using the same MySQL server.
## Single Connection
If you are running MariaDB 10.2 or newer, you can tell poller-service to use a single mysql connectino for managing locks by setting `$config['poller_service_single_connection']` to `true`. *DO NOT* configure this for any version of MariaDB less than 10.2 or any version of MySQL.
## Service Installation
An upstart configuration `poller-service.conf` is provided. To install run `ln -s /opt/librenms/poller-service.conf /etc/init/poller-service.conf`. The service will start on boot and can be started manually by running `start poller-service`. If you recieve an error that the service does not exist, run `initctl reload-configuration`. The service is configured to run as the user `librenms` and will fail if that user does not exist.

View File

@ -33,6 +33,7 @@ import MySQLdb
import logging
import logging.handlers
from datetime import datetime, timedelta
from collections import namedtuple
log = logging.getLogger('poller-service')
log.setLevel(logging.DEBUG)
@ -48,6 +49,54 @@ config_file = install_dir + '/config.php'
log.info('INFO: Starting poller-service')
class DB:
conn = None
def __init__(self):
self.in_use = threading.Lock()
self.connect()
def connect(self):
self.in_use.acquire(True)
while True:
try:
self.conn.close()
except:
pass
try:
if db_port == 0:
self.conn = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname)
else:
self.conn = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname)
break
except (AttributeError, MySQLdb.OperationalError):
log.warning('WARNING: MySQL Error, reconnecting.')
time.sleep(.5)
self.conn.autocommit(True)
self.conn.ping(True)
self.in_use.release()
def query(self, sql):
self.in_use.acquire(True)
while True:
try:
cursor = self.conn.cursor()
cursor.execute(sql)
ret = cursor.fetchall()
cursor.close()
self.in_use.release()
return ret
except (AttributeError, MySQLdb.OperationalError):
log.warning('WARNING: MySQL Operational Error during query, reconnecting.')
self.in_use.release()
self.connect()
except (AttributeError, MySQLdb.ProgrammingError):
log.warning('WARNING: MySQL Programming Error during query, attempting query again.')
cursor.close()
def get_config_data():
config_cmd = ['/usr/bin/env', 'php', '%s/config_to_json.php' % install_dir]
try:
@ -130,55 +179,34 @@ except KeyError:
down_retry = 60
try:
if db_port == 0:
db = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname)
else:
db = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname)
db.autocommit(True)
cursor = db.cursor()
except:
log.critical("ERROR: Could not connect to MySQL database!")
sys.exit(2)
retry_query = int(config['poller_service_retry_query'])
if retry_query == 0:
retry_query = 1
except KeyError:
retry_query = 1
try:
single_connection = bool(config['poller_service_single_connection'])
except KeyError:
single_connection = False
db = DB()
def poll_worker(device_id, action):
try:
start_time = time.time()
path = poller_path
if action == 'discovery':
path = discover_path
command = "/usr/bin/env php %s -h %s >> /dev/null 2>&1" % (path, device_id)
subprocess.check_call(command, shell=True)
elapsed_time = int(time.time() - start_time)
if elapsed_time < 300:
log.debug("DEBUG: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time))
else:
log.warning("WARNING: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time))
except (KeyboardInterrupt, SystemExit):
raise
except:
pass
def lockFree(lock):
global cursor
def lockFree(lock, db=db):
query = "SELECT IS_FREE_LOCK('{0}')".format(lock)
cursor.execute(query)
return cursor.fetchall()[0][0] == 1
return db.query(query)[0][0] == 1
def getLock(lock):
global cursor
def getLock(lock, db=db):
query = "SELECT GET_LOCK('{0}', 0)".format(lock)
cursor.execute(query)
return cursor.fetchall()[0][0] == 1
return db.query(query)[0][0] == 1
def releaseLock(lock):
global cursor
def releaseLock(lock, db=db):
query = "SELECT RELEASE_LOCK('{0}')".format(lock)
cursor.execute(query)
return cursor.fetchall()[0][0] == 1
cursor = db.query(query)
return db.query(query)[0][0] == 1
def sleep_until(timestamp):
@ -202,7 +230,7 @@ dev_query = ('SELECT device_id, status,
' INTERVAL last_polled_timetaken SECOND '
' ), '
' INTERVAL {0} SECOND) '
' AS DATETIME(0) '
' AS DATETIME '
') AS next_poll, '
'CAST( '
' DATE_ADD( '
@ -211,96 +239,122 @@ dev_query = ('SELECT device_id, status,
' INTERVAL last_discovered_timetaken SECOND '
' ), '
' INTERVAL {1} SECOND) '
' AS DATETIME(0) '
' AS DATETIME '
') as next_discovery '
'FROM devices WHERE '
'disabled = 0 '
'AND IS_FREE_LOCK(CONCAT("polling.", device_id)) '
'AND IS_FREE_LOCK(CONCAT("queued.", device_id)) '
'AND IS_FREE_LOCK(CONCAT("poll.", device_id)) '
'AND IS_FREE_LOCK(CONCAT("discovery.", device_id)) '
'AND IS_FREE_LOCK(CONCAT("queue.", device_id)) '
'AND ( last_poll_attempted < DATE_SUB(NOW(), INTERVAL {2} SECOND ) '
' OR last_poll_attempted IS NULL ) '
'{3} '
'ORDER BY next_poll asc '
'LIMIT 5 ').format(poll_frequency,
'LIMIT 1 ').format(poll_frequency,
discover_frequency,
down_retry,
poller_group)
threads = 0
next_update = datetime.now() + timedelta(minutes=1)
devices_scanned = 0
while True:
cur_threads = threading.active_count()
if cur_threads != threads:
threads = cur_threads
log.debug('DEBUG: {0} threads currently active'.format(threads))
dont_query_until = datetime.fromtimestamp(0)
if next_update < datetime.now():
seconds_taken = (datetime.now() - (next_update - timedelta(minutes=1))).seconds
update_query = ('INSERT INTO pollers(poller_name, '
' last_polled, '
' devices, '
' time_taken) '
' values("{0}", NOW(), "{1}", "{2}") '
'ON DUPLICATE KEY UPDATE '
' last_polled=values(last_polled), '
' devices=values(devices), '
' time_taken=values(time_taken) ').format(config['distributed_poller_name'].strip(),
devices_scanned,
seconds_taken)
try:
cursor.execute(update_query)
except:
log.critical('ERROR: MySQL query error. Is your schema up to date?')
sys.exit(2)
cursor.fetchall()
log.info('INFO: {0} devices scanned in the last minute'.format(devices_scanned))
devices_scanned = 0
next_update = datetime.now() + timedelta(minutes=1)
def poll_worker():
global dev_query
global devices_scanned
global dont_query_until
global single_connection
thread_id = threading.current_thread().name
while threading.active_count() >= amount_of_workers or not lockFree('schema_update'):
time.sleep(.5)
if single_connection:
global db
else:
db = DB()
try:
cursor.execute(dev_query)
except:
log.critical('ERROR: MySQL query error. Is your schema up to date?')
sys.exit(2)
devices = cursor.fetchall()
for device_id, status, next_poll, next_discovery in devices:
# add queue lock, so we lock the next device against any other pollers
# if this fails, the device is locked by another poller already
if not getLock('queued.{0}'.format(device_id)):
while True:
if datetime.now() < dont_query_until:
time.sleep(1)
continue
if not lockFree('polling.{0}'.format(device_id)):
releaseLock('queued.{0}'.format(device_id))
dev_row = db.query(dev_query)
if len(dev_row) < 1:
dont_query_until = datetime.now() + timedelta(seconds=retry_query)
time.sleep(1)
continue
device_id, status, next_poll, next_discovery = dev_row[0]
if not getLock('queue.{0}'.format(device_id), db):
releaseLock('queue.{0}'.format(device_id), db)
continue
if next_poll and next_poll > datetime.now():
log.debug('DEBUG: Sleeping until {0} before polling {1}'.format(next_poll, device_id))
log.debug('DEBUG: Thread {0} Sleeping until {1} before polling {2}'.format(thread_id, next_poll, device_id))
sleep_until(next_poll)
action = 'poll'
if (not next_discovery or next_discovery < datetime.now()) and status == 1:
action = 'discovery'
log.debug('DEBUG: Starting {0} of device {1}'.format(action, device_id))
log.debug('DEBUG: Thread {0} Starting {1} of device {2}'.format(thread_id, action, device_id))
devices_scanned += 1
cursor.execute('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id))
cursor.fetchall()
db.query('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id))
t = threading.Thread(target=poll_worker, args=[device_id, action])
t.start()
if not getLock('{0}.{1}'.format(action, device_id), db):
releaseLock('{0}.{1}'.format(action, device_id), db)
releaseLock('queue.{0}'.format(device_id), db)
continue
# If we made it this far, break out of the loop and query again.
break
releaseLock('queue.{0}'.format(device_id), db)
try:
start_time = time.time()
path = poller_path
if action == 'discovery':
path = discover_path
command = "/usr/bin/env php %s -h %s >> /dev/null 2>&1" % (path, device_id)
subprocess.check_call(command, shell=True)
elapsed_time = int(time.time() - start_time)
if elapsed_time < 300:
log.debug("DEBUG: Thread {0} finished {1} of device {2} in {3} seconds".format(thread_id, action, device_id, elapsed_time))
else:
log.warning("WARNING: Thread {0} finished {1} of device {2} in {3} seconds".format(thread_id, action, device_id, elapsed_time))
except (KeyboardInterrupt, SystemExit):
raise
except:
pass
finally:
releaseLock('{0}.{1}'.format(action, device_id), db)
# This point is only reached if the query is empty, so sleep half a second before querying again.
time.sleep(.5)
for i in range(0, amount_of_workers):
t = threading.Thread(target=poll_worker)
t.name = i
t.daemon = True
t.start()
# Make sure we're not holding any device queue locks in this connection before querying again
# by locking a different string.
getLock('unlock.{0}'.format(config['distributed_poller_name']))
while True:
sleep_until(next_update)
seconds_taken = (datetime.now() - (next_update - timedelta(minutes=1))).seconds
update_query = ('INSERT INTO pollers(poller_name, '
' last_polled, '
' devices, '
' time_taken) '
' values("{0}", NOW(), "{1}", "{2}") '
'ON DUPLICATE KEY UPDATE '
' last_polled=values(last_polled), '
' devices=values(devices), '
' time_taken=values(time_taken) ').format(config['distributed_poller_name'].strip(),
devices_scanned,
seconds_taken)
try:
db.query(update_query)
except:
log.critical('ERROR: MySQL query error. Is your schema up to date?')
sys.exit(2)
log.info('INFO: {0} devices scanned in the last minute'.format(devices_scanned))
devices_scanned = 0
next_update = datetime.now() + timedelta(minutes=1)