Scheduled jobs implemented: after X days (configured in Settings) we scan the import and storage paths, and check the Bin for old files, actually deleting them from the file system.

This commit is contained in:
2022-01-13 13:27:55 +11:00
parent 592dcf546d
commit 10866c3147
4 changed files with 153 additions and 16 deletions

View File

@@ -27,7 +27,7 @@ from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
### LOCAL FILE IMPORTS ###
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, THUMBSIZE, SymlinkName, GenThumb
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, THUMBSIZE, SymlinkName, GenThumb, SECS_IN_A_DAY
from datetime import datetime, timedelta, date
### PYTHON LIB IMPORTS ###
@@ -241,9 +241,13 @@ class Settings(Base):
default_refimg_model = Column(Integer,ForeignKey('ai_model.id'), unique=True, nullable=False)
default_scan_model = Column(Integer,ForeignKey('ai_model.id'), unique=True, nullable=False)
default_threshold = Column(Integer)
# days between automatic scans of the Import path (consumed by ScheduledJobs)
scheduled_import_scan = Column(Integer)
# days between automatic scans of the Storage path (consumed by ScheduledJobs)
scheduled_storage_scan = Column(Integer)
# days between automatic 'clean_bin' jobs (consumed by CheckAndRunBinClean)
scheduled_bin_cleanup = Column(Integer)
# minimum file age, in days, before a Bin file is physically deleted (CleanFileFromBin)
bin_cleanup_file_age = Column(Integer)
def __repr__(self):
    """Debug representation of the Settings row, including the scheduled-job fields."""
    # BUG FIX: was `{Self.scheduled_bin_cleanup}` (capital S) which raised
    # NameError whenever repr() was taken; also dropped the stale duplicate
    # return that still referenced base_path.
    return f"<id: {self.id}, import_path: {self.import_path}, storage_path: {self.storage_path}, recycle_bin_path: {self.recycle_bin_path}, default_refimg_model: {self.default_refimg_model}, default_scan_model: {self.default_scan_model}, default_threshold: {self.default_threshold}, scheduled_import_scan:{self.scheduled_import_scan}, scheduled_storage_scan: {self.scheduled_storage_scan}, scheduled_bin_cleanup: {self.scheduled_bin_cleanup}, bin_cleanup_file_age: {self.bin_cleanup_file_age}>"
################################################################################
# Class describing Person to Refimg link in DB via sqlalchemy
@@ -574,6 +578,44 @@ def ProcessFileForJob(job, message, current_file):
AddLogForJob(job, message )
return
##############################################################################
#
##############################################################################
def CleanFileFromBin(job, e):
    """Permanently delete Entry `e` from the Recycle Bin if it is old enough.

    Called per-file by RunFuncOnFilesInPath() during a 'clean_bin' job.
    The file's ctime is used for age (it is set when the file is moved into
    the Bin path); files older than Settings.bin_cleanup_file_age days are
    removed from the file system and then purged from the DB.
    """
    # commit every 100 files to see progress being made but not hammer the database
    if job.current_file_num % 100 == 0:
        session.commit()
    settings = session.query(Settings).first()
    fname = e.FullPathOnFS()
    stat = os.stat(fname)
    now = time.time()
    # use ctime as that will be when the file was moved into the Bin path
    age_in_days = (now - stat.st_ctime) / SECS_IN_A_DAY
    if age_in_days >= settings.bin_cleanup_file_age:
        try:
            os.remove(fname)
        except Exception as ex:
            # best-effort: log the failure but still drop the DB record below,
            # matching the original behavior (the file may already be gone)
            AddLogForJob(job, f"ERROR: Tried to delete old file: {ex}")
        # BUG FIX: the log line computed int(now - stat.st_ctime)/SECS_IN_A_DAY,
        # truncating the raw seconds (a no-op) instead of the day count
        RemoveFileFromDB(job, e, f"INFO: Removing file: {e.name} from system as it is older than {settings.bin_cleanup_file_age} - Age in days: {int(age_in_days)}")
    return
