343 lines
16 KiB
Python
343 lines
16 KiB
Python
from wtforms import SubmitField, StringField, FloatField, HiddenField, validators, Form
|
|
from flask_wtf import FlaskForm
|
|
from flask import request, render_template, redirect, make_response, jsonify, url_for
|
|
from settings import Settings
|
|
from main import db, app, ma
|
|
from sqlalchemy import Sequence, func
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from status import st, Status
|
|
from datetime import datetime, timedelta
|
|
import pytz
|
|
import socket
|
|
from shared import PA, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, NEWEST_LOG_LIMIT, OLDEST_LOG_LIMIT
|
|
from flask_login import login_required, current_user
|
|
from sqlalchemy.dialects.postgresql import INTERVAL
|
|
from sqlalchemy.sql.functions import concat
|
|
|
|
# pylint: disable=no-member
|
|
|
|
################################################################################
|
|
# Class describing extra/specific info for a Job and via sqlalchemy, connected to the DB as well
|
|
################################################################################
|
|
class JobExtra(db.Model):
    """ORM model for the "jobextra" table: one name/value pair attached to a job."""

    __tablename__ = "jobextra"

    # surrogate key, drawn from the jobextra_id_seq sequence
    id = db.Column(db.Integer, db.Sequence('jobextra_id_seq'), primary_key=True)
    # the job (job.id) this name/value pair belongs to
    job_id = db.Column(db.Integer, db.ForeignKey('job.id'))
    name = db.Column(db.String)
    value = db.Column(db.String)

    def __repr__(self):
        """Return a debug string listing every column of this row."""
        return f"<id: {self.id}, job_id: {self.job_id}, name: {self.name}, value: {self.value}>"
|
|
|
|
################################################################################
|
|
# Class describing logs for each job and via sqlalchemy, connected to the DB as well
|
|
################################################################################
|
|
class Joblog(db.Model):
    """ORM model for a single log line written against a job."""

    # NOTE: both id and job_id are flagged primary_key, so the mapped primary
    # key is the composite (id, job_id)
    id = db.Column(db.Integer, db.Sequence('joblog_id_seq'), primary_key=True)
    job_id = db.Column(db.Integer, db.ForeignKey('job.id'), primary_key=True)
    # timezone-aware timestamp of the log line
    log_date = db.Column(db.DateTime(timezone=True))
    # the log text itself
    log = db.Column(db.String)

    def __repr__(self):
        """Return a debug string with the id, owning job id and log text."""
        # closing '>' added so this matches the other models' reprs (it was
        # previously left unterminated)
        return f"<id: {self.id}, job_id: {self.job_id}, log: {self.log}>"
|
|
|
|
################################################################################
|
|
# Class describing Job and via sqlalchemy, connected to the DB as well
|
|
################################################################################
|
|
class Job(db.Model):
    """ORM model for a job, including its progress counters and both state fields."""

    id = db.Column(db.Integer, db.Sequence('job_id_seq'), primary_key=True)
    # timezone-aware timestamps: when the job started / was last touched
    start_time = db.Column(db.DateTime(timezone=True))
    last_update = db.Column(db.DateTime(timezone=True))
    name = db.Column(db.String)
    state = db.Column(db.String)
    # progress tracking: total files, index of the current file and its name
    num_files = db.Column(db.Integer)
    current_file_num = db.Column(db.Integer)
    current_file = db.Column(db.String)
    # id of another job; WithdrawDependantJobs() matches on this column to
    # find the jobs that depend on a given job
    wait_for = db.Column(db.Integer)
    pa_job_state = db.Column(db.String)

    # related JobExtra and Joblog rows
    extra = db.relationship("JobExtra")
    logs = db.relationship("Joblog")

    def __repr__(self):
        """Return a debug string listing every column plus the related extras and logs."""
        return (
            f"<id: {self.id}, start_time: {self.start_time}, last_update: {self.last_update}, "
            f"name: {self.name}, state: {self.state}, num_files: {self.num_files}, "
            f"current_file_num: {self.current_file_num}, current_file: {self.current_file}, "
            f"pa_job_state: {self.pa_job_state}, wait_for: {self.wait_for}, "
            f"extra: {self.extra}, logs: {self.logs}>"
        )
