make more resiliant to MySQL disconnects

This commit is contained in:
Clint Armstrong 2015-09-08 13:51:10 -04:00
parent 68a970663f
commit 49c786f394

View File

@ -49,6 +49,27 @@ config_file = install_dir + '/config.php'
log.info('INFO: Starting poller-service')
class DB:
conn = None
def connect(self):
if db_port == 0:
self.conn = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname)
else:
self.conn = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname)
self.conn.autocommit(True)
def query(self, sql):
try:
cursor = self.conn.cursor()
cursor.execute(sql)
except (AttributeError, MySQLdb.OperationalError):
self.connect()
cursor = self.conn.cursor()
cursor.execute(sql)
return cursor
def get_config_data():
config_cmd = ['/usr/bin/env', 'php', '%s/config_to_json.php' % install_dir]
try:
@ -130,25 +151,7 @@ try:
except KeyError:
down_retry = 60
def connectDB():
try:
if db_port == 0:
db_inst = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname)
else:
db_inst = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname)
db_inst.autocommit(True)
cursor_inst = db_inst.cursor()
ret = namedtuple('db_connection', ['db', 'cursor'])
ret.db = db_inst
ret.cursor = cursor_inst
return ret
except:
log.critical("ERROR: Could not connect to MySQL database!")
sys.exit(2)
db_connection = connectDB()
db = db_connection.db
cursor = db_connection.cursor
db = DB()
def poll_worker(device_id, action):
try:
@ -172,25 +175,23 @@ def poll_worker(device_id, action):
def lockFree(lock, cursor=cursor):
def lockFree(lock, db=db):
query = "SELECT IS_FREE_LOCK('{0}')".format(lock)
cursor.execute(query)
cursor = db.query(query)
return cursor.fetchall()[0][0] == 1
def getLock(lock, cursor=cursor):
def getLock(lock, db=db):
query = "SELECT GET_LOCK('{0}', 0)".format(lock)
cursor.execute(query)
cursor = db.query(query)
return cursor.fetchall()[0][0] == 1
thread_cursors = []
for i in range(0, amount_of_workers):
thread_cursors.append(namedtuple('Cursor{0}'.format(i), ['in_use', 'cursor', 'db']))
thread_cursors.append(namedtuple('Cursor{0}'.format(i), ['in_use', 'db']))
thread_cursors[i].in_use = False
thread_db_connection = connectDB()
thread_cursors[i].cursor = thread_db_connection.cursor
thread_cursors[i].db = thread_db_connection.db
thread_cursors[i].db = DB()
def getThreadQueueLock(device_id):
@ -200,7 +201,7 @@ def getThreadQueueLock(device_id):
for thread_cursor in thread_cursors:
if not thread_cursor.in_use:
thread_cursor.in_use = 'queue.{0}'.format(device_id)
if getLock('queue.{0}'.format(device_id), thread_cursor.cursor):
if getLock('queue.{0}'.format(device_id), thread_cursor.db):
return True
else:
thread_cursor.in_use = False
@ -214,8 +215,8 @@ def getThreadActionLock(device_id, action):
for thread_cursor in thread_cursors:
if thread_cursor.in_use == 'queue.{0}'.format(device_id):
thread_cursor.in_use = '{0}.{1}'.format(action, device_id)
releaseLock('queue.{0}'.format(device_id), thread_cursor.cursor)
if getLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor):
releaseLock('queue.{0}'.format(device_id), thread_cursor.db)
if getLock('{0}.{1}'.format(action, device_id), thread_cursor.db):
return True
else:
thread_cursor.in_use = False
@ -223,9 +224,9 @@ def getThreadActionLock(device_id, action):
return False
def releaseLock(lock, cursor=cursor):
def releaseLock(lock, db=db):
query = "SELECT RELEASE_LOCK('{0}')".format(lock)
cursor.execute(query)
cursor = db.query(query)
return cursor.fetchall()[0][0] == 1
@ -234,7 +235,7 @@ def releaseThreadLock(device_id, action):
for thread_cursor in thread_cursors:
if thread_cursor.in_use == '{0}.{1}'.format(action, device_id):
thread_cursor.in_use = False
return releaseLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor)
return releaseLock('{0}.{1}'.format(action, device_id), thread_cursor.db)
return False
@ -308,7 +309,7 @@ while True:
devices_scanned,
seconds_taken)
try:
cursor.execute(update_query)
cursor = db.query(update_query)
except:
log.critical('ERROR: MySQL query error. Is your schema up to date?')
sys.exit(2)
@ -318,7 +319,7 @@ while True:
next_update = datetime.now() + timedelta(minutes=1)
try:
cursor.execute(dev_query)
cursor = db.query(dev_query)
except:
log.critical('ERROR: MySQL query error. Is your schema up to date?')
sys.exit(2)
@ -339,7 +340,7 @@ while True:
log.debug('DEBUG: Starting {0} of device {1}'.format(action, device_id))
devices_scanned += 1
cursor.execute('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id))
cursor = db.query('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id))
cursor.fetchall()
if not getThreadActionLock(device_id, action):