###
#
# This file implements the 'external' job control manager, which (periodically,
# or when pushed an event) picks up new jobs and processes them.
#
# It stores progress/status etc. in the job and joblog tables as needed,
# via wrapper functions.
#
# The whole pa_job_manager is multi-threaded, and uses the database tables for
# state management and for communication back to the pa web site.
#
###

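# How work arrives (a sketch inferred from the code below): the web front-end
# presumably inserts a Job row in state "New" (optionally chained to another
# job via wait_for) and then connects to this manager's socket, which acts as
# a wake-up for HandleJobs().  The commented-out block in __main__ shows the
# same thing done in-process, e.g.:
#
#   now = datetime.now(pytz.utc)
#   job = Job(start_time=now, last_update=now, name="scannow", state="New",
#             wait_for=None, pa_job_state="New", current_file_num=0, num_files=0)
#   session.add(job)
#   session.commit()
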
### SQLALCHEMY IMPORTS ###

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime, LargeBinary, Boolean
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session

### SQLALCHEMY IMPORTS ###

### LOCAL FILE IMPORTS ###

from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, THUMBSIZE, SymlinkName
from datetime import datetime, timedelta, date
import pytz
import time
import os
import glob
from PIL import Image, ImageOps
from pymediainfo import MediaInfo
import hashlib
import exifread
import base64
import numpy
import cv2
import socket
import threading
import io
import face_recognition
import re
import sys

DEBUG=1
sys.setrecursionlimit(50000)

# an Engine, which the Session will use for connection resources
some_engine = create_engine(DB_URL)

# create a configured "Session" class
#Session = sessionmaker(bind=some_engine)

# create a (scoped) Session
session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)
session = Session()

Base = declarative_base()

################################################################################
# Classes describing the files/directories in the database; via sqlalchemy they
# are connected to the DB as well.  These have to match the DB tables one-for-one.
################################################################################
class EntryDirLink(Base):
    __tablename__ = "entry_dir_link"
    entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True )

    def __repr__(self):
        return f"<entry_id: {self.entry_id}, dir_eid: {self.dir_eid}>"

class Dir(Base):
    __tablename__ = "dir"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    path_prefix = Column(String, unique=True, nullable=False )
    num_files = Column(Integer)
    last_import_date = Column(Float)
    files = relationship("Entry", secondary="entry_dir_link")

    def __repr__(self):
        return f"<eid: {self.eid}, path_prefix: {self.path_prefix}, num_files: {self.num_files}, last_import_date: {self.last_import_date}, files: {self.files}>"

class Entry(Base):
    __tablename__ = "entry"
    id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
    name = Column(String, unique=False, nullable=False )
    type_id = Column(Integer, ForeignKey("file_type.id"))
    exists_on_fs = Column(Boolean)
    type = relationship("FileType")
    dir_details = relationship( "Dir")
    file_details = relationship( "File" )
    in_dir = relationship ("Dir", secondary="entry_dir_link" )

    def __repr__(self):
        return f"<id: {self.id}, name: {self.name}, type={self.type}, exists_on_fs={self.exists_on_fs}, dir_details={self.dir_details}, file_details={self.file_details}, in_dir={self.in_dir}>"

class FileRefimgLink(Base):
    __tablename__ = "file_refimg_link"
    file_id = Column(Integer, ForeignKey('file.eid'), unique=True, nullable=False, primary_key=True)
    refimg_id = Column(Integer, ForeignKey('refimg.id'), unique=True, nullable=False, primary_key=True)
    when_processed = Column(Float)
    matched = Column(Boolean)

    def __repr__(self):
        return f"<file_id: {self.file_id}, refimg_id: {self.refimg_id}, when_processed={self.when_processed}, matched={self.matched}>"

class File(Base):
    __tablename__ = "file"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    size_mb = Column(Integer, unique=False, nullable=False)
    hash = Column(Integer, unique=True, nullable=True)
    thumbnail = Column(String, unique=False, nullable=True)
    year = Column(Integer)
    month = Column(Integer)
    day = Column(Integer)
    woy = Column(Integer)
    last_hash_date = Column(Float)
    faces = Column( LargeBinary )
    faces_created_on = Column(Float)

    def __repr__(self):
        return f"<eid: {self.eid}, size_mb={self.size_mb}, hash={self.hash}, last_hash_date: {self.last_hash_date}>"

class FileType(Base):
    __tablename__ = "file_type"
    id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True )
    name = Column(String, unique=True, nullable=False )

    def __repr__(self):
        return f"<id: {self.id}, name={self.name}>"

class Settings(Base):
    __tablename__ = "settings"
    id = Column(Integer, Sequence('settings_id_seq'), primary_key=True )
    import_path = Column(String)
    storage_path = Column(String)

    def __repr__(self):
        return f"<id: {self.id}, import_path: {self.import_path}>"

class PersonRefimgLink(Base):
    __tablename__ = "person_refimg_link"
    person_id = Column(Integer, ForeignKey('person.id'), unique=True, nullable=False, primary_key=True)
    refimg_id = Column(Integer, ForeignKey('refimg.id'), unique=True, nullable=False, primary_key=True)

    def __repr__(self):
        return f"<person_id: {self.person_id}, refimg_id: {self.refimg_id}>"

class Person(Base):
    __tablename__ = "person"
    id = Column(Integer, Sequence('person_id_seq'), primary_key=True )
    tag = Column(String(48), unique=False, nullable=False)
    surname = Column(String(48), unique=False, nullable=False)
    firstname = Column(String(48), unique=False, nullable=False)
    refimg = relationship('Refimg', secondary=PersonRefimgLink.__table__)

    def __repr__(self):
        return f"<tag: {self.tag}, firstname: {self.firstname}, surname: {self.surname}, refimg: {self.refimg}>"

class Refimg(Base):
    __tablename__ = "refimg"
    id = Column(Integer, Sequence('refimg_id_seq'), primary_key=True )
    fname = Column(String(256), unique=True, nullable=False)
    encodings = Column(LargeBinary)
    created_on = Column(Float)

    def __repr__(self):
        return f"<id: {self.id}, fname: {self.fname}, created_on: {self.created_on}, encodings: {self.encodings}>"

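# Schema overview (for the classes above): Entry is the common record for
# anything found on disk, and its FileType says whether it is a Directory,
# Image, Video or Unknown.  Dir and File carry the directory- and file-specific
# columns and share their primary key (eid) with the owning Entry, while
# entry_dir_link records which Entry lives in which Dir (Dir.files /
# Entry.in_dir).  Person, Refimg and the *_link tables support the face
# matching done further below.
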
################################################################################
# Classes for the job manager:
# Job (and Joblog, JobExtra) for each Job, and
# PA_JobManager_FE_Message (to pass messages to the front-end web)
################################################################################
class Joblog(Base):
    __tablename__ = "joblog"
    id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True )
    job_id = Column(Integer, ForeignKey('job.id') )
    log_date = Column(DateTime(timezone=True))
    log = Column(String)

    def __repr__(self):
        return "<id: {}, job_id: {}, log_date: {}, log: {}>".format(self.id, self.job_id, self.log_date, self.log )

class JobExtra(Base):
    __tablename__ = "jobextra"
    id = Column(Integer, Sequence('jobextra_id_seq'), primary_key=True )
    job_id = Column(Integer, ForeignKey('job.id') )
    name = Column(String)
    value = Column(String)

    def __repr__(self):
        return "<id: {}, job_id: {}, name: {}, value: {}>".format(self.id, self.job_id, self.name, self.value )

class Job(Base):
    __tablename__ = "job"
    id = Column(Integer, Sequence('job_id_seq'), primary_key=True )
    start_time = Column(DateTime(timezone=True))
    last_update = Column(DateTime(timezone=True))
    name = Column(String)
    state = Column(String)
    num_files = Column(Integer)
    current_file_num = Column(Integer)
    current_file = Column(String)
    wait_for = Column(Integer)
    pa_job_state = Column(String)

    logs = relationship( "Joblog")
    extra = relationship( "JobExtra")

    def __repr__(self):
        return "<id: {}, start_time: {}, last_update: {}, name: {}, state: {}, num_files: {}, current_file_num: {}, current_file: {}, pa_job_state: {}, wait_for: {}, extra: {}, logs: {}>".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs)

class PA_JobManager_FE_Message(Base):
    __tablename__ = "pa_job_manager_fe_message"
    id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True )
    job_id = Column(Integer, ForeignKey('job.id') )
    alert = Column(String)
    message = Column(String)

    def __repr__(self):
        return "<id: {}, job_id: {}, alert: {}, message: {}>".format(self.id, self.job_id, self.alert, self.message)

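# Note on the two state columns used throughout the manager: pa_job_state
# tracks whether this manager still has work to do on a job ("New" ->
# "In Progress" -> "Completed"), while state carries the outcome that the
# front-end displays ("New", "In Progress", "Completed", "Failed",
# "Withdrawn").  wait_for chains jobs together: HandleJobs() only starts a
# job once the job it waits for has reached pa_job_state "Completed".
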
##############################################################################
# Util (non Class) functions
##############################################################################
def MessageToFE( job_id, alert, message ):
    msg = PA_JobManager_FE_Message( job_id=job_id, alert=alert, message=message)
    session.add(msg)
    session.commit()
    return msg.id

def ProcessStorageDirs(parent_job):
    settings = session.query(Settings).first()
    if settings is None:
        raise Exception("Cannot create file data with no settings / storage path is missing")
    paths = settings.storage_path.split("#")
    JobsForPaths( parent_job, paths )
    return

def ProcessImportDirs(parent_job):
    settings = session.query(Settings).first()
    if settings is None:
        raise Exception("Cannot create file data with no settings / import path is missing")
    paths = settings.import_path.split("#")
    JobsForPaths( parent_job, paths )
    return

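# JobsForPaths below creates, for each path, a chain of jobs linked via
# wait_for: "importdir" (walk the filesystem and register entries), then
# "getfiledetails" (hashes and thumbnails), then "checkdups" (duplicate
# detection).  A fourth "processai" job exists in the code but is currently
# commented out.
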
def JobsForPaths( parent_job, paths ):

    now = datetime.now(pytz.utc)
    # make a new set of Jobs per path... HandleJobs will make them run later
    for path in paths:
        d = session.query(Dir).filter(Dir.path_prefix==SymlinkName(path,path+'/')).first()
        cfn = 0
        if d:
            cfn = d.num_files

        jex = JobExtra( name="path", value=path )
        job = Job(start_time=now, last_update=now, name="importdir", state="New", wait_for=None, pa_job_state="New", current_file_num=0, num_files=cfn )
        job.extra.append(jex)
        session.add(job)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a>".format( job.id, job.id, job.name ) )
        # force a commit so that job.id is valid for use as wait_for below
        session.commit()

        jex2 = JobExtra( name="path", value=path )
        job2 = Job(start_time=now, last_update=now, name="getfiledetails", state="New", wait_for=job.id, pa_job_state="New", current_file_num=0 )
        job2.extra.append(jex2)
        session.add(job2)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job2.id, job2.id, job2.name, job2.wait_for ) )

        jex3 = JobExtra( name="path", value=path )
        job3 = Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 )
        job3.extra.append(jex3)
        session.add(job3)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job3.id, job3.id, job3.name, job3.wait_for ) )
        """
        jex4 = JobExtra( name="path", value=path )
        job4 = Job(start_time=now, last_update=now, name="processai", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 )
        job4.extra.append(jex4)
        session.add(job4)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job4.id, job4.id, job4.name, job4.wait_for ) )
        """
    HandleJobs()
    return

def AddLogForJob(job, message, current_file=''):
    now = datetime.now(pytz.utc)
    log = Joblog( job_id=job.id, log=message, log_date=now )
    job.last_update = now
    if current_file != '':
        job.current_file = os.path.basename(current_file)
        # this may not be set on an import job
        if job.num_files:
            job.current_file_num = job.current_file_num + 1
    session.add(log)
    return

def RunJob(job):
    # session = Session()
    job.start_time = datetime.now(pytz.utc)
    if job.name == "scannow":
        JobScanNow(job)
    elif job.name == "forcescan":
        JobForceScan(job)
    elif job.name == "scan_sp":
        JobScanStorageDir(job)
    elif job.name == "importdir":
        JobImportDir(job)
    elif job.name == "getfiledetails":
        JobGetFileDetails(job)
    elif job.name == "checkdups":
        CheckForDups(job)
    elif job.name == "rmdups":
        RemoveDups(job)
    elif job.name == "processai":
        JobProcessAI(job)
    else:
        print("ERROR: Requested to process unknown job type: {}".format(job.name))

    # okay, we finished a job, so check for any jobs that are dependent on this one and run them...
    # session.close()
    if job.pa_job_state != "Completed":
        FinishJob(job, "PA Job Manager - catchall to close off a Job; this should never be seen and implies the job did not actually complete", "Failed" )
    HandleJobs()
    return

def CancelJob(job, id):
    for j in session.query(Job).filter(Job.wait_for==id).all():
        if DEBUG==1:
            print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(j.id, job.id) )
        FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" )
        CancelJob(j, j.id)
    return

def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
    job.state = state
    job.pa_job_state = pa_job_state
    job.last_update = datetime.now(pytz.utc)
    AddLogForJob(job, last_log)
    if job.state == "Failed":
        CancelJob(job, job.id)
    session.commit()
    return

def HandleJobs():

    print("INFO: PA job manager is scanning for new jobs to process")
    for job in session.query(Job).all():
        if job.pa_job_state == 'New':
            if job.wait_for != None:
                j2 = session.query(Job).get(job.wait_for)
                if not j2:
                    print("WARNING: job.wait_for ({}) does not exist? Known jobs are listed below:".format( job.wait_for ))
                    for j in session.query(Job).all():
                        print("j={}".format(j.id))
                    continue
                if j2.pa_job_state != 'Completed':
                    continue

            # DEBUG==1 runs jobs inline (no threads) for easier debugging; errors will stacktrace to the console
            if DEBUG==1:
                print("*************************************")
                print("RUNNING job: id={} name={} wait_for={}".format(job.id, job.name, job.wait_for ))
                RunJob(job)
            else:
                try:
                    RunJob(job)
                    # threading.Thread(target=RunJob, args=(job,)).start()
                except Exception as e:
                    try:
                        MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
                    except Exception as e2:
                        print("Failed to let the front-end know, but the back-end failed to run job (id: {}, name: {} -- orig exception was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
    print("INFO: PA job manager is waiting for jobs")
    return

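# Note: the threading.Thread call in HandleJobs above is currently commented
# out, so even with DEBUG disabled jobs run sequentially in this process; the
# only difference is that the non-debug path wraps RunJob in a try/except and
# reports failures to the front-end.
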
def JobProgressState( job, state ):
    job.pa_job_state = "In Progress"
    job.state = state
    session.commit()
    return

def JobScanNow(job):
    JobProgressState( job, "In Progress" )
    ProcessImportDirs(job)
    FinishJob( job, "Completed (scan for new files)" )
    MessageToFE( job.id, "success", "Completed (scan for new files)" )
    session.commit()
    return

def JobScanStorageDir(job):
    JobProgressState( job, "In Progress" )
    ProcessStorageDirs(job)
    FinishJob( job, "Completed (scan for new files)" )
    MessageToFE( job.id, "success", "Completed (scan for new files)" )
    session.commit()
    return

def JobForceScan(job):
    JobProgressState( job, "In Progress" )
    session.query(FileRefimgLink).delete()
    session.query(EntryDirLink).delete()
    session.query(Dir).delete()
    session.query(File).delete()
    session.query(Entry).delete()
    session.commit()
    ProcessImportDirs(job)
    FinishJob(job, "Completed (forced remove and recreation of all file data)")
    MessageToFE( job.id, "success", "Completed (forced remove and recreation of all file data)" )
    session.commit()
    return

def SymlinkName(path, file):
    sig_bit = file.replace(path, "")
    last_dir = os.path.basename(path[0:-1])

    if sig_bit[-1] == '/':
        last_bit = os.path.dirname(sig_bit)[0:-1]
    else:
        last_bit = os.path.dirname(sig_bit)
    symlink = 'static/'+last_dir+'/'+last_bit
    if symlink[-1] == '/':
        symlink = symlink[0:-1]
    return symlink

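# Illustrative example for SymlinkName() above (the paths are hypothetical):
# with an import path of '/data/photos/',
#   SymlinkName('/data/photos/', '/data/photos/2020/img.jpg') -> 'static/photos/2020'
# i.e. the file's directory expressed under the per-import-path symlink that
# CreateSymlink() below creates ('static/photos' -> '/data/photos/').
# Note that this local definition shadows the SymlinkName imported from shared.
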
# to serve static content for the images, we create a symlink inside the static subdir for each import_path that exists
def CreateSymlink(job, path):
    symlink = 'static/{}'.format(os.path.basename(path[0:-1]))
    if not os.path.exists(symlink):
        os.symlink(path, symlink)
    return symlink

def AddDir(job, dirname, path_prefix, in_dir):
    dir = session.query(Dir).filter(Dir.path_prefix==path_prefix).first()
    if dir:
        e = session.query(Entry).get(dir.eid)
        e.exists_on_fs = True
        return dir
    dir = Dir( path_prefix=path_prefix, num_files=0, last_import_date=0 )
    dtype = session.query(FileType).filter(FileType.name=='Directory').first()
    e = Entry( name=dirname, type=dtype, exists_on_fs=True )
    e.dir_details.append(dir)
    # no in_dir occurs when we add the Dir for the import_path itself (top of the tree)
    if in_dir:
        e.in_dir.append(in_dir)
    if DEBUG==1:
        print(f"AddDir: created d={dirname}, pp={path_prefix}")
    AddLogForJob(job, f"DEBUG: Process new dir: {dirname}")
    session.add(e)
    return dir

def AddFile(job, fname, type_str, fsize, in_dir, year, month, day, woy ):
    e = session.query(Entry).join(EntryDirLink).join(Dir).filter(Entry.name==fname, Dir.eid==in_dir.eid).first()
    if e:
        print( f"################################################ FILE EXISTS ALREADY: {fname} -- {in_dir.path_prefix} {e}" )
        e.exists_on_fs = True
        return e
    ftype = session.query(FileType).filter(FileType.name==type_str).first()
    e = Entry( name=fname, type=ftype, exists_on_fs=True )
    f = File( size_mb=fsize, last_hash_date=0, faces_created_on=0, year=year, month=month, day=day, woy=woy )
    e.file_details.append(f)
    e.in_dir.append(in_dir)
    AddLogForJob(job, "Found new file: {}".format(fname) )
    session.add(e)
    return e

# Reset exists_on_fs to False for everything in this import path; if we find it on the FS in the walk below
# it goes back to True, and anything that is still False afterwards has been deleted.
def ResetExistsOnFS(job, path):
    reset_dirs = session.query(Entry).join(EntryDirLink).join(Dir).filter(Dir.path_prefix.ilike(path+'%')).all()
    for reset_dir in reset_dirs:
        reset_dir.exists_on_fs = False
        session.add(reset_dir)
        reset_files = session.query(Entry).join(EntryDirLink).filter(EntryDirLink.dir_eid==reset_dir.id).all()
        for reset_file in reset_files:
            reset_file.exists_on_fs = False
            session.add(reset_file)
    return

def HandleAnyFSDeletions(job):
    dtype = session.query(FileType).filter(FileType.name=='Directory').first()
    rms = session.query(Entry).filter(Entry.exists_on_fs==False, Entry.type_id!=dtype.id).all()
    rm_cnt = 0
    for rm in rms:
        session.query(EntryDirLink).filter(EntryDirLink.entry_id==rm.id).delete()
        session.query(File).filter(File.eid==rm.id).delete()
        session.query(Entry).filter(Entry.id==rm.id).delete()
        AddLogForJob( job, f"INFO: Removing {rm.name} from system as it is no longer on the file system")
        rm_cnt += 1

    rmdirs = session.query(Entry).filter(Entry.exists_on_fs==False, Entry.type_id==dtype.id).order_by(Entry.id.desc()).all()
    for rmdir in rmdirs:
        print(f"We have a directory ({rmdir.name}) to delete from the DB as it no longer exists on the fs")
        session.query(EntryDirLink).filter(EntryDirLink.entry_id==rmdir.id).delete()
        session.query(Dir).filter(Dir.eid==rmdir.id).delete()
        session.query(Entry).filter(Entry.id==rmdir.id).delete()
        AddLogForJob( job, f"INFO: Removing {rmdir.name} from system as it is no longer on the file system")
        rm_cnt += 1
    return rm_cnt

def GetDateFromFile(file, stat):
    # try exif
    try:
        print(f"trying exif read of {file}")
        with open(file, 'rb') as f:
            tags = exifread.process_file(f)
        date_str, time_str = str(tags["EXIF DateTimeOriginal"]).split(" ")
        print(date_str)
        year, month, day = date_str.split(":")
        year = int(year)
        month = int(month)
        day = int(day)
        print(year)
        check = datetime( year, month, day )
        print( f"check={check}" )
    except:
        # try parsing the filename for a YYYYMMDD date
        try:
            m = re.search( r'(\d{4})(\d{2})(\d{2})', file)
            year = int(m[1])
            month = int(m[2])
            day = int(m[3])
            check2 = datetime( year, month, day )
            print( f"check2={check2}" )
        # give up and use the file system date
        except:
            year, month, day, _, _, _, _, _, _ = datetime.fromtimestamp(stat.st_ctime).timetuple()
    c = date(year, month, day).isocalendar()
    woy = c[1]
    return year, month, day, woy

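# JobImportDir below walks an import path and registers what it finds: it first
# creates the per-path symlink and marks existing DB entries as not on the
# filesystem, then iterates the os.walk() output twice (once just to count
# entries so the progress bar is right from the first import, once to actually
# import), and only re-reads a file's details when its ctime is newer than the
# owning Dir's last_import_date.  Anything still flagged as not on the
# filesystem at the end is removed from the DB by HandleAnyFSDeletions().
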
def JobImportDir(job):
    JobProgressState( job, "In Progress" )
    settings = session.query(Settings).first()
    if settings is None:
        raise Exception("Cannot create file data with no settings / paths missing")
    path = [jex.value for jex in job.extra if jex.name == "path"][0]
    AddLogForJob(job, "Checking Directory: {}".format( path ) )
    if DEBUG==1:
        print("DEBUG: Checking Directory: {}".format( path ) )
    if not os.path.exists( path ):
        FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" )
        return
    symlink = CreateSymlink(job, path)
    ResetExistsOnFS(job, symlink)

    walk = os.walk(path, topdown=True)
    ftree = list(walk)

    # go through the data once to work out file_cnt so the progress bar works from the first import
    overall_file_cnt = 0
    for root, subdirs, files in ftree:
        overall_file_cnt += len(subdirs) + len(files)

    parent_dir = None
    dir = AddDir(job, os.path.basename(symlink), symlink, parent_dir)
    dir.num_files = overall_file_cnt
    # session.add in case we have already imported this dir (AddDir won't), and we might now have a different number of files than last time
    session.add(dir)
    job.num_files = overall_file_cnt
    AddLogForJob(job, f"Found {overall_file_cnt} file(s) to process")
    session.commit()

    # root == path of dir, files are in dir... subdirs are in dir
    for root, subdirs, files in ftree:
        # the root dir was already created above (to work out num_files for the whole os.walk)
        if root != path:
            pp = SymlinkName( path, root )+'/'+os.path.basename(root)
            dir = AddDir(job, os.path.basename(root), pp, parent_dir)
        for basename in files:
            # commit every 100 files to show progress being made without hammering the database
            if job.current_file_num % 100 == 0:
                session.commit()
            fname = dir.path_prefix+'/'+basename

            stat = os.stat(fname)
            if stat.st_ctime > dir.last_import_date:
                AddLogForJob(job, f"Processing new/updated file: {basename}", basename )
                if DEBUG==1:
                    print("DEBUG: {} - {} is newer than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
                if isImage(fname):
                    type_str = 'Image'
                elif isVideo(fname):
                    type_str = 'Video'
                else:
                    type_str = 'Unknown'
                fsize = round(stat.st_size/(1024*1024))

                year, month, day, woy = GetDateFromFile(fname, stat)
                e = AddFile( job, basename, type_str, fsize, dir, year, month, day, woy )
            else:
                e = session.query(Entry).join(EntryDirLink).join(Dir).filter(Entry.name==basename, Dir.eid==dir.eid).first()
                e.exists_on_fs = True
                if DEBUG==1:
                    print("DEBUG: {} - {} is OLDER than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
            job.current_file = basename
            job.current_file_num += 1
        dir.num_files = len(files)+len(subdirs)
        dir.last_import_date = time.time()
        parent_dir = dir
    job.num_files = overall_file_cnt
    job.current_file_num = overall_file_cnt

    rm_cnt = HandleAnyFSDeletions(job)

    FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Removed {rm_cnt} file(s)")
    return

def RunFuncOnFilesInPath( job, path, file_func ):
    d = session.query(Dir).filter(Dir.path_prefix==path).first()
    for e in d.files:
        ProcessFilesInDir(job, e, file_func)

def JobProcessAI(job):
    path = [jex.value for jex in job.extra if jex.name == "path"][0]
    path = SymlinkName(path, '/')
    d = session.query(Dir).filter(Dir.path_prefix==path).first()
    job.num_files = d.num_files

    people = session.query(Person).all()
    for person in people:
        generateKnownEncodings(person)

    RunFuncOnFilesInPath( job, path, ProcessAI )

    FinishJob(job, "Finished Processing AI")
    return

def GenHashAndThumb(job, e):
    # commit every 100 files to show progress being made without hammering the database
    if job.current_file_num % 100 == 0:
        session.commit()
    stat = os.stat( e.in_dir[0].path_prefix + '/' + e.name )
    if stat.st_ctime < e.file_details[0].last_hash_date:
        print(f"OPTIM: GenHashAndThumb {e.name} file is older than last hash, skip this")
        job.current_file_num += 1
        return

    e.file_details[0].hash = md5( job, e.in_dir[0].path_prefix+'/'+ e.name )
    if DEBUG==1:
        print( f"{e.name} - hash={e.file_details[0].hash}" )
    if e.type.name == 'Image':
        e.file_details[0].thumbnail = GenImageThumbnail( job, e.in_dir[0].path_prefix+'/'+ e.name )
    elif e.type.name == 'Video':
        e.file_details[0].thumbnail = GenVideoThumbnail( job, e.in_dir[0].path_prefix+'/'+ e.name )
    elif e.type.name == 'Unknown':
        job.current_file_num += 1
    e.file_details[0].last_hash_date = time.time()
    return

def ProcessAI(job, e):
    if e.type.name != 'Image':
        job.current_file_num += 1
        return

    file = e.in_dir[0].path_prefix + '/' + e.name
    stat = os.stat(file)
    # find out if the file is newer than when we last looked for faces (fyi: first time around faces_created_on == 0)
    if stat.st_ctime > e.file_details[0].faces_created_on:
        session.add(e)
        im_orig = Image.open(file)
        im = ImageOps.exif_transpose(im_orig)

        faces = generateUnknownEncodings(im)
        e.file_details[0].faces_created_on = time.time()
        if faces:
            flat_faces = numpy.array(faces)
            e.file_details[0].faces = flat_faces.tobytes()
        else:
            e.file_details[0].faces = None
            job.current_file_num += 1
            return
    else:
        if not e.file_details[0].faces:
            print("OPTIM: This image has no faces, skip it")
            job.current_file_num += 1
            return
        recover = numpy.frombuffer(e.file_details[0].faces, dtype=numpy.float64)
        real_recover = numpy.reshape(recover, (-1, 128))
        l = []
        for el in real_recover:
            l.append(numpy.array(el))
        faces = l
    people = session.query(Person).all()
    for unknown_encoding in faces:
        for person in people:
            lookForPersonInImage(job, person, unknown_encoding, e)
    AddLogForJob(job, f"Finished processing {e.name}", e.name )
    return

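# Serialization note for ProcessAI above: face_recognition encodings are
# 128-element float64 vectors, so the list of per-face encodings is stored in
# File.faces as raw bytes (numpy tobytes()) and recovered later with
# numpy.frombuffer(...) followed by reshape(-1, 128), one row per face.
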
def lookForPersonInImage(job, person, unknown_encoding, e):
    for refimg in person.refimg:
        # let's see if we have tried this check before
        frl = session.query(FileRefimgLink).filter(FileRefimgLink.file_id==e.id, FileRefimgLink.refimg_id==refimg.id).first()
        if not frl:
            frl = FileRefimgLink(refimg_id=refimg.id, file_id=e.file_details[0].eid)
        else:
            stat = os.stat(e.in_dir[0].path_prefix+'/'+ e.name)
            # if neither the file nor the refimg is newer than the last check, we don't need to check again
            if frl.matched and stat.st_ctime < frl.when_processed and refimg.created_on < frl.when_processed:
                print(f"OPTIM: lookForPersonInImage: file {e.name} has a previous match for: {refimg.fname}, and the file & refimg haven't changed")
                return

        session.add(frl)
        frl.matched = False
        frl.when_processed = time.time()
        deserialized_bytes = numpy.frombuffer(refimg.encodings, dtype=numpy.float64)
        results = compareAI(deserialized_bytes, unknown_encoding)
        if results[0]:
            print(f'Found a match between: {person.tag} and {e.name}')
            AddLogForJob(job, f'Found a match between: {person.tag} and {e.name}')
            frl.matched = True
    return

def generateUnknownEncodings(im):
    unknown_image = numpy.array(im)
    face_locations = face_recognition.face_locations(unknown_image)
    if not face_locations:
        return None
    unknown_encodings = face_recognition.face_encodings(unknown_image, known_face_locations=face_locations)
    return unknown_encodings

def generateKnownEncodings(person):
    for refimg in person.refimg:
        file = 'reference_images/'+refimg.fname
        stat = os.stat(file)
        if refimg.created_on and stat.st_ctime < refimg.created_on:
            print("OPTIM: skipping re-creating the encoding for this refimg because the file has not changed")
            continue
        img = face_recognition.load_image_file(file)
        location = face_recognition.face_locations(img)
        encodings = face_recognition.face_encodings(img, known_face_locations=location)
        refimg.encodings = encodings[0].tobytes()
        refimg.created_on = time.time()
        session.add(refimg)
        session.commit()

def compareAI(known_encoding, unknown_encoding):
    results = face_recognition.compare_faces([known_encoding], unknown_encoding, tolerance=0.55)
    return results

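# Note: compareAI above uses tolerance=0.55, slightly stricter than
# face_recognition's default of 0.6, which trades a few missed matches for
# fewer false positives.
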
def ProcessFilesInDir(job, e, file_func):
    if DEBUG==1:
        print("DEBUG: files in dir - process: {} {}".format(e.name, e.in_dir[0].path_prefix))
    if e.type.name != 'Directory':
        file_func(job, e)
    else:
        dir = session.query(Dir).filter(Dir.eid==e.id).first()
        job.current_file_num += 1
        for sub in dir.files:
            ProcessFilesInDir(job, sub, file_func)

def JobGetFileDetails(job):
    JobProgressState( job, "In Progress" )
    path = [jex.value for jex in job.extra if jex.name == "path"][0]
    path = 'static'+'/'+os.path.basename(path[0:-1])
    if DEBUG==1:
        print("DEBUG: JobGetFileDetails for path={}".format( path ) )
    dir = session.query(Dir).filter(Dir.path_prefix==path).first()
    job.current_file_num = 0
    job.num_files = dir.num_files
    session.commit()
    RunFuncOnFilesInPath( job, path, GenHashAndThumb )
    FinishJob(job, "File Details job finished")
    session.commit()
    return

def isVideo(file):
    try:
        fileInfo = MediaInfo.parse(file)
        for track in fileInfo.tracks:
            if track.track_type == "Video":
                return True
        return False
    except Exception as e:
        return False

# Returns an md5 hash of the file's contents
def md5(job, fname):
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    hash = hash_md5.hexdigest()
    AddLogForJob( job, "Generated md5 hash: {} for file: {}".format( hash, fname ) )
    return hash

def isImage(file):
    try:
        # use a context manager so the file handle is closed again
        with Image.open(file):
            return True
    except:
        return False

def GenImageThumbnail(job, file):
    AddLogForJob( job, "Generate Thumbnail from Image file: {}".format( file ), file )
    try:
        im_orig = Image.open(file)
        im = ImageOps.exif_transpose(im_orig)
        im.thumbnail((THUMBSIZE, THUMBSIZE))
        img_bytearray = io.BytesIO()
        im.save(img_bytearray, format='JPEG')
        img_bytearray = img_bytearray.getvalue()
        thumbnail = base64.b64encode(img_bytearray)
        thumbnail = str(thumbnail)[2:-1]
    except Exception as e:
        print('WARNING: failed to generate image thumbnail (no EXIF tags?)')
        AddLogForJob(job, f"WARNING: No EXIF tag found for: {file} - error={e}")
        return None
    return thumbnail

def GenVideoThumbnail(job, file):
    AddLogForJob( job, "Generate Thumbnail from Video file: {}".format( file ), file )
    try:
        vcap = cv2.VideoCapture(file)
        res, frame = vcap.read()
        # if the mean pixel value is > 15 we have something worth making a screenshot of (i.e. don't use a black frame at the start)
        while frame.mean() < 15 and res:
            res, frame = vcap.read()
        w = vcap.get(cv2.CAP_PROP_FRAME_WIDTH)
        h = vcap.get(cv2.CAP_PROP_FRAME_HEIGHT)
        if w > h:
            factor = w / THUMBSIZE
        else:
            factor = h / THUMBSIZE
        new_h = int(h / factor)
        new_w = int(w / factor)
        frame = cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_LINEAR)

        res, thumb_buf = cv2.imencode('.jpeg', frame)
        bt = thumb_buf.tobytes()
        thumbnail = base64.b64encode(bt)
        thumbnail = str(thumbnail)[2:-1]
    except Exception as e:
        AddLogForJob( job, f"ERROR: Failed to Generate thumbnail for video file: {file} - error={e}" )
        return None
    return thumbnail

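# CheckForDups below uses a raw SQL self-join over entry/file/dir/entry_dir_link
# to count pairs of distinct entries that share the same md5 hash where at
# least one copy lives under the import path just processed; if any are found
# it posts a front-end message whose link lets the user finalise the import by
# removing the duplicates (handled by the "rmdups" job / RemoveDups below).
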
def CheckForDups(job):
    path = [jex.value for jex in job.extra if jex.name == "path"][0]
    path = 'static'+'/'+os.path.basename(path[0:-1])

    AddLogForJob( job, f"Check for duplicates in import path: {path}" )
    res = session.execute( f"select count(e1.name) as count from entry e1, file f1, dir d1, entry_dir_link edl1, entry e2, file f2, dir d2, entry_dir_link edl2 where e1.id = f1.eid and e2.id = f2.eid and d1.eid = edl1.dir_eid and edl1.entry_id = e1.id and edl2.dir_eid = d2.eid and edl2.entry_id = e2.id and d1.path_prefix like '%{path}%' and f1.hash = f2.hash and e1.id != e2.id" )
    for row in res:
        if row.count > 0:
            AddLogForJob(job, f"Found duplicates, Creating Status message in front-end for attention")
            msg_id = MessageToFE( job.id, "danger", 'replaceme' )
            session.query(PA_JobManager_FE_Message).filter(PA_JobManager_FE_Message.id==msg_id).update( { 'message' : f'Found duplicate(s), click <a href="javascript:document.body.innerHTML+=\'<form id=_fm method=POST action=/fix_dups><input type=hidden name=fe_msg_id value={msg_id}></form>\'; document.getElementById(\'_fm\').submit();">here</a> to finalise import by removing duplicates' } )
    FinishJob(job, f"Finished Looking for Duplicates")
    return

def RemoveDups(job):
    # clear the FE message -- we are removing the duplicates for it now...
    fe_msg_id = [jex.value for jex in job.extra if jex.name == "fe_msg_id"][0]
    msg = session.query(PA_JobManager_FE_Message).get(fe_msg_id)
    session.query(PA_JobManager_FE_Message).filter(PA_JobManager_FE_Message.id==fe_msg_id).delete()
    session.commit()

    AddLogForJob(job, f"INFO: Starting Remove Duplicates job...")
    dup_cnt = 0
    for jex in job.extra:
        if 'kfid-' in jex.name:
            pfx, which = jex.name.split('-')
            hash = [jex.value for jex in job.extra if jex.name == f"kfhash-{which}"][0]
            AddLogForJob(job, f"deleting duplicate files with hash: {hash} but keeping file with DB id={jex.value}" )
            files = session.query(Entry).join(File).filter(File.hash==hash).all()
            keeping = jex.value
            found = None
            del_me_lst = []
            for f in files:
                if os.path.isfile(f.in_dir[0].path_prefix+'/'+f.name) == False:
                    AddLogForJob( job, f"ERROR: file (DB id: {f.id} - {f.in_dir[0].path_prefix}/{f.name}) does not exist? ignoring file")
                elif f.file_details[0].eid == int(keeping):
                    found = f
                else:
                    del_me_lst.append(f)
            if found == None:
                AddLogForJob( job, f"ERROR: Cannot find file with hash={hash} to process - skipping it" )
            else:
                AddLogForJob(job, f"Keep duplicate file: {found.in_dir[0].path_prefix}/{found.name}" )
                for del_me in del_me_lst:
                    AddLogForJob(job, f"Remove duplicate file: {del_me.in_dir[0].path_prefix}/{del_me.name}" )
                    os.remove( del_me.in_dir[0].path_prefix+'/'+del_me.name )
                    dup_cnt += 1

        if 'kdid-' in jex.name:
            pfx, which = jex.name.split('-')
            hashes = [jex.value for jex in job.extra if jex.name == f"kdhash-{which}"][0]
            keeping = jex.value
            tmp = session.query(Dir).filter(Dir.eid==keeping).first()
            AddLogForJob(job, f"Keeping files in {tmp.path_prefix}" )
            for hash in hashes[0:-1].split(","):
                files = session.query(Entry).join(File).filter(File.hash==hash).all()
                found = None
                for f in files:
                    if os.path.isfile(f.in_dir[0].path_prefix+'/'+f.name) == False:
                        AddLogForJob( job, f"ERROR: file (DB id: {f.id} - {f.in_dir[0].path_prefix}/{f.name}) does not exist? ignoring file")
                    if f.in_dir[0].eid == int(keeping):
                        found = f
                    else:
                        del_me = f

                if found == None:
                    AddLogForJob( job, f"ERROR: Cannot find file with hash={hash} to process - skipping it" )
                else:
                    AddLogForJob(job, f"Keep duplicate file: {found.in_dir[0].path_prefix}/{found.name}" )
                    AddLogForJob(job, f"Remove duplicate file: {del_me.in_dir[0].path_prefix}/{del_me.name}" )
                    os.remove( del_me.in_dir[0].path_prefix+'/'+del_me.name )
                    dup_cnt += 1

    FinishJob(job, f"Finished removing {dup_cnt} duplicate files" )
    return

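# The __main__ block below: run any jobs that are already pending, then listen
# on a plain TCP socket.  Each accepted connection is treated purely as a
# wake-up signal (nothing is read from it) and simply triggers another
# HandleJobs() pass.
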
if __name__ == "__main__":
    print("INFO: PA job manager starting - listening on {}:{}".format( PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT) )

    # now = datetime.now(pytz.utc)
    # job = Job(start_time=now, last_update=now, name="scannow", state="New", wait_for=None, pa_job_state="New", current_file_num=0, num_files=0 )
    # session.add(job)
    # session.commit()
    HandleJobs()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
        s.listen()
        while True:
            conn, addr = s.accept()
            HandleJobs()