##############################################################################
# JobCleanBin(job): Job that checks to see if there are any files that are old
# and removes them for good from the Bin. Only triggered on schedule from
# Settings.scheduled_bin_cleanup days elapsed since last 'clean_bin' job
##############################################################################
def JobCleanBin(job):
    """Job handler for 'clean_bin': walk the Recycle Bin path and permanently
    delete any files older than Settings.bin_cleanup_file_age days.

    Triggered on schedule from Settings.scheduled_bin_cleanup days elapsed
    since the last 'clean_bin' job (see CheckAndRunBinClean).
    """
    JobProgressState(job, "In Progress")
    settings = session.query(Settings).first()
    # the single Path whose PathType is 'Bin' is the recycle bin root
    recycle_path = (
        session.query(Path)
        .join(PathType)
        .filter(PathType.name == 'Bin')
        .first()
    )
    RunFuncOnFilesInPath(job, recycle_path.path_prefix, CleanFileFromBin, True)
    FinishJob(job, f"Finished clean up of files older than {settings.bin_cleanup_file_age} days from Recycle Bin")
    return
##############################################################################
# AddLogForJob(): add a log line to joblog, if the last time we wrote a log
# was over 5 seconds ago, then commit the log to the db, so in f/e we see
@@ -630,6 +672,8 @@ def RunJob(job):
JobRunAIOn(job)
elif job.name == "transform_image":
JobTransformImage(job)
elif job.name == "clean_bin":
JobCleanBin(job)
else:
print("ERROR: Requested to process unknown job type: {}".format(job.name))
# okay, we finished a job, so check for any jobs that are dependant on this and run them...
@@ -815,7 +859,6 @@ def AddDir(job, dirname, in_dir, rel_path, in_path ):
if dir:
e=session.query(Entry).get(dir.eid)
e.exists_on_fs=True
print("returning existing entry for AddDir" )
return dir
dir=Dir( last_import_date=0, rel_path=rel_path, in_path=in_path )
dtype=session.query(FileType).filter(FileType.name=='Directory').first()
@@ -824,10 +867,8 @@ def AddDir(job, dirname, in_dir, rel_path, in_path ):
# no in_dir occurs when we Add the actual Dir for the Path (top of the tree)
if in_dir:
e.in_dir=in_dir
print( f"DDP: set in_dir for {e.name} with in_dir={in_dir}" )
if DEBUG:
AddLogForJob(job, f"DEBUG: Process new dir: {dirname}, rel_path={rel_path}")
print( f"DDP: Process new dir: e={e}" )
session.add(e)
return dir
@@ -897,6 +938,9 @@ def CleanUpDirInDB(job, e):
# okay remove this empty dir
RemoveEmtpyDirFromFS( job, e )
RemoveEmptyDirFromDB( job, e )
# if no in_dir, we are at the root of the path, STOP
if not e.in_dir:
return
# get an Entry from DB (in_dir is a Dir/we need the ORM entry for code to work)
parent_dir = session.query(Entry).get(e.in_dir.eid)
print( f" Dir {e.FullPathOnFS()} is in {parent_dir.FullPathOnFS()} ({parent_dir.id}) -> check next" )
@@ -910,16 +954,18 @@ def CleanUpDirInDB(job, e):
# Convenience function to remove a file from the database - and its associated links
# used when scanning and a file has been removed out from under PA, or when we remove duplicates
####################################################################################################################################
def RemoveFileFromDB(job, del_me):
def RemoveFileFromDB(job, del_me, msg):
    """Remove file Entry `del_me` and all of its associated links from the DB.

    Used when scanning finds a file removed out from under PA, when removing
    duplicates, or when the Bin cleanup deletes an old file. `msg` is the log
    line recorded for the job. Deletes child link rows before the File/Entry
    rows themselves, then prunes any now-empty parent directories.
    """
    # remember the parent before the Entry row disappears
    parent_entry = session.query(Entry).get(del_me.in_dir.eid)
    session.query(EntryDirLink).filter(EntryDirLink.entry_id == del_me.id).delete()
    # drop every face found in this file, plus its refimg associations
    for link in session.query(FaceFileLink).filter(FaceFileLink.file_eid == del_me.id).all():
        session.query(FaceRefimgLink).filter(FaceRefimgLink.face_id == link.face_id).delete()
        session.query(Face).filter(Face.id == link.face_id).delete()
    # this might be a file removed from the Bin path, so also get rid of its DelFile
    session.query(DelFile).filter(DelFile.file_eid == del_me.id).delete()
    session.query(File).filter(File.eid == del_me.id).delete()
    session.query(Entry).filter(Entry.id == del_me.id).delete()
    AddLogForJob(job, msg)
    CleanUpDirInDB(job, parent_entry)
    return
