# photoassistant/pa_job_manager.py
###
#
# This file controls the 'external' job control manager, which picks up new
# jobs (by periodic scanning, or when woken by a connection event) and
# processes them.
#
# It then stores the progress/status, etc. in job and joblog tables as needed
# via wrapper functions.
#
# The whole pa_job_manager is multi-threaded, and uses the database tables for
# state management and communication back to the pa web site
#
###
### SQLALCHEMY IMPORTS ###
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime, LargeBinary
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
### SQLALCHEMY IMPORTS ###
### LOCAL FILE IMPORTS ###
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT
from datetime import datetime, timedelta
import pytz
import time
import os
import glob
from PIL import Image, ImageOps
from pymediainfo import MediaInfo
import hashlib
#import exifread
import base64
import numpy
import cv2
import socket
import threading
import io
import face_recognition
# Set to 1 to run jobs inline (no threads) so errors stacktrace to the console.
DEBUG=1
# Engine the Session factory will use for connection resources.
some_engine = create_engine(DB_URL)
# create a configured "Session" class
#Session = sessionmaker(bind=some_engine)
# create a Session
session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)
# NOTE(review): one module-wide session is shared by every helper below; if the
# threaded path in HandleJobs is ever re-enabled, this is not thread-safe --
# confirm before turning DEBUG off.
session = Session()
Base = declarative_base()
################################################################################
# Class describing File in the database, and via sqlalchemy, connected to the DB as well
# This has to match one-for-one the DB table
################################################################################
class EntryDirLink(Base):
    """Association row linking an Entry to the Dir that contains it."""
    __tablename__ = "entry_dir_link"
    entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True)
    dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True)

    def __repr__(self):
        return f"<entry_id: {self.entry_id}, dir_eid: {self.dir_eid}>"
class Dir(Base):
    """Per-directory details for an Entry row, keyed by the entry id (eid)."""
    __tablename__ = "dir"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True)
    path_prefix = Column(String, unique=False, nullable=False)
    num_files = Column(Integer)
    # unix timestamps of the last import / hashing passes over this directory
    last_import_date = Column(Float)
    last_hash_date = Column(Float)
    files = relationship("Entry", secondary="entry_dir_link")

    def __repr__(self):
        return (f"<eid: {self.eid}, path_prefix: {self.path_prefix}, "
                f"num_files: {self.num_files}, last_import_date: {self.last_import_date}, "
                f"last_hash_date: {self.last_hash_date}>")
class Entry(Base):
    """A filesystem entry (file or directory) -- the common parent row that
    Dir and File detail rows hang off."""
    __tablename__ = "entry"
    id = Column(Integer, Sequence('file_id_seq'), primary_key=True)
    name = Column(String, unique=True, nullable=False)
    type_id = Column(Integer, ForeignKey("file_type.id"))
    type = relationship("FileType")
    dir_details = relationship("Dir")
    file_details = relationship("File")
    in_dir = relationship("Dir", secondary="entry_dir_link")

    def __repr__(self):
        return (f"<id: {self.id}, name: {self.name}, type={self.type}, "
                f"dir_details={self.dir_details}, file_details={self.file_details}, "
                f"in_dir={self.in_dir}>")
class File(Base):
    """Per-file details (size, content hash, thumbnail) for an Entry row."""
    __tablename__ = "file"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True)
    size_mb = Column(Integer, unique=False, nullable=False)
    # NOTE(review): md5() stores a hex *string* here although the column is
    # declared Integer; likewise thumbnail receives bytes from b64encode for
    # images but str for videos -- confirm against the real DB schema.
    hash = Column(Integer, unique=True, nullable=True)
    thumbnail = Column(String, unique=False, nullable=True)

    def __repr__(self):
        return f"<eid: {self.eid}, size_mb={self.size_mb}, hash={self.hash}>"
class FileType(Base):
    """Lookup table of entry types ('Directory', 'Image', 'Video', 'Unknown')."""
    __tablename__ = "file_type"
    id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True)
    name = Column(String, unique=True, nullable=False)

    def __repr__(self):
        return f"<id: {self.id}, name={self.name}>"
class Settings(Base):
    """Site settings; import_path holds '#'-separated import directories."""
    __tablename__ = "settings"
    id = Column(Integer, Sequence('settings_id_seq'), primary_key=True)
    import_path = Column(String)

    def __repr__(self):
        return f"<id: {self.id}, import_path: {self.import_path}>"
class Person_Refimg_Link(Base):
    """Association row linking a Person to one of their reference images."""
    __tablename__ = "person_refimg_link"
    person_id = Column(Integer, ForeignKey('person.id'), unique=True, nullable=False, primary_key=True)
    refimg_id = Column(Integer, ForeignKey('refimg.id'), unique=True, nullable=False, primary_key=True)
    def __repr__(self):
        # fixed: the format string was missing the second placeholder, so the
        # refimg_id argument was silently dropped from the repr output
        return "<person_id: {}, refimg_id: {}>".format(self.person_id, self.refimg_id)
class Person(Base):
    """A known person, with reference images used for face matching."""
    __tablename__ = "person"
    id = Column(Integer, Sequence('person_id_seq'), primary_key=True)
    tag = Column(String(48), unique=False, nullable=False)
    surname = Column(String(48), unique=False, nullable=False)
    firstname = Column(String(48), unique=False, nullable=False)
    refimg = relationship('Refimg', secondary=Person_Refimg_Link.__table__)

    def __repr__(self):
        return (f"<tag: {self.tag}, firstname: {self.firstname}, "
                f"surname: {self.surname}, refimg: {self.refimg}>")
class Refimg(Base):
    """A reference image of a person; encodings holds the serialized
    face_recognition encoding bytes (numpy float64 buffer)."""
    __tablename__ = "refimg"
    id = Column(Integer, Sequence('refimg_id_seq'), primary_key=True)
    fname = Column(String(256), unique=True, nullable=False)
    encodings = Column(LargeBinary)

    def __repr__(self):
        return f"<id: {self.id}, fname: {self.fname}, encodings: {self.encodings}>"
class File_Person_Link(Base):
    """Association row recording that a person was recognised in a file."""
    __tablename__ = "file_person_link"
    file_id = Column(Integer, ForeignKey('file.eid'), unique=True, nullable=False, primary_key=True)
    person_id = Column(Integer, ForeignKey('person.id'), unique=True, nullable=False, primary_key=True)

    def __repr__(self):
        return f"<file_id: {self.file_id}, person_id: {self.person_id}>"
################################################################################
# classes for the job manager:
# Job (and Joblog, JobExtra) for each Job, and
# PA_Jobmanager_fe_message (to pass messages to the front-end web)
################################################################################
class Joblog(Base):
    """One timestamped log line belonging to a Job."""
    __tablename__ = "joblog"
    id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True)
    job_id = Column(Integer, ForeignKey('job.id'))
    log_date = Column(DateTime(timezone=True))
    log = Column(String)
    def __repr__(self):
        # fixed: the format string was missing its closing '>'
        return "<id: {}, job_id: {}, log_date: {}, log: {}>".format(self.id, self.job_id, self.log_date, self.log)
class JobExtra(Base):
    """Arbitrary name/value parameter attached to a Job (e.g. its import path)."""
    __tablename__ = "jobextra"
    id = Column(Integer, Sequence('jobextra_id_seq'), primary_key=True)
    job_id = Column(Integer, ForeignKey('job.id'))
    name = Column(String)
    value = Column(String)

    def __repr__(self):
        return f"<id: {self.id}, job_id: {self.job_id}, name: {self.name}, value: {self.value}>"
class Job(Base):
    """A unit of background work plus its progress state.

    pa_job_state drives scheduling ('New' -> 'In Progress' -> 'Completed');
    wait_for names another job id that must complete first.
    """
    __tablename__ = "job"
    id = Column(Integer, Sequence('job_id_seq'), primary_key=True)
    start_time = Column(DateTime(timezone=True))
    last_update = Column(DateTime(timezone=True))
    name = Column(String)
    state = Column(String)
    num_files = Column(Integer)
    current_file_num = Column(Integer)
    current_file = Column(String)
    wait_for = Column(Integer)
    pa_job_state = Column(String)
    logs = relationship("Joblog")
    extra = relationship("JobExtra")

    def __repr__(self):
        return (f"<id: {self.id}, start_time: {self.start_time}, "
                f"last_update: {self.last_update}, name: {self.name}, "
                f"state: {self.state}, num_files: {self.num_files}, "
                f"current_file_num: {self.current_file_num}, "
                f"current_file: {self.current_file}, pa_job_state: {self.pa_job_state}, "
                f"wait_for: {self.wait_for}, extra: {self.extra}, logs: {self.logs}>")
class PA_JobManager_FE_Message(Base):
    """A message queued for the front-end web site to display (alert level + text)."""
    __tablename__ = "pa_job_manager_fe_message"
    id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True)
    job_id = Column(Integer, ForeignKey('job.id'), primary_key=True)
    alert = Column(String)
    message = Column(String)
    def __repr__(self):
        # fixed: the format string was missing its closing '>'
        return "<id: {}, job_id: {}, alert: {}, message: {}>".format(self.id, self.job_id, self.alert, self.message)
##############################################################################
# Util (non Class) functions
##############################################################################
def MessageToFE(job_id, alert, message):
    """Queue a message row for the front-end web UI (alert is a bootstrap level)."""
    fe_msg = PA_JobManager_FE_Message(job_id=job_id, alert=alert, message=message)
    session.add(fe_msg)
    session.commit()
def ProcessImportDirs(parent_job=None):
    """For every configured import path, queue an importdir job plus dependent
    getfiledetails and processai jobs, then kick HandleJobs to run them.

    parent_job: when called from a scannow/forcescan job, progress log lines
    are attached to it.
    """
    settings = session.query(Settings).first()
    if settings == None:
        raise Exception("Cannot create file data with no settings / import path is missing")
    # import_path is a single '#'-separated string of directories
    paths = settings.import_path.split("#")
    for path in paths:
        # make new Job; HandleJobs will make them run later
        jex=JobExtra( name="path", value=path )
        job=Job(start_time='now()', last_update='now()', name="importdir", state="New", wait_for=None, pa_job_state="New" )
        job.extra.append(jex)
        session.add(job)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a>".format( job.id, job.id, job.name ) )
        # force commit to make job.id be valid in use of wait_for later
        session.commit()
        # getfiledetails must run after the importdir job has populated the rows
        jex2=JobExtra( name="path", value=path )
        job2=Job(start_time='now()', last_update='now()', name="getfiledetails", state="New", wait_for=job.id, pa_job_state="New", current_file_num=0 )
        job2.extra.append(jex2)
        session.add(job2)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job2.id, job2.id, job2.name, job2.wait_for ) )
        # processai likewise waits on the importdir job
        jex3=JobExtra( name="path", value=path )
        job3=Job(start_time='now()', last_update='now()', name="processai", state="New", wait_for=job.id, pa_job_state="New", current_file_num=0 )
        job3.extra.append(jex3)
        session.add(job3)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job3.id, job3.id, job3.name, job3.wait_for ) )
    HandleJobs()
    return
def AddLogForJob(job, message, current_file=''):
    """Append a Joblog row for *job* and refresh its progress fields."""
    now = datetime.now(pytz.utc)
    log_row = Joblog(job_id=job.id, log=message, log_date=now)
    job.last_update = now
    if current_file != '':
        job.current_file = os.path.basename(current_file)
    # num_files may not be set on an import job, in which case progress
    # counting is skipped
    if job.num_files:
        job.current_file_num += 1
    session.add(log_row)
    session.commit()
def RunJob(job):
    """Dispatch *job* to its handler by name, then re-scan for dependent jobs."""
    handlers = {
        "scannow": JobScanNow,
        "forcescan": JobForceScan,
        "importdir": JobImportDir,
        "getfiledetails": JobGetFileDetails,
        "processai": JobProcessAI,
    }
    handler = handlers.get(job.name)
    if handler is None:
        print("ERROR: Requested to process unknown job type: {}".format(job.name))
    else:
        handler(job)
    # okay, we finished a job, so check for any jobs that are dependant on this
    # and run them...
    HandleJobs()
