From 221ab03dbb5f67220242a485aa6e63bf30907549 Mon Sep 17 00:00:00 2001
From: Damien De Paoli
Date: Sat, 23 Jan 2021 00:32:17 +1100
Subject: [PATCH] fixed BUG-8: job failures should cascade cancels

Also put back the dependency for the AI job to wait for GetFileDetails
(might not really be needed, as I think Cam's code just calls exif rotate
itself anyway), and took out my temporary block that stopped the slow AI
code from running.

Need to discuss how to optimise it so it does not re-run parts it should
already know, e.g. run the ref img stuff once and not again unless the ref
imgs change, ditto the file/AI scan, etc.
---
 BUGs              |  1 -
 pa_job_manager.py | 25 ++++++++++++-------------
 2 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/BUGs b/BUGs
index 98a1e83..d27f64b 100644
--- a/BUGs
+++ b/BUGs
@@ -1,4 +1,3 @@
 ### Next: 15
 BUG-4: Duration is borked and comes out as -1 day under jobs (windows created jobs ONLY)
-BUG-8: cascading failure jobs are needed
 BUG-11: Ai ref img jobs are looping, needs fix
diff --git a/pa_job_manager.py b/pa_job_manager.py
index 9d2f4ec..c3574ba 100644
--- a/pa_job_manager.py
+++ b/pa_job_manager.py
@@ -246,7 +246,7 @@ def ProcessImportDirs(parent_job=None):
         if parent_job:
             AddLogForJob(parent_job, "adding job id={} {} (wait for: {})".format( job2.id, job2.id, job2.name, job2.wait_for ) )
         jex3=JobExtra( name="path", value=path )
-        job3=Job(start_time='now()', last_update='now()', name="processai", state="New", wait_for=job.id, pa_job_state="New", current_file_num=0 )
+        job3=Job(start_time='now()', last_update='now()', name="processai", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 )
         job3.extra.append(jex3)
         session.add(job3)
         session.commit()
@@ -287,11 +287,21 @@ def RunJob(job):
     HandleJobs()
     return
 
+def CancelJob(job,id):
+    for j in session.query(Job).filter(Job.wait_for==id).all():
+        if DEBUG==1:
+            print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(j.id, job.id) )
+        FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" )
+        CancelJob(j, j.id)
+    return
+
 def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
     job.state=state
     job.pa_job_state=pa_job_state
     job.last_update=datetime.now(pytz.utc)
     AddLogForJob(job, last_log)
+    if job.state=="Failed":
+        CancelJob(job,job.id)
     return
 
 def HandleJobs():
@@ -416,10 +426,6 @@ def JobImportDir(job):
         print("DEBUG: Checking Import Directory: {}".format( path ) )
     if not os.path.exists( path ):
         FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" )
-        for j in session.query(Job).filter(Job.wait_for==job.id).all():
-            if DEBUG==1:
-                print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) )
-            FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" )
         return
     symlink=CreateSymlink(job,path)
     overall_file_cnt=0
@@ -467,10 +473,6 @@ def JobImportDir(job):
     return
 
 def JobProcessAI(job):
-    print("DDP: disabled while fixing other bugs")
-    FinishJob(job, "Finished Processesing AI")
-    return
-
     path=[jex.value for jex in job.extra if jex.name == "path"][0]
     path = SymlinkName(path, '/')
     print('REMOVE AFTER TESTING ON WINDOWS... path=',path)
@@ -559,11 +561,8 @@
     else:
         dir=session.query(Dir).filter(Dir.eid==e.id).first()
     job.current_file_num+=1
-    # if this func returns
-    if not go_into_dir_func(job,dir):
-        return
     for sub in dir.files:
-        ProcessFilesInDir(job, sub, file_func, go_into_dir_func)
+        ProcessFilesInDir(job, sub, file_func)
 
 def JobGetFileDetails(job):
     JobProgressState( job, "In Progress" )
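
Note on the BUG-8 change above: the new CancelJob walks every job whose wait_for points at a failed job, withdraws it, and recurses so the cancellation cascades down the whole dependency chain; FinishJob triggers the cascade whenever a job ends in the "Failed" state. Below is a minimal standalone sketch of that idea against a toy in-memory job list, not the patch's actual code; the dataclass, helper names, and the JOBS list are illustrative stand-ins, not the SQLAlchemy Job model or session used in pa_job_manager.py.

    from dataclasses import dataclass
    from typing import List, Optional

    @dataclass
    class Job:
        id: int
        name: str
        state: str = "New"
        wait_for: Optional[int] = None   # id of the job this job waits on

    # toy in-memory stand-in for the job table
    JOBS: List[Job] = []

    def finish_job(job: Job, state: str = "Completed") -> None:
        """Mark a job finished; a failure cascades cancels to its dependents."""
        job.state = state
        if state == "Failed":
            cancel_dependents(job.id)

    def cancel_dependents(failed_id: int) -> None:
        """Withdraw every job waiting on failed_id, then recurse into their dependents."""
        for j in JOBS:
            if j.wait_for == failed_id and j.state == "New":
                j.state = "Withdrawn"
                cancel_dependents(j.id)   # jobs waiting on j are withdrawn too

    if __name__ == "__main__":
        JOBS.extend([
            Job(1, "importdir"),
            Job(2, "getfiledetails", wait_for=1),
            Job(3, "processai", wait_for=2),
        ])
        finish_job(JOBS[0], "Failed")   # the import fails...
        print([(j.name, j.state) for j in JOBS])
        # -> [('importdir', 'Failed'), ('getfiledetails', 'Withdrawn'), ('processai', 'Withdrawn')]

The sketch only guards against re-cancelling jobs via the state check; the real FinishJob avoids that differently, since withdrawn jobs are finished with state "Withdrawn" and so never re-enter the "Failed" branch.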