From 9474c12a0d6ae16f61358aa7217949ce3da1d902 Mon Sep 17 00:00:00 2001 From: Damien De Paoli Date: Tue, 19 Jan 2021 23:34:12 +1100 Subject: [PATCH] JobGetFileDetails now works - first pass; still need to improve handling of files in dirs - as an optimisation I assumed the last dir added is the dir each file is in, but that is not always true, so the actual dir needs to be set for each file --- pa_job_manager.py | 143 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 117 insertions(+), 26 deletions(-) diff --git a/pa_job_manager.py b/pa_job_manager.py index 0c57100..8823f28 100644 --- a/pa_job_manager.py +++ b/pa_job_manager.py @@ -61,10 +61,8 @@ class FileData(): try: tags = exifread.process_file(f) except: - print('NO EXIF TAGS?!?!?!?') + print('WARNING: NO EXIF TAGS?!?!?!?') AddLogForJob(job, "WARNING: No EXIF TAF found for: {}".format(file)) - f.close() - raise f.close() fthumbnail = base64.b64encode(tags['JPEGThumbnail']) @@ -132,6 +130,13 @@ class FileData(): job=Job(start_time='now()', last_update='now()', name="importdir", state="New", wait_for=None ) job.extra.append(jex) session.add(job) + # force commit to make job.id be valid in use of wait_for later + session.commit() + jex2=JobExtra( name="path", value=path ) + job2=Job(start_time='now()', last_update='now()', name="getfiledetails", state="New", wait_for=job.id ) + job2.extra.append(jex2) + session.add(job2) + print ("adding job2 id={}, wait_for={}, job is: {}".format( job2.id, job2.wait_for, job.id ) ) return @@ -151,6 +156,8 @@ class Dir(Base): __tablename__ = "dir" eid = Column(Integer, ForeignKey("entry.id"), primary_key=True ) path_prefix = Column(String, unique=False, nullable=False ) + num_files = Column(Integer) + files = relationship("Entry", secondary="entry_dir_link") def __repr__(self): return "".format(self.eid, self.path_prefix) @@ -186,21 +193,6 @@ class FileType(Base): def __repr__(self): return "".format(self.id, self.name ) -class File(Base): - __tablename__ = "file" - id = Column(Integer, Sequence('file_id_seq'), 
primary_key=True ) - name = Column(String, unique=True, nullable=False ) - type = Column(String, unique=False, nullable=False) - path_prefix = Column(String, unique=False, nullable=False) - size_mb = Column(Integer, unique=False, nullable=False) - # hash might not be unique, this could be the source of dupe problems - hash = Column(Integer, unique=True, nullable=True) - thumbnail = Column(String, unique=False, nullable=True) - - def __repr__(self): - return "".format(self.id, self.name ) - - class Settings(Base): __tablename__ = "settings" id = Column(Integer, Sequence('settings_id_seq'), primary_key=True ) @@ -269,7 +261,7 @@ class Job(Base): extra = relationship( "JobExtra") def __repr__(self): - return "".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_passes, self.current_pass, self.num_files, self.num_files, self.current_file_num, self.current_file, self.extra, self.logs) + return "".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_passes, self.current_pass, self.num_files, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs) class PA_JobManager_FE_Message(Base): __tablename__ = "pa_job_manager_fe_message" @@ -312,6 +304,8 @@ def RunJob(job): JobForceScan(job) elif job.name =="importdir": JobImportDir(job) + elif job.name =="getfiledetails": + JobGetFileDetails(job) else: print("Requested to process unknown job type: {}".format(job.name)) return @@ -326,8 +320,20 @@ def HandleJobs(): pa_eng.num_completed_jobs=0 for job in jobs: if job.pa_job_state != 'Completed': + if job.wait_for != None: + j2 = session.query(Job).get(job.wait_for) + if not j2: + print ("WTF? job.wait_for ({}) does not exist in below? 
".format( job.wait_for )) + for j in session.query(Job).all(): + print ("j={}".format(j.id)) + continue + if j2.pa_job_state != 'Completed': + continue + # use this to remove threads for easier debugging, and errors will stacktrace to the console if DEBUG==1: + print("*************************************") + print("RUNNING job: id={} name={} wait_for={}".format(job.id, job.name, job.wait_for )) RunJob(job) else: try: @@ -373,10 +379,9 @@ def MakeSymlink(job,path): return symlink def AddDir(job, dirname, path_prefix, in_dir): - dir=Dir( path_prefix=path_prefix ) + dir=Dir( path_prefix=path_prefix, num_files=0 ) dtype = session.query(FileType).filter(FileType.name=='Directory').first() e=Entry( name=dirname, type=dtype ) - print( dtype) e.dir_details.append(dir) # this occurs when we Add the actual Dir for the import_path if in_dir: @@ -396,18 +401,21 @@ def AddFile(job, fname, type_str, fsize, in_dir ): return e def JobImportDir(job): - print("DEBUG: Importing dir: {}".format(job.id)) + print("DEBUG: Importing dir") settings = session.query(Settings).first() if settings == None: raise Exception("Cannot create file data with no settings / import path is missing") last_import_date = settings.last_import_date + file_cnt=0 for jex in job.extra: if jex.name =="path": path = FixPath( jex.value) AddLogForJob(job, "Checking Import Directory: {}".format( path ) ) + print("DEBUG: Checking Import Directory: {}".format( path ) ) if os.path.exists( path ): symlink=MakeSymlink(job,path) dir=AddDir(job, os.path.basename(path[0:-1]), symlink, None ) + import_dir=dir for file in sorted(glob.glob(path + '**', recursive=True)): if file == path: continue @@ -419,7 +427,9 @@ def JobImportDir(job): if os.path.isdir(file): path_prefix=os.path.join(symlink,fname) dir=AddDir( job, fname, path_prefix, dir ) + print("DEBUG(adddir)"); else: + file_cnt=file_cnt+1 if isImage(file): type_str = 'Image' elif isVideo(file): @@ -428,14 +438,65 @@ def JobImportDir(job): type_str = 'File' fsize = 
round(os.stat(file).st_size/(1024*1024)) e=AddFile( job, os.path.basename(fname), type_str, fsize, dir ) + print("DEBUG(addfile)"); else: AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file ) print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file ) - #settings.last_import_date = time.time() + import_dir.num_files=file_cnt + AddLogForJob(job, "Finished Importing: {} - Found {} new files".format( path, file_cnt ) ) + job.pa_job_state = "Completed" + job.state = "Completed" + job.last_updated = datetime.now(pytz.utc) +# settings.last_import_date = time.time() + print ("DEBUG-END: finished Job import dir: {}".format(job)) + else: + AddLogForJob(job, "Finished Importing: {} -- Path does not exist".format( path) ) + job.pa_job_state = "Completed" + job.state = "Failed" + job.last_updated = datetime.now(pytz.utc) + for j in session.query(Job).filter(Job.wait_for==job.id).all(): + print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) ) + j.pa_job_state = "Completed" + j.state = "Withdrawn" + j.last_updated = datetime.now(pytz.utc) + AddLogForJob(j, "Job has been withdrawn as the job being waited for failed") session.commit() - print ("DEBUG: finished Job import dir") return + +def FilesInDir( path ): + d=session.query(Dir).filter(Dir.path_prefix==path).first() + return d.files + +def ProcessFilesInDir(job, e): + print("files in dir - process: {}".format(e.name)) + if e.type.name != 'Directory': + e.file_details[0].hash = md5( job, os.path.join( e.in_dir[0].path_prefix, e.name ) ) + if e.type.name == 'Image': + e.file_details[0].thumbnail = GenImageThumbnail( job, os.path.join( e.in_dir[0].path_prefix, e.name ) ) + elif e.type.name == 'Video': + e.file_details[0].thumbnail = GenVideoThumbnail( job, os.path.join( e.in_dir[0].path_prefix, e.name ) ) + else: + print("need to better process: {}".format(e)) + 
d=session.query(Dir).filter(Dir.eid==e.id).first() + for sub in d.files: + ProcessFilesInDir(job, sub ) + +def JobGetFileDetails(job): + print("JobGetFileDetails:") + for jex in job.extra: + if jex.name =="path": + path=jex.value + path=FixPath('static/{}'.format( os.path.basename(path[0:-1]))) + print(" for path={}".format( path ) ) + for e in FilesInDir( path ): + ProcessFilesInDir(job, e ) + job.pa_job_state = "Completed" + job.state = "Completed" + job.last_updated = datetime.now(pytz.utc) + session.commit() + return + def isVideo(file): try: fileInfo = MediaInfo.parse(file) @@ -454,12 +515,14 @@ def FixPath(p): return p # Returns an md5 hash of the fnames' contents -def md5(fname): +def md5(job, fname): hash_md5 = hashlib.md5() with open(fname, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) - return hash_md5.hexdigest() + hash = hash_md5.hexdigest() + AddLogForJob( job, "Generated md5 hash: {} for file: {}".format( hash, fname ) ) + return hash def isImage(file): try: @@ -468,6 +531,34 @@ def isImage(file): except: return False +def GenImageThumbnail(job, file): + AddLogForJob( job, "Generate Thumbnail from Image file: {}".format( file ) ) + f = open(file, 'rb') + try: + tags = exifread.process_file(f) + except: + print('WARNING: NO EXIF TAGS?!?!?!?') + AddLogForJob(job, "WARNING: No EXIF TAF found for: {}".format(file)) + f.close() + + thumbnail = base64.b64encode(tags['JPEGThumbnail']) + thumbnail = str(thumbnail)[2:-1] + return thumbnail + +def GenVideoThumbnail(job, file): + AddLogForJob( job, "Generate Thumbnail from Video file: {}".format( file ) ) + vcap = cv2.VideoCapture(file) + res, im_ar = vcap.read() + while im_ar.mean() < 15 and res: + res, im_ar = vcap.read() + im_ar = cv2.resize(im_ar, (160, 90), 0, 0, cv2.INTER_LINEAR) + res, thumb_buf = cv2.imencode('.jpeg', im_ar) + bt = thumb_buf.tostring() + thumbnail = base64.b64encode(bt) + thumbnail = str(thumbnail)[2:-1] + return thumbnail + + if __name__ == "__main__": 
print("PA job manager starting") try: