Fixed Bug-19: a file with the same fname in a different dir was not going into the DB, caused by the unique constraint on the name field in the Entry class (uniqueness now moves to Dir.path_prefix instead). Also optimised job logging to commit to the DB only once per 100 files, and made the importdir job show progress on second and subsequent runs, since num_files is now stored on the Dir object for the import path.
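
For background, a minimal standalone sketch of the constraint behaviour behind Bug-19 (hypothetical model, not this project's schema): with unique=True on the bare filename column, a second file with the same basename in a different directory violates the constraint and never reaches the DB.

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Entry(Base):
    __tablename__ = "entry"
    id = Column(Integer, primary_key=True)
    # unique=True on the bare filename is the Bug-19 culprit; the fix below
    # relaxes it to unique=False and makes Dir.path_prefix unique instead
    name = Column(String, unique=True, nullable=False)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Entry(name="photo.jpg"))    # from, say, dir_a/photo.jpg
    session.commit()
    session.add(Entry(name="photo.jpg"))    # same basename from dir_b/
    try:
        session.commit()                    # UNIQUE constraint failed
    except Exception as exc:
        print(type(exc).__name__)           # IntegrityError - second file is lost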
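The 1-commit-per-100 change in JobImportDir and GenHashAndThumb is the usual batched-commit pattern; a standalone sketch with stand-in names (FakeSession and the file list are hypothetical, not the project's API):

class FakeSession:
    # stand-in for the SQLAlchemy session; only commit() matters here
    def commit(self):
        print("commit")

def import_files(session, files):
    current_file_num = 0
    for basename in files:
        # ... per-file work (stat, type detection, logging) goes here ...
        current_file_num += 1
        if current_file_num % 100 == 0:
            session.commit()    # one commit per 100 files, not one per log row
    session.commit()            # final commit picks up the remainder

import_files(FakeSession(), [f"file{i}.jpg" for i in range(250)])
# prints "commit" three times: after 100 files, after 200, and at the end

Note that AddLogForJob no longer commits per log row; progress still persists because JobImportDir and GenHashAndThumb commit every 100 files and FinishJob commits at the end.
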
@@ -43,7 +43,7 @@ import threading
 import io
 import face_recognition
 
-DEBUG=1
+DEBUG=0
 
 # an Manager, which the Session will use for connection resources
 some_engine = create_engine(DB_URL)
@@ -75,7 +75,7 @@ class EntryDirLink(Base):
 class Dir(Base):
     __tablename__ = "dir"
     eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
-    path_prefix = Column(String, unique=False, nullable=False )
+    path_prefix = Column(String, unique=True, nullable=False )
     num_files = Column(Integer)
     last_import_date = Column(Float)
     files = relationship("Entry", secondary="entry_dir_link")
@@ -86,7 +86,7 @@ class Dir(Base):
 class Entry(Base):
     __tablename__ = "entry"
     id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
-    name = Column(String, unique=True, nullable=False )
+    name = Column(String, unique=False, nullable=False )
     type_id = Column(Integer, ForeignKey("file_type.id"))
     exists_on_fs=Column(Boolean)
     type=relationship("FileType")
@@ -234,10 +234,15 @@ def ProcessImportDirs(parent_job=None):
         raise Exception("Cannot create file data with no settings / import path is missing")
     paths = settings.import_path.split("#")
     now=datetime.now(pytz.utc)
+    # make new set of Jobs per path... HandleJobs will make them run later
     for path in paths:
-        # make new Job; HandleJobs will make them run later
+        d=session.query(Dir).filter(Dir.path_prefix==SymlinkName(path,path+'/')).first()
+        cfn=0
+        if d:
+            cfn=d.num_files
+
         jex=JobExtra( name="path", value=path )
-        job=Job(start_time=now, last_update=now, name="importdir", state="New", wait_for=None, pa_job_state="New", current_file_num=0, num_files=0 )
+        job=Job(start_time=now, last_update=now, name="importdir", state="New", wait_for=None, pa_job_state="New", current_file_num=0, num_files=cfn )
         job.extra.append(jex)
         session.add(job)
         session.commit()
@@ -274,7 +279,6 @@ def AddLogForJob(job, message, current_file=''):
     if job.num_files:
         job.current_file_num=job.current_file_num+1
     session.add(log)
-    session.commit()
     return
 
 def RunJob(job):
@@ -314,6 +318,7 @@ def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
     AddLogForJob(job, last_log)
     if job.state=="Failed":
         CancelJob(job,job.id)
+    session.commit()
     return
 
 def HandleJobs():
@@ -384,7 +389,7 @@ def SymlinkName(path, file):
         last_bit = os.path.dirname(sig_bit)[0:-1]
     else:
         last_bit = os.path.dirname(sig_bit)
-    symlink = 'static'+'/'+last_dir+'/'+last_bit
+    symlink = 'static/'+last_dir+'/'+last_bit
     if symlink[-1] == '/':
         symlink=symlink[0:-1]
     return symlink
@@ -411,7 +416,7 @@ def AddDir(job, dirname, path_prefix, in_dir):
     e.in_dir.append(in_dir)
     if DEBUG==1:
         print(f"AddDir: created d={dirname}, pp={path_prefix}")
-        AddLogForJob(job, "DEBUG: AddDir: {} in (dir_id={})".format(dirname, in_dir) )
+        AddLogForJob(job, f"DEBUG: Process new dir: {dirname}")
     session.add(e)
     return dir
 
@@ -494,11 +499,14 @@ def JobImportDir(job):
             dir=AddDir(job, os.path.basename(root), pp, parent_dir)
             parent_dir=dir
         for basename in files:
+            # commit every 100 files to see progress being made but not hammer the database
+            if job.current_file_num % 100 == 0:
+                session.commit()
             fname=dir.path_prefix+'/'+basename
             stat = os.stat(fname)
             if stat.st_ctime > dir.last_import_date:
+                AddLogForJob(job, f"Processing new/update file: {basename}", basename )
                 if DEBUG==1:
-                    AddLogForJob(job, "DEBUG: {} - is new/updated".format( basename ), basename )
                     print("DEBUG: {} - {} is newer than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
                 if isImage(fname):
                     type_str = 'Image'
@@ -512,8 +520,10 @@ def JobImportDir(job):
                 e=session.query(Entry).filter(Entry.name==basename).first()
                 e.exists_on_fs=True
                 if DEBUG==1:
-                    AddLogForJob(job, "DEBUG: {} - is unchanged".format( basename, basename ) )
                     print("DEBUG: {} - {} is OLDER than {}".format( basename, stat.st_ctime, dir.last_import_date ), basename )
+            job.current_file=basename
+            job.current_file_num+=1
+
         dir.num_files=len(files)+len(subdirs)
         dir.last_import_date = time.time()
         job.num_files=overall_file_cnt
@@ -521,10 +531,12 @@ def JobImportDir(job):
 
     rm_cnt=HandleAnyFSDeletions(job)
 
-    FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Removed {rm_cnt} file(s)")
+    # reset overall path with overall_file_cnt, we use this for future jobs
+    # to measure progress when dealing with this path
     import_dir=session.query(Dir).filter(Dir.path_prefix==symlink).first()
     import_dir.num_files=overall_file_cnt
-    session.commit()
+    session.add(import_dir)
+    FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Removed {rm_cnt} file(s)")
     return
 
 def RunFuncOnFilesInPath( job, path, file_func ):
@@ -550,9 +562,12 @@ def JobProcessAI(job):
 
 
 def GenHashAndThumb(job, e):
+    # commit every 100 files to see progress being made but not hammer the database
+    if job.current_file_num % 100 == 0:
+        session.commit()
     stat = os.stat( e.in_dir[0].path_prefix + '/' + e.name )
     if stat.st_ctime < e.file_details[0].last_hash_date:
-        print(f"OPTIM: GenHashAndThumb {e.name} file is older than last hash, skip this")
+        # print(f"OPTIM: GenHashAndThumb {e.name} file is older than last hash, skip this")
         job.current_file_num+=1
         return
 
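
With num_files stored on the Dir row for the import path, ProcessImportDirs can seed Job.num_files from it (the cfn value above) before any files are scanned, so second and subsequent runs can report a progress fraction immediately; a hypothetical helper to illustrate:

def progress_pct(current_file_num, num_files):
    # num_files is 0 on the very first import of a path, so guard the division
    return 100.0 * current_file_num / num_files if num_files else 0.0

print(progress_pct(50, 200))    # 25.0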