From 574c12f7f522a2734c89b0b67a1dfb13e8ad9ab4 Mon Sep 17 00:00:00 2001 From: Damien De Paoli Date: Wed, 20 Jan 2021 21:46:41 +1100 Subject: [PATCH] cleaned up debugs, and optimised gen hash / thumbs to only process dirs with new content --- pa_job_manager.py | 60 ++++++++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/pa_job_manager.py b/pa_job_manager.py index 358a419..af7d75b 100644 --- a/pa_job_manager.py +++ b/pa_job_manager.py @@ -68,10 +68,11 @@ class Dir(Base): path_prefix = Column(String, unique=False, nullable=False ) num_files = Column(Integer) last_import_date = Column(Float) + last_hash_date = Column(Float) files = relationship("Entry", secondary="entry_dir_link") def __repr__(self): - return "".format(self.eid, self.path_prefix, self.num_files) + return "".format(self.eid, self.path_prefix, self.num_files, self.last_import_date, self.last_hash_date) class Entry(Base): __tablename__ = "entry" @@ -205,7 +206,7 @@ def ProcessImportDirs(parent_job=None): session.add(job) session.commit() if parent_job: - AddLogForJob(parent_job, "adding job id={}, job is: {}".format( job.id, job.name ) ) + AddLogForJob(parent_job, "adding job id={} {}".format( job.id, job.id, job.name ) ) # force commit to make job.id be valid in use of wait_for later session.commit() jex2=JobExtra( name="path", value=path ) @@ -214,8 +215,7 @@ def ProcessImportDirs(parent_job=None): session.add(job2) session.commit() if parent_job: - AddLogForJob(parent_job, "adding job2 id={}, wait_for={}, job is: {}".format( job2.id, job2.wait_for, job2.name ) ) - print("about to handleJobs") + AddLogForJob(parent_job, "adding job id={} {} (wait for: {})".format( job2.id, job2.id, job2.name, job2.wait_for ) ) HandleJobs() return @@ -228,7 +228,6 @@ def AddLogForJob(job, message, current_file=''): # this may not be set on an import job if job.num_files: job.current_file_num=job.current_file_num+1 - print("cfm for job: {} is now {}".format(job.id, job.current_file_num)); session.add(log) session.commit() return @@ -243,7 +242,7 @@ def RunJob(job): elif job.name =="getfiledetails": JobGetFileDetails(job) else: - print("Requested to process unknown job type: {}".format(job.name)) + print("ERROR: Requested to process unknown job type: {}".format(job.name)) return def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"): @@ -256,10 +255,9 @@ def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"): def HandleJobs(): global pa_eng - print("PA job manager is scanning for new jobs to process") + print("INFO: PA job manager is scanning for new jobs to process") pa_eng.state = 'Scanning Jobs' for job in session.query(Job).all(): - print("processing: {}".format(job)) if job.pa_job_state == 'New': if job.wait_for != None: j2 = session.query(Job).get(job.wait_for) @@ -284,7 +282,7 @@ def HandleJobs(): MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) ) except Exception as e2: print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) ) - print("PA job manager is waiting jobs") + print("INFO: PA job manager is waiting jobs") pa_eng.state = 'Waiting for new Jobs' return @@ -336,8 +334,8 @@ def CreateSymlink(job,path): return symlink def AddDir(job, dirname, path_prefix, in_dir): - dir=Dir( path_prefix=path_prefix, num_files=0, last_import_date=0 ) - dtype = session.query(FileType).filter(FileType.name=='Directory').first() + dir=Dir( path_prefix=path_prefix, num_files=0, last_import_date=0, last_hash_date=0 ) + dtype=session.query(FileType).filter(FileType.name=='Directory').first() e=Entry( name=dirname, type=dtype ) e.dir_details.append(dir) # this occurs when we Add the actual Dir for the import_path @@ -359,7 +357,6 @@ def AddFile(job, fname, type_str, fsize, in_dir ): def JobImportDir(job): JobProgressState( job, "In Progress" ) - print("DEBUG: Importing dir") settings = session.query(Settings).first() if settings == None: raise Exception("Cannot create file data with no settings / import path is missing") @@ -373,11 +370,12 @@ def JobImportDir(job): print("DEBUG: Checking Import Directory: {}".format( path ) ) if os.path.exists( path ): symlink=CreateSymlink(job,path) - # DDP: dont want to do add a Dir, if this already exists (and if so, check any modificaiton on fs, since last import)!!!! + # dont want to do add a Dir, if this already exists dir=session.query(Dir).filter(Dir.path_prefix==symlink).first() if dir != None: - stat = os.stat( os.path.basename(path[0:-1]) ) - if stat.st_ctime < dir.last_import_date: + stat = os.stat( symlink ) + # check any modificaiton on fs, since last import, if none we are done + if dir.last_import_date > 0 and stat.st_ctime < dir.last_import_date: print( "DEBUG: Directory has not been altered since the last import, just return" ) FinishJob( job, "No new files in directory since the last import") return @@ -392,7 +390,7 @@ def JobImportDir(job): fname=file.replace(path, "") stat = os.stat(file) dirname=SymlinkName(path, file) - if keep_dirs[dirname].last_import_date == 0 or stat.st_ctime > keep_dirs[dirname].last_import_date: + if stat.st_ctime > keep_dirs[dirname].last_import_date: AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ) ) if os.path.isdir(file): @@ -427,7 +425,6 @@ def JobImportDir(job): print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) ) FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" ) session.commit() - print("fcnt:", fcnt) return def FilesInDir( path ): @@ -436,7 +433,7 @@ def FilesInDir( path ): def ProcessFilesInDir(job, e): - print("files in dir - process: {}".format(e.name)) + print("DEBUG: files in dir - process: {}".format(e.name)) if e.type.name != 'Directory': e.file_details[0].hash = md5( job, os.path.join( e.in_dir[0].path_prefix, e.name ) ) if e.type.name == 'Image': @@ -444,9 +441,17 @@ def ProcessFilesInDir(job, e): elif e.type.name == 'Video': e.file_details[0].thumbnail = GenVideoThumbnail( job, os.path.join( e.in_dir[0].path_prefix, e.name ) ) else: - print("need to better process: {}".format(e)) - d=session.query(Dir).filter(Dir.eid==e.id).first() - for sub in d.files: + dir=session.query(Dir).filter(Dir.eid==e.id).first() + stat = os.stat( dir.path_prefix ) + # check any modificaiton on fs, since last import, if none we are done + if stat.st_ctime < dir.last_hash_date: + session.add(dir) + dir.last_hash_date = time.time() + AddLogForJob(job, "skip {} as it has not changed since last hashing".format(dir.path_prefix)) + print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix)) + return + dir.last_hash_date = time.time() + for sub in dir.files: ProcessFilesInDir(job, sub ) def JobGetFileDetails(job): @@ -457,6 +462,14 @@ def JobGetFileDetails(job): path=FixPath('static/{}'.format( os.path.basename(path[0:-1]))) print("DEBUG: JobGetFileDetails for path={}".format( path ) ) dir=session.query(Dir).filter(Dir.path_prefix==path).first() + stat=os.stat( path ) + if stat.st_ctime < dir.last_hash_date: + session.add(dir) + dir.last_hash_date = time.time() + AddLogForJob(job, "skip {} as it has not changed since last hashing".format(dir.path_prefix)) + print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix)) + return + dir.last_hash_date = time.time() job.current_file_num = 0 job.num_files = dir.num_files session.commit() @@ -529,17 +542,16 @@ def GenVideoThumbnail(job, file): if __name__ == "__main__": - print("PA job manager starting") + print("INFO: PA job manager starting") try: InitialiseManager() ProcessImportDirs() except Exception as e: - print( "Failed to initialise PA Job Manager: {}".format(e) ) + print( "ERROR: Failed to initialise PA Job Manager: {}".format(e) ) session.rollback() with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT)) s.listen() while True: conn, addr = s.accept() - print("Connection from: {} so HandleJobs".format(addr)) HandleJobs()