ProcessFilesInDir is now generic and takes functions to call on each file

This commit is contained in:
2021-01-21 22:22:42 +11:00
parent 60953553a2
commit 2f3d7c1ae2
2 changed files with 118 additions and 117 deletions

3
TODO
View File

@@ -3,9 +3,6 @@
### BACKEND ### BACKEND
*** Need to use thread-safe sessions per Thread, half-assed version did not work *** Need to use thread-safe sessions per Thread, half-assed version did not work
* create A WalkTree( dir, func ) that calls func(file) per file ... for use
* in AI, but also maybe gen hash, etc.
* need a "batch" processing system that uses ionice to minimise load on mara and is threaded and used DB to interact with gunicorn'd pa * need a "batch" processing system that uses ionice to minimise load on mara and is threaded and used DB to interact with gunicorn'd pa
* pa_job_manager, needs ai code * pa_job_manager, needs ai code

View File

@@ -349,138 +349,143 @@ def JobImportDir(job):
overall_file_cnt=0 overall_file_cnt=0
fcnt={} fcnt={}
keep_dirs={} keep_dirs={}
for jex in job.extra: path=[jex.value for jex in job.extra if jex.name == "path"][0]
if jex.name =="path": AddLogForJob(job, "Checking Import Directory: {}".format( path ) )
path = FixPath(jex.value) if DEBUG==1:
AddLogForJob(job, "Checking Import Directory: {}".format( path ) ) print("DEBUG: Checking Import Directory: {}".format( path ) )
if DEBUG==1: if os.path.exists( path ):
print("DEBUG: Checking Import Directory: {}".format( path ) ) symlink=CreateSymlink(job,path)
if os.path.exists( path ): # dont want to do add a Dir, if this already exists
symlink=CreateSymlink(job,path) dir=session.query(Dir).filter(Dir.path_prefix==symlink).first()
# dont want to do add a Dir, if this already exists if dir != None:
dir=session.query(Dir).filter(Dir.path_prefix==symlink).first() stat = os.stat( symlink )
if dir != None: # check any modificaiton on fs, since last import, if none we are done
stat = os.stat( symlink ) if dir.last_import_date > 0 and stat.st_ctime < dir.last_import_date:
# check any modificaiton on fs, since last import, if none we are done if DEBUG==1:
if dir.last_import_date > 0 and stat.st_ctime < dir.last_import_date: print( "DEBUG: Directory has not been altered since the last import, just return" )
if DEBUG==1: job.current_file_num=dir.num_files
print( "DEBUG: Directory has not been altered since the last import, just return" ) job.num_files=dir.num_files
job.current_file_num=dir.num_files FinishJob( job, "No new files in directory since the last import")
job.num_files=dir.num_files return
FinishJob( job, "No new files in directory since the last import") else:
return dir=AddDir(job, os.path.basename(path[0:-1]), symlink, None )
session.commit()
keep_dirs[dir.path_prefix]=dir
import_dir=dir
fcnt[symlink]=0
files = sorted(glob.glob(path + '**', recursive=True))
job.current_file_num=0
# reduce this by 1, becasuse we skip file == path below
job.num_files=len(files)-1
session.commit()
for file in sorted(glob.glob(path + '**', recursive=True)):
if file == path:
continue
fname=file.replace(path, "")
stat = os.stat(file)
dirname=SymlinkName(path, file)
if stat.st_ctime > keep_dirs[dirname].last_import_date:
if DEBUG==1:
AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ) )
if os.path.isdir(file):
path_prefix=symlink+'/'+fname
dir=AddDir( job, fname, path_prefix, dir )
fcnt[path_prefix]=0
keep_dirs[dir.path_prefix]=dir
else: else:
dir=AddDir(job, os.path.basename(path[0:-1]), symlink, None ) overall_file_cnt=overall_file_cnt+1
session.commit()
keep_dirs[dir.path_prefix]=dir
import_dir=dir
fcnt[symlink]=0
files = sorted(glob.glob(path + '**', recursive=True))
job.current_file_num=0
# reduce this by 1, becasuse we skip file == path below
job.num_files=len(files)-1
print("len={}, files={}", len(files), files )
session.commit()
for file in sorted(glob.glob(path + '**', recursive=True)):
if file == path:
continue
fname=file.replace(path, "")
stat = os.stat(file)
dirname=SymlinkName(path, file) dirname=SymlinkName(path, file)
if stat.st_ctime > keep_dirs[dirname].last_import_date: fcnt[dirname]=fcnt[dirname]+1
if DEBUG==1: if isImage(file):
AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) type_str = 'Image'
print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ) ) elif isVideo(file):
if os.path.isdir(file): type_str = 'Video'
path_prefix=symlink+'/'+fname
dir=AddDir( job, fname, path_prefix, dir )
fcnt[path_prefix]=0
keep_dirs[dir.path_prefix]=dir
else:
overall_file_cnt=overall_file_cnt+1
dirname=SymlinkName(path, file)
fcnt[dirname]=fcnt[dirname]+1
if isImage(file):
type_str = 'Image'
elif isVideo(file):
type_str = 'Video'
else:
type_str = 'Unknown'
fsize = round(os.stat(file).st_size/(1024*1024))
e=AddFile( job, os.path.basename(fname), type_str, fsize, dir )
else: else:
if DEBUG==1: type_str = 'Unknown'
AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) fsize = round(os.stat(file).st_size/(1024*1024))
print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) dir=keep_dirs[dirname]
FinishJob(job, "Finished Importing: {} - Found {} new files".format( path, overall_file_cnt ) ) e=AddFile( job, os.path.basename(fname), type_str, fsize, dir )
for d in keep_dirs:
keep_dirs[d].num_files = fcnt[d]
keep_dirs[d].last_import_date = time.time()
# override this to be all the files in dir & its sub dirs... (used to know how many files in jobs for this import dir)
import_dir.num_files=overall_file_cnt
else: else:
FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" ) if DEBUG==1:
for j in session.query(Job).filter(Job.wait_for==job.id).all(): AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
if DEBUG==1: print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) ) FinishJob(job, "Finished Importing: {} - Found {} new files".format( path, overall_file_cnt ) )
FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" ) for d in keep_dirs:
session.commit() keep_dirs[d].num_files = fcnt[d]
keep_dirs[d].last_import_date = time.time()
# override this to be all the files in dir & its sub dirs... (used to know how many files in jobs for this import dir)
import_dir.num_files=overall_file_cnt
else:
FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" )
for j in session.query(Job).filter(Job.wait_for==job.id).all():
if DEBUG==1:
print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) )
FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" )
session.commit()
return return
def FilesInDir( path ): def FilesInDir( path ):
d=session.query(Dir).filter(Dir.path_prefix==path).first() d=session.query(Dir).filter(Dir.path_prefix==path).first()
return d.files return d.files
def ProcessFilesInDir(job, e): def GenHashAndThumb(job, e):
e.file_details[0].hash = md5( job, e.in_dir[0].path_prefix+'/'+ e.name )
if e.type.name == 'Image':
e.file_details[0].thumbnail = GenImageThumbnail( job, e.in_dir[0].path_prefix+'/'+ e.name )
elif e.type.name == 'Video':
e.file_details[0].thumbnail = GenVideoThumbnail( job, e.in_dir[0].path_prefix+'/'+ e.name )
elif e.type.name == 'Unknown':
job.current_file_num+=1
return
def HashAndThumbDirHasNew(dir):
session.add(dir)
stat = os.stat( dir.path_prefix )
# check any modificaiton on fs, since last import, if none we are done
if stat.st_ctime < dir.last_hash_date:
dir.last_hash_date = time.time()
AddLogForJob(job, "skip {} as it has not changed since last hashing".format(dir.path_prefix))
if DEBUG==1:
print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
return 0
dir.last_hash_date = time.time()
return 1
def ProcessFilesInDir(job, e, file_func, go_into_dir_func):
if DEBUG==1: if DEBUG==1:
print("DEBUG: files in dir - process: {}".format(e.name)) print("DEBUG: files in dir - process: {} {}".format(e.name, e.in_dir[0].path_prefix))
if e.type.name != 'Directory': if e.type.name != 'Directory':
e.file_details[0].hash = md5( job, e.in_dir[0].path_prefix+'/'+ e.name ) file_func(job, e)
if e.type.name == 'Image':
e.file_details[0].thumbnail = GenImageThumbnail( job, e.in_dir[0].path_prefix+'/'+ e.name )
elif e.type.name == 'Video':
e.file_details[0].thumbnail = GenVideoThumbnail( job, e.in_dir[0].path_prefix+'/'+ e.name )
elif e.type.name == 'Unknown':
job.current_file_num+=1
else: else:
dir=session.query(Dir).filter(Dir.eid==e.id).first() dir=session.query(Dir).filter(Dir.eid==e.id).first()
stat = os.stat( dir.path_prefix ) # if this func returns
# check any modificaiton on fs, since last import, if none we are done if not go_into_dir_func(dir):
if stat.st_ctime < dir.last_hash_date:
session.add(dir)
dir.last_hash_date = time.time()
AddLogForJob(job, "skip {} as it has not changed since last hashing".format(dir.path_prefix))
if DEBUG==1:
print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
return return
dir.last_hash_date = time.time()
for sub in dir.files: for sub in dir.files:
ProcessFilesInDir(job, sub ) ProcessFilesInDir(job, sub, file_func, go_into_dir_func)
def JobGetFileDetails(job): def JobGetFileDetails(job):
JobProgressState( job, "In Progress" ) JobProgressState( job, "In Progress" )
for jex in job.extra: path=[jex.value for jex in job.extra if jex.name == "path"][0]
if jex.name =="path": path='static'+'/'+os.path.basename(path[0:-1])
path=jex.value if DEBUG==1:
path='static'+'/'+os.path.basename(path[0:-1]) print("DEBUG: JobGetFileDetails for path={}".format( path ) )
if DEBUG==1: dir=session.query(Dir).filter(Dir.path_prefix==path).first()
print("DEBUG: JobGetFileDetails for path={}".format( path ) ) stat=os.stat( path )
dir=session.query(Dir).filter(Dir.path_prefix==path).first() if stat.st_ctime < dir.last_hash_date:
stat=os.stat( path ) session.add(dir)
if stat.st_ctime < dir.last_hash_date: dir.last_hash_date = time.time()
session.add(dir) FinishJob(job, "{} has not changed since last hashing - finished job".format(dir.path_prefix))
dir.last_hash_date = time.time() if DEBUG==1:
FinishJob(job, "{} has not changed since last hashing - finished job".format(dir.path_prefix)) print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
if DEBUG==1: return
print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix)) dir.last_hash_date = time.time()
return job.current_file_num = 0
dir.last_hash_date = time.time() job.num_files = dir.num_files
job.current_file_num = 0 session.commit()
job.num_files = dir.num_files for e in FilesInDir( path ):
session.commit() ProcessFilesInDir(job, e, GenHashAndThumb, HashAndThumbDirHasNew )
for e in FilesInDir( path ):
ProcessFilesInDir(job, e )
FinishJob(job, "File Details job finished") FinishJob(job, "File Details job finished")
session.commit() session.commit()
return return
@@ -546,7 +551,6 @@ def GenVideoThumbnail(job, file):
thumbnail = str(thumbnail)[2:-1] thumbnail = str(thumbnail)[2:-1]
return thumbnail return thumbnail
if __name__ == "__main__": if __name__ == "__main__":
print("INFO: PA job manager starting - listening on {}:{}".format( PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT) ) print("INFO: PA job manager starting - listening on {}:{}".format( PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT) )
ProcessImportDirs() ProcessImportDirs()