Cleaned up debug output: guarded debug prints with `if DEBUG` checks; also ran into threading issues, so for now RunJob runs non-threaded.

This commit is contained in:
2021-01-20 23:22:44 +11:00
parent 153be75302
commit 2cc023bd10

View File

@@ -17,6 +17,7 @@ from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT
from datetime import datetime, timedelta from datetime import datetime, timedelta
import pytz import pytz
@@ -33,15 +34,18 @@ import cv2
import socket import socket
import threading import threading
DEBUG=1 DEBUG=0
# a Manager, which the Session will use for connection resources # a Manager, which the Session will use for connection resources
some_engine = create_engine(DB_URL) some_engine = create_engine(DB_URL)
# create a configured "Session" class # create a configured "Session" class
Session = sessionmaker(bind=some_engine) #Session = sessionmaker(bind=some_engine)
# create a Session # create a Session
session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)
session = Session() session = Session()
Base = declarative_base() Base = declarative_base()
@@ -233,6 +237,7 @@ def AddLogForJob(job, message, current_file=''):
return return
def RunJob(job): def RunJob(job):
session = Session()
if job.name =="scannow": if job.name =="scannow":
JobScanNow(job) JobScanNow(job)
elif job.name =="forcescan": elif job.name =="forcescan":
@@ -244,6 +249,7 @@ def RunJob(job):
else: else:
print("ERROR: Requested to process unknown job type: {}".format(job.name)) print("ERROR: Requested to process unknown job type: {}".format(job.name))
# okay, we finished a job, so check for any jobs that are dependant on this and run them... # okay, we finished a job, so check for any jobs that are dependant on this and run them...
# session.close()
HandleJobs() HandleJobs()
return return
@@ -278,7 +284,8 @@ def HandleJobs():
RunJob(job) RunJob(job)
else: else:
try: try:
threading.Thread(target=RunJob, args=(job,)).start() RunJob(job)
# threading.Thread(target=RunJob, args=(job,)).start()
except Exception as e: except Exception as e:
try: try:
MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) ) MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
@@ -343,6 +350,7 @@ def AddDir(job, dirname, path_prefix, in_dir):
# this occurs when we Add the actual Dir for the import_path # this occurs when we Add the actual Dir for the import_path
if in_dir: if in_dir:
e.in_dir.append(in_dir) e.in_dir.append(in_dir)
if DEBUG==1:
AddLogForJob(job, "DEBUG: AddDir: {} in (dir_id={})".format(dirname, in_dir) ) AddLogForJob(job, "DEBUG: AddDir: {} in (dir_id={})".format(dirname, in_dir) )
session.add(e) session.add(e)
return dir return dir
@@ -369,6 +377,7 @@ def JobImportDir(job):
if jex.name =="path": if jex.name =="path":
path = FixPath(jex.value) path = FixPath(jex.value)
AddLogForJob(job, "Checking Import Directory: {}".format( path ) ) AddLogForJob(job, "Checking Import Directory: {}".format( path ) )
if DEBUG==1:
print("DEBUG: Checking Import Directory: {}".format( path ) ) print("DEBUG: Checking Import Directory: {}".format( path ) )
if os.path.exists( path ): if os.path.exists( path ):
symlink=CreateSymlink(job,path) symlink=CreateSymlink(job,path)
@@ -378,7 +387,10 @@ def JobImportDir(job):
stat = os.stat( symlink ) stat = os.stat( symlink )
# check any modification on fs since last import; if none we are done # check any modification on fs since last import; if none we are done
if dir.last_import_date > 0 and stat.st_ctime < dir.last_import_date: if dir.last_import_date > 0 and stat.st_ctime < dir.last_import_date:
if DEBUG==1:
print( "DEBUG: Directory has not been altered since the last import, just return" ) print( "DEBUG: Directory has not been altered since the last import, just return" )
job.current_file_num=dir.num_files
job.num_files=dir.num_files
FinishJob( job, "No new files in directory since the last import") FinishJob( job, "No new files in directory since the last import")
return return
else: else:
@@ -386,6 +398,12 @@ def JobImportDir(job):
keep_dirs[dir.path_prefix]=dir keep_dirs[dir.path_prefix]=dir
import_dir=dir import_dir=dir
fcnt[symlink]=0 fcnt[symlink]=0
files = sorted(glob.glob(path + '**', recursive=True))
job.current_file_num=0
# reduce this by 1, because we skip file == path below
job.num_files=len(files)-1
print("len={}, files={}", len(files), files )
session.commit()
for file in sorted(glob.glob(path + '**', recursive=True)): for file in sorted(glob.glob(path + '**', recursive=True)):
if file == path: if file == path:
continue continue
@@ -393,6 +411,7 @@ def JobImportDir(job):
stat = os.stat(file) stat = os.stat(file)
dirname=SymlinkName(path, file) dirname=SymlinkName(path, file)
if stat.st_ctime > keep_dirs[dirname].last_import_date: if stat.st_ctime > keep_dirs[dirname].last_import_date:
if DEBUG==1:
AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ) ) print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ) )
if os.path.isdir(file): if os.path.isdir(file):
@@ -413,6 +432,7 @@ def JobImportDir(job):
fsize = round(os.stat(file).st_size/(1024*1024)) fsize = round(os.stat(file).st_size/(1024*1024))
e=AddFile( job, os.path.basename(fname), type_str, fsize, dir ) e=AddFile( job, os.path.basename(fname), type_str, fsize, dir )
else: else:
if DEBUG==1:
AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
FinishJob(job, "Finished Importing: {} - Found {} new files".format( path, overall_file_cnt ) ) FinishJob(job, "Finished Importing: {} - Found {} new files".format( path, overall_file_cnt ) )
@@ -424,6 +444,7 @@ def JobImportDir(job):
else: else:
FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" ) FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" )
for j in session.query(Job).filter(Job.wait_for==job.id).all(): for j in session.query(Job).filter(Job.wait_for==job.id).all():
if DEBUG==1:
print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) ) print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) )
FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" ) FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" )
session.commit() session.commit()
@@ -435,6 +456,7 @@ def FilesInDir( path ):
def ProcessFilesInDir(job, e): def ProcessFilesInDir(job, e):
if DEBUG==1:
print("DEBUG: files in dir - process: {}".format(e.name)) print("DEBUG: files in dir - process: {}".format(e.name))
if e.type.name != 'Directory': if e.type.name != 'Directory':
e.file_details[0].hash = md5( job, os.path.join( e.in_dir[0].path_prefix, e.name ) ) e.file_details[0].hash = md5( job, os.path.join( e.in_dir[0].path_prefix, e.name ) )
@@ -450,6 +472,7 @@ def ProcessFilesInDir(job, e):
session.add(dir) session.add(dir)
dir.last_hash_date = time.time() dir.last_hash_date = time.time()
AddLogForJob(job, "skip {} as it has not changed since last hashing".format(dir.path_prefix)) AddLogForJob(job, "skip {} as it has not changed since last hashing".format(dir.path_prefix))
if DEBUG==1:
print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix)) print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
return return
dir.last_hash_date = time.time() dir.last_hash_date = time.time()
@@ -462,6 +485,7 @@ def JobGetFileDetails(job):
if jex.name =="path": if jex.name =="path":
path=jex.value path=jex.value
path=FixPath('static/{}'.format( os.path.basename(path[0:-1]))) path=FixPath('static/{}'.format( os.path.basename(path[0:-1])))
if DEBUG==1:
print("DEBUG: JobGetFileDetails for path={}".format( path ) ) print("DEBUG: JobGetFileDetails for path={}".format( path ) )
dir=session.query(Dir).filter(Dir.path_prefix==path).first() dir=session.query(Dir).filter(Dir.path_prefix==path).first()
stat=os.stat( path ) stat=os.stat( path )
@@ -469,6 +493,7 @@ def JobGetFileDetails(job):
session.add(dir) session.add(dir)
dir.last_hash_date = time.time() dir.last_hash_date = time.time()
AddLogForJob(job, "skip {} as it has not changed since last hashing".format(dir.path_prefix)) AddLogForJob(job, "skip {} as it has not changed since last hashing".format(dir.path_prefix))
if DEBUG==1:
print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix)) print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
return return
dir.last_hash_date = time.time() dir.last_hash_date = time.time()
@@ -477,7 +502,7 @@ def JobGetFileDetails(job):
session.commit() session.commit()
for e in FilesInDir( path ): for e in FilesInDir( path ):
ProcessFilesInDir(job, e ) ProcessFilesInDir(job, e )
FinishJob(job, "DEBUG: File Details processed") FinishJob(job, "File Details job finished")
session.commit() session.commit()
return return