###
# This file controls the 'external' job control manager, that (periodically
# looks / somehow is pushed an event?) picks up new jobs, and processes them.
# It then stores the progress/status, etc. in job and joblog tables as needed
# via wrapper functions.
#
# The whole pa_job_manager is multi-threaded, and uses the database tables for
# state management and communication back to the pa web site
###

### SQLALCHEMY IMPORTS ###
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime, LargeBinary
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
### SQLALCHEMY IMPORTS ###

### LOCAL FILE IMPORTS ###
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT

from datetime import datetime, timedelta
import pytz
import time
import os
import glob
from PIL import Image, ImageOps
from pymediainfo import MediaInfo
import hashlib
#import exifread
import base64
import numpy
import cv2
import socket
import threading
import io
import face_recognition

# Debug flag: 1 enables verbose console output and runs jobs inline
# (un-threaded) in HandleJobs() so errors stacktrace to the console.
DEBUG=1

# an Manager, which the Session will use for connection resources
some_engine = create_engine(DB_URL)

# create a configured "Session" class
#Session = sessionmaker(bind=some_engine)

# create a Session
# NOTE(review): a single module-level session is shared by every function in
# this file; combined with the threading mentioned in the header this is not
# thread-safe per SQLAlchemy's session rules -- TODO confirm intent.
session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)
session = Session()

Base = declarative_base()

################################################################################
# Class describing File in the database, and via sqlalchemy, connected to the DB as well
# This has to match one-for-one the DB table
################################################################################

class EntryDirLink(Base):
    # Association table linking an Entry row to the Dir row that contains it.
    __tablename__ = "entry_dir_link"
    entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True )

    # NOTE(review): this (and every other __repr__ format string in this
    # file) is an empty literal with .format() args -- the original string
    # was most likely "<EntryDirLink ...>" and the angle-bracketed text was
    # stripped by whatever extracted this source.  Verify against the repo.
    def __repr__(self):
        return "".format(self.entry_id, self.dir_eid)

class Dir(Base):
    # A directory under an import path; path_prefix is the 'static/...'
    # symlinked path used to serve/locate its files.
    __tablename__ = "dir"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    path_prefix = Column(String, unique=False, nullable=False )
    num_files = Column(Integer)
    last_import_date = Column(Float)   # epoch seconds of last import scan
    last_hash_date = Column(Float)     # epoch seconds of last hash/thumbnail pass
    files = relationship("Entry", secondary="entry_dir_link")

    def __repr__(self):
        return "".format(self.eid, self.path_prefix, self.num_files, self.last_import_date, self.last_hash_date)

class Entry(Base):
    # A filesystem entry (file or directory); type distinguishes which, and
    # dir_details/file_details carry the type-specific row.
    __tablename__ = "entry"
    id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
    name = Column(String, unique=True, nullable=False )
    type_id = Column(Integer, ForeignKey("file_type.id"))
    type=relationship("FileType")
    dir_details = relationship( "Dir")
    file_details = relationship( "File" )
    in_dir = relationship ("Dir", secondary="entry_dir_link" )

    def __repr__(self):
        return "".format(self.id, self.name, self.type, self.dir_details, self.file_details, self.in_dir)

class File(Base):
    # File-specific details for an Entry: size, content hash and an inline
    # base64 JPEG thumbnail.
    __tablename__ = "file"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    size_mb = Column(Integer, unique=False, nullable=False)
    # NOTE(review): declared Integer but md5() stores a hex-digest string --
    # works on loosely-typed backends (SQLite), but the type is wrong.
    hash = Column(Integer, unique=True, nullable=True)
    thumbnail = Column(String, unique=False, nullable=True)

    def __repr__(self):
        return "".format(self.eid, self.size_mb, self.hash )

class FileType(Base):
    # Lookup table of entry types ('Directory', 'Image', 'Video', 'Unknown').
    __tablename__ = "file_type"
    id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True )
    name = Column(String, unique=True, nullable=False )

    def __repr__(self):
        return "".format(self.id, self.name )

