Files
photoassistant/job.py

161 lines
7.6 KiB
Python

from wtforms import SubmitField, StringField, FloatField, HiddenField, validators, Form
from flask_wtf import FlaskForm
from flask import request, render_template, redirect
from main import db, app, ma
from sqlalchemy import Sequence
from sqlalchemy.exc import SQLAlchemyError
from status import st, Status
from datetime import datetime, timedelta
from flask_login import login_required, current_user
import pytz
import socket
from shared import PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT
from flask_login import login_required, current_user
# pylint: disable=no-member
################################################################################
# Class describing extra/specific info for a Job and via sqlalchemy, connected to the DB as well
################################################################################
class JobExtra(db.Model):
    """Extra name/value pair attached to a Job (table "jobextra").

    Each row references its owning job through job_id (foreign key to job.id).
    """
    __tablename__ = "jobextra"

    id = db.Column(db.Integer, db.Sequence('jobextra_id_seq'), primary_key=True)
    job_id = db.Column(db.Integer, db.ForeignKey('job.id'))
    name = db.Column(db.String)
    value = db.Column(db.String)

    def __repr__(self):
        """Return a debug string listing every column of this row."""
        return (
            f"<id: {self.id}, job_id: {self.job_id}, "
            f"name: {self.name}, value: {self.value}>"
        )
################################################################################
# Class describing logs for each job and via sqlalchemy, connected to the DB as well
################################################################################
class Joblog(db.Model):
    """A single log line belonging to a Job.

    Both id and job_id are flagged primary_key, so the mapped primary key is
    the composite (id, job_id). job_id is also a foreign key to job.id.
    """
    id = db.Column(db.Integer, db.Sequence('joblog_id_seq'), primary_key=True )
    job_id = db.Column(db.Integer, db.ForeignKey('job.id'), primary_key=True )
    log_date = db.Column(db.DateTime(timezone=True))
    log = db.Column(db.String)

    def __repr__(self):
        """Return a debug string with the id, job_id and log text."""
        # Closing '>' added: the original string opened with '<' but never
        # closed it, unlike the reprs of JobExtra and Job.
        return "<id: {}, job_id: {}, log: {}>".format(self.id, self.job_id, self.log )
################################################################################
# Class describing Job and via sqlalchemy, connected to the DB as well
################################################################################
class Job(db.Model):
    """A background job row, with its JobExtra and Joblog rows attached.

    `extra` and `logs` are relationships to JobExtra and Joblog (both of
    which carry a foreign key to job.id).
    """
    id = db.Column(db.Integer, db.Sequence('job_id_seq'), primary_key=True)
    start_time = db.Column(db.DateTime(timezone=True))
    last_update = db.Column(db.DateTime(timezone=True))
    name = db.Column(db.String)
    state = db.Column(db.String)
    num_files = db.Column(db.Integer)
    current_file_num = db.Column(db.Integer)
    current_file = db.Column(db.String)
    wait_for = db.Column(db.Integer)
    pa_job_state = db.Column(db.String)
    extra = db.relationship("JobExtra")
    logs = db.relationship("Joblog")

    def __repr__(self):
        """Return a debug string listing every column and both relationships."""
        return (
            f"<id: {self.id}, start_time: {self.start_time}, "
            f"last_update: {self.last_update}, name: {self.name}, "
            f"state: {self.state}, num_files: {self.num_files}, "
            f"current_file_num: {self.current_file_num}, "
            f"current_file: {self.current_file}, "
            f"pa_job_state: {self.pa_job_state}, wait_for: {self.wait_for}, "
            f"extra: {self.extra}, logs: {self.logs}>"
        )
################################################################################
# Used in main html to show a red badge of # jobs to draw attention there are
# active jobs being processed in the background
################################################################################
def GetNumActiveJobs():
    """Return the number of jobs whose pa_job_state is not 'Completed'.

    'is distinct from' also counts rows where pa_job_state is NULL.
    """
    row = db.engine.execute("select count(1) from job where pa_job_state is distinct from 'Completed'").first()
    # Read the single column by position rather than as `row.count`: the
    # attribute form depends on the database naming the column "count" and
    # clashes with the tuple-style count() method on newer SQLAlchemy rows.
    return row[0]
################################################################################
# this function uses sockets to force wake the job mgr / option from Admin menu
# should never really be needed, but was useful when developing / the job
# engine got 'stuck' when jobs were run in parallel
################################################################################
def WakePAJobManager():
    """Open and immediately close a TCP connection to the PA Job Manager.

    Connects to (PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT). Any failure is
    reported through st.SetMessage() as a "danger" message rather than
    raised, so callers never see an exception from here.

    Returns:
        None
    """
    try:
        print("Waking up PA Job Manager")
        # The context manager closes the socket even when connect() raises,
        # which the previous explicit close() did not.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
    except Exception as e:
        st.SetMessage(f"Failed to connect to job manager, has it crashed? Exception was:{e}", "danger")
