diff --git a/pa_job_manager.py b/pa_job_manager.py index f8fabef..e73ddaa 100644 --- a/pa_job_manager.py +++ b/pa_job_manager.py @@ -50,96 +50,6 @@ Base = declarative_base() # global for us to keep state / let front-end know our state pa_eng=None - -################################################################################ -# FileData class... -################################################################################ - -class FileData(): - def getExif(self, file): - f = open(file, 'rb') - try: - tags = exifread.process_file(f) - except: - print('WARNING: NO EXIF TAGS?!?!?!?') - AddLogForJob(job, "WARNING: No EXIF TAF found for: {}".format(file)) - f.close() - - fthumbnail = base64.b64encode(tags['JPEGThumbnail']) - fthumbnail = str(fthumbnail)[2:-1] - return fthumbnail - - def isVideo(self, file): - try: - fileInfo = MediaInfo.parse(file) - for track in fileInfo.tracks: - if track.track_type == "Video": - return True - return False - except Exception as e: - return False - - # Converts linux paths into windows paths - # HACK: assumes c:, might be best to just look for [a-z]: ? - def FixPath(self, p): - if p.startswith('c:'): - p = p.replace('/', '\\') - return p - - # Returns an md5 hash of the fnames' contents - def md5(self, fname): - hash_md5 = hashlib.md5() - with open(fname, "rb") as f: - for chunk in iter(lambda: f.read(4096), b""): - hash_md5.update(chunk) - return hash_md5.hexdigest() - - def isImage(self, file): - try: - img = Image.open(file) - return True - except: - return False - - def generateVideoThumbnail(self, file): - #overall wrapper function for generating video thumbnails - vcap = cv2.VideoCapture(file) - res, im_ar = vcap.read() - while im_ar.mean() < 15 and res: - res, im_ar = vcap.read() - im_ar = cv2.resize(im_ar, (160, 90), 0, 0, cv2.INTER_LINEAR) - #save on a buffer for direct transmission - res, thumb_buf = cv2.imencode('.jpeg', im_ar) - # '.jpeg' etc are permitted - #get the bytes content - bt = thumb_buf.tostring() - fthumbnail = base64.b64encode(bt) - fthumbnail = str(fthumbnail)[2:-1] - return fthumbnail - - ############################################################################## - def ProcessImportDirs(self): - settings = session.query(Settings).first() - if settings == None: - raise Exception("Cannot create file data with no settings / import path is missing") - last_import_date = settings.last_import_date - paths = settings.import_path.split("#") - for path in paths: - # make new Job; HandleJobs will make them run later - jex=JobExtra( name="path", value=path ) - job=Job(start_time='now()', last_update='now()', name="importdir", state="New", wait_for=None ) - job.extra.append(jex) - session.add(job) - # force commit to make job.id be valid in use of wait_for later - session.commit() - jex2=JobExtra( name="path", value=path ) - job2=Job(start_time='now()', last_update='now()', name="getfiledetails", state="New", wait_for=job.id ) - job2.extra.append(jex2) - session.add(job2) - print ("adding job2 id={}, wait_for={}, job is: {}".format( job2.id, job2.wait_for, job.id ) ) - return - - ################################################################################ # Class describing File in the database, and via sqlalchemy, connected to the DB as well # This has to match one-for-one the DB table @@ -157,10 +67,11 @@ class Dir(Base): eid = Column(Integer, ForeignKey("entry.id"), primary_key=True ) path_prefix = Column(String, unique=False, nullable=False ) num_files = Column(Integer) + last_import_date = Column(Float) files = relationship("Entry", secondary="entry_dir_link") def __repr__(self): - return "".format(self.eid, self.path_prefix) + return "".format(self.eid, self.path_prefix, self.num_files) class Entry(Base): __tablename__ = "entry" @@ -197,15 +108,11 @@ class Settings(Base): __tablename__ = "settings" id = Column(Integer, Sequence('settings_id_seq'), primary_key=True ) import_path = Column(String) - last_import_date = Column(Float) def __repr__(self): - return "".format(self.id, self.import_path, self.last_import_date) + return "".format(self.id, self.import_path ) -### Initiatlise the file data set -filedata = FileData() - ################################################################################ # classes for the job manager: # PA_JobManager overall status tracking), @@ -249,8 +156,6 @@ class Job(Base): last_update = Column(DateTime(timezone=True)) name = Column(String) state = Column(String) - num_passes = Column(Integer) - current_pass = Column(Integer) num_files = Column(Integer) current_file_num = Column(Integer) current_file = Column(String) @@ -261,7 +166,7 @@ class Job(Base): extra = relationship( "JobExtra") def __repr__(self): - return "".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_passes, self.current_pass, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs) + return "".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs) class PA_JobManager_FE_Message(Base): __tablename__ = "pa_job_manager_fe_message" @@ -272,14 +177,12 @@ class PA_JobManager_FE_Message(Base): def __repr__(self): return " last_import_date: - AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, last_import_date ) ) - print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, last_import_date ) ) + dirname=SymlinkName(path, file) + if keep_dirs[dirname].last_import_date == 0 or stat.st_ctime > keep_dirs[dirname].last_import_date: + AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) + print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ) ) if os.path.isdir(file): path_prefix=os.path.join(symlink,fname) dir=AddDir( job, fname, path_prefix, dir ) + fcnt[path_prefix]=0 + keep_dirs[dir.path_prefix]=dir else: - file_cnt=file_cnt+1 + overall_file_cnt=overall_file_cnt+1 + dirname=SymlinkName(path, file) + fcnt[dirname]=fcnt[dirname]+1 if isImage(file): type_str = 'Image' elif isVideo(file): @@ -438,26 +413,21 @@ def JobImportDir(job): fsize = round(os.stat(file).st_size/(1024*1024)) e=AddFile( job, os.path.basename(fname), type_str, fsize, dir ) else: - AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file ) - print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file ) - import_dir.num_files=file_cnt - AddLogForJob(job, "Finished Importing: {} - Found {} new files".format( path, file_cnt ) ) - job.pa_job_state = "Completed" - job.state = "Completed" - job.last_updated = datetime.now(pytz.utc) -# settings.last_import_date = time.time() + AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) + print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) + FinishJob(job, "Finished Importing: {} - Found {} new files".format( path, overall_file_cnt ) ) + for d in keep_dirs: + keep_dirs[d].num_files = fcnt[d] + keep_dirs[d].last_import_date = time.time() + # override this to be all the files in dir & its sub dirs... (used to know how many files in jobs for this import dir) + import_dir.num_files=overall_file_cnt else: - AddLogForJob(job, "Finished Importing: {} -- Path does not exist".format( path) ) - job.pa_job_state = "Completed" - job.state = "Failed" - job.last_updated = datetime.now(pytz.utc) + FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" ) for j in session.query(Job).filter(Job.wait_for==job.id).all(): print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) ) - j.pa_job_state = "Completed" - j.state = "Withdrawn" - j.last_updated = datetime.now(pytz.utc) - AddLogForJob(j, "Job has been withdrawn as the job being waited for failed") + FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" ) session.commit() + print("fcnt:", fcnt) return def FilesInDir( path ): @@ -466,6 +436,7 @@ def FilesInDir( path ): def ProcessFilesInDir(job, e): + time.sleep(0.3) print("files in dir - process: {}".format(e.name)) if e.type.name != 'Directory': e.file_details[0].hash = md5( job, os.path.join( e.in_dir[0].path_prefix, e.name ) ) @@ -480,17 +451,21 @@ def ProcessFilesInDir(job, e): ProcessFilesInDir(job, sub ) def JobGetFileDetails(job): + JobProgressState( job, "In Progress" ) print("JobGetFileDetails:") for jex in job.extra: if jex.name =="path": path=jex.value path=FixPath('static/{}'.format( os.path.basename(path[0:-1]))) print(" for path={}".format( path ) ) + dir=session.query(Dir).filter(Dir.path_prefix==path).first() + job.current_file_num = 0 + job.num_files = dir.num_files + print("cfm set to 0 for job: {}, num_files: {}".format(job.id, job.num_files)); + session.commit() for e in FilesInDir( path ): ProcessFilesInDir(job, e ) - job.pa_job_state = "Completed" - job.state = "Completed" - job.last_updated = datetime.now(pytz.utc) + FinishJob(job, "File Details processed") session.commit() return @@ -529,7 +504,7 @@ def isImage(file): return False def GenImageThumbnail(job, file): - AddLogForJob( job, "Generate Thumbnail from Image file: {}".format( file ) ) + AddLogForJob( job, "Generate Thumbnail from Image file: {}".format( file ), file ) f = open(file, 'rb') try: tags = exifread.process_file(f) @@ -543,7 +518,7 @@ def GenImageThumbnail(job, file): return thumbnail def GenVideoThumbnail(job, file): - AddLogForJob( job, "Generate Thumbnail from Video file: {}".format( file ) ) + AddLogForJob( job, "Generate Thumbnail from Video file: {}".format( file ), file ) vcap = cv2.VideoCapture(file) res, im_ar = vcap.read() while im_ar.mean() < 15 and res: @@ -560,12 +535,10 @@ if __name__ == "__main__": print("PA job manager starting") try: InitialiseManager() - filedata.ProcessImportDirs() - session.commit() + ProcessImportDirs() except Exception as e: print( "Failed to initialise PA Job Manager: {}".format(e) ) session.rollback() - HandleJobs() with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT)) s.listen()