class Settings(Base):
    # Site settings; only import_path is used here (presence check in
    # JobImportDir).
    __tablename__ = "settings"
    id = Column(Integer, Sequence('settings_id_seq'), primary_key=True )
    import_path = Column(String)

    def __repr__(self):
        return "".format(self.id, self.import_path )

class Person_Refimg_Link(Base):
    # Association table linking a Person to their reference images.
    __tablename__ = "person_refimg_link"
    person_id = Column(Integer, ForeignKey('person.id'), unique=True, nullable=False, primary_key=True)
    refimg_id = Column(Integer, ForeignKey('refimg.id'), unique=True, nullable=False, primary_key=True)

    def __repr__(self):
        return "".format(self.person_id, self.refimg_id)

class Person(Base):
    # A known person to search for in images, with reference images used to
    # build face-recognition encodings.
    __tablename__ = "person"
    id = Column(Integer, Sequence('person_id_seq'), primary_key=True )
    tag = Column(String(48), unique=False, nullable=False)
    surname = Column(String(48), unique=False, nullable=False)
    firstname = Column(String(48), unique=False, nullable=False)
    refimg = relationship('Refimg', secondary=Person_Refimg_Link.__table__)

    def __repr__(self):
        return "".format(self.tag,self.firstname, self.surname, self.refimg)

class Refimg(Base):
    # A reference image; encodings caches the 128-d face encoding as raw
    # float64 bytes (see generateKnownEncodings / numpy.frombuffer).
    __tablename__ = "refimg"
    id = Column(Integer, Sequence('refimg_id_seq'), primary_key=True )
    fname = Column(String(256), unique=True, nullable=False)
    encodings = Column(LargeBinary)
    created_on = Column(Float)   # epoch seconds when the encoding was built

    def __repr__(self):
        return f""

class File_Person_Link(Base):
    # Association table recording that a person was matched in a file.
    __tablename__ = "file_person_link"
    file_id = Column(Integer, ForeignKey('file.eid'), unique=True, nullable=False, primary_key=True)
    person_id = Column(Integer, ForeignKey('person.id'), unique=True, nullable=False, primary_key=True)

    def __repr__(self):
        return "".format(self.file_id, self.person_id)

################################################################################
# classes for the job manager:
# Job (and Joblog, JobExtra) for each Job, and
# PA_Jobmanager_fe_message (to pass messages to the front-end web)
################################################################################

class Joblog(Base):
    # One timestamped log line attached to a Job.
    __tablename__ = "joblog"
    id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True )
    job_id = Column(Integer, ForeignKey('job.id') )
    log_date = Column(DateTime(timezone=True))
    log = Column(String)

    # BUG(review): self.name and self.value do not exist on Joblog (they look
    # copied from the JobExtra model) -- repr() on a Joblog will raise
    # AttributeError.  Should presumably be self.log_date / self.log.
    def __repr__(self):
        return "".format(self.id, self.job_id, self.name, self.value )

class Job(Base):
    # A unit of work for the manager.  state is the user-visible state;
    # pa_job_state is the manager's lifecycle state ('New' -> 'In Progress'
    # -> 'Completed'/'Failed'/'Withdrawn').  wait_for chains jobs: this job
    # only runs once the job with id == wait_for has pa_job_state 'Completed'.
    __tablename__ = "job"
    id = Column(Integer, Sequence('job_id_seq'), primary_key=True )
    start_time = Column(DateTime(timezone=True))
    last_update = Column(DateTime(timezone=True))
    name = Column(String)              # job type: scannow/forcescan/importdir/getfiledetails/processai
    state = Column(String)
    num_files = Column(Integer)
    current_file_num = Column(Integer)
    current_file = Column(String)
    wait_for = Column(Integer)         # id of the Job this one depends on, or NULL
    pa_job_state = Column(String)
    logs = relationship( "Joblog")
    extra = relationship( "JobExtra")  # NOTE(review): JobExtra is never defined in this file -- see corruption note below

    def __repr__(self):
        return "".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs)

