Files
photoassistant/job.py

217 lines
9.9 KiB
Python

from wtforms import SubmitField, StringField, FloatField, HiddenField, validators, Form
from flask_wtf import FlaskForm
from flask import request, render_template, redirect
from settings import Settings
from main import db, app, ma
from sqlalchemy import Sequence, func
from sqlalchemy.exc import SQLAlchemyError
from status import st, Status
from datetime import datetime, timedelta
from flask_login import login_required, current_user
import pytz
import socket
from shared import PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, NEWEST_LOG_LIMIT, OLDEST_LOG_LIMIT
from flask_login import login_required, current_user
from sqlalchemy.dialects.postgresql import INTERVAL
from sqlalchemy.sql.functions import concat
# pylint: disable=no-member
################################################################################
# Class describing extra/specific info for a Job and via sqlalchemy, connected to the DB as well
################################################################################
class JobExtra(db.Model):
    """Extra name/value detail attached to a Job (table "jobextra")."""

    __tablename__ = "jobextra"

    # surrogate key, fed by the jobextra_id_seq sequence
    id = db.Column(db.Integer, db.Sequence('jobextra_id_seq'), primary_key=True)
    # the job (job.id) this name/value pair belongs to
    job_id = db.Column(db.Integer, db.ForeignKey('job.id'))
    # free-form key and its value, both stored as strings
    name = db.Column(db.String)
    value = db.Column(db.String)

    def __repr__(self):
        """Return a debug string listing every column of this row."""
        return f"<id: {self.id}, job_id: {self.job_id}, name: {self.name}, value: {self.value}>"
################################################################################
# Class describing logs for each job and via sqlalchemy, connected to the DB as well
################################################################################
class Joblog(db.Model):
    """A single log line recorded against a Job."""

    # fed by the joblog_id_seq sequence
    id = db.Column(db.Integer, db.Sequence('joblog_id_seq'), primary_key=True)
    # the job (job.id) this line belongs to; also flagged primary_key, so the
    # mapped primary key is the (id, job_id) pair
    job_id = db.Column(db.Integer, db.ForeignKey('job.id'), primary_key=True)
    # timezone-aware timestamp of the log line
    log_date = db.Column(db.DateTime(timezone=True))
    # the log text itself
    log = db.Column(db.String)

    def __repr__(self):
        """Return a debug string with the ids and the log text."""
        # closing '>' added so the repr is well-formed like the other models
        return f"<id: {self.id}, job_id: {self.job_id}, log: {self.log}>"
################################################################################
# Class describing Job and via sqlalchemy, connected to the DB as well
################################################################################
class Job(db.Model):
    """A background job row, with its JobExtra details and Joblog lines."""

    # fed by the job_id_seq sequence
    id = db.Column(db.Integer, db.Sequence('job_id_seq'), primary_key=True)
    # timezone-aware timestamps: when the job started / was last touched
    start_time = db.Column(db.DateTime(timezone=True))
    last_update = db.Column(db.DateTime(timezone=True))
    name = db.Column(db.String)
    state = db.Column(db.String)
    # progress counters: total files, and the index/name of the current one
    num_files = db.Column(db.Integer)
    current_file_num = db.Column(db.Integer)
    current_file = db.Column(db.String)
    # plain integer column (no foreign key declared here)
    wait_for = db.Column(db.Integer)
    pa_job_state = db.Column(db.String)
    # related rows, linked through their job_id foreign keys
    extra = db.relationship("JobExtra")
    logs = db.relationship("Joblog")

    def __repr__(self):
        """Return a debug string listing every attribute, including relations."""
        shown = (
            "id", "start_time", "last_update", "name", "state", "num_files",
            "current_file_num", "current_file", "pa_job_state", "wait_for",
            "extra", "logs",
        )
        parts = [f"{attr}: {getattr(self, attr)}" for attr in shown]
        return "<" + ", ".join(parts) + ">"
################################################################################
# Used in main html to show a red badge of # jobs to draw attention there are
# active jobs being processed in the background
################################################################################
def GetNumActiveJobs():
    """Return how many job rows have a pa_job_state other than 'Completed'.

    "is distinct from" also counts rows whose pa_job_state is NULL.
    """
    row = db.engine.execute(
        "select count(1) from job where pa_job_state is distinct from 'Completed'"
    ).first()
    # Read the single result column by position rather than by the
    # database-generated label "count": that label is not spelled out in the
    # query, and the attribute name clashes with the tuple-style count()
    # method on SQLAlchemy result rows.
    return row[0]
