From 6e2d04cd76a0eae4bdae0310fe3e60b92488603c Mon Sep 17 00:00:00 2001 From: Damien De Paoli Date: Thu, 12 Aug 2021 23:22:05 +1000 Subject: [PATCH] updated comments --- pa_job_manager.py | 368 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 320 insertions(+), 48 deletions(-) diff --git a/pa_job_manager.py b/pa_job_manager.py index 75a1029..c1b8f9e 100644 --- a/pa_job_manager.py +++ b/pa_job_manager.py @@ -18,7 +18,6 @@ DEBUG=1 ### SQLALCHEMY IMPORTS ### - from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime, LargeBinary, Boolean from sqlalchemy.exc import SQLAlchemyError @@ -27,10 +26,7 @@ from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import scoped_session -### SQLALCHEMY IMPORTS ### - ### LOCAL FILE IMPORTS ### - from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, THUMBSIZE, SymlinkName, GenThumb from datetime import datetime, timedelta, date @@ -65,17 +61,17 @@ some_engine = create_engine(DB_URL) #Session = sessionmaker(bind=some_engine) # create a Session - session_factory = sessionmaker(bind=some_engine) Session = scoped_session(session_factory) session = Session() +# this creates the Base (like db model in flask) Base = declarative_base() ################################################################################ -# Class describing File in the database, and via sqlalchemy, connected to the DB as well -# This has to match one-for-one the DB table +# Class describing PathType & in the database (via sqlalchemy) +# series of pre-defined types of paths (import, storage, bin) ################################################################################ class PathType(Base): __tablename__ = "path_type" @@ -86,6 +82,10 @@ class PathType(Base): return f"" +################################################################################ +# Class describing EntryDirLink & in the database (via sqlalchemy) +# connects a path with its matching entry (dir) +################################################################################ class EntryDirLink(Base): __tablename__ = "entry_dir_link" entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True ) @@ -102,6 +106,9 @@ class EntryDirLink(Base): def __repr__(self): return f"" +################################################################################ +# Class describing Path & in the database (via sqlalchemy) +################################################################################ class Path(Base): __tablename__ = "path" id = Column(Integer, Sequence('path_id_seq'), primary_key=True ) @@ -113,6 +120,13 @@ class Path(Base): def __repr__(self): return f"" +################################################################################ +# Class describing Dir & in the database (via sqlalchemy) +# rel_path: rest of dir after path, e.g. if path = /..../storage, then +# rel_path could be 2021/20210101-new-years-day-pics +# in_path: only in this structure, not DB, quick ref to the path this dir is in +# PathOnFS(): method to get path on the FS for this dir +################################################################################ class Dir(Base): __tablename__ = "dir" eid = Column(Integer, ForeignKey("entry.id"), primary_key=True ) @@ -126,6 +140,15 @@ class Dir(Base): def __repr__(self): return f"" +################################################################################ +# Class describing Entry and in the DB (via sqlalchemy) +# an entry is the common bits between files and dirs +# type is a convenience var only in this class, not in DB +# {dir|file}_etails are convenience data for the relevant details from the Dir +# or File class - not in DB +# in_dir - is the Dir that this entry is located in (convenience for class only) +# FullPathOnFS(): method to get path on the FS for this Entry +################################################################################ class Entry(Base): __tablename__ = "entry" id = Column(Integer, Sequence('file_id_seq'), primary_key=True ) @@ -151,6 +174,14 @@ class Entry(Base): def __repr__(self): return f"" +################################################################################ +# Class describing File and in the DB (via sqlalchemy) +# all files are entries, this is the extra bits only for a file, of note: +# hash is unique for files, and used to validate duplicates +# woy == week of year, all date fields are used to sort/show content. Date +# info can be from exif, or file system, or file name (rarely) +# faces: convenience field to show connected face(s) for this file +################################################################################ class File(Base): __tablename__ = "file" eid = Column(Integer, ForeignKey("entry.id"), primary_key=True ) @@ -168,6 +199,12 @@ class File(Base): def __repr__(self): return f"" +################################################################################ +# Class describing DelFile and in the DB (via sqlalchemy) +# used to track deleted files so they can be restored +# and keep all associated data intact, e.g. faces, etc. are connected to file_eid +# and orig_path_prefix allows restoration to the original path/dir +################################################################################ class DelFile(Base): __tablename__ = "del_file" file_eid = Column(Integer, ForeignKey("file.eid"), primary_key=True ) @@ -176,6 +213,10 @@ class DelFile(Base): def __repr__(self): return f"" +################################################################################ +# Class describing FileType and in the DB (via sqlalchemy) +# pre-defined list of file types (image, dir, etc.) +################################################################################ class FileType(Base): __tablename__ = "file_type" id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True ) @@ -184,6 +225,9 @@ class FileType(Base): def __repr__(self): return f"" +################################################################################ +# Class describing Settings and in the DB (via sqlalchemy) +################################################################################ class Settings(Base): __tablename__ = "settings" id = Column(Integer, Sequence('settings_id_seq'), primary_key=True ) @@ -197,6 +241,9 @@ class Settings(Base): def __repr__(self): return f"" +################################################################################ +# Class describing Person to Refimg link in DB via sqlalchemy +################################################################################ class PersonRefimgLink(Base): __tablename__ = "person_refimg_link" person_id = Column(Integer, ForeignKey('person.id'), unique=True, nullable=False, primary_key=True) @@ -205,6 +252,9 @@ class PersonRefimgLink(Base): def __repr__(self): return f"" +################################################################################ +# Class describing Person in DB via sqlalchemy +################################################################################ class Person(Base): __tablename__ = "person" id = Column(Integer, Sequence('person_id_seq'), primary_key=True ) @@ -216,6 +266,16 @@ class Person(Base): def __repr__(self): return f"" +################################################################################ +# Class describing Refimg in DB via sqlalchemy +# fname: original file name of refimg +# face: actual binary of numpy data for this refimg's face (always only 1) +# orig*: width/height of original image, because when we show in person, it get scaled +# face_locn: location of ace - we need to know to draw green box around face locn (with orig* above) +# thumbnail: image data of actual img. once we load refimg, we only store this data, not the orig file +# model_used: which AI model (cnn or hog) used to create face +# person: read-only convenience field not in DB, just used in html +################################################################################ class Refimg(Base): __tablename__ = "refimg" id = Column(Integer, Sequence('refimg_id_seq'), primary_key=True ) @@ -231,6 +291,10 @@ class Refimg(Base): def __repr__(self): return f"" +################################################################################ +# Class describing AIModel in DB via sqlalchemy +# pre-defined list of models (cnn, hog) +################################################################################ class AIModel(Base): __tablename__ = "ai_model" id = Column(Integer, primary_key=True ) @@ -239,6 +303,11 @@ class AIModel(Base): def __repr__(self): return f"" +################################################################################ +# Class describing Face in the database and DB via sqlalchemy +# - face contains the binary version of numpy array so we dont need to recalc it +# - locn is the pixel coords of the face (top, right, bottom, left) +################################################################################ class Face(Base): __tablename__ = "face" id = Column(Integer, Sequence('face_id_seq'), primary_key=True ) @@ -248,6 +317,13 @@ class Face(Base): def __repr__(self): return f"".format(self.id, self.job_id, self.name, self.value ) +################################################################################ +# Class describing Job and via sqlalchemy, connected to the DB as well +################################################################################ class Job(Base): __tablename__ = "job" id = Column(Integer, Sequence('job_id_seq'), primary_key=True ) @@ -312,6 +395,13 @@ class Job(Base): def __repr__(self): return "".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs) + +############################################################################## +# Class describing PA_JobManager_FE_Message and in the DB (via sqlalchemy) +# the job manager (this code) can send a message back to the front end via the +# DB. has to be about a specific job_id and is success/danger, etc. (alert) +# and a message +################################################################################ class PA_JobManager_FE_Message(Base): __tablename__ = "pa_job_manager_fe_message" id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True ) @@ -322,7 +412,8 @@ class PA_JobManager_FE_Message(Base): return "job id={} {} (wait for: {})".format( job3.id, job3.id, job3.name, job3.wait_for ) ) """ - ### FIXME: wait for job3 not job2! - job4=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 ) - session.add(job4) - session.commit() - if parent_job: - AddLogForJob(parent_job, "adding job id={} {} (wait for: {})".format( job4.id, job4.id, job4.name, job4.wait_for ) ) - HandleJobs() + + ### FIXME: wait for job3 not job2! + job4=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 ) + session.add(job4) + session.commit() + if parent_job: + AddLogForJob(parent_job, "adding job id={} {} (wait for: {})".format( job4.id, job4.id, job4.name, job4.wait_for ) ) + HandleJobs() return +############################################################################## +# ProcessFileForJob(): will set current_file in job for display in f/e job +# status, and if num_files is set of this job, then increment current file num +# to keep track of how far throught the job is (for display in f/e) +# and then add a log to say we have processed this file +############################################################################## def ProcessFileForJob(job, message, current_file): job.current_file=os.path.basename(current_file) if job.num_files: @@ -420,6 +534,12 @@ def ProcessFileForJob(job, message, current_file): AddLogForJob(job, message ) return +############################################################################## +# AddLogForJob(): add a log line to joblog, if the last time we wrote a log +# was over 5 seconds ago, then commit the log to the db, so in f/e we see +# progress no matter the speed of job log output +# also when DEBUG is set, print a debug log too +############################################################################## def AddLogForJob(job, message): now=datetime.now(pytz.utc) log=Joblog( job_id=job.id, log=message, log_date=now ) @@ -439,6 +559,11 @@ def AddLogForJob(job, message): print( f"DEBUG: {message}" ) return +############################################################################## +# RunJob(): runs the actual job, based on job.name we call the appropriate function +# if job.name is not 'known', then we catch all force a FinishJob to make sure +# it does not cause the job manager to choke on the unmatched job +############################################################################## def RunJob(job): # session = Session() job.start_time=datetime.now(pytz.utc) @@ -475,12 +600,19 @@ def RunJob(job): HandleJobs() return +############################################################################## +# CancelJob(): cancel this job, and if any other job is waiting on this job, +# then cancel it too (this is done recursively) +############################################################################## def CancelJob(job,id): for j in session.query(Job).filter(Job.wait_for==id).all(): FinishJob(j, f"Job (#{j.id}) has been withdrawn as the job being waited for #{job.id} failed", "Withdrawn" ) CancelJob(j, j.id) return +############################################################################## +# FinishJob(): finish this job off (if no overrides), its just marked completed +############################################################################## def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"): job.state=state job.pa_job_state=pa_job_state @@ -493,6 +625,10 @@ def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"): print( f"DEBUG: {last_log}" ) return +############################################################################## +# HandleJobs(): go through each job, if it New, then tackle it -- +# TODO: why not only retrieve New jobs from DB? +############################################################################## def HandleJobs(): if DEBUG==1: print("INFO: PA job manager is scanning for new jobs to process") @@ -525,12 +661,20 @@ def HandleJobs(): print("INFO: PA job manager is waiting for a job") return +############################################################################## +# JobProgressState(): set pa_job_state to 'In Progress', and state to whatever +# is passed in (so the state in the f/e can be whatever, but the job mgr only +# has accurate pa_job_states +############################################################################## def JobProgressState( job, state ): job.pa_job_state = "In Progress" job.state=state session.commit() return +############################################################################## +# JobScanNow(): start and process the job to start scanning now (import paths) +############################################################################## def JobScanNow(job): JobProgressState( job, "In Progress" ) ProcessImportDirs(job) @@ -539,6 +683,9 @@ def JobScanNow(job): session.commit() return +############################################################################## +# JobScanStorageDir(): start and process the job to start scanning now (storage paths) +############################################################################## def JobScanStorageDir(job): JobProgressState( job, "In Progress" ) ProcessStorageDirs(job) @@ -547,6 +694,10 @@ def JobScanStorageDir(job): session.commit() return +############################################################################## +# JobForceScan(): start and process the job to force scanning now - so delete +# all attached data in DB, then scan import and storage paths +############################################################################## def JobForceScan(job): JobProgressState( job, "In Progress" ) session.query(FaceFileLink).delete() @@ -567,7 +718,10 @@ def JobForceScan(job): session.commit() return -# to serve static content of the images, we create a symlink from inside the static subdir of each import_path that exists +############################################################################## +# CreateSymlink(): to serve static content of the images, we create a symlink +# from inside the static subdir of each import_path that exists +############################################################################## def CreateSymlink(job,ptype,path): path_type = session.query(PathType).get(ptype) symlink=SymlinkName(path_type.name, path, path) @@ -577,8 +731,11 @@ def CreateSymlink(job,ptype,path): os.symlink(path, symlink) return symlink -# function to create or return a Path object basedon the path prefix (pp) and the type (the type id of Import, Storage, Bin) -# if the Path is created, it also creates the Dir object (which in turn creates the Entry object) +############################################################################## +# AddPath() create or return a Path object basedon the path prefix (pp) and the +# type (the type id of Import, Storage, Bin) if the Path is created, it also +# creates the Dir object (which in turn creates the Entry object) +############################################################################## def AddPath(job, pp, type ): path_obj=session.query(Path).filter(Path.path_prefix==pp).first() if not path_obj: @@ -591,8 +748,7 @@ def AddPath(job, pp, type ): return path_obj -################################################################################################################################################################ -# +#################################################################################################################################### # Key function that runs as part of (usually) an import job. The name of the directory (dirname) is checked to see # if it already is in the database (inside of in_dir in in_path). If it is, # just return the db entry. If not, then we create a new row in Dir, that has name of dirname, has a parent directory @@ -601,9 +757,9 @@ def AddPath(job, pp, type ): # # e.g. path on FS: /home/ddp/src/photoassistant/images_to_process/ ... ends in DB as path_prefix="static/Import/images_to_process" # and we have a dir in /home/ddp/src/photoassistant/images_to_process/0000/subtest, then we call: -# AddDir( job, dirname='subtest', in_dir=Dir object for '0000', rel_path='0000/subtest', in_path=Path object for 'static/Import/images_to_process' ) -# -################################################################################################################################################################ +# AddDir( job, dirname='subtest', in_dir=Dir object for '0000', rel_path='0000/subtest', +# in_path=Path object for 'static/Import/images_to_process' ) +#################################################################################################################################### def AddDir(job, dirname, in_dir, rel_path, in_path ): dir=session.query(Dir).join(PathDirLink).join(Path).filter(Path.id==in_path.id).filter(Dir.rel_path==rel_path).first() if dir: @@ -622,6 +778,10 @@ def AddDir(job, dirname, in_dir, rel_path, in_path ): session.add(e) return dir +#################################################################################################################################### +# AddFile(): adds a file into the given dir (in_dir) with dates provided. If +# it is already in the DB, just return the entry +#################################################################################################################################### def AddFile(job, fname, type_str, fsize, in_dir, year, month, day, woy ): e=session.query(Entry).join(EntryDirLink).join(Dir).filter(Entry.name==fname,Dir.eid==in_dir.eid).first() if e: @@ -636,8 +796,10 @@ def AddFile(job, fname, type_str, fsize, in_dir, year, month, day, woy ): session.add(e) return e -# reset exists_on_fs to False for everything in this import path, if we find it on the FS in the walk below, it goes back to True, anything that -# is still false, has been deleted +#################################################################################################################################### +# reset exists_on_fs to False for everything in this import path, if we find it on the FS in the walk below, it goes back to True, +# anything that is still false, has been deleted (and will be deleted out of dir in other func as needed) +#################################################################################################################################### def ResetExistsOnFS(job, path): reset_dirs = session.query(Entry).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==path).all() for reset_dir in reset_dirs: @@ -649,16 +811,19 @@ def ResetExistsOnFS(job, path): session.add(reset_file) return +#################################################################################################################################### # Convenience function to remove a file from the database - and its associated links -# used when scanning and a file has been removed out from under PA, or -# when we remove duplicates +# used when scanning and a file has been removed out from under PA, or when we remove duplicates +#################################################################################################################################### def RemoveFileFromDB(id): session.query(EntryDirLink).filter(EntryDirLink.entry_id==id).delete() session.query(File).filter(File.eid==id).delete() session.query(Entry).filter(Entry.id==id).delete() return +#################################################################################################################################### # Actually moves the physical file from its current real directory to a subdir of the recycle bin path +#################################################################################################################################### def RemoveFileFromFS( del_me ): try: settings = session.query(Settings).first() @@ -671,9 +836,11 @@ def RemoveFileFromFS( del_me ): print( f"ERROR: Failed to remove file from filesystem - which={src}, err: {e}") return +#################################################################################################################################### # Function that restores a file that was deleted (moved into the Bin) # it moves file on the filesystem back to its original path and then changes the database path from the Bin path # to the original import or storage path and appropriate dir +#################################################################################################################################### def RestoreFile(job,restore_me): try: # rel_path for a file in the Bin, is like 'Import/images_to_process/1111', so just prepend static/ @@ -720,9 +887,10 @@ def RestoreFile(job,restore_me): session.commit() return -# Function that moves a file we are "deleting" to the recycle bin, it moves the -# file on the filesystem and then changes the database path from the import or -# storage path over to the Bin path +#################################################################################################################################### +# Function that moves a file we are "deleting" to the recycle bin, it moves the file on the filesystem and then changes the +# database path from the import or storage path over to the Bin path +#################################################################################################################################### def MoveFileToRecycleBin(job,del_me): try: settings = session.query(Settings).first() @@ -763,8 +931,10 @@ def MoveFileToRecycleBin(job,del_me): AddLogForJob(job, f"Deleted file: {del_me.name} - (moved to {os.path.dirname(del_me.FullPathOnFS())})" ) return +#################################################################################################################################### # Function that moves a file into a new folder in the storage path - if needed it makes the folder on the FS, # moves the file into the folder on the FS and then changes the database path to the relevant Storage path +#################################################################################################################################### def MoveFileToNewFolderInStorage(job,move_me, dst_storage_path, dst_rel_path): print( f"MoveFileToNewFolderInStorage: {move_me} to {dst_storage_path} in new? folder: {dst_storage_path}") try: @@ -781,12 +951,6 @@ def MoveFileToNewFolderInStorage(job,move_me, dst_storage_path, dst_rel_path): # need these for AddDir calls below to work parent_dir=session.query(Dir).join(PathDirLink).filter(PathDirLink.path_id==dst_storage_path.id).first() - # remove static from rel path, as it will move to static/Bin anyway... -# new_rel_path=del_me.in_dir.in_path.path_prefix.replace('static/','') - # if there is a relative path on this dir, add it to the new_rel_path as there is only ever 1 Bin path -# if len(del_me.in_dir.rel_path): -# new_rel_path += '/' + del_me.in_dir.rel_path - # okay, go through new relative path and AddDir any missing subdirs of this # path (think Import/Dir1/Dir2) which b/c we have bin_path in AddDir will # create Bin/Import, Bin/Import/Dir1, Bin/Import/Dir1/Dir2 @@ -804,15 +968,19 @@ def MoveFileToNewFolderInStorage(job,move_me, dst_storage_path, dst_rel_path): AddLogForJob(job, f"{move_me.name} - (moved to {os.path.dirname(move_me.FullPathOnFS())})" ) return +#################################################################################################################################### # Convenience function to remove a dir from the database - and its associated links +#################################################################################################################################### def RemoveDirFromDB(id): session.query(EntryDirLink).filter(EntryDirLink.entry_id==id).delete() session.query(Dir).filter(Dir.eid==id).delete() session.query(Entry).filter(Entry.id==id).delete() return +#################################################################################################################################### # this routine is used when a scan finds files/dirs that have been removed # underneath PA, so it just deletes them form the DB +#################################################################################################################################### def HandleAnyFSDeletions(job): dtype=session.query(FileType).filter(FileType.name=='Directory').first() rms = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id!=dtype.id).all() @@ -829,7 +997,9 @@ def HandleAnyFSDeletions(job): rm_cnt+=1 return rm_cnt +#################################################################################################################################### # try several ways to work out date of file created (try exif, then filename, lastly filesystem) +#################################################################################################################################### def GetDateFromFile(file, stat): # try exif try: @@ -856,6 +1026,9 @@ def GetDateFromFile(file, stat): woy=c[1] return year, month, day, woy +#################################################################################################################################### +# AddJexToDependantJobs(): if a parent job has jex, then copy them down into dependant jobs so we can just use the data +#################################################################################################################################### def AddJexToDependantJobs(job,name,value): for j in session.query(Job).filter(Job.wait_for==job.id).all(): jex=JobExtra( name=name, value=value ) @@ -863,6 +1036,10 @@ def AddJexToDependantJobs(job,name,value): AddJexToDependantJobs(j, name, value) return +#################################################################################################################################### +# JobImportDir(): job that scan import dir and processes entries in there - key function that uses os.walk() to traverse the +# file system and calls AddFile()/AddDir() as necessary +#################################################################################################################################### def JobImportDir(job): JobProgressState( job, "In Progress" ) settings = session.query(Settings).first() @@ -952,6 +1129,9 @@ def JobImportDir(job): FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Removed {rm_cnt} file(s)") return +#################################################################################################################################### +# RunFuncOnFilesInPath(): take a path, find the Dir Entry for it, then go through each file in the dir (use ORM here) +#################################################################################################################################### def RunFuncOnFilesInPath( job, path, file_func, count_dirs ): d = session.query(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==path).filter(Dir.rel_path=='').first() files = session.query(Entry).join(EntryDirLink).filter(EntryDirLink.dir_eid==d.eid).all() @@ -960,6 +1140,9 @@ def RunFuncOnFilesInPath( job, path, file_func, count_dirs ): return +#################################################################################################################################### +# WrapperForScanFileForPerson(): take an entry and scan it for a person (if it is an image), update job file counts as needed too +#################################################################################################################################### def WrapperForScanFileForPerson(job, entry): if entry.type.name == 'Image': if DEBUG: @@ -969,12 +1152,19 @@ def WrapperForScanFileForPerson(job, entry): job.current_file_num+=1 return +#################################################################################################################################### +# Simple function that allows us to count image files (allows us to know AI job's total # of files) +#################################################################################################################################### def AddToJobImageCount(job, entry ): if entry.type.name == 'Image': job.num_files += 1 return +#################################################################################################################################### +# JobRunAIOn(): job that grabs relevant refimgs to scan against the given set of entries (from seln made in F/E) +#################################################################################################################################### def JobRunAIOn(job): + # TODO: need to use JobInProgress... (here and other jobs?) -- so should I move this to HandleJob??? AddLogForJob(job, f"INFO: Starting job to look for faces in files...") which_person=[jex.value for jex in job.extra if jex.name == "person"][0] if which_person == "all": @@ -1018,6 +1208,10 @@ def JobRunAIOn(job): FinishJob(job, "Finished Processesing AI") return +#################################################################################################################################### +# JobRotateImage(): rotate an image by the amount requested (can also flip horizontal or vertical) +# TODO: should be JobTransformImage() ;) +#################################################################################################################################### def JobRotateImage(job): AddLogForJob(job, f"INFO: Starting rotation/flip of image file...") id=[jex.value for jex in job.extra if jex.name == "id"][0] @@ -1025,8 +1219,6 @@ def JobRotateImage(job): e=session.query(Entry).join(File).filter(Entry.id==id).first() im = Image.open( e.FullPathOnFS() ) - print(amt) - if amt == "fliph": AddLogForJob(job, f"INFO: Flipping {e.FullPathOnFS()} horizontally" ) out = im.transpose(Image.FLIP_LEFT_RIGHT) @@ -1042,6 +1234,9 @@ def JobRotateImage(job): FinishJob(job, "Finished Processesing image rotation/flip") return +#################################################################################################################################### +# GenHashAndThumb(): calc. MD5 hash on given entry and generate image thumbnail for image, or video thumbnail for video +#################################################################################################################################### def GenHashAndThumb(job, e): # commit every 100 files to see progress being made but not hammer the database if job.current_file_num % 100 == 0: @@ -1063,6 +1258,10 @@ def GenHashAndThumb(job, e): e.file_details.last_hash_date = time.time() return +#################################################################################################################################### +# ProcessFilesInDir(): take an entry, if its a Dir recurse into it, if its a +# File, call file_func on it +#################################################################################################################################### def ProcessFilesInDir(job, e, file_func, count_dirs): if DEBUG==1: print( f"DEBUG: ProcessFilesInDir: {e.FullPathOnFS()}") @@ -1077,6 +1276,9 @@ def ProcessFilesInDir(job, e, file_func, count_dirs): ProcessFilesInDir(job, sub, file_func, count_dirs) return +#################################################################################################################################### +# JobGetFileDetails(): job to get the file details (hashes & thumbnails) +#################################################################################################################################### def JobGetFileDetails(job): JobProgressState( job, "In Progress" ) path=[jex.value for jex in job.extra if jex.name == "path"][0] @@ -1092,6 +1294,9 @@ def JobGetFileDetails(job): session.commit() return +#################################################################################################################################### +# isVideo(): use MediaInfo python lib to see if the file is a video or not +#################################################################################################################################### def isVideo(file): try: fileInfo = MediaInfo.parse(file) @@ -1102,7 +1307,9 @@ def isVideo(file): except Exception: return False +#################################################################################################################################### # Returns an md5 hash of the fnames' contents +#################################################################################################################################### def md5(job, fname): hash_md5 = hashlib.md5() with open(fname, "rb") as f: @@ -1112,6 +1319,9 @@ def md5(job, fname): AddLogForJob( job, "Generated md5 hash: {} for file: {}".format( hash, fname ) ) return hash +#################################################################################################################################### +# isImage(): use python PIL library, if we can open it as an image, it is one, so return True +#################################################################################################################################### def isImage(file): try: Image.open(file) @@ -1119,11 +1329,19 @@ def isImage(file): except: return False +#################################################################################################################################### +# GenImageThumbnail(): log and then generate the thumb for a file +# TODO: this is now sort of pointless with moving the body to shared... +#################################################################################################################################### def GenImageThumbnail(job, file): ProcessFileForJob( job, "Generate Thumbnail from Image file: {}".format( file ), file ) thumb, _, _ = GenThumb(file) return thumb +#################################################################################################################################### +# GenVideoThumbnail(): log and then generate the thumb for a video (this grabs the width/height of a frame from the video, +# and then reads the first few frames until the mean() fo the frame indicates its not just black frame, then make the thumbnail +#################################################################################################################################### def GenVideoThumbnail(job, file): ProcessFileForJob( job, "Generate Thumbnail from Video file: {}".format( file ), file ) try: @@ -1151,10 +1369,12 @@ def GenVideoThumbnail(job, file): return None return thumbnail +#################################################################################################################################### # utility function to clear any other future Duplicate messages, called if we # either create a "new" CheckDups (often del/restore related), OR because we # are actualyl handling the dups now from a front-end click through to # /removedups, but some other job has since created another dup message... +#################################################################################################################################### def ClearOtherDupMessagesAndJobs(): msgs=session.query(PA_JobManager_FE_Message).join(Job).filter(Job.name=='checkdups') for msg in msgs: @@ -1164,6 +1384,9 @@ def ClearOtherDupMessagesAndJobs(): FinishJob(j, "New CheckForDups job/removal supercedes this job, withdrawing it", "Withdrawn") session.commit() +#################################################################################################################################### +# CheckForDups(): job to dig into the DB with sql, find duplicates - if there are any, pop a F/E status to say so +#################################################################################################################################### def CheckForDups(job): AddLogForJob( job, f"Check for duplicates" ) ClearOtherDupMessagesAndJobs() @@ -1178,6 +1401,10 @@ def CheckForDups(job): FinishJob(job, f"Finished looking for duplicates") return +#################################################################################################################################### +# RemoveDups(): job to go through the f/e nominated keep/del data and then put a new CheckDups() in, as the user may have only +# partially addressed them (e.g. the first 10 dups...) +#################################################################################################################################### def RemoveDups(job): AddLogForJob(job, f"INFO: Starting Remove Duplicates job...") # as checkdups covers all dups, delete all future dups messages, and Withdraw future checkdups jobs @@ -1244,6 +1471,10 @@ def RemoveDups(job): AddLogForJob(job, "adding job id={} {} to confirm there are no more duplicates".format( next_job.id, next_job.id, next_job.name ) ) return +#################################################################################################################################### +# JobMoveFiles(): moves files from a specified location (usually in the import dir) to a specificed location (usually a brand new +# folder in the storage dir) +#################################################################################################################################### def JobMoveFiles(job): AddLogForJob(job, f"INFO: Starting Move Files job...") prefix=[jex.value for jex in job.extra if jex.name == "prefix"][0] @@ -1261,6 +1492,9 @@ def JobMoveFiles(job): FinishJob(job, f"Finished move selected file(s)") return +#################################################################################################################################### +# JobDeleteFiles(): job to delete specified files (chosen in f/e) +#################################################################################################################################### def JobDeleteFiles(job): AddLogForJob(job, f"INFO: Starting Delete Files job...") for jex in job.extra: @@ -1274,6 +1508,9 @@ def JobDeleteFiles(job): FinishJob(job, f"Finished deleting selected file(s)") return +#################################################################################################################################### +# JobRestoreFiles(): if in the Bin path the user can restore specified files (chosen in f/e) +#################################################################################################################################### def JobRestoreFiles(job): AddLogForJob(job, f"INFO: Starting Restore Files job...") for jex in job.extra: @@ -1287,6 +1524,11 @@ def JobRestoreFiles(job): FinishJob(job, f"Finished restoring selected file(s)") return +#################################################################################################################################### +# InitialValidationChecks(): checks paths (and dirs) exist in DB on first run. +# IF path from settings does not exists - log it +# If there is content in the Bin already, its logs this - mostly useful when testing) +#################################################################################################################################### def InitialValidationChecks(): now=datetime.now(pytz.utc) job=Job(start_time=now, last_update=now, name="init", state="New", wait_for=None, pa_job_state="New", current_file_num=0 ) @@ -1337,6 +1579,9 @@ def InitialValidationChecks(): FinishJob(job,"Finished Initial Validation Checks") return +#################################################################################################################################### +# AddFaceToFile(): adds the specified face, location & model_used to the specified file +#################################################################################################################################### def AddFaceToFile( locn_data, face_data, file_eid, model_id ): face = Face( face=face_data.tobytes(), locn=json.dumps(locn_data) ) session.add(face) @@ -1346,11 +1591,18 @@ def AddFaceToFile( locn_data, face_data, file_eid, model_id ): session.commit() return face +#################################################################################################################################### +# DelFacesForFile(): quick func to delete any faces associated with the specified file +#################################################################################################################################### def DelFacesForFile( eid ): session.execute( f"delete from face where id in (select face_id from face_file_link where file_eid = {eid})" ) session.commit() return +#################################################################################################################################### +# MatchRefimgToFace(): take specified refimg and a face & distance and connect +# them in the DB (e.g. Mark this face as matching this refimge with this face distance) +#################################################################################################################################### def MatchRefimgToFace( refimg_id, face_id, face_dist ): # remove any match to this face from previous attempts, and 'replace' with new one session.query(FaceRefimgLink).filter(FaceRefimgLink.face_id==face_id).delete() @@ -1359,10 +1611,11 @@ def MatchRefimgToFace( refimg_id, face_id, face_dist ): session.commit() return -def UnmatchedFacesForFile( eid ): - rows = session.execute( f"select f.* from face f left join face_refimg_link frl on f.id = frl.face_id join face_file_link ffl on f.id = ffl.face_id where ffl.file_eid = {eid} and frl.refimg_id is null" ) - return rows - +#################################################################################################################################### +# BestFaceMatch(): take in required threshold for anything to be a match, then go through the 'dist' array - it contains the face +# distance for each refimg / face. So this func loops through all of them, +# return the best match (with lowest distance) by checking each one that is a match based on the threshold +#################################################################################################################################### def BestFaceMatch(dist, fid, threshold): # 1 is not a match (0 is perfect match) lowest=1.0 @@ -1374,6 +1627,20 @@ def BestFaceMatch(dist, fid, threshold): print( f"bfm: return {which}, {lowest} for {fid}" ) return which, lowest +#################################################################################################################################### +# ScanFileForPerson(): for a file, check to see if a person is matched via face_recognition +# +# NOTE: can pass force into this, but no f/e to trip this yet +# if we do not have (any) faces for this file, go get them and their locations and store them in the DB assocaited with this file +# then for each face (known/matched already or not), create a new array 'dist[refimg][face]' and run face_recognition code +# to calculate the distance between the refimg and this face, for each refimg and each face +# then invoke BestFaceMatch above to find the best match, and then store that in the DB for each face for this file +# TODO: I *think* by going face-1, face-2, etc. we could find a suboptimal solution: +# e.g face-1 (Cam 0.55, Mich 0.6) +# face-2 (Cam 0.45, Mich 0.54) +# Algo would pick Cam for face-1, delete Cam, then Mich for face-2. +# Should have chosen the other way -> might need to research best algo here +#################################################################################################################################### def ScanFileForPerson( job, e, force=False ): # get default_scan_model from settings (test this) settings = session.query(Settings).first() @@ -1418,6 +1685,7 @@ def ScanFileForPerson( job, e, force=False ): dist[r.id][face.id] = face_recognition.face_distance(unknown_face_data, [refimg_face_data]) # if you need to check face distances, uncomment this: print( f"dist={dist}" ) + # TODO: I think this next line is not needed anymore (does the same as above?) faces = session.execute( f"select f.* from face f join face_file_link ffl on f.id = ffl.face_id where ffl.file_eid = {e.id}" ) for face in faces: who, fd = BestFaceMatch(dist, face.id, threshold ) @@ -1428,6 +1696,10 @@ def ScanFileForPerson( job, e, force=False ): return +#################################################################################################################################### +# MAIN - start with validation, then grab any jobs in the DB to process, then +# go into waiting on a socket to be woken up (and then if woken, back into HandleJobs() +#################################################################################################################################### if __name__ == "__main__": print("INFO: PA job manager starting - listening on {}:{}".format( PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT) )