###
# This file controls the 'external' job control manager, which (periodically,
# or when pushed an event) picks up new jobs and processes them.
#
# It then stores the progress/status, etc. in the job and joblog tables as
# needed via wrapper functions.
#
# The whole pa_job_manager is multi-threaded, and uses the database tables for
# state management and communication back to the pa web site.
###

### SQLALCHEMY IMPORTS ###
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime, LargeBinary, Boolean
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
### SQLALCHEMY IMPORTS ###

### LOCAL FILE IMPORTS ###
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, THUMBSIZE

from datetime import datetime, timedelta, date
import pytz
import time
import os
import glob
from PIL import Image, ImageOps
from pymediainfo import MediaInfo
import hashlib
import exifread
import base64
import numpy
import cv2
import socket
import threading
import io
import face_recognition
import re

DEBUG = 1

# an Engine, which the Session will use for connection resources
some_engine = create_engine(DB_URL)

# create a configured, thread-local "Session" class
# Session = sessionmaker(bind=some_engine)
session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)

# create a Session
session = Session()

Base = declarative_base()

################################################################################
# Classes describing the files/dirs in the database and, via sqlalchemy,
# connected to the DB as well. These have to match one-for-one the DB tables.
################################################################################
class EntryDirLink(Base):
    __tablename__ = "entry_dir_link"
    entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True)
    dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True)

    def __repr__(self):
        return f"<EntryDirLink entry_id={self.entry_id} dir_eid={self.dir_eid}>"


class Dir(Base):
    __tablename__ = "dir"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True)
    path_prefix = Column(String, unique=True, nullable=False)
    num_files = Column(Integer)
    last_import_date = Column(Float)
    files = relationship("Entry", secondary="entry_dir_link")

    def __repr__(self):
        return f"<Dir eid={self.eid} path_prefix={self.path_prefix} num_files={self.num_files}>"


class Entry(Base):
    __tablename__ = "entry"
    id = Column(Integer, Sequence('file_id_seq'), primary_key=True)
    name = Column(String, unique=False, nullable=False)
    type_id = Column(Integer, ForeignKey("file_type.id"))
    exists_on_fs = Column(Boolean)
    type = relationship("FileType")
    dir_details = relationship("Dir")
    file_details = relationship("File")
    in_dir = relationship("Dir", secondary="entry_dir_link")

    def __repr__(self):
        return f"<Entry id={self.id} name={self.name} type_id={self.type_id} exists_on_fs={self.exists_on_fs}>"


class FileRefimgLink(Base):
    __tablename__ = "file_refimg_link"
    file_id = Column(Integer, ForeignKey('file.eid'), unique=True, nullable=False, primary_key=True)
    refimg_id = Column(Integer, ForeignKey('refimg.id'), unique=True, nullable=False, primary_key=True)
    when_processed = Column(Float)
    matched = Column(Boolean)

    def __repr__(self):
        return f"<FileRefimgLink file_id={self.file_id} refimg_id={self.refimg_id} matched={self.matched}>"


class FileType(Base):
    __tablename__ = "file_type"
    id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True)
    name = Column(String, unique=True, nullable=False)

    def __repr__(self):
        return f"<FileType id={self.id} name={self.name}>"


class Settings(Base):
    __tablename__ = "settings"
    id = Column(Integer, Sequence('settings_id_seq'), primary_key=True)
    import_path = Column(String)

    def __repr__(self):
        return f"<Settings id={self.id} import_path={self.import_path}>"
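
# Illustrative sketch (never called): how the Entry/Dir/EntryDirLink mapping
# above is intended to hang together. The table and column names are the real
# ones defined above; the data values here are made up.
def _example_entry_dir_usage():
    dtype = session.query(FileType).filter(FileType.name == 'Directory').first()
    e = Entry(name="holiday", type=dtype, exists_on_fs=True)
    d = Dir(path_prefix="static/photos/holiday", num_files=0, last_import_date=0)
    e.dir_details.append(d)   # Entry carries its Dir details directly
    session.add(e)
    # Dir.files traverses the entry_dir_link association table, so every Entry
    # appended to some_entry.in_dir elsewhere shows up in d.files here.
    for child in d.files:
        print(child.name)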
class PersonRefimgLink(Base):
    __tablename__ = "person_refimg_link"
    person_id = Column(Integer, ForeignKey('person.id'), unique=True, nullable=False, primary_key=True)
    refimg_id = Column(Integer, ForeignKey('refimg.id'), unique=True, nullable=False, primary_key=True)

    def __repr__(self):
        return f"<PersonRefimgLink person_id={self.person_id} refimg_id={self.refimg_id}>"


class Person(Base):
    __tablename__ = "person"
    id = Column(Integer, Sequence('person_id_seq'), primary_key=True)
    tag = Column(String(48), unique=False, nullable=False)
    surname = Column(String(48), unique=False, nullable=False)
    firstname = Column(String(48), unique=False, nullable=False)
    refimg = relationship('Refimg', secondary=PersonRefimgLink.__table__)

    def __repr__(self):
        return f"<Person id={self.id} tag={self.tag} surname={self.surname} firstname={self.firstname}>"


class Refimg(Base):
    __tablename__ = "refimg"
    id = Column(Integer, Sequence('refimg_id_seq'), primary_key=True)
    fname = Column(String(256), unique=True, nullable=False)
    encodings = Column(LargeBinary)
    created_on = Column(Float)

    def __repr__(self):
        return f"<Refimg id={self.id} fname={self.fname} created_on={self.created_on}>"


################################################################################
# classes for the job manager:
# Job (and Joblog, JobExtra) for each Job, and
# PA_Jobmanager_fe_message (to pass messages to the front-end web)
################################################################################
class Joblog(Base):
    __tablename__ = "joblog"
    id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True)
    job_id = Column(Integer, ForeignKey('job.id'))
    log_date = Column(DateTime(timezone=True))
    log = Column(String)

    def __repr__(self):
        return "<Joblog id={} job_id={} log_date={} log={}>".format(self.id, self.job_id, self.log_date, self.log)


class JobExtra(Base):
    # NOTE: this class definition was lost from the source; it is reconstructed
    # here from its usage below (the Job.extra relationship and calls like
    # JobExtra(name="path", value=path)), so the table and sequence names are
    # assumptions.
    __tablename__ = "job_extra"
    id = Column(Integer, Sequence('job_extra_id_seq'), primary_key=True)
    job_id = Column(Integer, ForeignKey('job.id'))
    name = Column(String)
    value = Column(String)

    def __repr__(self):
        return "<JobExtra id={} job_id={} name={} value={}>".format(self.id, self.job_id, self.name, self.value)


class Job(Base):
    __tablename__ = "job"
    id = Column(Integer, Sequence('job_id_seq'), primary_key=True)
    start_time = Column(DateTime(timezone=True))
    last_update = Column(DateTime(timezone=True))
    name = Column(String)
    state = Column(String)
    num_files = Column(Integer)
    current_file_num = Column(Integer)
    current_file = Column(String)
    wait_for = Column(Integer)
    pa_job_state = Column(String)
    logs = relationship("Joblog")
    extra = relationship("JobExtra")

    def __repr__(self):
        return ("<Job id={} start_time={} last_update={} name={} state={} num_files={} "
                "current_file_num={} current_file={} pa_job_state={} wait_for={} extra={} logs={}>").format(
            self.id, self.start_time, self.last_update, self.name, self.state, self.num_files,
            self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs)


class PA_JobManager_FE_Message(Base):
    __tablename__ = "pa_job_manager_fe_message"
    id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True)
    job_id = Column(Integer, ForeignKey('job.id'), primary_key=True)
    alert = Column(String)
    message = Column(String)

    def __repr__(self):
        return "<PA_JobManager_FE_Message id={} job_id={} alert={} message={}>".format(self.id, self.job_id, self.alert, self.message)


class File(Base):
    # NOTE: this class definition was lost from the source; it is reconstructed
    # here from its usage below (AddFile, GenHashAndThumb, ProcessAI, and
    # FileRefimgLink's FK to 'file.eid'), so the exact column types are
    # assumptions.
    __tablename__ = "file"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True)
    size_mb = Column(Integer)
    hash = Column(String)
    last_hash_date = Column(Float)
    thumbnail = Column(String)
    faces = Column(LargeBinary)
    faces_created_on = Column(Float)
    year = Column(Integer)
    month = Column(Integer)
    day = Column(Integer)
    woy = Column(Integer)

    def __repr__(self):
        return f"<File eid={self.eid} size_mb={self.size_mb} year={self.year} month={self.month} day={self.day}>"


def MessageToFE(job_id, alert, message):
    # NOTE: this wrapper was lost from the source; reconstructed from its call
    # sites, which pass (job.id, "success"/"danger", text) to surface a message
    # in the front-end web via the pa_job_manager_fe_message table.
    msg = PA_JobManager_FE_Message(job_id=job_id, alert=alert, message=message)
    session.add(msg)
    session.commit()
    return


def ProcessImportDirs(parent_job=None):
    # NOTE: the head of this function was lost from the source; everything up
    # to the first AddLogForJob call is reconstructed from the surviving body
    # (which creates an "importdir" job and a dependent "getfiledetails" job
    # per import path), so the glob-over-import_path loop is an assumption.
    settings = session.query(Settings).first()
    if settings is None:
        return
    for path in glob.glob(settings.import_path + '/*/'):
        now = datetime.now(pytz.utc)
        jex = JobExtra(name="path", value=path)
        job = Job(start_time=now, last_update=now, name="importdir", state="New", pa_job_state="New", current_file_num=0)
        job.extra.append(jex)
        session.add(job)
        if parent_job:
            AddLogForJob(parent_job, "adding job id={} {}".format(job.id, job.name))
        # force commit to make job.id be valid in use of wait_for later
        session.commit()

        jex2 = JobExtra(name="path", value=path)
        job2 = Job(start_time=now, last_update=now, name="getfiledetails", state="New", wait_for=job.id, pa_job_state="New", current_file_num=0)
        job2.extra.append(jex2)
        session.add(job2)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding job id={} {} (wait for: {})".format(job2.id, job2.name, job2.wait_for))

        """
        jex3 = JobExtra(name="path", value=path)
        job3 = Job(start_time=now, last_update=now, name="processai", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0)
        job3.extra.append(jex3)
        session.add(job3)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding job id={} {} (wait for: {})".format(job3.id, job3.name, job3.wait_for))
        """

    HandleJobs()
    return
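
# Hedged sketch (not used by the code below, which inlines this pattern): job
# parameters travel as JobExtra name/value rows attached to a Job, and the
# Job* handlers look them up like so. The helper name is made up.
def _example_get_job_extra(job, name):
    values = [jex.value for jex in job.extra if jex.name == name]
    return values[0] if values else None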
def AddLogForJob(job, message, current_file=''):
    now = datetime.now(pytz.utc)
    log = Joblog(job_id=job.id, log=message, log_date=now)
    job.last_update = now
    if current_file != '':
        job.current_file = os.path.basename(current_file)
        # this may not be set on an import job
        if job.num_files:
            job.current_file_num = job.current_file_num + 1
    session.add(log)
    return


def RunJob(job):
    # session = Session()
    job.start_time = datetime.now(pytz.utc)
    if job.name == "scannow":
        JobScanNow(job)
    elif job.name == "forcescan":
        JobForceScan(job)
    elif job.name == "importdir":
        JobImportDir(job)
    elif job.name == "getfiledetails":
        JobGetFileDetails(job)
    elif job.name == "processai":
        JobProcessAI(job)
    else:
        print("ERROR: Requested to process unknown job type: {}".format(job.name))
    # okay, we finished a job, so check for any jobs that are dependent on this and run them...
    # session.close()
    if job.pa_job_state != "Completed":
        FinishJob(job, "PA Job Manager - This is a catch-all to close off a Job; it should never be seen and implies a job did not complete formally", "Failed")
    HandleJobs()
    return


def CancelJob(job, id):
    for j in session.query(Job).filter(Job.wait_for == id).all():
        if DEBUG == 1:
            print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(j.id, job.id))
        FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn")
        CancelJob(j, j.id)
    return


def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
    job.state = state
    job.pa_job_state = pa_job_state
    job.last_update = datetime.now(pytz.utc)
    AddLogForJob(job, last_log)
    if job.state == "Failed":
        CancelJob(job, job.id)
    session.commit()
    return
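
# Hedged sketch (never called): FinishJob() with state="Failed" triggers
# CancelJob(), which recursively withdraws every job chained behind the
# failure via wait_for. This made-up helper just shows which jobs would be
# withdrawn for a given job id.
def _example_jobs_blocked_on(job_id):
    return session.query(Job).filter(Job.wait_for == job_id).all()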
def HandleJobs():
    print("INFO: PA job manager is scanning for new jobs to process")
    for job in session.query(Job).all():
        if job.pa_job_state == 'New':
            if job.wait_for != None:
                j2 = session.query(Job).get(job.wait_for)
                if not j2:
                    print("WTF? job.wait_for ({}) does not exist in the jobs below?".format(job.wait_for))
                    for j in session.query(Job).all():
                        print("j={}".format(j.id))
                    continue
                if j2.pa_job_state != 'Completed':
                    continue
            # use this to remove threads for easier debugging; errors will stacktrace to the console
            if DEBUG == 1:
                print("*************************************")
                print("RUNNING job: id={} name={} wait_for={}".format(job.id, job.name, job.wait_for))
                RunJob(job)
            else:
                try:
                    RunJob(job)
                    # threading.Thread(target=RunJob, args=(job,)).start()
                except Exception as e:
                    try:
                        MessageToFE(job.id, "danger", "Failed with: {} (try job log for details)".format(e))
                    except Exception as e2:
                        print("Failed to let front-end know, but back-end failed to run job (id: {}, name: {} -- orig exception was: {}, this exception was: {})".format(job.id, job.name, e, e2))
    print("INFO: PA job manager is waiting for jobs")
    return


def JobProgressState(job, state):
    job.pa_job_state = "In Progress"
    job.state = state
    session.commit()
    return


def JobScanNow(job):
    JobProgressState(job, "In Progress")
    ProcessImportDirs(job)
    FinishJob(job, "Completed (scan for new files)")
    MessageToFE(job.id, "success", "Completed (scan for new files)")
    session.commit()
    return


def JobForceScan(job):
    JobProgressState(job, "In Progress")
    session.query(FileRefimgLink).delete()
    session.query(EntryDirLink).delete()
    session.query(Dir).delete()
    session.query(File).delete()
    session.query(Entry).delete()
    session.commit()
    ProcessImportDirs(job)
    FinishJob(job, "Completed (forced remove and recreation of all file data)")
    MessageToFE(job.id, "success", "Completed (forced remove and recreation of all file data)")
    session.commit()
    return


def SymlinkName(path, file):
    sig_bit = file.replace(path, "")
    last_dir = os.path.basename(path[0:-1])
    if sig_bit[-1] == '/':
        last_bit = os.path.dirname(sig_bit)[0:-1]
    else:
        last_bit = os.path.dirname(sig_bit)
    symlink = 'static/' + last_dir + '/' + last_bit
    if symlink[-1] == '/':
        symlink = symlink[0:-1]
    return symlink


# to serve static content of the images, we create a symlink from inside the
# static subdir to each import_path that exists
def CreateSymlink(job, path):
    symlink = 'static/{}'.format(os.path.basename(path[0:-1]))
    if not os.path.exists(symlink):
        os.symlink(path, symlink)
    return symlink


def AddDir(job, dirname, path_prefix, in_dir):
    dir = session.query(Dir).filter(Dir.path_prefix == path_prefix).first()
    if dir:
        e = session.query(Entry).get(dir.eid)
        e.exists_on_fs = True
        return dir
    dir = Dir(path_prefix=path_prefix, num_files=0, last_import_date=0)
    dtype = session.query(FileType).filter(FileType.name == 'Directory').first()
    e = Entry(name=dirname, type=dtype, exists_on_fs=True)
    e.dir_details.append(dir)
    # no in_dir occurs when we add the actual Dir for the import_path (top of the tree)
    if in_dir:
        e.in_dir.append(in_dir)
    if DEBUG == 1:
        print(f"AddDir: created d={dirname}, pp={path_prefix}")
    AddLogForJob(job, f"DEBUG: Process new dir: {dirname}")
    session.add(e)
    return dir


def AddFile(job, fname, type_str, fsize, in_dir, year, month, day, woy):
    e = session.query(Entry).join(EntryDirLink).join(Dir).filter(Entry.name == fname, Dir.eid == in_dir.eid).first()
    if e:
        e.exists_on_fs = True
        return e
    ftype = session.query(FileType).filter(FileType.name == type_str).first()
    e = Entry(name=fname, type=ftype, exists_on_fs=True)
    f = File(size_mb=fsize, last_hash_date=0, faces_created_on=0, year=year, month=month, day=day, woy=woy)
    e.file_details.append(f)
    e.in_dir.append(in_dir)
    AddLogForJob(job, "Found new file: {}".format(fname))
    session.add(e)
    return e
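
# Worked example for SymlinkName() above (never called; the paths are made
# up): a file under an import path maps to its serving directory under
# static/, keyed by the last component of the import path.
def _example_symlink_name():
    # '/data/photos/2020/img.jpg' under import path '/data/photos/' serves
    # from 'static/photos/2020'
    assert SymlinkName('/data/photos/', '/data/photos/2020/img.jpg') == 'static/photos/2020'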
# reset exists_on_fs to False for everything in this import path; if we find
# it on the FS in the walk below, it goes back to True, and anything that is
# still False has been deleted
def ResetExistsOnFS(job, path):
    reset_dirs = session.query(Entry).join(EntryDirLink).join(Dir).filter(Dir.path_prefix.ilike(path + '%')).all()
    for reset_dir in reset_dirs:
        reset_dir.exists_on_fs = False
        session.add(reset_dir)
        reset_files = session.query(Entry).join(EntryDirLink).filter(EntryDirLink.dir_eid == reset_dir.id).all()
        for reset_file in reset_files:
            reset_file.exists_on_fs = False
            session.add(reset_file)
    return


def HandleAnyFSDeletions(job):
    dtype = session.query(FileType).filter(FileType.name == 'Directory').first()
    rms = session.query(Entry).filter(Entry.exists_on_fs == False, Entry.type_id != dtype.id).all()
    rm_cnt = 0
    for rm in rms:
        session.query(EntryDirLink).filter(EntryDirLink.entry_id == rm.id).delete()
        session.query(File).filter(File.eid == rm.id).delete()
        session.query(Entry).filter(Entry.id == rm.id).delete()
        AddLogForJob(job, f"INFO: Removing {rm.name} from system as it is no longer on the file system")
        rm_cnt += 1
    rmdirs = session.query(Entry).filter(Entry.exists_on_fs == False, Entry.type_id == dtype.id).order_by(Entry.id.desc()).all()
    for rmdir in rmdirs:
        print(f"We have a directory ({rmdir.name}) to delete from DB as it no longer exists on fs")
        session.query(EntryDirLink).filter(EntryDirLink.entry_id == rmdir.id).delete()
        session.query(Dir).filter(Dir.eid == rmdir.id).delete()
        session.query(Entry).filter(Entry.id == rmdir.id).delete()
        AddLogForJob(job, f"INFO: Removing {rmdir.name} from system as it is no longer on the file system")
        rm_cnt += 1
    return rm_cnt


def GetDateFromFile(file, stat):
    # try exif first
    try:
        if DEBUG == 1:
            print(f"trying exif read of {file}")
        with open(file, 'rb') as f:
            tags = exifread.process_file(f)
        date_str, time_str = str(tags["EXIF DateTimeOriginal"]).split(" ")
        year, month, day = date_str.split(":")
        year = int(year)
        month = int(month)
        day = int(day)
        # sanity-check that the values form a real date
        check = datetime(year, month, day)
    except:
        # no usable EXIF date, so try parsing the filename
        try:
            m = re.search(r'(\d{4})(\d{2})(\d{2})', file)
            year = int(m[1])
            month = int(m[2])
            day = int(m[3])
            check2 = datetime(year, month, day)
        # give up and use the file system date
        except:
            year, month, day, _, _, _, _, _, _ = datetime.fromtimestamp(stat.st_ctime).timetuple()
    c = date(year, month, day).isocalendar()
    woy = c[1]
    return year, month, day, woy
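
# Worked example for GetDateFromFile()'s filename fallback above (never
# called; the filename is made up): an 8-digit run in the name is split into
# year/month/day when EXIF data is missing.
def _example_get_date_fallback():
    m = re.search(r'(\d{4})(\d{2})(\d{2})', 'IMG_20210314_123456.jpg')
    return int(m[1]), int(m[2]), int(m[3])   # (2021, 3, 14)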
def JobImportDir(job):
    JobProgressState(job, "In Progress")
    settings = session.query(Settings).first()
    if settings == None:
        raise Exception("Cannot create file data with no settings / import path is missing")
    path = [jex.value for jex in job.extra if jex.name == "path"][0]
    AddLogForJob(job, "Checking Import Directory: {}".format(path))
    if DEBUG == 1:
        print("DEBUG: Checking Import Directory: {}".format(path))
    if not os.path.exists(path):
        FinishJob(job, "Finished Importing: {} -- Path does not exist".format(path), "Failed")
        return
    symlink = CreateSymlink(job, path)
    ResetExistsOnFS(job, symlink)
    overall_file_cnt = 0
    # root == path of dir; files and subdirs are in dir
    walk = os.walk(path, topdown=True)
    parent_dir = None
    for root, subdirs, files in walk:
        overall_file_cnt += len(subdirs) + len(files)
        if root == path:
            pp = symlink
        else:
            pp = SymlinkName(path, root) + '/' + os.path.basename(root)
        if root[-1] == "/":
            root = root[0:-1]
        dir = AddDir(job, os.path.basename(root), pp, parent_dir)
        parent_dir = dir
        for basename in files:
            # commit every 100 files to show progress being made without hammering the database
            if job.current_file_num % 100 == 0:
                session.commit()
            fname = dir.path_prefix + '/' + basename
            stat = os.stat(fname)
            if stat.st_ctime > dir.last_import_date:
                AddLogForJob(job, f"Processing new/updated file: {basename}", basename)
                if DEBUG == 1:
                    print("DEBUG: {} - {} is newer than {}".format(basename, stat.st_ctime, dir.last_import_date))
                if isImage(fname):
                    type_str = 'Image'
                elif isVideo(fname):
                    type_str = 'Video'
                else:
                    type_str = 'Unknown'
                fsize = round(stat.st_size / (1024 * 1024))
                year, month, day, woy = GetDateFromFile(fname, stat)
                e = AddFile(job, basename, type_str, fsize, dir, year, month, day, woy)
            else:
                e = session.query(Entry).filter(Entry.name == basename).first()
                e.exists_on_fs = True
                if DEBUG == 1:
                    print("DEBUG: {} - {} is OLDER than {}".format(basename, stat.st_ctime, dir.last_import_date))
            job.current_file = basename
            job.current_file_num += 1
        dir.num_files = len(files) + len(subdirs)
        dir.last_import_date = time.time()
    job.num_files = overall_file_cnt
    job.current_file_num = overall_file_cnt
    rm_cnt = HandleAnyFSDeletions(job)
    # reset the overall path's Dir with overall_file_cnt; we use this in future
    # jobs to measure progress when dealing with this path
    import_dir = session.query(Dir).filter(Dir.path_prefix == symlink).first()
    import_dir.num_files = overall_file_cnt
    session.add(import_dir)
    FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Removed {rm_cnt} file(s)")
    return


def RunFuncOnFilesInPath(job, path, file_func):
    d = session.query(Dir).filter(Dir.path_prefix == path).first()
    for e in d.files:
        ProcessFilesInDir(job, e, file_func)


def JobProcessAI(job):
    path = [jex.value for jex in job.extra if jex.name == "path"][0]
    path = SymlinkName(path, '/')
    d = session.query(Dir).filter(Dir.path_prefix == path).first()
    job.num_files = d.num_files
    people = session.query(Person).all()
    for person in people:
        generateKnownEncodings(person)
    RunFuncOnFilesInPath(job, path, ProcessAI)
    FinishJob(job, "Finished Processing AI")
    return


def GenHashAndThumb(job, e):
    # commit every 100 files to show progress being made without hammering the database
    if job.current_file_num % 100 == 0:
        session.commit()
    stat = os.stat(e.in_dir[0].path_prefix + '/' + e.name)
    if stat.st_ctime < e.file_details[0].last_hash_date:
        # print(f"OPTIM: GenHashAndThumb {e.name} file is older than last hash, skip this")
        job.current_file_num += 1
        return
    e.file_details[0].hash = md5(job, e.in_dir[0].path_prefix + '/' + e.name)
    if e.type.name == 'Image':
        e.file_details[0].thumbnail = GenImageThumbnail(job, e.in_dir[0].path_prefix + '/' + e.name)
    elif e.type.name == 'Video':
        e.file_details[0].thumbnail = GenVideoThumbnail(job, e.in_dir[0].path_prefix + '/' + e.name)
    elif e.type.name == 'Unknown':
        job.current_file_num += 1
    e.file_details[0].last_hash_date = time.time()
    return
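
# Hedged sketch (never called) of the serialization used by ProcessAI below:
# each face encoding is a 128-element float64 vector; the set of faces is
# flattened to bytes for the file.faces column and recovered with
# frombuffer + reshape. The data here is random, not a real face.
def _example_faces_roundtrip():
    encodings = [numpy.random.rand(128), numpy.random.rand(128)]
    blob = numpy.array(encodings).tobytes()
    recovered = numpy.frombuffer(blob, dtype=numpy.float64).reshape(-1, 128)
    assert recovered.shape == (2, 128)
    return recovered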
def ProcessAI(job, e):
    if e.type.name != 'Image':
        job.current_file_num += 1
        print("DDP: ProcessAI: adding 1 to current_file_num as we have a non-image file")
        return
    file = e.in_dir[0].path_prefix + '/' + e.name
    stat = os.stat(file)
    # find if the file is newer than when we found faces before (fyi: first time faces_created_on == 0)
    if stat.st_ctime > e.file_details[0].faces_created_on:
        session.add(e)
        im_orig = Image.open(file)
        im = ImageOps.exif_transpose(im_orig)
        faces = generateUnknownEncodings(im)
        e.file_details[0].faces_created_on = time.time()
        if faces:
            flat_faces = numpy.array(faces)
            e.file_details[0].faces = flat_faces.tobytes()
        else:
            e.file_details[0].faces = None
            job.current_file_num += 1
            return
    else:
        if not e.file_details[0].faces:
            print("OPTIM: This image has no faces, skip it")
            job.current_file_num += 1
            return
        # recover the stored per-face encodings: one 128-float64 vector per face
        recover = numpy.frombuffer(e.file_details[0].faces, dtype=numpy.float64)
        real_recover = numpy.reshape(recover, (-1, 128))
        l = []
        for el in real_recover:
            l.append(numpy.array(el))
        faces = l
    people = session.query(Person).all()
    for unknown_encoding in faces:
        for person in people:
            lookForPersonInImage(job, person, unknown_encoding, e)
    AddLogForJob(job, f"Finished processing {e.name}", e.name)
    return


def lookForPersonInImage(job, person, unknown_encoding, e):
    for refimg in person.refimg:
        # let's see if we have tried this check before
        frl = session.query(FileRefimgLink).filter(FileRefimgLink.file_id == e.id, FileRefimgLink.refimg_id == refimg.id).first()
        if not frl:
            frl = FileRefimgLink(refimg_id=refimg.id, file_id=e.file_details[0].eid)
        else:
            stat = os.stat(e.in_dir[0].path_prefix + '/' + e.name)
            # if neither the file nor the refimg is newer, we don't need to check again
            if frl.matched and stat.st_ctime < frl.when_processed and refimg.created_on < frl.when_processed:
                print(f"OPTIM: lookForPersonInImage: file {e.name} has a previous match for: {refimg.fname}, and the file & refimg haven't changed")
                return
        session.add(frl)
        frl.matched = False
        frl.when_processed = time.time()
        deserialized_bytes = numpy.frombuffer(refimg.encodings, dtype=numpy.float64)
        results = compareAI(deserialized_bytes, unknown_encoding)
        if results[0]:
            print(f'Found a match between: {person.tag} and {e.name}')
            AddLogForJob(job, f'Found a match between: {person.tag} and {e.name}')
            frl.matched = True
    return


def generateUnknownEncodings(im):
    unknown_image = numpy.array(im)
    face_locations = face_recognition.face_locations(unknown_image)
    if not face_locations:
        return None
    unknown_encodings = face_recognition.face_encodings(unknown_image, known_face_locations=face_locations)
    return unknown_encodings


def generateKnownEncodings(person):
    for refimg in person.refimg:
        file = 'reference_images/' + refimg.fname
        stat = os.stat(file)
        if refimg.created_on and stat.st_ctime < refimg.created_on:
            print("OPTIM: skipping re-creating encoding for refimg because file has not changed")
            continue
        img = face_recognition.load_image_file(file)
        location = face_recognition.face_locations(img)
        encodings = face_recognition.face_encodings(img, known_face_locations=location)
        refimg.encodings = encodings[0].tobytes()
        refimg.created_on = time.time()
        session.add(refimg)
        session.commit()


def compareAI(known_encoding, unknown_encoding):
    # compare_faces() returns one boolean per known encoding (a single-element
    # list here); 0.55 is stricter than the library's 0.6 default tolerance
    results = face_recognition.compare_faces([known_encoding], unknown_encoding, tolerance=0.55)
    return results


def ProcessFilesInDir(job, e, file_func):
    if DEBUG == 1:
        print("DEBUG: files in dir - process: {} {}".format(e.name, e.in_dir[0].path_prefix))
    if e.type.name != 'Directory':
        file_func(job, e)
    else:
        dir = session.query(Dir).filter(Dir.eid == e.id).first()
        job.current_file_num += 1
        for sub in dir.files:
            ProcessFilesInDir(job, sub, file_func)
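
# Hedged sketch (never called): RunFuncOnFilesInPath()/ProcessFilesInDir()
# above implement a generic recursive visitor over a Dir tree, so any callable
# with the (job, entry) signature can be plugged in. This printer is made up.
def _example_print_entry(job, e):
    print(f"{e.in_dir[0].path_prefix}/{e.name} ({e.type.name})")
# usage: RunFuncOnFilesInPath(job, 'static/photos', _example_print_entry)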
def JobGetFileDetails(job):
    JobProgressState(job, "In Progress")
    path = [jex.value for jex in job.extra if jex.name == "path"][0]
    path = 'static' + '/' + os.path.basename(path[0:-1])
    if DEBUG == 1:
        print("DEBUG: JobGetFileDetails for path={}".format(path))
    dir = session.query(Dir).filter(Dir.path_prefix == path).first()
    job.current_file_num = 0
    job.num_files = dir.num_files
    session.commit()
    RunFuncOnFilesInPath(job, path, GenHashAndThumb)
    FinishJob(job, "File Details job finished")
    session.commit()
    return


def isVideo(file):
    try:
        fileInfo = MediaInfo.parse(file)
        for track in fileInfo.tracks:
            if track.track_type == "Video":
                return True
        return False
    except Exception as e:
        return False


# Returns an md5 hash of the fname's contents
def md5(job, fname):
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    hash = hash_md5.hexdigest()
    AddLogForJob(job, "Generated md5 hash: {} for file: {}".format(hash, fname))
    return hash


def isImage(file):
    try:
        img = Image.open(file)
        return True
    except:
        return False


def GenImageThumbnail(job, file):
    AddLogForJob(job, "Generate Thumbnail from Image file: {}".format(file), file)
    try:
        im_orig = Image.open(file)
        im = ImageOps.exif_transpose(im_orig)
        im.thumbnail((THUMBSIZE, THUMBSIZE))
        img_bytearray = io.BytesIO()
        im.save(img_bytearray, format='JPEG')
        img_bytearray = img_bytearray.getvalue()
        thumbnail = base64.b64encode(img_bytearray)
        thumbnail = str(thumbnail)[2:-1]
    except Exception as e:
        print('WARNING: failed to generate image thumbnail (bad/missing EXIF tags?)')
        AddLogForJob(job, f"WARNING: Failed to generate thumbnail for: {file} - error={e}")
        return None
    return thumbnail


def GenVideoThumbnail(job, file):
    AddLogForJob(job, "Generate Thumbnail from Video file: {}".format(file), file)
    try:
        vcap = cv2.VideoCapture(file)
        res, frame = vcap.read()
        # if the mean pixel value is > 15, we have something worth making a screenshot
        # of (avoids a black frame at the start being the screenshot)
        while frame.mean() < 15 and res:
            res, frame = vcap.read()
        w = vcap.get(cv2.CAP_PROP_FRAME_WIDTH)
        h = vcap.get(cv2.CAP_PROP_FRAME_HEIGHT)
        if w > h:
            factor = w / THUMBSIZE
        else:
            factor = h / THUMBSIZE
        new_h = int(h / factor)
        new_w = int(w / factor)
        frame = cv2.resize(frame, (new_w, new_h), 0, 0, cv2.INTER_LINEAR)
        res, thumb_buf = cv2.imencode('.jpeg', frame)
        bt = thumb_buf.tobytes()
        thumbnail = base64.b64encode(bt)
        thumbnail = str(thumbnail)[2:-1]
    except Exception as e:
        AddLogForJob(job, f"ERROR: Failed to generate thumbnail for video file: {file} - error={e}")
        return None
    return thumbnail


if __name__ == "__main__":
    print("INFO: PA job manager starting - listening on {}:{}".format(PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
    ProcessImportDirs()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
        s.listen()
        while True:
            # any inbound connection is treated purely as a "wake up" signal;
            # the payload (if any) is ignored and we just rescan for new jobs
            conn, addr = s.accept()
            HandleJobs()
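
# Hedged sketch (comments only, not executed): since the accept() loop above
# treats any TCP connection as a "wake up and scan for jobs" signal, the web
# front-end presumably only needs something like the following to poke the
# manager after inserting a new 'New' job row:
#
#   with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
#       s.connect((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))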