Add a NewJob() helper, now used throughout pa_job_mgr, which does not set start_time until the job actually starts; this fixes BUG-97 (restarting a job reset its start time).

This commit is contained in:
2022-07-23 00:33:04 +10:00
parent 750b903a9d
commit 24bcfa0aab
4 changed files with 55 additions and 59 deletions

View File

@@ -455,6 +455,25 @@ class PA_UserState(Base):
return f"<pa_user_dn: {self.pa_user_dn}, path_type: {self.path_type}, noo: {self.noo}, grouping: {self.grouping}, how_many: {self.how_many}, st_offset: {self.st_offset}, size: {self.size}, folders: {self.folders}, root: {self.root}, cwd: {self.cwd}, view_eid: {self.view_eid}, orig_ptype: {self.orig_ptype}, orig_search_term: {self.orig_search_term}, orig_url: {self.orig_url}, current={self.current}, first_eid={self.first_eid}, last_eid={self.last_eid}, num_entries={self.num_entries}>"
##############################################################################
# NewJob(): convenience function to create a job, appropriately
##############################################################################
def NewJob(name, num_files=0, wait_for=None, jex=None, parent_job=None ):
    """Create, persist, and return a new Job in the "New" state.

    start_time and last_update are deliberately left as None so that
    restarting a job does not reset its original start time (BUG-97);
    RunJob() fills in start_time on first execution.

    name       -- job name (e.g. "checkdups", "importdir")
    num_files  -- total file count for progress reporting (0 if unknown)
    wait_for   -- id of a job this one must complete-after, or None
    jex        -- optional iterable of JobExtra rows to attach to the job
    parent_job -- if given, a log line linking to the new job is added to it
    """
    job=Job( name=name, current_file_num=0, current_file='', num_files=num_files,
        wait_for=wait_for, state="New", pa_job_state="New", start_time=None, last_update=None)
    if jex is not None:  # "is not None", not "!= None" (PEP 8)
        for e in jex:
            job.extra.append(e)
    session.add(job)
    # commit now so job.id is assigned before it is used in the log link below
    session.commit()
    if parent_job:
        # renamed from "str" -- the original shadowed the builtin
        log_msg=f"adding <a href='/job/{job.id}'>job id={job.id} {job.name}</a>"
        if job.wait_for:
            log_msg+=f" (wait for: {job.wait_for})"
        AddLogForJob(parent_job, log_msg )
    return job
