# photoassistant/pa_job_manager.py
###
#
# This file implements the 'external' job manager: it periodically scans for
# new jobs (or is pushed an event announcing them), picks them up, and
# processes them.
#
# It then stores the progress/status, etc. in the job and joblog tables as
# needed, via wrapper functions.
#
# The whole pa_job_manager is multi-threaded, and uses the database tables
# for state management and for communication back to the PA web site.
#
###
### SQLALCHEMY IMPORTS ###
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime, LargeBinary, Boolean
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
### SQLALCHEMY IMPORTS ###
### LOCAL FILE IMPORTS ###
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT
### LOCAL FILE IMPORTS ###
### STDLIB / THIRD-PARTY IMPORTS ###
from datetime import datetime, timedelta
import pytz
import time
import os
import glob
from PIL import Image, ImageOps
from pymediainfo import MediaInfo
import hashlib
#import exifread
import base64
import numpy
import cv2
import socket
import threading
import io
import face_recognition
DEBUG=1
# an Engine, which the Session will use for connection resources
some_engine = create_engine(DB_URL)
# create a configured, thread-local "Session" class, then create a Session
session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)
session = Session()
Base = declarative_base()
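# NOTE: scoped_session hands each thread its own Session (a registry keyed by
# thread), which matches the multi-threaded design described in the header;
# the module-level `session` above is the Session for the importing thread.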
################################################################################
# Classes describing filesystem entries in the database; via sqlalchemy they
# are connected to the DB as well. Each has to match its DB table one-for-one.
################################################################################
class EntryDirLink(Base):
    __tablename__ = "entry_dir_link"
    entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True )
    def __repr__(self):
        return f"<entry_id: {self.entry_id}, dir_eid: {self.dir_eid}>"

class Dir(Base):
    __tablename__ = "dir"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    path_prefix = Column(String, unique=False, nullable=False )
    num_files = Column(Integer)
    last_import_date = Column(Float)
    files = relationship("Entry", secondary="entry_dir_link")
    def __repr__(self):
        return f"<eid: {self.eid}, path_prefix: {self.path_prefix}, num_files: {self.num_files}, last_import_date: {self.last_import_date}, files: {self.files}>"

class Entry(Base):
    __tablename__ = "entry"
    id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
    name = Column(String, unique=True, nullable=False )
    type_id = Column(Integer, ForeignKey("file_type.id"))
    exists_on_fs = Column(Boolean)
    type = relationship("FileType")
    dir_details = relationship( "Dir" )
    file_details = relationship( "File" )
    in_dir = relationship( "Dir", secondary="entry_dir_link" )
    def __repr__(self):
        return f"<id: {self.id}, name: {self.name}, type={self.type}, exists_on_fs={self.exists_on_fs}, dir_details={self.dir_details}, file_details={self.file_details}, in_dir={self.in_dir}>"
class FileRefimgLink(Base):
    __tablename__ = "file_refimg_link"
    file_id = Column(Integer, ForeignKey('file.eid'), unique=True, nullable=False, primary_key=True)
    refimg_id = Column(Integer, ForeignKey('refimg.id'), unique=True, nullable=False, primary_key=True)
    when_processed = Column(Float)
    matched = Column(Boolean)
    def __repr__(self):
        return f"<file_id: {self.file_id}, refimg_id: {self.refimg_id}, when_processed={self.when_processed}, matched={self.matched}>"
class File(Base):
    __tablename__ = "file"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    size_mb = Column(Integer, unique=False, nullable=False)
    hash = Column(Integer, unique=True, nullable=True)
    thumbnail = Column(String, unique=False, nullable=True)
    last_hash_date = Column(Float)
    faces = Column( LargeBinary )
    faces_created_on = Column(Float)
    def __repr__(self):
        return f"<eid: {self.eid}, size_mb={self.size_mb}, hash={self.hash}, last_hash_date: {self.last_hash_date}>"

class FileType(Base):
    __tablename__ = "file_type"
    id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True )
    name = Column(String, unique=True, nullable=False )
    def __repr__(self):
        return f"<id: {self.id}, name={self.name}>"

class Settings(Base):
    __tablename__ = "settings"
    id = Column(Integer, Sequence('settings_id_seq'), primary_key=True )
    import_path = Column(String)
    def __repr__(self):
        return f"<id: {self.id}, import_path: {self.import_path}>"

class PersonRefimgLink(Base):
    __tablename__ = "person_refimg_link"
    person_id = Column(Integer, ForeignKey('person.id'), unique=True, nullable=False, primary_key=True)
    refimg_id = Column(Integer, ForeignKey('refimg.id'), unique=True, nullable=False, primary_key=True)
    def __repr__(self):
        return f"<person_id: {self.person_id}, refimg_id: {self.refimg_id}>"

class Person(Base):
    __tablename__ = "person"
    id = Column(Integer, Sequence('person_id_seq'), primary_key=True )
    tag = Column(String(48), unique=False, nullable=False)
    surname = Column(String(48), unique=False, nullable=False)
    firstname = Column(String(48), unique=False, nullable=False)
    refimg = relationship('Refimg', secondary=PersonRefimgLink.__table__)
    def __repr__(self):
        return f"<tag: {self.tag}, firstname: {self.firstname}, surname: {self.surname}, refimg: {self.refimg}>"

class Refimg(Base):
    __tablename__ = "refimg"
    id = Column(Integer, Sequence('refimg_id_seq'), primary_key=True )
    fname = Column(String(256), unique=True, nullable=False)
    encodings = Column(LargeBinary)
    created_on = Column(Float)
    def __repr__(self):
        return f"<id: {self.id}, fname: {self.fname}, created_on: {self.created_on}, encodings: {self.encodings}>"
################################################################################
# Classes for the job manager:
# Job (plus Joblog and JobExtra) for each Job, and
# PA_JobManager_FE_Message (to pass messages to the front-end web)
################################################################################
class Joblog(Base):
    __tablename__ = "joblog"
    id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True )
    job_id = Column(Integer, ForeignKey('job.id') )
    log_date = Column(DateTime(timezone=True))
    log = Column(String)
    def __repr__(self):
        return "<id: {}, job_id: {}, log_date: {}, log: {}>".format(self.id, self.job_id, self.log_date, self.log )

class JobExtra(Base):
    __tablename__ = "jobextra"
    id = Column(Integer, Sequence('jobextra_id_seq'), primary_key=True )
    job_id = Column(Integer, ForeignKey('job.id') )
    name = Column(String)
    value = Column(String)
    def __repr__(self):
        return "<id: {}, job_id: {}, name: {}, value: {}>".format(self.id, self.job_id, self.name, self.value )
class Job(Base):
    __tablename__ = "job"
    id = Column(Integer, Sequence('job_id_seq'), primary_key=True )
    start_time = Column(DateTime(timezone=True))
    last_update = Column(DateTime(timezone=True))
    name = Column(String)
    state = Column(String)
    num_files = Column(Integer)
    current_file_num = Column(Integer)
    current_file = Column(String)
    wait_for = Column(Integer)
    pa_job_state = Column(String)
    logs = relationship( "Joblog" )
    extra = relationship( "JobExtra" )
    def __repr__(self):
        return "<id: {}, start_time: {}, last_update: {}, name: {}, state: {}, num_files: {}, current_file_num: {}, current_file: {}, pa_job_state: {}, wait_for: {}, extra: {}, logs: {}>".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs)
class PA_JobManager_FE_Message(Base):
    __tablename__ = "pa_job_manager_fe_message"
    id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True )
    job_id = Column(Integer, ForeignKey('job.id'), primary_key=True )
    alert = Column(String)
    message = Column(String)
    def __repr__(self):
        return "<id: {}, job_id: {}, alert: {}, message: {}>".format(self.id, self.job_id, self.alert, self.message)
##############################################################################
# Util (non-class) functions
##############################################################################
def MessageToFE( job_id, alert, message ):
    msg = PA_JobManager_FE_Message( job_id=job_id, alert=alert, message=message)
    session.add(msg)
    session.commit()
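# ProcessImportDirs creates a three-job chain per import path: "importdir",
# then "getfiledetails" (wait_for=importdir), then "processai"
# (wait_for=getfiledetails). HandleJobs() only starts a job once the job it
# waits for has Completed, so the chain runs in order.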
def ProcessImportDirs(parent_job=None):
    settings = session.query(Settings).first()
    if settings == None:
        raise Exception("Cannot create file data with no settings / import path is missing")
    paths = settings.import_path.split("#")
    now=datetime.now(pytz.utc)
    for path in paths:
        # make new Jobs; HandleJobs will make them run later
        jex=JobExtra( name="path", value=path )
        job=Job(start_time=now, last_update=now, name="importdir", state="New", wait_for=None, pa_job_state="New", current_file_num=0, num_files=0 )
        job.extra.append(jex)
        session.add(job)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a>".format( job.id, job.id, job.name ) )
        # force commit to make job.id valid for use as wait_for below
        session.commit()
        jex2=JobExtra( name="path", value=path )
        job2=Job(start_time=now, last_update=now, name="getfiledetails", state="New", wait_for=job.id, pa_job_state="New", current_file_num=0 )
        job2.extra.append(jex2)
        session.add(job2)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job2.id, job2.id, job2.name, job2.wait_for ) )
        jex3=JobExtra( name="path", value=path )
        job3=Job(start_time=now, last_update=now, name="processai", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 )
        job3.extra.append(jex3)
        session.add(job3)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job3.id, job3.id, job3.name, job3.wait_for ) )
    HandleJobs()
    return
def AddLogForJob(job, message, current_file=''):
    now=datetime.now(pytz.utc)
    log=Joblog( job_id=job.id, log=message, log_date=now )
    job.last_update=now
    if current_file != '':
        job.current_file=os.path.basename(current_file)
    # this may not be set on an import job
    if job.num_files:
        job.current_file_num=job.current_file_num+1
    session.add(log)
    session.commit()
    return
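# RunJob dispatches a job to its handler by job.name; when the handler
# returns, HandleJobs() is called again to pick up any jobs that were
# waiting on this one.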
def RunJob(job):
    # session = Session()
    job.start_time=datetime.now(pytz.utc)
    if job.name =="scannow":
        JobScanNow(job)
    elif job.name =="forcescan":
        JobForceScan(job)
    elif job.name =="importdir":
        JobImportDir(job)
    elif job.name =="getfiledetails":
        JobGetFileDetails(job)
    elif job.name == "processai":
        JobProcessAI(job)
    else:
        print("ERROR: Requested to process unknown job type: {}".format(job.name))
    # okay, we finished a job, so check for any jobs that are dependent on this one and run them...
    # session.close()
    if job.pa_job_state != "Completed":
        FinishJob(job, "PA Job Manager - This is a catch-all to close off a Job; it should never be seen, and implies a job did not complete formally", "Failed" )
    HandleJobs()
    return
def CancelJob(job,id):
    for j in session.query(Job).filter(Job.wait_for==id).all():
        if DEBUG==1:
            print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(j.id, job.id) )
        FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" )
        CancelJob(j, j.id)
    return
def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
    job.state=state
    job.pa_job_state=pa_job_state
    job.last_update=datetime.now(pytz.utc)
    AddLogForJob(job, last_log)
    if job.state=="Failed":
        CancelJob(job,job.id)
    return
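# HandleJobs is the scheduler: it runs every job whose pa_job_state is "New",
# skipping any job whose wait_for dependency has not Completed yet. With
# DEBUG=1 jobs run inline, so errors stacktrace straight to the console.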
def HandleJobs():
    print("INFO: PA job manager is scanning for new jobs to process")
    for job in session.query(Job).all():
        if job.pa_job_state == 'New':
            if job.wait_for != None:
                j2 = session.query(Job).get(job.wait_for)
                if not j2:
                    print ("WARNING: job.wait_for ({}) does not exist in the job list below?".format( job.wait_for ))
                    for j in session.query(Job).all():
                        print ("j={}".format(j.id))
                    continue
                if j2.pa_job_state != 'Completed':
                    continue
            # use this to remove threads for easier debugging, and errors will stacktrace to the console
            if DEBUG==1:
                print("*************************************")
                print("RUNNING job: id={} name={} wait_for={}".format(job.id, job.name, job.wait_for ))
                RunJob(job)
            else:
                try:
                    RunJob(job)
                    # threading.Thread(target=RunJob, args=(job,)).start()
                except Exception as e:
                    try:
                        MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
                    except Exception as e2:
                        print("Failed to let front-end know, but back-end failed to run job (id: {}, name: {} -- orig exception was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
    print("INFO: PA job manager is waiting for jobs")
    return
def JobProgressState( job, state ):
    job.pa_job_state = "In Progress"
    job.state=state
    session.commit()
    return
def JobScanNow(job):
    JobProgressState( job, "In Progress" )
    ProcessImportDirs(job)
    FinishJob( job, "Completed (scan for new files)" )
    MessageToFE( job.id, "success", "Completed (scan for new files)" )
    session.commit()
    return
def JobForceScan(job):
    JobProgressState( job, "In Progress" )
    session.query(FileRefimgLink).delete()
    session.query(EntryDirLink).delete()
    session.query(Dir).delete()
    session.query(File).delete()
    session.query(Entry).delete()
    session.commit()
    ProcessImportDirs(job)
    FinishJob(job, "Completed (forced remove and recreation of all file data)")
    MessageToFE( job.id, "success", "Completed (forced remove and recreation of all file data)" )
    session.commit()
    return
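# SymlinkName maps an absolute file path back to its path under static/.
# A worked example (hypothetical paths, assuming import path '/data/photos/'):
#   SymlinkName('/data/photos/', '/data/photos/2020/img.jpg') -> 'static/photos/2020'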
def SymlinkName(path, file):
    sig_bit=file.replace(path, "")
    last_dir=os.path.basename(path[0:-1])
    if sig_bit[-1] == '/':
        last_bit = os.path.dirname(sig_bit)[0:-1]
    else:
        last_bit = os.path.dirname(sig_bit)
    symlink = 'static'+'/'+last_dir+'/'+last_bit
    if symlink[-1] == '/':
        symlink=symlink[0:-1]
    return symlink
# To serve the images as static content, we create a symlink inside the
# static subdir pointing at each import_path that exists.
def CreateSymlink(job,path):
    symlink='static/{}'.format(os.path.basename(path[0:-1]))
    if not os.path.exists(symlink):
        os.symlink(path, symlink)
    return symlink
def AddDir(job, dirname, path_prefix, in_dir):
    dir=session.query(Dir).filter(Dir.path_prefix==path_prefix).first()
    if dir:
        e=session.query(Entry).get(dir.eid)
        e.exists_on_fs=True
        return dir
    dir=Dir( path_prefix=path_prefix, num_files=0, last_import_date=0 )
    dtype=session.query(FileType).filter(FileType.name=='Directory').first()
    e=Entry( name=dirname, type=dtype, exists_on_fs=True )
    e.dir_details.append(dir)
    # no in_dir occurs when we add the actual Dir for the import_path (top of the tree)
    if in_dir:
        e.in_dir.append(in_dir)
    if DEBUG==1:
        print(f"AddDir: created d={dirname}, pp={path_prefix}")
        AddLogForJob(job, "DEBUG: AddDir: {} in (dir_id={})".format(dirname, in_dir) )
    session.add(e)
    return dir
def AddFile(job, fname, type_str, fsize, in_dir ):
    # see if this exists already
    e=session.query(Entry).filter(Entry.name==fname).first()
    if e:
        print(f"resetting existing file: {e.name} back to exists-on-fs")
        e.exists_on_fs=True
        return e
    ftype = session.query(FileType).filter(FileType.name==type_str).first()
    e=Entry( name=fname, type=ftype, exists_on_fs=True )
    f=File( size_mb=fsize, last_hash_date=0, faces_created_on=0 )
    e.file_details.append(f)
    e.in_dir.append(in_dir)
    AddLogForJob(job, "Found new file: {}".format(fname) )
    session.add(e)
    return e
# Reset exists_on_fs to False for everything in this import path. Anything we
# find on the FS in the walk below goes back to True; anything still False
# afterwards has been deleted from the file system.
def ResetExistsOnFS(job, path):
    reset_dirs = session.query(Entry).join(EntryDirLink).join(Dir).filter(Dir.path_prefix.ilike(path+'%')).all()
    for reset_dir in reset_dirs:
        reset_dir.exists_on_fs=False
        session.add(reset_dir)
        reset_files = session.query(Entry).join(EntryDirLink).filter(EntryDirLink.dir_eid==reset_dir.id).all()
        for reset_file in reset_files:
            reset_file.exists_on_fs=False
            session.add(reset_file)
    return
def HandleAnyFSDeletions(job):
    dtype=session.query(FileType).filter(FileType.name=='Directory').first()
    rms = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id!=dtype.id).all()
    rm_cnt=0
    for rm in rms:
        session.query(EntryDirLink).filter(EntryDirLink.entry_id==rm.id).delete()
        session.query(File).filter(File.eid==rm.id).delete()
        session.query(Entry).filter(Entry.id==rm.id).delete()
        AddLogForJob( job, f"INFO: Removing {rm.name} from system as it is no longer on the file system")
        rm_cnt+=1
    rmdirs = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id==dtype.id).order_by(Entry.id.desc()).all()
    for rmdir in rmdirs:
        print(f"We have a directory ({rmdir.name}) to delete from DB as it no longer exists on fs")
        session.query(EntryDirLink).filter(EntryDirLink.entry_id==rmdir.id).delete()
        session.query(Dir).filter(Dir.eid==rmdir.id).delete()
        session.query(Entry).filter(Entry.id==rmdir.id).delete()
        AddLogForJob( job, f"INFO: Removing {rmdir.name} from system as it is no longer on the file system")
        rm_cnt+=1
    return rm_cnt
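# JobImportDir walks the import path top-down: every directory becomes a Dir
# entry keyed by its symlinked path_prefix, and only files newer than the
# owning directory's last_import_date are (re)imported.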
def JobImportDir(job):
    JobProgressState( job, "In Progress" )
    settings = session.query(Settings).first()
    if settings == None:
        raise Exception("Cannot create file data with no settings / import path is missing")
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    AddLogForJob(job, "Checking Import Directory: {}".format( path ) )
    if DEBUG==1:
        print("DEBUG: Checking Import Directory: {}".format( path ) )
    if not os.path.exists( path ):
        FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" )
        return
    symlink=CreateSymlink(job,path)
    ResetExistsOnFS(job, symlink)
    overall_file_cnt=0
    walk=os.walk(path, topdown=True)
    # root == path of dir; files and subdirs are in dir
    parent_dir=None
    for root, subdirs, files in walk:
        overall_file_cnt+= len(subdirs) + len(files)
        if root == path:
            pp = symlink
        else:
            pp=SymlinkName( path, root )+'/'+os.path.basename(root)
        if root[-1]=="/":
            root=root[0:-1]
        dir=AddDir(job, os.path.basename(root), pp, parent_dir)
        parent_dir=dir
        for basename in files:
            fname=dir.path_prefix+'/'+basename
            stat = os.stat(fname)
            if stat.st_ctime > dir.last_import_date:
                if DEBUG==1:
                    AddLogForJob(job, "DEBUG: {} - is new/updated".format( basename ), basename )
                    print("DEBUG: {} - {} is newer than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
                if isImage(fname):
                    type_str = 'Image'
                elif isVideo(fname):
                    type_str = 'Video'
                else:
                    type_str = 'Unknown'
                fsize = round(stat.st_size/(1024*1024))
                e=AddFile( job, basename, type_str, fsize, dir )
            else:
                e=session.query(Entry).filter(Entry.name==basename).first()
                e.exists_on_fs=True
                if DEBUG==1:
                    AddLogForJob(job, "DEBUG: {} - is unchanged".format( basename ), basename )
                    print("DEBUG: {} - {} is OLDER than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
        dir.num_files=len(files)+len(subdirs)
        dir.last_import_date = time.time()
    job.num_files=overall_file_cnt
    job.current_file_num=overall_file_cnt
    rm_cnt=HandleAnyFSDeletions(job)
    FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Removed {rm_cnt} file(s)")
    import_dir=session.query(Dir).filter(Dir.path_prefix==symlink).first()
    import_dir.num_files=overall_file_cnt
    session.commit()
    return
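# RunFuncOnFilesInPath applies file_func to every file under path, recursing
# into subdirectories via ProcessFilesInDir.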
def RunFuncOnFilesInPath( job, path, file_func ):
    d=session.query(Dir).filter(Dir.path_prefix==path).first()
    for e in d.files:
        ProcessFilesInDir(job, e, file_func)
def JobProcessAI(job):
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    path = SymlinkName(path, '/')
    d=session.query(Dir).filter(Dir.path_prefix==path).first()
    job.num_files=d.num_files
    people = session.query(Person).all()
    for person in people:
        generateKnownEncodings(person)
    RunFuncOnFilesInPath( job, path, ProcessAI )
    FinishJob(job, "Finished Processing AI")
    return
def GenHashAndThumb(job, e):
    stat = os.stat( e.in_dir[0].path_prefix + '/' + e.name )
    if stat.st_ctime < e.file_details[0].last_hash_date:
        print(f"OPTIM: GenHashAndThumb {e.name} file is older than last hash, skip this")
        job.current_file_num+=1
        return
    e.file_details[0].hash = md5( job, e.in_dir[0].path_prefix+'/'+ e.name )
    if e.type.name == 'Image':
        e.file_details[0].thumbnail = GenImageThumbnail( job, e.in_dir[0].path_prefix+'/'+ e.name )
    elif e.type.name == 'Video':
        e.file_details[0].thumbnail = GenVideoThumbnail( job, e.in_dir[0].path_prefix+'/'+ e.name )
    elif e.type.name == 'Unknown':
        job.current_file_num+=1
    e.file_details[0].last_hash_date = time.time()
    return
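# ProcessAI stores face encodings on the File row as raw bytes: each face is
# a 128-float64 vector, so the flat buffer round-trips with
# numpy.frombuffer(..., dtype=numpy.float64) reshaped to (-1, 128).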
def ProcessAI(job, e):
    if e.type.name != 'Image':
        job.current_file_num+=1
        print("DDP: ProcessAI: adding 1 to current_file_num as we have a non-image file")
        return
    file = e.in_dir[0].path_prefix + '/' + e.name
    stat = os.stat(file)
    # find if file is newer than when we found faces before (fyi: first time faces_created_on == 0)
    if stat.st_ctime > e.file_details[0].faces_created_on:
        session.add(e)
        im_orig = Image.open(file)
        im = ImageOps.exif_transpose(im_orig)
        faces = generateUnknownEncodings(im)
        e.file_details[0].faces_created_on=time.time()
        if faces:
            flat_faces = numpy.array(faces)
            e.file_details[0].faces = flat_faces.tobytes()
        else:
            e.file_details[0].faces = None
            job.current_file_num+=1
            return
    else:
        if not e.file_details[0].faces:
            print("OPTIM: This image has no faces, skip it")
            job.current_file_num+=1
            return
        recover=numpy.frombuffer(e.file_details[0].faces,dtype=numpy.float64)
        real_recover=numpy.reshape(recover,(-1,128))
        l=[]
        for el in real_recover:
            l.append(numpy.array(el))
        faces = l
    people = session.query(Person).all()
    for unknown_encoding in faces:
        for person in people:
            lookForPersonInImage(job, person, unknown_encoding, e)
    AddLogForJob(job, f"Finished processing {e.name}", e.name )
    return
def lookForPersonInImage(job, person, unknown_encoding, e):
    for refimg in person.refimg:
        # let's see if we have tried this check before
        frl=session.query(FileRefimgLink).filter(FileRefimgLink.file_id==e.id, FileRefimgLink.refimg_id==refimg.id).first()
        if not frl:
            frl = FileRefimgLink(refimg_id=refimg.id, file_id=e.file_details[0].eid)
        else:
            stat=os.stat(e.in_dir[0].path_prefix+'/'+ e.name)
            # if neither the file nor the refimg is newer, we don't need to check again
            if frl.matched and stat.st_ctime < frl.when_processed and refimg.created_on < frl.when_processed:
                print(f"OPTIM: lookForPersonInImage: file {e.name} has a previous match for: {refimg.fname}, and the file & refimg haven't changed")
                return
        session.add(frl)
        frl.matched=False
        frl.when_processed=time.time()
        deserialized_bytes = numpy.frombuffer(refimg.encodings, dtype=numpy.float64)
        results = compareAI(deserialized_bytes, unknown_encoding)
        if results[0]:
            print(f'Found a match between: {person.tag} and {e.name}')
            AddLogForJob(job, f'Found a match between: {person.tag} and {e.name}')
            frl.matched=True
    return
def generateUnknownEncodings(im):
    unknown_image = numpy.array(im)
    face_locations = face_recognition.face_locations(unknown_image)
    if not face_locations:
        return None
    unknown_encodings = face_recognition.face_encodings(unknown_image, known_face_locations=face_locations)
    return unknown_encodings
def generateKnownEncodings(person):
    for refimg in person.refimg:
        file = 'reference_images/'+refimg.fname
        stat = os.stat(file)
        if refimg.created_on and stat.st_ctime < refimg.created_on:
            print("OPTIM: skipping re-creating encoding for refimg because file has not changed")
            continue
        img = face_recognition.load_image_file(file)
        location = face_recognition.face_locations(img)
        encodings = face_recognition.face_encodings(img, known_face_locations=location)
        refimg.encodings = encodings[0].tobytes()
        refimg.created_on = time.time()
        session.add(refimg)
        session.commit()
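# compareAI wraps face_recognition.compare_faces; tolerance=0.55 is slightly
# stricter than the library default of 0.6, trading some recall for fewer
# false matches.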
def compareAI(known_encoding, unknown_encoding):
    results = face_recognition.compare_faces([known_encoding], unknown_encoding, tolerance=0.55)
    return results
def ProcessFilesInDir(job, e, file_func):
    if DEBUG==1:
        print("DEBUG: files in dir - process: {} {}".format(e.name, e.in_dir[0].path_prefix))
    if e.type.name != 'Directory':
        file_func(job, e)
    else:
        dir=session.query(Dir).filter(Dir.eid==e.id).first()
        job.current_file_num+=1
        for sub in dir.files:
            ProcessFilesInDir(job, sub, file_func)
def JobGetFileDetails(job):
    JobProgressState( job, "In Progress" )
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    path='static'+'/'+os.path.basename(path[0:-1])
    if DEBUG==1:
        print("DEBUG: JobGetFileDetails for path={}".format( path ) )
    dir=session.query(Dir).filter(Dir.path_prefix==path).first()
    job.current_file_num = 0
    job.num_files = dir.num_files
    session.commit()
    RunFuncOnFilesInPath( job, path, GenHashAndThumb )
    FinishJob(job, "File Details job finished")
    session.commit()
    return
def isVideo(file):
    try:
        fileInfo = MediaInfo.parse(file)
        for track in fileInfo.tracks:
            if track.track_type == "Video":
                return True
        return False
    except Exception:
        return False
# Returns an md5 hash of fname's contents
def md5(job, fname):
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    hash = hash_md5.hexdigest()
    AddLogForJob( job, "Generated md5 hash: {} for file: {}".format( hash, fname ) )
    return hash
def isImage(file):
    try:
        img = Image.open(file)
        return True
    except Exception:
        return False
def GenImageThumbnail(job, file):
    thumbnail=None
    AddLogForJob( job, "Generate Thumbnail from Image file: {}".format( file ), file )
    try:
        im_orig = Image.open(file)
        im = ImageOps.exif_transpose(im_orig)
        im.thumbnail((256,256))
        img_bytearray = io.BytesIO()
        im.save(img_bytearray, format='JPEG')
        img_bytearray = img_bytearray.getvalue()
        thumbnail = base64.b64encode(img_bytearray)
        thumbnail = str(thumbnail)[2:-1]
    except Exception:
        print('WARNING: could not generate thumbnail (no EXIF tags?)')
        AddLogForJob(job, "WARNING: No EXIF tag found for: {}".format(file))
    return thumbnail
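# GenVideoThumbnail grabs the first frame whose mean pixel value is >= 15,
# skipping leading near-black frames, then encodes a 160x90 JPEG as base64.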
def GenVideoThumbnail(job, file):
    AddLogForJob( job, "Generate Thumbnail from Video file: {}".format( file ), file )
    vcap = cv2.VideoCapture(file)
    res, im_ar = vcap.read()
    while res and im_ar.mean() < 15:
        res, im_ar = vcap.read()
    im_ar = cv2.resize(im_ar, (160, 90), 0, 0, cv2.INTER_LINEAR)
    res, thumb_buf = cv2.imencode('.jpeg', im_ar)
    bt = thumb_buf.tobytes()
    thumbnail = base64.b64encode(bt)
    thumbnail = str(thumbnail)[2:-1]
    return thumbnail
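# Main: seed the job chain for every configured import path, then block on a
# TCP socket; any incoming connection from the web front-end simply triggers
# another HandleJobs() scan.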
if __name__ == "__main__":
    print("INFO: PA job manager starting - listening on {}:{}".format( PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT) )
    ProcessImportDirs()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
        s.listen()
        while True:
            conn, addr = s.accept()
            HandleJobs()
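
# A minimal sketch of how the front-end might poke this manager (an
# assumption based on the accept() loop above: the connection itself is the
# signal, any payload is ignored):
#
#   with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as c:
#       c.connect((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))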