### # # This file controls the 'external' job control manager, that (periodically # # looks / somehow is pushed an event?) picks up new jobs, and processes them. # # It then stores the progress/status, etc. in job and joblog tables as needed # via wrapper functions. # # The whole pa_job_manager is multi-threaded, and uses the database tables for # state management and communication back to the pa web site # ### #### DDP: work out the import line for just sqlalchemy from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Sequence, ForeignKey, DateTime from sqlalchemy.exc import SQLAlchemyError from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from shared import DB_URL from datetime import datetime, timedelta import pytz # an Manager, which the Session will use for connection resources some_manager = create_engine(DB_URL) # create a configured "Session" class Session = sessionmaker(bind=some_manager) # create a Session session = Session() Base = declarative_base() # global for us to keep state / let front-end know our state pa_eng=None #### DDP: work out the class creation line for just sqlalchemy class PA_JobManager(Base): __tablename__ = "pa_job_manager" id = Column(Integer, Sequence('pa_job_manager_id_seq'), primary_key=True) state = Column(String) num_active_jobs = Column(Integer) num_completed_jobs = Column(Integer) def __repr__(self): return "".format( self.id, self.state, self.num_active_jobs, self.num_completed_jobs ) class Joblog(Base): __tablename__ = "joblog" id = Column(Integer, Sequence('ill_id_seq'), primary_key=True ) job_id = Column(Integer, ForeignKey('job.id'), primary_key=True ) log_date = Column(DateTime(timezone=True)) log = Column(String) def __repr__(self): return "".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_passes, self.current_pass, self.num_files, self.num_files, self.current_file_num, self.current_file) def GetJobs(): return session.query(Job).all() def InitialiseManager(): pa_eng=session.query(PA_JobManager).first() print(pa_eng) if( pa_eng == None ): pa_eng = PA_JobManager(state='Initialising', num_active_jobs=0, num_completed_jobs=0 ) session.add(pa_eng) pa_eng.state = 'Scanning Jobs' jobs=GetJobs() for job in jobs: if job.pa_job_state != 'complete': print("We have a job we need to handle, its current pa_eng state is {}, internal job state is: {}".format( job.pa_job_state, job.state) ) pa_eng.num_active_jobs = pa_eng.num_active_jobs + 1 else: pa_eng.num_completed_jobs = pa_eng.num_completed_jobs +1 print("PA job manager is up") pa_eng.state = 'Waiting for new Jobs' return def CreateJob(): return def UpdateJob(): return if __name__ == "__main__": print("PA job manager starting") try: InitialiseManager() session.commit() except Exception as e: print( "Failed to initialise PA Job Manager: {}".format(e) ) session.rollback() print("Exiting for now")