remove pa_job_manager table, its not needed, and fixed bug where Gen hash optimisation did not finish job when it shoud have
This commit is contained in:
@@ -51,9 +51,6 @@ session = Session()
|
|||||||
Base = declarative_base()
|
Base = declarative_base()
|
||||||
|
|
||||||
|
|
||||||
# global for us to keep state / let front-end know our state
|
|
||||||
pa_eng=None
|
|
||||||
|
|
||||||
################################################################################
|
################################################################################
|
||||||
# Class describing File in the database, and via sqlalchemy, connected to the DB as well
|
# Class describing File in the database, and via sqlalchemy, connected to the DB as well
|
||||||
# This has to match one-for-one the DB table
|
# This has to match one-for-one the DB table
|
||||||
@@ -120,20 +117,9 @@ class Settings(Base):
|
|||||||
|
|
||||||
################################################################################
|
################################################################################
|
||||||
# classes for the job manager:
|
# classes for the job manager:
|
||||||
# PA_JobManager overall status tracking),
|
# Job (and Joblog, JobExtra) for each Job, and
|
||||||
# Job (and Joblog) for each JOb, and
|
|
||||||
# PA_Jobmanager_fe_message (to pass messages to the front-end web)
|
# PA_Jobmanager_fe_message (to pass messages to the front-end web)
|
||||||
################################################################################
|
################################################################################
|
||||||
class PA_JobManager(Base):
|
|
||||||
__tablename__ = "pa_job_manager"
|
|
||||||
id = Column(Integer, Sequence('pa_job_manager_id_seq'), primary_key=True)
|
|
||||||
state = Column(String)
|
|
||||||
num_active_jobs = Column(Integer)
|
|
||||||
num_completed_jobs = Column(Integer)
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return "<id={}, state={}, num_active_jobs={}, num_completed_jobs={}>".format( self.id, self.state, self.num_active_jobs, self.num_completed_jobs )
|
|
||||||
|
|
||||||
class Joblog(Base):
|
class Joblog(Base):
|
||||||
__tablename__ = "joblog"
|
__tablename__ = "joblog"
|
||||||
id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True )
|
id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True )
|
||||||
@@ -182,21 +168,14 @@ class PA_JobManager_FE_Message(Base):
|
|||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "<id: {}, job_id: {}, alert: {}, message: {}".format(self.id, self.job_id, self.alert, self.message)
|
return "<id: {}, job_id: {}, alert: {}, message: {}".format(self.id, self.job_id, self.alert, self.message)
|
||||||
|
|
||||||
|
##############################################################################
|
||||||
|
# Util (non Class) functions
|
||||||
##############################################################################
|
##############################################################################
|
||||||
def MessageToFE( job_id, alert, message ):
|
def MessageToFE( job_id, alert, message ):
|
||||||
msg = PA_JobManager_FE_Message( job_id=job_id, alert=alert, message=message)
|
msg = PA_JobManager_FE_Message( job_id=job_id, alert=alert, message=message)
|
||||||
session.add(msg)
|
session.add(msg)
|
||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
def InitialiseManager():
|
|
||||||
global pa_eng
|
|
||||||
|
|
||||||
pa_eng=session.query(PA_JobManager).first()
|
|
||||||
if( pa_eng == None ):
|
|
||||||
pa_eng = PA_JobManager(state='Initialising', num_active_jobs=0, num_completed_jobs=0 )
|
|
||||||
session.add(pa_eng)
|
|
||||||
return
|
|
||||||
|
|
||||||
def ProcessImportDirs(parent_job=None):
|
def ProcessImportDirs(parent_job=None):
|
||||||
settings = session.query(Settings).first()
|
settings = session.query(Settings).first()
|
||||||
if settings == None:
|
if settings == None:
|
||||||
@@ -237,7 +216,7 @@ def AddLogForJob(job, message, current_file=''):
|
|||||||
return
|
return
|
||||||
|
|
||||||
def RunJob(job):
|
def RunJob(job):
|
||||||
session = Session()
|
# session = Session()
|
||||||
if job.name =="scannow":
|
if job.name =="scannow":
|
||||||
JobScanNow(job)
|
JobScanNow(job)
|
||||||
elif job.name =="forcescan":
|
elif job.name =="forcescan":
|
||||||
@@ -261,10 +240,8 @@ def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
|
|||||||
return
|
return
|
||||||
|
|
||||||
def HandleJobs():
|
def HandleJobs():
|
||||||
global pa_eng
|
|
||||||
|
|
||||||
print("INFO: PA job manager is scanning for new jobs to process")
|
print("INFO: PA job manager is scanning for new jobs to process")
|
||||||
pa_eng.state = 'Scanning Jobs'
|
|
||||||
for job in session.query(Job).all():
|
for job in session.query(Job).all():
|
||||||
if job.pa_job_state == 'New':
|
if job.pa_job_state == 'New':
|
||||||
if job.wait_for != None:
|
if job.wait_for != None:
|
||||||
@@ -292,7 +269,6 @@ def HandleJobs():
|
|||||||
except Exception as e2:
|
except Exception as e2:
|
||||||
print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
|
print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
|
||||||
print("INFO: PA job manager is waiting jobs")
|
print("INFO: PA job manager is waiting jobs")
|
||||||
pa_eng.state = 'Waiting for new Jobs'
|
|
||||||
return
|
return
|
||||||
|
|
||||||
def JobProgressState( job, state ):
|
def JobProgressState( job, state ):
|
||||||
@@ -492,7 +468,7 @@ def JobGetFileDetails(job):
|
|||||||
if stat.st_ctime < dir.last_hash_date:
|
if stat.st_ctime < dir.last_hash_date:
|
||||||
session.add(dir)
|
session.add(dir)
|
||||||
dir.last_hash_date = time.time()
|
dir.last_hash_date = time.time()
|
||||||
AddLogForJob(job, "skip {} as it has not changed since last hashing".format(dir.path_prefix))
|
FinishJob(job, "{} has not changed since last hashing - finished job".format(dir.path_prefix))
|
||||||
if DEBUG==1:
|
if DEBUG==1:
|
||||||
print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
|
print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
|
||||||
return
|
return
|
||||||
@@ -570,12 +546,7 @@ def GenVideoThumbnail(job, file):
|
|||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
print("INFO: PA job manager starting")
|
print("INFO: PA job manager starting")
|
||||||
try:
|
ProcessImportDirs()
|
||||||
InitialiseManager()
|
|
||||||
ProcessImportDirs()
|
|
||||||
except Exception as e:
|
|
||||||
print( "ERROR: Failed to initialise PA Job Manager: {}".format(e) )
|
|
||||||
session.rollback()
|
|
||||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||||
s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
|
s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
|
||||||
s.listen()
|
s.listen()
|
||||||
|
|||||||
Reference in New Issue
Block a user