|
|
|
|
################################################################################
|
|
# Class describing PA_JobManager_Message and in the DB (via sqlalchemy)
|
|
# the job manager can send a message back to the front end (this code) via the
|
|
# DB. has to be about a specific job_id and is success/danger, etc. (alert)
|
|
# and a message
|
|
################################################################################
|
|
class PA_JobManager_Message(db.Model):
    """ORM model for a message left in the DB for the front end about one job."""

    __tablename__ = "pa_job_manager_fe_message"

    id = db.Column(db.Integer, db.Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True)
    # the job this message is about
    job_id = db.Column(db.Integer, db.ForeignKey('job.id'))
    # alert level (success/danger, etc.) and the message text
    alert = db.Column(db.String)
    message = db.Column(db.String)
    # flags handed to the front end unchanged by CheckForJobs()
    persistent = db.Column(db.Boolean)
    cant_close = db.Column(db.Boolean)
    job = db.relationship("Job")

    def __repr__(self):
        """Return a debug string with the id, job id, alert, message and related job."""
        # closing '>' added so this matches the other models' reprs (it was
        # previously left unterminated)
        return f"<id: {self.id}, job_id: {self.job_id}, alert: {self.alert}, message: {self.message}, job: {self.job}>"
|
|
|
|
|
|
################################################################################
|
|
# GetJM_Message: used in html to display any message for this front-end
|
|
################################################################################
|
|
def GetJM_Message():
    """Return the first PA_JobManager_Message row, or None when there is none."""
    return PA_JobManager_Message.query.first()
|
|
|
|
################################################################################
|
|
# ClearJM_Message: used in html to clear any message just displayed
|
|
################################################################################
|
|
def ClearJM_Message(id):
    """Do nothing except log that the message was NOT cleared.

    Clearing is switched off: this function has always returned straight after
    the debug print, so the delete + commit that used to follow it could never
    run. That unreachable code has been removed; the observable behaviour
    (print, return None) is unchanged.

    id -- id of the PA_JobManager_Message that would have been cleared
    """
    # NOTE(review): to really clear a message, delete the matching
    # PA_JobManager_Message row and commit (ClearMessageForJob() does this
    # per job id) -- confirm that is wanted before re-enabling it here
    print(f"DDP: DID NOT clear JM message: {id}" )
    return
|
|
|
|
################################################################################
|
|
# Used in main html to show a red badge of # jobs to draw attention there are
|
|
# active jobs being processed in the background
|
|
################################################################################
|
|
def GetNumActiveJobs():
    """Return the number of jobs whose pa_job_state is not 'Completed'.

    'is distinct from' is used so rows with a NULL pa_job_state are counted too.
    """
    row = db.engine.execute("select count(1) from job where pa_job_state is distinct from 'Completed'").first()
    # read the single column by position: attribute access via `.count`
    # depends on the column label and clashes with the sequence method of the
    # same name on newer SQLAlchemy row objects
    return row[0]
|
|
|
|
################################################################################
|
|
# this function uses sockets to force wake the job mgr / option from Admin menu
|
|
# should never really be needed, but was useful when developing / the job
|
|
# engine got 'stuck' when jobs were run in parallel
|
|
################################################################################
|
|
def WakePAJobManager():
    """Open and immediately close a TCP connection to the job manager.

    Nothing is sent over the connection. Any failure is reported through
    st.SetMessage() as a "danger" alert rather than raised, so callers never
    see an exception from here.
    """
    try:
        # the context manager closes the socket even when connect() raises
        # (previously the socket leaked on a failed connect)
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
    except Exception as e:
        # deliberately broad: this is a best-effort nudge and must not break
        # the request that triggered it
        st.SetMessage(f"Failed to connect to job manager, has it crashed? Exception was:{e}", "danger")
    return
|
|
|
|
###############################################################################
|
|
# NewJob takes a name (which will be matched in pa_job_manager.py to run
|
|
# the appropriate job - which will update the Job() until complete
|
|
###############################################################################
|
|
def NewJob(name, num_files="0", wait_for=None, jex=None ):
    """Create a Job in the "New" state, commit it and wake the job manager.

    name      -- job name (per the header comment, matched in pa_job_manager.py)
    num_files -- initial value for Job.num_files
    wait_for  -- value for Job.wait_for (id of a job this one depends on), or None
    jex       -- optional iterable of JobExtra objects to attach to the job

    Returns the committed Job.
    """
    # NOTE(review): start_time/last_update are given the literal string
    # 'now()' -- presumably the database resolves it to the current time; confirm
    job = Job(start_time='now()', last_update='now()', name=name, state="New", num_files=num_files,
              current_file_num=0, current_file='',
              wait_for=wait_for, pa_job_state="New")
    if jex is not None:
        job.extra.extend(jex)

    db.session.add(job)
    db.session.commit()

    WakePAJobManager()
    return job