class PA_JobManager_FE_Message(Base):
    # An alert/message row for the front-end web site to display.
    __tablename__ = "pa_job_manager_fe_message"
    id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True )
    job_id = Column(Integer, ForeignKey('job.id'), primary_key=True )
    alert = Column(String)
    message = Column(String)

    # NOTE(review): the source is corrupted/truncated from this __repr__
    # through the orphaned statements below.  The lost chunk most likely
    # contained: the JobExtra model, the MessageToFE() helper and the head of
    # a ProcessImportDirs(parent_job=...) function (all three are referenced
    # elsewhere in this file but never defined here).  Reproduced verbatim,
    # including the stray trailing ')' -- this is a syntax error as-is.
    def __repr__(self):
        return "job id={} {}".format( job.id, job.id, job.name ) )

# ---- orphaned statements: the tail of a lost function (see note above). ----
# They reference `now`, `path`, `job` and `parent_job`, which only exist
# inside the lost enclosing def; they queue a "getfiledetails" job dependent
# on `job`, then a "processai" job dependent on that, then kick HandleJobs().
# force commit to make job.id be valid in use of wait_for later
session.commit()
jex2=JobExtra( name="path", value=path )
job2=Job(start_time=now, last_update=now, name="getfiledetails", state="New", wait_for=job.id, pa_job_state="New", current_file_num=0 )
job2.extra.append(jex2)
session.add(job2)
session.commit()
if parent_job:
    # NOTE(review): four .format() args for three placeholders -- the extra
    # arg is silently ignored by str.format.
    AddLogForJob(parent_job, "adding job id={} {} (wait for: {})".format( job2.id, job2.id, job2.name, job2.wait_for ) )
jex3=JobExtra( name="path", value=path )
job3=Job(start_time=now, last_update=now, name="processai", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 )
job3.extra.append(jex3)
session.add(job3)
session.commit()
if parent_job:
    AddLogForJob(parent_job, "adding job id={} {} (wait for: {})".format( job3.id, job3.id, job3.name, job3.wait_for ) )
HandleJobs()
return

def AddLogForJob(job, message, current_file=''):
    """Append a Joblog row to *job*, bump its last_update, and optionally
    record the file currently being processed (also advancing the
    current_file_num progress counter)."""
    now=datetime.now(pytz.utc)
    log=Joblog( job_id=job.id, log=message, log_date=now )
    job.last_update=now
    if current_file != '':
        job.current_file=os.path.basename(current_file)
    # this may not be set on an import job
    if job.num_files:
        job.current_file_num=job.current_file_num+1
    session.add(log)
    session.commit()
    return

def RunJob(job):
    """Dispatch *job* to its handler by job.name, then ensure it is closed:
    any job that did not formally reach 'Completed' is failed via the
    catch-all FinishJob.  Finally rescans for newly-runnable jobs."""
    # session = Session()
    job.start_time=datetime.now(pytz.utc)
    if job.name =="scannow":
        JobScanNow(job)
    elif job.name =="forcescan":
        JobForceScan(job)
    elif job.name =="importdir":
        JobImportDir(job)
    elif job.name =="getfiledetails":
        JobGetFileDetails(job)
    elif job.name == "processai":
        JobProcessAI(job)
    else:
        print("ERROR: Requested to process unknown job type: {}".format(job.name))
    # okay, we finished a job, so check for any jobs that are dependant on this and run them...
    # session.close()
    if job.pa_job_state != "Completed":
        FinishJob(job, "PA Job Manager - This is a catchall to close of a Job, this sould never be seen and implies a job did not complete formally?", "Failed" )
    HandleJobs()
    return

