optimise: don't run dependant jobs past a scan job that finds no new files; also print the number of new files found into the scan log; fix the first attempt at turning DEBUG on/off based on whether FLASK_ENV is "production"
@@ -13,11 +13,6 @@
 # pylint: disable=no-member
 
 
-# global debug setting
-if os.environ['FLASK_ENV'] != "production":
-    DEBUG=True
-
-
 ### SQLALCHEMY IMPORTS ###
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime, LargeBinary, Boolean, func
@@ -52,6 +47,10 @@ import sys
 import json
 
 
+# global debug setting
+if 'FLASK_ENV' not in os.environ or os.environ['FLASK_ENV'] != "production":
+    DEBUG=True
+
 # this is required to handle the duplicate processing code
 sys.setrecursionlimit(50000)
 
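The rewritten guard fixes the first attempt, which raised a KeyError whenever FLASK_ENV was unset: DEBUG now defaults on unless the app is explicitly running in production. A minimal sketch of the same pattern using os.environ.get, as a hedged equivalent rather than the file's exact code:

    import os

    # DEBUG defaults to on; it is only switched off when we are explicitly
    # in production. os.environ.get() avoids the KeyError that a direct
    # os.environ['FLASK_ENV'] lookup raises when the variable is unset.
    DEBUG = os.environ.get('FLASK_ENV') != "production"

    if DEBUG:
        print("DEBUG: debug output enabled (FLASK_ENV is not 'production')")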
@@ -685,16 +684,6 @@ def RunJob(job):
     HandleJobs(False)
     return
 
-##############################################################################
-# CancelJob(): cancel this job, and if any other job is waiting on this job,
-#              then cancel it too (this is done recursively)
-##############################################################################
-def CancelJob(job,id):
-    for j in session.query(Job).filter(Job.wait_for==id).all():
-        FinishJob(j, f"Job (#{j.id}) has been withdrawn as the job being waited for #{job.id} failed", "Withdrawn" )
-        CancelJob(j, j.id)
-    return
-
 ##############################################################################
 # FinishJob(): finish this job off (if no overrides), its just marked completed
 ##############################################################################
@@ -704,7 +693,7 @@ def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
     job.last_update=datetime.now(pytz.utc)
     AddLogForJob(job, last_log)
     if job.state=="Failed":
-        CancelJob(job,job.id)
+        WithdrawDependantJobs( job, job.id, "failed" )
     session.commit()
     if DEBUG:
         print( f"DEBUG: {last_log}" )
@@ -1254,6 +1243,17 @@ def AddJexToDependantJobs(job,name,value):
         AddJexToDependantJobs(j, name, value)
     return
 
+####################################################################################################################################
+# WithdrawDependantJobs(): cancel/withdraw this dependant job, and if any other job is waiting on it then
+#                          cancel it too (this is done recursively)
+####################################################################################################################################
+def WithdrawDependantJobs( job, id, reason ):
+    for j in session.query(Job).filter(Job.wait_for==id).all():
+        FinishJob(j, f"Job (#{j.id}) has been withdrawn -- #{job.id} {reason}", "Withdrawn" )
+        WithdrawDependantJobs(j, j.id, reason)
+    return
+
+
 ####################################################################################################################################
 # JobImportDir(): job that scan import dir and processes entries in there - key function that uses os.walk() to traverse the
 #                 file system and calls AddFile()/AddDir() as necessary
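WithdrawDependantJobs() walks the wait_for chain recursively: every job waiting on the withdrawn job is itself withdrawn, and so on down the chain. A self-contained sketch of that walk, with a plain dict standing in for the SQLAlchemy Job table and session (the data and helper name are illustrative):

    # jobs keyed by id; wait_for points at the job each one depends on
    jobs = {
        1: {"wait_for": None, "state": "Running"},
        2: {"wait_for": 1,    "state": "Waiting"},
        3: {"wait_for": 2,    "state": "Waiting"},
    }

    def withdraw_dependants(job_id, reason):
        # withdraw every job waiting on job_id, then recurse so that jobs
        # further down the wait_for chain are withdrawn as well
        for jid, j in jobs.items():
            if j["wait_for"] == job_id:
                j["state"] = "Withdrawn"
                print(f"Job (#{jid}) has been withdrawn -- #{job_id} {reason}")
                withdraw_dependants(jid, reason)

    withdraw_dependants(1, "failed")   # withdraws #2, then #3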
@@ -1268,6 +1268,7 @@ def JobImportDir(job):
     if DEBUG:
         print( f"DEBUG: Checking Directory: {path}" )
     if not os.path.exists( path ):
+        WithdrawDependantJobs( job, job.id, "scan job found no new files to process" )
         FinishJob( job, f"Finished Importing: {path} -- Path does not exist", "Failed" )
         return
     symlink=SymlinkName(ptype.name, path, path)
@@ -1302,6 +1303,9 @@ def JobImportDir(job):
     AddLogForJob(job, f"Found {overall_file_cnt} file(s) to process")
     session.commit()
 
+    # we will store this into the job extras as the final result of whether this job found any new files,
+    # if it did not, then the dependant jobs don't need to really run - used to optimise them
+    found_new_files=0
     # root == path of dir, files are in dir... subdirs are in dir
     for root, subdirs, files in ftree:
         # already create root above to work out num_files for whole os.walk
@@ -1331,6 +1335,7 @@
 
                 year, month, day, woy = GetDateFromFile(fname, stat)
                 e=AddFile( job, basename, type_str, fsize, dir, year, month, day, woy )
+                found_new_files += 1
             else:
                 if DEBUG:
                     print( f"DEBUG: { basename} - {stat.st_ctime} is OLDER than {dir.last_import_date}" )
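A file counts as new when its st_ctime is later than the owning directory's last_import_date; only then does AddFile() run and found_new_files get incremented. A hedged sketch of that test in isolation (the function name, UTC handling, and arguments are assumptions, not code from this repo):

    import os
    from datetime import datetime, timezone

    def count_new_files(path, last_import_date):
        # walk the tree and count files whose change time is newer than
        # the previous import, mirroring the found_new_files counter
        found_new_files = 0
        for root, subdirs, files in os.walk(path):
            for fname in files:
                stat = os.stat(os.path.join(root, fname))
                ctime = datetime.fromtimestamp(stat.st_ctime, tz=timezone.utc)
                if ctime > last_import_date:
                    found_new_files += 1
        return found_new_files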
@@ -1344,7 +1349,9 @@
 
     rm_cnt=HandleAnyFSDeletions(job)
 
-    FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Removed {rm_cnt} file(s)")
+    if found_new_files == 0:
+        WithdrawDependantJobs( job, job.id, "scan job found no new files to process" )
+    FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Found {found_new_files} new files, Removed {rm_cnt} file(s)")
     return
 
 ####################################################################################################################################
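This final hunk is the optimisation named in the commit message: a zero found_new_files count withdraws the whole dependant chain instead of letting it run, and the count is now printed into the scan log either way. A tiny self-contained sketch of that decision, with stand-in helpers (every name and value here is illustrative):

    # stand-ins for the real WithdrawDependantJobs()/FinishJob()
    def withdraw_dependant_jobs(job_id, reason):
        print(f"withdrawing jobs waiting on #{job_id}: {reason}")

    def finish_job(job_id, last_log):
        print(f"job #{job_id}: {last_log}")

    path, overall_file_cnt, found_new_files, rm_cnt = "/srv/import", 12, 0, 1
    if found_new_files == 0:
        # nothing new, so the dependant jobs have nothing to do
        withdraw_dependant_jobs(1, "scan job found no new files to process")
    finish_job(1, f"Finished Importing: {path} - Processed {overall_file_cnt} files, "
                  f"Found {found_new_files} new files, Removed {rm_cnt} file(s)")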