|
|
|
|
################################################################################
|
|
# WithdrawDependantJobs: for a stale job (pa_job_mgr restarts and a job is not
|
|
# finished), this function is called if the user chooses to cancel the job. It
|
|
# cancels this job, and any dependant jobs as well
|
|
################################################################################
|
|
def WithdrawDependantJobs( job, id, reason ):
    """Mark every job waiting on job `id` as "Withdrawn", then do the same for their dependants.

    job    -- the job that caused the withdrawal (its id is quoted in the log line)
    id     -- job id to match against Job.wait_for
    reason -- text appended to the log line written for each withdrawn job
    """
    dependants = Job.query.filter(Job.wait_for==id).all()
    for dependant in dependants:
        cancel_log = f"Job (#{dependant.id}) has been cancelled -- #{job.id} {reason}"
        FinishJob(dependant, cancel_log, "Withdrawn" )
        # depth-first: anything waiting on this dependant is withdrawn next
        WithdrawDependantJobs(dependant, dependant.id, reason)
    return
|
|
|
|
##############################################################################
|
|
# FinishJob(): finish this job off (if no overrides), its just marked completed
|
|
##############################################################################
|
|
def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
    """Set the job's final states, stamp last_update, add a closing log line and commit.

    job          -- the Job to finish
    last_log     -- text of the final Joblog entry
    state        -- value for job.state (default "Completed")
    pa_job_state -- value for job.pa_job_state (default "Completed")

    When the resulting state is "Failed", every job waiting on this one is
    withdrawn before the commit.
    """
    job.state = state
    job.pa_job_state = pa_job_state
    job.last_update = datetime.now(pytz.utc)

    # the closing log line carries the same timestamp as last_update
    final_entry = Joblog( job_id=job.id, log=last_log, log_date=job.last_update )
    db.session.add(final_entry)

    if job.state == "Failed":
        WithdrawDependantJobs( job, job.id, "dependant job failed" )

    db.session.commit()
    return
|
|
|
|
################################################################################
|
|
# /jobs -> show jobs (default to only showing 'non-archived' jobs -- age is in
|
|
# settings.job_archive_age
|
|
################################################################################
|
|
@app.route("/jobs", methods=["GET", "POST"])
@login_required
def jobs():
    """Render the job list: every job on POST, only recently updated jobs on GET.

    "Recent" means last_update is within settings.job_archive_age days of now.
    """
    settings = Settings.query.first()
    newest_first = Job.id.desc()

    if request.method != 'POST':
        page_title = 'Job list (recent)'
        # e.g. an age of 3 is turned into the interval '3DAYS' on the DB side
        archive_window = func.cast(concat(settings.job_archive_age, 'DAYS'), INTERVAL)
        query = Job.query.filter( Job.last_update >= (func.now() - archive_window) )
    else:
        page_title = 'Job list (all)'
        query = Job.query

    job_rows = query.order_by(newest_first).all()
    return render_template("jobs.html", jobs=job_rows, page_title=page_title)
|
|
|
|
|
|
###############################################################################
|
|
# /job/<id> -> GET -> shows status/history of jobs
|
|
################################################################################
|
|
def _job_timing(job):
    """Return (duration, estimate) for a job, both truncated to whole seconds.

    duration -- "N/A" when the job has no start_time; otherwise last_update
                minus start_time for a Completed job, or now minus start_time
    estimate -- projected time remaining as a timedelta, or None. Only worked
                out for a non-Completed job that has run for more than 300
                seconds and has non-zero current_file_num and num_files
    """
    if not job.start_time:
        return "N/A", None

    completed = job.pa_job_state == "Completed"
    end_time = job.last_update if completed else datetime.now(pytz.utc)
    duration = end_time - job.start_time
    duration -= timedelta(microseconds=duration.microseconds)

    estimate = None
    duration_s = duration.total_seconds()
    if duration_s > 300 and not completed and job.current_file_num and job.num_files:
        # scale the time spent so far by (total files / files processed so
        # far), then subtract what has already elapsed
        estimate_s = duration_s / job.current_file_num * job.num_files
        estimate = timedelta( seconds=(estimate_s-duration_s) )
        estimate -= timedelta(microseconds=estimate.microseconds)
    return duration, estimate


