From 10866c3147c61e41b2818a5490ec9edbc723985c Mon Sep 17 00:00:00 2001
From: Damien De Paoli
Date: Thu, 13 Jan 2022 13:27:55 +1100
Subject: [PATCH] implement scheduled jobs: after the configured number of days
 we scan the import and storage paths, and check the Bin for old files and
 actually delete them from the file system

---
 TODO              |  11 ++--
 pa_job_manager.py | 136 ++++++++++++++++++++++++++++++++++++++++++----
 settings.py       |  20 ++++++-
 shared.py         |   2 +
 4 files changed, 153 insertions(+), 16 deletions(-)

diff --git a/TODO b/TODO
index 39d4cc6..5906013 100644
--- a/TODO
+++ b/TODO
@@ -1,4 +1,11 @@
 ## GENERAL
+ * only show, say, the last week of jobs (or the last 50?) and archive the rest into an archived_jobs table
+ need scheduled jobs:
+ - [DONE] force scans of import/storage paths
+ - [DONE] delete old files from Recycle Bin
+ - need to archive jobs
+
+
 * per file you could select an unknown face and add it as a ref img to an existing person, or make a new person and attach?
 * when search, have a way to hide deleted files

@@ -52,10 +59,6 @@
 *** Need to use thread-safe sessions per Thread, half-assed version did not work

- * put nightly? job to scan import dir, clean out old recycle bin entries
- * put weekly? job to scan storage dir
- * put weekly? job to scan storage dir
-
 Admin -> do I want to have admin roles/users?
  -> purge deleted files (and associated DB data) needs a dbox or privs

diff --git a/pa_job_manager.py b/pa_job_manager.py
index 2dc1f31..686ba23 100644
--- a/pa_job_manager.py
+++ b/pa_job_manager.py
@@ -27,7 +27,7 @@ from sqlalchemy.orm import sessionmaker
 from sqlalchemy.orm import scoped_session

 ### LOCAL FILE IMPORTS ###
-from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, THUMBSIZE, SymlinkName, GenThumb
+from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, THUMBSIZE, SymlinkName, GenThumb, SECS_IN_A_DAY
 from datetime import datetime, timedelta, date

 ### PYTHON LIB IMPORTS ###
@@ -241,9 +241,13 @@ class Settings(Base):
     default_refimg_model = Column(Integer,ForeignKey('ai_model.id'), unique=True, nullable=False)
     default_scan_model = Column(Integer,ForeignKey('ai_model.id'), unique=True, nullable=False)
     default_threshold = Column(Integer)
+    scheduled_import_scan = Column(Integer)
+    scheduled_storage_scan = Column(Integer)
+    scheduled_bin_cleanup = Column(Integer)
+    bin_cleanup_file_age = Column(Integer)

     def __repr__(self):
-        return f""
+        return f""

 ################################################################################
 # Class describing Person to Refimg link in DB via sqlalchemy
@@ -574,6 +578,44 @@ def ProcessFileForJob(job, message, current_file):
     AddLogForJob(job, message )
     return

+##############################################################################
+# CleanFileFromBin(job, e): physically delete the file for Bin entry e (and
+# its DB records) once it is older than Settings.bin_cleanup_file_age days
+##############################################################################
+def CleanFileFromBin(job, e):
+    # commit every 100 files to see progress being made but not hammer the database
+    if job.current_file_num % 100 == 0:
+        session.commit()
+
+    settings = session.query(Settings).first()
+    fname=e.FullPathOnFS()
+    stat = os.stat(fname)
+    now=time.time()
+    # use ctime as that will be when the file was moved into the Bin path
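+    # (st_ctime rather than st_mtime: the rename into the Bin path updates the
+    # inode's change time but leaves the modification time untouched, so e.g.
+    # with bin_cleanup_file_age=30 a photo binned 31 days ago is purged even
+    # if the image data itself has not changed in years)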
+    if (now - stat.st_ctime)/SECS_IN_A_DAY >= settings.bin_cleanup_file_age:
+        try:
+            os.remove( fname )
+        except Exception as ex:
+            AddLogForJob(job, f"ERROR: Failed to delete old file {fname}: {ex}" )
+
+        RemoveFileFromDB( job, e, f"INFO: Removing file: {e.name} from system as it is older than {settings.bin_cleanup_file_age} days - age in days: {int((now - stat.st_ctime)/SECS_IN_A_DAY)}" )
+    return
+
+##############################################################################
+# JobCleanBin(job): Job that checks the Bin for any files that are old and
+# removes them for good. Only triggered on schedule, once
+# Settings.scheduled_bin_cleanup days have elapsed since the last 'clean_bin' job
+##############################################################################
+def JobCleanBin(job):
+    JobProgressState( job, "In Progress" )
+    settings = session.query(Settings).first()
+    bin_path=session.query(Path).join(PathType).filter(PathType.name=='Bin').first()
+    RunFuncOnFilesInPath( job, bin_path.path_prefix, CleanFileFromBin, True )
+    FinishJob(job, f"Finished clean up of files older than {settings.bin_cleanup_file_age} days from Recycle Bin")
+    return
+
 ##############################################################################
 # AddLogForJob(): add a log line to joblog, if the last time we wrote a log
 # was over 5 seconds ago, then commit the log to the db, so in f/e we see
@@ -630,6 +672,8 @@ def RunJob(job):
         JobRunAIOn(job)
     elif job.name == "transform_image":
         JobTransformImage(job)
+    elif job.name == "clean_bin":
+        JobCleanBin(job)
     else:
         print("ERROR: Requested to process unknown job type: {}".format(job.name))
     # okay, we finished a job, so check for any jobs that are dependant on this and run them...
@@ -815,7 +859,6 @@ def AddDir(job, dirname, in_dir, rel_path, in_path ):
     if dir:
         e=session.query(Entry).get(dir.eid)
         e.exists_on_fs=True
-        print("returning existing entry for AddDir" )
         return dir
     dir=Dir( last_import_date=0, rel_path=rel_path, in_path=in_path )
     dtype=session.query(FileType).filter(FileType.name=='Directory').first()
@@ -824,10 +867,8 @@ def AddDir(job, dirname, in_dir, rel_path, in_path ):
     # no in_dir occurs when we Add the actual Dir for the Path (top of the tree)
     if in_dir:
         e.in_dir=in_dir
-        print( f"DDP: set in_dir for {e.name} with in_dir={in_dir}" )
     if DEBUG:
         AddLogForJob(job, f"DEBUG: Process new dir: {dirname}, rel_path={rel_path}")
-    print( f"DDP: Process new dir: e={e}" )
     session.add(e)
     return dir
@@ -897,6 +938,9 @@ def CleanUpDirInDB(job, e):
     # okay remove this empty dir
     RemoveEmtpyDirFromFS( job, e )
     RemoveEmptyDirFromDB( job, e )
+    # if no in_dir, we are at the root of the path, STOP
+    if not e.in_dir:
+        return
     # get an Entry from DB (in_dir is a Dir/we need the ORM entry for code to work)
     parent_dir = session.query(Entry).get(e.in_dir.eid)
     print( f"  Dir {e.FullPathOnFS()} is in {parent_dir.FullPathOnFS()} ({parent_dir.id}) -> check next" )
@@ -910,16 +954,18 @@
 ####################################################################################################################################
 # Convenience function to remove a file from the database - and its associated links
 # used when scanning and a file has been removed out from under PA, or when we remove duplicates
 ####################################################################################################################################
-def RemoveFileFromDB(job, del_me):
+def RemoveFileFromDB(job, del_me, msg):
     parent_dir_e=session.query(Entry).get(del_me.in_dir.eid)
     session.query(EntryDirLink).filter(EntryDirLink.entry_id==del_me.id).delete()
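+    # now remove everything else that hangs off this file: any faces found in
+    # it (and their links back to reference images), then the File and Entry
+    # rows themselves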
     connected_faces=session.query(FaceFileLink).filter(FaceFileLink.file_eid==del_me.id).all()
     for ffl in connected_faces:
         session.query(FaceRefimgLink).filter(FaceRefimgLink.face_id==ffl.face_id).delete()
         session.query(Face).filter(Face.id==ffl.face_id).delete()
+    # this might be a file removed from the Bin path, so also get rid of its DelFile
+    session.query(DelFile).filter(DelFile.file_eid==del_me.id).delete()
     session.query(File).filter(File.eid==del_me.id).delete()
     session.query(Entry).filter(Entry.id==del_me.id).delete()
-    AddLogForJob( job, f"INFO: Removing file: {del_me.name} from system as it is no longer on the file system")
+    AddLogForJob( job, msg )
     CleanUpDirInDB(job, parent_dir_e)
     return

@@ -1152,7 +1198,7 @@ def HandleAnyFSDeletions(job):
     rms = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id!=dtype.id).all()
     rm_cnt=0
     for rm in rms:
-        RemoveFileFromDB(job, rm)
+        RemoveFileFromDB(job, rm, f"INFO: Removing file: {rm.name} from system as it is no longer on the file system")
         rm_cnt+=1

     rmdirs = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id==dtype.id).order_by(Entry.id.desc()).all()
@@ -1920,6 +1966,65 @@ def ScanFileForPerson( job, e, force=False ):
     file_h.last_ai_scan = time.time()
     return

+####################################################################################################################################
+# DaysSinceLastScan(which): 'which' is "Import" or "Storage" and is used to match the path_type. We then look at the root Dir of
+# each matching Path, calculate the days since it was last scanned, and return the oldest value (a scan covers a whole path, not
+# an individual dir)
+####################################################################################################################################
+def DaysSinceLastScan(which):
+    now=time.time()
+    oldest=0
+    dirs=session.query(Dir).join(PathDirLink).join(Path).join(PathType).filter(PathType.name==which).filter(Dir.rel_path=='').all()
+    for d in dirs:
+        last_scan_days_ago = (now - d.last_import_date) / SECS_IN_A_DAY
+        if last_scan_days_ago > oldest:
+            oldest=last_scan_days_ago
+    return oldest
+
+####################################################################################################################################
+# CheckAndRunBinClean(): queue a 'clean_bin' job if enough days have elapsed since the last one ran
+####################################################################################################################################
+def CheckAndRunBinClean():
+    created_jobs=False
+    # get most recent clean_bin job
+    j = session.query(Job).filter(Job.name=='clean_bin').order_by(Job.id.desc()).first()
+    settings = session.query(Settings).first()
+
+    now=datetime.now(pytz.utc)
+    if not j or (now-j.last_update).days >= settings.scheduled_bin_cleanup:
+        print( f"INFO: Time to force a clean up of the Bin path, deleting files older than {settings.bin_cleanup_file_age} days" )
+        job=Job(start_time=now, last_update=now, name="clean_bin", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
+        session.add(job)
+        created_jobs=True
+    return created_jobs
+
+####################################################################################################################################
+# ScheduledJobs() is triggered when any job is run, or when the socket times out once a day. It uses the settings to find any time
+# based jobs that should run (e.g. a path last scanned X days ago should be scanned now); X is defined in Settings.
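+# Two triggers cover this: the accept() timeout in the main loop fires after the manager has been idle for a day, and we re-check
+# after every handled job in case back-to-back jobs mean the timeout never fires.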
+####################################################################################################################################
+def ScheduledJobs():
+    print("DEBUG: Time to check for any scheduled jobs needing to be run" )
+
+    created_jobs=False
+
+    ndays_since_last_im_scan = DaysSinceLastScan( "Import" )
+    ndays_since_last_st_scan = DaysSinceLastScan( "Storage" )
+
+    settings = session.query(Settings).first()
+    now=datetime.now(pytz.utc)
+    if ndays_since_last_im_scan >= settings.scheduled_import_scan:
+        print( "INFO: Time to force an import scan" )
+        job=Job(start_time=now, last_update=now, name="scannow", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
+        session.add(job)
+        created_jobs=True
+    if ndays_since_last_st_scan >= settings.scheduled_storage_scan:
+        print( "INFO: Time to force a storage scan" )
+        job=Job(start_time=now, last_update=now, name="scan_sp", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
+        session.add(job)
+        created_jobs=True
+    if CheckAndRunBinClean():
+        created_jobs=True
+    return created_jobs

 ####################################################################################################################################
 # MAIN - start with validation, then grab any jobs in the DB to process, then
@@ -1933,7 +2038,18 @@ if __name__ == "__main__":
     HandleJobs(True)
     with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
         s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
+        # force timeout every 1 day so we can run scheduled jobs
+        s.settimeout(SECS_IN_A_DAY)
         s.listen()
         while True:
-            conn, addr = s.accept()
-            HandleJobs(False)
+            try:
+                conn, addr = s.accept()
+            except socket.timeout:
+                if ScheduledJobs():
+                    HandleJobs(False)
+                continue
+            else:
+                HandleJobs(False)
+                # in case we constantly have jobs running, the '1 day' last import might be missed, so check it after each job too
+                if ScheduledJobs():
+                    HandleJobs(False)
diff --git a/settings.py b/settings.py
index c661d50..73f463b 100644
--- a/settings.py
+++ b/settings.py
@@ -1,4 +1,4 @@
-from wtforms import SubmitField, StringField, FloatField, HiddenField, validators, Form, SelectField
+from wtforms import SubmitField, StringField, IntegerField, FloatField, HiddenField, validators, Form, SelectField
 from flask_wtf import FlaskForm
 from flask import request, render_template, redirect, url_for
 from main import db, app, ma
@@ -33,9 +33,13 @@ class Settings(db.Model):
     default_refimg_model = db.Column(db.Integer,db.ForeignKey('ai_model.id'), unique=True, nullable=False)
     default_scan_model = db.Column(db.Integer,db.ForeignKey('ai_model.id'), unique=True, nullable=False)
     default_threshold = db.Column(db.Integer)
+    scheduled_import_scan = db.Column(db.Integer)
+    scheduled_storage_scan = db.Column(db.Integer)
+    scheduled_bin_cleanup = db.Column(db.Integer)
+    bin_cleanup_file_age = db.Column(db.Integer)

     def __repr__(self):
-        return f""
+        return f""

 ################################################################################
 # Helper class that inherits a .dump() method to turn class Settings into json / useful in jinja2
@@ -60,6 +64,10 @@ class SettingsForm(FlaskForm):
     default_refimg_model = SelectField( 'Default model to use for reference images', choices=[(c.id, c.name) for c in AIModel.query.order_by('id')] )
     default_scan_model = SelectField( 'Default model to use for all scanned images', choices=[(c.id, c.name) for c in AIModel.query.order_by('id')] )
     default_threshold = StringField('Face Distance threshold (below is a match):', [validators.DataRequired()])
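+    # the scheduling fields below are whole numbers of days; note that
+    # DataRequired() also treats 0 as missing, so each interval must be >= 1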
+    scheduled_import_scan = IntegerField('Days between forced scans of the import path:', [validators.DataRequired()])
+    scheduled_storage_scan = IntegerField('Days between forced scans of the storage path:', [validators.DataRequired()])
+    scheduled_bin_cleanup = IntegerField('Days between checking to clean the Recycle Bin:', [validators.DataRequired()])
+    bin_cleanup_file_age = IntegerField('Age (in days) at which files are cleaned out of the Recycle Bin:', [validators.DataRequired()])
     submit = SubmitField('Save' )

 ################################################################################
@@ -78,6 +86,10 @@ def settings():
     HELP['default_refimg_model']="Default face recognition model used for reference images - cnn is slower/more accurate, hog is faster/less accurate - we scan (small) refimg once, so cnn is okay"
     HELP['default_scan_model']="Default face recognition model used for scanned images - cnn is slower/more accurate, hog is faster/less accurate - we scan (large) scanned images lots, so cnn NEEDS gpu/mem"
     HELP['default_threshold']="The distance below which a face is considered a match. The default is usually 0.6, we are trying for about 0.55 with kids. YMMV"
+    HELP['scheduled_import_scan']="The number of days between forced scans of the import path for new images"
+    HELP['scheduled_storage_scan']="The number of days between forced scans of the storage path for any file system changes made outside of Photo Assistant"
+    HELP['scheduled_bin_cleanup']="The number of days between runs of the job that deletes old files from the Recycle Bin"
+    HELP['bin_cleanup_file_age']="The number of days a file has to sit in the Recycle Bin before it is permanently deleted"

     if request.method == 'POST' and form.validate():
         try:
@@ -94,6 +106,10 @@ def settings():
             s.default_refimg_model = request.form['default_refimg_model']
             s.default_scan_model = request.form['default_scan_model']
             s.default_threshold = request.form['default_threshold']
+            s.scheduled_import_scan = request.form['scheduled_import_scan']
+            s.scheduled_storage_scan = request.form['scheduled_storage_scan']
+            s.scheduled_bin_cleanup = request.form['scheduled_bin_cleanup']
+            s.bin_cleanup_file_age = request.form['bin_cleanup_file_age']
             db.session.commit()
             return redirect( url_for( 'settings' ) )
         except SQLAlchemyError as e:
diff --git a/shared.py b/shared.py
index b90cee8..b1f14b6 100644
--- a/shared.py
+++ b/shared.py
@@ -23,6 +23,8 @@
 ICON["Import"]="import"
 ICON["Storage"]="db"
 ICON["Bin"]="trash"

+SECS_IN_A_DAY = 86400
+
 # check where we are running, if laptop, then run web server and db on localhost
 if hostname == "lappy":
     PA_JOB_MANAGER_HOST="localhost"