def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
    """Mark *job* finished with the given states and write a final log line."""
    job.pa_job_state = pa_job_state
    job.state = state
    job.last_update = datetime.now(pytz.utc)
    AddLogForJob(job, last_log)
def HandleJobs():
    """Scan the job table and run every 'New' job whose dependency (wait_for)
    has completed; failures are reported to the front-end message table."""
    print("INFO: PA job manager is scanning for new jobs to process")
    for job in session.query(Job).all():
        if job.pa_job_state == 'New':
            # a job with wait_for set may only run once that job has completed
            if job.wait_for != None:
                j2 = session.query(Job).get(job.wait_for)
                if not j2:
                    # dependency row is missing entirely -- dump the table and skip
                    print ("WTF? job.wait_for ({}) does not exist in below? ".format( job.wait_for ))
                    for j in session.query(Job).all():
                        print ("j={}".format(j.id))
                    continue
                if j2.pa_job_state != 'Completed':
                    continue
            # use this to remove threads for easier debugging, and errors will stacktrace to the console
            if DEBUG==1:
                print("*************************************")
                print("RUNNING job: id={} name={} wait_for={}".format(job.id, job.name, job.wait_for ))
                RunJob(job)
            else:
                try:
                    RunJob(job)
                    # threading.Thread(target=RunJob, args=(job,)).start()
                except Exception as e:
                    # best-effort: surface the failure to the web UI; if even
                    # that fails, at least log both exceptions to the console
                    try:
                        MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
                    except Exception as e2:
                        print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
    print("INFO: PA job manager is waiting jobs")
    return
def JobProgressState(job, state):
    """Flag *job* as being worked on and record its display state."""
    job.state = state
    job.pa_job_state = "In Progress"
    session.commit()
def JobScanNow(job):
    """Handle a 'scannow' job: queue import jobs for every configured path."""
    done_msg = "Completed (scan for new files)"
    JobProgressState(job, "In Progress")
    ProcessImportDirs(job)
    FinishJob(job, done_msg)
    MessageToFE(job.id, "success", done_msg)
    session.commit()
def JobForceScan(job):
    """Handle a 'forcescan' job: wipe all file data and re-import from scratch."""
    done_msg = "Completed (forced remove and recreation of all file data)"
    JobProgressState(job, "In Progress")
    # delete link/detail tables before the entry table they reference
    for model in (File_Person_Link, EntryDirLink, Dir, File, Entry):
        session.query(model).delete()
    session.commit()
    ProcessImportDirs(job)
    FinishJob(job, done_msg)
    MessageToFE(job.id, "success", done_msg)
    session.commit()
def SymlinkName(path, file):
    """Map *file* (a path under import dir *path*) to the containing directory's
    path under the static/ symlink tree.

    path is expected to end with '/'. Returns e.g. 'static/photos/sub' for
    path='/photos/' and file='/photos/sub/img.jpg'.
    """
    sig_bit = file.replace(path, "")
    last_dir = os.path.basename(path[0:-1])
    # fixed: endswith() is safe on the empty string; sig_bit[-1] raised
    # IndexError when file == path (sig_bit == '')
    if sig_bit.endswith('/'):
        last_bit = os.path.dirname(sig_bit)[0:-1]
    else:
        last_bit = os.path.dirname(sig_bit)
    symlink = 'static' + '/' + last_dir + '/' + last_bit
    if symlink.endswith('/'):
        symlink = symlink[0:-1]
    return symlink
# To serve the images as static content, a symlink to each import_path is
# created inside the web site's static subdirectory.
def CreateSymlink(job, path):
    """Ensure static/ contains a symlink to import dir *path*; return its name."""
    link_name = 'static/{}'.format(os.path.basename(path[0:-1]))
    if not os.path.exists(link_name):
        os.symlink(path, link_name)
    return link_name