@app.route("/job/<id>", methods=["GET","POST"])
@login_required
def joblog(id):
    """Render the status page and log history of job `id`.

    POST -- show every log line, no auto-refresh
    GET  -- show only the OLDEST_LOG_LIMIT oldest and NEWEST_LOG_LIMIT newest
            lines, with auto-refresh; display_more tells the template that
            some lines were left out

    Responds 404 when the job does not exist (this used to fail with an
    AttributeError further down).
    """
    job = Job.query.get_or_404(id)
    # `id` comes straight from the URL, so it is only ever used as a bound
    # parameter through the ORM, never interpolated into SQL text
    job_logs = Joblog.query.filter(Joblog.job_id==id)

    if request.method == 'POST':
        logs = job_logs.order_by(Joblog.log_date).all()
        display_more = False
        order = "asc"
        refresh = False
    else:
        refresh = True
        log_cnt = job_logs.count()
        newest_logs = job_logs.order_by(Joblog.log_date.desc()).limit(NEWEST_LOG_LIMIT).all()
        oldest_logs = job_logs.order_by(Joblog.log_date).limit(OLDEST_LOG_LIMIT).all()
        # set() drops lines that are in both the oldest and the newest slice
        logs = sorted( set( oldest_logs + newest_logs ), key=lambda el: el.log_date )
        display_more = log_cnt > (NEWEST_LOG_LIMIT+OLDEST_LOG_LIMIT)
        order = "desc"

    duration, estimate = _job_timing(job)
    return render_template("joblog.html", job=job, logs=logs, duration=duration, display_more=display_more, order=order, estimate=estimate, refresh=refresh)
|
|
|
|
###############################################################################
|
|
# /wakeup -> GET -> forces the job manager to wake up, and check the queue
|
|
# should not be needed, but in DEV can be helpful
|
|
################################################################################
|
|
@app.route("/wakeup", methods=["GET"])
@login_required
def wakeup():
    """Nudge the job manager via WakePAJobManager(), then redirect to "/"."""
    WakePAJobManager()
    home = redirect("/")
    return home
|
|
|
|
################################################################################
|
|
@app.route("/stale_job/<id>", methods=["POST"])
@login_required
def stale_job(id):
    """Restart or cancel job `id`, according to the posted form field 'action'.

    restart -- reset the job (states and progress counters) so it looks new
               again, and log that it was restarted
    cancel  -- withdraw every job that depends on this one, then mark this
               job "Withdrawn"

    For any action, the front-end messages stored for the job are then
    deleted, the job manager is woken, and the user is redirected to /jobs.
    Responds 404 when the job does not exist.
    """
    job = Job.query.get_or_404(id)
    action = request.form['action']

    if action == "restart":
        now = datetime.now(pytz.utc)
        # reset state of job too, it should seem as new, and recalc/reset these items
        job.pa_job_state = 'New'
        job.state = 'New'
        job.num_files = 0
        job.current_file = ''
        job.current_file_num = 0
        job.last_update = now
        db.session.add( Joblog( job_id=job.id, log="(Stale) Job restarted manually by user", log_date=now ) )
    elif action == "cancel":
        WithdrawDependantJobs( job, job.id, "(Stale) Job withdrawn manually by user" )
        FinishJob(job, f"Job (#{job.id}) (Stale) Job withdrawn manually by user", "Withdrawn" )

    # clear out the message(s) for this job being stale. The model lives in
    # this module, so the ORM is used with a bound parameter instead of
    # interpolating the URL value into raw SQL
    PA_JobManager_Message.query.filter(PA_JobManager_Message.job_id==job.id).delete()

    db.session.commit()
    WakePAJobManager()
    return redirect("/jobs")
|
|
|
|
################################################################################
|
|
# list jobs that are marked stale
|
|
################################################################################
|
|
@app.route("/stale_jobs", methods=["GET"])
@login_required
def stale_jobs():
    """Render the job list restricted to jobs whose pa_job_state is 'Stale', newest id first."""
    stale_query = Job.query.filter(Job.pa_job_state=='Stale')
    stale = stale_query.order_by(Job.id.desc()).all()
    return render_template("jobs.html", jobs=stale, page_title='Stale job list')
