# File: photoassistant/pa_job_manager.py (938 lines, 38 KiB, Python)

###
#
# This file controls the 'external' job control manager, which (periodically
# looks / somehow is pushed an event?) picks up new jobs and processes them.
#
# It then stores the progress/status, etc. in job and joblog tables as needed
# via wrapper functions.
#
# The whole pa_job_manager is multi-threaded, and uses the database tables for
# state management and communication back to the pa web site
#
###
### SQLALCHEMY IMPORTS ###
# pylint: disable=no-member
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime, LargeBinary, Boolean
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
### SQLALCHEMY IMPORTS ###
### LOCAL FILE IMPORTS ###
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, THUMBSIZE, SymlinkName
from datetime import datetime, timedelta, date
import pytz
import time
import os
import glob
from PIL import Image, ImageOps
from pymediainfo import MediaInfo
import hashlib
import exifread
import base64
import numpy
from cv2 import cv2
import socket
import threading
import io
import face_recognition
import re
import sys
# DEBUG==1 enables extra console output and makes HandleJobs() run jobs without its exception trapping
DEBUG=1
# raise the interpreter recursion limit; this file contains recursive helpers (e.g. ProcessFilesInDir, CancelJob)
sys.setrecursionlimit(50000)
# an Engine, which the Session will use for connection resources
some_engine = create_engine(DB_URL)
# create a configured "Session" class
#Session = sessionmaker(bind=some_engine)
# create a scoped_session registry on top of the session factory, and one module-wide
# session object that every function in this file uses
session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)
session = Session()
# declarative base class that all of the table classes below derive from
Base = declarative_base()
################################################################################
# Class describing File in the database, and via sqlalchemy, connected to the DB as well
# This has to match one-for-one the DB table
################################################################################
class EntryDirLink(Base):
    """Row of the "entry_dir_link" table: ties an entry (entry.id) to a directory (dir.eid).

    Used as the secondary table of Entry.in_dir and Dir.files.
    """
    __tablename__ = "entry_dir_link"
    # the pair (entry_id, dir_eid) forms the composite primary key
    entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True )

    def __repr__(self):
        return f"<entry_id: {self.entry_id}, dir_eid: {self.dir_eid}>"
class Dir(Base):
    """Row of the "dir" table: the directory-specific details of an Entry.

    eid doubles as primary key and foreign key to entry.id; files holds the
    Entry objects linked to this directory through entry_dir_link.
    """
    __tablename__ = "dir"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    path_prefix = Column(String, unique=True, nullable=False )
    num_files = Column(Integer)
    # stored as a float timestamp (time.time() is written by the import job)
    last_import_date = Column(Float)
    files = relationship("Entry", secondary="entry_dir_link")

    def __repr__(self):
        # Show contained entries by name only: rendering full Entry objects here can
        # lead back to this Dir through the entries' own directory links, which
        # recurses without end.
        file_names = [entry.name for entry in self.files]
        return f"<eid: {self.eid}, path_prefix: {self.path_prefix}, num_files: {self.num_files}, last_import_date: {self.last_import_date}, files: {file_names}>"
class Entry(Base):
    """Row of the "entry" table: one file-system entry (a file or a directory).

    dir_details / file_details hold the matching Dir / File rows, type the
    FileType row, and in_dir the Dir(s) this entry is linked to through
    entry_dir_link.
    """
    __tablename__ = "entry"
    id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
    name = Column(String, unique=False, nullable=False )
    type_id = Column(Integer, ForeignKey("file_type.id"))
    # reset to False before an import walk and set back to True for everything found on disk
    exists_on_fs=Column(Boolean)
    type=relationship("FileType")
    dir_details = relationship( "Dir")
    file_details = relationship( "File" )
    in_dir = relationship ("Dir", secondary="entry_dir_link" )

    def __repr__(self):
        # Show containing directories by path_prefix only: rendering full Dir objects
        # here can lead back to this Entry through the directory's file list, which
        # recurses without end.
        containing_dirs = [d.path_prefix for d in self.in_dir]
        return f"<id: {self.id}, name: {self.name}, type={self.type}, exists_on_fs={self.exists_on_fs}, dir_details={self.dir_details}, file_details={self.file_details}, in_dir={containing_dirs}>"
class FileRefimgLink(Base):
    """Row of the "file_refimg_link" table: result of comparing one file against one reference image.

    when_processed is a float timestamp of the last comparison; matched records
    whether that comparison found the reference face in the file.
    """
    __tablename__ = "file_refimg_link"
    # NOTE(review): unique=True on each column individually (in addition to the composite
    # primary key) would allow only one link per file and per refimg -- confirm against the DB schema
    file_id = Column(Integer, ForeignKey('file.eid'), unique=True, nullable=False, primary_key=True)
    refimg_id = Column(Integer, ForeignKey('refimg.id'), unique=True, nullable=False, primary_key=True)
    when_processed = Column(Float)
    matched = Column(Boolean)

    def __repr__(self):
        # closing '>' added so the repr is balanced like the other table classes
        return f"<file_id: {self.file_id}, refimg_id: {self.refimg_id} when_processed={self.when_processed}, matched={self.matched}>"
class File(Base):
    """Row of the "file" table: the file-specific details of an Entry (eid == entry.id)."""
    __tablename__ = "file"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    # size in whole megabytes (the import job stores round(st_size/(1024*1024)))
    size_mb = Column(Integer, unique=False, nullable=False)
    # NOTE(review): declared Integer, but md5() in this file returns a hex digest string -- confirm the real column type
    hash = Column(Integer, unique=True, nullable=True)
    # base64 text of a JPEG thumbnail (see GenImageThumbnail / GenVideoThumbnail)
    thumbnail = Column(String, unique=False, nullable=True)
    # date parts worked out by GetDateFromFile; woy is the ISO week of the year
    year = Column(Integer)
    month = Column(Integer)
    day = Column(Integer)
    woy = Column(Integer)
    # float timestamp of the last hash/thumbnail generation (0 when never done)
    last_hash_date = Column(Float)
    # raw bytes of the face encodings found in the image (None when no faces)
    faces = Column( LargeBinary )
    # float timestamp of the last face detection run (0 when never done)
    faces_created_on = Column(Float)

    def __repr__(self):
        return f"<eid: {self.eid}, size_mb={self.size_mb}, hash={self.hash}, last_hash_date: {self.last_hash_date}>"
