### # # This file controls the 'external' job control manager, that (periodically # # looks / somehow is pushed an event?) picks up new jobs, and processes them. # # It then stores the progress/status, etc. in job and joblog tables as needed # via wrapper functions. # # The whole pa_job_manager is multi-threaded, and uses the database tables for # state management and communication back to the pa web site # ### ### SQLALCHEMY IMPORTS ### from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime, LargeBinary, Boolean from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import relationship from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import scoped_session ### SQLALCHEMY IMPORTS ### ### LOCAL FILE IMPORTS ### from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, THUMBSIZE from datetime import datetime, timedelta, date import pytz import time import os import glob from PIL import Image, ImageOps from pymediainfo import MediaInfo import hashlib #import exifread import base64 import numpy import cv2 import socket import threading import io import face_recognition import re DEBUG=1 # an Manager, which the Session will use for connection resources some_engine = create_engine(DB_URL) # create a configured "Session" class #Session = sessionmaker(bind=some_engine) # create a Session session_factory = sessionmaker(bind=some_engine) Session = scoped_session(session_factory) session = Session() Base = declarative_base() ################################################################################ # Class describing File in the database, and via sqlalchemy, connected to the DB as well # This has to match one-for-one the DB table ################################################################################ class EntryDirLink(Base): __tablename__ = "entry_dir_link" entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True ) dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True ) def __repr__(self): return f"" class Dir(Base): __tablename__ = "dir" eid = Column(Integer, ForeignKey("entry.id"), primary_key=True ) path_prefix = Column(String, unique=True, nullable=False ) num_files = Column(Integer) last_import_date = Column(Float) files = relationship("Entry", secondary="entry_dir_link") def __repr__(self): return f"" class Entry(Base): __tablename__ = "entry" id = Column(Integer, Sequence('file_id_seq'), primary_key=True ) name = Column(String, unique=False, nullable=False ) type_id = Column(Integer, ForeignKey("file_type.id")) exists_on_fs=Column(Boolean) type=relationship("FileType") dir_details = relationship( "Dir") file_details = relationship( "File" ) in_dir = relationship ("Dir", secondary="entry_dir_link" ) def __repr__(self): return f"" class FileRefimgLink(Base): __tablename__ = "file_refimg_link" file_id = Column(Integer, ForeignKey('file.eid'), unique=True, nullable=False, primary_key=True) refimg_id = Column(Integer, ForeignKey('refimg.id'), unique=True, nullable=False, primary_key=True) when_processed = Column(Float) matched = Column(Boolean) def __repr__(self): return f"" class FileType(Base): __tablename__ = "file_type" id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True ) name = Column(String, unique=True, nullable=False ) def __repr__(self): return f"" class Settings(Base): __tablename__ = "settings" id = Column(Integer, Sequence('settings_id_seq'), primary_key=True ) import_path = Column(String) def __repr__(self): return f"" class PersonRefimgLink(Base): __tablename__ = "person_refimg_link" person_id = Column(Integer, ForeignKey('person.id'), unique=True, nullable=False, primary_key=True) refimg_id = Column(Integer, ForeignKey('refimg.id'), unique=True, nullable=False, primary_key=True) def __repr__(self): return f"" class Person(Base): __tablename__ = "person" id = Column(Integer, Sequence('person_id_seq'), primary_key=True ) tag = Column(String(48), unique=False, nullable=False) surname = Column(String(48), unique=False, nullable=False) firstname = Column(String(48), unique=False, nullable=False) refimg = relationship('Refimg', secondary=PersonRefimgLink.__table__) def __repr__(self): return f"" class Refimg(Base): __tablename__ = "refimg" id = Column(Integer, Sequence('refimg_id_seq'), primary_key=True ) fname = Column(String(256), unique=True, nullable=False) encodings = Column(LargeBinary) created_on = Column(Float) def __repr__(self): return f"" ################################################################################ # classes for the job manager: # Job (and Joblog, JobExtra) for each Job, and # PA_Jobmanager_fe_message (to pass messages to the front-end web) ################################################################################ class Joblog(Base): __tablename__ = "joblog" id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True ) job_id = Column(Integer, ForeignKey('job.id') ) log_date = Column(DateTime(timezone=True)) log = Column(String) def __repr__(self): return "".format(self.id, self.job_id, self.name, self.value ) class Job(Base): __tablename__ = "job" id = Column(Integer, Sequence('job_id_seq'), primary_key=True ) start_time = Column(DateTime(timezone=True)) last_update = Column(DateTime(timezone=True)) name = Column(String) state = Column(String) num_files = Column(Integer) current_file_num = Column(Integer) current_file = Column(String) wait_for = Column(Integer) pa_job_state = Column(String) logs = relationship( "Joblog") extra = relationship( "JobExtra") def __repr__(self): return "".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs) class PA_JobManager_FE_Message(Base): __tablename__ = "pa_job_manager_fe_message" id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True ) job_id = Column(Integer, ForeignKey('job.id'), primary_key=True ) alert = Column(String) message = Column(String) def __repr__(self): return "job id={} {}".format( job.id, job.id, job.name ) ) # force commit to make job.id be valid in use of wait_for later session.commit() jex2=JobExtra( name="path", value=path ) job2=Job(start_time=now, last_update=now, name="getfiledetails", state="New", wait_for=job.id, pa_job_state="New", current_file_num=0 ) job2.extra.append(jex2) session.add(job2) session.commit() """ if parent_job: AddLogForJob(parent_job, "adding job id={} {} (wait for: {})".format( job2.id, job2.id, job2.name, job2.wait_for ) ) jex3=JobExtra( name="path", value=path ) job3=Job(start_time=now, last_update=now, name="processai", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 ) job3.extra.append(jex3) session.add(job3) session.commit() if parent_job: AddLogForJob(parent_job, "adding job id={} {} (wait for: {})".format( job3.id, job3.id, job3.name, job3.wait_for ) ) """ HandleJobs() return def AddLogForJob(job, message, current_file=''): now=datetime.now(pytz.utc) log=Joblog( job_id=job.id, log=message, log_date=now ) job.last_update=now if current_file != '': job.current_file=os.path.basename(current_file) # this may not be set on an import job if job.num_files: job.current_file_num=job.current_file_num+1 session.add(log) return def RunJob(job): # session = Session() job.start_time=datetime.now(pytz.utc) if job.name =="scannow": JobScanNow(job) elif job.name =="forcescan": JobForceScan(job) elif job.name =="importdir": JobImportDir(job) elif job.name =="getfiledetails": JobGetFileDetails(job) elif job.name == "processai": JobProcessAI(job) else: print("ERROR: Requested to process unknown job type: {}".format(job.name)) # okay, we finished a job, so check for any jobs that are dependant on this and run them... # session.close() if job.pa_job_state != "Completed": FinishJob(job, "PA Job Manager - This is a catchall to close of a Job, this sould never be seen and implies a job did not complete formally?", "Failed" ) HandleJobs() return def CancelJob(job,id): for j in session.query(Job).filter(Job.wait_for==id).all(): if DEBUG==1: print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(j.id, job.id) ) FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" ) CancelJob(j, j.id) return def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"): job.state=state job.pa_job_state=pa_job_state job.last_update=datetime.now(pytz.utc) AddLogForJob(job, last_log) if job.state=="Failed": CancelJob(job,job.id) session.commit() return def HandleJobs(): print("INFO: PA job manager is scanning for new jobs to process") for job in session.query(Job).all(): if job.pa_job_state == 'New': if job.wait_for != None: j2 = session.query(Job).get(job.wait_for) if not j2: print ("WTF? job.wait_for ({}) does not exist in below? ".format( job.wait_for )) for j in session.query(Job).all(): print ("j={}".format(j.id)) continue if j2.pa_job_state != 'Completed': continue # use this to remove threads for easier debugging, and errors will stacktrace to the console if DEBUG==1: print("*************************************") print("RUNNING job: id={} name={} wait_for={}".format(job.id, job.name, job.wait_for )) RunJob(job) else: try: RunJob(job) # threading.Thread(target=RunJob, args=(job,)).start() except Exception as e: try: MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) ) except Exception as e2: print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) ) print("INFO: PA job manager is waiting jobs") return def JobProgressState( job, state ): job.pa_job_state = "In Progress" job.state=state session.commit() return def JobScanNow(job): JobProgressState( job, "In Progress" ) ProcessImportDirs(job) FinishJob( job, "Completed (scan for new files)" ) MessageToFE( job.id, "success", "Completed (scan for new files)" ) session.commit() return def JobForceScan(job): JobProgressState( job, "In Progress" ) session.query(FileRefimgLink).delete() session.query(EntryDirLink).delete() session.query(Dir).delete() session.query(File).delete() session.query(Entry).delete() session.commit() ProcessImportDirs(job) FinishJob(job, "Completed (forced remove and recreation of all file data)") MessageToFE( job.id, "success", "Completed (forced remove and recreation of all file data)" ) session.commit() return def SymlinkName(path, file): sig_bit=file.replace(path, "") last_dir=os.path.basename(path[0:-1]) if sig_bit[-1] == '/': last_bit = os.path.dirname(sig_bit)[0:-1] else: last_bit = os.path.dirname(sig_bit) symlink = 'static/'+last_dir+'/'+last_bit if symlink[-1] == '/': symlink=symlink[0:-1] return symlink # to serve static content of the images, we create a symlink from inside the static subdir of each import_path that exists def CreateSymlink(job,path): symlink='static/{}'.format(os.path.basename(path[0:-1])) if not os.path.exists(symlink): os.symlink(path, symlink) return symlink def AddDir(job, dirname, path_prefix, in_dir): dir=session.query(Dir).filter(Dir.path_prefix==path_prefix).first() if dir: e=session.query(Entry).get(dir.eid) e.exists_on_fs=True return dir dir=Dir( path_prefix=path_prefix, num_files=0, last_import_date=0 ) dtype=session.query(FileType).filter(FileType.name=='Directory').first() e=Entry( name=dirname, type=dtype, exists_on_fs=True ) e.dir_details.append(dir) # no in_dir occurs when we Add the actual Dir for the import_path (top of the tree) if in_dir: e.in_dir.append(in_dir) if DEBUG==1: print(f"AddDir: created d={dirname}, pp={path_prefix}") AddLogForJob(job, f"DEBUG: Process new dir: {dirname}") session.add(e) return dir def AddFile(job, fname, type_str, fsize, in_dir, year, month, day, woy ): e=session.query(Entry).join(EntryDirLink).join(Dir).filter(Entry.name==fname,Dir.eid==in_dir.eid).first() if e: e.exists_on_fs=True return e ftype = session.query(FileType).filter(FileType.name==type_str).first() e=Entry( name=fname, type=ftype, exists_on_fs=True ) f=File( size_mb=fsize, last_hash_date=0, faces_created_on=0, year=year, month=month, day=day, woy=woy ) e.file_details.append(f) e.in_dir.append(in_dir) AddLogForJob(job, "Found new file: {}".format(fname) ) session.add(e) return e # reset exists_on_fs to False for everything in this import path, if we find it on the FS in the walk below, it goes back to True, anything that # is still false, has been deleted def ResetExistsOnFS(job, path): reset_dirs = session.query(Entry).join(EntryDirLink).join(Dir).filter(Dir.path_prefix.ilike(path+'%')).all() for reset_dir in reset_dirs: reset_dir.exists_on_fs=False session.add(reset_dir) reset_files = session.query(Entry).join(EntryDirLink).filter(EntryDirLink.dir_eid==reset_dir.id).all() for reset_file in reset_files: reset_file.exists_on_fs=False session.add(reset_file) return def HandleAnyFSDeletions(job): dtype=session.query(FileType).filter(FileType.name=='Directory').first() rms = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id!=dtype.id).all() rm_cnt=0 for rm in rms: session.query(EntryDirLink).filter(EntryDirLink.entry_id==rm.id).delete() session.query(File).filter(File.eid==rm.id).delete() session.query(Entry).filter(Entry.id==rm.id).delete() AddLogForJob( job, f"INFO: Removing {rm.name} from system as it is no longer on the file system") rm_cnt+=1 rmdirs = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id==1).order_by(Entry.id.desc()).all() for rmdir in rmdirs: print(f"We have a directory ({rmdir.name}) to delete from DB as it no longer exists on fs"); session.query(EntryDirLink).filter(EntryDirLink.entry_id==rmdir.id).delete() session.query(Dir).filter(Dir.eid==rmdir.id).delete() session.query(Entry).filter(Entry.id==rmdir.id).delete() AddLogForJob( job, f"INFO: Removing {rmdir.name} from system as it is no longer on the file system") rm_cnt+=1 return rm_cnt def JobImportDir(job): JobProgressState( job, "In Progress" ) settings = session.query(Settings).first() if settings == None: raise Exception("Cannot create file data with no settings / import path is missing") path=[jex.value for jex in job.extra if jex.name == "path"][0] AddLogForJob(job, "Checking Import Directory: {}".format( path ) ) if DEBUG==1: print("DEBUG: Checking Import Directory: {}".format( path ) ) if not os.path.exists( path ): FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" ) return symlink=CreateSymlink(job,path) ResetExistsOnFS(job, symlink) overall_file_cnt=0 walk=os.walk(path, topdown=True) # root == path of dir, files are in dir... subdirs are in dir parent_dir=None for root, subdirs, files in walk: overall_file_cnt+= len(subdirs) + len(files) if root == path: pp = symlink else: pp=SymlinkName( path, root )+'/'+os.path.basename(root) if root[-1]=="/": root=root[0:-1] dir=AddDir(job, os.path.basename(root), pp, parent_dir) parent_dir=dir for basename in files: # commit every 100 files to see progress being made but not hammer the database if job.current_file_num % 100 == 0: session.commit() fname=dir.path_prefix+'/'+basename stat = os.stat(fname) if stat.st_ctime > dir.last_import_date: AddLogForJob(job, f"Processing new/update file: {basename}", basename ) if DEBUG==1: print("DEBUG: {} - {} is newer than {}".format( basename, stat.st_ctime, dir.last_import_date ) ) if isImage(fname): type_str = 'Image' elif isVideo(fname): type_str = 'Video' else: type_str = 'Unknown' fsize = round(stat.st_size/(1024*1024)) m=re.search( '(\d{4})(\d{2})(\d{2})', fname) if m: year=int(m[1]) month=int(m[2]) day=int(m[3]) else: year, month, day, _, _, _, _, _, _ = datetime.fromtimestamp(stat.st_ctime).timetuple() print(f"year={year}, month={month}, day={day}") c=date(year, month, day).isocalendar() woy=c[1] e=AddFile( job, basename, type_str, fsize, dir, year, month, day, woy ) else: e=session.query(Entry).filter(Entry.name==basename).first() e.exists_on_fs=True if DEBUG==1: print("DEBUG: {} - {} is OLDER than {}".format( basename, stat.st_ctime, dir.last_import_date ), basename ) job.current_file=basename job.current_file_num+=1 dir.num_files=len(files)+len(subdirs) dir.last_import_date = time.time() job.num_files=overall_file_cnt job.current_file_num=overall_file_cnt rm_cnt=HandleAnyFSDeletions(job) # reset overall path with overall_file_cnt, we use this for future jobs # to measure progress when dealing with this path import_dir=session.query(Dir).filter(Dir.path_prefix==symlink).first() import_dir.num_files=overall_file_cnt session.add(import_dir) FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Removed {rm_cnt} file(s)") return def RunFuncOnFilesInPath( job, path, file_func ): d=session.query(Dir).filter(Dir.path_prefix==path).first() for e in d.files: ProcessFilesInDir(job, e, file_func) def JobProcessAI(job): path=[jex.value for jex in job.extra if jex.name == "path"][0] path = SymlinkName(path, '/') d=session.query(Dir).filter(Dir.path_prefix==path).first() job.num_files=d.num_files people = session.query(Person).all() for person in people: generateKnownEncodings(person) RunFuncOnFilesInPath( job, path, ProcessAI ) FinishJob(job, "Finished Processesing AI") return def GenHashAndThumb(job, e): # commit every 100 files to see progress being made but not hammer the database if job.current_file_num % 100 == 0: session.commit() stat = os.stat( e.in_dir[0].path_prefix + '/' + e.name ) if stat.st_ctime < e.file_details[0].last_hash_date: # print(f"OPTIM: GenHashAndThumb {e.name} file is older than last hash, skip this") job.current_file_num+=1 return e.file_details[0].hash = md5( job, e.in_dir[0].path_prefix+'/'+ e.name ) if e.type.name == 'Image': e.file_details[0].thumbnail = GenImageThumbnail( job, e.in_dir[0].path_prefix+'/'+ e.name ) elif e.type.name == 'Video': e.file_details[0].thumbnail = GenVideoThumbnail( job, e.in_dir[0].path_prefix+'/'+ e.name ) elif e.type.name == 'Unknown': job.current_file_num+=1 e.file_details[0].last_hash_date = time.time() return def ProcessAI(job, e): if e.type.name != 'Image': job.current_file_num+=1 print("DDP: ProcessAI: adding 1 to current_file_num as we have a non-image file") return file = e.in_dir[0].path_prefix + '/' + e.name stat = os.stat(file) # find if file is newer than when we found faces before (fyi: first time faces_created_on == 0) if stat.st_ctime > e.file_details[0].faces_created_on: session.add(e) im_orig = Image.open(file) im = ImageOps.exif_transpose(im_orig) faces = generateUnknownEncodings(im) e.file_details[0].faces_created_on=time.time() if faces: flat_faces = numpy.array(faces) e.file_details[0].faces = flat_faces.tobytes() else: e.file_details[0].faces = None job.current_file_num+=1 return else: if not e.file_details[0].faces: print("OPTIM: This image has no faces, skip it") job.current_file_num+=1 return recover=numpy.frombuffer(e.file_details[0].faces,dtype=numpy.float64) real_recover=numpy.reshape(recover,(-1,128)) l=[] for el in real_recover: l.append(numpy.array(el)) faces = l people = session.query(Person).all() for unknown_encoding in faces: for person in people: lookForPersonInImage(job, person, unknown_encoding, e) AddLogForJob(job, f"Finished processing {e.name}", e.name ) return def lookForPersonInImage(job, person, unknown_encoding, e): for refimg in person.refimg: # lets see if we have tried this check before frl=session.query(FileRefimgLink).filter(FileRefimgLink.file_id==e.id, FileRefimgLink.refimg_id==refimg.id).first() if not frl: frl = FileRefimgLink(refimg_id=refimg.id, file_id=e.file_details[0].eid) else: stat=os.stat(e.in_dir[0].path_prefix+'/'+ e.name) # file & refimg are not newer then we dont need to check if frl.matched and stat.st_ctime < frl.when_processed and refimg.created_on < frl.when_processed: print(f"OPTIM: lookForPersonInImage: file {e.name} has a previous match for: {refimg.fname}, and the file & refimg haven't changed") return session.add(frl) frl.matched=False frl.when_processed=time.time() deserialized_bytes = numpy.frombuffer(refimg.encodings, dtype=numpy.float64) results = compareAI(deserialized_bytes, unknown_encoding) if results[0]: print(f'Found a match between: {person.tag} and {e.name}') AddLogForJob(job, f'Found a match between: {person.tag} and {e.name}') frl.matched=True return def generateUnknownEncodings(im): unknown_image = numpy.array(im) face_locations = face_recognition.face_locations(unknown_image) if not face_locations: return None unknown_encodings = face_recognition.face_encodings(unknown_image, known_face_locations=face_locations) return unknown_encodings def generateKnownEncodings(person): for refimg in person.refimg: file = 'reference_images/'+refimg.fname stat = os.stat(file) if refimg.created_on and stat.st_ctime < refimg.created_on: print("OPTIM: skipping re-creating encoding for refimg because file has not changed") continue img = face_recognition.load_image_file(file) location = face_recognition.face_locations(img) encodings = face_recognition.face_encodings(img, known_face_locations=location) refimg.encodings = encodings[0].tobytes() refimg.created_on = time.time() session.add(refimg) session.commit() def compareAI(known_encoding, unknown_encoding): results = face_recognition.compare_faces([known_encoding], unknown_encoding, tolerance=0.55) return results def ProcessFilesInDir(job, e, file_func): if DEBUG==1: print("DEBUG: files in dir - process: {} {}".format(e.name, e.in_dir[0].path_prefix)) if e.type.name != 'Directory': file_func(job, e) else: dir=session.query(Dir).filter(Dir.eid==e.id).first() job.current_file_num+=1 for sub in dir.files: ProcessFilesInDir(job, sub, file_func) def JobGetFileDetails(job): JobProgressState( job, "In Progress" ) path=[jex.value for jex in job.extra if jex.name == "path"][0] path='static'+'/'+os.path.basename(path[0:-1]) if DEBUG==1: print("DEBUG: JobGetFileDetails for path={}".format( path ) ) dir=session.query(Dir).filter(Dir.path_prefix==path).first() job.current_file_num = 0 job.num_files = dir.num_files session.commit() RunFuncOnFilesInPath( job, path, GenHashAndThumb ) FinishJob(job, "File Details job finished") session.commit() return def isVideo(file): try: fileInfo = MediaInfo.parse(file) for track in fileInfo.tracks: if track.track_type == "Video": return True return False except Exception as e: return False # Returns an md5 hash of the fnames' contents def md5(job, fname): hash_md5 = hashlib.md5() with open(fname, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) hash = hash_md5.hexdigest() AddLogForJob( job, "Generated md5 hash: {} for file: {}".format( hash, fname ) ) return hash def isImage(file): try: img = Image.open(file) return True except: return False def GenImageThumbnail(job, file): AddLogForJob( job, "Generate Thumbnail from Image file: {}".format( file ), file ) try: im_orig = Image.open(file) im = ImageOps.exif_transpose(im_orig) im.thumbnail((THUMBSIZE,THUMBSIZE)) img_bytearray = io.BytesIO() im.save(img_bytearray, format='JPEG') img_bytearray = img_bytearray.getvalue() thumbnail = base64.b64encode(img_bytearray) thumbnail = str(thumbnail)[2:-1] except Exception as e: print('WARNING: NO EXIF TAGS?!?!?!?') AddLogForJob(job, f"WARNING: No EXIF TAF found for: {file} - error={e}") return None return thumbnail def GenVideoThumbnail(job, file): AddLogForJob( job, "Generate Thumbnail from Video file: {}".format( file ), file ) try: vcap = cv2.VideoCapture(file) res, frame = vcap.read() # if the mean pixel value is > 15, we have something worth making a sshot of (no black frame at start being the sshot) while frame.mean() < 15 and res: res, frame = vcap.read() w = vcap.get(cv2.cv2.CAP_PROP_FRAME_WIDTH) h = vcap.get(cv2.cv2.CAP_PROP_FRAME_HEIGHT) if w > h: factor = w / THUMBSIZE else: factor = h / THUMBSIZE new_h = int(h / factor) new_w = int(w / factor) frame = cv2.resize(frame, (new_w, new_h), 0, 0, cv2.INTER_LINEAR) res, thumb_buf = cv2.imencode('.jpeg', frame) bt = thumb_buf.tobytes() thumbnail = base64.b64encode(bt) thumbnail = str(thumbnail)[2:-1] except Exception as e: AddLogForJob( job, f"ERROR: Failed to Generate thumbnail for video file: {file} - error={e}" ) return None return thumbnail if __name__ == "__main__": print("INFO: PA job manager starting - listening on {}:{}".format( PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT) ) ProcessImportDirs() with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT)) s.listen() while True: conn, addr = s.accept() HandleJobs()