##############################################################################
# MessageToFE(): sends a specific alert/message for a given job via the DB to
# the front end
@@ -563,36 +582,25 @@ def JobsForPaths( parent_job, paths, ptype ):
if p:
cfn=p.num_files
job1=Job(start_time=now, last_update=now, name="importdir", state="New", wait_for=None, pa_job_state="New", current_file_num=0, num_files=cfn )
job1.extra.append( JobExtra( name="path", value=path ) )
job1.extra.append( JobExtra( name="path_type", value=ptype.id ) )
session.add(job1)
session.commit()
if parent_job:
AddLogForJob(parent_job, f"adding <a href='/job/{job1.id}'>job id={job1.id} {job1.name}</a>")
# force commit to make job.id be valid in use of wait_for later
session.commit()
job2=Job(start_time=now, last_update=now, name="getfiledetails", state="New", wait_for=job1.id, pa_job_state="New", current_file_num=0 )
job2.extra.append( JobExtra( name="path", value=path ) )
session.add(job2)
session.commit()
if parent_job:
AddLogForJob(parent_job, f"adding <a href='/job/{job2.id}'>job id={job2.id} {job2.name}</a> (wait for: {job2.wait_for})")
jex=[]
jex.append( JobExtra( name="path", value=path ) )
jex.append( JobExtra( name="path_type", value=ptype.id ) )
job1=NewJob( "importdir", cfn, None, jex, parent_job )
job3=Job(start_time=now, last_update=now, name="run_ai_on_path", state="New", wait_for=job1.id, pa_job_state="New", current_file_num=0 )
job3.extra.append( JobExtra( name="person", value="all" ) )
job3.extra.append( JobExtra( name="path_type", value=ptype.id ) )
session.add(job3)
session.commit()
if parent_job:
AddLogForJob(parent_job, f"adding <a href='/job/{job3.id}'>job id={job3.id} {job3.name}</a> (wait for: {job3.wait_for})")
jex=[]
jex.append( JobExtra( name="path", value=path ) )
job2=NewJob("getfiledetails", 0, job1.id, jex, parent_job )
# can start straight after importdir - job1, does not need details
jex=[]
jex.append( JobExtra( name="person", value="all" ) )
jex.append( JobExtra( name="path_type", value=ptype.id ) )
job3=NewJob("run_ai_on_path", 0, job1.id, jex, parent_job )
# careful here, wait for getfiledetails (job2), the ai job cannot cause a dup
# but it can fail - in which case the checkdup will be withdrawn
job4=NewJob( "checkdups", 0, job2.id, None, parent_job )
# careful here, wait for getfiledetails, the ai job cannot cause a dup, but it can fail - in which case the checkdup will be withdrawn
job4=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 )
session.add(job4)
session.commit()
if parent_job:
AddLogForJob(parent_job, f"adding <a href='/job/{job4.id}'>job id={job4.id} {job4.name}</a> (wait for: {job4.wait_for})" )
HandleJobs(False)
return
@@ -682,7 +690,9 @@ def AddLogForJob(job, message):
##############################################################################
def RunJob(job):
# session = Session()
job.start_time=datetime.now(pytz.utc)
# only update start_time if we have never set it - stops restarts resetting start_time
if not job.start_time:
job.start_time=datetime.now(pytz.utc)
if job.name =="scannow":
JobScanNow(job)
elif job.name =="forcescan":
@@ -726,6 +736,8 @@ def RunJob(job):
def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
job.state=state
job.pa_job_state=pa_job_state
if not job.start_time:
job.start_time=datetime.now(pytz.utc)
job.last_update=datetime.now(pytz.utc)
AddLogForJob(job, last_log)
if job.state=="Failed":
@@ -1816,10 +1828,7 @@ def JobRemoveDups(job):
FinishJob(job, f"Finished removing {dup_cnt} duplicate files" )
# Need to put another checkdups job in now to force / validate we have no dups
now=datetime.now(pytz.utc)
next_job=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
session.add(next_job)
session.commit()
next_job=NewJob( "checkdups" )
AddLogForJob(job, "adding <a href='/job/{}'>job id={} {}</a> to confirm there are no more duplicates".format( next_job.id, next_job.id, next_job.name ) )
return
@@ -1849,9 +1858,7 @@ def JobMoveFiles(job):
if 'eid-' in jex.name:
move_me=session.query(Entry).get(jex.value)
MoveEntriesToOtherFolder( job, move_me, dst_storage_path, f"{prefix}{suffix}" )
now=datetime.now(pytz.utc)
next_job=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
session.add(next_job)
next_job=NewJob( "checkdups" )
MessageToFE( job.id, "success", "Completed (move of selected files)" )
FinishJob(job, f"Finished move selected file(s)")
return
@@ -1866,9 +1873,7 @@ def JobDeleteFiles(job):
if 'eid-' in jex.name:
del_me=session.query(Entry).join(File).filter(Entry.id==jex.value).first()
MoveFileToRecycleBin(job,del_me)
now=datetime.now(pytz.utc)
next_job=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
session.add(next_job)
next_job=NewJob( "checkdups" )
MessageToFE( job.id, "success", "Completed (delete of selected files)" )
FinishJob(job, f"Finished deleting selected file(s)")
return
@@ -1883,9 +1888,7 @@ def JobRestoreFiles(job):
if 'eid-' in jex.name:
restore_me=session.query(Entry).join(File).filter(Entry.id==jex.value).first()
RestoreFile(job,restore_me)
now=datetime.now(pytz.utc)
next_job=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
session.add(next_job)
next_job=NewJob( "checkdups" )
MessageToFE( job.id, "success", "Completed (restore of selected files)" )
FinishJob(job, f"Finished restoring selected file(s)")
return
@@ -1897,8 +1900,7 @@ def JobRestoreFiles(job):
####################################################################################################################################
def InitialValidationChecks():
now=datetime.now(pytz.utc)
job=Job(start_time=now, last_update=now, name="init", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
session.add(job)
job=NewJob( "init" )
settings = session.query(Settings).first()
AddLogForJob(job, f"INFO: Starting Initial Validation checks...")
JobProgressState( job, "In Progress" )
@@ -1913,9 +1915,6 @@ def InitialValidationChecks():
AddLogForJob(job, "WARNING: IF the files in the bin are in the DB (succeeded from GUI deletes) then this is okay, otherwise you should delete contents form the recycle bin and restart the job manager)" )
# create symlink and Path/Dir if needed
ProcessRecycleBinDir(job)
# bin_path=session.query(Path).join(PathType).filter(PathType.name=='Bin').first()
# if not bin_path:
# ProcessRecycleBinDir(job)
else:
AddLogForJob(job, "ERROR: The bin path in settings does not exist - Please fix now");
sp_exists=0
@@ -2143,8 +2142,7 @@ def CheckAndRunBinClean():
now=datetime.now(pytz.utc)
if not j or (now-j.last_update).days >= settings.scheduled_bin_cleanup:
print( f"INFO: Should force clean up bin path, del files older than {settings.bin_cleanup_file_age} days old" )
job=Job(start_time=now, last_update=now, name="clean_bin", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
session.add(job)
job=NewJob( "clean_bin" )
created_jobs=True
return created_jobs
@@ -2164,13 +2162,11 @@ def ScheduledJobs():
now=datetime.now(pytz.utc)
if ndays_since_last_im_scan >= settings.scheduled_import_scan:
print( f"INFO: Time to force an import scan, last scan was {ndays_since_last_im_scan} days ago" )
job=Job(start_time=now, last_update=now, name="scannow", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
session.add(job)
job=NewJob( "scannow" )
created_jobs=True
if ndays_since_last_st_scan >= settings.scheduled_storage_scan:
print( f"INFO: Time to force a storage scan, last scan was {ndays_since_last_st_scan}" )
job=Job(start_time=now, last_update=now, name="scan_sp", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
session.add(job)
job=NewJob( "scan_sp" )
created_jobs=True
if CheckAndRunBinClean():
created_jobs=True