thread locking in python

This commit is contained in:
Clint Armstrong 2015-09-02 11:54:27 -04:00
parent 49c786f394
commit 31cb8dddb7

View File

@ -49,27 +49,6 @@ config_file = install_dir + '/config.php'
log.info('INFO: Starting poller-service')
class DB:
conn = None
def connect(self):
if db_port == 0:
self.conn = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname)
else:
self.conn = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname)
self.conn.autocommit(True)
def query(self, sql):
try:
cursor = self.conn.cursor()
cursor.execute(sql)
except (AttributeError, MySQLdb.OperationalError):
self.connect()
cursor = self.conn.cursor()
cursor.execute(sql)
return cursor
def get_config_data():
config_cmd = ['/usr/bin/env', 'php', '%s/config_to_json.php' % install_dir]
try:
@ -151,7 +130,20 @@ try:
except KeyError:
down_retry = 60
db = DB()
def connectDB():
try:
if db_port == 0:
db_inst = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname)
else:
db_inst = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname)
db_inst.autocommit(True)
cursor_inst = db_inst.cursor()
return cursor_inst
except:
log.critical("ERROR: Could not connect to MySQL database!")
sys.exit(2)
cursor = connectDB()
def poll_worker(device_id, action):
try:
@ -160,7 +152,9 @@ def poll_worker(device_id, action):
if action == 'discovery':
path = discover_path
command = "/usr/bin/env php %s -h %s >> /dev/null 2>&1" % (path, device_id)
if getThreadLock('{0}.{1}'.format(action, device_id)):
subprocess.check_call(command, shell=True)
releaseThreadLock('{0}.{1}'.format(action, device_id))
elapsed_time = int(time.time() - start_time)
if elapsed_time < 300:
log.debug("DEBUG: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time))
@ -170,72 +164,48 @@ def poll_worker(device_id, action):
raise
except:
pass
finally:
releaseThreadLock(device_id, action)
def lockFree(lock, db=db):
def lockFree(lock, cursor=cursor):
query = "SELECT IS_FREE_LOCK('{0}')".format(lock)
cursor = db.query(query)
cursor.execute(query)
return cursor.fetchall()[0][0] == 1
def getLock(lock, db=db):
def getLock(lock, cursor=cursor):
query = "SELECT GET_LOCK('{0}', 0)".format(lock)
cursor = db.query(query)
cursor.execute(query)
return cursor.fetchall()[0][0] == 1
thread_cursors = []
for i in range(0, amount_of_workers):
thread_cursors.append(namedtuple('Cursor{0}'.format(i), ['in_use', 'db']))
thread_cursors.append(namedtuple('Cursor{}'.format(i), ['in_use', 'cursor']))
thread_cursors[i].in_use = False
thread_cursors[i].db = DB()
thread_cursors[i].cursor = connectDB
def getThreadQueueLock(device_id):
def getThreadLock(lock):
global thread_cursors
# This is how threads are limited, by the numver of cursors available
while True:
for thread_cursor in thread_cursors:
if not thread_cursor.in_use:
thread_cursor.in_use = 'queue.{0}'.format(device_id)
if getLock('queue.{0}'.format(device_id), thread_cursor.db):
return True
else:
thread_cursor.in_use = False
return False
time.sleep(.5)
def getThreadActionLock(device_id, action):
global thread_cursors
# This is how threads are limited, by the numver of cursors available
for thread_cursor in thread_cursors:
if thread_cursor.in_use == 'queue.{0}'.format(device_id):
thread_cursor.in_use = '{0}.{1}'.format(action, device_id)
releaseLock('queue.{0}'.format(device_id), thread_cursor.db)
if getLock('{0}.{1}'.format(action, device_id), thread_cursor.db):
return True
else:
thread_cursor.in_use = False
return False
thread_cursor.in_use = lock
return getLock(lock, thread_cursor.cursor)
return False
def releaseLock(lock, db=db):
def releaseLock(lock, cursor=cursor):
query = "SELECT RELEASE_LOCK('{0}')".format(lock)
cursor = db.query(query)
cursor.execute(query)
return cursor.fetchall()[0][0] == 1
def releaseThreadLock(device_id, action):
def releaseThreadLock(lock):
global thread_cursors
for thread_cursor in thread_cursors:
if thread_cursor.in_use == '{0}.{1}'.format(action, device_id):
if thread_cursor.in_use == lock:
thread_cursor.in_use = False
return releaseLock('{0}.{1}'.format(action, device_id), thread_cursor.db)
return releaseLock(lock, thread_cursor.cursor)
return False
@ -260,7 +230,7 @@ dev_query = ('SELECT device_id, status,
' INTERVAL last_polled_timetaken SECOND '
' ), '
' INTERVAL {0} SECOND) '
' AS DATETIME '
' AS DATETIME(0) '
') AS next_poll, '
'CAST( '
' DATE_ADD( '
@ -269,13 +239,12 @@ dev_query = ('SELECT device_id, status,
' INTERVAL last_discovered_timetaken SECOND '
' ), '
' INTERVAL {1} SECOND) '
' AS DATETIME '
' AS DATETIME(0) '
') as next_discovery '
'FROM devices WHERE '
'disabled = 0 '
'AND IS_FREE_LOCK(CONCAT("poll.", device_id)) '
'AND IS_FREE_LOCK(CONCAT("discovery.", device_id)) '
'AND IS_FREE_LOCK(CONCAT("queue.", device_id)) '
'AND IS_FREE_LOCK(CONCAT("polling.", device_id)) '
'AND IS_FREE_LOCK(CONCAT("queued.", device_id)) '
'AND ( last_poll_attempted < DATE_SUB(NOW(), INTERVAL {2} SECOND ) '
' OR last_poll_attempted IS NULL ) '
'{3} '
@ -293,7 +262,7 @@ while True:
cur_threads = threading.active_count()
if cur_threads != threads:
threads = cur_threads
log.debug('DEBUG: {0} threads currently active'.format(str(threads - 1)))
log.debug('DEBUG: {0} threads currently active'.format(threads))
if next_update < datetime.now():
seconds_taken = (datetime.now() - (next_update - timedelta(minutes=1))).seconds
@ -309,7 +278,7 @@ while True:
devices_scanned,
seconds_taken)
try:
cursor = db.query(update_query)
cursor.execute(update_query)
except:
log.critical('ERROR: MySQL query error. Is your schema up to date?')
sys.exit(2)
@ -318,15 +287,23 @@ while True:
devices_scanned = 0
next_update = datetime.now() + timedelta(minutes=1)
while threading.active_count() >= amount_of_workers or not lockFree('schema_update'):
time.sleep(.5)
try:
cursor = db.query(dev_query)
cursor.execute(dev_query)
except:
log.critical('ERROR: MySQL query error. Is your schema up to date?')
sys.exit(2)
devices = cursor.fetchall()
for device_id, status, next_poll, next_discovery in devices:
if not getThreadQueueLock(device_id):
# add queue lock, so we lock the next device against any other pollers
# if this fails, the device is locked by another poller already
if not getLock('queued.{0}'.format(device_id)):
continue
if not lockFree('polling.{0}'.format(device_id)):
releaseLock('queued.{0}'.format(device_id))
continue
if next_poll and next_poll > datetime.now():
@ -340,12 +317,9 @@ while True:
log.debug('DEBUG: Starting {0} of device {1}'.format(action, device_id))
devices_scanned += 1
cursor = db.query('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id))
cursor.execute('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id))
cursor.fetchall()
if not getThreadActionLock(device_id, action):
continue
t = threading.Thread(target=poll_worker, args=[device_id, action])
t.start()