# NOTE(review): parameter `id` shadows the builtin id() inside this function.
def CancelJob(job,id):
    """Recursively withdraw every job that was waiting (directly or
    transitively) on job id *id*, because that job failed."""
    for j in session.query(Job).filter(Job.wait_for==id).all():
        if DEBUG==1:
            print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(j.id, job.id) )
        FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" )
        CancelJob(j, j.id)
    return

def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
    """Close out *job* with a final state and log line; a 'Failed' state also
    cascades cancellation to dependent jobs."""
    job.state=state
    job.pa_job_state=pa_job_state
    job.last_update=datetime.now(pytz.utc)
    AddLogForJob(job, last_log)
    if job.state=="Failed":
        CancelJob(job,job.id)
    return

def HandleJobs():
    """Scan the job table and run every 'New' job whose wait_for dependency
    (if any) has completed.  With DEBUG==1 dependent jobs run inline so
    errors stacktrace to the console; otherwise failures are reported to the
    front end via MessageToFE."""
    print("INFO: PA job manager is scanning for new jobs to process")
    for job in session.query(Job).all():
        if job.pa_job_state == 'New':
            if job.wait_for != None:
                # NOTE(review): Query.get() is legacy API in SQLAlchemy 1.4+
                # (Session.get() is the replacement) -- confirm the pinned
                # SQLAlchemy version.
                j2 = session.query(Job).get(job.wait_for)
                if not j2:
                    print ("WTF? job.wait_for ({}) does not exist in below? ".format( job.wait_for ))
                    for j in session.query(Job).all():
                        print ("j={}".format(j.id))
                    continue
                if j2.pa_job_state != 'Completed':
                    continue
                # use this to remove threads for easier debugging, and errors will stacktrace to the console
                if DEBUG==1:
                    print("*************************************")
                    print("RUNNING job: id={} name={} wait_for={}".format(job.id, job.name, job.wait_for ))
                    RunJob(job)
                # NOTE(review): reconstructed indentation -- this else is read
                # as pairing with `if DEBUG==1` (inline when debugging, guarded
                # otherwise); if DEBUG!=1 a dependent job is never run, which
                # looks like a latent bug.  Confirm against the original file.
                else:
                    try:
                        RunJob(job)
                        # threading.Thread(target=RunJob, args=(job,)).start()
                    except Exception as e:
                        try:
                            # MessageToFE is not defined in this file -- see
                            # the corruption note above.
                            MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
                        except Exception as e2:
                            print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
    print("INFO: PA job manager is waiting jobs")
    return

def JobProgressState( job, state ):
    """Mark *job* as picked up: pa_job_state becomes 'In Progress' and the
    user-visible state is set to *state*."""
    job.pa_job_state = "In Progress"
    job.state=state
    session.commit()
    return

def JobScanNow(job):
    """Handler for 'scannow': rescan the import directories for new files."""
    JobProgressState( job, "In Progress" )
    ProcessImportDirs(job)
    FinishJob( job, "Completed (scan for new files)" )
    MessageToFE( job.id, "success", "Completed (scan for new files)" )
    session.commit()
    return

def JobForceScan(job):
    """Handler for 'forcescan': wipe all file/dir/link rows and rebuild them
    from scratch by rescanning the import directories."""
    JobProgressState( job, "In Progress" )
    # Delete in dependency order (links before the rows they reference).
    session.query(File_Person_Link).delete()
    session.query(EntryDirLink).delete()
    session.query(Dir).delete()
    session.query(File).delete()
    session.query(Entry).delete()
    session.commit()
    ProcessImportDirs(job)
    FinishJob(job, "Completed (forced remove and recreation of all file data)")
    MessageToFE( job.id, "success", "Completed (forced remove and recreation of all file data)" )
    session.commit()
    return

