diff --git a/BUGs b/BUGs index 98a1e83..d27f64b 100644 --- a/BUGs +++ b/BUGs @@ -1,4 +1,3 @@ ### Next: 15 BUG-4: Duration is borked and comes out as -1 day under jobs (windows created jobs ONLY) - BUG-8: cascading failure jobs are needed BUG-11: Ai ref img jobs are looping, needs fix diff --git a/pa_job_manager.py b/pa_job_manager.py index 9d2f4ec..c3574ba 100644 --- a/pa_job_manager.py +++ b/pa_job_manager.py @@ -246,7 +246,7 @@ def ProcessImportDirs(parent_job=None): if parent_job: AddLogForJob(parent_job, "adding job id={} {} (wait for: {})".format( job2.id, job2.id, job2.name, job2.wait_for ) ) jex3=JobExtra( name="path", value=path ) - job3=Job(start_time='now()', last_update='now()', name="processai", state="New", wait_for=job.id, pa_job_state="New", current_file_num=0 ) + job3=Job(start_time='now()', last_update='now()', name="processai", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 ) job3.extra.append(jex3) session.add(job3) session.commit() @@ -287,11 +287,21 @@ def RunJob(job): HandleJobs() return +def CancelJob(job,id): + for j in session.query(Job).filter(Job.wait_for==id).all(): + if DEBUG==1: + print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(j.id, job.id) ) + FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" ) + CancelJob(j, j.id) + return + def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"): job.state=state job.pa_job_state=pa_job_state job.last_update=datetime.now(pytz.utc) AddLogForJob(job, last_log) + if job.state=="Failed": + CancelJob(job,job.id) return def HandleJobs(): @@ -416,10 +426,6 @@ def JobImportDir(job): print("DEBUG: Checking Import Directory: {}".format( path ) ) if not os.path.exists( path ): FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" ) - for j in session.query(Job).filter(Job.wait_for==job.id).all(): - if DEBUG==1: - print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) ) - FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" ) return symlink=CreateSymlink(job,path) overall_file_cnt=0 @@ -467,10 +473,6 @@ def JobImportDir(job): return def JobProcessAI(job): - print("DDP: disabled while fixing other bugs") - FinishJob(job, "Finished Processesing AI") - return - path=[jex.value for jex in job.extra if jex.name == "path"][0] path = SymlinkName(path, '/') print('REMOVE AFTER TESTING ON WINDOWS... path=',path) @@ -559,11 +561,8 @@ def ProcessFilesInDir(job, e, file_func): else: dir=session.query(Dir).filter(Dir.eid==e.id).first() job.current_file_num+=1 - # if this func returns - if not go_into_dir_func(job,dir): - return for sub in dir.files: - ProcessFilesInDir(job, sub, file_func, go_into_dir_func) + ProcessFilesInDir(job, sub, file_func) def JobGetFileDetails(job): JobProgressState( job, "In Progress" )