################################################################################
# this function uses sockets to force wake the job mgr / option from Admin menu
# should never really be needed, but was useful when developing / the job
# engine got 'stuck' when jobs were run in parallel
################################################################################
def WakePAJobManager():
    """Open and immediately close a TCP connection to the job manager.

    Connects to PA_JOB_MANAGER_HOST:PA_JOB_MANAGER_PORT and sends no data.
    Any failure is reported through st.SetMessage() as a "danger" message
    instead of being raised to the caller.
    """
    try:
        print("Waking up PA Job Manager")
        # the with-block closes the socket even when connect() raises, so a
        # failed wake-up no longer leaks the descriptor
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
    except Exception as e:
        # deliberately best-effort: tell the user rather than fail the request
        st.SetMessage(f"Failed to connect to job manager, has it crashed? Exception was:{e}", "danger")
###############################################################################
# NewJob takes a name (which will be matched in pa_job_manager.py to run
# the appropriate job), which will update the Job() until complete
###############################################################################
def NewJob(name, num_files="0", wait_for=None, jex=None ):
    """Create, commit and return a new Job row, then wake the job manager.

    name      -- stored in Job.name
    num_files -- stored in Job.num_files
    wait_for  -- stored in Job.wait_for
    jex       -- optional iterable of JobExtra objects to attach to the job

    The job starts with state and pa_job_state both set to "New".
    """
    # 'now()' is handed to the database as the timestamp value for both columns
    job = Job(start_time='now()', last_update='now()', name=name, state="New",
              num_files=num_files, current_file_num=0, current_file='',
              wait_for=wait_for, pa_job_state="New")
    if jex is not None:
        for extra in jex:
            job.extra.append(extra)
    db.session.add(job)
    db.session.commit()
    # only wake the manager once the job is committed
    WakePAJobManager()
    return job
################################################################################
# /jobs -> show jobs (default to only showing 'non-archived' jobs -- age is in
# settings.job_archive_age
################################################################################
@app.route("/jobs", methods=["GET", "POST"])
@login_required
def jobs():
    """Render jobs.html with the job list, newest id first.

    POST lists every job. GET lists only jobs whose last_update falls within
    the last settings.job_archive_age days.
    """
    settings = Settings.query.first()
    if request.method == 'POST':
        title = 'Job list (all)'
        selection = Job.query
    else:
        title = 'Job list (recent)'
        # the age is concatenated with 'DAYS' and cast to an INTERVAL in SQL
        archive_age = func.cast(concat(settings.job_archive_age, 'DAYS'), INTERVAL)
        oldest_allowed = func.now() - archive_age
        selection = Job.query.filter(Job.last_update >= oldest_allowed)
    job_rows = selection.order_by(Job.id.desc()).all()
    return render_template("jobs.html", jobs=job_rows, page_title=title)
###############################################################################
# /job/<id> -> GET shows status of the job with its oldest/newest logs, POST shows all of its logs
################################################################################
@app.route("/job/<id>", methods=["GET","POST"])
@login_required
def joblog(id):
    """Render joblog.html for one job: its logs, duration and time estimate.

    POST shows every log line, oldest first, with refresh off.
    GET shows only the OLDEST_LOG_LIMIT oldest and NEWEST_LOG_LIMIT newest
    lines, sets display_more when lines were left out, and turns refresh on.

    Responds 404 when no job has the given id.
    """
    # local import: flask is already a dependency of this file, and abort is
    # only needed here
    from flask import abort

    job = Job.query.get(id)
    if job is None:
        # previously this fell through and crashed on job.pa_job_state
        abort(404)

    job_logs = Joblog.query.filter(Joblog.job_id == id)
    if request.method == 'POST':
        logs = job_logs.order_by(Joblog.log_date).all()
        display_more = False
        order = "asc"
        refresh = False
    else:
        refresh = True
        # id comes straight from the URL, so count through the ORM with a
        # bound parameter instead of formatting it into raw SQL
        log_cnt = db.session.query(func.count(Joblog.id)).filter(Joblog.job_id == id).scalar()
        newest_logs = job_logs.order_by(Joblog.log_date.desc()).limit(NEWEST_LOG_LIMIT).all()
        oldest_logs = job_logs.order_by(Joblog.log_date).limit(OLDEST_LOG_LIMIT).all()
        # the two slices overlap for short logs; set() removes the duplicates
        logs = sorted(set(oldest_logs + newest_logs), key=lambda el: el.log_date)
        display_more = log_cnt > (NEWEST_LOG_LIMIT + OLDEST_LOG_LIMIT)
        order = "desc"

    # a completed job's duration is fixed; a running job is measured up to now
    if job.pa_job_state == "Completed":
        duration = job.last_update - job.start_time
    else:
        duration = datetime.now(pytz.utc) - job.start_time
    # drop the sub-second part for display
    duration = duration - timedelta(microseconds=duration.microseconds)

    estimate = None
    duration_s = duration.total_seconds()
    # if job is old, not completed, and we have num_files and current_file_num > 0
    # so we can work out an estimated duration
    if duration_s > 300 and job.pa_job_state != "Completed" and job.current_file_num and job.num_files:
        # extrapolate the time per processed file over all files, then take
        # off the time already spent to get the remaining time
        estimate_s = duration_s / job.current_file_num * job.num_files
        estimate = timedelta(seconds=(estimate_s - duration_s))
        estimate = estimate - timedelta(microseconds=estimate.microseconds)

    return render_template("joblog.html", job=job, logs=logs, duration=duration, display_more=display_more, order=order, estimate=estimate, refresh=refresh)
