diff --git a/pa_job_manager.py b/pa_job_manager.py
index 225cbf3..8f500c0 100644
--- a/pa_job_manager.py
+++ b/pa_job_manager.py
@@ -38,7 +38,7 @@ import hashlib
 import exifread
 import base64
 import numpy
-from cv2 import cv2
+import cv2
 import socket
 import threading
 import io
@@ -47,7 +47,7 @@ import re
 import sys
 
-DEBUG=1
+DEBUG=0
 
 sys.setrecursionlimit(50000)
 
 # an Manager, which the Session will use for connection resources
@@ -256,7 +256,6 @@ def ProcessImportDirs(parent_job):
     return
 
 def JobsForPaths( parent_job, paths ):
-    now=datetime.now(pytz.utc)
 
     # make new set of Jobs per path... HandleJobs will make them run later
     for path in paths:
@@ -301,15 +300,17 @@
     HandleJobs()
     return
 
-def AddLogForJob(job, message, current_file=''):
+def ProcessFileForJob(job, message, current_file):
+    job.current_file=os.path.basename(current_file)
+    if job.num_files:
+        job.current_file_num=job.current_file_num+1
+    AddLogForJob(job, message )
+    return
+
+def AddLogForJob(job, message):
     now=datetime.now(pytz.utc)
     log=Joblog( job_id=job.id, log=message, log_date=now )
     job.last_update=now
-    if current_file != '':
-        job.current_file=os.path.basename(current_file)
-        # this may not be set on an import job
-        if job.num_files:
-            job.current_file_num=job.current_file_num+1
     session.add(log)
     return
 
@@ -360,8 +361,8 @@ def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
     return
 
 def HandleJobs():
-
-    print("INFO: PA job manager is scanning for new jobs to process")
+    if DEBUG==1:
+        print("INFO: PA job manager is scanning for new jobs to process")
     for job in session.query(Job).all():
         if job.pa_job_state == 'New':
             if job.wait_for != None:
@@ -388,7 +389,7 @@
                 MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
             except Exception as e2:
                 print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
-    print("INFO: PA job manager is waiting jobs")
+    print("INFO: PA job manager is waiting for a job")
     return
 
 def JobProgressState( job, state ):
@@ -422,6 +423,7 @@ def JobForceScan(job):
     session.query(Entry).delete()
     session.commit()
     ProcessImportDirs(job)
+    ProcessStorageDirs(job)
     FinishJob(job, "Completed (forced remove and recreation of all file data)")
     MessageToFE( job.id, "success", "Completed (forced remove and recreation of all file data)" )
     session.commit()
@@ -505,18 +507,14 @@ def HandleAnyFSDeletions(job):
 def GetDateFromFile(file, stat):
     # try exif
     try:
-        print(f"trying exif read of {file}")
         f = open(file, 'rb')
         tags = exifread.process_file(f)
        date_str, _ = str(tags["EXIF DateTimeOriginal"]).split(" ")
-        print(date_str)
         year, month, day = date_str.split(":")
         year=int(year)
         month=int(month)
         day=int(day)
-        print(year)
         check = datetime( year, month, day )
-        print( f"check={check}" )
     except:
         # try parsing filename
         try:
@@ -525,7 +523,6 @@ def GetDateFromFile(file, stat):
             month=int(m[2])
             day=int(m[3])
             check2 = datetime( year, month, day )
-            print( f"check2={check2}" )
         # give up and use file sys date
         except:
             year, month, day, _, _, _, _, _, _ = datetime.fromtimestamp(stat.st_ctime).timetuple()
@@ -533,7 +530,6 @@
     woy=c[1]
     return year, month, day, woy
 
-
 def JobImportDir(job):
     JobProgressState( job, "In Progress" )
     settings = session.query(Settings).first()
@@ -580,7 +576,6 @@
             stat = os.stat(fname)
 
             if stat.st_ctime > dir.last_import_date:
-                AddLogForJob(job, f"Processing new/update file: {basename}", basename )
                 if DEBUG==1:
                     print("DEBUG: {} - {} is newer than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
                 if isImage(fname):
@@ -600,11 +595,12 @@
                     print("DEBUG: {} - {} is OLDER than {}".format( basename, stat.st_ctime, dir.last_import_date ), basename )
                 job.current_file=basename
                 job.current_file_num+=1
-        dir.num_files=len(files)+len(subdirs)
+        job.current_file_num += len(subdirs)
         dir.last_import_date = time.time()
+        if parent_dir != None:
+            dir.num_files=len(files)+len(subdirs)
         parent_dir=dir
 
     job.num_files=overall_file_cnt
-    job.current_file_num=overall_file_cnt
 
     rm_cnt=HandleAnyFSDeletions(job)
@@ -692,7 +688,7 @@
     for unknown_encoding in faces:
         for person in people:
             lookForPersonInImage(job, person, unknown_encoding, e)
-    AddLogForJob(job, f"Finished processing {e.name}", e.name )
+    ProcessFileForJob(job, f"Finished processing {e.name}", e.name )
     return
 
 def lookForPersonInImage(job, person, unknown_encoding, e):
@@ -802,10 +798,13 @@ def isImage(file):
     return False
 
 def GenImageThumbnail(job, file):
-    AddLogForJob( job, "Generate Thumbnail from Image file: {}".format( file ), file )
+    ProcessFileForJob( job, "Generate Thumbnail from Image file: {}".format( file ), file )
     try:
         im_orig = Image.open(file)
         im = ImageOps.exif_transpose(im_orig)
+        bands = im.getbands()
+        if 'A' in bands:
+            im = im.convert('RGB')
         im.thumbnail((THUMBSIZE,THUMBSIZE))
         img_bytearray = io.BytesIO()
         im.save(img_bytearray, format='JPEG')
@@ -813,13 +812,12 @@
         thumbnail = base64.b64encode(img_bytearray)
         thumbnail = str(thumbnail)[2:-1]
     except Exception as e:
-        print('WARNING: NO EXIF TAGS?!?!?!?')
         AddLogForJob(job, f"WARNING: No EXIF TAF found for: {file} - error={e}")
         return None
     return thumbnail
 
 def GenVideoThumbnail(job, file):
-    AddLogForJob( job, "Generate Thumbnail from Video file: {}".format( file ), file )
+    ProcessFileForJob( job, "Generate Thumbnail from Video file: {}".format( file ), file )
     try:
         vcap = cv2.VideoCapture(file)
         res, frame = vcap.read()