def SymlinkName(path, file):
    """Map an absolute path *file* under import root *path* to its
    'static/<import-dir>/<subpath>' serving prefix (no trailing slash).
    Assumes *path* ends with '/' -- the [0:-1] trims rely on it."""
    sig_bit=file.replace(path, "")
    last_dir=os.path.basename(path[0:-1])
    if sig_bit[-1] == '/':
        last_bit = os.path.dirname(sig_bit)[0:-1]
    else:
        last_bit = os.path.dirname(sig_bit)
    symlink = 'static'+'/'+last_dir+'/'+last_bit
    if symlink[-1] == '/':
        symlink=symlink[0:-1]
    return symlink

# to serve static content of the images, we create a symlink from inside the
# static subdir of each import_path that exists
def CreateSymlink(job,path):
    """Create (if missing) static/<basename-of-import-path> -> *path* and
    return the symlink path."""
    symlink='static/{}'.format(os.path.basename(path[0:-1]))
    if not os.path.exists(symlink):
        os.symlink(path, symlink)
    return symlink

def AddDir(job, dirname, path_prefix, in_dir):
    """Return the existing Dir for *path_prefix*, or create it (plus its
    'Directory' Entry, linked under parent *in_dir* when given)."""
    dir=session.query(Dir).filter(Dir.path_prefix==path_prefix).first()
    if dir:
        return dir
    dir=Dir( path_prefix=path_prefix, num_files=0, last_import_date=0, last_hash_date=0 )
    dtype=session.query(FileType).filter(FileType.name=='Directory').first()
    e=Entry( name=dirname, type=dtype )
    e.dir_details.append(dir)
    # no in_dir occurs when we Add the actual Dir for the import_path (top of the tree)
    if in_dir:
        e.in_dir.append(in_dir)
    if DEBUG==1:
        print(f"AddDir: created d={dirname}, pp={path_prefix}")
        AddLogForJob(job, "DEBUG: AddDir: {} in (dir_id={})".format(dirname, in_dir) )
    session.add(e)
    return dir

def AddFile(job, fname, type_str, fsize, in_dir ):
    """Return the existing Entry named *fname*, or create a new Entry+File of
    type *type_str* (size in MB) inside Dir *in_dir*."""
    # see if this exists already
    e=session.query(Entry).filter(Entry.name==fname).first()
    if e:
        return e
    ftype = session.query(FileType).filter(FileType.name==type_str).first()
    e=Entry( name=fname, type=ftype )
    f=File( size_mb=fsize )
    e.file_details.append(f)
    e.in_dir.append(in_dir)
    AddLogForJob(job, "Found new file: {}".format(fname) )
    session.add(e)
    return e

