change how we calculate active jobs, probably will remove it from pa_job_engine, safer with threads I think. But, mostyle, added in client / server socket comms between web FE and job manager, with better job creation message (including link to job detail) and when you view job detail it auto-refreshes every few seconds until job complete)

This commit is contained in:
2021-01-17 12:35:28 +11:00
parent abff2d8bab
commit 2b9dedb9b9
7 changed files with 52 additions and 21 deletions

View File

@@ -61,7 +61,7 @@ def files():
def scannow():
job=NewJob("scannow" )
st.SetAlert("success")
st.SetMessage("Created job to scan for new files")
st.SetMessage("scanning for new files in:&nbsp;<a href=/job/{}>Job #{}</a>&nbsp;(Click the link to follow progress)".format( job.id, job.id) )
return render_template("base.html")
################################################################################
@@ -71,7 +71,7 @@ def scannow():
def forcescan():
job=NewJob("forcescan" )
st.SetAlert("success")
st.SetMessage("Created job to force scan & rebuild data for files")
st.SetMessage("force scan & rebuild data for files in:&nbsp;<a href=/job/{}>Job #{}</a>&nbsp;(Click the link to follow progress)".format( job.id, job.id) )
return render_template("base.html")
################################################################################

18
job.py
View File

@@ -7,6 +7,8 @@ from sqlalchemy.exc import SQLAlchemyError
from status import st, Status
from datetime import datetime, timedelta
import pytz
import socket
from shared import PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT
class Joblog(db.Model):
id = db.Column(db.Integer, db.Sequence('ill_id_seq'), primary_key=True )
@@ -42,9 +44,8 @@ class Job(db.Model):
# Utility classes for Jobs
################################################################################
def GetNumActiveJobs():
ret = db.engine.execute("select num_active_jobs from pa_job_manager").first();
if( ret != None ):
return ret.num_active_jobs
ret = db.engine.execute("select count(1) from job where pa_job_state != 'Completed'").first()
return ret.count
###############################################################################
# NewJob takes a name (which will be matched in pa_job_manager.py to run
@@ -57,6 +58,17 @@ def NewJob(name, num_passes="1", num_files="0", wait_for=None ):
db.session.add(job)
db.session.commit()
try:
print("Waking up PA Job Manager")
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
s.sendall(b'Hello, world')
s.close()
except Exception as e:
st.SetAlert("danger")
st.SetMessage("Failed to connect to job manager, has it crashed? Exception was:{}".format(e))
return job
################################################################################
# /jobs -> show current settings
################################################################################

View File

@@ -16,7 +16,7 @@ from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, Dat
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from shared import DB_URL
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT
from datetime import datetime, timedelta
import pytz
import time
@@ -29,6 +29,8 @@ import exifread
import base64
import numpy
import cv2
import socket
import threading
# an Manager, which the Session will use for connection resources
some_engine = create_engine(DB_URL)
@@ -169,6 +171,7 @@ class FileData():
AddLogForJob(job, "Found new file: {}".format(fname) )
else:
AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file )
time.sleep(0.4)
settings.last_import_date = time.time()
session.commit()
return self
@@ -297,14 +300,17 @@ def RunJob(job):
return
def HandleJobs():
print("PA job manager is scanning for jobs")
global pa_eng
print("PA job manager is scanning for new jobs to process")
pa_eng.state = 'Scanning Jobs'
jobs=GetJobs()
pa_eng.num_active_jobs=0
pa_eng.num_completed_jobs=0
for job in jobs:
if job.pa_job_state != 'Completed':
RunJob(job)
threading.Thread(target=RunJob, args=(job,)).start()
print ("HandleJobs setting num_active jobs to +1")
pa_eng.num_active_jobs = pa_eng.num_active_jobs + 1
else:
pa_eng.num_completed_jobs = pa_eng.num_completed_jobs +1
@@ -346,4 +352,10 @@ if __name__ == "__main__":
print( "Failed to initialise PA Job Manager: {}".format(e) )
session.rollback()
HandleJobs()
print("Exiting for now: {}".format( pa_eng ))
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
s.listen()
while True:
conn, addr = s.accept()
print("Connection from: {} so HandleJobs".format(addr))
HandleJobs()

View File

@@ -1 +1,3 @@
DB_URL = 'postgresql+psycopg2://pa:for_now_pa@mara.ddp.net:55432/pa'
PA_JOB_MANAGER_HOST="192.168.0.2"
PA_JOB_MANAGER_PORT=55430

View File

@@ -56,13 +56,10 @@ insert into person_refimg_link values ( 2, 2 );
insert into person_refimg_link values ( 3, 3 );
insert into person_refimg_link values ( 4, 4 );
insert into settings values ( (select nextval('settings_id_seq')), '/home/ddp/src/photoassistant/images_to_process/#c:/Users/cam/Desktop/code/python/photoassistant/photos/#/home/ddp/src/photoassistant/new_img_dir/', 0 );
insert into job values ( (select nextval('job_id_seq')), now(), now(), 'Full Import', 'Completed', 4, 4, 157, 157, 'last_fake_data.img', null, 'Completed' );
insert into job values ( (select nextval('job_id_seq')), now(), now(), 'Full Import', 'Processing AI', 4, 3, 157, 45, 'fake_data.img', null, 'Running' );
insert into job values ( (select nextval('job_id_seq')), now(), now(), 'Scan Files', 'New', 3, 0, 157, 0, '', null, 'New' );
insert into job values ( (select nextval('job_id_seq')), now(), now(), 'Gen Hashes', 'New', 3, 0, 157, 0, '', 3, 'New' );
insert into job values ( (select nextval('job_id_seq')), now(), now(), 'Process AI', 'New', 3, 0, 157, 0, '', 4, 'New' );
insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Started Scanning Files' );
insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Finished Scanning Files' );
insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Started Generating Hashes and thumbnails' );
insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Finished Generating Hashes and thumbnails' );
insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Started Processing AI for "Cam"' );
insert into job values ( (select nextval('job_id_seq')), now(), now(), 'Full Import', 'Completed', 3, 3, 157, 157, 'fake_data.img', null, 'Completed' );
insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Pass 1: Started Scanning Files' );
insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Pass 1: Finished Scanning Files' );
insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Pass 2: Started Generating Hashes and thumbnails' );
insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Pass 2: Finished Generating Hashes and thumbnails' );
insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Pass 3: Started Processing AI' );
insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Pass 3: Finished Processing AI' );

View File

@@ -79,8 +79,9 @@
</div class="nav-item dropdown">
<div class="nav-item ml-5">
<a href="{{url_for('jobs')}}"}}<span class="navbar-text">Active Jobs:
{% if GetNumActiveJobs() != None %}
<span class="badge badge-danger text-white"}}>4</span>
{% set num_active_jobs = GetNumActiveJobs() %}
{% if num_active_jobs > 0 %}
<span class="badge badge-danger text-white"}}>{{num_active_jobs}}</span>
{% else %}
<span class="badge">0</span>
{% endif %}

View File

@@ -67,3 +67,10 @@
</div>
</div class="containter">
{% endblock main_content %}
{% block script_content %}
<script>
{% if job.pa_job_state != "Completed" %}
setTimeout(function(){ window.location.reload(1); }, 3000 )
{% endif %}
</script>
{% endblock script_content %}