class FileType(Base):
    """Row of the "file_type" table: a named entry type (this file looks up 'Directory', 'Image', 'Video' and 'Unknown')."""
    __tablename__ = "file_type"
    id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True )
    name = Column(String, unique=True, nullable=False )

    def __repr__(self):
        return f"<id: {self.id}, name={self.name}>"
class Settings(Base):
    """Row of the "settings" table: the configured import and storage paths.

    Both path columns hold several paths in one string, separated by '#'
    (that is how ProcessImportDirs / ProcessStorageDirs split them).
    """
    __tablename__ = "settings"
    id = Column(Integer, Sequence('settings_id_seq'), primary_key=True )
    import_path = Column(String)
    storage_path = Column(String)

    def __repr__(self):
        # storage_path added so the repr shows every column of the row
        return f"<id: {self.id}, import_path: {self.import_path}, storage_path: {self.storage_path}>"
class PersonRefimgLink(Base):
    """Row of the "person_refimg_link" table: ties a person to one of their reference images.

    Used as the secondary table of Person.refimg.
    """
    __tablename__ = "person_refimg_link"
    # NOTE(review): unique=True on each column individually would allow only one
    # reference image per person -- confirm against the DB schema
    person_id = Column(Integer, ForeignKey('person.id'), unique=True, nullable=False, primary_key=True)
    refimg_id = Column(Integer, ForeignKey('refimg.id'), unique=True, nullable=False, primary_key=True)

    def __repr__(self):
        return f"<person_id: {self.person_id}, refimg_id: {self.refimg_id}>"
class Person(Base):
    """Row of the "person" table: someone to look for in images, with their reference images."""
    __tablename__ = "person"
    id = Column(Integer, Sequence('person_id_seq'), primary_key=True )
    # short label; used in the "Found a match" log messages
    tag = Column(String(48), unique=False, nullable=False)
    surname = Column(String(48), unique=False, nullable=False)
    firstname = Column(String(48), unique=False, nullable=False)
    # Refimg rows linked to this person through person_refimg_link
    refimg = relationship('Refimg', secondary=PersonRefimgLink.__table__)

    def __repr__(self):
        return f"<tag: {self.tag}, firstname: {self.firstname}, surname: {self.surname}, refimg: {self.refimg}>"
class Refimg(Base):
    """Row of the "refimg" table: a reference image of a person and its face encoding."""
    __tablename__ = "refimg"
    id = Column(Integer, Sequence('refimg_id_seq'), primary_key=True )
    # file name; generateKnownEncodings reads it from 'reference_images/' + fname
    fname = Column(String(256), unique=True, nullable=False)
    # raw bytes of the face encoding (written via ndarray.tobytes())
    encodings = Column(LargeBinary)
    # float timestamp of when the encoding was last generated
    created_on = Column(Float)

    def __repr__(self):
        return f"<id: {self.id}, fname: {self.fname}, created_on: {self.created_on}, encodings: {self.encodings}>"
################################################################################
# classes for the job manager:
# Job (and Joblog, JobExtra) for each Job, and
# PA_Jobmanager_fe_message (to pass messages to the front-end web)
################################################################################
class Joblog(Base):
    """Row of the "joblog" table: one timestamped log line belonging to a job."""
    __tablename__ = "joblog"
    id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True )
    job_id = Column(Integer, ForeignKey('job.id') )
    log_date = Column(DateTime(timezone=True))
    log = Column(String)

    def __repr__(self):
        # closing '>' added so the repr is balanced like the other table classes
        return f"<id: {self.id}, job_id: {self.job_id}, log_date: {self.log_date}, log: {self.log}>"
class JobExtra(Base):
    """Row of the "jobextra" table: one name/value parameter attached to a job (e.g. name "path")."""
    __tablename__ = "jobextra"
    id = Column(Integer, Sequence('jobextra_id_seq'), primary_key=True)
    job_id = Column(Integer, ForeignKey('job.id'))
    name = Column(String)
    value = Column(String)

    def __repr__(self):
        return f"<id: {self.id}, job_id: {self.job_id}, name: {self.name}, value: {self.value}>"
class Job(Base):
    """Row of the "job" table: a unit of work for the job manager, plus its progress counters.

    state is the outcome shown for the job, pa_job_state is the job manager's own
    life-cycle marker ("New" / "In Progress" / "Completed"), and wait_for is the id
    of a job that must be Completed before this one runs.
    """
    __tablename__ = "job"
    id = Column(Integer, Sequence('job_id_seq'), primary_key=True)
    start_time = Column(DateTime(timezone=True))
    last_update = Column(DateTime(timezone=True))
    name = Column(String)
    state = Column(String)
    num_files = Column(Integer)
    current_file_num = Column(Integer)
    current_file = Column(String)
    wait_for = Column(Integer)
    pa_job_state = Column(String)
    logs = relationship("Joblog")
    extra = relationship("JobExtra")

    def __repr__(self):
        return (
            f"<id: {self.id}, start_time: {self.start_time}, last_update: {self.last_update}, "
            f"name: {self.name}, state: {self.state}, num_files: {self.num_files}, "
            f"current_file_num: {self.current_file_num}, current_file: {self.current_file}, "
            f"pa_job_state: {self.pa_job_state}, wait_for: {self.wait_for}, "
            f"extra: {self.extra}, logs: {self.logs}>"
        )
class PA_JobManager_FE_Message(Base):
    """Row of the "pa_job_manager_fe_message" table: a message for the front-end web site.

    alert carries the severity label used by this file ("success" / "danger").
    """
    __tablename__ = "pa_job_manager_fe_message"
    id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True )
    job_id = Column(Integer, ForeignKey('job.id') )
    alert = Column(String)
    message = Column(String)

    def __repr__(self):
        # closing '>' added so the repr is balanced like the other table classes
        return f"<id: {self.id}, job_id: {self.job_id}, alert: {self.alert}, message: {self.message}>"
##############################################################################
# Util (non Class) functions
##############################################################################
def MessageToFE( job_id, alert, message ):
    """Store a front-end message for job_id, commit it, and return the id of the new row."""
    fields = { 'job_id': job_id, 'alert': alert, 'message': message }
    fe_msg = PA_JobManager_FE_Message( **fields )
    session.add(fe_msg)
    # the commit is what assigns the id we hand back
    session.commit()
    return fe_msg.id
