spiderweb/webapp.py
coulisse ccab3fc220 2.2
2021-12-19 10:17:09 +00:00

319 lines
11 KiB
Python

__author__ = 'IU1BOW - Corrado'
import os
import flask
from flask import request, render_template, jsonify
from flask_wtf.csrf import CSRFProtect
from flask_minify import minify
import json
import time, threading
import logging
import logging.config
from lib.dxtelnet import who
from lib.adxo import get_adxo_events
from lib.qry import query_manager
from lib.cty import prefix_table
logging.config.fileConfig("cfg/webapp_log_config.ini", disable_existing_loggers=True)
logger = logging.getLogger(__name__)
logger.info("Start")
app = flask.Flask(__name__)
app.config["DEBUG"] = False
app.config['SECRET_KEY'] = 'secret!'
app.config.update(
SESSION_COOKIE_SECURE=True,
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE='Strict',
)
csrf = CSRFProtect(app)
minify(app=app, html=True, js=True,cssless=False)
#minify(app=app, html=False, js=False,cssless=False)
#load config file
with open('cfg/config.json') as json_data_file:
cfg = json.load(json_data_file)
logging.debug("CFG:")
logging.debug(cfg)
#load bands file
with open('cfg/bands.json') as json_bands:
band_frequencies = json.load(json_bands)
#load mode file
with open('cfg/modes.json') as json_modes:
modes_frequencies = json.load(json_modes)
#load continents-cq file
with open('cfg/continents.json') as json_continents:
continents_cq = json.load(json_continents)
#read and set default for enabling cq filter
if cfg.get('enable_cq_filter'):
enable_cq_filter=cfg['enable_cq_filter'].upper()
else:
enable_cq_filter='N'
#define country table for search info on callsigns
pfxt=prefix_table()
#create object query manager
qm=query_manager()
#find id in json : ie frequency / continent
def find_id_json(json_object, name):
return [obj for obj in json_object if obj['id']==name][0]
#the main query to show spots
#it gets url parameter in order to apply the build the right query
#and apply the filter required. It returns a json with the spots
def spotquery():
try:
#get url parameters
band=(request.args.getlist('b')) #band filter
dere=(request.args.getlist('e')) #DE continent filter
dxre=(request.args.getlist('x')) #Dx continent filter
mode=(request.args.getlist('m')) #mode filter
decq=(request.args.getlist('qe')) #DE cq zone filter
dxcq=(request.args.getlist('qx')) #DX cq zone filter
callsign=request.args.get('c') #search specific callsign
query_string=''
if callsign:
#construct the query, to show last 6 month
if len(callsign)<=14:
query_string="(SELECT rowid, spotter AS de, freq, spotcall AS dx, comment AS comm, time, spotdxcc from dxcluster.spot WHERE spotter='"+callsign+"'"
query_string+=" ORDER BY rowid desc limit 10)"
query_string+=" UNION "
query_string+="(SELECT rowid, spotter AS de, freq, spotcall AS dx, comment AS comm, time, spotdxcc from dxcluster.spot WHERE spotcall='"+callsign+"'"
query_string+=" ORDER BY rowid desc limit 10);"
else:
logging.warning('callsign too long')
else:
#construct band query decoding frequencies with json file
band_qry_string = ' AND (('
for i in range(len(band)):
freq=find_id_json(band_frequencies["bands"],band[i])
if i > 0:
band_qry_string += ') OR ('
band_qry_string += 'freq BETWEEN ' + str(freq["min"]) + ' AND ' + str(freq["max"])
band_qry_string += '))'
#construct mode query
mode_qry_string = ' AND (('
for i in range(len(mode)):
single_mode=find_id_json(modes_frequencies["modes"],mode[i])
if i > 0:
mode_qry_string +=') OR ('
for j in range(len(single_mode["freq"])):
if j > 0:
mode_qry_string +=') OR ('
mode_qry_string += 'freq BETWEEN ' +str(single_mode["freq"][j]["min"]) + ' AND ' + str(single_mode["freq"][j]["max"])
mode_qry_string += '))'
#construct DE continent region query
dere_qry_string = ' AND spottercq IN ('
for i in range(len(dere)):
continent=find_id_json(continents_cq["continents"],dere[i])
if i > 0:
dere_qry_string +=','
dere_qry_string += str(continent["cq"])
dere_qry_string +=')'
#construct DX continent region query
dxre_qry_string = ' AND spotcq IN ('
for i in range(len(dxre)):
continent=find_id_json(continents_cq["continents"],dxre[i])
if i > 0:
dxre_qry_string +=','
dxre_qry_string += str(continent["cq"])
dxre_qry_string +=')'
if enable_cq_filter == 'Y':
#construct de cq query
decq_qry_string = ''
if len(decq)==1:
if decq[0].isnumeric():
decq_qry_string = ' AND spottercq =' + decq[0]
#construct dx cq query
dxcq_qry_string = ''
if len(dxcq)==1:
if dxcq[0].isnumeric():
dxcq_qry_string = ' AND spotcq =' + dxcq[0]
query_string="SELECT rowid, spotter AS de, freq, spotcall AS dx, comment AS comm, time, spotdxcc from dxcluster.spot WHERE 1=1"
if len(band) > 0:
query_string += band_qry_string
if len(mode) > 0:
query_string += mode_qry_string
if len(dere) > 0:
query_string += dere_qry_string
if len(dxre) > 0:
query_string += dxre_qry_string
if enable_cq_filter == 'Y':
if len(decq_qry_string) > 0:
query_string += decq_qry_string
if len(dxcq_qry_string) > 0:
query_string += dxcq_qry_string
query_string += " ORDER BY rowid desc limit 50;"
logger.debug(query_string)
qm.qry(query_string)
data=qm.get_data()
row_headers=qm.get_headers()
logger.debug("query done")
logger.debug (data)
if data is None or len(data)==0:
logger.warning("no data found")
payload=[]
for result in data:
# create dictionary from recorset
main_result=dict(zip(row_headers,result))
# find the country in prefix table
search_prefix=pfxt.find(main_result["dx"])
# merge recordset and contry prefix
#payload.append({**main_result, **search_prefix})
try:
main_result["country"]=search_prefix["country"]
main_result["iso"]=search_prefix["iso"]
except KeyError:
main_result["country"]='Pirate country'
main_result["iso"]=''
print (main_result)
payload.append({**main_result})
return payload
except Exception as e:
logger.error(e)
#find adxo events
adxo_events=None
def get_adxo():
global adxo_events
adxo_events=get_adxo_events()
threading.Timer(12*3600,get_adxo).start()
get_adxo()
@app.route('/spotlist', methods=['GET'])
def spotlist():
response=flask.Response(json.dumps(spotquery()))
return response
def who_is_connected():
host_port=cfg['telnet'].split(':')
response=who(host_port[0],host_port[1],cfg['mycallsign'])
return response
@app.route('/', methods=['GET'])
@app.route('/index.html', methods=['GET'])
def spots():
payload=spotquery()
response=flask.Response(render_template('index.html',mycallsign=cfg['mycallsign'],telnet=cfg['telnet'],mail=cfg['mail'],menu_list=cfg['menu']['menu_list'],enable_cq_filter=enable_cq_filter,payload=payload,timer_interval=cfg['timer']['interval'],adxo_events=adxo_events))
return response
@app.route('/service-worker.js', methods=['GET'])
def sw():
return app.send_static_file('service-worker.js')
@app.route('/offline.html')
def root():
return app.send_static_file('html/offline.html')
@app.route('/plotlist', methods=['GET'])
def plotlist():
#get url parameters
idxfile=os.path.join(app.root_path,os.path.basename(app.static_url_path),'plots','plots.json')
if os.path.exists(idxfile):
with open(idxfile,'r') as jsonfile:
json_content = json.load(jsonfile)
else:
json_content={}
response=json_content
return response
@app.route('/plots.html')
def plots():
payload=plotlist()
whoj=who_is_connected()
response=flask.Response(render_template('plots.html',mycallsign=cfg['mycallsign'],telnet=cfg['telnet'],mail=cfg['mail'],menu_list=cfg['menu']['menu_list'],payload=payload,timer_interval=cfg['plot_refresh_timer']['interval'],who=whoj))
return response
@app.route('/cookies.html', methods=['GET'])
def cookies():
response=flask.Response(render_template('cookies.html',mycallsign=cfg['mycallsign'],telnet=cfg['telnet'],mail=cfg['mail'],menu_list=cfg['menu']['menu_list']))
return response
@app.route('/privacy.html', methods=['GET'])
def privacy():
response=flask.Response(render_template('privacy.html',mycallsign=cfg['mycallsign'],telnet=cfg['telnet'],mail=cfg['mail'],menu_list=cfg['menu']['menu_list']))
return response
@app.route('/sitemap.xml')
def sitemap():
return app.send_static_file('sitemap.xml')
@app.route('/callsign.html', methods=['GET'])
def callsign():
payload=spotquery()
#country_data=load_country()
callsign=request.args.get('c')
response=flask.Response(render_template('callsign.html',mycallsign=cfg['mycallsign'],telnet=cfg['telnet'],mail=cfg['mail'],menu_list=cfg['menu']['menu_list'],payload=payload,timer_interval=cfg['timer']['interval'],callsign=callsign,adxo_events=adxo_events))
return response
#API that search a callsign and return all informations about that
@app.route('/callsign', methods=['GET'])
def find_callsign():
callsign=request.args.get('c')
response=pfxt.find(callsign)
if response is None:
response=flask.Response(status=204)
return response
@app.context_processor
def inject_template_scope():
injections = dict()
def cookies_check():
value = request.cookies.get('cookie_consent')
return value == 'true'
injections.update(cookies_check=cookies_check)
return injections
@app.after_request
def add_security_headers(resp):
# resp.headers['Content-Security-Policy']='script-src \'self\' cdnjs.cloudflare.com cdn.jsdelivr.net \'unsafe-inline\''
resp.headers['Strict-Transport-Security']='max-age=1000'
resp.headers['X-Xss-Protection']='1; mode=block'
resp.headers['X-Frame-Options']='SAMEORIGIN'
resp.headers['X-Content-Type-Options']='nosniff'
resp.headers['Referrer-Policy']='strict-origin-when-cross-origin'
#resp.headers['Cache-Control']='no-store, max-age=0'
resp.headers['Cache-Control']='no-cache, no-store, must-revalidate'
resp.headers['Pragma']='no-cache'
# resp.headers['Access-Control-Allow-Origin']='https://cdnjs.cloudflare.com'
return resp
if __name__ == '__main__':
app.run(host='0.0.0.0')