fix BUG-124 - pa job mgr output does not have timestamps; fix via creating and using PAprint()
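
The new PAprint() helper prepends a timestamp and defaults the level tag to DEBUG: when a message carries none (INFO:/WARN: get an extra leading space so the level column lines up). A rough sketch of the expected output - timestamps here are illustrative:

    PAprint("ERROR: disk full")   ->  2024-01-02 03:04:05.678901 ERROR: disk full
    PAprint("INFO: scan done")    ->  2024-01-02 03:04:05.678901  INFO: scan done
    PAprint("no level given")     ->  2024-01-02 03:04:05.678901 DEBUG: no level given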
@@ -1,4 +1,4 @@
 #
 #
 # This file controls the 'external' job control manager, that (periodically #
 # looks / somehow is pushed an event?) picks up new jobs, and processes them.
@@ -508,6 +508,12 @@ class PA_JobManager_FE_Message(Base):
         return "<id: {}, job_id: {}, level: {}, message: {}".format(self.id, self.job_id, self.level, self.message)
 
 
+##############################################################################
+# Class describing PA_UserState and in the DB (via sqlalchemy)
+# the state for a User defines a series of remembered states for a user
+# to optimise their viewing, etc. If we scan and fine new files, we need to
+# invalidate these cached values, so we have this class here just for that
+##############################################################################
 class PA_UserState(Base):
     __tablename__ = "pa_user_state"
     id = Column(Integer, Sequence('pa_user_state_id_seq'), primary_key=True )
@@ -538,6 +544,19 @@ class PA_UserState(Base):
         return f"<pa_user_dn: {self.pa_user_dn}, path_type: {self.path_type}, noo: {self.noo}, grouping: {self.grouping}, how_many: {self.how_many}, st_offset: {self.st_offset}, size: {self.size}, folders: {self.folders}, root: {self.root}, cwd: {self.cwd}, view_eid: {self.view_eid}, orig_ptype: {self.orig_ptype}, orig_search_term: {self.orig_search_term}, orig_url: {self.orig_url}, current={self.current}, first_eid={self.first_eid}, last_eid={self.last_eid}, num_entries={self.num_entries}>"
 
 
+##############################################################################
+# PAprint(): convenience function to prepend a timestamp to a printed string
+##############################################################################
+def PAprint( msg ):
+    now=datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
+    if 'INFO:' in msg or 'WARN:' in msg:
+        msg = ' '+msg
+    if 'DEBUG:' in msg or 'ERROR:' in msg or 'WARN:' in msg or 'INFO:' in msg:
+        print( f"{now} {msg}" )
+    else:
+        print( f"{now} DEBUG: {msg}" )
+    return
+
 ##############################################################################
 # NewJob(): convenience function to create a job, appropriately
 ##############################################################################
@@ -577,7 +596,7 @@ def MessageToFE( job_id, message, level, persistent, cant_close ):
 def SettingsRBPath():
     settings = session.query(Settings).first()
     if settings == None:
-        print("ERROR: Cannot create file data with no settings / recycle bin path is missing")
+        PAprint("ERROR: Cannot create file data with no settings / recycle bin path is missing")
         return None
     # path setting is an absolute path, just use it, otherwise prepend base_path first
     if settings.recycle_bin_path[0] == '/':
@@ -615,7 +634,7 @@ def ProcessRecycleBinDir(job):
 def SettingsSPath():
     settings = session.query(Settings).first()
     if settings == None or settings.storage_path == "":
-        print("ERROR: Cannot create file data with no settings / storage path is missing")
+        PAprint("ERROR: Cannot create file data with no settings / storage path is missing")
         return None
     if settings.storage_path[0] == '/':
         path=settings.storage_path
@@ -642,10 +661,9 @@ def ProcessStorageDirs(parent_job):
 # SettingsIPath(): return import path (abs or add base_path if needed)
 ##############################################################################
 def SettingsIPath():
     paths=[]
     settings = session.query(Settings).first()
     if not settings or settings.import_path == "":
-        print("ERROR: Cannot create file data with no settings / import path is missing")
+        PAprint("ERROR: Cannot create file data with no settings / import path is missing")
         return None
     if settings.import_path[0] == '/':
         path=settings.import_path
@@ -839,7 +857,7 @@ def JobMetadata(job):
 def AddLogForJob(job, message):
     now=datetime.now(pytz.utc)
     log=Joblog( job_id=job.id, log=message, log_date=now )
-    job.last_update=datetime.now(pytz.utc)
+    job.last_update=now
     session.add(log)
     # some logs have DEBUG: in front, so clean that up
     message = message.replace("DEBUG:", "" )
@@ -851,7 +869,7 @@ def AddLogForJob(job, message):
     else:
         job.last_commit = now
     if DEBUG:
-        print( f"DEBUG: {message}" )
+        PAprint( f"{message}" )
     return
 
 ##############################################################################
@@ -920,7 +938,7 @@ def FinishJob(job, last_log, state="Completed", pa_job_state="Completed", level=
     session.commit()
     MessageToFE( job_id=job.id, message=last_log, level=level, persistent=persistent, cant_close=cant_close )
     if DEBUG:
-        print( f"DEBUG: {last_log}" )
+        PAprint( f"DEBUG: {last_log}" )
     return
 
 ##############################################################################
@@ -931,13 +949,13 @@ def FinishJob(job, last_log, state="Completed", pa_job_state="Completed", level=
 ##############################################################################
 def HandleJobs(first_run=False):
     if first_run:
-        print("INFO: PA job manager is starting up - check for stale jobs" )
+        PAprint("INFO: PA job manager is starting up - check for stale jobs" )
     else:
         if DEBUG:
-            print("DEBUG: PA job manager is scanning for new jobs to process")
+            PAprint("DEBUG: PA job manager is scanning for new jobs to process")
     for job in session.query(Job).filter(Job.pa_job_state != 'Complete').all():
         if first_run and job.pa_job_state == 'In Progress':
-            print( f"INFO: Found stale job#{job.id} - {job.name}" )
+            PAprint( f"INFO: Found stale job#{job.id} - {job.name}" )
             job.pa_job_state = 'Stale'
             session.add(job)
             AddLogForJob( job, "ERROR: Job has been marked stale as it did not complete" )
@@ -949,17 +967,17 @@ def HandleJobs(first_run=False):
             j2 = session.query(Job).get(job.wait_for)
             if not j2:
                 AddLogForJob( job, f"ERROR: waiting for a job#({job.wait_for}) that does not exist? ")
-                print(f"ERROR: job.wait_for ({job.wait_for}) does not exist in below? " )
+                PAprint(f"ERROR: job.wait_for ({job.wait_for}) does not exist in below? " )
                 for j in session.query(Job).all():
-                    print(f"ERROR: j={j.id}")
+                    PAprint(f"ERROR: j={j.id}")
                 continue
             if j2.pa_job_state != 'Completed':
                 continue
 
         # use this to remove threads for easier debugging, and errors will stacktrace to the console
         if DEBUG:
-            print("*************************************")
-            print("RUNNING job: id={} name={} wait_for={}".format(job.id, job.name, job.wait_for ))
+            PAprint("*************************************")
+            PAprint("RUNNING job: id={} name={} wait_for={}".format(job.id, job.name, job.wait_for ))
             RunJob(job)
         else:
             try:
@@ -969,8 +987,8 @@ def HandleJobs(first_run=False):
                 try:
                     MessageToFE( job_id=job.id, level="danger", message="Failed with: {} (try job log for details)".format(e), persistent=True, cant_close=False )
                 except Exception as e2:
-                    print("ERROR: Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
-    print("INFO: PA job manager is waiting for a job")
+                    PAprint("ERROR: Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
+    PAprint("INFO: PA job manager is waiting for a job")
     return
 
 ##############################################################################
@@ -1134,7 +1152,7 @@ def CreateSymlink(job,ptype,path):
     path_type = session.query(PathType).get(ptype)
     symlink=SymlinkName(path_type.name, path, path)
     if not os.path.exists(symlink):
-        print( f"INFO: symlink does not exist, actually creating it -- s={symlink}" )
+        PAprint( f"INFO: symlink does not exist, actually creating it -- s={symlink}" )
         try:
             # SAFE: SafePath() on init forces symlink to be safe
            os.makedirs( os.path.dirname(symlink), mode=0o777, exist_ok=True )
@@ -1252,25 +1270,25 @@ def RemoveEmptyDirFromDB( job, del_me ):
 ####################################################################################################################################
 def CleanUpDirInDB(job, e):
     session.commit()
-    print( f"CleanUpDirInDB(): checking dir: {e.FullPathOnFS()} ({e.id})" )
+    PAprint( f"CleanUpDirInDB(): checking dir: {e.FullPathOnFS()} ({e.id})" )
     content = session.query(Entry).join(EntryDirLink).filter(EntryDirLink.dir_eid==e.id).first()
     if not content:
-        print( f"  Dir {e.FullPathOnFS()} - {e.id} is empty - removing it" )
-        print( f"  Entry {e}" )
+        PAprint( f"  Dir {e.FullPathOnFS()} - {e.id} is empty - removing it" )
+        PAprint( f"  Entry {e}" )
         # if no in_dir, we are at the root of the path, STOP
         if not e.in_dir:
-            print( "  Parent is empty, so NEVER delete this entry, returning" )
+            PAprint( "  Parent is empty, so NEVER delete this entry, returning" )
             return
         # okay remove this empty dir
         RemoveEmtpyDirFromFS( job, e )
         RemoveEmptyDirFromDB( job, e )
         # get an Entry from DB (in_dir is a Dir/we need the ORM entry for code to work)
         parent_dir = session.query(Entry).get(e.in_dir.eid)
-        print( f"  Dir {e.FullPathOnFS()} is in {parent_dir.FullPathOnFS()} ({parent_dir.id}) -> check next" )
+        PAprint( f"  Dir {e.FullPathOnFS()} is in {parent_dir.FullPathOnFS()} ({parent_dir.id}) -> check next" )
         # check to see if removing the empty dir has left the parent dir empty
         CleanUpDirInDB(job, parent_dir)
     else:
-        print( f"There is content (first entry: {content.name}) in {e.FullPathOnFS()} - finished for this dir" )
+        PAprint( f"There is content (first entry: {content.name}) in {e.FullPathOnFS()} - finished for this dir" )
     return
 
 ####################################################################################################################################
@@ -1360,7 +1378,7 @@ def MoveFileToRecycleBin(job,del_me):
         # SAFE: as SafePaths(rbpath) combined with data I control in this func (explicit remove of 'static/' + DB entry path)
         os.replace( src, dst )
         if DEBUG:
-            print( f"MoveFileToRecycleBin({job.id},{del_me.name}): os.replace {src} with {dst} " )
+            PAprint( f"MoveFileToRecycleBin({job.id},{del_me.name}): os.replace {src} with {dst} " )
     except Exception as e:
         AddLogForJob( job, f"ERROR: Failed to remove file from filesystem - which={src}, err: {e}")
 
@@ -1414,7 +1432,7 @@ def MoveFileToRecycleBin(job,del_me):
 ####################################################################################################################################
 def MoveEntriesToOtherFolder(job, move_me, dst_storage_path, dst_rel_path):
     if DEBUG:
-        print( f"DEBUG: MoveEntriesToOtherFolder( job={job.id}, move_me={move_me.name}, dst_storage_path={dst_storage_path.id}, dst_rel_path={dst_rel_path})")
+        PAprint( f"DEBUG: MoveEntriesToOtherFolder( job={job.id}, move_me={move_me.name}, dst_storage_path={dst_storage_path.id}, dst_rel_path={dst_rel_path})")
     orig_name=move_me.name
     orig_fs_pos=move_me.FullPathOnFS()
 
@@ -1514,16 +1532,16 @@ def CreateFSLocation( job, dst_path, dst_locn ):
 ####################################################################################################################################
 def ResetAnySubdirPaths( moving_dir, dst_storage_path, parent_rel_path ):
     if DEBUG:
-        print( f"ResetAnySubdirPaths( {moving_dir.name}, {dst_storage_path.path_prefix}, {parent_rel_path} )" )
+        PAprint( f"ResetAnySubdirPaths( {moving_dir.name}, {dst_storage_path.path_prefix}, {parent_rel_path} )" )
     sub_dirs = session.query(Entry).join(FileType).join(EntryDirLink).filter(EntryDirLink.dir_eid==moving_dir.id).filter(FileType.name=='Directory').all()
     for sub in sub_dirs:
         if DEBUG:
-            print( f"ResetAnySubdirPaths: WAS sub={sub.name}, ip={sub.in_dir.in_path.path_prefix}, rp={sub.dir_details.rel_path}" )
+            PAprint( f"ResetAnySubdirPaths: WAS sub={sub.name}, ip={sub.in_dir.in_path.path_prefix}, rp={sub.dir_details.rel_path}" )
         sub.in_path = dst_storage_path
         sub.dir_details.in_path = dst_storage_path
         sub.dir_details.rel_path = parent_rel_path + '/' + sub.name
         if DEBUG:
-            print( f"ResetAnySubdirPaths: NOW sub={sub.name}, ip={sub.in_dir.in_path.path_prefix}, rp={sub.dir_details.rel_path}" )
+            PAprint( f"ResetAnySubdirPaths: NOW sub={sub.name}, ip={sub.in_dir.in_path.path_prefix}, rp={sub.dir_details.rel_path}" )
         ResetAnySubdirPaths( sub, dst_storage_path, sub.dir_details.rel_path )
     return
 
@@ -1669,7 +1687,7 @@ def JobImportDir(job):
     ptype = session.query(PathType).get(path_type)
     AddLogForJob(job, f"Checking {ptype.name} Directory: {path}" )
     if DEBUG:
-        print( f"DEBUG: Checking Directory: {path}" )
+        PAprint( f"DEBUG: Checking Directory: {path}" )
     if not os.path.exists( path ):
         WithdrawDependantJobs( job, job.id, "scan job found no new files to process" )
         FinishJob( job, f"Finished Importing: {path} -- Path does not exist", "Failed" )
@@ -1740,7 +1758,7 @@ def JobImportDir(job):
         # use ctime as even a metadata change (mv'd file on the fs, or a perms change) needs to be checked
         if stat.st_ctime > dir.last_import_date:
             if DEBUG:
-                print("DEBUG: {} - {} is newer than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
+                PAprint("DEBUG: {} - {} is newer than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
             if isImage(fname):
                 type_str = 'Image'
             elif isVideo(fname):
@@ -1754,7 +1772,7 @@ def JobImportDir(job):
             found_new_files += 1
         else:
             if DEBUG:
-                print( f"DEBUG: { basename} - {stat.st_ctime} is OLDER than {dir.last_import_date}" )
+                PAprint( f"DEBUG: { basename} - {stat.st_ctime} is OLDER than {dir.last_import_date}" )
             e=session.query(Entry).join(EntryDirLink).join(Dir).filter(Entry.name==basename,Dir.eid==dir.eid).first()
             e.exists_on_fs=True
             job.current_file=basename
@@ -1901,7 +1919,7 @@ def JobTransformImage(job):
     id=[jex.value for jex in job.extra if jex.name == "id"][0]
     amt=[jex.value for jex in job.extra if jex.name == "amt"][0]
     e=session.query(Entry).join(File).filter(Entry.id==id).first()
-    print( f"JobTransformImage: job={job.id}, id={id}, amt={amt}" )
+    PAprint( f"JobTransformImage: job={job.id}, id={id}, amt={amt}" )
 
     if amt == "fliph":
         AddLogForJob(job, f"INFO: Flipping {e.FullPathOnFS()} horizontally" )
@@ -1916,11 +1934,11 @@ def JobTransformImage(job):
     else:
         AddLogForJob(job, f"INFO: Rotating {e.FullPathOnFS()} by {amt} degrees" )
         p = subprocess.run([PA_EXIF_ROTATER, e.FullPathOnFS(), amt ], capture_output=True )
-        print(p)
+        PAprint(p)
         settings = session.query(Settings).first()
         e.file_details.thumbnail, _ , _ = GenThumb( e.FullPathOnFS(), settings.auto_rotate )
         e.file_details.hash = md5( job, e )
-    print( f"JobTransformImage DONE thumb: job={job.id}, id={id}, amt={amt}" )
+    PAprint( f"JobTransformImage DONE thumb: job={job.id}, id={id}, amt={amt}" )
     session.add(e)
     FinishJob(job, "Finished Processesing image rotation/flip")
     return
@@ -1941,7 +1959,7 @@ def GenHashAndThumb(job, e):
     # use mtime as only if the content is different do we need to redo the hash
     if stat.st_mtime < e.file_details.last_hash_date:
         if DEBUG:
-            print(f"OPTIM: GenHashAndThumb {e.name} file is older than last hash, skip this")
+            PAprint(f"OPTIM: GenHashAndThumb {e.name} file is older than last hash, skip this")
         job.current_file_num+=1
         return
 
@@ -1949,7 +1967,7 @@ def GenHashAndThumb(job, e):
     # same hash and we already have a thumbnail-> just return
     if new_hash == e.file_details.hash and e.file_details.thumbnail:
         if DEBUG:
-            print(f"OPTIM: GenHashAndThumb {e.name} md5 is same - likely a mv on filesystem so skip md5/thumb")
+            PAprint(f"OPTIM: GenHashAndThumb {e.name} md5 is same - likely a mv on filesystem so skip md5/thumb")
         job.current_file_num+=1
         return
     e.file_details.hash = new_hash
@@ -1967,7 +1985,7 @@ def GenHashAndThumb(job, e):
 ####################################################################################################################################
 def ProcessFilesInDir(job, e, file_func, count_dirs):
     if DEBUG:
-        print( f"DEBUG: ProcessFilesInDir: {e.FullPathOnFS()}")
+        PAprint( f"DEBUG: ProcessFilesInDir: {e.FullPathOnFS()}")
     if e.type.name != 'Directory':
         file_func(job, e)
     else:
@@ -1987,7 +2005,7 @@ def JobGetFileDetails(job):
     path=[jex.value for jex in job.extra if jex.name == "path"][0]
     path_prefix=[jex.value for jex in job.extra if jex.name == "path_prefix"][0]
     if DEBUG:
-        print( f"DEBUG: JobGetFileDetails for path={path_prefix}" )
+        PAprint( f"DEBUG: JobGetFileDetails for path={path_prefix}" )
     p=session.query(Path).filter(Path.path_prefix==path_prefix).first()
     job.current_file_num = 0
     job.num_files = p.num_files
@@ -2251,11 +2269,11 @@ def CopyOverrides():
             session.execute( text( f"delete from {tbl}" ) )
         session.commit()
     except Exception as ex:
-        print( f"ERROR: there are existing tmp tables when processing metadata. This SHOULD NEVER HAPPEN - manual intervention needed" )
-        print( f"ERROR: most likely the job manager was killed during processing metadata - you may want to manually put" )
-        print( f"ERROR: the contents of the 'tmp_*' tables back into their corresponding official metadata tables " )
-        print( f"ERROR: and try to restart the job manager" )
-        print( f"ERROR: orig ex: {ex}" )
+        PAprint( f"ERROR: there are existing tmp tables when processing metadata. This SHOULD NEVER HAPPEN - manual intervention needed" )
+        PAprint( f"ERROR: most likely the job manager was killed during processing metadata - you may want to manually put" )
+        PAprint( f"ERROR: the contents of the 'tmp_*' tables back into their corresponding official metadata tables " )
+        PAprint( f"ERROR: and try to restart the job manager" )
+        PAprint( f"ERROR: orig ex: {ex}" )
         exit( 1 )
     return
 
@@ -2269,8 +2287,8 @@ def GetFaceInMetadata(fname):
         face_data=file_h.read(-1)
         file_h.close()
     except Exception as ex:
-        print( f"ERROR: FATAL tried to read in override data and cant read content" )
-        print( f"ERROR: manual intervention needed - exc={ex}" )
+        PAprint( f"ERROR: FATAL tried to read in override data and cant read content" )
+        PAprint( f"ERROR: manual intervention needed - exc={ex}" )
         exit(1)
     return face_data
 
@@ -2302,7 +2320,7 @@ def ReloadMetadata(job):
         otype = session.query(FaceOverrideType).filter(FaceOverrideType.name==type_name).one()
         face_data=GetFaceInMetadata(fname)
         if DEBUG:
-            print( f"Found metadata showing Override of type: {type_name}" )
+            PAprint( f"DEBUG: Found metadata showing Override of type: {type_name}" )
 
         # check that both the id and data match - if so make new FaceNoMatch otherwise Disco*FaceNoMatch
         face=session.query( Face ).filter( Face.id==face_id ).filter( Face.face == face_data ). first()
@@ -2315,7 +2333,7 @@ def ReloadMetadata(job):
             # SAFE: as SafePaths(mpath) combined with data I control in this func
             os.replace( fname, f'{mpath}no_match_overrides/0_{otype.name}_{md5face(face_data)}' )
         except Exception as ex:
-            print( f"ERROR: renaming no-match metadata on filesystem failed: {ex}" )
+            PAprint( f"ERROR: renaming no-match metadata on filesystem failed: {ex}" )
 
     # process Metadata on FS for force_match_overrides (disco ones, will have 0 as face_id)
     fnames = glob.glob( f'{mpath}force_match_overrides/*' )
@@ -2326,11 +2344,11 @@ def ReloadMetadata(job):
         person_tag=match.group(2)
         p = session.query(Person).filter(Person.tag==person_tag).first()
         if not p:
-            print( f"There is a metadata override on the file system for person: {person_tag} - but they are no longer in the DB - skip" )
+            PAprint( f"INFO: There is a metadata override on the file system for person: {person_tag} - but they are no longer in the DB - skip" )
             continue
         face_data=GetFaceInMetadata(fname)
         if DEBUG:
-            print( f"Found metadata showing Override match for person: {person_tag}" )
+            PAprint( f"DEBUG: Found metadata showing Override match for person: {person_tag}" )
 
         # check that both the id and data match - if so make new FaceNoMatch otherwise Disco*FaceNoMatch
         face=session.query( Face ).filter( Face.id==face_id ).filter( Face.face == face_data ).first()
@@ -2344,13 +2362,13 @@ def ReloadMetadata(job):
             # SAFE: as SafePaths(mpath) combined with data I control in this func
             os.replace( fname, f'{mpath}force_match_overrides/0_{p.tag}_{md5face(face_data)}' )
         except Exception as ex:
-            print( f"ERROR: renaming force-match metadata on filesystem failed: {ex}" )
+            PAprint( f"ERROR: renaming force-match metadata on filesystem failed: {ex}" )
 
 
     # now process each of the tmp tables for anything that was in the DB but not on FS (e.g rm'd metadata)
     overrides=session.execute( text( "select face_id, type_id from tmp_face_no_match_override" ) )
     for o in overrides:
-        print( f"F Force Match: o.face_id={o.face_id}" )
-        print( f"F No Match: o.type_id={o.type_id}" )
+        PAprint( f"F Force Match: o.face_id={o.face_id}" )
+        PAprint( f"F No Match: o.type_id={o.type_id}" )
         nmo=session.query(FaceNoMatchOverride).filter(FaceNoMatchOverride.face_id==o.face_id).filter(FaceNoMatchOverride.type_id==o.type_id).first()
         if not nmo:
@@ -2358,22 +2376,25 @@ def ReloadMetadata(job):
 
     overrides=session.execute( text( "select face_id, person_id from tmp_face_force_match_override" ) )
     for o in overrides:
-        print( f"F Force Match: o.face_id={o.face_id}" )
-        print( f"F Force Match: o.person_id={o.person_id}" )
+        if DEBUG:
+            PAprint( f"F Force Match: o.face_id={o.face_id}" )
+            PAprint( f"F Force Match: o.person_id={o.person_id}" )
         fmo=session.query(FaceForceMatchOverride).filter(FaceForceMatchOverride.face_id==o.face_id,FaceForceMatchOverride.person_id==o.person_id).first()
         if not fmo:
             session.add( FaceForceMatchOverride( face_id=o.face_id, person_id=o.person_id ) )
 
     overrides=session.execute( text( "select face, type_id from tmp_disconnected_no_match_override" ) )
     for o in overrides:
-        print( f"D No Match: o.type_id={o.type_id}" )
+        if DEBUG:
+            PAprint( f"D No Match: o.type_id={o.type_id}" )
         dnmo=session.query(DisconnectedNoMatchOverride).filter(DisconnectedNoMatchOverride.face==o.face).filter(DisconnectedNoMatchOverride.type_id==o.type_id).first()
         if not dnmo:
             session.add( DisconnectedNoMatchOverride( face=o.face, type_id=o.type_id ) )
 
     overrides=session.execute( text( "select face, person_id from tmp_disconnected_force_match_override" ) )
     for o in overrides:
-        print( f"D Force Match: o.person_id={o.person_id}" )
+        if DEBUG:
+            PAprint( f"D Force Match: o.person_id={o.person_id}" )
         dfmo=session.query(DisconnectedForceMatchOverride).filter(DisconnectedForceMatchOverride.face==o.face).filter(DisconnectedForceMatchOverride.person_id==o.person_id).first()
         if not dfmo:
             session.add( DisconnectedForceMatchOverride( face=o.face, person_id=o.person_id ) )
@@ -2394,7 +2415,6 @@ def ReloadMetadata(job):
 # If there is content in the Bin already, its logs this - mostly useful when testing)
 ####################################################################################################################################
 def InitialValidationChecks():
-    now=datetime.now(pytz.utc)
     job=NewJob( name="init", num_files=0, wait_for=None, jex=None, parent_job=None, desc="initialise photo assistant" )
     job.start_time=datetime.now(pytz.utc)
     JobProgressState( job, "In Progress" )
@@ -2407,12 +2427,12 @@ def InitialValidationChecks():
             if len(dirs) + len(files) > 0:
                 AddLogForJob(job, "INFO: the bin path contains content, cannot process to know where original deletes were form - skipping content!" )
                 AddLogForJob(job, "TODO: could be smart about what is known in the DB vs on the FS, and change below to an ERROR if it is one")
-                AddLogForJob(job, "WARNING: IF the files in the bin are in the DB (succeeded from GUI deletes) then this is okay, otherwise you should delete contents form the recycle bin and restart the job manager)" )
+                AddLogForJob(job, "WARN: IF the files in the bin are in the DB (succeeded from GUI deletes) then this is okay, otherwise you should delete contents form the recycle bin and restart the job manager)" )
             # create symlink and Path/Dir if needed
             ProcessRecycleBinDir(job)
             rbp_exists=1
         except Exception as ex:
-            print( f"FATAL ERROR: Failed to walk the recycle bin at {path} Err:{ex}" )
+            PAprint( f"FATAL ERROR: Failed to walk the recycle bin at {path} Err:{ex}" )
     else:
         AddLogForJob(job, "ERROR: The bin path in settings does not exist - Please fix now");
         sp_exists=0
@@ -2483,7 +2503,7 @@ def AddFaceToFile( locn_data, face_data, file_eid, model_id, settings ):
             # SAFE: as SafePaths(mpath) combined with data I control in this func
             os.replace( fname, new_fname )
         except Exception as ex:
-            print( f"ERROR: AddFaceToFile-face connects to 'disconnected-force-match' metadata, but fixing the filesystem metadata failed: {ex}" )
+            PAprint( f"ERROR: AddFaceToFile-face connects to 'disconnected-force-match' metadata, but fixing the filesystem metadata failed: {ex}" )
 
     dnmo=session.query(DisconnectedNoMatchOverride).filter(DisconnectedNoMatchOverride.face==face.face).first()
     if dnmo:
@@ -2499,7 +2519,7 @@ def AddFaceToFile( locn_data, face_data, file_eid, model_id, settings ):
             # SAFE: as SafePaths(mpath) combined with data I control in this func
             os.replace( fname, new_fname )
         except Exception as ex:
-            print( f"ERROR: AddFaceToFile-face connects to 'disconnected-no-match' metadata, but fixing the filesystem metadata failed: {ex}" )
+            PAprint( f"ERROR: AddFaceToFile-face connects to 'disconnected-no-match' metadata, but fixing the filesystem metadata failed: {ex}" )
 
     return
 
@@ -2699,7 +2719,7 @@ def CheckAndRunBinClean():
 
     now=datetime.now(pytz.utc)
     if not j or (now-j.last_update).days >= settings.scheduled_bin_cleanup:
-        print( f"INFO: Should force clean up bin path, del files older than {settings.bin_cleanup_file_age} days old" )
+        PAprint( f"INFO: Should force clean up bin path, del files older than {settings.bin_cleanup_file_age} days old" )
         NewJob( name="clean_bin", num_files=0, wait_for=None, jex=None, parent_job=None, desc="periodic clean up on Bin path" )
         created_jobs=True
     return created_jobs
@@ -2709,7 +2729,7 @@ def CheckAndRunBinClean():
 # based jobs that should run (e.g. last scanned a path X day(s) ago, then scan now), etc. X is defined in settings
 ####################################################################################################################################
 def ScheduledJobs():
-    print("DEBUG: Time to check for any scheduled jobs needing to be run" )
+    PAprint("DEBUG: Time to check for any scheduled jobs needing to be run" )
 
     created_jobs=False
 
@@ -2719,11 +2739,11 @@ def ScheduledJobs():
     settings = session.query(Settings).first()
     now=datetime.now(pytz.utc)
     if ndays_since_last_im_scan >= settings.scheduled_import_scan:
-        print( f"INFO: Time to force an import scan, last scan was {ndays_since_last_im_scan} days ago" )
+        PAprint( f"INFO: Time to force an import scan, last scan was {ndays_since_last_im_scan} days ago" )
         NewJob( name="scan_ip", num_files=0, wait_for=None, jex=None, parent_job=None, desc="periodic clean scan for new files in Import path" )
         created_jobs=True
     if ndays_since_last_st_scan >= settings.scheduled_storage_scan:
-        print( f"INFO: Time to force a storage scan, last scan was {ndays_since_last_st_scan}" )
+        PAprint( f"INFO: Time to force a storage scan, last scan was {ndays_since_last_st_scan}" )
         NewJob( name="scan_sp", num_files=0, wait_for=None, jex=None, parent_job=None, desc="periodic clean scan for new files in Storage path" )
         created_jobs=True
     if CheckAndRunBinClean():
@@ -2736,7 +2756,7 @@ def ScheduledJobs():
 # go into waiting on a socket to be woken up (and then if woken, back into HandleJobs()
 ####################################################################################################################################
 if __name__ == "__main__":
-    print("INFO: PA job manager starting - listening on {}:{}".format( PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT) )
+    PAprint("INFO: PA job manager starting - listening on {}:{}".format( PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT) )
 
     InitialValidationChecks()
 
@@ -2750,11 +2770,11 @@ if __name__ == "__main__":
         try:
             conn, addr = s.accept()
             if DEBUG:
-                print( f"accept finished, tout={s.timeout}" )
+                PAprint( f"accept finished, tout={s.timeout}" )
 
         except socket.timeout:
             if DEBUG:
-                print( f"timeout occurred, tout={s.timeout}" )
+                PAprint( f"timeout occurred, tout={s.timeout}" )
             if ScheduledJobs():
                 HandleJobs(False)
             continue