def ProcessStorageDirs(parent_job):
    """Queue the import job chain for every storage path in the settings row.

    parent_job: job that the "adding job" log lines are written against
    Raises Exception when there is no settings row.
    """
    settings = session.query(Settings).first()
    if settings is None:
        # message fixed: this function reads storage_path, not import_path
        raise Exception("Cannot create file data with no settings / storage path is missing")
    # several paths are stored in the one column, separated by '#'
    paths = settings.storage_path.split("#")
    JobsForPaths( parent_job, paths )
    return
def ProcessImportDirs(parent_job):
    """Queue the import job chain for every import path in the settings row.

    parent_job: job that the "adding job" log lines are written against
    Raises Exception when there is no settings row.
    """
    cfg = session.query(Settings).first()
    if cfg is None:
        raise Exception("Cannot create file data with no settings / import path is missing")
    # several paths are stored in the one column, separated by '#'
    JobsForPaths( parent_job, cfg.import_path.split("#") )
    return
def JobsForPaths( parent_job, paths ):
    """Create the chained jobs importdir -> getfiledetails -> checkdups for each path, then call HandleJobs().

    parent_job: job to write the "adding ..." log lines against; a falsy value skips that logging
    paths: iterable of path strings; each one becomes the "path" JobExtra of its new jobs
    """
    now=datetime.now(pytz.utc)
    # make new set of Jobs per path... HandleJobs will make them run later
    for path in paths:
        # if this path was imported before, seed the progress total from the stored Dir row
        d=session.query(Dir).filter(Dir.path_prefix==SymlinkName(path,path+'/')).first()
        cfn=0
        if d:
            cfn=d.num_files

        # job 1: importdir (waits for nothing)
        jex=JobExtra( name="path", value=path )
        job=Job(start_time=now, last_update=now, name="importdir", state="New", wait_for=None, pa_job_state="New", current_file_num=0, num_files=cfn )
        job.extra.append(jex)
        session.add(job)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a>".format( job.id, job.id, job.name ) )
        # force commit to make job.id be valid in use of wait_for later
        session.commit()

        # job 2: getfiledetails, waits for the importdir job above
        jex2=JobExtra( name="path", value=path )
        job2=Job(start_time=now, last_update=now, name="getfiledetails", state="New", wait_for=job.id, pa_job_state="New", current_file_num=0 )
        job2.extra.append(jex2)
        session.add(job2)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job2.id, job2.id, job2.name, job2.wait_for ) )

        # job 3: checkdups, waits for the getfiledetails job above
        jex3=JobExtra( name="path", value=path )
        job3=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 )
        job3.extra.append(jex3)
        session.add(job3)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job3.id, job3.id, job3.name, job3.wait_for ) )
        # the string literal below is a disabled "processai" job; it is never executed
        # NOTE(review): its log line formats job3 rather than job4 -- fix if this is ever re-enabled
        """
        jex4=JobExtra( name="path", value=path )
        job4=Job(start_time=now, last_update=now, name="processai", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 )
        job4.extra.append(jex4)
        session.add(job4)
        session.commit()
        if parent_job:
        AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job3.id, job3.id, job3.name, job3.wait_for ) )
        """
    # kick the scheduler so the jobs just created get picked up
    HandleJobs()
    return
def AddLogForJob(job, message, current_file=''):
    """Add a log line to job and refresh its progress fields (the caller commits).

    job: Job the log line belongs to; its last_update is set to now
    message: text of the log line
    current_file: when not '', recorded (basename only) as the job's current file,
                  and the job's file counter is advanced by one
    """
    now=datetime.now(pytz.utc)
    log=Joblog( job_id=job.id, log=message, log_date=now )
    job.last_update=now
    if current_file != '':
        job.current_file=os.path.basename(current_file)
        # this may not be set on an import job
        if job.num_files:
            # current_file_num is a nullable column: count from 0 instead of
            # raising TypeError on None + 1
            job.current_file_num=(job.current_file_num or 0)+1
    session.add(log)
    return
def RunJob(job):
    """Run job with the handler that matches job.name, then rescan for runnable jobs.

    A job that is not in pa_job_state "Completed" once its handler returns (this
    includes unknown job names) is closed off as "Failed".
    """
    # session = Session()
    job.start_time=datetime.now(pytz.utc)
    handlers = {
        "scannow": JobScanNow,
        "forcescan": JobForceScan,
        "scan_sp": JobScanStorageDir,
        "importdir": JobImportDir,
        "getfiledetails": JobGetFileDetails,
        "checkdups": CheckForDups,
        "rmdups": RemoveDups,
        "processai": JobProcessAI,
    }
    handler = handlers.get(job.name)
    if handler is None:
        print("ERROR: Requested to process unknown job type: {}".format(job.name))
    else:
        handler(job)
    # okay, we finished a job, so check for any jobs that are dependant on this and run them...
    # session.close()
    if job.pa_job_state != "Completed":
        FinishJob(job, "PA Job Manager - This is a catchall to close of a Job, this should never be seen and implies a job did not actually complete?", "Failed" )
    HandleJobs()
    return
def CancelJob(job,id):
    """Withdraw every job that waits for job `id`, and recursively the jobs waiting on those.

    job: the failed/withdrawn job (only used for the debug print)
    id: id of the job being waited for
    """
    waiting_jobs = session.query(Job).filter(Job.wait_for==id).all()
    for waiting in waiting_jobs:
        if DEBUG==1:
            print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(waiting.id, job.id) )
        FinishJob(waiting, "Job has been withdrawn as the job being waited for failed", "Withdrawn" )
        # FinishJob only cascades for "Failed", so walk down the chain here
        CancelJob(waiting, waiting.id)
    return
def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
    """Close off job: set its final states, write last_log as a log line, and commit.

    A state of "Failed" also withdraws every job waiting on this one (see CancelJob).
    """
    job.state, job.pa_job_state = state, pa_job_state
    job.last_update=datetime.now(pytz.utc)
    AddLogForJob(job, last_log)
    failed = (job.state == "Failed")
    if failed:
        CancelJob(job,job.id)
    session.commit()
    return