###############################################################################
# NewJob takes a name (which will be matched in pa_job_manager.py to run
# the appropriate job), which will update the Job() until complete
###############################################################################
def NewJob(name, num_files="0", wait_for=None, jex=None ):
    """Create a Job in state "New", commit it, and wake the job manager.

    Args:
        name: job name stored on the Job row.
        num_files: value stored in Job.num_files (defaults to "0").
        wait_for: value stored in Job.wait_for, or None.
        jex: optional iterable of JobExtra objects to attach to the job.

    Returns:
        The newly committed Job instance.
    """
    job = Job(start_time='now()', last_update='now()', name=name, state="New", num_files=num_files,
              current_file_num=0, current_file='',
              wait_for=wait_for, pa_job_state="New" )
    # Identity test: only a missing jex is skipped (an empty one is a no-op anyway).
    if jex is not None:
        job.extra.extend(jex)
    db.session.add(job)
    db.session.commit()
    # The wake-up happens after the commit, so the row is already stored.
    WakePAJobManager()
    return job
################################################################################
# /jobs -> GET -> show the list of all jobs, ordered by descending id
################################################################################
@app.route("/jobs", methods=["GET"])
@login_required
def jobs():
    """Render jobs.html with every Job, ordered by descending id."""
    all_jobs = Job.query.order_by(Job.id.desc()).all()
    return render_template("jobs.html", jobs=all_jobs, page_title='Job list')
###############################################################################
# /job/<id> -> GET -> shows status/history of a job (first 50 log lines); POST -> same, but with all log lines
################################################################################
@app.route("/job/<id>", methods=["GET","POST"])
@login_required
def joblog(id):
    """Render joblog.html for one job: its logs, run duration and an ETA.

    GET returns only the first 50 log lines (ordered by log_date); POST
    returns all of them. Responds with 404 when no job has the given id.

    Args:
        id: job id taken from the URL. It arrives as a string, so it is only
            ever passed to the database as a bound parameter.
    """
    page_title = 'Show Job Details'
    # A missing job previously surfaced as AttributeError on None (HTTP 500).
    job = Job.query.get_or_404(id)

    # Count through the ORM so the URL-supplied id is bound as a parameter
    # instead of being formatted into the SQL text.
    job_logs = Joblog.query.filter(Joblog.job_id == id)
    log_cnt = job_logs.count()

    ordered_logs = job_logs.order_by(Joblog.log_date)
    first_logs_only = request.method != 'POST'
    if first_logs_only:
        logs = ordered_logs.limit(50).all()
    else:
        logs = ordered_logs.all()

    # Completed jobs use their recorded span; anything else is measured
    # up to the current time.
    if job.pa_job_state == "Completed":
        duration = job.last_update - job.start_time
    else:
        duration = datetime.now(pytz.utc) - job.start_time
    # Drop the sub-second part of the duration.
    duration = duration - timedelta(microseconds=duration.microseconds)

    estimate = None
    duration_s = duration.total_seconds()
    # if job is old, not completed, and we have num_files and current_file_num > 0 so we can work out an estimated duration
    if duration_s > 300 and job.pa_job_state != "Completed" and job.current_file_num and job.num_files:
        # Scale time-per-file-so-far to the full file count, then subtract
        # the time already spent to get the remaining time.
        estimate_s = duration_s / job.current_file_num * job.num_files
        estimate = timedelta(seconds=(estimate_s - duration_s))
        estimate = estimate - timedelta(microseconds=estimate.microseconds)

    return render_template("joblog.html", job=job, logs=logs, log_cnt=log_cnt, duration=duration, page_title=page_title, first_logs_only=first_logs_only, estimate=estimate)
###############################################################################
# /wakeup -> GET -> force-wakes the PA Job Manager, then renders the base page
################################################################################
@app.route("/wakeup", methods=["GET"])
@login_required
def wakeup():
    """Call WakePAJobManager(), then render base.html."""
    WakePAJobManager()
    page = render_template("base.html")
    return page
###############################################################################
# This func creates a new filter in jinja2 to format the time from the db in a
# way that is more readable (converted to local tz too)
################################################################################
@app.template_filter('vicdate')
def _jinja2_filter_datetime(date, fmt=None):
    """Jinja2 filter 'vicdate': format a datetime as "dd/mm/YYYY HH:MM:SS AM/PM".

    Args:
        date: object with a strftime() method, or None.
        fmt: optional strftime format string; when omitted the default
            "%d/%m/%Y %I:%M:%S %p" is used.

    Returns:
        The formatted string, or "" when date is None.

    No timezone conversion is performed here; the value is formatted as given.
    """
    # A missing timestamp renders as blank rather than raising
    # AttributeError in the middle of a template.
    if date is None:
        return ""
    # Honour the caller-supplied format; it was previously accepted but ignored.
    return date.strftime(fmt or "%d/%m/%Y %I:%M:%S %p")