from wtforms import SubmitField, StringField, FloatField, HiddenField, validators, Form from flask_wtf import FlaskForm from flask import request, render_template from main import db, app, ma from sqlalchemy import Sequence from sqlalchemy.exc import SQLAlchemyError from status import st, Status from datetime import datetime, timedelta from flask_login import login_required, current_user import pytz import socket from shared import PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT from flask_login import login_required, current_user # pylint: disable=no-member ################################################################################ # Class describing extra/specific info for a Job and via sqlalchemy, connected to the DB as well ################################################################################ class JobExtra(db.Model): __tablename__ = "jobextra" id = db.Column(db.Integer, db.Sequence('jobextra_id_seq'), primary_key=True ) job_id = db.Column(db.Integer, db.ForeignKey('job.id') ) name = db.Column(db.String) value = db.Column(db.String) def __repr__(self): return "".format(self.id, self.job_id, self.name, self.value ) ################################################################################ # Class describing logs for each job and via sqlalchemy, connected to the DB as well ################################################################################ class Joblog(db.Model): id = db.Column(db.Integer, db.Sequence('joblog_id_seq'), primary_key=True ) job_id = db.Column(db.Integer, db.ForeignKey('job.id'), primary_key=True ) log_date = db.Column(db.DateTime(timezone=True)) log = db.Column(db.String) def __repr__(self): return "".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs) ################################################################################ # Used in main html to show a red badge of # jobs to draw attention there are # active jobs being processed in the background ################################################################################ def GetNumActiveJobs(): ret = db.engine.execute("select count(1) from job where pa_job_state is distinct from 'Completed'").first() return ret.count ################################################################################ # this function uses sockets to force wake the job mgr / option from Admin menu # should never really be needed, but was useful when developing / the job # engine got 'stuck' when jobs were run in parallel ################################################################################ def WakePAJobManager(): try: print("Waking up PA Job Manager") s=socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT)) s.close() except Exception as e: st.SetMessage(f"Failed to connect to job manager, has it crashed? Exception was:{e}", "danger") return ############################################################################### # NewJob takes a name (which will be matched in pa_job_manager.py to run # the appropriate job - which will update the Job() until complete ############################################################################### def NewJob(name, num_files="0", wait_for=None, jex=None ): job=Job(start_time='now()', last_update='now()', name=name, state="New", num_files=num_files, current_file_num=0, current_file='', wait_for=wait_for, pa_job_state="New" ) if jex != None: for e in jex: job.extra.append(e) db.session.add(job) db.session.commit() WakePAJobManager() return job ################################################################################ # /jobs -> show current settings ################################################################################ @app.route("/jobs", methods=["GET"]) @login_required def jobs(): page_title='Job list' jobs = Job.query.order_by(Job.id.desc()).all() return render_template("jobs.html", jobs=jobs, page_title=page_title) ############################################################################### # /job/ -> GET -> shows status/history of jobs ################################################################################ @app.route("/job/", methods=["GET","POST"]) @login_required def joblog(id): page_title='Show Job Details' joblog = Job.query.get(id) log_cnt = db.session.execute( f"select count(id) from joblog where job_id = {id}" ).first()[0] first_logs_only = True if request.method == 'POST': logs=Joblog.query.filter(Joblog.job_id==id).order_by(Joblog.log_date).all() first_logs_only = False else: logs=Joblog.query.filter(Joblog.job_id==id).order_by(Joblog.log_date).limit(50).all() if joblog.pa_job_state == "Completed": duration=(joblog.last_update-joblog.start_time) else: duration=(datetime.now(pytz.utc)-joblog.start_time) duration= duration-timedelta(microseconds=duration.microseconds) estimate=None duration_s = duration.total_seconds() # if job is old, not completed, and we have num_files and current_file_num > 0 so we can work out an estimated duration if duration_s > 300 and joblog.pa_job_state != "Completed" and joblog.current_file_num and joblog.num_files: estimate_s = duration_s / joblog.current_file_num * joblog.num_files estimate = timedelta( seconds=(estimate_s-duration_s) ) estimate = estimate - timedelta(microseconds=estimate.microseconds) return render_template("joblog.html", job=joblog, logs=logs, log_cnt=log_cnt, duration=duration, page_title=page_title, first_logs_only=first_logs_only, estimate=estimate) ############################################################################### # /wakeup -> GET -> forces the job manager to wake up, and check the queue # should not be needed, but in DEV can be helpful ################################################################################ @app.route("/wakeup", methods=["GET"]) @login_required def wakeup(): WakePAJobManager() return render_template("base.html") ################################################################################ @app.route("/stale_job/", methods=["GET", "POST"]) @login_required def stale_job(id): print( f"Handle Stale Job#{id} -> {request.form['action']} it") job=Job.query.get(id) now=datetime.now(pytz.utc) db.session.add(job) if request.form['action'] == "restart": log=Joblog( job_id=id, log="(Stale) Job restarted manually by user", log_date=now ) job.pa_job_state='New' job.state='New' elif request.form['action'] == "cancel": log=Joblog( job_id=id, log="(Stale) Job withdrawn manually by user", log_date=now ) job.pa_job_state='Completed' job.state='Withdrawn' job.last_update=now db.session.add(log) # clear out message for this job being stale (and do this via raw sql to # avoid circulr import) db.engine.execute( f"delete from pa_job_manager_fe_message where job_id = {id}" ) db.session.commit() WakePAJobManager() return render_template("base.html") ################################################################################ @app.route("/stale_jobs", methods=["GET", "POST"]) @login_required def stale_jobs(): page_title='Stale job list' jobs = Job.query.filter(Job.pa_job_state=='Stale').order_by(Job.id.desc()).all() return render_template("jobs.html", jobs=jobs, page_title=page_title) ############################################################################### # This func creates a new filter in jinja2 to format the time from the db in a # way that is more readable (converted to local tz too) ################################################################################ @app.template_filter('vicdate') def _jinja2_filter_datetime(date, fmt=None): return date.strftime("%d/%m/%Y %I:%M:%S %p")