From 826e2fd4219c820bf8fcb43504b373fd9f1dce2a Mon Sep 17 00:00:00 2001 From: Damien De Paoli Date: Sat, 16 Jan 2021 17:46:54 +1100 Subject: [PATCH] switched name to pa_job_manager, feels more appropriate --- pa_job_manager.py | 121 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 pa_job_manager.py diff --git a/pa_job_manager.py b/pa_job_manager.py new file mode 100644 index 0000000..46681e4 --- /dev/null +++ b/pa_job_manager.py @@ -0,0 +1,121 @@ +### +# +# This file controls the 'external' job control manager, that (periodically # +# looks / somehow is pushed an event?) picks up new jobs, and processes them. +# +# It then stores the progress/status, etc. in job and joblog tables as needed +# via wrapper functions. +# +# The whole pa_job_manager is multi-threaded, and uses the database tables for +# state management and communication back to the pa web site +# +### + +#### DDP: work out the import line for just sqlalchemy +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy import Column, Integer, String, Sequence, ForeignKey, DateTime +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from shared import DB_URL +from datetime import datetime, timedelta +import pytz + + +# an Manager, which the Session will use for connection resources +some_manager = create_engine(DB_URL) + +# create a configured "Session" class +Session = sessionmaker(bind=some_manager) + +# create a Session +session = Session() + +Base = declarative_base() + + +# global for us to keep state / let front-end know our state +pa_eng=None + + +#### DDP: work out the class creation line for just sqlalchemy +class PA_JobManager(Base): + __tablename__ = "pa_job_manager" + id = Column(Integer, Sequence('pa_job_manager_id_seq'), primary_key=True) + state = Column(String) + num_active_jobs = Column(Integer) + num_completed_jobs = Column(Integer) + + def __repr__(self): + return "".format( self.id, self.state, self.num_active_jobs, self.num_completed_jobs ) + + +class Joblog(Base): + __tablename__ = "joblog" + id = Column(Integer, Sequence('ill_id_seq'), primary_key=True ) + job_id = Column(Integer, ForeignKey('job.id'), primary_key=True ) + log_date = Column(DateTime(timezone=True)) + log = Column(String) + + def __repr__(self): + return "".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_passes, self.current_pass, self.num_files, self.num_files, self.current_file_num, self.current_file) + + +def GetJobs(): + return session.query(Job).all() + +def InitialiseManager(): + pa_eng=session.query(PA_JobManager).first() + print(pa_eng) + if( pa_eng == None ): + pa_eng = PA_JobManager(state='Initialising', num_active_jobs=0, num_completed_jobs=0 ) + session.add(pa_eng) + + pa_eng.state = 'Scanning Jobs' + jobs=GetJobs() + for job in jobs: + if job.pa_job_state != 'complete': + print("We have a job we need to handle, its current pa_eng state is {}, internal job state is: {}".format( job.pa_job_state, job.state) ) + pa_eng.num_active_jobs = pa_eng.num_active_jobs + 1 + else: + pa_eng.num_completed_jobs = pa_eng.num_completed_jobs +1 + print("PA job manager is up") + pa_eng.state = 'Waiting for new Jobs' + return + +def CreateJob(): + return + +def UpdateJob(): + return + +if __name__ == "__main__": + print("PA job manager starting") + try: + InitialiseManager() + session.commit() + except Exception as e: + print( "Failed to initialise PA Job Manager: {}".format(e) ) + session.rollback() + print("Exiting for now")