@@ -1152,7 +1198,7 @@ def HandleAnyFSDeletions(job):
rms = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id!=dtype.id).all()
rm_cnt=0
for rm in rms:
RemoveFileFromDB(job, rm)
RemoveFileFromDB(job, rm, f"INFO: Removing file: {rm.name} from system as it is no longer on the file system")
rm_cnt+=1
rmdirs = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id==dtype.id).order_by(Entry.id.desc()).all()
@@ -1920,6 +1966,65 @@ def ScanFileForPerson( job, e, force=False ):
file_h.last_ai_scan = time.time()
return
####################################################################################################################################
# DaysSinceLastScan(): which == "Import" or "Storage" and is used to match the path_type, and then the Dir entry for the root Dir
# of that path. Then calc days since last scan and return the oldest for all Dir(s) in which Path (as scan works on path, not dir)
####################################################################################################################################
def DaysSinceLastScan(which):
    """Return days since the oldest last scan of any root Dir of paths of type `which`.

    `which` is "Import" or "Storage" and matches PathType.name; the root Dir
    of each such path is the one with an empty rel_path. Scans operate per
    path (not per dir), so the oldest root across all matching paths is the
    answer. Returns 0 when nothing qualifies (or nothing is overdue).
    """
    now = time.time()
    root_dirs = (
        session.query(Dir)
        .join(PathDirLink).join(Path).join(PathType)
        .filter(PathType.name == which)
        .filter(Dir.rel_path == '')
        .all()
    )
    ages = [(now - d.last_import_date) / SECS_IN_A_DAY for d in root_dirs]
    # floor at 0, exactly like the original "only update when greater" accumulator
    return max([0] + ages)
####################################################################################################################################
# CheckAndRunBinClean(): calc days since last job ran to clean out the Recycle Bin
####################################################################################################################################
def CheckAndRunBinClean():
    """Queue a 'clean_bin' job when Settings.scheduled_bin_cleanup days have
    elapsed since the most recent one (or none has ever run).

    Returns True when a job was created, False otherwise.
    """
    settings = session.query(Settings).first()
    # most recent clean_bin job, if any
    latest = (
        session.query(Job)
        .filter(Job.name == 'clean_bin')
        .order_by(Job.id.desc())
        .first()
    )
    now = datetime.now(pytz.utc)
    if latest and (now - latest.last_update).days < settings.scheduled_bin_cleanup:
        return False
    print( f"INFO: Should force clean up bin path, del files older than {settings.bin_cleanup_file_age} days old" )
    session.add(Job(start_time=now, last_update=now, name="clean_bin", state="New",
                    wait_for=None, pa_job_state="New", current_file_num=0))
    return True
####################################################################################################################################
# ScheduledJobs() is triggered when any job is run, or when the socket times out once a day. It uses the settings to find any time
# based jobs that should run (e.g. last scanned a path X day(s) ago, then scan now), etc. X is defined in settings
####################################################################################################################################
def ScheduledJobs():
    """Create any time-based jobs that are due, per the Settings intervals.

    Triggered when any job is run, or when the listening socket times out
    once a day. Checks import scan, storage scan, and Bin cleanup schedules.
    Returns True when at least one job was queued.
    """
    print("DEBUG: Time to check for any scheduled jobs needing to be run" )
    settings = session.query(Settings).first()
    now = datetime.now(pytz.utc)
    created_jobs = False
    # (days since last scan, configured interval in days, job name, log line)
    checks = (
        (DaysSinceLastScan("Import"), settings.scheduled_import_scan,
         "scannow", "INFO: Time to force an import scan"),
        (DaysSinceLastScan("Storage"), settings.scheduled_storage_scan,
         "scan_sp", "INFO: Time to force a storage scan"),
    )
    for elapsed, interval, job_name, message in checks:
        if elapsed >= interval:
            print(message)
            session.add(Job(start_time=now, last_update=now, name=job_name, state="New",
                            wait_for=None, pa_job_state="New", current_file_num=0))
            created_jobs = True
    if CheckAndRunBinClean():
        created_jobs = True
    return created_jobs
####################################################################################################################################
# MAIN - start with validation, then grab any jobs in the DB to process, then
@@ -1933,7 +2038,18 @@ if __name__ == "__main__":
HandleJobs(True)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
# force timeout every 1 day so we can run scheduled jobs
s.settimeout(SECS_IN_A_DAY)
s.listen()
while True:
conn, addr = s.accept()
HandleJobs(False)
try:
conn, addr = s.accept()
except socket.timeout:
if ScheduledJobs():
HandleJobs(False)
continue
else:
HandleJobs(False)
# in case we constantly have jobs running, the '1 day' last import might be missed, so check it after each job too
if ScheduledJobs():
HandleJobs(False)