###############################################################################
# /wakeup -> GET -> forces the job manager to wake up, and check the queue
# should not be needed, but in DEV can be helpful
################################################################################
@app.route("/wakeup", methods=["GET"])
@login_required
def wakeup():
    """Wake the job manager, then render base.html."""
    WakePAJobManager()
    page = render_template("base.html")
    return page
################################################################################
@app.route("/stale_job/<id>", methods=["POST"])
@login_required
def stale_job(id):
    """Restart or cancel a stale job, as chosen by the posted 'action' field.

    "restart" resets state and pa_job_state to 'New'.
    "cancel" sets pa_job_state to 'Completed' and state to 'Withdrawn'.
    Either way a log line is added, last_update is set to now, the job's
    pa_job_manager_fe_message rows are deleted, the job manager is woken and
    the browser is redirected to /jobs.

    Responds 400 for any other action and 404 when no job has the given id.
    """
    # local imports: both packages are already dependencies of this file
    from flask import abort
    from sqlalchemy import text

    action = request.form['action']
    print( f"Handle Stale Job#{id} -> {action} it")

    # action -> (log text, new pa_job_state, new state)
    outcomes = {
        "restart": ("(Stale) Job restarted manually by user", 'New', 'New'),
        "cancel": ("(Stale) Job withdrawn manually by user", 'Completed', 'Withdrawn'),
    }
    if action not in outcomes:
        # previously an unknown action crashed on the unassigned log variable
        abort(400)

    job = Job.query.get(id)
    if job is None:
        abort(404)

    now = datetime.now(pytz.utc)
    log_text, new_pa_job_state, new_state = outcomes[action]
    db.session.add(job)
    log = Joblog( job_id=job.id, log=log_text, log_date=now )
    job.pa_job_state = new_pa_job_state
    job.state = new_state
    job.last_update = now
    db.session.add(log)
    # clear out message for this job being stale (and do this via raw sql to
    # avoid circular import); the id is passed as a bound parameter so a value
    # taken from the URL is never formatted into the statement
    db.engine.execute(
        text("delete from pa_job_manager_fe_message where job_id = :job_id"),
        {"job_id": job.id},
    )
    db.session.commit()
    WakePAJobManager()
    return redirect("/jobs")
################################################################################
@app.route("/stale_jobs", methods=["GET"])
@login_required
def stale_jobs():
    """Render jobs.html with only the jobs whose pa_job_state is 'Stale', newest id first."""
    only_stale = Job.query.filter(Job.pa_job_state=='Stale')
    stale_rows = only_stale.order_by(Job.id.desc()).all()
    return render_template("jobs.html", jobs=stale_rows, page_title='Stale job list')
###############################################################################
# This func creates a new filter in jinja2 to format the time from the db in a
# way that is more readable (converted to local tz too)
################################################################################
@app.template_filter('vicdate')
def _jinja2_filter_datetime(date, fmt=None):
    """Jinja2 'vicdate' filter: format a datetime as dd/mm/YYYY hh:mm:ss AM/PM.

    date -- the datetime to format; None renders as an empty string
    fmt  -- optional strftime pattern overriding the default layout

    No timezone conversion is done here: the value is formatted in whatever
    timezone it already carries.
    """
    if date is None:
        # e.g. a timestamp column that has not been set yet
        return ""
    return date.strftime(fmt or "%d/%m/%Y %I:%M:%S %p")