def AddDir(job, dirname, path_prefix, in_dir):
    """Return the Dir row for *dirname*, creating Entry+Dir rows if missing.

    in_dir is the parent Dir row, or None for the top of an import_path tree.
    """
    # NOTE(review): this filters path_prefix against *dirname* (a basename),
    # not against path_prefix -- confirm the duplicate check really matches.
    existing = session.query(Dir).filter(Dir.path_prefix == dirname).first()
    if existing:
        if DEBUG == 1:
            print("Found {} returning DB object".format(dirname))
        return existing
    new_dir = Dir(path_prefix=path_prefix, num_files=0, last_import_date=0, last_hash_date=0)
    dir_type = session.query(FileType).filter(FileType.name == 'Directory').first()
    entry = Entry(name=dirname, type=dir_type)
    entry.dir_details.append(new_dir)
    # no in_dir occurs when we add the Dir for the import_path itself
    if in_dir:
        entry.in_dir.append(in_dir)
    if DEBUG == 1:
        print("AddDir: created {}".format(dirname))
        AddLogForJob(job, "DEBUG: AddDir: {} in (dir_id={})".format(dirname, in_dir))
    session.add(entry)
    return new_dir
def AddFile(job, fname, type_str, fsize, in_dir):
    """Create Entry+File rows for a newly discovered file; return the Entry."""
    file_type = session.query(FileType).filter(FileType.name == type_str).first()
    entry = Entry(name=fname, type=file_type)
    entry.file_details.append(File(size_mb=fsize))
    entry.in_dir.append(in_dir)
    AddLogForJob(job, "Found new file: {}".format(fname))
    session.add(entry)
    return entry