|
|
|
|
|
|
################################################################################
|
|
# retrieve any log entries across ALL jobs that relate to this entry
|
|
# used in viewer on button click
|
|
################################################################################
|
|
@app.route("/joblog_search", methods=["POST"])
@login_required
def joblog_search():
    """Return, as a JSON array string, every log line (across all jobs) that mentions an entry.

    The posted form field 'eid' is looked up in the entry table; its name is
    then searched for, case-insensitively, in all job logs. Each array element
    is an object with the string fields "id", "name" (both of the job),
    "log_date" and "log", ordered by log date. An unknown eid yields "[]".
    """
    # function-scope imports: only this view needs them
    import json
    from sqlalchemy import text

    eid = request.form['eid']
    # eid is user input: pass it as a bound parameter, never inside the SQL text
    entry = db.session.execute( text("select name from entry where id = :eid"), {"eid": eid} ).first()
    if entry is None:
        # previously this fell through to an unbound variable (a 500 error)
        return "[]"

    # same join and filter as the old raw SQL, but with the entry name bound
    # as a parameter (names containing quotes used to break the statement)
    matches = (
        db.session.query(Joblog.log, Job.id, Job.name, Joblog.log_date)
        .filter(Joblog.job_id == Job.id)
        .filter(Joblog.log.ilike(f"%{entry[0]}%"))
        .order_by(Joblog.log_date)
        .all()
    )

    # json.dumps escapes quotes/newlines in log text, which the hand-built
    # string did not; every value stays a string, as before
    found = [
        {"id": str(job_id), "name": str(job_name), "log_date": str(log_date), "log": str(log)}
        for log, job_id, job_name, log_date in matches
    ]
    return json.dumps(found)
|
|
|
|
|
|
###############################################################################
|
|
# /checkforjobs -> POST -> looks for pa_job_manager status to F/E jobs and sends json of
# them back to F/E (called from internal/js/jobs.js:CheckForJobs())
|
|
################################################################################
|
|
@app.route("/checkforjobs", methods=["POST"])
@login_required
def CheckForJobs():
    """Return JSON with the number of active jobs and every pending job-manager message.

    Response fields:
    num_active_jobs -- result of GetNumActiveJobs()
    sts             -- one dict per PA_JobManager_Message; its 'message' is
                       prefixed with an HTML link to that job's log page
    """
    def _as_status(msg):
        # build the "Job #<id>: " link that goes in front of the message text
        href = url_for('joblog', id=msg.job_id)
        link = f'<a class="link-light" href="{href}">Job #{msg.job_id}</a>: '
        return {
            'message': link + msg.message,
            'alert': msg.alert,
            'job_id': msg.job_id,
            'persistent': msg.persistent,
            'cant_close': msg.cant_close,
        }

    num = GetNumActiveJobs()
    print( f"called: /checkforjobs -- num={num}" )
    sts = [_as_status(msg) for msg in PA_JobManager_Message.query.all()]
    return make_response( jsonify( num_active_jobs=num, sts=sts ) )
|
|
|
|
###############################################################################
|
|
# /clearmsgforjob/<id> -> POST -> deletes any pa_job_manager F/E messages stored
# for job <id> and returns a json success status
|
|
################################################################################
|
|
@app.route("/clearmsgforjob/<id>", methods=["POST"])
@login_required
def ClearMessageForJob(id):
    """Delete every PA_JobManager_Message stored for job `id` and reply with a JSON success status."""
    for_this_job = PA_JobManager_Message.query.filter(PA_JobManager_Message.job_id==id)
    for_this_job.delete()
    db.session.commit()
    # the F/E ignores this response whether the delete succeeded or failed
    payload = jsonify( status="success" )
    return make_response( payload )
|
|
|
|
###############################################################################
|
|
# This func creates a new filter in jinja2 to format the time from the db in a
|
|
# way that is more readable (converted to local tz too)
|
|
################################################################################
|
|
@app.template_filter('vicdate')
def _jinja2_filter_datetime(date, fmt=None):
    """Jinja2 filter 'vicdate': format a datetime for display.

    date -- object with a strftime() method; any falsy value (e.g. None) gives "N/A"
    fmt  -- optional strftime format; defaults to day/month/year with a
            12-hour clock. This argument used to be accepted but ignored.

    NOTE(review): no timezone conversion happens in this function -- the value
    is formatted in whatever timezone it already carries.
    """
    if not date:
        return "N/A"
    return date.strftime(fmt or "%d/%m/%Y %I:%M:%S %p")
|