diff --git a/pa_job_manager.py b/pa_job_manager.py
index d539c1c..63f874d 100644
--- a/pa_job_manager.py
+++ b/pa_job_manager.py
@@ -43,7 +43,7 @@
 import threading
 import io
 import face_recognition
-DEBUG=1
+DEBUG=0

 # an Manager, which the Session will use for connection resources
 some_engine = create_engine(DB_URL)
@@ -75,7 +75,7 @@ class EntryDirLink(Base):
 class Dir(Base):
     __tablename__ = "dir"
     eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
-    path_prefix = Column(String, unique=False, nullable=False )
+    path_prefix = Column(String, unique=True, nullable=False )
     num_files = Column(Integer)
     last_import_date = Column(Float)
     files = relationship("Entry", secondary="entry_dir_link")
@@ -86,7 +86,7 @@ class Dir(Base):
 class Entry(Base):
     __tablename__ = "entry"
     id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
-    name = Column(String, unique=True, nullable=False )
+    name = Column(String, unique=False, nullable=False )
     type_id = Column(Integer, ForeignKey("file_type.id"))
     exists_on_fs=Column(Boolean)
     type=relationship("FileType")
@@ -234,10 +234,15 @@ def ProcessImportDirs(parent_job=None):
         raise Exception("Cannot create file data with no settings / import path is missing")
     paths = settings.import_path.split("#")
     now=datetime.now(pytz.utc)
+    # make a new set of Jobs, one per path; HandleJobs will run them later
     for path in paths:
-        # make new Job; HandleJobs will make them run later
+        d=session.query(Dir).filter(Dir.path_prefix==SymlinkName(path,path+'/')).first()
+        cfn=0   # carry over the file count from this path's last import so progress has a total
+        if d and d.num_files:
+            cfn=d.num_files
+
         jex=JobExtra( name="path", value=path )
-        job=Job(start_time=now, last_update=now, name="importdir", state="New", wait_for=None, pa_job_state="New", current_file_num=0, num_files=0 )
+        job=Job(start_time=now, last_update=now, name="importdir", state="New", wait_for=None, pa_job_state="New", current_file_num=0, num_files=cfn )
         job.extra.append(jex)
         session.add(job)
     session.commit()
@@ -274,7 +279,6 @@ def AddLogForJob(job, message, current_file=''):
     if job.num_files:
         job.current_file_num=job.current_file_num+1
     session.add(log)
-    session.commit()
     return

 def RunJob(job):
@@ -314,6 +318,7 @@ def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
     AddLogForJob(job, last_log)
     if job.state=="Failed":
         CancelJob(job,job.id)
+    session.commit()   # AddLogForJob no longer commits per log line, so flush job state here
     return

 def HandleJobs():
@@ -384,7 +389,7 @@ def SymlinkName(path, file):
         last_bit = os.path.dirname(sig_bit)[0:-1]
     else:
         last_bit = os.path.dirname(sig_bit)
-    symlink = 'static'+'/'+last_dir+'/'+last_bit
+    symlink = 'static/'+last_dir+'/'+last_bit
     if symlink[-1] == '/':
         symlink=symlink[0:-1]
     return symlink
@@ -411,7 +416,7 @@ def AddDir(job, dirname, path_prefix, in_dir):
         e.in_dir.append(in_dir)
         if DEBUG==1:
             print(f"AddDir: created d={dirname}, pp={path_prefix}")
-        AddLogForJob(job, "DEBUG: AddDir: {} in (dir_id={})".format(dirname, in_dir) )
+        AddLogForJob(job, f"DEBUG: Processing new dir: {dirname}")
         session.add(e)
     return dir

@@ -494,11 +499,14 @@ def JobImportDir(job):
             dir=AddDir(job, os.path.basename(root), pp, parent_dir)
             parent_dir=dir
         for basename in files:
+            # commit every 100 files so progress is visible without hammering the database
+            if job.current_file_num % 100 == 0:
+                session.commit()
             fname=dir.path_prefix+'/'+basename
             stat = os.stat(fname)
             if stat.st_ctime > dir.last_import_date:
+                AddLogForJob(job, f"Processing new/updated file: {basename}", basename )
                 if DEBUG==1:
-                    AddLogForJob(job, "DEBUG: {} - is new/updated".format( basename ), basename )
                     print("DEBUG: {} - {} is newer than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
                 if isImage(fname):
                     type_str = 'Image'
@@ -512,8 +520,10 @@ def JobImportDir(job):
                 e=session.query(Entry).filter(Entry.name==basename).first()
                 e.exists_on_fs=True
                 if DEBUG==1:
-                    AddLogForJob(job, "DEBUG: {} - is unchanged".format( basename, basename ) )
                     print("DEBUG: {} - {} is OLDER than {}".format( basename, stat.st_ctime, dir.last_import_date ), basename )
+            job.current_file=basename
+            job.current_file_num+=1
+
         dir.num_files=len(files)+len(subdirs)
         dir.last_import_date = time.time()
     job.num_files=overall_file_cnt
@@ -521,10 +531,12 @@ def JobImportDir(job):

     rm_cnt=HandleAnyFSDeletions(job)

-    FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Removed {rm_cnt} file(s)")
+    # record the final overall_file_cnt on this path's Dir row; future jobs
+    # for this path use it to measure progress
     import_dir=session.query(Dir).filter(Dir.path_prefix==symlink).first()
     import_dir.num_files=overall_file_cnt
-    session.commit()
+    session.add(import_dir)
+    FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Removed {rm_cnt} file(s)")
     return

 def RunFuncOnFilesInPath( job, path, file_func ):
@@ -550,9 +562,12 @@ def JobProcessAI(job):

 def GenHashAndThumb(job, e):
+    # commit every 100 files so progress is visible without hammering the database
+    if job.current_file_num % 100 == 0:
+        session.commit()
     stat = os.stat( e.in_dir[0].path_prefix + '/' + e.name )
     if stat.st_ctime < e.file_details[0].last_hash_date:
-        print(f"OPTIM: GenHashAndThumb {e.name} file is older than last hash, skip this")
+#        print(f"OPTIM: GenHashAndThumb {e.name} file is older than last hash, skip this")
         job.current_file_num+=1
         return