###
#
# This file controls the 'external' job control manager, that (periodically
# looks / somehow is pushed an event?) picks up new jobs, and processes them.
#
# It then stores the progress/status, etc. in job and joblog tables as needed
# via wrapper functions.
#
# The whole pa_job_manager is multi-threaded, and uses the database tables for
# state management and communication back to the pa web site.
#
###

# pylint: disable=no-member

# global debug setting
DEBUG=1

### SQLALCHEMY IMPORTS ###

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime, LargeBinary, Boolean
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session

### SQLALCHEMY IMPORTS ###

### LOCAL FILE IMPORTS ###

from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, THUMBSIZE, SymlinkName
from datetime import datetime, timedelta, date

### PYTHON LIB IMPORTS ###
import pytz
import time
import os
import glob
from PIL import Image, ImageOps
from pymediainfo import MediaInfo
import hashlib
import exifread
import base64
import numpy
import cv2
import socket
import threading
import io
import face_recognition
import re
import sys


# this is required to handle the recursive duplicate processing code
sys.setrecursionlimit(50000)

# an Engine, which the Session will use for connection resources
some_engine = create_engine(DB_URL)

# create a configured "Session" class
#Session = sessionmaker(bind=some_engine)

# create a Session

session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)
session = Session()

Base = declarative_base()

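# Illustrative sketch (not called anywhere): because Session is a scoped_session,
# each thread gets its own session object from the registry; a worker thread
# would use it roughly like this and hand the connection back when it is done.
def _example_thread_local_session():
    local_session = Session()        # same object within one thread, distinct across threads
    try:
        local_session.commit()
    finally:
        Session.remove()             # discard this thread's session/connection
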
################################################################################
# Classes describing Files/Dirs/Paths in the database, and via sqlalchemy, connected to the DB as well
# These have to match one-for-one the DB tables
################################################################################
class PathType(Base):
    __tablename__ = "path_type"
    id = Column(Integer, Sequence('path_type_id_seq'), primary_key=True )
    name = Column(String, unique=True, nullable=False )

    def __repr__(self):
        return f"<id: {self.id}, name={self.name}>"


class PathDirLink(Base):
    __tablename__ = "path_dir_link"
    path_id = Column(Integer, ForeignKey("path.id"), primary_key=True )
    dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True )

    def __repr__(self):
        return f"<path_id: {self.path_id}, dir_eid: {self.dir_eid}>"


class EntryDirLink(Base):
    __tablename__ = "entry_dir_link"
    entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True )

    def __repr__(self):
        return f"<entry_id: {self.entry_id}, dir_eid: {self.dir_eid}>"


class Path(Base):
    __tablename__ = "path"
    id = Column(Integer, Sequence('path_id_seq'), primary_key=True )
    type_id = Column(Integer, ForeignKey("path_type.id"))
    type = relationship("PathType")
    path_prefix = Column(String, unique=True, nullable=False )
    num_files = Column(Integer)

    def __repr__(self):
        return f"<id: {self.id}, type={self.type}, path_prefix={self.path_prefix}>"

class Dir(Base):
    __tablename__ = "dir"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    rel_path = Column(String, unique=True, nullable=False )
    in_path = relationship("Path", secondary="path_dir_link", uselist=False)
    last_import_date = Column(Float)

    def PathOnFS(self):
        return self.in_path.path_prefix+'/'+self.rel_path

    def __repr__(self):
        return f"<eid: {self.eid}, rel_path: {self.rel_path}, in_path={self.in_path}, last_import_date: {self.last_import_date}>"


class Entry(Base):
    __tablename__ = "entry"
    id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
    name = Column(String, unique=False, nullable=False )
    type_id = Column(Integer, ForeignKey("file_type.id"))
    exists_on_fs=Column(Boolean)
    type=relationship("FileType")
    dir_details = relationship( "Dir", uselist=False )
    file_details = relationship( "File", uselist=False )
    in_dir = relationship ("Dir", secondary="entry_dir_link", uselist=False )

    def FullPathOnFS(self):
        if self.in_dir:
            s=self.in_dir.in_path.path_prefix + '/'
            if len(self.in_dir.rel_path) > 0:
                s += self.in_dir.rel_path + '/'
        # this occurs when we have a dir that is the root of a path
        else:
            s=self.dir_details.in_path.path_prefix+'/'
        s += self.name
        return s

    def __repr__(self):
        return f"<id: {self.id}, name: {self.name}, type={self.type}, exists_on_fs={self.exists_on_fs}, dir_details={self.dir_details}, file_details={self.file_details}, in_dir={self.in_dir}>"

class File(Base):
    __tablename__ = "file"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    size_mb = Column(Integer, unique=False, nullable=False)
    hash = Column(Integer, unique=True, nullable=True)
    thumbnail = Column(String, unique=False, nullable=True)
    year = Column(Integer)
    month = Column(Integer)
    day = Column(Integer)
    woy = Column(Integer)
    last_hash_date = Column(Float)
    faces = Column( LargeBinary )
    faces_created_on = Column(Float)

    def __repr__(self):
        return f"<eid: {self.eid}, size_mb={self.size_mb}, hash={self.hash}, last_hash_date: {self.last_hash_date}>"


class DelFile(Base):
    __tablename__ = "del_file"
    file_eid = Column(Integer, ForeignKey("file.eid"), primary_key=True )
    orig_path_prefix = Column(String, unique=False )

    def __repr__(self):
        return f"<file_eid: {self.file_eid}, orig_path_prefix={self.orig_path_prefix}>"


class FileType(Base):
    __tablename__ = "file_type"
    id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True )
    name = Column(String, unique=True, nullable=False )

    def __repr__(self):
        return f"<id: {self.id}, name={self.name}>"


class Settings(Base):
    __tablename__ = "settings"
    id = Column(Integer, Sequence('settings_id_seq'), primary_key=True )
    import_path = Column(String)
    storage_path = Column(String)
    recycle_bin_path = Column(String)

    def __repr__(self):
        return f"<id: {self.id}, import_path: {self.import_path}, recycle_bin_path: {self.recycle_bin_path}>"


class PersonRefimgLink(Base):
    __tablename__ = "person_refimg_link"
    person_id = Column(Integer, ForeignKey('person.id'), unique=True, nullable=False, primary_key=True)
    refimg_id = Column(Integer, ForeignKey('refimg.id'), unique=True, nullable=False, primary_key=True)

    def __repr__(self):
        return f"<person_id: {self.person_id}, refimg_id: {self.refimg_id}>"


class Person(Base):
    __tablename__ = "person"
    id = Column(Integer, Sequence('person_id_seq'), primary_key=True )
    tag = Column(String(48), unique=False, nullable=False)
    surname = Column(String(48), unique=False, nullable=False)
    firstname = Column(String(48), unique=False, nullable=False)
    refimg = relationship('Refimg', secondary=PersonRefimgLink.__table__)

    def __repr__(self):
        return f"<tag: {self.tag}, firstname: {self.firstname}, surname: {self.surname}, refimg: {self.refimg}>"


class Refimg(Base):
    __tablename__ = "refimg"
    id = Column(Integer, Sequence('refimg_id_seq'), primary_key=True )
    fname = Column(String(256), unique=True, nullable=False)
    face = Column(LargeBinary, unique=True, nullable=False)
    thumbnail = Column(String, unique=False, nullable=True)
    created_on = Column(Float)
    orig_w = Column(Integer)
    orig_h = Column(Integer)
    face_locn = Column(String)

    def __repr__(self):
        return f"<id: {self.id}, fname: {self.fname}, created_on: {self.created_on}>"


class Face(Base):
    __tablename__ = "face"
    id = Column(Integer, Sequence('face_id_seq'), primary_key=True )
    face = Column( LargeBinary )

    def __repr__(self):
        return f"<id: {self.id}, face={self.face}>"


class FaceFileLink(Base):
    __tablename__ = "face_file_link"
    face_id = Column(Integer, ForeignKey("face.id"), primary_key=True )
    file_eid = Column(Integer, ForeignKey("file.eid"), primary_key=True )

    def __repr__(self):
        return f"<face_id: {self.face_id}, file_eid={self.file_eid}>"


class FaceRefimgLink(Base):
    __tablename__ = "face_refimg_link"
    face_id = Column(Integer, ForeignKey("face.id"), primary_key=True )
    refimg_id = Column(Integer, ForeignKey("refimg.id"), primary_key=True )

    def __repr__(self):
        return f"<face_id: {self.face_id}, refimg_id={self.refimg_id}>"

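# Illustrative sketch (assumes the tables are populated): how the Entry/Dir/Path
# relationships above compose into an on-disk location; FullPathOnFS() joins the
# same three pieces with '/'.
def _example_entry_location():
    e = session.query(Entry).join(File).first()
    if e and e.in_dir:
        print(e.in_dir.in_path.path_prefix, e.in_dir.rel_path, e.name)
        print(e.FullPathOnFS())
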
################################################################################
# classes for the job manager:
# Job (and Joblog, JobExtra) for each Job, and
# PA_Jobmanager_fe_message (to pass messages to the front-end web)
################################################################################
class Joblog(Base):
    __tablename__ = "joblog"
    id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True )
    job_id = Column(Integer, ForeignKey('job.id') )
    log_date = Column(DateTime(timezone=True))
    log = Column(String)

    def __repr__(self):
        return "<id: {}, job_id: {}, log_date: {}, log: {}>".format(self.id, self.job_id, self.log_date, self.log )


class JobExtra(Base):
    __tablename__ = "jobextra"
    id = Column(Integer, Sequence('jobextra_id_seq'), primary_key=True )
    job_id = Column(Integer, ForeignKey('job.id') )
    name = Column(String)
    value = Column(String)

    def __repr__(self):
        return "<id: {}, job_id: {}, name: {}, value: {}>".format(self.id, self.job_id, self.name, self.value )


class Job(Base):
    __tablename__ = "job"
    id = Column(Integer, Sequence('job_id_seq'), primary_key=True )
    start_time = Column(DateTime(timezone=True))
    last_update = Column(DateTime(timezone=True))
    name = Column(String)
    state = Column(String)
    num_files = Column(Integer)
    current_file_num = Column(Integer)
    current_file = Column(String)
    wait_for = Column(Integer)
    pa_job_state = Column(String)

    logs = relationship( "Joblog")
    extra = relationship( "JobExtra")

    def __repr__(self):
        return "<id: {}, start_time: {}, last_update: {}, name: {}, state: {}, num_files: {}, current_file_num: {}, current_file: {}, pa_job_state: {}, wait_for: {}, extra: {}, logs: {}>".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs)


class PA_JobManager_FE_Message(Base):
    __tablename__ = "pa_job_manager_fe_message"
    id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True )
    job_id = Column(Integer, ForeignKey('job.id') )
    alert = Column(String)
    message = Column(String)

    def __repr__(self):
        return "<id: {}, job_id: {}, alert: {}, message: {}>".format(self.id, self.job_id, self.alert, self.message)

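# Illustrative sketch (the path value here is made up): how a job is built with
# per-job parameters (JobExtra) and how a second job is chained onto it via
# wait_for - the same pattern JobsForPaths uses further down this file.
def _example_make_chained_jobs():
    now = datetime.now(pytz.utc)
    first = Job(start_time=now, last_update=now, name="importdir", state="New",
                wait_for=None, pa_job_state="New", current_file_num=0)
    first.extra.append(JobExtra(name="path", value="/some/import/path"))
    session.add(first)
    session.commit()                       # commit so first.id is valid for wait_for
    second = Job(start_time=now, last_update=now, name="getfiledetails", state="New",
                 wait_for=first.id, pa_job_state="New", current_file_num=0)
    session.add(second)
    session.commit()
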
##############################################################################
# Util (non Class) functions
##############################################################################
def MessageToFE( job_id, alert, message ):
    msg = PA_JobManager_FE_Message( job_id=job_id, alert=alert, message=message)
    session.add(msg)
    session.commit()
    return msg.id


def ProcessRecycleBinDir(job):
    settings = session.query(Settings).first()
    if settings == None:
        raise Exception("Cannot create file data with no settings / recycle bin path is missing")
    ptype = session.query(PathType).filter(PathType.name=='Bin').first()
    paths = settings.recycle_bin_path.split("#")

    for path in paths:
        if not os.path.exists( path ):
            AddLogForJob( job, f"Not Importing {path} -- Path does not exist" )
            continue
        symlink=SymlinkName(ptype.name, path, path)
        # create the Path (and Dir objects for the Bin)
        AddPath( job, symlink, ptype.id )

    session.commit()

    return

def ProcessStorageDirs(parent_job):
    settings = session.query(Settings).first()
    if settings == None:
        raise Exception("Cannot create file data with no settings / storage path is missing")
    paths = settings.storage_path.split("#")
    ptype = session.query(PathType).filter(PathType.name=='Storage').first()
    JobsForPaths( parent_job, paths, ptype )
    return


def ProcessImportDirs(parent_job):
    settings = session.query(Settings).first()
    if settings == None:
        raise Exception("Cannot create file data with no settings / import path is missing")
    paths = settings.import_path.split("#")
    ptype = session.query(PathType).filter(PathType.name=='Import').first()
    JobsForPaths( parent_job, paths, ptype )
    return


def JobsForPaths( parent_job, paths, ptype ):
    now=datetime.now(pytz.utc)
    # make new set of Jobs per path... HandleJobs will make them run later
    for path in paths:
        p=session.query(Path).filter(Path.path_prefix==SymlinkName(ptype.name,path,path+'/')).first()
        cfn=0
        if p:
            cfn=p.num_files

        jex=JobExtra( name="path", value=path )
        jex2=JobExtra( name="path_type", value=ptype.id )
        job=Job(start_time=now, last_update=now, name="importdir", state="New", wait_for=None, pa_job_state="New", current_file_num=0, num_files=cfn )
        job.extra.append(jex)
        job.extra.append(jex2)
        session.add(job)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a>".format( job.id, job.id, job.name ) )
        # force commit to make job.id be valid in the use of wait_for later
        session.commit()
        jex2=JobExtra( name="path", value=path )
        job2=Job(start_time=now, last_update=now, name="getfiledetails", state="New", wait_for=job.id, pa_job_state="New", current_file_num=0 )
        job2.extra.append(jex2)
        session.add(job2)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job2.id, job2.id, job2.name, job2.wait_for ) )

        """
        jex3=JobExtra( name="path", value=path )
        job3=Job(start_time=now, last_update=now, name="processai", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 )
        job3.extra.append(jex3)
        session.add(job3)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job3.id, job3.id, job3.name, job3.wait_for ) )
        """
        ### FIXME: wait for job3 not job2!
        job4=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 )
        session.add(job4)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job4.id, job4.id, job4.name, job4.wait_for ) )
    HandleJobs()
    return

def ProcessFileForJob(job, message, current_file):
    job.current_file=os.path.basename(current_file)
    if job.num_files:
        job.current_file_num=job.current_file_num+1
    AddLogForJob(job, message )
    return


def AddLogForJob(job, message):
    now=datetime.now(pytz.utc)
    log=Joblog( job_id=job.id, log=message, log_date=now )
    job.last_update=now
    session.add(log)
    # some logs have DEBUG: in front, so clean that up
    message = message.replace("DEBUG:", "" )
    # if it has been more than 5 seconds since our last log, then commit to the DB to show some progress
    if hasattr(job, 'last_commit'):
        if (now - job.last_commit).seconds > 5:
            job.last_commit=now
            print( "DELME: we have taken longer than 5 seconds since last commit so do it")
            session.commit()
    else:
        job.last_commit = now
    if DEBUG:
        print( f"DEBUG: {message}" )
    return

def RunJob(job):
    # session = Session()
    job.start_time=datetime.now(pytz.utc)
    if job.name =="scannow":
        JobScanNow(job)
    elif job.name =="forcescan":
        JobForceScan(job)
    elif job.name =="scan_sp":
        JobScanStorageDir(job)
    elif job.name =="importdir":
        JobImportDir(job)
    elif job.name =="getfiledetails":
        JobGetFileDetails(job)
    elif job.name == "checkdups":
        CheckForDups(job)
    elif job.name == "rmdups":
        RemoveDups(job)
    elif job.name == "delete_files":
        JobDeleteFiles(job)
    elif job.name == "move_files":
        JobMoveFiles(job)
    elif job.name == "restore_files":
        JobRestoreFiles(job)
    elif job.name == "processai":
        JobProcessAI(job)
    elif job.name == "run_ai_on":
        JobRunAIOn(job)
    elif job.name == "rotate_image":
        JobRotateImage(job)
    else:
        print("ERROR: Requested to process unknown job type: {}".format(job.name))
    # okay, we finished a job, so check for any jobs that are dependant on this and run them...
    # session.close()
    if job.pa_job_state != "Completed":
        FinishJob(job, "PA Job Manager - This is a catchall to close off a Job; it should never be seen and implies a job did not actually complete", "Failed" )
    HandleJobs()
    return

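# Illustrative sketch (not wired in): the same dispatch expressed as a lookup
# table. The handler names are resolved when the function is called, so it is
# safe to define here even though the Job* handlers appear further down the file.
def _example_dispatch_job(job):
    handlers = {
        "scannow": JobScanNow, "forcescan": JobForceScan, "scan_sp": JobScanStorageDir,
        "importdir": JobImportDir, "getfiledetails": JobGetFileDetails,
        "checkdups": CheckForDups, "rmdups": RemoveDups, "delete_files": JobDeleteFiles,
        "move_files": JobMoveFiles, "restore_files": JobRestoreFiles,
        "processai": JobProcessAI, "run_ai_on": JobRunAIOn, "rotate_image": JobRotateImage,
    }
    handler = handlers.get(job.name)
    if handler is None:
        print("ERROR: Requested to process unknown job type: {}".format(job.name))
        return
    handler(job)
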
def CancelJob(job,id):
    for j in session.query(Job).filter(Job.wait_for==id).all():
        FinishJob(j, f"Job (#{j.id}) has been withdrawn as the job being waited for #{job.id} failed", "Withdrawn" )
        CancelJob(j, j.id)
    return


def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
    job.state=state
    job.pa_job_state=pa_job_state
    job.last_update=datetime.now(pytz.utc)
    AddLogForJob(job, last_log)
    if job.state=="Failed":
        CancelJob(job,job.id)
    session.commit()
    if DEBUG==1:
        print( f"DEBUG: {last_log}" )
    return

def HandleJobs():
    if DEBUG==1:
        print("INFO: PA job manager is scanning for new jobs to process")
    for job in session.query(Job).all():
        if job.pa_job_state == 'New':
            if job.wait_for != None:
                j2 = session.query(Job).get(job.wait_for)
                if not j2:
                    print ("ERROR: job.wait_for ({}) does not exist in the job list below?".format( job.wait_for ))
                    for j in session.query(Job).all():
                        print ("ERROR: j={}".format(j.id))
                    continue
                if j2.pa_job_state != 'Completed':
                    continue

            # use this to remove threads for easier debugging, and errors will stacktrace to the console
            if DEBUG==1:
                print("*************************************")
                print("RUNNING job: id={} name={} wait_for={}".format(job.id, job.name, job.wait_for ))
                RunJob(job)
            else:
                try:
                    RunJob(job)
                    # threading.Thread(target=RunJob, args=(job,)).start()
                except Exception as e:
                    try:
                        MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
                    except Exception as e2:
                        print("ERROR: Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exception was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
    print("INFO: PA job manager is waiting for a job")
    return

def JobProgressState( job, state ):
    job.pa_job_state = "In Progress"
    job.state=state
    session.commit()
    return


def JobScanNow(job):
    JobProgressState( job, "In Progress" )
    ProcessImportDirs(job)
    FinishJob( job, "Completed (scan for new files)" )
    MessageToFE( job.id, "success", "Completed (scan for new files)" )
    session.commit()
    return


def JobScanStorageDir(job):
    JobProgressState( job, "In Progress" )
    ProcessStorageDirs(job)
    FinishJob( job, "Completed (scan for new files)" )
    MessageToFE( job.id, "success", "Completed (scan for new files)" )
    session.commit()
    return


def JobForceScan(job):
    JobProgressState( job, "In Progress" )
    session.query(FaceFileLink).delete()
    session.query(FaceRefimgLink).delete()
    session.query(DelFile).delete()
    session.query(EntryDirLink).delete()
    session.query(PathDirLink).delete()
    session.query(Path).delete()
    session.query(Dir).delete()
    session.query(File).delete()
    session.query(Entry).delete()
    session.commit()
    ProcessRecycleBinDir(job)
    ProcessImportDirs(job)
    ProcessStorageDirs(job)
    FinishJob(job, "Completed (forced remove and recreation of all file data)")
    MessageToFE( job.id, "success", "Completed (forced remove and recreation of all file data)" )
    session.commit()
    return

# to serve static content of the images, we create a symlink from inside the static subdir of each import_path that exists
def CreateSymlink(job,ptype,path):
    path_type = session.query(PathType).get(ptype)
    symlink=SymlinkName(path_type.name, path, path)
    if not os.path.exists(symlink):
        print( f"INFO: symlink does not exist, actually creating it -- s={symlink}" )
        os.makedirs( os.path.dirname(symlink), mode=0o777, exist_ok=True )
        os.symlink(path, symlink)
    return symlink


# function to create or return a Path object based on the path prefix (pp) and the type (the type id of Import, Storage, Bin)
# if the Path is created, it also creates the Dir object (which in turn creates the Entry object)
def AddPath(job, pp, type ):
    path_obj=session.query(Path).filter(Path.path_prefix==pp).first()
    if not path_obj:
        path_obj=Path( path_prefix=pp, num_files=0, type_id=type )
        # if there is no path yet, then there is no Dir for it, so create that too (which creates the entry, etc.)
        # Dir name is os.path.basename(pp), it is not in another Dir (None), and its relative path is "", inside the path_obj we just created
        dir=AddDir( job, os.path.basename(pp), None, "", path_obj )
        session.add(path_obj)
        session.add(dir)
    return path_obj

################################################################################################################################################################
#
# Key function that runs as part of (usually) an import job. The name of the directory (dirname) is checked to see
# if it already is in the database (inside of in_dir in in_path). If it is,
# just return the db entry. If not, then we create a new row in Dir, that has name of dirname, has a parent directory
# of in_dir (DB object), has the rel_path set to the relative fs path from the actual fs path to this entry (including the dirname)
# and the in_path set to the overarching path (one of an Import, Storage or Recycle_bin path in the DB)
#
# e.g. path on FS: /home/ddp/src/photoassistant/images_to_process/ ... ends in DB as path_prefix="static/Import/images_to_process"
# and we have a dir in /home/ddp/src/photoassistant/images_to_process/0000/subtest, then we call:
# AddDir( job, dirname='subtest', in_dir=Dir object for '0000', rel_path='0000/subtest', in_path=Path object for 'static/Import/images_to_process' )
#
################################################################################################################################################################
def AddDir(job, dirname, in_dir, rel_path, in_path ):
    dir=session.query(Dir).join(PathDirLink).join(Path).filter(Path.id==in_path.id).filter(Dir.rel_path==rel_path).first()
    if dir:
        e=session.query(Entry).get(dir.eid)
        e.exists_on_fs=True
        return dir
    dir=Dir( last_import_date=0, rel_path=rel_path, in_path=in_path )
    dtype=session.query(FileType).filter(FileType.name=='Directory').first()
    e=Entry( name=dirname, type=dtype, exists_on_fs=True )
    e.dir_details=dir
    # no in_dir occurs when we Add the actual Dir for the import_path (top of the tree)
    if in_dir:
        e.in_dir=in_dir
    if DEBUG==1:
        AddLogForJob(job, f"DEBUG: Process new dir: {dirname}, rel_path={rel_path}")
    session.add(e)
    return dir

def AddFile(job, fname, type_str, fsize, in_dir, year, month, day, woy ):
    e=session.query(Entry).join(EntryDirLink).join(Dir).filter(Entry.name==fname,Dir.eid==in_dir.eid).first()
    if e:
        e.exists_on_fs=True
        return e
    ftype = session.query(FileType).filter(FileType.name==type_str).first()
    e=Entry( name=fname, type=ftype, exists_on_fs=True )
    f=File( size_mb=fsize, last_hash_date=0, faces_created_on=0, year=year, month=month, day=day, woy=woy )
    e.file_details = f
    e.in_dir=in_dir
    AddLogForJob(job, "Found new file: {}".format(fname) )
    session.add(e)
    return e


# reset exists_on_fs to False for everything in this import path; if we find it on the FS in the walk below it goes back to True,
# and anything that is still False has been deleted
def ResetExistsOnFS(job, path):
    reset_dirs = session.query(Entry).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==path).all()
    for reset_dir in reset_dirs:
        reset_dir.exists_on_fs=False
        session.add(reset_dir)
        reset_files = session.query(Entry).join(EntryDirLink).filter(EntryDirLink.dir_eid==reset_dir.id).all()
        for reset_file in reset_files:
            reset_file.exists_on_fs=False
            session.add(reset_file)
    return

# Convenience function to remove a file from the database - and its associated links
# used when scanning and a file has been removed out from under PA, or
# when we remove duplicates
def RemoveFileFromDB(id):
    session.query(EntryDirLink).filter(EntryDirLink.entry_id==id).delete()
    session.query(File).filter(File.eid==id).delete()
    session.query(Entry).filter(Entry.id==id).delete()
    return


# Actually moves the physical file from its current real directory to a subdir of the recycle bin path
def RemoveFileFromFS( del_me ):
    try:
        settings = session.query(Settings).first()
        dst_dir=settings.recycle_bin_path + '/' + del_me.in_dir.in_path.path_prefix.replace('static/','') + '/' + del_me.in_dir.rel_path + '/'
        os.makedirs( dst_dir,mode=0o777, exist_ok=True )
        src=del_me.FullPathOnFS()
        dst=dst_dir + '/' + del_me.name
        os.replace( src, dst )
    except Exception as e:
        print( f"ERROR: Failed to remove file from filesystem - which={src}, err: {e}")
    return

# Function that restores a file that was deleted (moved into the Bin)
# it moves the file on the filesystem back to its original path and then changes the database path from the Bin path
# to the original import or storage path and appropriate dir
def RestoreFile(job,restore_me):
    try:
        # rel_path for a file in the Bin is like 'Import/images_to_process/1111', so just prepend static/
        dst_dir='static/' + restore_me.in_dir.rel_path + '/'
        os.makedirs( dst_dir,mode=0o777, exist_ok=True )
        src=restore_me.FullPathOnFS()
        dst=dst_dir + '/' + restore_me.name
        os.replace( src, dst )
    except Exception as e:
        print( f"ERROR: Failed to restore (mv) file on filesystem - which={src} to {dst}, err: {e}")

    # need these for AddDir calls below to work
    orig_file_details = session.query(DelFile).get(restore_me.id)
    orig_path = session.query(Path).filter(Path.path_prefix==orig_file_details.orig_path_prefix).first()
    parent_dir=session.query(Dir).join(PathDirLink).filter(PathDirLink.path_id==orig_path.id).first()

    # in case our new_rel_path is just in the top level of the path, make
    # new_dir = parent_dir so restore_me.in_dir = new_dir after the for loop works
    new_dir=parent_dir

    # e.g. restore_me's rel_path 'Import/images_to_process/1111', orig_path was 'static/Import/images_to_process', need new rel_path to be just the 1111 bit...
    new_rel_path='static/'+restore_me.in_dir.rel_path
    new_rel_path=new_rel_path.replace(orig_file_details.orig_path_prefix, '')
    if len(new_rel_path) > 0 and new_rel_path[-1] == '/':
        new_rel_path=new_rel_path[0:-1]
    if len(new_rel_path) > 0 and new_rel_path[0] == '/':
        new_rel_path=new_rel_path[1:]

    # go through the new relative path and AddDir any missing subdirs of this
    # path (think Import/Dir1/Dir2) which b/c we have orig_path in AddDir will
    # create static/Import, static/Import/Dir1, static/Import/Dir1/Dir2
    part_rel_path=""
    for dirname in new_rel_path.split("/"):
        part_rel_path += f"{dirname}"
        new_dir=AddDir( job, dirname, parent_dir, part_rel_path, orig_path )
        parent_dir=new_dir
        part_rel_path += "/"

    restore_me.in_dir = new_dir
    AddLogForJob(job, f"Restored file: {restore_me.name} to {os.path.dirname(restore_me.FullPathOnFS())}" )
    ### when restoring, an original dir tree might have been removed, so make it (if needed)
    os.makedirs( os.path.dirname(restore_me.FullPathOnFS()),mode=0o777, exist_ok=True )
    # remove the DelFile entry for this restored file
    session.query(DelFile).filter(DelFile.file_eid==restore_me.id).delete()
    session.commit()
    return

# Function that moves a file we are "deleting" to the recycle bin; it moves the
# file on the filesystem and then changes the database path from the import or
# storage path over to the Bin path
def MoveFileToRecycleBin(job,del_me):
    try:
        settings = session.query(Settings).first()
        dst_dir=settings.recycle_bin_path + '/' + del_me.in_dir.in_path.path_prefix.replace('static/','') + '/' + del_me.in_dir.rel_path + '/'
        os.makedirs( dst_dir,mode=0o777, exist_ok=True )
        src=del_me.FullPathOnFS()
        dst=dst_dir + '/' + del_me.name
        os.replace( src, dst )
    except Exception as e:
        print( f"ERROR: Failed to remove file from filesystem - which={src}, err: {e}")

    # need these for AddDir calls below to work
    bin_path=session.query(Path).join(PathType).filter(PathType.name=='Bin').first()
    parent_dir=session.query(Dir).join(PathDirLink).filter(PathDirLink.path_id==bin_path.id).first()

    # if we ever need to restore, lets remember this file's original path
    # (use a string in case the dir/path is ever deleted from FS (and then DB) and we need to recreate)
    del_file_details = DelFile( file_eid=del_me.id, orig_path_prefix=del_me.in_dir.in_path.path_prefix )
    session.add( del_file_details )

    # remove static from rel path, as it will move to static/Bin anyway...
    new_rel_path=del_me.in_dir.in_path.path_prefix.replace('static/','')
    # if there is a relative path on this dir, add it to the new_rel_path as there is only ever 1 Bin path
    if len(del_me.in_dir.rel_path):
        new_rel_path += '/' + del_me.in_dir.rel_path

    # okay, go through new relative path and AddDir any missing subdirs of this
    # path (think Import/Dir1/Dir2) which b/c we have bin_path in AddDir will
    # create Bin/Import, Bin/Import/Dir1, Bin/Import/Dir1/Dir2
    part_rel_path=""
    for dirname in new_rel_path.split("/"):
        part_rel_path += f"{dirname}"
        new_dir=AddDir( job, dirname, parent_dir, part_rel_path, bin_path )
        parent_dir=new_dir
        part_rel_path += "/"

    del_me.in_dir = new_dir
    AddLogForJob(job, f"Deleted file: {del_me.name} - (moved to {os.path.dirname(del_me.FullPathOnFS())})" )
    return

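# Illustrative sketch of the loop used in RestoreFile/MoveFileToRecycleBin above:
# for a new_rel_path of "Import/Dir1/Dir2" the successive AddDir calls receive
# rel_paths "Import", "Import/Dir1" and "Import/Dir1/Dir2".
def _example_rel_path_walk(new_rel_path="Import/Dir1/Dir2"):
    seen = []
    part_rel_path = ""
    for dirname in new_rel_path.split("/"):
        part_rel_path += f"{dirname}"
        seen.append(part_rel_path)       # this is what each AddDir call gets as rel_path
        part_rel_path += "/"
    return seen
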
# Function that moves a file into a new folder in the storage path - if needed it makes the folder on the FS,
# moves the file into the folder on the FS and then changes the database path to the relevant Storage path
def MoveFileToNewFolderInStorage(job,move_me, dst_storage_path, dst_rel_path):
    print( f"MoveFileToNewFolderInStorage: {move_me} to {dst_storage_path} in new? folder: {dst_rel_path}")
    try:
        dst_dir=dst_storage_path.path_prefix + '/' + dst_rel_path
        print( f"would make dir: {dst_dir}" )
        os.makedirs( dst_dir,mode=0o777, exist_ok=True )
        src=move_me.FullPathOnFS()
        dst=dst_dir + '/' + move_me.name
        os.replace( src, dst )
        print( f"would mv {src} {dst}" )
    except Exception as e:
        print( f"ERROR: Failed to move file to new location on filesystem - which={src}, location={dst_dir}, err: {e}")

    # need these for AddDir calls below to work
    parent_dir=session.query(Dir).join(PathDirLink).filter(PathDirLink.path_id==dst_storage_path.id).first()

    # remove static from rel path, as it will move to static/Bin anyway...
    # new_rel_path=del_me.in_dir.in_path.path_prefix.replace('static/','')
    # if there is a relative path on this dir, add it to the new_rel_path as there is only ever 1 Bin path
    # if len(del_me.in_dir.rel_path):
    #     new_rel_path += '/' + del_me.in_dir.rel_path

    # okay, go through the new relative path and AddDir any missing subdirs of this
    # path (think Dir1/Dir2) which b/c we have dst_storage_path in AddDir will
    # create Dir1, Dir1/Dir2 under the storage path
    part_rel_path=""
    for dirname in dst_rel_path.split("/"):
        part_rel_path += f"{dirname}"
        print( f"Should make a Dir in the DB for {dirname} with parent: {parent_dir}, prp={part_rel_path} in storage path" )
        new_dir=AddDir( job, dirname, parent_dir, part_rel_path, dst_storage_path )
        parent_dir=new_dir
        part_rel_path += "/"
    print( f"now should change {move_me} in_dir to {new_dir} created above in {dst_storage_path}" )
    move_me.in_dir = new_dir
    move_me.in_path = dst_storage_path
    print( f"DONE change of {move_me} in_dir to {new_dir} created above" )
    AddLogForJob(job, f"{move_me.name} - (moved to {os.path.dirname(move_me.FullPathOnFS())})" )
    return


# Convenience function to remove a dir from the database - and its associated links
def RemoveDirFromDB(id):
    session.query(EntryDirLink).filter(EntryDirLink.entry_id==id).delete()
    session.query(Dir).filter(Dir.eid==id).delete()
    session.query(Entry).filter(Entry.id==id).delete()
    return

# this routine is used when a scan finds files/dirs that have been removed
# underneath PA, so it just deletes them from the DB
def HandleAnyFSDeletions(job):
    dtype=session.query(FileType).filter(FileType.name=='Directory').first()
    rms = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id!=dtype.id).all()
    rm_cnt=0
    for rm in rms:
        RemoveFileFromDB(rm.id)
        AddLogForJob( job, f"INFO: Removing {rm.name} from system as it is no longer on the file system")
        rm_cnt+=1

    rmdirs = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id==dtype.id).order_by(Entry.id.desc()).all()
    for rmdir in rmdirs:
        RemoveDirFromDB(rmdir.id)
        AddLogForJob( job, f"INFO: Removing {rmdir.name} from system as it is no longer on the file system")
        rm_cnt+=1
    return rm_cnt

# try several ways to work out the date a file was created (try exif, then filename, lastly filesystem)
def GetDateFromFile(file, stat):
    # try exif
    try:
        with open(file, 'rb') as f:
            tags = exifread.process_file(f)
        date_str, _ = str(tags["EXIF DateTimeOriginal"]).split(" ")
        year, month, day = date_str.split(":")
        year=int(year)
        month=int(month)
        day=int(day)
        check = datetime( year, month, day )
    except Exception:
        # try parsing the filename
        try:
            m=re.search( r'(\d{4})(\d{2})(\d{2})', file)
            year=int(m[1])
            month=int(m[2])
            day=int(m[3])
            check2 = datetime( year, month, day )
        # give up and use the file system date
        except Exception:
            year, month, day, _, _, _, _, _, _ = datetime.fromtimestamp(stat.st_ctime).timetuple()
    c=date(year, month, day).isocalendar()
    woy=c[1]
    return year, month, day, woy

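# Illustrative sketch of the filename fallback above (the name and date here are
# made up): a basename like "IMG_20190704_120000.jpg" yields (2019, 7, 4) and
# the matching ISO week of year.
def _example_date_from_filename(fname="IMG_20190704_120000.jpg"):
    m = re.search(r'(\d{4})(\d{2})(\d{2})', fname)
    year, month, day = int(m[1]), int(m[2]), int(m[3])
    woy = date(year, month, day).isocalendar()[1]
    return year, month, day, woy
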
def AddJexToDependantJobs(job,name,value):
    for j in session.query(Job).filter(Job.wait_for==job.id).all():
        jex=JobExtra( name=name, value=value )
        j.extra.append(jex)
        AddJexToDependantJobs(j, name, value)
    return

def JobImportDir(job):
    JobProgressState( job, "In Progress" )
    settings = session.query(Settings).first()
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    path_type=[jex.value for jex in job.extra if jex.name == "path_type"][0]
    AddLogForJob(job, f"Checking {path_type} Directory: {path}" )
    if DEBUG==1:
        print( f"DEBUG: Checking Directory: {path}" )
    if not os.path.exists( path ):
        FinishJob( job, f"Finished Importing: {path} -- Path does not exist", "Failed" )
        return
    ptype = session.query(PathType).get(path_type)
    symlink=SymlinkName(ptype.name, path, path)

    # create/find the Path
    path_obj=AddPath( job, symlink, path_type )
    session.commit()
    # for the recycle bin path we dont want to import content, just create the path/dir rows (above) in the DB
    bin_path=session.query(Path).join(PathType).filter(PathType.name=='Bin').first()
    if bin_path != None and int(path_type) == bin_path.type.id:
        return

    # find all jobs waiting on me and their children, etc. and add a path_prefix jex set to symlink, so we can just reference it from here on in, rather than recreate that string
    AddJexToDependantJobs(job,"path_prefix",symlink)
    ResetExistsOnFS(job, symlink)

    # go through the data once to work out file_cnt so the progress bar works from the first import
    walk=os.walk(path, topdown=True)
    ftree=list(walk)
    overall_file_cnt=0
    for root, subdirs, files in ftree:
        overall_file_cnt+= len(subdirs) + len(files)
    path_obj.num_files=overall_file_cnt

    # find the dir we created with AddPath to use for entries at the top-level of the path
    dir=session.query(Dir).join(PathDirLink).join(Path).filter(Path.id==path_obj.id,Dir.rel_path=='').first()
    # session.add in case we have already imported this dir (as AddDir wont) & now we might have a different num of files to last time
    session.add(dir)

    # if we set / then commit this now, the web page will know how many files
    # to process as we then do the slow job of processing them
    job.num_files=overall_file_cnt
    AddLogForJob(job, f"Found {overall_file_cnt} file(s) to process")
    session.commit()

    # root == path of dir, files are in dir... subdirs are in dir
    for root, subdirs, files in ftree:
        # we already created root above to work out num_files for the whole os.walk
        if root != path:
            pp=SymlinkName( path_obj.type.name, path, root )+'/'+os.path.basename(root)
            rel_path=pp.replace(symlink+'/','')
            parent_dir=session.query(Dir).join(PathDirLink).join(Path).filter(Path.id==path_obj.id,Dir.rel_path==os.path.dirname(rel_path)).first()
            dir=AddDir(job, os.path.basename(root), parent_dir, rel_path, path_obj)
        for basename in files:
            # commit every 100 files to see progress being made but not hammer the database
            if job.current_file_num % 100 == 0:
                session.commit()
            fname=dir.PathOnFS()+'/'+basename

            stat = os.stat(fname)
            if stat.st_ctime > dir.last_import_date:
                if DEBUG==1:
                    print("DEBUG: {} - {} is newer than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
                if isImage(fname):
                    type_str = 'Image'
                elif isVideo(fname):
                    type_str = 'Video'
                else:
                    type_str = 'Unknown'
                fsize = round(stat.st_size/(1024*1024))

                year, month, day, woy = GetDateFromFile(fname, stat)
                e=AddFile( job, basename, type_str, fsize, dir, year, month, day, woy )
            else:
                if DEBUG==1:
                    print( f"DEBUG: {basename} - {stat.st_ctime} is OLDER than {dir.last_import_date}" )
                e=session.query(Entry).join(EntryDirLink).join(Dir).filter(Entry.name==basename,Dir.eid==dir.eid).first()
                e.exists_on_fs=True
            job.current_file=basename
            job.current_file_num+=1
        job.current_file_num += len(subdirs)
        dir.last_import_date = time.time()
        job.num_files=overall_file_cnt

    rm_cnt=HandleAnyFSDeletions(job)

    FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Removed {rm_cnt} file(s)")
    return

def RunFuncOnFilesInPath( job, path, file_func, count_dirs ):
    d = session.query(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==path).filter(Dir.rel_path=='').first()
    files = session.query(Entry).join(EntryDirLink).filter(EntryDirLink.dir_eid==d.eid).all()
    for e in files:
        ProcessFilesInDir(job, e, file_func, count_dirs)
    return


def JobProcessAI(job):
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    path_prefix=[jex.value for jex in job.extra if jex.name == "path_prefix"][0]
    path = SymlinkName(path_prefix, path, '/')
    p = session.query(Path).filter(Path.path_prefix==path).first()
    job.num_files=p.num_files

    RunFuncOnFilesInPath( job, path, ProcessAI, True )

    FinishJob(job, "Finished Processing AI")
    return

def WrapperForScanFileForPerson(job, entry):
    which_person=[jex.value for jex in job.extra if jex.name == "person"][0]

    if entry.type.name == 'Image':
        if DEBUG:
            AddLogForJob( job, f'INFO: processing File: {entry.name}' )
        for pid in job.ppl:
            ScanFileForPerson( job, entry, pid, force=False)
        # processed this file, add 1 to count
        job.current_file_num+=1
    return


def AddToJobImageCount(job, entry ):
    if entry.type.name == 'Image':
        job.num_files += 1
    return

def JobRunAIOn(job):
    AddLogForJob(job, f"INFO: Starting looking for faces in files job...")
    which_person=[jex.value for jex in job.extra if jex.name == "person"][0]
    if which_person == "all":
        ppl=session.query(Person).all()
    else:
        ppl=session.query(Person).filter(Person.tag==which_person).all()

    # start by working out how many images in this selection we will need to face match on
    job.num_files = 0
    for jex in job.extra:
        if 'eid-' in jex.name:
            entry=session.query(Entry).get(jex.value)
            if entry.type.name == 'Directory':
                # False in the last param says dont count dirs (we won't AI a dir entry itself)
                ProcessFilesInDir( job, entry, AddToJobImageCount, False )
            elif entry.type.name == 'Image':
                job.num_files += 1
    # update job, so the file count UI progress bar will work
    # remember that ProcessFilesInDir updates current_file_num, so zero it out so we can start again
    job.current_file_num = 0
    session.commit()

    ppl_lst=[]
    for person in ppl:
        ppl_lst.append(person.id)

    job.ppl = ppl_lst

    for jex in job.extra:
        if 'eid-' in jex.name:
            entry=session.query(Entry).get(jex.value)
            if entry.type.name == 'Directory':
                # False in the last param says dont count dirs (we won't AI a dir entry itself)
                ProcessFilesInDir( job, entry, WrapperForScanFileForPerson, False )
            elif entry.type.name == 'Image':
                which_file=session.query(Entry).join(File).filter(Entry.id==jex.value).first()
                if DEBUG:
                    AddLogForJob( job, f'INFO: processing File: {entry.name}' )
                for person in ppl:
                    ScanFileForPerson( job, which_file, person.id, force=False)
                # processed this file, add 1 to count
                job.current_file_num+=1
            else:
                AddLogForJob( job, f'Not processing Entry: {entry.name} - not an image' )
    FinishJob(job, "Finished Processing AI")
    return

def JobRotateImage(job):
    AddLogForJob(job, f"INFO: Starting rotation/flip of image file...")
    id=[jex.value for jex in job.extra if jex.name == "id"][0]
    amt=[jex.value for jex in job.extra if jex.name == "amt"][0]
    e=session.query(Entry).join(File).filter(Entry.id==id).first()
    im = Image.open( e.FullPathOnFS() )

    print(amt)

    if amt == "fliph":
        AddLogForJob(job, f"INFO: Flipping {e.FullPathOnFS()} horizontally" )
        out = im.transpose(Image.FLIP_LEFT_RIGHT)
    elif amt == "flipv":
        AddLogForJob(job, f"INFO: Flipping {e.FullPathOnFS()} vertically" )
        out = im.transpose(Image.FLIP_TOP_BOTTOM)
    else:
        AddLogForJob(job, f"INFO: Rotating {e.FullPathOnFS()} by {amt} degrees" )
        out = im.rotate(int(amt), expand=True)
    out.save( e.FullPathOnFS() )
    e.file_details.thumbnail = GenThumb( e.FullPathOnFS() )
    session.add(e)
    FinishJob(job, "Finished Processing image rotation/flip")
    return

def GenHashAndThumb(job, e):
    # commit every 100 files to see progress being made but not hammer the database
    if job.current_file_num % 100 == 0:
        session.commit()
    stat = os.stat( e.FullPathOnFS() )
    if stat.st_ctime < e.file_details.last_hash_date:
        if DEBUG==1:
            print(f"OPTIM: GenHashAndThumb {e.name} file is older than last hash, skip this")
        job.current_file_num+=1
        return

    e.file_details.hash = md5( job, e.FullPathOnFS() )
    if e.type.name == 'Image':
        e.file_details.thumbnail = GenImageThumbnail( job, e.FullPathOnFS() )
    elif e.type.name == 'Video':
        e.file_details.thumbnail = GenVideoThumbnail( job, e.FullPathOnFS() )
    elif e.type.name == 'Unknown':
        job.current_file_num+=1
    e.file_details.last_hash_date = time.time()
    return

def ProcessAI(job, e):
    if e.type.name != 'Image':
        job.current_file_num+=1
        return

    file = e.FullPathOnFS()
    stat = os.stat(file)
    # find if the file is newer than when we found faces before (fyi: first time faces_created_on == 0)
    if stat.st_ctime > e.file_details.faces_created_on:
        session.add(e)
        im_orig = Image.open(file)
        im = ImageOps.exif_transpose(im_orig)

        faces = generateUnknownEncodings(im)
        e.file_details.faces_created_on=time.time()
        if faces:
            flat_faces = numpy.array(faces)
            e.file_details.faces = flat_faces.tobytes()
        else:
            e.file_details.faces = None
            job.current_file_num+=1
            return
    else:
        if not e.file_details.faces:
            print("OPTIM: This image has no faces, skip it")
            job.current_file_num+=1
            return
        recover=numpy.frombuffer(e.file_details.faces,dtype=numpy.float64)
        real_recover=numpy.reshape(recover,(-1,128))
        l=[]
        for el in real_recover:
            l.append(numpy.array(el))
        faces = l
    people = session.query(Person).all()
    for unknown_encoding in faces:
        for person in people:
            lookForPersonInImage(job, person, unknown_encoding, e)
    ProcessFileForJob(job, f"Finished processing {e.name}", e.name )
    return

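# Illustrative sketch of the round trip used in ProcessAI above: each face
# encoding is a 128-element float vector; the list is flattened to bytes for the
# File.faces LargeBinary column and reshaped back into per-face vectors on read.
def _example_face_encoding_roundtrip(encodings):
    blob = numpy.array(encodings).tobytes()
    recovered = numpy.reshape(numpy.frombuffer(blob, dtype=numpy.float64), (-1, 128))
    return [numpy.array(row) for row in recovered]
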
def lookForPersonInImage(job, person, unknown_encoding, e):
    FinishJob( job, "THIS CODE HAS BEEN REMOVED, need to use new Face* tables, and rethink", "Failed" )
    return


def generateUnknownEncodings(im):
    unknown_image = numpy.array(im)
    face_locations = face_recognition.face_locations(unknown_image)
    if not face_locations:
        return None
    unknown_encodings = face_recognition.face_encodings(unknown_image, known_face_locations=face_locations)
    return unknown_encodings


def compareAI(known_encoding, unknown_encoding):
    results = face_recognition.compare_faces([known_encoding], unknown_encoding, tolerance=0.55)
    return results

def ProcessFilesInDir(job, e, file_func, count_dirs):
    if DEBUG==1:
        print( f"DEBUG: ProcessFilesInDir: {e.FullPathOnFS()}")
    if e.type.name != 'Directory':
        file_func(job, e)
    else:
        d=session.query(Dir).filter(Dir.eid==e.id).first()
        if count_dirs:
            job.current_file_num+=1
        files = session.query(Entry).join(EntryDirLink).filter(EntryDirLink.dir_eid==d.eid).all()
        for sub in files:
            ProcessFilesInDir(job, sub, file_func, count_dirs)
    return

def JobGetFileDetails(job):
    JobProgressState( job, "In Progress" )
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    path_prefix=[jex.value for jex in job.extra if jex.name == "path_prefix"][0]
    if DEBUG==1:
        print(f"DEBUG: JobGetFileDetails for path={path_prefix}" )
    p=session.query(Path).filter(Path.path_prefix==path_prefix).first()
    job.current_file_num = 0
    job.num_files = p.num_files
    session.commit()
    RunFuncOnFilesInPath( job, path_prefix, GenHashAndThumb, True )
    FinishJob(job, "File Details job finished")
    session.commit()
    return

def isVideo(file):
    try:
        fileInfo = MediaInfo.parse(file)
        for track in fileInfo.tracks:
            if track.track_type == "Video":
                return True
        return False
    except Exception:
        return False


# Returns an md5 hash of the fname's contents
def md5(job, fname):
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    hash = hash_md5.hexdigest()
    AddLogForJob( job, "Generated md5 hash: {} for file: {}".format( hash, fname ) )
    return hash


def isImage(file):
    try:
        Image.open(file)
        return True
    except Exception:
        return False

def GenThumb(file):
    try:
        im_orig = Image.open(file)
        im = ImageOps.exif_transpose(im_orig)
        bands = im.getbands()
        if 'A' in bands:
            im = im.convert('RGB')
        im.thumbnail((THUMBSIZE,THUMBSIZE))
        img_bytearray = io.BytesIO()
        im.save(img_bytearray, format='JPEG')
        img_bytearray = img_bytearray.getvalue()
        thumbnail = base64.b64encode(img_bytearray)
        thumbnail = str(thumbnail)[2:-1]
        return thumbnail
    except Exception as e:
        # no job handle here, so log to the console rather than the joblog
        print( f"WARNING: Failed to generate thumbnail (e.g. no/bad EXIF tag) for: {file} - error={e}")
        return None


def GenImageThumbnail(job, file):
    ProcessFileForJob( job, "Generate Thumbnail from Image file: {}".format( file ), file )
    return GenThumb(file)


def GenVideoThumbnail(job, file):
    ProcessFileForJob( job, "Generate Thumbnail from Video file: {}".format( file ), file )
    try:
        vcap = cv2.VideoCapture(file)
        res, frame = vcap.read()
        # if the mean pixel value is > 15, we have something worth making a sshot of (no black frame at start being the sshot)
        while frame.mean() < 15 and res:
            res, frame = vcap.read()
        w = vcap.get(cv2.CAP_PROP_FRAME_WIDTH)
        h = vcap.get(cv2.CAP_PROP_FRAME_HEIGHT)
        if w > h:
            factor = w / THUMBSIZE
        else:
            factor = h / THUMBSIZE
        new_h = int(h / factor)
        new_w = int(w / factor)
        frame = cv2.resize(frame, (new_w, new_h), 0, 0, cv2.INTER_LINEAR)

        res, thumb_buf = cv2.imencode('.jpeg', frame)
        bt = thumb_buf.tobytes()
        thumbnail = base64.b64encode(bt)
        thumbnail = str(thumbnail)[2:-1]
    except Exception as e:
        AddLogForJob( job, f"ERROR: Failed to Generate thumbnail for video file: {file} - error={e}" )
        return None
    return thumbnail

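# Illustrative sketch (assumption: the string is one produced by GenThumb or
# GenVideoThumbnail above): decoding a stored base64 thumbnail back into a PIL image.
def _example_decode_thumbnail(thumbnail_b64):
    raw = base64.b64decode(thumbnail_b64)
    return Image.open(io.BytesIO(raw))
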
# utility function to clear any other future Duplicate messages, called if we
# either create a "new" CheckDups (often del/restore related), OR because we
# are actually handling the dups now from a front-end click through to
# /removedups, but some other job has since created another dup message...
def ClearOtherDupMessagesAndJobs():
    msgs=session.query(PA_JobManager_FE_Message).join(Job).filter(Job.name=='checkdups')
    for msg in msgs:
        session.query(PA_JobManager_FE_Message).filter(PA_JobManager_FE_Message.id==msg.id).delete()
    cd_jobs=session.query(Job).filter(Job.name=='checkdups').filter(Job.pa_job_state=='New').all()
    for j in cd_jobs:
        FinishJob(j, "New CheckForDups job/removal supersedes this job, withdrawing it", "Withdrawn")
    session.commit()


def CheckForDups(job):
    AddLogForJob( job, f"Check for duplicates" )
    ClearOtherDupMessagesAndJobs()

    # count pairs of entries (outside the Bin) that share a hash and size but have different ids
    res = session.execute(
        "select count(e1.id) from entry e1, file f1, dir d1, entry_dir_link edl1, path_dir_link pdl1, path p1, "
        "entry e2, file f2, dir d2, entry_dir_link edl2, path_dir_link pdl2, path p2 "
        "where e1.id = f1.eid and e2.id = f2.eid and d1.eid = edl1.dir_eid and edl1.entry_id = e1.id "
        "and edl2.dir_eid = d2.eid and edl2.entry_id = e2.id "
        "and p1.type_id != (select id from path_type where name = 'Bin') and p1.id = pdl1.path_id and pdl1.dir_eid = d1.eid "
        "and p2.type_id != (select id from path_type where name = 'Bin') and p2.id = pdl2.path_id and pdl2.dir_eid = d2.eid "
        "and f1.hash = f2.hash and e1.id != e2.id and f1.size_mb = f2.size_mb" )
    for row in res:
        if row.count > 0:
            AddLogForJob(job, f"Found duplicates, Creating Status message in front-end for attention")
            MessageToFE( job.id, "danger", f'Found duplicate(s), click <a href="javascript:document.body.innerHTML+=\'<form id=_fm method=POST action=/fix_dups></form>\'; document.getElementById(\'_fm\').submit();">here</a> to finalise import by removing duplicates' )
        else:
            FinishJob(job, f"No duplicates found")
    FinishJob(job, f"Finished looking for duplicates")
    return

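# Illustrative sketch (simplified: unlike the raw SQL above it does not exclude
# the Bin path): an ORM flavour of the same duplicate test, counting pairs of
# files that share a hash and size but have different ids.
def _example_count_dup_pairs():
    from sqlalchemy.orm import aliased
    f1, f2 = aliased(File), aliased(File)
    return session.query(f1, f2).filter(
        f1.hash == f2.hash,
        f1.eid != f2.eid,
        f1.size_mb == f2.size_mb,
    ).count()
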
def RemoveDups(job):
    AddLogForJob(job, f"INFO: Starting Remove Duplicates job...")
    # as checkdups covers all dups, delete all future dups messages, and Withdraw future checkdups jobs
    ClearOtherDupMessagesAndJobs()

    dup_cnt=0
    for jex in job.extra:
        if 'kfid-' in jex.name:
            _, which = jex.name.split('-')
            hash=[jex.value for jex in job.extra if jex.name == f"kfhash-{which}"][0]
            AddLogForJob(job, f"deleting duplicate files with hash: {hash} but keeping file with DB id={jex.value}" )
            files=session.query(Entry).join(File).filter(File.hash==hash).all()
            keeping=jex.value
            found=None
            del_me_lst = []
            for f in files:
                if os.path.isfile( f.FullPathOnFS() ) == False:
                    AddLogForJob( job, f"ERROR: (per file del) file (DB id: {f.id} - {f.FullPathOnFS()}) does not exist? ignoring file")
                elif f.id == int(keeping):
                    found = f
                else:
                    del_me_lst.append(f)
            if found == None:
                AddLogForJob( job, f"ERROR: (per file dup) Cannot find file with hash={hash} to process - skipping it)" )
            else:
                AddLogForJob(job, f"Keep duplicate file: {found.FullPathOnFS()}" )
                for del_me in del_me_lst:
                    AddLogForJob(job, f"Remove duplicate (per file dup) file: {del_me.FullPathOnFS()}" )
                    MoveFileToRecycleBin(job,del_me)

        if 'kdid-' in jex.name:
            _, which = jex.name.split('-')
            hashes=[jex.value for jex in job.extra if jex.name == f"kdhash-{which}"][0]
            keeping=jex.value
            tmp=session.query(Dir).filter(Dir.eid==keeping).first()
            AddLogForJob(job, f"Keeping files in {tmp.PathOnFS()}" )
            for hash in hashes.split(","):
                files=session.query(Entry).join(File).filter(File.hash==hash).all()
                found=None
                del_me=None
                for f in files:
                    if os.path.isfile(f.FullPathOnFS()) == False:
                        AddLogForJob( job, f"ERROR: (per path del) file (DB id: {f.id} - {f.FullPathOnFS()}) does not exist? ignoring file")
                    if f.in_dir.eid == int(keeping):
                        found=f
                    else:
                        del_me=f

                if found == None:
                    AddLogForJob( job, f"ERROR: (per path dup - dir id={keeping}) Cannot find file with hash={hash} to process - skipping it)" )
                else:
                    AddLogForJob(job, f"Keep duplicate file: {found.FullPathOnFS()}" )
                    AddLogForJob(job, f"Remove duplicate (per path dup) file: {del_me.FullPathOnFS()}" )
                    MoveFileToRecycleBin(job,del_me)
                    dup_cnt += 1

    FinishJob(job, f"Finished removing {dup_cnt} duplicate files" )

    # Need to put another checkdups job in now to force / validate we have no dups
    now=datetime.now(pytz.utc)
    next_job=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
    session.add(next_job)
    session.commit()
    AddLogForJob(job, "adding <a href='/job/{}'>job id={} {}</a> to confirm there are no more duplicates".format( next_job.id, next_job.id, next_job.name ) )
    return

def JobMoveFiles(job):
    AddLogForJob(job, f"INFO: Starting Move Files job...")
    prefix=[jex.value for jex in job.extra if jex.name == "prefix"][0]
    suffix=[jex.value for jex in job.extra if jex.name == "suffix"][0]
    storage_rp=[jex.value for jex in job.extra if jex.name == "storage_rp"][0]
    dst_storage_path = session.query(Path).filter(Path.path_prefix=='static/Storage/'+ storage_rp).first()
    for jex in job.extra:
        if 'eid-' in jex.name:
            move_me=session.query(Entry).join(File).filter(Entry.id==jex.value).first()
            MoveFileToNewFolderInStorage(job, move_me, dst_storage_path, f"{prefix}{suffix}" )
    now=datetime.now(pytz.utc)
    next_job=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
    session.add(next_job)
    MessageToFE( job.id, "success", "Completed (move of selected files)" )
    FinishJob(job, f"Finished moving selected file(s)")
    return

def JobDeleteFiles(job):
    AddLogForJob(job, f"INFO: Starting Delete Files job...")
    for jex in job.extra:
        if 'eid-' in jex.name:
            del_me=session.query(Entry).join(File).filter(Entry.id==jex.value).first()
            MoveFileToRecycleBin(job,del_me)
    now=datetime.now(pytz.utc)
    next_job=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
    session.add(next_job)
    MessageToFE( job.id, "success", "Completed (delete of selected files)" )
    FinishJob(job, f"Finished deleting selected file(s)")
    return

def JobRestoreFiles(job):
    AddLogForJob(job, f"INFO: Starting Restore Files job...")
    for jex in job.extra:
        if 'eid-' in jex.name:
            restore_me=session.query(Entry).join(File).filter(Entry.id==jex.value).first()
            RestoreFile(job,restore_me)
    now=datetime.now(pytz.utc)
    next_job=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
    session.add(next_job)
    MessageToFE( job.id, "success", "Completed (restore of selected files)" )
    FinishJob(job, f"Finished restoring selected file(s)")
    return

def InitialValidationChecks():
    now=datetime.now(pytz.utc)
    job=Job(start_time=now, last_update=now, name="init", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
    session.add(job)
    settings = session.query(Settings).first()
    rbp_exists=0
    paths = settings.recycle_bin_path.split("#")
    AddLogForJob(job, f"INFO: Starting Initial Validation checks...")
    for path in paths:
        if os.path.exists(path):
            rbp_exists=1
            ptype = session.query(PathType).filter(PathType.name=='Bin').first().id
            symlink=CreateSymlink(job,ptype,path)
            path, dirs, files = next(os.walk(path))
            if len(dirs) + len(files) > 0:
                AddLogForJob(job, "INFO: the bin path contains content, cannot process it to know where the original deletes were from - skipping content!" )
                AddLogForJob(job, "TODO: could be smart about what is known in the DB vs on the FS, and change below to an ERROR if it is one")
                AddLogForJob(job, "WARNING: IF the files in the bin are in the DB (succeeded from GUI deletes) then this is okay, otherwise you should delete the contents from the recycle bin and restart the job manager)" )
                break
    if not rbp_exists:
        AddLogForJob(job, "ERROR: The bin path in settings does not exist - Please fix now")
    else:
        bin_path=session.query(Path).join(PathType).filter(PathType.name=='Bin').first()
        if not bin_path:
            ProcessRecycleBinDir(job)
    sp_exists=0
    paths = settings.storage_path.split("#")
    for path in paths:
        if os.path.exists(path):
            sp_exists=1
            ptype = session.query(PathType).filter(PathType.name=='Storage').first().id
            symlink=CreateSymlink(job,ptype,path)
    if not sp_exists:
        AddLogForJob(job, "ERROR: None of the storage paths in the settings exist - Please fix now")
    ip_exists=0
    paths = settings.import_path.split("#")
    for path in paths:
        if os.path.exists(path):
            ip_exists=1
            ptype = session.query(PathType).filter(PathType.name=='Import').first().id
            symlink=CreateSymlink(job,ptype,path)
    if not ip_exists:
        AddLogForJob(job, "ERROR: None of the import paths in the settings exist - Please fix now")
    if not rbp_exists or not sp_exists or not ip_exists:
        FinishJob(job,"ERROR: Job manager EXITing until above errors are fixed by paths being created or settings being updated to valid paths", "Failed" )
        sys.exit(-1)

    FinishJob(job,"Finished Initial Validation Checks")
    return

def AddFaceToFile( face_data, file_eid ):
    face = Face( face=face_data.tobytes() )
    session.add(face)
    session.commit()
    ffl = FaceFileLink( face_id=face.id, file_eid=file_eid )
    session.add(ffl)
    session.commit()
    return face


def DelFacesForFile( eid ):
    session.execute( f"delete from face where id in (select face_id from face_file_link where file_eid = {eid})" )
    session.commit()
    return


def MatchRefimgToFace( refimg_id, face_id ):
    rfl = FaceRefimgLink( refimg_id = refimg_id, face_id = face_id )
    session.add(rfl)
    session.commit()
    return


def UnmatchedFacesForFile( eid ):
    rows = session.execute( f"select f.* from face f left join face_refimg_link frl on f.id = frl.face_id join face_file_link ffl on f.id = ffl.face_id where ffl.file_eid = {eid} and frl.refimg_id is null" )
    return rows

def ScanFileForPerson( job, e, person_id, force=False ):
    file_h = session.query(File).get( e.id )
    # if we are forcing this, delete any old faces (this will also delete linked tables), and reset faces_created_on
    if force:
        AddLogForJob( job, f'INFO: force is true, so deleting old face information for {e.name}' )
        DelFacesForFile( e.id )
        file_h.faces_created_on = 0

    # optimise: dont rescan if we already have faces (we are just going to try
    # to match (maybe?) a refimg
    if file_h.faces_created_on == 0:
        if DEBUG:
            AddLogForJob( job, f"DEBUG: {e.name} is missing unknown faces, generating them" )
        im = face_recognition.load_image_file(e.FullPathOnFS())
        face_locations = face_recognition.face_locations(im)
        unknown_encodings = face_recognition.face_encodings(im, known_face_locations=face_locations)
        for face in unknown_encodings:
            AddFaceToFile( face, e.id )
        file_h.faces_created_on = time.time()
        session.commit()

    ## now look for the person
    refimgs = session.query(Refimg).join(PersonRefimgLink).filter(PersonRefimgLink.person_id==person_id).all()
    uf = UnmatchedFacesForFile( e.id )
    if DEBUG and not uf:
        AddLogForJob( job, f"DEBUG: {e.name} all faces already matched - finished" )

    for face in uf:
        for r in refimgs:
            unknown_face_data = numpy.frombuffer(face.face, dtype=numpy.float64)
            refimg_face_data = numpy.frombuffer(r.face, dtype=numpy.float64)
            match = compareAI(refimg_face_data, unknown_face_data)
            if match[0]:
                AddLogForJob(job, f'WE MATCHED: {r.fname} with file: {e.name} ')
                MatchRefimgToFace( r.id, face.id )
                # no need to keep looking for this face, we found it, go to the next unknown face
                break
    return

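# Illustrative sketch (assumption: the web front-end only needs to connect, no
# payload is read by the accept loop below): poking the job manager so it wakes
# up and rescans the job table.
def _example_poke_job_manager():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as c:
        c.connect((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
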
if __name__ == "__main__":
    print("INFO: PA job manager starting - listening on {}:{}".format( PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT) )

    InitialValidationChecks()

    HandleJobs()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
        s.listen()
        while True:
            conn, addr = s.accept()
            HandleJobs()