diff --git a/TODO b/TODO
index 6e7cddf..84c5072 100644
--- a/TODO
+++ b/TODO
@@ -3,9 +3,6 @@
 ### BACKEND
   *** Need to use thread-safe sessions per Thread, half-assed version did not work
 
-  * create A WalkTree( dir, func ) that calls func(file) per file ... for use
-  * in AI, but also maybe gen hash, etc.
-
   * need a "batch" processing system that uses ionice to minimise load on mara and is threaded and used DB to interact with gunicorn'd pa
 
   * pa_job_manager, needs ai code
diff --git a/pa_job_manager.py b/pa_job_manager.py
index c5091a3..ef274fe 100644
--- a/pa_job_manager.py
+++ b/pa_job_manager.py
@@ -349,138 +349,147 @@ def JobImportDir(job):
     overall_file_cnt=0
     fcnt={}
     keep_dirs={}
-    for jex in job.extra:
-        if jex.name =="path":
-            path = FixPath(jex.value)
-            AddLogForJob(job, "Checking Import Directory: {}".format( path ) )
-            if DEBUG==1:
-                print("DEBUG: Checking Import Directory: {}".format( path ) )
-            if os.path.exists( path ):
-                symlink=CreateSymlink(job,path)
-                # dont want to do add a Dir, if this already exists
-                dir=session.query(Dir).filter(Dir.path_prefix==symlink).first()
-                if dir != None:
-                    stat = os.stat( symlink )
-                    # check any modificaiton on fs, since last import, if none we are done
-                    if dir.last_import_date > 0 and stat.st_ctime < dir.last_import_date:
-                        if DEBUG==1:
-                            print( "DEBUG: Directory has not been altered since the last import, just return" )
-                        job.current_file_num=dir.num_files
-                        job.num_files=dir.num_files
-                        FinishJob( job, "No new files in directory since the last import")
-                        return
+    path=[jex.value for jex in job.extra if jex.name == "path"][0]
+    AddLogForJob(job, "Checking Import Directory: {}".format( path ) )
+    if DEBUG==1:
+        print("DEBUG: Checking Import Directory: {}".format( path ) )
+    if os.path.exists( path ):
+        symlink=CreateSymlink(job,path)
+        # dont want to do add a Dir, if this already exists
+        dir=session.query(Dir).filter(Dir.path_prefix==symlink).first()
+        if dir != None:
+            stat = os.stat( symlink )
+            # check any modification on fs, since last import, if none we are done
+            if dir.last_import_date > 0 and stat.st_ctime < dir.last_import_date:
+                if DEBUG==1:
+                    print( "DEBUG: Directory has not been altered since the last import, just return" )
+                job.current_file_num=dir.num_files
+                job.num_files=dir.num_files
+                FinishJob( job, "No new files in directory since the last import")
+                return
+        else:
+            dir=AddDir(job, os.path.basename(path[0:-1]), symlink, None )
+            session.commit()
+        keep_dirs[dir.path_prefix]=dir
+        import_dir=dir
+        fcnt[symlink]=0
+        files = sorted(glob.glob(path + '**', recursive=True))
+        job.current_file_num=0
+        # reduce this by 1, because we skip file == path below
+        job.num_files=len(files)-1
+        session.commit()
+        for file in files:
+            if file == path:
+                continue
+            fname=file.replace(path, "")
+            stat = os.stat(file)
+            dirname=SymlinkName(path, file)
+            if stat.st_ctime > keep_dirs[dirname].last_import_date:
+                if DEBUG==1:
+                    AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
+                    print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ) )
+                if os.path.isdir(file):
+                    path_prefix=symlink+'/'+fname
+                    dir=AddDir( job, fname, path_prefix, dir )
+                    fcnt[path_prefix]=0
+                    keep_dirs[dir.path_prefix]=dir
                 else:
-                    dir=AddDir(job, os.path.basename(path[0:-1]), symlink, None )
-                    session.commit()
-                keep_dirs[dir.path_prefix]=dir
-                import_dir=dir
-                fcnt[symlink]=0
-                files = sorted(glob.glob(path + '**', recursive=True))
-                job.current_file_num=0
-                # reduce this by 1, becasuse we skip file == path below
-                job.num_files=len(files)-1
-                print("len={}, files={}", len(files), files )
-                session.commit()
-                for file in sorted(glob.glob(path + '**', recursive=True)):
-                    if file == path:
-                        continue
-                    fname=file.replace(path, "")
-                    stat = os.stat(file)
+                    overall_file_cnt=overall_file_cnt+1
                     dirname=SymlinkName(path, file)
