From 49c786f394ffae4a322dbfc0d6956228427509a8 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Tue, 8 Sep 2015 13:51:10 -0400 Subject: [PATCH] make more resiliant to MySQL disconnects --- poller-service.py | 73 ++++++++++++++++++++++++----------------------- 1 file changed, 37 insertions(+), 36 deletions(-) diff --git a/poller-service.py b/poller-service.py index 3f72ca4b94..a11064e3fb 100755 --- a/poller-service.py +++ b/poller-service.py @@ -49,6 +49,27 @@ config_file = install_dir + '/config.php' log.info('INFO: Starting poller-service') +class DB: + conn = None + + def connect(self): + if db_port == 0: + self.conn = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname) + else: + self.conn = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) + self.conn.autocommit(True) + + def query(self, sql): + try: + cursor = self.conn.cursor() + cursor.execute(sql) + except (AttributeError, MySQLdb.OperationalError): + self.connect() + cursor = self.conn.cursor() + cursor.execute(sql) + return cursor + + def get_config_data(): config_cmd = ['/usr/bin/env', 'php', '%s/config_to_json.php' % install_dir] try: @@ -130,25 +151,7 @@ try: except KeyError: down_retry = 60 -def connectDB(): - try: - if db_port == 0: - db_inst = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname) - else: - db_inst = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) - db_inst.autocommit(True) - cursor_inst = db_inst.cursor() - ret = namedtuple('db_connection', ['db', 'cursor']) - ret.db = db_inst - ret.cursor = cursor_inst - return ret - except: - log.critical("ERROR: Could not connect to MySQL database!") - sys.exit(2) - -db_connection = connectDB() -db = db_connection.db -cursor = db_connection.cursor +db = DB() def poll_worker(device_id, action): try: @@ -172,25 +175,23 @@ def poll_worker(device_id, action): -def lockFree(lock, cursor=cursor): +def lockFree(lock, db=db): query = "SELECT IS_FREE_LOCK('{0}')".format(lock) - cursor.execute(query) + cursor = db.query(query) return cursor.fetchall()[0][0] == 1 -def getLock(lock, cursor=cursor): +def getLock(lock, db=db): query = "SELECT GET_LOCK('{0}', 0)".format(lock) - cursor.execute(query) + cursor = db.query(query) return cursor.fetchall()[0][0] == 1 thread_cursors = [] for i in range(0, amount_of_workers): - thread_cursors.append(namedtuple('Cursor{0}'.format(i), ['in_use', 'cursor', 'db'])) + thread_cursors.append(namedtuple('Cursor{0}'.format(i), ['in_use', 'db'])) thread_cursors[i].in_use = False - thread_db_connection = connectDB() - thread_cursors[i].cursor = thread_db_connection.cursor - thread_cursors[i].db = thread_db_connection.db + thread_cursors[i].db = DB() def getThreadQueueLock(device_id): @@ -200,7 +201,7 @@ def getThreadQueueLock(device_id): for thread_cursor in thread_cursors: if not thread_cursor.in_use: thread_cursor.in_use = 'queue.{0}'.format(device_id) - if getLock('queue.{0}'.format(device_id), thread_cursor.cursor): + if getLock('queue.{0}'.format(device_id), thread_cursor.db): return True else: thread_cursor.in_use = False @@ -214,8 +215,8 @@ def getThreadActionLock(device_id, action): for thread_cursor in thread_cursors: if thread_cursor.in_use == 'queue.{0}'.format(device_id): thread_cursor.in_use = '{0}.{1}'.format(action, device_id) - releaseLock('queue.{0}'.format(device_id), thread_cursor.cursor) - if getLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor): + releaseLock('queue.{0}'.format(device_id), thread_cursor.db) + if getLock('{0}.{1}'.format(action, device_id), thread_cursor.db): return True else: thread_cursor.in_use = False @@ -223,9 +224,9 @@ def getThreadActionLock(device_id, action): return False -def releaseLock(lock, cursor=cursor): +def releaseLock(lock, db=db): query = "SELECT RELEASE_LOCK('{0}')".format(lock) - cursor.execute(query) + cursor = db.query(query) return cursor.fetchall()[0][0] == 1 @@ -234,7 +235,7 @@ def releaseThreadLock(device_id, action): for thread_cursor in thread_cursors: if thread_cursor.in_use == '{0}.{1}'.format(action, device_id): thread_cursor.in_use = False - return releaseLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor) + return releaseLock('{0}.{1}'.format(action, device_id), thread_cursor.db) return False @@ -308,7 +309,7 @@ while True: devices_scanned, seconds_taken) try: - cursor.execute(update_query) + cursor = db.query(update_query) except: log.critical('ERROR: MySQL query error. Is your schema up to date?') sys.exit(2) @@ -318,7 +319,7 @@ while True: next_update = datetime.now() + timedelta(minutes=1) try: - cursor.execute(dev_query) + cursor = db.query(dev_query) except: log.critical('ERROR: MySQL query error. Is your schema up to date?') sys.exit(2) @@ -339,7 +340,7 @@ while True: log.debug('DEBUG: Starting {0} of device {1}'.format(action, device_id)) devices_scanned += 1 - cursor.execute('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id)) + cursor = db.query('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id)) cursor.fetchall() if not getThreadActionLock(device_id, action):