def HandleJobs():
    """Scan the job table and run every "New" job whose wait_for job (if any) is Completed.

    With DEBUG==1 jobs run unguarded so errors stacktrace to the console; otherwise
    a failure is reported to the front-end via MessageToFE.
    """
    print("INFO: PA job manager is scanning for new jobs to process")
    for job in session.query(Job).all():
        if job.pa_job_state != 'New':
            continue
        if job.wait_for is not None:
            awaited = session.query(Job).get(job.wait_for)
            if not awaited:
                print ("WTF? job.wait_for ({}) does not exist in below? ".format( job.wait_for ))
                for known in session.query(Job).all():
                    print ("j={}".format(known.id))
                continue
            if awaited.pa_job_state != 'Completed':
                continue
        # use this to remove threads for easier debugging, and errors will stacktrace to the console
        if DEBUG==1:
            print("*************************************")
            print("RUNNING job: id={} name={} wait_for={}".format(job.id, job.name, job.wait_for ))
            RunJob(job)
            continue
        try:
            RunJob(job)
            # threading.Thread(target=RunJob, args=(job,)).start()
        except Exception as e:
            try:
                MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
            except Exception as e2:
                print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
    print("INFO: PA job manager is waiting jobs")
    return
def JobProgressState( job, state ):
    """Mark job as being worked on: pa_job_state becomes "In Progress", state becomes `state`; commits."""
    job.state=state
    job.pa_job_state = "In Progress"
    session.commit()
    return
def JobScanNow(job):
    """Handler for "scannow" jobs: queue import jobs for all import paths and report completion."""
    done_msg = "Completed (scan for new files)"
    JobProgressState( job, "In Progress" )
    ProcessImportDirs(job)
    FinishJob( job, done_msg )
    MessageToFE( job.id, "success", done_msg )
    session.commit()
    return
def JobScanStorageDir(job):
    """Handler for "scan_sp" jobs: queue import jobs for all storage paths and report completion."""
    done_msg = "Completed (scan for new files)"
    JobProgressState( job, "In Progress" )
    ProcessStorageDirs(job)
    FinishJob( job, done_msg )
    MessageToFE( job.id, "success", done_msg )
    session.commit()
    return
def JobForceScan(job):
    """Handler for "forcescan" jobs: delete all file data, then queue import jobs for all import paths."""
    done_msg = "Completed (forced remove and recreation of all file data)"
    JobProgressState( job, "In Progress" )
    # link tables first, then the detail tables, then the entries they refer to
    for table_class in (FileRefimgLink, EntryDirLink, Dir, File, Entry):
        session.query(table_class).delete()
    session.commit()
    ProcessImportDirs(job)
    FinishJob(job, done_msg)
    MessageToFE( job.id, "success", done_msg )
    session.commit()
    return
# to serve static content of the images, we create a symlink from inside the static subdir of each import_path that exists
def CreateSymlink(job,path):
    """Make sure 'static/<last component of path>' links to path, and return that link name.

    job: unused here
    path: directory to link to; the last character is dropped before taking the
          basename, i.e. path is expected to end with '/'
    Returns the (relative) symlink name. An existing, resolvable entry of that
    name is left untouched.
    """
    symlink='static/{}'.format(os.path.basename(path[0:-1]))
    # a dangling link fails os.path.exists() but still occupies the name, so
    # os.symlink() would raise FileExistsError -- drop it and re-create it
    if os.path.islink(symlink) and not os.path.exists(symlink):
        os.remove(symlink)
    if not os.path.exists(symlink):
        os.symlink(path, symlink)
    return symlink
def AddDir(job, dirname, path_prefix, in_dir):
    """Return the Dir for path_prefix, creating it (and its Entry) when it is not in the DB yet.

    job: job to log the creation against
    dirname: name stored on the new Entry
    path_prefix: value that identifies the Dir row
    in_dir: parent Dir, or None for the top of an import tree
    An already known Dir only has its Entry flagged as existing on the file system.
    """
    known_dir = session.query(Dir).filter(Dir.path_prefix==path_prefix).first()
    if known_dir:
        known_entry = session.query(Entry).get(known_dir.eid)
        known_entry.exists_on_fs=True
        return known_dir

    new_dir = Dir( path_prefix=path_prefix, num_files=0, last_import_date=0 )
    dir_type = session.query(FileType).filter(FileType.name=='Directory').first()
    entry = Entry( name=dirname, type=dir_type, exists_on_fs=True )
    entry.dir_details.append(new_dir)
    # no in_dir occurs when we Add the actual Dir for the import_path (top of the tree)
    if in_dir:
        entry.in_dir.append(in_dir)
    if DEBUG==1:
        print(f"AddDir: created d={dirname}, pp={path_prefix}")
    AddLogForJob(job, f"DEBUG: Process new dir: {dirname}")
    session.add(entry)
    return new_dir
def AddFile(job, fname, type_str, fsize, in_dir, year, month, day, woy ):
    """Return the Entry for file fname inside in_dir, creating Entry + File rows when it is new.

    type_str: FileType name to give a new entry
    fsize: size stored in File.size_mb
    year, month, day, woy: date parts stored on the new File row
    An already known file only gets flagged as existing on the file system.
    """
    existing = (
        session.query(Entry)
        .join(EntryDirLink)
        .join(Dir)
        .filter(Entry.name==fname,Dir.eid==in_dir.eid)
        .first()
    )
    if existing:
        print( f"################################################ FILE EXISTS ALREADY: {fname} -- {in_dir.path_prefix} {existing}" )
        existing.exists_on_fs=True
        return existing

    file_type = session.query(FileType).filter(FileType.name==type_str).first()
    entry = Entry( name=fname, type=file_type, exists_on_fs=True )
    # timestamps of 0 mean "never hashed / never scanned for faces"
    details = File( size_mb=fsize, last_hash_date=0, faces_created_on=0, year=year, month=month, day=day, woy=woy )
    entry.file_details.append(details)
    entry.in_dir.append(in_dir)
    AddLogForJob(job, "Found new file: {}".format(fname) )
    session.add(entry)
    return entry
# reset exists_on_fs to False for everything in this import path, if we find it on the FS in the walk below, it goes back to True, anything that
# is still false, has been deleted
def ResetExistsOnFS(job, path):
    """Flag every entry below path (matched on Dir.path_prefix) as not (yet) seen on the file system."""
    prefix_pattern = path+'%'
    linked_entries = session.query(Entry).join(EntryDirLink).join(Dir).filter(Dir.path_prefix.ilike(prefix_pattern)).all()
    for linked in linked_entries:
        linked.exists_on_fs=False
        session.add(linked)
        # and whatever is linked below this entry
        children = session.query(Entry).join(EntryDirLink).filter(EntryDirLink.dir_eid==linked.id).all()
        for child in children:
            child.exists_on_fs=False
            session.add(child)
    return