-                    if stat.st_ctime > keep_dirs[dirname].last_import_date:
-                        if DEBUG==1:
-                            AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
-                            print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ) )
-                        if os.path.isdir(file):
-                            path_prefix=symlink+'/'+fname
-                            dir=AddDir( job, fname, path_prefix, dir )
-                            fcnt[path_prefix]=0
-                            keep_dirs[dir.path_prefix]=dir
-                        else:
-                            overall_file_cnt=overall_file_cnt+1
-                            dirname=SymlinkName(path, file)
-                            fcnt[dirname]=fcnt[dirname]+1
-                            if isImage(file):
-                                type_str = 'Image'
-                            elif isVideo(file):
-                                type_str = 'Video'
-                            else:
-                                type_str = 'Unknown'
-                            fsize = round(os.stat(file).st_size/(1024*1024))
-                            e=AddFile( job, os.path.basename(fname), type_str, fsize, dir )
+                    fcnt[dirname]=fcnt[dirname]+1
+                    if isImage(file):
+                        type_str = 'Image'
+                    elif isVideo(file):
+                        type_str = 'Video'
                     else:
-                        if DEBUG==1:
-                            AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
-                            print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
-                FinishJob(job, "Finished Importing: {} - Found {} new files".format( path, overall_file_cnt ) )
-                for d in keep_dirs:
-                    keep_dirs[d].num_files = fcnt[d]
-                    keep_dirs[d].last_import_date = time.time()
-                # override this to be all the files in dir & its sub dirs... (used to know how many files in jobs for this import dir)
-                import_dir.num_files=overall_file_cnt
+                        type_str = 'Unknown'
+                    fsize = round(os.stat(file).st_size/(1024*1024))
+                    dir=keep_dirs[dirname]
+                    e=AddFile( job, os.path.basename(fname), type_str, fsize, dir )
             else:
-                FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" )
-                for j in session.query(Job).filter(Job.wait_for==job.id).all():
-                    if DEBUG==1:
-                        print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) )
-                    FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" )
-                session.commit()
+                if DEBUG==1:
+                    AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
+                    print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ) )
+        FinishJob(job, "Finished Importing: {} - Found {} new files".format( path, overall_file_cnt ) )
+        for d in keep_dirs:
+            keep_dirs[d].num_files = fcnt[d]
+            keep_dirs[d].last_import_date = time.time()
+        # override this to be all the files in dir & its sub dirs... (used to know how many files in jobs for this import dir)
+        import_dir.num_files=overall_file_cnt
+    else:
+        FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" )
+        for j in session.query(Job).filter(Job.wait_for==job.id).all():
+            if DEBUG==1:
+                print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) )
+            FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" )
+        session.commit()
     return
 
 def FilesInDir( path ):
     d=session.query(Dir).filter(Dir.path_prefix==path).first()
     return d.files
 
-
-def ProcessFilesInDir(job, e):
+# file_func for ProcessFilesInDir: store the md5 hash of the file behind entry e and, for an Image or Video, its thumbnail
+def GenHashAndThumb(job, e):
+    e.file_details[0].hash = md5( job, e.in_dir[0].path_prefix+'/'+ e.name )
+    if e.type.name == 'Image':
+        e.file_details[0].thumbnail = GenImageThumbnail( job, e.in_dir[0].path_prefix+'/'+ e.name )
+    elif e.type.name == 'Video':
+        e.file_details[0].thumbnail = GenVideoThumbnail( job, e.in_dir[0].path_prefix+'/'+ e.name )
+    elif e.type.name == 'Unknown':
+        job.current_file_num+=1
+    return
+
+# go_into_dir_func for ProcessFilesInDir: returns 0 when dir's ctime is older than its last_hash_date (nothing new, skip it), otherwise 1
+# either way last_hash_date is reset to now; job is needed so the skip message goes to that job's log
+def HashAndThumbDirHasNew(job, dir):
+    session.add(dir)
+    stat = os.stat( dir.path_prefix )
+    # check any modification on fs, since last import, if none we are done
+    if stat.st_ctime < dir.last_hash_date:
+        dir.last_hash_date = time.time()
+        AddLogForJob(job, "skip {} as it has not changed since last hashing".format(dir.path_prefix))
+        if DEBUG==1:
+            print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
+        return 0
+    dir.last_hash_date = time.time()
+    return 1
+
+# walk entry e: call file_func(job, e) on a non-Directory entry; for a Directory, recurse into its files unless go_into_dir_func(job, dir) returns a falsy value
+def ProcessFilesInDir(job, e, file_func, go_into_dir_func):
     if DEBUG==1:
-        print("DEBUG: files in dir - process: {}".format(e.name))
+        print("DEBUG: files in dir - process: {} {}".format(e.name, e.in_dir[0].path_prefix))
     if e.type.name != 'Directory':
-        e.file_details[0].hash = md5( job, e.in_dir[0].path_prefix+'/'+ e.name )
-        if e.type.name == 'Image':
-            e.file_details[0].thumbnail = GenImageThumbnail( job, e.in_dir[0].path_prefix+'/'+ e.name )
-        elif e.type.name == 'Video':
-            e.file_details[0].thumbnail = GenVideoThumbnail( job, e.in_dir[0].path_prefix+'/'+ e.name )
-        elif e.type.name == 'Unknown':
-            job.current_file_num+=1
+        file_func(job, e)
     else:
         dir=session.query(Dir).filter(Dir.eid==e.id).first()
-        stat = os.stat( dir.path_prefix )
-        # check any modificaiton on fs, since last import, if none we are done
-        if stat.st_ctime < dir.last_hash_date:
-            session.add(dir)
-            dir.last_hash_date = time.time()
-            AddLogForJob(job, "skip {} as it has not changed since last hashing".format(dir.path_prefix))
-            if DEBUG==1:
-                print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
+        # if this func returns a falsy value there is nothing new in this dir, so do not descend into it
+        if not go_into_dir_func(job, dir):
             return
-        dir.last_hash_date = time.time()
         for sub in dir.files:
-            ProcessFilesInDir(job, sub )
+            ProcessFilesInDir(job, sub, file_func, go_into_dir_func)
 
 def JobGetFileDetails(job):
     JobProgressState( job, "In Progress" )
-    for jex in job.extra:
-        if jex.name =="path":
-            path=jex.value
-            path='static'+'/'+os.path.basename(path[0:-1])
-            if DEBUG==1:
-                print("DEBUG: JobGetFileDetails for path={}".format( path ) )
-            dir=session.query(Dir).filter(Dir.path_prefix==path).first()
-            stat=os.stat( path )
-            if stat.st_ctime < dir.last_hash_date:
-                session.add(dir)
-                dir.last_hash_date = time.time()
-                FinishJob(job, "{} has not changed since last hashing - finished job".format(dir.path_prefix))
-                if DEBUG==1:
-                    print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
-                return
-            dir.last_hash_date = time.time()
-            job.current_file_num = 0
-            job.num_files = dir.num_files
-            session.commit()
-            for e in FilesInDir( path ):
-                ProcessFilesInDir(job, e )
+    path=[jex.value for jex in job.extra if jex.name == "path"][0]
+    path='static'+'/'+os.path.basename(path[0:-1])
+    if DEBUG==1:
+        print("DEBUG: JobGetFileDetails for path={}".format( path ) )
+    dir=session.query(Dir).filter(Dir.path_prefix==path).first()
+    stat=os.stat( path )
+    if stat.st_ctime < dir.last_hash_date:
+        session.add(dir)
+        dir.last_hash_date = time.time()
+        FinishJob(job, "{} has not changed since last hashing - finished job".format(dir.path_prefix))
+        if DEBUG==1:
+            print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
+        return
+    dir.last_hash_date = time.time()
+    job.current_file_num = 0
+    job.num_files = dir.num_files
+    session.commit()
+    for e in FilesInDir( path ):
+        ProcessFilesInDir(job, e, GenHashAndThumb, HashAndThumbDirHasNew )
     FinishJob(job, "File Details job finished")
     session.commit()
     return
@@ -546,7 +555,6 @@ def GenVideoThumbnail(job, file):
     thumbnail = str(thumbnail)[2:-1]
     return thumbnail
 
-
 if __name__ == "__main__":
     print("INFO: PA job manager starting - listening on {}:{}".format( PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT) )
     ProcessImportDirs()