def JobImportDir(job):
    """Handler for 'importdir': walk the job's 'path' extra, creating Dir and
    File/Entry rows for anything whose ctime is newer than the containing
    dir's last_import_date.

    Raises Exception when no Settings row exists.
    """
    JobProgressState( job, "In Progress" )
    settings = session.query(Settings).first()
    if settings == None:
        raise Exception("Cannot create file data with no settings / import path is missing")
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    AddLogForJob(job, "Checking Import Directory: {}".format( path ) )
    if DEBUG==1:
        print("DEBUG: Checking Import Directory: {}".format( path ) )
    if not os.path.exists( path ):
        FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" )
        return
    symlink=CreateSymlink(job,path)
    overall_file_cnt=0
    walk=os.walk(path, topdown=True)
    # root == path of dir, files are in dir... subdirs are in dir
    # NOTE(review): parent_dir is just "the previously visited dir", which is
    # only the true parent for the shapes os.walk happens to produce
    # depth-first; deep/sibling trees may get mis-parented -- verify.
    parent_dir=None
    for root, subdirs, files in walk:
        print(f"walk: r={root} s={subdirs} f={files}")
        overall_file_cnt+= len(subdirs) + len(files)
        if root == path:
            pp = symlink
        else:
            pp=SymlinkName( path, root )+'/'+os.path.basename(root)
        if root[-1]=="/":
            root=root[0:-1]
        dir=AddDir(job, os.path.basename(root), pp, parent_dir)
        parent_dir=dir
        for basename in files:
            # Stat via the symlinked prefix so paths match what is served.
            fname=dir.path_prefix+'/'+basename
            stat = os.stat(fname)
            if stat.st_ctime > dir.last_import_date:
                if DEBUG==1:
                    AddLogForJob(job, "DEBUG: {} - is new/updated".format( basename ), basename )
                    print("DEBUG: {} - {} is newer than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
                if isImage(fname):
                    type_str = 'Image'
                elif isVideo(fname):
                    type_str = 'Video'
                else:
                    type_str = 'Unknown'
                fsize = round(stat.st_size/(1024*1024))
                e=AddFile( job, basename, type_str, fsize, dir )
            else:
                if DEBUG==1:
                    AddLogForJob(job, "DEBUG: {} - is unchanged".format( basename, basename ) )
                    # NOTE(review): `basename` passed as a stray second arg to
                    # print here (and as an ignored extra .format arg above).
                    print("DEBUG: {} - {} is OLDER than {}".format( basename, stat.st_ctime, dir.last_import_date ), basename )
        dir.num_files=len(files)+len(subdirs)
        dir.last_import_date = time.time()
    job.num_files=overall_file_cnt
    job.current_file_num=overall_file_cnt
    FinishJob(job, "Finished Importing: {} - Found {} new files".format( path, overall_file_cnt ) )
    import_dir=session.query(Dir).filter(Dir.path_prefix==symlink).first()
    import_dir.num_files=overall_file_cnt
    session.commit()
    return

def JobProcessAI(job):
    """Handler for 'processai': run face recognition (ProcessAI) over every
    entry under the job's 'path' extra."""
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    path = SymlinkName(path, '/')
    print('REMOVE AFTER TESTING ON WINDOWS... path=',path)
    d=session.query(Dir).filter(Dir.path_prefix==path).first()
    job.num_files=d.num_files
    for e in FilesInDir( path ):
        ProcessFilesInDir(job, e, ProcessAI)
    FinishJob(job, "Finished Processesing AI")
    return

def FilesInDir( path ):
    """Return the Entry rows linked to the Dir whose path_prefix == *path*."""
    d=session.query(Dir).filter(Dir.path_prefix==path).first()
    return d.files

def GenHashAndThumb(job, e):
    """Per-file worker: store the md5 hash and (for images/videos) a base64
    thumbnail on Entry *e*'s File row."""
    e.file_details[0].hash = md5( job, e.in_dir[0].path_prefix+'/'+ e.name )
    if e.type.name == 'Image':
        e.file_details[0].thumbnail = GenImageThumbnail( job, e.in_dir[0].path_prefix+'/'+ e.name )
    elif e.type.name == 'Video':
        e.file_details[0].thumbnail = GenVideoThumbnail( job, e.in_dir[0].path_prefix+'/'+ e.name )
    elif e.type.name == 'Unknown':
        # Nothing to thumbnail; just advance the progress counter.
        job.current_file_num+=1
    return

def ProcessAI(job, e):
    """Per-file worker: find faces in image Entry *e* and record a
    File_Person_Link for each known Person matched."""
    if e.type.name != 'Image':
        job.current_file_num+=1
        return
    people = session.query(Person).all()
    # NOTE(review): reference encodings are (re)built here for every single
    # image processed -- generateKnownEncodings caches to the DB, but this
    # still re-stats/re-checks each refimg per image.
    for person in people:
        generateKnownEncodings(person)
    file = e.in_dir[0].path_prefix + '/' + e.name
    im_orig = Image.open(file)
    # Apply the EXIF orientation so faces are upright before detection.
    im = ImageOps.exif_transpose(im_orig)
    unknown_encodings = generateUnknownEncodings(im)
    for unknown_encoding in unknown_encodings:
        for person in people:
            lookForPersonInImage(job, person, unknown_encoding, e)
    AddLogForJob(job, f"Finished processing {e.name}", e.name )
    return

def lookForPersonInImage(job, person, unknown_encoding, e):
    """Compare *unknown_encoding* against each of *person*'s cached reference
    encodings; on a match, add a File_Person_Link for Entry *e*."""
    for refimg in person.refimg:
        # Cached encoding is raw float64 bytes (see generateKnownEncodings).
        deserialized_bytes = numpy.frombuffer(refimg.encodings, dtype=numpy.float64)
        #deserialized_x = numpy.reshape(deserialized_bytes, newshape=(2,2))
        results = compareAI(deserialized_bytes, unknown_encoding)
        if results[0]:
            print(f'Found a match between: {person.tag} and {e.name}')
            AddLogForJob(job, f'Found a match between: {person.tag} and {e.name}')
            # NOTE(review): file_id is declared unique, so a second matched
            # person for the same file would violate the constraint; also
            # repeated matches of the same pair are not de-duplicated here.
            fpl = File_Person_Link(person_id=person.id, file_id=e.file_details[0].eid)
            session.add(fpl)
    return

def generateUnknownEncodings(im):
    """Return the face encodings found in PIL image *im*."""
    unknown_image = numpy.array(im)
    face_locations = face_recognition.face_locations(unknown_image)
    unknown_encodings = face_recognition.face_encodings(unknown_image, known_face_locations=face_locations)
    # should save these to the db
    # file.locations = face_locations
    return unknown_encodings

def generateKnownEncodings(person):
    """Build (or reuse) the cached face encoding for each of *person*'s
    reference images under reference_images/, storing the first face's
    encoding as raw bytes on the Refimg row."""
    for refimg in person.refimg:
        file = 'reference_images/'+refimg.fname
        stat = os.stat(file)
        if refimg.created_on and stat.st_ctime < refimg.created_on:
            # BUG(review): message is inverted -- this branch skips because
            # the file has NOT changed since the encoding was created.
            print("DEBUG: skipping re-creating encoding for refimg because file has changed since we did this before")
            continue
        img = face_recognition.load_image_file(file)
        location = face_recognition.face_locations(img)
        # NOTE(review): encodings[0] raises IndexError if no face is found in
        # the reference image -- TODO confirm refimgs are pre-validated.
        encodings = face_recognition.face_encodings(img, known_face_locations=location)
        refimg.encodings = encodings[0].tobytes()
        refimg.created_on = time.time()
        session.add(refimg)
        session.commit()

def compareAI(known_encoding, unknown_encoding):
    """Return face_recognition.compare_faces result (list of one bool) for
    the pair of encodings, at tolerance 0.55."""
    results = face_recognition.compare_faces([known_encoding], unknown_encoding, tolerance=0.55)
    return results

def ProcessFilesInDir(job, e, file_func):
    """Recursively apply *file_func(job, entry)* to Entry *e*: files get the
    callback directly, directories recurse into their contents."""
    if DEBUG==1:
        print("DEBUG: files in dir - process: {} {}".format(e.name, e.in_dir[0].path_prefix))
    if e.type.name != 'Directory':
        file_func(job, e)
    else:
        dir=session.query(Dir).filter(Dir.eid==e.id).first()
        job.current_file_num+=1
        for sub in dir.files:
            ProcessFilesInDir(job, sub, file_func)

def JobGetFileDetails(job):
    """Handler for 'getfiledetails': hash and thumbnail every file under the
    job's 'path' extra, unless the directory is unchanged since the last
    hashing pass (ctime vs last_hash_date)."""
    JobProgressState( job, "In Progress" )
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    path='static'+'/'+os.path.basename(path[0:-1])
    if DEBUG==1:
        print("DEBUG: JobGetFileDetails for path={}".format( path ) )
    dir=session.query(Dir).filter(Dir.path_prefix==path).first()
    stat=os.stat( path )
    if stat.st_ctime < dir.last_hash_date:
        session.add(dir)
        dir.last_hash_date = time.time()
        FinishJob(job, "{} has not changed since last hashing - finished job".format(dir.path_prefix))
        if DEBUG==1:
            print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
        return
    dir.last_hash_date = time.time()
    job.current_file_num = 0
    job.num_files = dir.num_files
    session.commit()
    for e in FilesInDir( path ):
        ProcessFilesInDir(job, e, GenHashAndThumb )
    FinishJob(job, "File Details job finished")
    session.commit()
    return

def isVideo(file):
    """Return True if MediaInfo reports a Video track in *file*; any parse
    failure is treated as 'not a video'."""
    try:
        fileInfo = MediaInfo.parse(file)
        for track in fileInfo.tracks:
            if track.track_type == "Video":
                return True
        return False
    except Exception as e:
        return False

# Returns an md5 hash of the fnames' contents
def md5(job, fname):
    """Stream *fname* in 4 KiB chunks and return its md5 hex digest, logging
    the result on *job*."""
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    hash = hash_md5.hexdigest()
    AddLogForJob( job, "Generated md5 hash: {} for file: {}".format( hash, fname ) )
    return hash

def isImage(file):
    """Return True if PIL can open *file* as an image."""
    # NOTE(review): bare except also hides unrelated errors (e.g. permission
    # problems), not just "not an image".
    try:
        img = Image.open(file)
        return True
    except:
        return False

def GenImageThumbnail(job, file):
    """Return a 256x256-bounded JPEG thumbnail of image *file* as a base64
    string (EXIF-rotated), or None if generation fails."""
    thumbnail=None
    AddLogForJob( job, "Generate Thumbnail from Image file: {}".format( file ), file )
    try:
        im_orig = Image.open(file)
        im = ImageOps.exif_transpose(im_orig)
        im.thumbnail((256,256))
        img_bytearray = io.BytesIO()
        im.save(img_bytearray, format='JPEG')
        img_bytearray = img_bytearray.getvalue()
        thumbnail = base64.b64encode(img_bytearray)
        # Strip the b'...' wrapper from the bytes repr to get a plain string.
        thumbnail = str(thumbnail)[2:-1]
    except:
        # NOTE(review): the bare except catches ANY failure (e.g. CMYK save
        # errors), not only missing EXIF as the message implies.
        print('WARNING: NO EXIF TAGS?!?!?!?')
        AddLogForJob(job, "WARNING: No EXIF TAF found for: {}".format(file))
    return thumbnail

def GenVideoThumbnail(job, file):
    """Return a 160x90 JPEG thumbnail (base64 string) of the first
    sufficiently-bright frame (mean >= 15) of video *file*."""
    AddLogForJob( job, "Generate Thumbnail from Video file: {}".format( file ), file )
    vcap = cv2.VideoCapture(file)
    res, im_ar = vcap.read()
    # BUG(review): if the very first read fails, im_ar is None and .mean()
    # raises; also im_ar.mean() is evaluated before res is checked, so an
    # all-dark video that runs out of frames crashes the same way.
    while im_ar.mean() < 15 and res:
        res, im_ar = vcap.read()
    im_ar = cv2.resize(im_ar, (160, 90), 0, 0, cv2.INTER_LINEAR)
    res, thumb_buf = cv2.imencode('.jpeg', im_ar)
    bt = thumb_buf.tobytes()
    thumbnail = base64.b64encode(bt)
    # Strip the b'...' wrapper from the bytes repr to get a plain string.
    thumbnail = str(thumbnail)[2:-1]
    return thumbnail

if __name__ == "__main__":
    # Entry point: do an initial import scan, then accept TCP pokes from the
    # web front end -- each accepted connection just triggers a job rescan
    # (the connection payload is never read and conn is never closed here).
    print("INFO: PA job manager starting - listening on {}:{}".format( PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT) )
    ProcessImportDirs()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
        s.listen()
        while True:
            conn, addr = s.accept()
            HandleJobs()