def HandleAnyFSDeletions(job):
    """Remove from the DB every entry still flagged exists_on_fs==False; return how many were removed.

    Files are removed first, then directories (highest entry id first). Each
    removal is logged against job. Nothing is committed here.
    """
    dtype=session.query(FileType).filter(FileType.name=='Directory').first()
    rm_cnt=0

    rms = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id!=dtype.id).all()
    for rm in rms:
        # file_refimg_link rows reference file.eid, so they have to go before the file row
        session.query(FileRefimgLink).filter(FileRefimgLink.file_id==rm.id).delete()
        session.query(EntryDirLink).filter(EntryDirLink.entry_id==rm.id).delete()
        session.query(File).filter(File.eid==rm.id).delete()
        session.query(Entry).filter(Entry.id==rm.id).delete()
        AddLogForJob( job, f"INFO: Removing {rm.name} from system as it is no longer on the file system")
        rm_cnt+=1

    # use the looked-up Directory type id (as the file query above does) rather than a hard-coded 1
    rmdirs = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id==dtype.id).order_by(Entry.id.desc()).all()
    for rmdir in rmdirs:
        print(f"We have a directory ({rmdir.name}) to delete from DB as it no longer exists on fs")
        session.query(EntryDirLink).filter(EntryDirLink.entry_id==rmdir.id).delete()
        session.query(Dir).filter(Dir.eid==rmdir.id).delete()
        session.query(Entry).filter(Entry.id==rmdir.id).delete()
        AddLogForJob( job, f"INFO: Removing {rmdir.name} from system as it is no longer on the file system")
        rm_cnt+=1
    return rm_cnt
def GetDateFromFile(file, stat):
    """Work out the date of a file; return (year, month, day, week_of_year).

    Sources are tried in order: the EXIF DateTimeOriginal tag, the first
    YYYYMMDD-looking run of 8 digits in the file path, and finally stat.st_ctime.

    file: path of the file
    stat: os.stat() result for the file (only st_ctime is used)
    """
    # try exif
    try:
        print(f"trying exif read of {file}")
        # context manager: the handle is closed even when the exif parsing raises
        with open(file, 'rb') as f:
            tags = exifread.process_file(f)
        date_str, _ = str(tags["EXIF DateTimeOriginal"]).split(" ")
        print(date_str)
        year, month, day = (int(part) for part in date_str.split(":"))
        print(year)
        # building a datetime validates the values (raises ValueError for impossible dates)
        check = datetime( year, month, day )
        print( f"check={check}" )
    except Exception:
        # try parsing filename
        try:
            m=re.search( r'(\d{4})(\d{2})(\d{2})', file)
            year=int(m[1])
            month=int(m[2])
            day=int(m[3])
            check2 = datetime( year, month, day )
            print( f"check2={check2}" )
        # give up and use file sys date
        except Exception:
            year, month, day = datetime.fromtimestamp(stat.st_ctime).timetuple()[:3]
    c=date(year, month, day).isocalendar()
    woy=c[1]
    return year, month, day, woy
