from wtforms import SubmitField, StringField, FloatField, HiddenField, validators, Form from flask_wtf import FlaskForm from flask import request, render_template, redirect, make_response, jsonify, url_for from settings import Settings from main import db, app, ma from sqlalchemy import Sequence, func from sqlalchemy.exc import SQLAlchemyError from status import st, Status from datetime import datetime, timedelta import pytz import socket from shared import PA, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, NEWEST_LOG_LIMIT, OLDEST_LOG_LIMIT from flask_login import login_required, current_user from sqlalchemy.dialects.postgresql import INTERVAL from sqlalchemy.sql.functions import concat # pylint: disable=no-member ################################################################################ # Class describing extra/specific info for a Job and via sqlalchemy, connected to the DB as well ################################################################################ class JobExtra(db.Model): __tablename__ = "jobextra" id = db.Column(db.Integer, db.Sequence('jobextra_id_seq'), primary_key=True ) job_id = db.Column(db.Integer, db.ForeignKey('job.id') ) name = db.Column(db.String) value = db.Column(db.String) def __repr__(self): return "".format(self.id, self.job_id, self.name, self.value ) ################################################################################ # Class describing logs for each job and via sqlalchemy, connected to the DB as well ################################################################################ class Joblog(db.Model): id = db.Column(db.Integer, db.Sequence('joblog_id_seq'), primary_key=True ) job_id = db.Column(db.Integer, db.ForeignKey('job.id'), primary_key=True ) log_date = db.Column(db.DateTime(timezone=True)) log = db.Column(db.String) def __repr__(self): return "".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs) ################################################################################ # Class describing PA_JobManager_Message and in the DB (via sqlalchemy) # the job manager can send a message back to the front end (this code) via the # DB. has to be about a specific job_id and is success/danger, etc. (alert) # and a message ################################################################################ class PA_JobManager_Message(db.Model): __tablename__ = "pa_job_manager_fe_message" id = db.Column(db.Integer, db.Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True ) job_id = db.Column(db.Integer, db.ForeignKey('job.id') ) alert = db.Column(db.String) message = db.Column(db.String) job = db.relationship ("Job" ) def __repr__(self): return f" show jobs (default to only showing 'non-archived' jobs -- age is in # settings.job_archive_age ################################################################################ @app.route("/jobs", methods=["GET", "POST"]) @login_required def jobs(): settings = Settings.query.first() if request.method == 'POST': page_title='Job list (all)' jobs = Job.query.order_by(Job.id.desc()).all() else: page_title='Job list (recent)' jobs = Job.query.filter( Job.last_update >= (func.now() - func.cast(concat(settings.job_archive_age, 'DAYS'), INTERVAL)) ).order_by(Job.id.desc()).all() return render_template("jobs.html", jobs=jobs, page_title=page_title) ############################################################################### # /job/ -> GET -> shows status/history of jobs ################################################################################ @app.route("/job/", methods=["GET","POST"]) @login_required def joblog(id): joblog = Job.query.get(id) if request.method == 'POST': logs=Joblog.query.filter(Joblog.job_id==id).order_by(Joblog.log_date).all() display_more=False order="asc" refresh=False else: refresh=True log_cnt = db.session.execute( f"select count(id) from joblog where job_id = {id}" ).first()[0] newest_logs = Joblog.query.filter(Joblog.job_id==id).order_by(Joblog.log_date.desc()).limit(NEWEST_LOG_LIMIT).all() oldest_logs = Joblog.query.filter(Joblog.job_id==id).order_by(Joblog.log_date).limit(OLDEST_LOG_LIMIT).all() logs=sorted( set( oldest_logs + newest_logs ), key=lambda el: el.log_date ) if log_cnt > (NEWEST_LOG_LIMIT+OLDEST_LOG_LIMIT): display_more=True else: display_more=False order="desc" if joblog.start_time: if joblog.pa_job_state == "Completed": duration=(joblog.last_update-joblog.start_time) elif joblog.start_time: duration=(datetime.now(pytz.utc)-joblog.start_time) duration= duration-timedelta(microseconds=duration.microseconds) estimate=None duration_s = duration.total_seconds() # if job is old, not completed, and we have num_files and current_file_num > 0 so we can work out an estimated duration if duration_s > 300 and joblog.pa_job_state != "Completed" and joblog.current_file_num and joblog.num_files: estimate_s = duration_s / joblog.current_file_num * joblog.num_files estimate = timedelta( seconds=(estimate_s-duration_s) ) estimate = estimate - timedelta(microseconds=estimate.microseconds) else: duration="N/A" estimate=None return render_template("joblog.html", job=joblog, logs=logs, duration=duration, display_more=display_more, order=order, estimate=estimate, refresh=refresh) ############################################################################### # /wakeup -> GET -> forces the job manager to wake up, and check the queue # should not be needed, but in DEV can be helpful ################################################################################ @app.route("/wakeup", methods=["GET"]) @login_required def wakeup(): WakePAJobManager() return redirect("/") ################################################################################ @app.route("/stale_job/", methods=["POST"]) @login_required def stale_job(id): job=Job.query.get(id) now=datetime.now(pytz.utc) db.session.add(job) if request.form['action'] == "restart": log=Joblog( job_id=id, log="(Stale) Job restarted manually by user", log_date=now ) job.pa_job_state='New' job.state='New' # reset state of job too, it should seem as new, and recalc/reset these items job.num_files=0 job.current_file='' job.current_file_num=0 job.last_update=now db.session.add(log) elif request.form['action'] == "cancel": WithdrawDependantJobs( job, job.id, "(Stale) Job withdrawn manually by user" ) FinishJob(job, f"Job (#{job.id}) (Stale) Job withdrawn manually by user", "Withdrawn" ) # clear out message for this job being stale (and do this via raw sql to # avoid circulr import) db.engine.execute( f"delete from pa_job_manager_fe_message where job_id = {id}" ) db.session.commit() WakePAJobManager() return redirect("/jobs") ################################################################################ # list jobs that are maked stale ################################################################################ @app.route("/stale_jobs", methods=["GET"]) @login_required def stale_jobs(): page_title='Stale job list' jobs = Job.query.filter(Job.pa_job_state=='Stale').order_by(Job.id.desc()).all() return render_template("jobs.html", jobs=jobs, page_title=page_title) ################################################################################ # retrieve any log entries across ALL jobs that relate to this entry # used in viewer on button click ################################################################################ @app.route("/joblog_search", methods=["POST"]) @login_required def joblog_search(): eid=request.form['eid'] ent_cursor=db.engine.execute( f"select name from entry where id = {eid}" ) for ent in ent_cursor: jobs_cursor=db.engine.execute( f"select l.log, j.id, j.name, j.state, l.log_date from joblog l, job j where l.job_id = j.id and l.log ilike '%%{ent[0]}%%' order by l.log_date") # turn DB output into json and return it to the f/e ret='[ ' first_job=1 last_job_id = -1 for j in jobs_cursor: if not first_job: ret +=", " ret+= '{' ret+= f'"id":"{j.id}", ' ret+= f'"name":"{j.name}", ' ret+= f'"log_date":"{j.log_date}", ' ret+= f'"log": "{j.log}"' ret+= '}' first_job=0 ret+= ' ]' return ret ############################################################################### # / -> POST -> looks for pa_job_manager status to F/E jobs and sends json of # them back to F/E (called form internal/js/jobs.js:CheckForJobs() ################################################################################ @app.route("/checkforjobs", methods=["POST"]) @login_required def CheckForJobs(): num=GetNumActiveJobs() sts=[] print("CheckForJobs called" ) for msg in PA_JobManager_Message.query.all(): print("there is a PA_J_MGR status message" ) u='Job # ' + str(msg.job_id) + ': ' sts.append( { 'message': u+msg.message, 'alert': msg.alert } ) return make_response( jsonify( num_active_jobs=num, sts=sts ) ) ############################################################################### # This func creates a new filter in jinja2 to format the time from the db in a # way that is more readable (converted to local tz too) ################################################################################ @app.template_filter('vicdate') def _jinja2_filter_datetime(date, fmt=None): if date: return date.strftime("%d/%m/%Y %I:%M:%S %p") else: return "N/A"