diff --git a/TODO b/TODO
index 936415a..a91f49d 100644
--- a/TODO
+++ b/TODO
@@ -1,8 +1,4 @@
 ## GENERAL
- * optimise run_ai_on (and for that matter getfiledetails, etc.)...
-   - e.g. with last scan*, STORE: new files?
-   - if last scan no new files, dont getfiledetails, don't re-run ai job
-
 * per file you could select an unknown face and add it as a ref img to an existing person, or make a new person and attach?
 * [DONE] order/ find face with largest size and at least show that as unmatched
   - could also try to check it vs. other faces, if it matches more than say 10? we offer it up as a required ref img, then cut that face (with margin) out and use it is a new ref image / person
diff --git a/pa_job_manager.py b/pa_job_manager.py
index 517e19b..a4fe134 100644
--- a/pa_job_manager.py
+++ b/pa_job_manager.py
@@ -13,11 +13,6 @@
 
 # pylint: disable=no-member
 
-
-# global debug setting
-if os.environ['FLASK_ENV'] != "production":
-    DEBUG=True
-
 ### SQLALCHEMY IMPORTS ###
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime, LargeBinary, Boolean, func
@@ -52,6 +47,10 @@
 
 import sys
 import json
+# global debug setting
+if 'FLASK_ENV' not in os.environ or os.environ['FLASK_ENV'] != "production":
+    DEBUG=True
+
 # this is required to handle the duplicate processing code
 sys.setrecursionlimit(50000)
 
@@ -685,16 +684,6 @@ def RunJob(job):
     HandleJobs(False)
     return
 
-##############################################################################
-# CancelJob(): cancel this job, and if any other job is waiting on this job,
-# then cancel it too (this is done recursively)
-##############################################################################
-def CancelJob(job,id):
-    for j in session.query(Job).filter(Job.wait_for==id).all():
-        FinishJob(j, f"Job (#{j.id}) has been withdrawn as the job being waited for #{job.id} failed", "Withdrawn" )
-        CancelJob(j, j.id)
-    return
-
 ##############################################################################
 # FinishJob(): finish this job off (if no overrides), its just marked completed
 ##############################################################################
@@ -704,7 +693,7 @@ def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
     job.last_update=datetime.now(pytz.utc)
     AddLogForJob(job, last_log)
     if job.state=="Failed":
-        CancelJob(job,job.id)
+        WithdrawDependantJobs( job, job.id, "failed" )
     session.commit()
     if DEBUG:
         print( f"DEBUG: {last_log}" )
@@ -1254,6 +1243,17 @@ def AddJexToDependantJobs(job,name,value):
         AddJexToDependantJobs(j, name, value)
     return
 
+####################################################################################################################################
+# WithdrawDependantJobs(): withdraw every job that is waiting on this job, and if any other job is
+# waiting on one of those then withdraw it too (this is done recursively)
+####################################################################################################################################
+def WithdrawDependantJobs( job, id, reason ):
+    for j in session.query(Job).filter(Job.wait_for==id).all():
+        FinishJob(j, f"Job (#{j.id}) has been withdrawn -- #{job.id} {reason}", "Withdrawn" )
+        WithdrawDependantJobs(j, j.id, reason)
+    return
+
+
 ####################################################################################################################################
 # JobImportDir(): job that scan import dir and processes entries in there - key function that uses os.walk() to traverse the
 # file system and calls AddFile()/AddDir() as necessary
@@ -1268,6 +1268,7 @@ def JobImportDir(job):
     if DEBUG:
         print( f"DEBUG: Checking Directory: {path}" )
     if not os.path.exists( path ):
+        WithdrawDependantJobs( job, job.id, "scan path does not exist" )
         FinishJob( job, f"Finished Importing: {path} -- Path does not exist", "Failed" )
         return
     symlink=SymlinkName(ptype.name, path, path)
@@ -1302,6 +1303,9 @@
     AddLogForJob(job, f"Found {overall_file_cnt} file(s) to process")
     session.commit()
 
+    # track whether this scan found any new files - if it did not, then the dependant jobs
+    # don't really need to run, so we withdraw them at the end to optimise things
+    found_new_files=0
     # root == path of dir, files are in dir... subdirs are in dir
     for root, subdirs, files in ftree:
         # already create root above to work out num_files for whole os.walk
@@ -1331,6 +1335,7 @@
 
                 year, month, day, woy = GetDateFromFile(fname, stat)
                 e=AddFile( job, basename, type_str, fsize, dir, year, month, day, woy )
+                found_new_files += 1
             else:
                 if DEBUG:
                     print( f"DEBUG: { basename} - {stat.st_ctime} is OLDER than {dir.last_import_date}" )
@@ -1344,7 +1349,9 @@
 
     rm_cnt=HandleAnyFSDeletions(job)
 
-    FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Removed {rm_cnt} file(s)")
+    if found_new_files == 0:
+        WithdrawDependantJobs( job, job.id, "scan job found no new files to process" )
+    FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Found {found_new_files} new files, Removed {rm_cnt} file(s)")
     return
 
 ####################################################################################################################################
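
Reviewer note: below is a minimal, self-contained sketch of the pattern this patch introduces - dependant jobs linked by a wait_for column, and a recursive walk that withdraws the whole chain when a scan fails or finds no new files. The Job model, withdraw_dependant_jobs() helper and the in-memory SQLite session are illustrative stand-ins, not the project's real classes (the real code finishes each dependant via FinishJob() rather than setting state directly).

# sketch only - simplified stand-in for the pa_job_manager.py job table
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Job(Base):
    __tablename__ = "jobs"
    id       = Column(Integer, primary_key=True)
    name     = Column(String)
    state    = Column(String, default="Waiting")
    wait_for = Column(Integer)   # id of the job this job waits on (None = independent)

engine = create_engine("sqlite://")          # throwaway in-memory DB
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

def withdraw_dependant_jobs(session, job, job_id, reason):
    # withdraw every job waiting on job_id, then recurse into anything waiting on those
    for j in session.query(Job).filter(Job.wait_for == job_id).all():
        j.state = "Withdrawn"
        print(f"Job (#{j.id}) has been withdrawn -- #{job.id} {reason}")
        withdraw_dependant_jobs(session, j, j.id, reason)

# scan -> getfiledetails -> run_ai chain, as described in the removed TODO item
scan    = Job(id=1, name="importdir")
details = Job(id=2, name="getfiledetails", wait_for=1)
run_ai  = Job(id=3, name="run_ai", wait_for=2)
session.add_all([scan, details, run_ai])
session.commit()

found_new_files = 0    # what JobImportDir() counts while walking the import dir
if found_new_files == 0:
    withdraw_dependant_jobs(session, scan, scan.id, "scan job found no new files to process")
session.commit()

print([(j.name, j.state) for j in session.query(Job).order_by(Job.id)])
# -> [('importdir', 'Waiting'), ('getfiledetails', 'Withdrawn'), ('run_ai', 'Withdrawn')]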