switched name to pa_job_manager, feels more appropriate
This commit is contained in:
121
pa_job_manager.py
Normal file
121
pa_job_manager.py
Normal file
@@ -0,0 +1,121 @@
|
||||
###
|
||||
#
|
||||
# This file controls the 'external' job control manager, that (periodically #
|
||||
# looks / somehow is pushed an event?) picks up new jobs, and processes them.
|
||||
#
|
||||
# It then stores the progress/status, etc. in job and joblog tables as needed
|
||||
# via wrapper functions.
|
||||
#
|
||||
# The whole pa_job_manager is multi-threaded, and uses the database tables for
|
||||
# state management and communication back to the pa web site
|
||||
#
|
||||
###
|
||||
|
||||
#### DDP: work out the import line for just sqlalchemy
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy import Column, Integer, String, Sequence, ForeignKey, DateTime
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from shared import DB_URL
|
||||
from datetime import datetime, timedelta
|
||||
import pytz
|
||||
|
||||
|
||||
# an Manager, which the Session will use for connection resources
|
||||
some_manager = create_engine(DB_URL)
|
||||
|
||||
# create a configured "Session" class
|
||||
Session = sessionmaker(bind=some_manager)
|
||||
|
||||
# create a Session
|
||||
session = Session()
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
# global for us to keep state / let front-end know our state
|
||||
pa_eng=None
|
||||
|
||||
|
||||
#### DDP: work out the class creation line for just sqlalchemy
|
||||
class PA_JobManager(Base):
|
||||
__tablename__ = "pa_job_manager"
|
||||
id = Column(Integer, Sequence('pa_job_manager_id_seq'), primary_key=True)
|
||||
state = Column(String)
|
||||
num_active_jobs = Column(Integer)
|
||||
num_completed_jobs = Column(Integer)
|
||||
|
||||
def __repr__(self):
|
||||
return "<id={}, state={}, num_active_jobs={}, num_completed_jobs={}>".format( self.id, self.state, self.num_active_jobs, self.num_completed_jobs )
|
||||
|
||||
|
||||
class Joblog(Base):
|
||||
__tablename__ = "joblog"
|
||||
id = Column(Integer, Sequence('ill_id_seq'), primary_key=True )
|
||||
job_id = Column(Integer, ForeignKey('job.id'), primary_key=True )
|
||||
log_date = Column(DateTime(timezone=True))
|
||||
log = Column(String)
|
||||
|
||||
def __repr__(self):
|
||||
return "<id: {}, job_id: {}, log: {}".format(self.id, self.job_id, self.log )
|
||||
|
||||
################################################################################
|
||||
# Class describing Action in the database, and via sqlalchemy, connected to the DB as well
|
||||
################################################################################
|
||||
class Job(Base):
|
||||
__tablename__ = "job"
|
||||
id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True )
|
||||
start_time = Column(DateTime(timezone=True))
|
||||
last_update = Column(DateTime(timezone=True))
|
||||
name = Column(String)
|
||||
state = Column(String)
|
||||
num_passes = Column(Integer)
|
||||
current_pass = Column(Integer)
|
||||
num_files = Column(Integer)
|
||||
current_file_num = Column(Integer)
|
||||
current_file = Column(String)
|
||||
wait_for = Column(Integer)
|
||||
pa_job_state = Column(String)
|
||||
|
||||
def __repr__(self):
|
||||
return "<id: {}, start_time: {}, last_update: {}, name: {}, state: {}, num_passes: {}, current_passes: {}, num_files: {}, current_file_num: {}, current_file: {}>".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_passes, self.current_pass, self.num_files, self.num_files, self.current_file_num, self.current_file)
|
||||
|
||||
|
||||
def GetJobs():
|
||||
return session.query(Job).all()
|
||||
|
||||
def InitialiseManager():
|
||||
pa_eng=session.query(PA_JobManager).first()
|
||||
print(pa_eng)
|
||||
if( pa_eng == None ):
|
||||
pa_eng = PA_JobManager(state='Initialising', num_active_jobs=0, num_completed_jobs=0 )
|
||||
session.add(pa_eng)
|
||||
|
||||
pa_eng.state = 'Scanning Jobs'
|
||||
jobs=GetJobs()
|
||||
for job in jobs:
|
||||
if job.pa_job_state != 'complete':
|
||||
print("We have a job we need to handle, its current pa_eng state is {}, internal job state is: {}".format( job.pa_job_state, job.state) )
|
||||
pa_eng.num_active_jobs = pa_eng.num_active_jobs + 1
|
||||
else:
|
||||
pa_eng.num_completed_jobs = pa_eng.num_completed_jobs +1
|
||||
print("PA job manager is up")
|
||||
pa_eng.state = 'Waiting for new Jobs'
|
||||
return
|
||||
|
||||
def CreateJob():
|
||||
return
|
||||
|
||||
def UpdateJob():
|
||||
return
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("PA job manager starting")
|
||||
try:
|
||||
InitialiseManager()
|
||||
session.commit()
|
||||
except Exception as e:
|
||||
print( "Failed to initialise PA Job Manager: {}".format(e) )
|
||||
session.rollback()
|
||||
print("Exiting for now")
|
||||
Reference in New Issue
Block a user