diff --git a/pa_job_manager.py b/pa_job_manager.py index 469c914..55679d7 100644 --- a/pa_job_manager.py +++ b/pa_job_manager.py @@ -712,7 +712,7 @@ def JobsForPath( parent_job, path, ptype ): # start with import dir jex=[] jex.append( JobExtra( name="path", value=path ) ) - jex.append( JobExtra( name="path_type", value=ptype.id ) ) + jex.append( JobExtra( name="path_type", value=str(ptype.id) ) ) job1=NewJob( name="import_dir", num_files=cfn, wait_for=None, jex=jex, parent_job=parent_job, desc=f"scan for files from {ptype.name} path" ) # then get file details (hash/thumbs) @@ -723,7 +723,7 @@ def JobsForPath( parent_job, path, ptype ): # can start straight after import_dir - job1, does not need details (job2) jex=[] jex.append( JobExtra( name="person", value="all" ) ) - jex.append( JobExtra( name="path_type", value=ptype.id ) ) + jex.append( JobExtra( name="path_type", value=str(ptype.id) ) ) job3=NewJob( name="run_ai_on_path", num_files=0, wait_for=job1.id, jex=jex, parent_job=parent_job, desc=f"match faces on files from {ptype.name} path" ) # careful here, wait for get_file_details (job2), the ai job cannot cause a dup @@ -1591,7 +1591,7 @@ def GetDateFromFile(file, stat): #################################################################################################################################### def AddJexToDependantJobs(job,name,value): for j in session.query(Job).filter(Job.wait_for==job.id).all(): - jex=JobExtra( name=name, value=value ) + jex=JobExtra( name=name, value=str(value) ) j.extra.append(jex) AddJexToDependantJobs(j, name, value) return @@ -1612,11 +1612,16 @@ def WithdrawDependantJobs( job, id, reason ): # find last successful import_dir job for this path #################################################################################################################################### def find_last_time_new_files_found(job): - path=[jex.value for jex in job.extra if jex.name == "path"][0] - jobs = session.execute( f"select j.* from job j, jobextra jex1, jobextra jex2 where j.id = jex1.job_id and j.id = jex2.job_id and jex1.name ='path' and jex1.value = '{path}' and jex2.name = 'new_files'") + from sqlalchemy import text, desc + from sqlalchemy.orm import aliased - for j in jobs: - return j.last_update.timestamp() + jex1=aliased(JobExtra) + jex2=aliased(JobExtra) + path=[jex.value for jex in job.extra if jex.name == "path"][0] + + nf_job=session.query(Job).join(jex1).join(jex2).filter(jex1.name==text("'path'")).filter(jex1.value==text(f"'{path}'")).filter(jex2.name==text("'new_files'")).order_by(Job.last_update.desc()).limit(1).first() + if nf_job: + return nf_job.last_update.timestamp() return 0 #################################################################################################################################### @@ -1624,9 +1629,9 @@ def find_last_time_new_files_found(job): #################################################################################################################################### def find_last_successful_gfd_job(job): path=[jex.value for jex in job.extra if jex.name == "path"][0] - jobs=session.query(Job).join(JobExtra).filter(Job.name=="get_file_details").filter(JobExtra.value==path).filter(Job.state=='Completed').order_by(Job.id.desc()).limit(1).all() - for j in jobs: - return j.last_update.timestamp() + gfd_job=session.query(Job).join(JobExtra).filter(Job.name=="get_file_details").filter(JobExtra.value==path).filter(Job.state=='Completed').order_by(Job.id.desc()).limit(1).first() + if gfd_job: + return gfd_job.last_update.timestamp() return 0 #################################################################################################################################### @@ -1634,9 +1639,9 @@ def find_last_successful_gfd_job(job): #################################################################################################################################### def find_last_successful_ai_scan(job): path_type=[jex.value for jex in job.extra if jex.name == "path_type"][0] - jobs=session.query(Job).join(JobExtra).filter(Job.name=="run_ai_on_path").filter(JobExtra.name=='path_type',JobExtra.value==path_type).filter(Job.state=='Completed').order_by(Job.id.desc()).limit(1).all() - for j in jobs: - return j.last_update.timestamp() + ai_job=session.query(Job).join(JobExtra).filter(Job.name=="run_ai_on_path").filter(JobExtra.name=='path_type',JobExtra.value==path_type).filter(Job.state=='Completed').order_by(Job.id.desc()).limit(1).first() + if ai_job: + return ai_job.last_update.timestamp() return 0 #################################################################################################################################### @@ -1757,7 +1762,7 @@ def JobImportDir(job): dir.last_import_date = time.time() job.num_files=overall_file_cnt if found_new_files: - job.extra.append( JobExtra( name="new_files", value=found_new_files ) ) + job.extra.append( JobExtra( name="new_files", value=str(found_new_files) ) ) session.add(job) # this will invalidate pa_user_state for this path's contents (offsets are now wrong), clear them out DeleteOldPA_UserState(job)