diff --git a/pa_job_manager.py b/pa_job_manager.py index c50c9b0..ff1544c 100644 --- a/pa_job_manager.py +++ b/pa_job_manager.py @@ -547,6 +547,18 @@ def JobsForPaths( parent_job, paths, ptype ): AddLogForJob(parent_job, "adding job id={} {} (wait for: {})".format( job2.id, job2.id, job2.name, job2.wait_for ) ) """ + +# make a wrapper to do these few lines (up to run_ai_on) in pa_job_manager and then call it in ai.py in the f/end and here... + + jex.append( JobExtra( name=f"person", value="all" ) ) + paths=Path.query.join(PathType).filter(PathType.name=='Import').all() + path_cnt=0 + for p in paths: + d = Dir.query.join(PathDirLink).filter(PathDirLink.path_id==p.id).filter(Dir.rel_path=='').first() + jex.append( JobExtra( name=f"eid-{path_cnt}", value=f"{d.eid}" ) ) + path_cnt+=1 + job=NewJob( "run_ai_on", 0, None, jex ) + jex3=JobExtra( name="path", value=path ) job3=Job(start_time=now, last_update=now, name="processai", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 ) job3.extra.append(jex4) @@ -588,18 +600,23 @@ def CleanFileFromBin(job, e): settings = session.query(Settings).first() fname=e.FullPathOnFS() - stat = os.stat(e.FullPathOnFS()) - now=time.time() -# print( f"DDP: CleanFileFromBin: We have a file {e.id} - {e.name} in {e.in_dir.rel_path} in the Bin, check if it is older than {settings.bin_cleanup_file_age}" ) -# print( f" now={now} & stat.st_ctime={stat.st_ctime}, now-ctime={(now - stat.st_ctime)} or {(now - stat.st_ctime)/SECS_IN_A_DAY} days old" ) - # use ctime as that will be when the file was moved into the Bin path - if (now - stat.st_ctime)/SECS_IN_A_DAY >= settings.bin_cleanup_file_age: - try: - os.remove( fname ) - except Exception as ex: - AddLogForJob(job, f"ERROR: Tried to delete old file: {ex}" ) + try: + stat = os.stat(e.FullPathOnFS()) + now=time.time() + # use ctime as that will be when the file was moved into the Bin path + if (now - stat.st_ctime)/SECS_IN_A_DAY >= settings.bin_cleanup_file_age: + try: + os.remove( fname ) + except Exception as ex: + AddLogForJob(job, f"ERROR: Tried to delete old file: {ex}" ) + RemoveFileFromDB( job, e, f"INFO: Removing file: {e.name} from system as it is older than {settings.bin_cleanup_file_age} - Age in days: {int(now - stat.st_ctime)/SECS_IN_A_DAY}" ) + # if the file is no longer on the FS, then remove it. + except FileNotFoundError: + RemoveFileFromDB( job, e, f"INFO: Removing file: {e.name} from DB as it is has already been removed from the filesystem" ) + # some other exception, just log and bail + except Exception as ex: + AddLogForJob(job, f"ERROR: Failed to find/stat old file - NOT removing it - {ex}" ) - RemoveFileFromDB( job, e, f"INFO: Removing file: {e.name} from system as it is older than {settings.bin_cleanup_file_age} - Age in days: {int(now - stat.st_ctime)/SECS_IN_A_DAY}" ) return ############################################################################## @@ -912,6 +929,8 @@ def ResetExistsOnFS(job, path): def RemoveEmtpyDirFromFS( job, del_me ): try: os.rmdir( del_me.FullPathOnFS() ) + except FileNotFoundError: + AddLogForJob( job, f"INFO: Dir already removed -- {del_me.FullPathOnFS()}") except Exception as e: AddLogForJob( job, f"ERROR: Failed to remove dir from filesystem - which={del_me.FullPathOnFS()}, err: {e}") return @@ -921,7 +940,7 @@ def RemoveEmptyDirFromDB( job, del_me ): session.query(PathDirLink).filter(PathDirLink.dir_eid==del_me.id).delete() session.query(Dir).filter(Dir.eid==del_me.id).delete() session.query(Entry).filter(Entry.id==del_me.id).delete() - AddLogForJob( job, f"INFO: Removing {del_me.FullPathOnFS()} from system as removing duplicates has left it empty" ) + AddLogForJob( job, f"INFO: Removing {del_me.FullPathOnFS()} from system as removing files has left it empty" ) return #################################################################################################################################### @@ -2014,12 +2033,12 @@ def ScheduledJobs(): settings = session.query(Settings).first() now=datetime.now(pytz.utc) if ndays_since_last_im_scan >= settings.scheduled_import_scan: - print( "INFO: Time to force an import scan" ) + print( f"INFO: Time to force an import scan, last scan was {ndays_since_last_im_scan} days ago" ) job=Job(start_time=now, last_update=now, name="scannow", state="New", wait_for=None, pa_job_state="New", current_file_num=0 ) session.add(job) created_jobs=True if ndays_since_last_st_scan >= settings.scheduled_storage_scan: - print( "INFO: Time to force a storage scan" ) + print( f"INFO: Time to force a storage scan, last scan was {ndays_since_last_st_scan}" ) job=Job(start_time=now, last_update=now, name="scan_sp", state="New", wait_for=None, pa_job_state="New", current_file_num=0 ) session.add(job) created_jobs=True @@ -2046,7 +2065,12 @@ if __name__ == "__main__": while True: try: conn, addr = s.accept() + if DEBUG: + print( f"accept finished, tout={s.timeout}" ) + except socket.timeout: + if DEBUG: + print( f"timeout occurred, tout={s.timeout}" ) if ScheduledJobs(): HandleJobs(False) continue