def JobImportDir(job):
    """Handler for "importdir" jobs: walk the job's path and bring the Dir/Entry/File rows in line with it.

    The path comes from the job's "path" JobExtra. Everything under the path is
    first flagged as missing, then re-flagged while walking; what is still
    flagged missing afterwards is removed from the DB (HandleAnyFSDeletions).
    Raises Exception when there is no settings row; a non-existent path
    finishes the job as "Failed".
    """
    JobProgressState( job, "In Progress" )
    settings = session.query(Settings).first()
    if settings == None:
        raise Exception("Cannot create file data with no settings / paths missing")
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    AddLogForJob(job, "Checking Directory: {}".format( path ) )
    if DEBUG==1:
        print("DEBUG: Checking Directory: {}".format( path ) )
    if not os.path.exists( path ):
        FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" )
        return
    symlink=CreateSymlink(job,path)
    ResetExistsOnFS(job, symlink)

    # the whole tree is materialised because it is iterated twice below
    walk=os.walk(path, topdown=True)
    ftree=list(walk)

    # go through data once to work out file_cnt so progress bar works from first import
    overall_file_cnt=0
    for root, subdirs, files in ftree:
        overall_file_cnt+= len(subdirs) + len(files)

    parent_dir=None
    dir=AddDir(job, os.path.basename(symlink), symlink, parent_dir)
    dir.num_files=overall_file_cnt
    # session.add in case we already have imported this dir (as AddDir wont) & now we might have diff num of files to last time,
    session.add(dir)
    job.num_files=overall_file_cnt
    AddLogForJob(job, f"Found {overall_file_cnt} file(s) to process")
    session.commit()

    # root == path of dir, files are in dir... subdirs are in dir
    for root, subdirs, files in ftree:
        # already create root above to work out num_files for whole os.walk
        if root != path:
            pp=SymlinkName( path, root )+'/'+os.path.basename(root)
            # NOTE(review): parent_dir is simply the previously walked directory (see end of
            # loop), which is not necessarily the real parent of root -- confirm
            dir=AddDir(job, os.path.basename(root), pp, parent_dir)
        for basename in files:
            # commit every 100 files to see progress being made but not hammer the database
            if job.current_file_num % 100 == 0:
                session.commit()
            fname=dir.path_prefix+'/'+basename
            stat = os.stat(fname)

            # only files changed since this directory's last import are (re)processed
            if stat.st_ctime > dir.last_import_date:
                AddLogForJob(job, f"Processing new/update file: {basename}", basename )
                if DEBUG==1:
                    print("DEBUG: {} - {} is newer than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
                if isImage(fname):
                    type_str = 'Image'
                elif isVideo(fname):
                    type_str = 'Video'
                else:
                    type_str = 'Unknown'
                fsize = round(stat.st_size/(1024*1024))

                year, month, day, woy = GetDateFromFile(fname, stat)
                e=AddFile( job, basename, type_str, fsize, dir, year, month, day, woy )
            else:
                # unchanged file: just flag its existing entry as still present
                # NOTE(review): e is None when the file is not in the DB, which would raise here -- confirm this cannot happen
                e=session.query(Entry).join(EntryDirLink).join(Dir).filter(Entry.name==basename,Dir.eid==dir.eid).first()
                e.exists_on_fs=True
                if DEBUG==1:
                    print("DEBUG: {} - {} is OLDER than {}".format( basename, stat.st_ctime, dir.last_import_date ), basename )
                # no AddLogForJob call on this branch, so progress is advanced by hand
                job.current_file=basename
                job.current_file_num+=1
        dir.num_files=len(files)+len(subdirs)
        dir.last_import_date = time.time()
        parent_dir=dir
    job.num_files=overall_file_cnt
    job.current_file_num=overall_file_cnt

    rm_cnt=HandleAnyFSDeletions(job)

    FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Removed {rm_cnt} file(s)")
    return
def RunFuncOnFilesInPath( job, path, file_func ):
    """Call file_func(job, entry) for every file below the Dir whose path_prefix equals path."""
    top_dir = session.query(Dir).filter(Dir.path_prefix==path).first()
    for entry in top_dir.files:
        # ProcessFilesInDir descends into sub-directories itself
        ProcessFilesInDir(job, entry, file_func)
def JobProcessAI(job):
    """Handler for "processai" jobs: refresh the reference-image encodings, then run ProcessAI on every file of the job's path."""
    raw_path = [jex.value for jex in job.extra if jex.name == "path"][0]
    path = SymlinkName(raw_path, '/')
    top_dir = session.query(Dir).filter(Dir.path_prefix==path).first()
    job.num_files=top_dir.num_files

    for person in session.query(Person).all():
        generateKnownEncodings(person)

    RunFuncOnFilesInPath( job, path, ProcessAI )

    FinishJob(job, "Finished Processesing AI")
    return
def GenHashAndThumb(job, e):
    """Per-file worker of the getfiledetails job: (re)generate the md5 hash and thumbnail of entry e.

    Files not changed since their last hash are skipped. The job's file counter
    is advanced here only on the paths where no thumbnail helper logs the file.
    """
    # commit every 100 files to see progress being made but not hammer the database
    if job.current_file_num % 100 == 0:
        session.commit()
    full_path = e.in_dir[0].path_prefix + '/' + e.name
    stat = os.stat( full_path )
    details = e.file_details[0]
    if stat.st_ctime < details.last_hash_date:
        print(f"OPTIM: GenHashAndThumb {e.name} file is older than last hash, skip this")
        job.current_file_num+=1
        return

    details.hash = md5( job, full_path )
    if DEBUG==1:
        print( f"{e.name} - hash={details.hash}" )
    kind = e.type.name
    if kind == 'Image':
        details.thumbnail = GenImageThumbnail( job, full_path )
    elif kind == 'Video':
        details.thumbnail = GenVideoThumbnail( job, full_path )
    elif kind == 'Unknown':
        job.current_file_num+=1
    details.last_hash_date = time.time()
    return
def ProcessAI(job, e):
    """Per-file worker of the processai job: find faces in image entry e and compare them with every person.

    Face encodings are regenerated only when the file changed since the last
    detection; otherwise the encodings stored on the File row are reused.
    Non-image entries and images without faces only advance the job's file counter.
    """
    if e.type.name != 'Image':
        job.current_file_num+=1
        return

    file = e.in_dir[0].path_prefix + '/' + e.name
    stat = os.stat(file)
    details = e.file_details[0]
    # find if file is newer than when we found faces before (fyi: first time faces_created_on == 0)
    if stat.st_ctime > details.faces_created_on:
        session.add(e)
        # context manager so the image file handle is released once the
        # (independent) transposed copy has been made
        with Image.open(file) as im_orig:
            im = ImageOps.exif_transpose(im_orig)

        faces = generateUnknownEncodings(im)
        details.faces_created_on=time.time()
        if faces:
            flat_faces = numpy.array(faces)
            details.faces = flat_faces.tobytes()
        else:
            details.faces = None
            job.current_file_num+=1
            return
    else:
        if not details.faces:
            print("OPTIM: This image has no faces, skip it")
            job.current_file_num+=1
            return
        # stored encodings are the flattened float64 bytes; one encoding is 128 values
        recover=numpy.frombuffer(details.faces,dtype=numpy.float64)
        real_recover=numpy.reshape(recover,(-1,128))
        faces = [numpy.array(el) for el in real_recover]
    people = session.query(Person).all()
    for unknown_encoding in faces:
        for person in people:
            lookForPersonInImage(job, person, unknown_encoding, e)
    AddLogForJob(job, f"Finished processing {e.name}", e.name )
    return
def lookForPersonInImage(job, person, unknown_encoding, e):
    """Compare one face encoding from image entry e against each reference image of person.

    The outcome is recorded in a FileRefimgLink row (created when missing). The
    function returns early when an earlier match exists and neither the file nor
    the reference image changed since it was made.
    """
    for ref in person.refimg:
        # lets see if we have tried this check before
        link = session.query(FileRefimgLink).filter(FileRefimgLink.file_id==e.id, FileRefimgLink.refimg_id==ref.id).first()
        if link:
            stat=os.stat(e.in_dir[0].path_prefix+'/'+ e.name)
            # file & refimg are not newer then we dont need to check
            unchanged = stat.st_ctime < link.when_processed and ref.created_on < link.when_processed
            if link.matched and unchanged:
                print(f"OPTIM: lookForPersonInImage: file {e.name} has a previous match for: {ref.fname}, and the file & refimg haven't changed")
                return
        else:
            link = FileRefimgLink(refimg_id=ref.id, file_id=e.file_details[0].eid)
        session.add(link)
        link.matched=False
        link.when_processed=time.time()
        known_encoding = numpy.frombuffer(ref.encodings, dtype=numpy.float64)
        results = compareAI(known_encoding, unknown_encoding)
        if results[0]:
            print(f'Found a match between: {person.tag} and {e.name}')
            AddLogForJob(job, f'Found a match between: {person.tag} and {e.name}')
            link.matched=True
    return
def generateUnknownEncodings(im):
    """Return the face encodings found in image im, or None when no face is located."""
    pixels = numpy.array(im)
    locations = face_recognition.face_locations(pixels)
    if locations:
        return face_recognition.face_encodings(pixels, known_face_locations=locations)
    return None
def generateKnownEncodings(person):
    """(Re)generate and store the face encoding of each reference image of person.

    A reference image whose file has not changed since its encoding was created
    is skipped, as is one in which no face can be found.
    """
    for refimg in person.refimg:
        file = 'reference_images/'+refimg.fname
        stat = os.stat(file)
        if refimg.created_on and stat.st_ctime < refimg.created_on:
            print("OPTIM: skipping re-creating encoding for refimg because file has not changed")
            continue
        img = face_recognition.load_image_file(file)
        location = face_recognition.face_locations(img)
        encodings = face_recognition.face_encodings(img, known_face_locations=location)
        if len(encodings) == 0:
            # no face found: there is no encodings[0] to store, so leave this refimg untouched
            print(f"WARNING: no face found in reference image: {file}, cannot create encoding")
            continue
        # only the first face found is used as the reference
        refimg.encodings = encodings[0].tobytes()
        refimg.created_on = time.time()
        session.add(refimg)
        session.commit()
def compareAI(known_encoding, unknown_encoding):
    """Return face_recognition's match result list (one element) for unknown_encoding against known_encoding, at tolerance 0.55."""
    return face_recognition.compare_faces([known_encoding], unknown_encoding, tolerance=0.55)
def ProcessFilesInDir(job, e, file_func):
    """Apply file_func(job, entry) to entry e, or to everything below it when e is a directory.

    A directory itself counts as one processed item on the job's file counter.
    """
    if DEBUG==1:
        print("DEBUG: files in dir - process: {} {}".format(e.name, e.in_dir[0].path_prefix))
    if e.type.name == 'Directory':
        sub_dir = session.query(Dir).filter(Dir.eid==e.id).first()
        job.current_file_num+=1
        for child in sub_dir.files:
            ProcessFilesInDir(job, child, file_func)
        return
    file_func(job, e)
def JobGetFileDetails(job):
    """Handler for "getfiledetails" jobs: generate hash and thumbnail for every file below the job's path."""
    JobProgressState( job, "In Progress" )
    raw_path = [jex.value for jex in job.extra if jex.name == "path"][0]
    # same name CreateSymlink builds for this path
    path = 'static'+'/'+os.path.basename(raw_path[0:-1])
    if DEBUG==1:
        print("DEBUG: JobGetFileDetails for path={}".format( path ) )
    top_dir = session.query(Dir).filter(Dir.path_prefix==path).first()
    job.current_file_num = 0
    job.num_files = top_dir.num_files
    session.commit()
    RunFuncOnFilesInPath( job, path, GenHashAndThumb )
    FinishJob(job, "File Details job finished")
    session.commit()
    return
def isVideo(file):
    """Return True when MediaInfo reports at least one "Video" track for file; False otherwise (also on any error)."""
    try:
        media = MediaInfo.parse(file)
        return any(track.track_type == "Video" for track in media.tracks)
    except Exception:
        return False
# Returns an md5 hash of the fnames' contents
def md5(job, fname):
    """Return the md5 hex digest of the contents of fname, and log it against job."""
    digest = hashlib.md5()
    with open(fname, "rb") as f:
        # read in 4 KiB chunks so large files are never held in memory whole
        while True:
            chunk = f.read(4096)
            if chunk == b"":
                break
            digest.update(chunk)
    hex_digest = digest.hexdigest()
    AddLogForJob( job, "Generated md5 hash: {} for file: {}".format( hex_digest, fname ) )
    return hex_digest
def isImage(file):
    """Return True when PIL can open file as an image, False otherwise."""
    try:
        # context manager: the file handle opened by PIL is closed again straight away
        with Image.open(file):
            return True
    except Exception:
        # any failure to open means "not an image"; unlike a bare except this
        # lets KeyboardInterrupt / SystemExit through
        return False
def GenImageThumbnail(job, file):
    """Return a base64 (text) JPEG thumbnail of image file, or None when it cannot be made.

    The thumbnail fits in THUMBSIZE x THUMBSIZE and honours the EXIF orientation.
    The attempt is logged against job with file as its current file.
    """
    AddLogForJob( job, "Generate Thumbnail from Image file: {}".format( file ), file )
    try:
        # context manager so the file handle is released; exif_transpose hands
        # back an independent image
        with Image.open(file) as im_orig:
            im = ImageOps.exif_transpose(im_orig)
        im.thumbnail((THUMBSIZE,THUMBSIZE))
        # JPEG cannot store alpha / palette modes (e.g. RGBA or P from a PNG or GIF)
        if im.mode not in ('1', 'L', 'RGB', 'CMYK'):
            im = im.convert('RGB')
        img_bytearray = io.BytesIO()
        im.save(img_bytearray, format='JPEG')
        thumbnail = base64.b64encode(img_bytearray.getvalue()).decode('ascii')
    except Exception as e:
        # message reworded: any failure ends up here, not just missing EXIF tags
        print(f"WARNING: Failed to generate thumbnail for image file: {file}")
        AddLogForJob(job, f"WARNING: Failed to generate thumbnail for image file: {file} - error={e}")
        return None
    return thumbnail
def GenVideoThumbnail(job, file):
    """Return a base64 (text) JPEG thumbnail of the first non-black frame of video file, or None on failure.

    The frame is scaled so that its longer side is THUMBSIZE. The attempt is
    logged against job with file as its current file.
    """
    AddLogForJob( job, "Generate Thumbnail from Video file: {}".format( file ), file )
    vcap = None
    try:
        vcap = cv2.VideoCapture(file)
        res, frame = vcap.read()
        # if the mean pixel value is > 15, we have something worth making a sshot of (no black frame at start being the sshot)
        # res is tested first: a failed read gives no frame to take a mean() of
        while res and frame.mean() < 15:
            res, frame = vcap.read()
        if not res:
            raise ValueError("could not read a usable (non-black) frame")
        w = vcap.get(cv2.cv2.CAP_PROP_FRAME_WIDTH)
        h = vcap.get(cv2.cv2.CAP_PROP_FRAME_HEIGHT)
        # scale by the longer side so the thumbnail fits in THUMBSIZE x THUMBSIZE
        if w > h:
            factor = w / THUMBSIZE
        else:
            factor = h / THUMBSIZE
        new_h = int(h / factor)
        new_w = int(w / factor)
        frame = cv2.resize(frame, (new_w, new_h), 0, 0, cv2.INTER_LINEAR)

        res, thumb_buf = cv2.imencode('.jpeg', frame)
        bt = thumb_buf.tobytes()
        thumbnail = base64.b64encode(bt).decode('ascii')
    except Exception as e:
        AddLogForJob( job, f"ERROR: Failed to Generate thumbnail for video file: {file} - error={e}" )
        return None
    finally:
        # always give the capture (and the file it holds open) back
        if vcap is not None:
            vcap.release()
    return thumbnail
def CheckForDups(job):
    """Handler for "checkdups" jobs: look for files under the job's path whose hash equals another file's hash.

    When duplicates exist, a "danger" front-end message is created whose link
    POSTs the message id to /fix_dups.
    """
    # local import: text() is only needed here
    from sqlalchemy import text

    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    path='static'+'/'+os.path.basename(path[0:-1])

    AddLogForJob( job, f"Check for duplicates in import path: {path}" )
    # the path goes in as a bound parameter, so a quote (or other SQL syntax) in a
    # directory name cannot break or alter the statement
    dup_sql = text( "select count(e1.name) as count from entry e1, file f1, dir d1, entry_dir_link edl1, entry e2, file f2, dir d2, entry_dir_link edl2 where e1.id = f1.eid and e2.id = f2.eid and d1.eid = edl1.dir_eid and edl1.entry_id = e1.id and edl2.dir_eid = d2.eid and edl2.entry_id = e2.id and d1.path_prefix like :path_pattern and f1.hash = f2.hash and e1.id != e2.id" )
    # count(...) always yields exactly one row with one column
    dup_cnt = session.execute( dup_sql, { "path_pattern": f"%{path}%" } ).scalar()
    if dup_cnt and dup_cnt > 0:
        AddLogForJob(job, f"Found duplicates, Creating Status message in front-end for attention")
        # the message text needs the id of its own row, so create it with a placeholder first
        msg_id=MessageToFE( job.id, "danger", 'replaceme' )
        session.query(PA_JobManager_FE_Message).filter(PA_JobManager_FE_Message.id==msg_id).update( { 'message' : f'Found duplicate(s), click&nbsp; <a href="javascript:document.body.innerHTML+=\'<form id=_fm method=POST action=/fix_dups><input type=hidden name=fe_msg_id value={msg_id}></form>\'; document.getElementById(\'_fm\').submit();">here</a>&nbsp;to finalise import by removing duplicates' } )
    FinishJob(job, f"Finished Looking for Duplicates")
    return
def _PartitionDups(job, files, is_keeper):
    """Split the entries in files into (file to keep or None, list of files to delete); entries missing on disk are logged and ignored."""
    found=None
    del_me_lst = []
    for f in files:
        if not os.path.isfile(f.in_dir[0].path_prefix+'/'+f.name):
            AddLogForJob( job, f"ERROR: file (DB id: {f.id} - {f.in_dir[0].path_prefix}/{f.name}) does not exist? ignorning file")
        elif is_keeper(f):
            found = f
        else:
            del_me_lst.append(f)
    return found, del_me_lst

def _RemoveDupFiles(job, hash, found, del_me_lst):
    """Delete the files in del_me_lst from disk provided a file to keep was found; return how many were removed."""
    if found is None:
        AddLogForJob( job, f"ERROR: Cannot find file with hash={hash} to process - skipping it)" )
        return 0
    AddLogForJob(job, f"Keep duplicate file: {found.in_dir[0].path_prefix}/{found.name}" )
    for del_me in del_me_lst:
        AddLogForJob(job, f"Remove duplicate file: {del_me.in_dir[0].path_prefix}/{del_me.name}" )
        os.remove( del_me.in_dir[0].path_prefix+'/'+del_me.name )
    return len(del_me_lst)

def RemoveDups(job):
    """Handler for "rmdups" jobs: delete duplicate files from disk as instructed by the job's extras.

    Extras read:
      fe_msg_id       id of the front-end message to clear
      kfid-N          DB id of the file to keep; kfhash-N holds the hash of the duplicates
      kdid-N          eid of the directory whose files are kept; kdhash-N holds a
                      comma separated list of hashes
    Only the files on disk are removed here, not their DB rows.
    """
    # clear FE message we are deleting dups for this now...
    fe_msg_id =[jex.value for jex in job.extra if jex.name == "fe_msg_id"][0]
    session.query(PA_JobManager_FE_Message).filter(PA_JobManager_FE_Message.id==fe_msg_id).delete()
    session.commit()

    AddLogForJob(job, f"INFO: Starting Remove Duplicates job...")
    dup_cnt=0
    for jex in job.extra:
        if 'kfid-' in jex.name:
            _, which = jex.name.split('-')
            hash=[jex.value for jex in job.extra if jex.name == f"kfhash-{which}"][0]
            AddLogForJob(job, f"deleting duplicate files with hash: {hash} but keeping file with DB id={jex.value}" )
            files=session.query(Entry).join(File).filter(File.hash==hash).all()
            keep_file_id=int(jex.value)
            found, del_me_lst = _PartitionDups(job, files, lambda f: f.file_details[0].eid == keep_file_id)
            dup_cnt += _RemoveDupFiles(job, hash, found, del_me_lst)

        if 'kdid-' in jex.name:
            _, which = jex.name.split('-')
            hashes=[jex.value for jex in job.extra if jex.name == f"kdhash-{which}"][0]
            keep_dir_id=int(jex.value)
            tmp=session.query(Dir).filter(Dir.eid==keep_dir_id).first()
            if tmp is None:
                AddLogForJob(job, f"ERROR: Cannot find directory with DB id={keep_dir_id} to keep files in - skipping it" )
                continue
            AddLogForJob(job, f"Keeping files in {tmp.path_prefix}" )
            # ignore empty items, so the list works with or without a trailing comma
            for hash in [h for h in hashes.split(",") if h]:
                files=session.query(Entry).join(File).filter(File.hash==hash).all()
                # every duplicate outside the kept directory is collected, so all of them
                # are removed and nothing is carried over from the previous hash
                found, del_me_lst = _PartitionDups(job, files, lambda f: f.in_dir[0].eid == keep_dir_id)
                dup_cnt += _RemoveDupFiles(job, hash, found, del_me_lst)

    FinishJob(job, f"Finished removing {dup_cnt} duplicate files" )
    return
# Entry point: process whatever is queued, then sit on a TCP socket; every incoming
# connection acts as a "go and look for new jobs" signal.
if __name__ == "__main__":
    print("INFO: PA job manager starting - listening on {}:{}".format( PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT) )

    # now=datetime.now(pytz.utc)
    # job=Job(start_time=now, last_update=now, name="scannow", state="New", wait_for=None, pa_job_state="New", current_file_num=0, num_files=0 )
    # session.add(job)
    # session.commit()
    HandleJobs()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
        s.listen()
        while True:
            conn, addr = s.accept()
            # nothing is read from the connection; close it explicitly once the
            # jobs have been handled instead of leaving the socket to the garbage collector
            with conn:
                HandleJobs()