diff --git a/pa_job_manager.py b/pa_job_manager.py index 6683439..eee2ed7 100644 --- a/pa_job_manager.py +++ b/pa_job_manager.py @@ -17,6 +17,7 @@ from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import relationship from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm import scoped_session from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT from datetime import datetime, timedelta import pytz @@ -33,15 +34,18 @@ import cv2 import socket import threading -DEBUG=1 +DEBUG=0 # an Manager, which the Session will use for connection resources some_engine = create_engine(DB_URL) # create a configured "Session" class -Session = sessionmaker(bind=some_engine) +#Session = sessionmaker(bind=some_engine) # create a Session + +session_factory = sessionmaker(bind=some_engine) +Session = scoped_session(session_factory) session = Session() Base = declarative_base() @@ -233,6 +237,7 @@ def AddLogForJob(job, message, current_file=''): return def RunJob(job): + session = Session() if job.name =="scannow": JobScanNow(job) elif job.name =="forcescan": @@ -244,6 +249,7 @@ def RunJob(job): else: print("ERROR: Requested to process unknown job type: {}".format(job.name)) # okay, we finished a job, so check for any jobs that are dependant on this and run them... 
+# session.close() HandleJobs() return @@ -278,7 +284,8 @@ def HandleJobs(): RunJob(job) else: try: - threading.Thread(target=RunJob, args=(job,)).start() + RunJob(job) + # threading.Thread(target=RunJob, args=(job,)).start() except Exception as e: try: MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) ) @@ -343,7 +350,8 @@ def AddDir(job, dirname, path_prefix, in_dir): # this occurs when we Add the actual Dir for the import_path if in_dir: e.in_dir.append(in_dir) - AddLogForJob(job, "DEBUG: AddDir: {} in (dir_id={})".format(dirname, in_dir) ) + if DEBUG==1: + AddLogForJob(job, "DEBUG: AddDir: {} in (dir_id={})".format(dirname, in_dir) ) session.add(e) return dir @@ -369,7 +377,8 @@ def JobImportDir(job): if jex.name =="path": path = FixPath(jex.value) AddLogForJob(job, "Checking Import Directory: {}".format( path ) ) - print("DEBUG: Checking Import Directory: {}".format( path ) ) + if DEBUG==1: + print("DEBUG: Checking Import Directory: {}".format( path ) ) if os.path.exists( path ): symlink=CreateSymlink(job,path) # dont want to do add a Dir, if this already exists @@ -378,7 +387,10 @@ def JobImportDir(job): stat = os.stat( symlink ) # check any modificaiton on fs, since last import, if none we are done if dir.last_import_date > 0 and stat.st_ctime < dir.last_import_date: - print( "DEBUG: Directory has not been altered since the last import, just return" ) + if DEBUG==1: + print( "DEBUG: Directory has not been altered since the last import, just return" ) + job.current_file_num=dir.num_files + job.num_files=dir.num_files FinishJob( job, "No new files in directory since the last import") return else: @@ -386,6 +398,12 @@ def JobImportDir(job): keep_dirs[dir.path_prefix]=dir import_dir=dir fcnt[symlink]=0 + files = sorted(glob.glob(path + '**', recursive=True)) + job.current_file_num=0 + # reduce this by 1, because we skip file == path below + job.num_files=len(files)-1 + print("len={}, files={}", len(files), files ) + 
session.commit() for file in sorted(glob.glob(path + '**', recursive=True)): if file == path: continue @@ -393,8 +411,9 @@ def JobImportDir(job): stat = os.stat(file) dirname=SymlinkName(path, file) if stat.st_ctime > keep_dirs[dirname].last_import_date: - AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) - print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ) ) + if DEBUG==1: + AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) + print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ) ) if os.path.isdir(file): path_prefix=os.path.join(symlink,fname) dir=AddDir( job, fname, path_prefix, dir ) @@ -413,8 +432,9 @@ def JobImportDir(job): fsize = round(os.stat(file).st_size/(1024*1024)) e=AddFile( job, os.path.basename(fname), type_str, fsize, dir ) else: - AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) - print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) + if DEBUG==1: + AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) + print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) FinishJob(job, "Finished Importing: {} - Found {} new files".format( path, overall_file_cnt ) ) for d in keep_dirs: keep_dirs[d].num_files = fcnt[d] @@ -424,7 +444,8 @@ def JobImportDir(job): else: FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" ) for j in session.query(Job).filter(Job.wait_for==job.id).all(): - print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) ) + if DEBUG==1: + 
print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) ) FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" ) session.commit() return @@ -435,7 +456,8 @@ def FilesInDir( path ): def ProcessFilesInDir(job, e): - print("DEBUG: files in dir - process: {}".format(e.name)) + if DEBUG==1: + print("DEBUG: files in dir - process: {}".format(e.name)) if e.type.name != 'Directory': e.file_details[0].hash = md5( job, os.path.join( e.in_dir[0].path_prefix, e.name ) ) if e.type.name == 'Image': @@ -450,7 +472,8 @@ def ProcessFilesInDir(job, e): session.add(dir) dir.last_hash_date = time.time() AddLogForJob(job, "skip {} as it has not changed since last hashing".format(dir.path_prefix)) - print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix)) + if DEBUG==1: + print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix)) return dir.last_hash_date = time.time() for sub in dir.files: @@ -462,14 +485,16 @@ def JobGetFileDetails(job): if jex.name =="path": path=jex.value path=FixPath('static/{}'.format( os.path.basename(path[0:-1]))) - print("DEBUG: JobGetFileDetails for path={}".format( path ) ) + if DEBUG==1: + print("DEBUG: JobGetFileDetails for path={}".format( path ) ) dir=session.query(Dir).filter(Dir.path_prefix==path).first() stat=os.stat( path ) if stat.st_ctime < dir.last_hash_date: session.add(dir) dir.last_hash_date = time.time() AddLogForJob(job, "skip {} as it has not changed since last hashing".format(dir.path_prefix)) - print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix)) + if DEBUG==1: + print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix)) return dir.last_hash_date = time.time() job.current_file_num = 0 @@ -477,7 +502,7 @@ def JobGetFileDetails(job): session.commit() for e in FilesInDir( path ): ProcessFilesInDir(job, e ) 
- FinishJob(job, "DEBUG: File Details processed") + FinishJob(job, "File Details job finished") session.commit() return