updated comments

This commit is contained in:
2021-08-12 23:22:05 +10:00
parent 0180703ae4
commit 6e2d04cd76

View File

@@ -18,7 +18,6 @@
DEBUG=1
### SQLALCHEMY IMPORTS ###
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime, LargeBinary, Boolean
from sqlalchemy.exc import SQLAlchemyError
@@ -27,10 +26,7 @@ from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
### SQLALCHEMY IMPORTS ###
### LOCAL FILE IMPORTS ###
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, THUMBSIZE, SymlinkName, GenThumb
from datetime import datetime, timedelta, date
@@ -65,17 +61,17 @@ some_engine = create_engine(DB_URL)
#Session = sessionmaker(bind=some_engine)
# create a Session
session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)
session = Session()
# this creates the Base (like db model in flask)
Base = declarative_base()
################################################################################
# Class describing File in the database, and via sqlalchemy, connected to the DB as well
# This has to match one-for-one the DB table
# Class describing PathType & in the database (via sqlalchemy)
# series of pre-defined types of paths (import, storage, bin)
################################################################################
class PathType(Base):
__tablename__ = "path_type"
@@ -86,6 +82,10 @@ class PathType(Base):
return f"<id: {self.id}, name={self.name}"
################################################################################
# Class describing PathDirLink & in the database (via sqlalchemy)
# connects a path with its matching entry (dir)
################################################################################
class PathDirLink(Base):
__tablename__ = "path_dir_link"
path_id = Column(Integer, ForeignKey("path.id"), primary_key=True )
@@ -94,6 +94,10 @@ class PathDirLink(Base):
def __repr__(self):
return f"<path_id: {self.path_id}, dir_eid: {self.dir_eid}>"
################################################################################
# Class describing EntryDirLink & in the database (via sqlalchemy)
# connects a path with its matching entry (dir)
################################################################################
class EntryDirLink(Base):
__tablename__ = "entry_dir_link"
entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True )
@@ -102,6 +106,9 @@ class EntryDirLink(Base):
def __repr__(self):
return f"<entry_id: {self.entry_id}, dir_eid: {self.dir_eid}>"
################################################################################
# Class describing Path & in the database (via sqlalchemy)
################################################################################
class Path(Base):
__tablename__ = "path"
id = Column(Integer, Sequence('path_id_seq'), primary_key=True )
@@ -113,6 +120,13 @@ class Path(Base):
def __repr__(self):
return f"<id: {self.id}, type={self.type}, path_prefix={self.path_prefix}>"
################################################################################
# Class describing Dir & in the database (via sqlalchemy)
# rel_path: rest of dir after path, e.g. if path = /..../storage, then
# rel_path could be 2021/20210101-new-years-day-pics
# in_path: only in this structure, not DB, quick ref to the path this dir is in
# PathOnFS(): method to get path on the FS for this dir
################################################################################
class Dir(Base):
__tablename__ = "dir"
eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
@@ -126,6 +140,15 @@ class Dir(Base):
def __repr__(self):
return f"<eid: {self.eid}, rel_path: {self.rel_path}, in_path={self.in_path}, last_import_date: {self.last_import_date}>"
################################################################################
# Class describing Entry and in the DB (via sqlalchemy)
# an entry is the common bits between files and dirs
# type is a convenience var only in this class, not in DB
# {dir|file}_etails are convenience data for the relevant details from the Dir
# or File class - not in DB
# in_dir - is the Dir that this entry is located in (convenience for class only)
# FullPathOnFS(): method to get path on the FS for this Entry
################################################################################
class Entry(Base):
__tablename__ = "entry"
id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
@@ -151,6 +174,14 @@ class Entry(Base):
def __repr__(self):
return f"<id: {self.id}, name: {self.name}, type={self.type}, exists_on_fs={self.exists_on_fs}, dir_details={self.dir_details}, file_details={self.file_details}, in_dir={self.in_dir}>"
################################################################################
# Class describing File and in the DB (via sqlalchemy)
# all files are entries, this is the extra bits only for a file, of note:
# hash is unique for files, and used to validate duplicates
# woy == week of year, all date fields are used to sort/show content. Date
# info can be from exif, or file system, or file name (rarely)
# faces: convenience field to show connected face(s) for this file
################################################################################
class File(Base):
__tablename__ = "file"
eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
@@ -168,6 +199,12 @@ class File(Base):
def __repr__(self):
return f"<eid: {self.eid}, size_mb={self.size_mb}, hash={self.hash}, last_hash_date: {self.last_hash_date}>"
################################################################################
# Class describing DelFile and in the DB (via sqlalchemy)
# used to track deleted files so they can be restored
# and keep all associated data intact, e.g. faces, etc. are connected to file_eid
# and orig_path_prefix allows restoration to the original path/dir
################################################################################
class DelFile(Base):
__tablename__ = "del_file"
file_eid = Column(Integer, ForeignKey("file.eid"), primary_key=True )
@@ -176,6 +213,10 @@ class DelFile(Base):
def __repr__(self):
return f"<file_eid: {self.file_eid}, orig_path_prefix={self.orig_path_prefix}>"
################################################################################
# Class describing FileType and in the DB (via sqlalchemy)
# pre-defined list of file types (image, dir, etc.)
################################################################################
class FileType(Base):
__tablename__ = "file_type"
id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True )
@@ -184,6 +225,9 @@ class FileType(Base):
def __repr__(self):
return f"<id: {self.id}, name={self.name}>"
################################################################################
# Class describing Settings and in the DB (via sqlalchemy)
################################################################################
class Settings(Base):
__tablename__ = "settings"
id = Column(Integer, Sequence('settings_id_seq'), primary_key=True )
@@ -197,6 +241,9 @@ class Settings(Base):
def __repr__(self):
return f"<id: {self.id}, import_path: {self.import_path}, recycle_bin_path: {self.recycle_bin_path}, default_refimg_model: {self.default_refimg_model}, default_scan_model: {self.default_scan_model}, default_threshold: {self.default_threshold}>"
################################################################################
# Class describing Person to Refimg link in DB via sqlalchemy
################################################################################
class PersonRefimgLink(Base):
__tablename__ = "person_refimg_link"
person_id = Column(Integer, ForeignKey('person.id'), unique=True, nullable=False, primary_key=True)
@@ -205,6 +252,9 @@ class PersonRefimgLink(Base):
def __repr__(self):
return f"<person_id: {self.person_id}, refimg_id: {self.refimg_id}>"
################################################################################
# Class describing Person in DB via sqlalchemy
################################################################################
class Person(Base):
__tablename__ = "person"
id = Column(Integer, Sequence('person_id_seq'), primary_key=True )
@@ -216,6 +266,16 @@ class Person(Base):
def __repr__(self):
return f"<tag: {self.tag}, firstname: {self.firstname}, surname: {self.surname}, refimg: {self.refimg}>"
################################################################################
# Class describing Refimg in DB via sqlalchemy
# fname: original file name of refimg
# face: actual binary of numpy data for this refimg's face (always only 1)
# orig*: width/height of original image, because when we show in person, it get scaled
# face_locn: location of ace - we need to know to draw green box around face locn (with orig* above)
# thumbnail: image data of actual img. once we load refimg, we only store this data, not the orig file
# model_used: which AI model (cnn or hog) used to create face
# person: read-only convenience field not in DB, just used in html
################################################################################
class Refimg(Base):
__tablename__ = "refimg"
id = Column(Integer, Sequence('refimg_id_seq'), primary_key=True )
@@ -231,6 +291,10 @@ class Refimg(Base):
def __repr__(self):
return f"<id: {self.id}, fname: {self.fname}, created_on: {self.created_on}>"
################################################################################
# Class describing AIModel in DB via sqlalchemy
# pre-defined list of models (cnn, hog)
################################################################################
class AIModel(Base):
__tablename__ = "ai_model"
id = Column(Integer, primary_key=True )
@@ -239,6 +303,11 @@ class AIModel(Base):
def __repr__(self):
return f"<id: {self.id}, name: {self.name}>"
################################################################################
# Class describing Face in the database and DB via sqlalchemy
# - face contains the binary version of numpy array so we dont need to recalc it
# - locn is the pixel coords of the face (top, right, bottom, left)
################################################################################
class Face(Base):
__tablename__ = "face"
id = Column(Integer, Sequence('face_id_seq'), primary_key=True )
@@ -248,6 +317,13 @@ class Face(Base):
def __repr__(self):
return f"<id: {self.id}, face={self.face}"
################################################################################
# Class describing FaceFileLink in the database and DB via sqlalchemy
# each face comes from a file and used a model to find the face
# this is not perfect, each face in the same file is always foudn with the same
# model - so really should have ModelFileLink or something, in the long run
# this might even be better as ScanDetailsFileLink and ScanDetails
################################################################################
class FaceFileLink(Base):
__tablename__ = "face_file_link"
face_id = Column(Integer, ForeignKey("face.id"), primary_key=True )
@@ -257,6 +333,11 @@ class FaceFileLink(Base):
def __repr__(self):
return f"<face_id: {self.face_id}, file_eid={self.file_eid}"
################################################################################
# Class describing FaceRefimgLink in the database and DB via sqlalchemy
# connects / implies a face has matched a refimg and we keep the distance too
# distance is mainly for debugging for now and shown in viewer
################################################################################
class FaceRefimgLink(Base):
__tablename__ = "face_refimg_link"
face_id = Column(Integer, ForeignKey("face.id"), primary_key=True )
@@ -266,12 +347,8 @@ class FaceRefimgLink(Base):
def __repr__(self):
return f"<face_id: {self.face_id}, refimg_id={self.refimg_id}"
################################################################################
# classes for the job manager:
# Job (and Joblog, JobExtra) for each Job, and
# PA_Jobmanager_fe_message (to pass messages to the front-end web)
# Class describing logs for each job and via sqlalchemy, connected to the DB as well
################################################################################
class Joblog(Base):
__tablename__ = "joblog"
@@ -283,6 +360,9 @@ class Joblog(Base):
def __repr__(self):
return "<id: {}, job_id: {}, log_date: {}, log: {}".format(self.id, self.job_id, self.log_date, self.log )
################################################################################
# Class describing extra/specific info for a Job and via sqlalchemy, connected to the DB as well
################################################################################
class JobExtra(Base):
__tablename__ = "jobextra"
id = Column(Integer, Sequence('jobextra_id_seq'), primary_key=True )
@@ -293,6 +373,9 @@ class JobExtra(Base):
def __repr__(self):
return "<id: {}, job_id: {}, name: {}, value: {}>".format(self.id, self.job_id, self.name, self.value )
################################################################################
# Class describing Job and via sqlalchemy, connected to the DB as well
################################################################################
class Job(Base):
__tablename__ = "job"
id = Column(Integer, Sequence('job_id_seq'), primary_key=True )
@@ -312,6 +395,13 @@ class Job(Base):
def __repr__(self):
return "<id: {}, start_time: {}, last_update: {}, name: {}, state: {}, num_files: {}, current_file_num: {}, current_file: {}, pa_job_state: {}, wait_for: {}, extra: {}, logs: {}>".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs)
##############################################################################
# Class describing PA_JobManager_FE_Message and in the DB (via sqlalchemy)
# the job manager (this code) can send a message back to the front end via the
# DB. has to be about a specific job_id and is success/danger, etc. (alert)
# and a message
################################################################################
class PA_JobManager_FE_Message(Base):
__tablename__ = "pa_job_manager_fe_message"
id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True )
@@ -322,7 +412,8 @@ class PA_JobManager_FE_Message(Base):
return "<id: {}, job_id: {}, alert: {}, message: {}".format(self.id, self.job_id, self.alert, self.message)
##############################################################################
# Util (non Class) functions
# MessageToFE(): sends a specific alert/messasge for a given job via the DB to
# the front end
##############################################################################
def MessageToFE( job_id, alert, message ):
msg = PA_JobManager_FE_Message( job_id=job_id, alert=alert, message=message)
@@ -330,6 +421,10 @@ def MessageToFE( job_id, alert, message ):
session.commit()
return msg.id
##############################################################################
# ProcessRecycleBinDir(): create Path/symlink if needed (func called once on
# startup)
##############################################################################
def ProcessRecycleBinDir(job):
settings = session.query(Settings).first()
if settings == None:
@@ -346,9 +441,12 @@ def ProcessRecycleBinDir(job):
AddPath( job, symlink, ptype.id )
session.commit()
return
##############################################################################
# ProcessStorageDirs(): wrapper func to call passed in job for each
# storage path defined in Settings - called via scan storage job
##############################################################################
def ProcessStorageDirs(parent_job):
settings = session.query(Settings).first()
if settings == None:
@@ -358,6 +456,10 @@ def ProcessStorageDirs(parent_job):
JobsForPaths( parent_job, paths, ptype )
return
##############################################################################
# ProcessImportDirs(): wrapper func to call passed in job for each
# storage path defined in Settings - called via scan import job
##############################################################################
def ProcessImportDirs(parent_job):
settings = session.query(Settings).first()
if settings == None:
@@ -367,6 +469,11 @@ def ProcessImportDirs(parent_job):
JobsForPaths( parent_job, paths, ptype )
return
##############################################################################
# JobsForPaths(): wrapper func to create jobs for passed in parent_job for
# each path passed in, with path_type passed in - used by
# Process{Storage|Import}Dirs() above
##############################################################################
def JobsForPaths( parent_job, paths, ptype ):
now=datetime.now(pytz.utc)
# make new set of Jobs per path... HandleJobs will make them run later
@@ -404,15 +511,22 @@ def JobsForPaths( parent_job, paths, ptype ):
if parent_job:
AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job3.id, job3.id, job3.name, job3.wait_for ) )
"""
### FIXME: wait for job3 not job2!
job4=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 )
session.add(job4)
session.commit()
if parent_job:
AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job4.id, job4.id, job4.name, job4.wait_for ) )
HandleJobs()
### FIXME: wait for job3 not job2!
job4=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 )
session.add(job4)
session.commit()
if parent_job:
AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job4.id, job4.id, job4.name, job4.wait_for ) )
HandleJobs()
return
##############################################################################
# ProcessFileForJob(): will set current_file in job for display in f/e job
# status, and if num_files is set of this job, then increment current file num
# to keep track of how far throught the job is (for display in f/e)
# and then add a log to say we have processed this file
##############################################################################
def ProcessFileForJob(job, message, current_file):
job.current_file=os.path.basename(current_file)
if job.num_files:
@@ -420,6 +534,12 @@ def ProcessFileForJob(job, message, current_file):
AddLogForJob(job, message )
return
##############################################################################
# AddLogForJob(): add a log line to joblog, if the last time we wrote a log
# was over 5 seconds ago, then commit the log to the db, so in f/e we see
# progress no matter the speed of job log output
# also when DEBUG is set, print a debug log too
##############################################################################
def AddLogForJob(job, message):
now=datetime.now(pytz.utc)
log=Joblog( job_id=job.id, log=message, log_date=now )
@@ -439,6 +559,11 @@ def AddLogForJob(job, message):
print( f"DEBUG: {message}" )
return
##############################################################################
# RunJob(): runs the actual job, based on job.name we call the appropriate function
# if job.name is not 'known', then we catch all force a FinishJob to make sure
# it does not cause the job manager to choke on the unmatched job
##############################################################################
def RunJob(job):
# session = Session()
job.start_time=datetime.now(pytz.utc)
@@ -475,12 +600,19 @@ def RunJob(job):
HandleJobs()
return
##############################################################################
# CancelJob(): cancel this job, and if any other job is waiting on this job,
# then cancel it too (this is done recursively)
##############################################################################
def CancelJob(job,id):
for j in session.query(Job).filter(Job.wait_for==id).all():
FinishJob(j, f"Job (#{j.id}) has been withdrawn as the job being waited for #{job.id} failed", "Withdrawn" )
CancelJob(j, j.id)
return
##############################################################################
# FinishJob(): finish this job off (if no overrides), its just marked completed
##############################################################################
def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
job.state=state
job.pa_job_state=pa_job_state
@@ -493,6 +625,10 @@ def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
print( f"DEBUG: {last_log}" )
return
##############################################################################
# HandleJobs(): go through each job, if it New, then tackle it --
# TODO: why not only retrieve New jobs from DB?
##############################################################################
def HandleJobs():
if DEBUG==1:
print("INFO: PA job manager is scanning for new jobs to process")
@@ -525,12 +661,20 @@ def HandleJobs():
print("INFO: PA job manager is waiting for a job")
return
##############################################################################
# JobProgressState(): set pa_job_state to 'In Progress', and state to whatever
# is passed in (so the state in the f/e can be whatever, but the job mgr only
# has accurate pa_job_states
##############################################################################
def JobProgressState( job, state ):
job.pa_job_state = "In Progress"
job.state=state
session.commit()
return
##############################################################################
# JobScanNow(): start and process the job to start scanning now (import paths)
##############################################################################
def JobScanNow(job):
JobProgressState( job, "In Progress" )
ProcessImportDirs(job)
@@ -539,6 +683,9 @@ def JobScanNow(job):
session.commit()
return
##############################################################################
# JobScanStorageDir(): start and process the job to start scanning now (storage paths)
##############################################################################
def JobScanStorageDir(job):
JobProgressState( job, "In Progress" )
ProcessStorageDirs(job)
@@ -547,6 +694,10 @@ def JobScanStorageDir(job):
session.commit()
return
##############################################################################
# JobForceScan(): start and process the job to force scanning now - so delete
# all attached data in DB, then scan import and storage paths
##############################################################################
def JobForceScan(job):
JobProgressState( job, "In Progress" )
session.query(FaceFileLink).delete()
@@ -567,7 +718,10 @@ def JobForceScan(job):
session.commit()
return
# to serve static content of the images, we create a symlink from inside the static subdir of each import_path that exists
##############################################################################
# CreateSymlink(): to serve static content of the images, we create a symlink
# from inside the static subdir of each import_path that exists
##############################################################################
def CreateSymlink(job,ptype,path):
path_type = session.query(PathType).get(ptype)
symlink=SymlinkName(path_type.name, path, path)
@@ -577,8 +731,11 @@ def CreateSymlink(job,ptype,path):
os.symlink(path, symlink)
return symlink
# function to create or return a Path object basedon the path prefix (pp) and the type (the type id of Import, Storage, Bin)
# if the Path is created, it also creates the Dir object (which in turn creates the Entry object)
##############################################################################
# AddPath() create or return a Path object basedon the path prefix (pp) and the
# type (the type id of Import, Storage, Bin) if the Path is created, it also
# creates the Dir object (which in turn creates the Entry object)
##############################################################################
def AddPath(job, pp, type ):
path_obj=session.query(Path).filter(Path.path_prefix==pp).first()
if not path_obj:
@@ -591,8 +748,7 @@ def AddPath(job, pp, type ):
return path_obj
################################################################################################################################################################
#
####################################################################################################################################
# Key function that runs as part of (usually) an import job. The name of the directory (dirname) is checked to see
# if it already is in the database (inside of in_dir in in_path). If it is,
# just return the db entry. If not, then we create a new row in Dir, that has name of dirname, has a parent directory
@@ -601,9 +757,9 @@ def AddPath(job, pp, type ):
#
# e.g. path on FS: /home/ddp/src/photoassistant/images_to_process/ ... ends in DB as path_prefix="static/Import/images_to_process"
# and we have a dir in /home/ddp/src/photoassistant/images_to_process/0000/subtest, then we call:
# AddDir( job, dirname='subtest', in_dir=Dir object for '0000', rel_path='0000/subtest', in_path=Path object for 'static/Import/images_to_process' )
#
################################################################################################################################################################
# AddDir( job, dirname='subtest', in_dir=Dir object for '0000', rel_path='0000/subtest',
# in_path=Path object for 'static/Import/images_to_process' )
####################################################################################################################################
def AddDir(job, dirname, in_dir, rel_path, in_path ):
dir=session.query(Dir).join(PathDirLink).join(Path).filter(Path.id==in_path.id).filter(Dir.rel_path==rel_path).first()
if dir:
@@ -622,6 +778,10 @@ def AddDir(job, dirname, in_dir, rel_path, in_path ):
session.add(e)
return dir
####################################################################################################################################
# AddFile(): adds a file into the given dir (in_dir) with dates provided. If
# it is already in the DB, just return the entry
####################################################################################################################################
def AddFile(job, fname, type_str, fsize, in_dir, year, month, day, woy ):
e=session.query(Entry).join(EntryDirLink).join(Dir).filter(Entry.name==fname,Dir.eid==in_dir.eid).first()
if e:
@@ -636,8 +796,10 @@ def AddFile(job, fname, type_str, fsize, in_dir, year, month, day, woy ):
session.add(e)
return e
# reset exists_on_fs to False for everything in this import path, if we find it on the FS in the walk below, it goes back to True, anything that
# is still false, has been deleted
####################################################################################################################################
# reset exists_on_fs to False for everything in this import path, if we find it on the FS in the walk below, it goes back to True,
# anything that is still false, has been deleted (and will be deleted out of dir in other func as needed)
####################################################################################################################################
def ResetExistsOnFS(job, path):
reset_dirs = session.query(Entry).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==path).all()
for reset_dir in reset_dirs:
@@ -649,16 +811,19 @@ def ResetExistsOnFS(job, path):
session.add(reset_file)
return
####################################################################################################################################
# Convenience function to remove a file from the database - and its associated links
# used when scanning and a file has been removed out from under PA, or
# when we remove duplicates
# used when scanning and a file has been removed out from under PA, or when we remove duplicates
####################################################################################################################################
def RemoveFileFromDB(id):
session.query(EntryDirLink).filter(EntryDirLink.entry_id==id).delete()
session.query(File).filter(File.eid==id).delete()
session.query(Entry).filter(Entry.id==id).delete()
return
####################################################################################################################################
# Actually moves the physical file from its current real directory to a subdir of the recycle bin path
####################################################################################################################################
def RemoveFileFromFS( del_me ):
try:
settings = session.query(Settings).first()
@@ -671,9 +836,11 @@ def RemoveFileFromFS( del_me ):
print( f"ERROR: Failed to remove file from filesystem - which={src}, err: {e}")
return
####################################################################################################################################
# Function that restores a file that was deleted (moved into the Bin)
# it moves file on the filesystem back to its original path and then changes the database path from the Bin path
# to the original import or storage path and appropriate dir
####################################################################################################################################
def RestoreFile(job,restore_me):
try:
# rel_path for a file in the Bin, is like 'Import/images_to_process/1111', so just prepend static/
@@ -720,9 +887,10 @@ def RestoreFile(job,restore_me):
session.commit()
return
# Function that moves a file we are "deleting" to the recycle bin, it moves the
# file on the filesystem and then changes the database path from the import or
# storage path over to the Bin path
####################################################################################################################################
# Function that moves a file we are "deleting" to the recycle bin, it moves the file on the filesystem and then changes the
# database path from the import or storage path over to the Bin path
####################################################################################################################################
def MoveFileToRecycleBin(job,del_me):
try:
settings = session.query(Settings).first()
@@ -763,8 +931,10 @@ def MoveFileToRecycleBin(job,del_me):
AddLogForJob(job, f"Deleted file: {del_me.name} - (moved to {os.path.dirname(del_me.FullPathOnFS())})" )
return
####################################################################################################################################
# Function that moves a file into a new folder in the storage path - if needed it makes the folder on the FS,
# moves the file into the folder on the FS and then changes the database path to the relevant Storage path
####################################################################################################################################
def MoveFileToNewFolderInStorage(job,move_me, dst_storage_path, dst_rel_path):
print( f"MoveFileToNewFolderInStorage: {move_me} to {dst_storage_path} in new? folder: {dst_storage_path}")
try:
@@ -781,12 +951,6 @@ def MoveFileToNewFolderInStorage(job,move_me, dst_storage_path, dst_rel_path):
# need these for AddDir calls below to work
parent_dir=session.query(Dir).join(PathDirLink).filter(PathDirLink.path_id==dst_storage_path.id).first()
# remove static from rel path, as it will move to static/Bin anyway...
# new_rel_path=del_me.in_dir.in_path.path_prefix.replace('static/','')
# if there is a relative path on this dir, add it to the new_rel_path as there is only ever 1 Bin path
# if len(del_me.in_dir.rel_path):
# new_rel_path += '/' + del_me.in_dir.rel_path
# okay, go through new relative path and AddDir any missing subdirs of this
# path (think Import/Dir1/Dir2) which b/c we have bin_path in AddDir will
# create Bin/Import, Bin/Import/Dir1, Bin/Import/Dir1/Dir2
@@ -804,15 +968,19 @@ def MoveFileToNewFolderInStorage(job,move_me, dst_storage_path, dst_rel_path):
AddLogForJob(job, f"{move_me.name} - (moved to {os.path.dirname(move_me.FullPathOnFS())})" )
return
####################################################################################################################################
# Convenience function to remove a dir from the database - and its associated links
####################################################################################################################################
def RemoveDirFromDB(id):
session.query(EntryDirLink).filter(EntryDirLink.entry_id==id).delete()
session.query(Dir).filter(Dir.eid==id).delete()
session.query(Entry).filter(Entry.id==id).delete()
return
####################################################################################################################################
# this routine is used when a scan finds files/dirs that have been removed
# underneath PA, so it just deletes them form the DB
####################################################################################################################################
def HandleAnyFSDeletions(job):
dtype=session.query(FileType).filter(FileType.name=='Directory').first()
rms = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id!=dtype.id).all()
@@ -829,7 +997,9 @@ def HandleAnyFSDeletions(job):
rm_cnt+=1
return rm_cnt
####################################################################################################################################
# try several ways to work out date of file created (try exif, then filename, lastly filesystem)
####################################################################################################################################
def GetDateFromFile(file, stat):
# try exif
try:
@@ -856,6 +1026,9 @@ def GetDateFromFile(file, stat):
woy=c[1]
return year, month, day, woy
####################################################################################################################################
# AddJexToDependantJobs(): if a parent job has jex, then copy them down into dependant jobs so we can just use the data
####################################################################################################################################
def AddJexToDependantJobs(job,name,value):
for j in session.query(Job).filter(Job.wait_for==job.id).all():
jex=JobExtra( name=name, value=value )
@@ -863,6 +1036,10 @@ def AddJexToDependantJobs(job,name,value):
AddJexToDependantJobs(j, name, value)
return
####################################################################################################################################
# JobImportDir(): job that scan import dir and processes entries in there - key function that uses os.walk() to traverse the
# file system and calls AddFile()/AddDir() as necessary
####################################################################################################################################
def JobImportDir(job):
JobProgressState( job, "In Progress" )
settings = session.query(Settings).first()
@@ -952,6 +1129,9 @@ def JobImportDir(job):
FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Removed {rm_cnt} file(s)")
return
####################################################################################################################################
# RunFuncOnFilesInPath(): take a path, find the Dir Entry for it, then go through each file in the dir (use ORM here)
####################################################################################################################################
def RunFuncOnFilesInPath( job, path, file_func, count_dirs ):
d = session.query(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==path).filter(Dir.rel_path=='').first()
files = session.query(Entry).join(EntryDirLink).filter(EntryDirLink.dir_eid==d.eid).all()
@@ -960,6 +1140,9 @@ def RunFuncOnFilesInPath( job, path, file_func, count_dirs ):
return
####################################################################################################################################
# WrapperForScanFileForPerson(): take an entry and scan it for a person (if it is an image), update job file counts as needed too
####################################################################################################################################
def WrapperForScanFileForPerson(job, entry):
if entry.type.name == 'Image':
if DEBUG:
@@ -969,12 +1152,19 @@ def WrapperForScanFileForPerson(job, entry):
job.current_file_num+=1
return
####################################################################################################################################
# Simple function that allows us to count image files (allows us to know AI job's total # of files)
####################################################################################################################################
def AddToJobImageCount(job, entry ):
if entry.type.name == 'Image':
job.num_files += 1
return
####################################################################################################################################
# JobRunAIOn(): job that grabs relevant refimgs to scan against the given set of entries (from seln made in F/E)
####################################################################################################################################
def JobRunAIOn(job):
# TODO: need to use JobInProgress... (here and other jobs?) -- so should I move this to HandleJob???
AddLogForJob(job, f"INFO: Starting job to look for faces in files...")
which_person=[jex.value for jex in job.extra if jex.name == "person"][0]
if which_person == "all":
@@ -1018,6 +1208,10 @@ def JobRunAIOn(job):
FinishJob(job, "Finished Processesing AI")
return
####################################################################################################################################
# JobRotateImage(): rotate an image by the amount requested (can also flip horizontal or vertical)
# TODO: should be JobTransformImage() ;)
####################################################################################################################################
def JobRotateImage(job):
AddLogForJob(job, f"INFO: Starting rotation/flip of image file...")
id=[jex.value for jex in job.extra if jex.name == "id"][0]
@@ -1025,8 +1219,6 @@ def JobRotateImage(job):
e=session.query(Entry).join(File).filter(Entry.id==id).first()
im = Image.open( e.FullPathOnFS() )
print(amt)
if amt == "fliph":
AddLogForJob(job, f"INFO: Flipping {e.FullPathOnFS()} horizontally" )
out = im.transpose(Image.FLIP_LEFT_RIGHT)
@@ -1042,6 +1234,9 @@ def JobRotateImage(job):
FinishJob(job, "Finished Processesing image rotation/flip")
return
####################################################################################################################################
# GenHashAndThumb(): calc. MD5 hash on given entry and generate image thumbnail for image, or video thumbnail for video
####################################################################################################################################
def GenHashAndThumb(job, e):
# commit every 100 files to see progress being made but not hammer the database
if job.current_file_num % 100 == 0:
@@ -1063,6 +1258,10 @@ def GenHashAndThumb(job, e):
e.file_details.last_hash_date = time.time()
return
####################################################################################################################################
# ProcessFilesInDir(): take an entry, if its a Dir recurse into it, if its a
# File, call file_func on it
####################################################################################################################################
def ProcessFilesInDir(job, e, file_func, count_dirs):
if DEBUG==1:
print( f"DEBUG: ProcessFilesInDir: {e.FullPathOnFS()}")
@@ -1077,6 +1276,9 @@ def ProcessFilesInDir(job, e, file_func, count_dirs):
ProcessFilesInDir(job, sub, file_func, count_dirs)
return
####################################################################################################################################
# JobGetFileDetails(): job to get the file details (hashes & thumbnails)
####################################################################################################################################
def JobGetFileDetails(job):
JobProgressState( job, "In Progress" )
path=[jex.value for jex in job.extra if jex.name == "path"][0]
@@ -1092,6 +1294,9 @@ def JobGetFileDetails(job):
session.commit()
return
####################################################################################################################################
# isVideo(): use MediaInfo python lib to see if the file is a video or not
####################################################################################################################################
def isVideo(file):
try:
fileInfo = MediaInfo.parse(file)
@@ -1102,7 +1307,9 @@ def isVideo(file):
except Exception:
return False
####################################################################################################################################
# Returns an md5 hash of the fnames' contents
####################################################################################################################################
def md5(job, fname):
hash_md5 = hashlib.md5()
with open(fname, "rb") as f:
@@ -1112,6 +1319,9 @@ def md5(job, fname):
AddLogForJob( job, "Generated md5 hash: {} for file: {}".format( hash, fname ) )
return hash
####################################################################################################################################
# isImage(): use python PIL library, if we can open it as an image, it is one, so return True
####################################################################################################################################
def isImage(file):
try:
Image.open(file)
@@ -1119,11 +1329,19 @@ def isImage(file):
except:
return False
####################################################################################################################################
# GenImageThumbnail(): log and then generate the thumb for a file
# TODO: this is now sort of pointless with moving the body to shared...
####################################################################################################################################
def GenImageThumbnail(job, file):
ProcessFileForJob( job, "Generate Thumbnail from Image file: {}".format( file ), file )
thumb, _, _ = GenThumb(file)
return thumb
####################################################################################################################################
# GenVideoThumbnail(): log and then generate the thumb for a video (this grabs the width/height of a frame from the video,
# and then reads the first few frames until the mean() fo the frame indicates its not just black frame, then make the thumbnail
####################################################################################################################################
def GenVideoThumbnail(job, file):
ProcessFileForJob( job, "Generate Thumbnail from Video file: {}".format( file ), file )
try:
@@ -1151,10 +1369,12 @@ def GenVideoThumbnail(job, file):
return None
return thumbnail
####################################################################################################################################
# utility function to clear any other future Duplicate messages, called if we
# either create a "new" CheckDups (often del/restore related), OR because we
# are actualyl handling the dups now from a front-end click through to
# /removedups, but some other job has since created another dup message...
####################################################################################################################################
def ClearOtherDupMessagesAndJobs():
msgs=session.query(PA_JobManager_FE_Message).join(Job).filter(Job.name=='checkdups')
for msg in msgs:
@@ -1164,6 +1384,9 @@ def ClearOtherDupMessagesAndJobs():
FinishJob(j, "New CheckForDups job/removal supercedes this job, withdrawing it", "Withdrawn")
session.commit()
####################################################################################################################################
# CheckForDups(): job to dig into the DB with sql, find duplicates - if there are any, pop a F/E status to say so
####################################################################################################################################
def CheckForDups(job):
AddLogForJob( job, f"Check for duplicates" )
ClearOtherDupMessagesAndJobs()
@@ -1178,6 +1401,10 @@ def CheckForDups(job):
FinishJob(job, f"Finished looking for duplicates")
return
####################################################################################################################################
# RemoveDups(): job to go through the f/e nominated keep/del data and then put a new CheckDups() in, as the user may have only
# partially addressed them (e.g. the first 10 dups...)
####################################################################################################################################
def RemoveDups(job):
AddLogForJob(job, f"INFO: Starting Remove Duplicates job...")
# as checkdups covers all dups, delete all future dups messages, and Withdraw future checkdups jobs
@@ -1244,6 +1471,10 @@ def RemoveDups(job):
AddLogForJob(job, "adding <a href='/job/{}'>job id={} {}</a> to confirm there are no more duplicates".format( next_job.id, next_job.id, next_job.name ) )
return
####################################################################################################################################
# JobMoveFiles(): moves files from a specified location (usually in the import dir) to a specificed location (usually a brand new
# folder in the storage dir)
####################################################################################################################################
def JobMoveFiles(job):
AddLogForJob(job, f"INFO: Starting Move Files job...")
prefix=[jex.value for jex in job.extra if jex.name == "prefix"][0]
@@ -1261,6 +1492,9 @@ def JobMoveFiles(job):
FinishJob(job, f"Finished move selected file(s)")
return
####################################################################################################################################
# JobDeleteFiles(): job to delete specified files (chosen in f/e)
####################################################################################################################################
def JobDeleteFiles(job):
AddLogForJob(job, f"INFO: Starting Delete Files job...")
for jex in job.extra:
@@ -1274,6 +1508,9 @@ def JobDeleteFiles(job):
FinishJob(job, f"Finished deleting selected file(s)")
return
####################################################################################################################################
# JobRestoreFiles(): if in the Bin path the user can restore specified files (chosen in f/e)
####################################################################################################################################
def JobRestoreFiles(job):
AddLogForJob(job, f"INFO: Starting Restore Files job...")
for jex in job.extra:
@@ -1287,6 +1524,11 @@ def JobRestoreFiles(job):
FinishJob(job, f"Finished restoring selected file(s)")
return
####################################################################################################################################
# InitialValidationChecks(): checks paths (and dirs) exist in DB on first run.
# IF path from settings does not exists - log it
# If there is content in the Bin already, its logs this - mostly useful when testing)
####################################################################################################################################
def InitialValidationChecks():
now=datetime.now(pytz.utc)
job=Job(start_time=now, last_update=now, name="init", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
@@ -1337,6 +1579,9 @@ def InitialValidationChecks():
FinishJob(job,"Finished Initial Validation Checks")
return
####################################################################################################################################
# AddFaceToFile(): adds the specified face, location & model_used to the specified file
####################################################################################################################################
def AddFaceToFile( locn_data, face_data, file_eid, model_id ):
face = Face( face=face_data.tobytes(), locn=json.dumps(locn_data) )
session.add(face)
@@ -1346,11 +1591,18 @@ def AddFaceToFile( locn_data, face_data, file_eid, model_id ):
session.commit()
return face
####################################################################################################################################
# DelFacesForFile(): quick func to delete any faces associated with the specified file
####################################################################################################################################
def DelFacesForFile( eid ):
session.execute( f"delete from face where id in (select face_id from face_file_link where file_eid = {eid})" )
session.commit()
return
####################################################################################################################################
# MatchRefimgToFace(): take specified refimg and a face & distance and connect
# them in the DB (e.g. Mark this face as matching this refimge with this face distance)
####################################################################################################################################
def MatchRefimgToFace( refimg_id, face_id, face_dist ):
# remove any match to this face from previous attempts, and 'replace' with new one
session.query(FaceRefimgLink).filter(FaceRefimgLink.face_id==face_id).delete()
@@ -1359,10 +1611,11 @@ def MatchRefimgToFace( refimg_id, face_id, face_dist ):
session.commit()
return
def UnmatchedFacesForFile( eid ):
rows = session.execute( f"select f.* from face f left join face_refimg_link frl on f.id = frl.face_id join face_file_link ffl on f.id = ffl.face_id where ffl.file_eid = {eid} and frl.refimg_id is null" )
return rows
####################################################################################################################################
# BestFaceMatch(): take in required threshold for anything to be a match, then go through the 'dist' array - it contains the face
# distance for each refimg / face. So this func loops through all of them,
# return the best match (with lowest distance) by checking each one that is a match based on the threshold
####################################################################################################################################
def BestFaceMatch(dist, fid, threshold):
# 1 is not a match (0 is perfect match)
lowest=1.0
@@ -1374,6 +1627,20 @@ def BestFaceMatch(dist, fid, threshold):
print( f"bfm: return {which}, {lowest} for {fid}" )
return which, lowest
####################################################################################################################################
# ScanFileForPerson(): for a file, check to see if a person is matched via face_recognition
#
# NOTE: can pass force into this, but no f/e to trip this yet
# if we do not have (any) faces for this file, go get them and their locations and store them in the DB assocaited with this file
# then for each face (known/matched already or not), create a new array 'dist[refimg][face]' and run face_recognition code
# to calculate the distance between the refimg and this face, for each refimg and each face
# then invoke BestFaceMatch above to find the best match, and then store that in the DB for each face for this file
# TODO: I *think* by going face-1, face-2, etc. we could find a suboptimal solution:
# e.g face-1 (Cam 0.55, Mich 0.6)
# face-2 (Cam 0.45, Mich 0.54)
# Algo would pick Cam for face-1, delete Cam, then Mich for face-2.
# Should have chosen the other way -> might need to research best algo here
####################################################################################################################################
def ScanFileForPerson( job, e, force=False ):
# get default_scan_model from settings (test this)
settings = session.query(Settings).first()
@@ -1418,6 +1685,7 @@ def ScanFileForPerson( job, e, force=False ):
dist[r.id][face.id] = face_recognition.face_distance(unknown_face_data, [refimg_face_data])
# if you need to check face distances, uncomment this: print( f"dist={dist}" )
# TODO: I think this next line is not needed anymore (does the same as above?)
faces = session.execute( f"select f.* from face f join face_file_link ffl on f.id = ffl.face_id where ffl.file_eid = {e.id}" )
for face in faces:
who, fd = BestFaceMatch(dist, face.id, threshold )
@@ -1428,6 +1696,10 @@ def ScanFileForPerson( job, e, force=False ):
return
####################################################################################################################################
# MAIN - start with validation, then grab any jobs in the DB to process, then
# go into waiting on a socket to be woken up (and then if woken, back into HandleJobs()
####################################################################################################################################
if __name__ == "__main__":
print("INFO: PA job manager starting - listening on {}:{}".format( PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT) )