def JobImportDir(job):
    """Handle an 'importdir' job: walk the job's import path and register every
    directory and new/changed file in the DB.

    Marks the job Failed (and withdraws jobs waiting on it) when the path does
    not exist.
    """
    JobProgressState( job, "In Progress" )
    settings = session.query(Settings).first()
    if settings == None:
        raise Exception("Cannot create file data with no settings / import path is missing")
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    AddLogForJob(job, "Checking Import Directory: {}".format( path ) )
    if DEBUG==1:
        print("DEBUG: Checking Import Directory: {}".format( path ) )
    if not os.path.exists( path ):
        FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" )
        # jobs queued behind this one can never run; withdraw them
        for j in session.query(Job).filter(Job.wait_for==job.id).all():
            if DEBUG==1:
                # fixed: the ids were swapped (j is the withdrawn job, job is the failed one)
                print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(j.id, job.id) )
            FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" )
        return
    symlink=CreateSymlink(job,path)
    overall_file_cnt=0
    walk=os.walk(path, topdown=True)
    # root == path of dir, files are in dir... subdirs are in dir
    parent_dir=None
    for root, subdirs, files in walk:
        overall_file_cnt+= len(subdirs) + len(files)
        # Dir rows are keyed by their path under the static/ symlink
        if root == path:
            pp = symlink
        else:
            pp=SymlinkName( path, root )+'/'+os.path.basename(root)
        dir=AddDir(job, os.path.basename(root), pp, parent_dir)
        parent_dir=dir
        stat = os.stat( dir.path_prefix )
        # no modification on the fs since the last import => skip the contents
        if dir.last_import_date > 0 and stat.st_ctime < dir.last_import_date:
            if DEBUG==1:
                print( "DEBUG: Directory has not been altered since the last import, just ignore contents" )
            job.current_file_num=dir.num_files
            # fixed: num_files starts as None on importdir jobs, so the old
            # `job.num_files+=dir.num_files` raised TypeError on this branch
            job.num_files=(job.num_files or 0)+dir.num_files
            continue
        for basename in files:
            fname=dir.path_prefix+'/'+basename
            stat = os.stat(fname)
            if stat.st_ctime > dir.last_import_date:
                if DEBUG==1:
                    AddLogForJob(job, "DEBUG: {} - is new/updated".format( basename ), basename )
                    print("DEBUG: {} - {} is newer than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
                if isImage(fname):
                    type_str = 'Image'
                elif isVideo(fname):
                    type_str = 'Video'
                else:
                    type_str = 'Unknown'
                fsize = round(stat.st_size/(1024*1024))
                e=AddFile( job, basename, type_str, fsize, dir )
            else:
                if DEBUG==1:
                    # fixed: stray extra arguments to format()/print() removed
                    AddLogForJob(job, "DEBUG: {} - is unchanged".format( basename ) )
                    print("DEBUG: {} - {} is OLDER than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
        dir.num_files=len(files)+len(subdirs)
        dir.last_import_date = time.time()
    job.num_files=overall_file_cnt
    job.current_file_num=overall_file_cnt
    FinishJob(job, "Finished Importing: {} - Found {} new files".format( path, overall_file_cnt ) )
    import_dir=session.query(Dir).filter(Dir.path_prefix==symlink).first()
    import_dir.num_files=overall_file_cnt
    session.commit()
    return
def JobProcessAI(job):
    """Handle a 'processai' job: run face recognition over every entry under
    the job's import path."""
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    # map the import path onto its static/ symlink name, which is how Dir rows
    # are keyed in the DB
    path = SymlinkName(path, '/')
    # fixed: removed the leftover "REMOVE AFTER TESTING ON WINDOWS" debug print
    d=session.query(Dir).filter(Dir.path_prefix==path).first()
    job.num_files=d.num_files
    for e in FilesInDir( path ):
        # lambda: always descend into subdirectories for the AI pass
        ProcessFilesInDir(job, e, ProcessAI, lambda a: True)
    FinishJob(job, "Finished Processesing AI")
    return
def FilesInDir(path):
    """Return the Entry rows contained in the Dir whose path_prefix is *path*."""
    found = session.query(Dir).filter(Dir.path_prefix == path).first()
    return found.files
def GenHashAndThumb(job, e):
    """Fill in the content hash and thumbnail for Entry *e* by its file type."""
    full_path = e.in_dir[0].path_prefix + '/' + e.name
    details = e.file_details[0]
    details.hash = md5(job, full_path)
    kind = e.type.name
    if kind == 'Image':
        details.thumbnail = GenImageThumbnail(job, full_path)
    elif kind == 'Video':
        details.thumbnail = GenVideoThumbnail(job, full_path)
    elif kind == 'Unknown':
        # nothing to thumbnail, but still count the file as processed
        job.current_file_num += 1
def HashAndThumbDirHasNew(dir):
    """Return 1 if *dir* changed on disk since its last hashing pass, else 0.

    Used as the go_into_dir_func predicate for ProcessFilesInDir; also stamps
    dir.last_hash_date either way.
    """
    session.add(dir)
    stat = os.stat( dir.path_prefix )
    # no modification on the fs since the last hashing pass => skip this dir
    if stat.st_ctime < dir.last_hash_date:
        dir.last_hash_date = time.time()
        # fixed: the original called AddLogForJob(job, ...) here, but no `job`
        # is in scope (this is a one-argument callback), raising NameError
        if DEBUG==1:
            print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
        return 0
    dir.last_hash_date = time.time()
    return 1
def ProcessAI(job, e):
    """Run face recognition on image Entry *e*: compare every face found in it
    against every known person's reference encodings."""
    # non-images cannot be face-scanned; just count them as processed
    if e.type.name != 'Image':
        job.current_file_num+=1
        return
    people = session.query(Person).all()
    # PERF NOTE(review): this recomputes and re-commits every person's
    # reference encodings once per image processed; consider computing them
    # once per job instead.
    for person in people:
        generateKnownEncodings(person)
    file = e.in_dir[0].path_prefix + '/' + e.name
    im_orig = Image.open(file)
    # apply the EXIF orientation so face detection sees the upright image
    im = ImageOps.exif_transpose(im_orig)
    unknown_encodings = generateUnknownEncodings(im)
    # every detected face is compared against every known person
    for unknown_encoding in unknown_encodings:
        for person in people:
            lookForPersonInImage(job, person, unknown_encoding, e)
    AddLogForJob(job, f"Finished processing {e.name}", e.name )
    return
def lookForPersonInImage(job, person, unknown_encoding, e):
    """Compare one unknown face encoding against each reference image of
    *person*; on a match, record a File_Person_Link row."""
    for refimg in person.refimg:
        known = numpy.frombuffer(refimg.encodings, dtype=numpy.float64)
        match = compareAI(known, unknown_encoding)
        if match[0]:
            print(f'Found a match between: {person.tag} and {e.name}')
            AddLogForJob(job, f'Found a match between: {person.tag} and {e.name}')
            link = File_Person_Link(person_id=person.id, file_id=e.file_details[0].eid)
            session.add(link)
def generateUnknownEncodings(im):
    """Return face encodings for every face detected in PIL image *im*."""
    pixels = numpy.array(im)
    locations = face_recognition.face_locations(pixels)
    # TODO(review): consider persisting the face locations to the DB as well
    return face_recognition.face_encodings(pixels, known_face_locations=locations)
def generateKnownEncodings(person):
    """(Re)compute and store the face encoding for each reference image of
    *person*.

    Reference images in which no face can be located are skipped with a
    warning instead of raising IndexError on encodings[0] as before.
    """
    for refimg in person.refimg:
        img = face_recognition.load_image_file('reference_images/'+refimg.fname)
        location = face_recognition.face_locations(img)
        encodings = face_recognition.face_encodings(img, known_face_locations=location)
        if not encodings:
            print("WARNING: no face found in reference image: {}".format(refimg.fname))
            continue
        refimg.encodings = encodings[0].tobytes()
        session.add(refimg)
        session.commit()
def compareAI(known_encoding, unknown_encoding):
    """Return face_recognition match results (list of bool) for one known
    encoding versus one unknown encoding, at tolerance 0.55."""
    return face_recognition.compare_faces([known_encoding], unknown_encoding, tolerance=0.55)
def ProcessFilesInDir(job, e, file_func, go_into_dir_func):
    """Depth-first walk from Entry *e*: apply *file_func* to files; recurse
    into directories only when *go_into_dir_func(dir)* is truthy."""
    if DEBUG == 1:
        print("DEBUG: files in dir - process: {} {}".format(e.name, e.in_dir[0].path_prefix))
    if e.type.name != 'Directory':
        file_func(job, e)
        return
    dir = session.query(Dir).filter(Dir.eid == e.id).first()
    job.current_file_num += 1
    # the predicate decides whether this directory's contents need processing
    if go_into_dir_func(dir):
        for child in dir.files:
            ProcessFilesInDir(job, child, file_func, go_into_dir_func)
def JobGetFileDetails(job):
    """Handle a 'getfiledetails' job: generate md5 hashes and thumbnails for
    every file under the job's import path (skipping unchanged directories)."""
    JobProgressState( job, "In Progress" )
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    # Dir rows are keyed by their path under the static/ symlink
    path='static'+'/'+os.path.basename(path[0:-1])
    if DEBUG==1:
        print("DEBUG: JobGetFileDetails for path={}".format( path ) )
    dir=session.query(Dir).filter(Dir.path_prefix==path).first()
    stat=os.stat( path )
    # top-level dir unchanged since the last hashing pass => finish immediately
    if stat.st_ctime < dir.last_hash_date:
        session.add(dir)
        dir.last_hash_date = time.time()
        FinishJob(job, "{} has not changed since last hashing - finished job".format(dir.path_prefix))
        if DEBUG==1:
            print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
        return
    dir.last_hash_date = time.time()
    job.current_file_num = 0
    job.num_files = dir.num_files
    session.commit()
    # HashAndThumbDirHasNew prunes subdirectories that have not changed
    for e in FilesInDir( path ):
        ProcessFilesInDir(job, e, GenHashAndThumb, HashAndThumbDirHasNew )
    FinishJob(job, "File Details job finished")
    session.commit()
    return
def isVideo(file):
    """Return True when MediaInfo reports a video track in *file*, else False
    (including when the file cannot be parsed at all)."""
    try:
        info = MediaInfo.parse(file)
        return any(track.track_type == "Video" for track in info.tracks)
    except Exception:
        return False
# Converts forward-slash paths into Windows paths when they carry a drive letter.
def FixPath(p):
    """Return *p* with '/' replaced by '\\' when it starts with a Windows
    drive-letter prefix (any letter, any case); other paths are unchanged.

    Generalized from the old hard-coded lowercase 'c:' check, as the original
    HACK comment suggested.
    """
    if len(p) >= 2 and p[1] == ':' and p[0].isalpha():
        return p.replace('/', '\\')
    return p
# Returns an md5 hash of the fname's contents.
def md5(job, fname):
    """Hash the contents of *fname* with MD5 (streamed in 4 KiB chunks) and
    log the resulting hex digest against *job*."""
    digester = hashlib.md5()
    with open(fname, "rb") as fh:
        for block in iter(lambda: fh.read(4096), b""):
            digester.update(block)
    digest = digester.hexdigest()
    AddLogForJob( job, "Generated md5 hash: {} for file: {}".format( digest, fname ) )
    return digest
def isImage(file):
    """Return True when PIL can open *file* as an image, else False.

    Fixed: narrowed the bare except (which also swallowed KeyboardInterrupt)
    and closed the image handle the original leaked.
    """
    try:
        with Image.open(file):
            return True
    except Exception:
        return False
def GenImageThumbnail(job, file):
    """Return a base64-encoded 256x256 JPEG thumbnail of image *file*, or None
    when the image cannot be decoded or saved."""
    thumbnail = None
    AddLogForJob( job, "Generate Thumbnail from Image file: {}".format( file ), file )
    try:
        with Image.open(file) as im_orig:
            # apply EXIF orientation so the thumbnail is upright
            im = ImageOps.exif_transpose(im_orig)
            im.thumbnail((256, 256))
            # fixed: JPEG cannot encode RGBA/P mode images -- convert first
            if im.mode != 'RGB':
                im = im.convert('RGB')
            img_bytearray = io.BytesIO()
            im.save(img_bytearray, format='JPEG')
        thumbnail = base64.b64encode(img_bytearray.getvalue())
    except Exception as exc:
        # fixed: the old bare except blamed "no EXIF tags" although any
        # decode/save error lands here; report the real failure instead
        print('WARNING: failed to generate thumbnail for {}: {}'.format(file, exc))
        AddLogForJob(job, "WARNING: failed to generate thumbnail for: {} ({})".format(file, exc))
    return thumbnail
def GenVideoThumbnail(job, file):
    """Return a base64 (str) 160x90 JPEG thumbnail from the first non-dark
    frame of video *file*, or None when no frame can be read."""
    AddLogForJob( job, "Generate Thumbnail from Video file: {}".format( file ), file )
    vcap = cv2.VideoCapture(file)
    try:
        res, im_ar = vcap.read()
        # skip nearly-black leading frames; fixed: test res BEFORE im_ar.mean(),
        # since a failed read leaves im_ar as None and the old order crashed
        while res and im_ar.mean() < 15:
            res, im_ar = vcap.read()
        if not res or im_ar is None:
            return None
    finally:
        # fixed: the capture handle was never released
        vcap.release()
    im_ar = cv2.resize(im_ar, (160, 90), 0, 0, cv2.INTER_LINEAR)
    res, thumb_buf = cv2.imencode('.jpeg', im_ar)
    bt = thumb_buf.tobytes()
    thumbnail = base64.b64encode(bt)
    # strip the b'...' repr wrapper to store plain base64 text
    thumbnail = str(thumbnail)[2:-1]
    return thumbnail
if __name__ == "__main__":
    print("INFO: PA job manager starting - listening on {}:{}".format( PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT) )
    # pick up anything that was queued/changed while the manager was down
    ProcessImportDirs()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
        s.listen()
        # an incoming connection is used purely as a "wake up and scan" signal;
        # its payload (if any) is ignored
        while True:
            conn, addr = s.accept()
            # fixed: the accepted connection was never closed, leaking one file
            # descriptor per wake-up
            conn.close()
            HandleJobs()