Cleaned up debug output, and optimised hash / thumbnail generation to only process dirs with new content
This commit is contained in:
@@ -68,10 +68,11 @@ class Dir(Base):
|
|||||||
path_prefix = Column(String, unique=False, nullable=False )
|
path_prefix = Column(String, unique=False, nullable=False )
|
||||||
num_files = Column(Integer)
|
num_files = Column(Integer)
|
||||||
last_import_date = Column(Float)
|
last_import_date = Column(Float)
|
||||||
|
last_hash_date = Column(Float)
|
||||||
files = relationship("Entry", secondary="entry_dir_link")
|
files = relationship("Entry", secondary="entry_dir_link")
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "<eid: {}, path_prefix: {}, num_files: {}>".format(self.eid, self.path_prefix, self.num_files)
|
return "<eid: {}, path_prefix: {}, num_files: {}, last_import_date: {}, last_hash_date: {}>".format(self.eid, self.path_prefix, self.num_files, self.last_import_date, self.last_hash_date)
|
||||||
|
|
||||||
class Entry(Base):
|
class Entry(Base):
|
||||||
__tablename__ = "entry"
|
__tablename__ = "entry"
|
||||||
@@ -205,7 +206,7 @@ def ProcessImportDirs(parent_job=None):
|
|||||||
session.add(job)
|
session.add(job)
|
||||||
session.commit()
|
session.commit()
|
||||||
if parent_job:
|
if parent_job:
|
||||||
AddLogForJob(parent_job, "adding job id={}, job is: {}".format( job.id, job.name ) )
|
AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a>".format( job.id, job.id, job.name ) )
|
||||||
# force commit to make job.id be valid in use of wait_for later
|
# force commit to make job.id be valid in use of wait_for later
|
||||||
session.commit()
|
session.commit()
|
||||||
jex2=JobExtra( name="path", value=path )
|
jex2=JobExtra( name="path", value=path )
|
||||||
@@ -214,8 +215,7 @@ def ProcessImportDirs(parent_job=None):
|
|||||||
session.add(job2)
|
session.add(job2)
|
||||||
session.commit()
|
session.commit()
|
||||||
if parent_job:
|
if parent_job:
|
||||||
AddLogForJob(parent_job, "adding job2 id={}, wait_for={}, job is: {}".format( job2.id, job2.wait_for, job2.name ) )
|
AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job2.id, job2.id, job2.name, job2.wait_for ) )
|
||||||
print("about to handleJobs")
|
|
||||||
HandleJobs()
|
HandleJobs()
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -228,7 +228,6 @@ def AddLogForJob(job, message, current_file=''):
|
|||||||
# this may not be set on an import job
|
# this may not be set on an import job
|
||||||
if job.num_files:
|
if job.num_files:
|
||||||
job.current_file_num=job.current_file_num+1
|
job.current_file_num=job.current_file_num+1
|
||||||
print("cfm for job: {} is now {}".format(job.id, job.current_file_num));
|
|
||||||
session.add(log)
|
session.add(log)
|
||||||
session.commit()
|
session.commit()
|
||||||
return
|
return
|
||||||
@@ -243,7 +242,7 @@ def RunJob(job):
|
|||||||
elif job.name =="getfiledetails":
|
elif job.name =="getfiledetails":
|
||||||
JobGetFileDetails(job)
|
JobGetFileDetails(job)
|
||||||
else:
|
else:
|
||||||
print("Requested to process unknown job type: {}".format(job.name))
|
print("ERROR: Requested to process unknown job type: {}".format(job.name))
|
||||||
return
|
return
|
||||||
|
|
||||||
def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
|
def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
|
||||||
@@ -256,10 +255,9 @@ def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
|
|||||||
def HandleJobs():
|
def HandleJobs():
|
||||||
global pa_eng
|
global pa_eng
|
||||||
|
|
||||||
print("PA job manager is scanning for new jobs to process")
|
print("INFO: PA job manager is scanning for new jobs to process")
|
||||||
pa_eng.state = 'Scanning Jobs'
|
pa_eng.state = 'Scanning Jobs'
|
||||||
for job in session.query(Job).all():
|
for job in session.query(Job).all():
|
||||||
print("processing: {}".format(job))
|
|
||||||
if job.pa_job_state == 'New':
|
if job.pa_job_state == 'New':
|
||||||
if job.wait_for != None:
|
if job.wait_for != None:
|
||||||
j2 = session.query(Job).get(job.wait_for)
|
j2 = session.query(Job).get(job.wait_for)
|
||||||
@@ -284,7 +282,7 @@ def HandleJobs():
|
|||||||
MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
|
MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
|
||||||
except Exception as e2:
|
except Exception as e2:
|
||||||
print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
|
print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
|
||||||
print("PA job manager is waiting jobs")
|
print("INFO: PA job manager is waiting jobs")
|
||||||
pa_eng.state = 'Waiting for new Jobs'
|
pa_eng.state = 'Waiting for new Jobs'
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -336,7 +334,7 @@ def CreateSymlink(job,path):
|
|||||||
return symlink
|
return symlink
|
||||||
|
|
||||||
def AddDir(job, dirname, path_prefix, in_dir):
|
def AddDir(job, dirname, path_prefix, in_dir):
|
||||||
dir=Dir( path_prefix=path_prefix, num_files=0, last_import_date=0 )
|
dir=Dir( path_prefix=path_prefix, num_files=0, last_import_date=0, last_hash_date=0 )
|
||||||
dtype=session.query(FileType).filter(FileType.name=='Directory').first()
|
dtype=session.query(FileType).filter(FileType.name=='Directory').first()
|
||||||
e=Entry( name=dirname, type=dtype )
|
e=Entry( name=dirname, type=dtype )
|
||||||
e.dir_details.append(dir)
|
e.dir_details.append(dir)
|
||||||
@@ -359,7 +357,6 @@ def AddFile(job, fname, type_str, fsize, in_dir ):
|
|||||||
|
|
||||||
def JobImportDir(job):
|
def JobImportDir(job):
|
||||||
JobProgressState( job, "In Progress" )
|
JobProgressState( job, "In Progress" )
|
||||||
print("DEBUG: Importing dir")
|
|
||||||
settings = session.query(Settings).first()
|
settings = session.query(Settings).first()
|
||||||
if settings == None:
|
if settings == None:
|
||||||
raise Exception("Cannot create file data with no settings / import path is missing")
|
raise Exception("Cannot create file data with no settings / import path is missing")
|
||||||
@@ -373,11 +370,12 @@ def JobImportDir(job):
|
|||||||
print("DEBUG: Checking Import Directory: {}".format( path ) )
|
print("DEBUG: Checking Import Directory: {}".format( path ) )
|
||||||
if os.path.exists( path ):
|
if os.path.exists( path ):
|
||||||
symlink=CreateSymlink(job,path)
|
symlink=CreateSymlink(job,path)
|
||||||
# DDP: dont want to do add a Dir, if this already exists (and if so, check any modificaiton on fs, since last import)!!!!
|
# don't want to add a Dir if one already exists for this path
|
||||||
dir=session.query(Dir).filter(Dir.path_prefix==symlink).first()
|
dir=session.query(Dir).filter(Dir.path_prefix==symlink).first()
|
||||||
if dir != None:
|
if dir != None:
|
||||||
stat = os.stat( os.path.basename(path[0:-1]) )
|
stat = os.stat( symlink )
|
||||||
if stat.st_ctime < dir.last_import_date:
|
# check for any modification on the fs since the last import; if there is none, we are done
|
||||||
|
if dir.last_import_date > 0 and stat.st_ctime < dir.last_import_date:
|
||||||
print( "DEBUG: Directory has not been altered since the last import, just return" )
|
print( "DEBUG: Directory has not been altered since the last import, just return" )
|
||||||
FinishJob( job, "No new files in directory since the last import")
|
FinishJob( job, "No new files in directory since the last import")
|
||||||
return
|
return
|
||||||
@@ -392,7 +390,7 @@ def JobImportDir(job):
|
|||||||
fname=file.replace(path, "")
|
fname=file.replace(path, "")
|
||||||
stat = os.stat(file)
|
stat = os.stat(file)
|
||||||
dirname=SymlinkName(path, file)
|
dirname=SymlinkName(path, file)
|
||||||
if keep_dirs[dirname].last_import_date == 0 or stat.st_ctime > keep_dirs[dirname].last_import_date:
|
if stat.st_ctime > keep_dirs[dirname].last_import_date:
|
||||||
AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
|
AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
|
||||||
print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ) )
|
print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ) )
|
||||||
if os.path.isdir(file):
|
if os.path.isdir(file):
|
||||||
@@ -427,7 +425,6 @@ def JobImportDir(job):
|
|||||||
print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) )
|
print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) )
|
||||||
FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" )
|
FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" )
|
||||||
session.commit()
|
session.commit()
|
||||||
print("fcnt:", fcnt)
|
|
||||||
return
|
return
|
||||||
|
|
||||||
def FilesInDir( path ):
|
def FilesInDir( path ):
|
||||||
@@ -436,7 +433,7 @@ def FilesInDir( path ):
|
|||||||
|
|
||||||
|
|
||||||
def ProcessFilesInDir(job, e):
|
def ProcessFilesInDir(job, e):
|
||||||
print("files in dir - process: {}".format(e.name))
|
print("DEBUG: files in dir - process: {}".format(e.name))
|
||||||
if e.type.name != 'Directory':
|
if e.type.name != 'Directory':
|
||||||
e.file_details[0].hash = md5( job, os.path.join( e.in_dir[0].path_prefix, e.name ) )
|
e.file_details[0].hash = md5( job, os.path.join( e.in_dir[0].path_prefix, e.name ) )
|
||||||
if e.type.name == 'Image':
|
if e.type.name == 'Image':
|
||||||
@@ -444,9 +441,17 @@ def ProcessFilesInDir(job, e):
|
|||||||
elif e.type.name == 'Video':
|
elif e.type.name == 'Video':
|
||||||
e.file_details[0].thumbnail = GenVideoThumbnail( job, os.path.join( e.in_dir[0].path_prefix, e.name ) )
|
e.file_details[0].thumbnail = GenVideoThumbnail( job, os.path.join( e.in_dir[0].path_prefix, e.name ) )
|
||||||
else:
|
else:
|
||||||
print("need to better process: {}".format(e))
|
dir=session.query(Dir).filter(Dir.eid==e.id).first()
|
||||||
d=session.query(Dir).filter(Dir.eid==e.id).first()
|
stat = os.stat( dir.path_prefix )
|
||||||
for sub in d.files:
|
# check for any modification on the fs since the last hashing; if there is none, we are done
|
||||||
|
if stat.st_ctime < dir.last_hash_date:
|
||||||
|
session.add(dir)
|
||||||
|
dir.last_hash_date = time.time()
|
||||||
|
AddLogForJob(job, "skip {} as it has not changed since last hashing".format(dir.path_prefix))
|
||||||
|
print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
|
||||||
|
return
|
||||||
|
dir.last_hash_date = time.time()
|
||||||
|
for sub in dir.files:
|
||||||
ProcessFilesInDir(job, sub )
|
ProcessFilesInDir(job, sub )
|
||||||
|
|
||||||
def JobGetFileDetails(job):
|
def JobGetFileDetails(job):
|
||||||
@@ -457,6 +462,14 @@ def JobGetFileDetails(job):
|
|||||||
path=FixPath('static/{}'.format( os.path.basename(path[0:-1])))
|
path=FixPath('static/{}'.format( os.path.basename(path[0:-1])))
|
||||||
print("DEBUG: JobGetFileDetails for path={}".format( path ) )
|
print("DEBUG: JobGetFileDetails for path={}".format( path ) )
|
||||||
dir=session.query(Dir).filter(Dir.path_prefix==path).first()
|
dir=session.query(Dir).filter(Dir.path_prefix==path).first()
|
||||||
|
stat=os.stat( path )
|
||||||
|
if stat.st_ctime < dir.last_hash_date:
|
||||||
|
session.add(dir)
|
||||||
|
dir.last_hash_date = time.time()
|
||||||
|
AddLogForJob(job, "skip {} as it has not changed since last hashing".format(dir.path_prefix))
|
||||||
|
print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
|
||||||
|
return
|
||||||
|
dir.last_hash_date = time.time()
|
||||||
job.current_file_num = 0
|
job.current_file_num = 0
|
||||||
job.num_files = dir.num_files
|
job.num_files = dir.num_files
|
||||||
session.commit()
|
session.commit()
|
||||||
@@ -529,17 +542,16 @@ def GenVideoThumbnail(job, file):
|
|||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
print("PA job manager starting")
|
print("INFO: PA job manager starting")
|
||||||
try:
|
try:
|
||||||
InitialiseManager()
|
InitialiseManager()
|
||||||
ProcessImportDirs()
|
ProcessImportDirs()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print( "Failed to initialise PA Job Manager: {}".format(e) )
|
print( "ERROR: Failed to initialise PA Job Manager: {}".format(e) )
|
||||||
session.rollback()
|
session.rollback()
|
||||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||||
s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
|
s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
|
||||||
s.listen()
|
s.listen()
|
||||||
while True:
|
while True:
|
||||||
conn, addr = s.accept()
|
conn, addr = s.accept()
|
||||||
print("Connection from: {} so HandleJobs".format(addr))
|
|
||||||
HandleJobs()
|
HandleJobs()
|
||||||
|
|||||||
Reference in New Issue
Block a user