From 2b9dedb9b939571d5b8097eb6a340fb2d147cbca Mon Sep 17 00:00:00 2001 From: Damien De Paoli Date: Sun, 17 Jan 2021 12:35:28 +1100 Subject: [PATCH] change how we calculate active jobs, probably will remove it from pa_job_engine, safer with threads I think. But, mostyle, added in client / server socket comms between web FE and job manager, with better job creation message (including link to job detail) and when you view job detail it auto-refreshes every few seconds until job complete) --- files.py | 4 ++-- job.py | 18 +++++++++++++++--- pa_job_manager.py | 20 ++++++++++++++++---- shared.py | 2 ++ tables.sql | 17 +++++++---------- templates/base.html | 5 +++-- templates/joblog.html | 7 +++++++ 7 files changed, 52 insertions(+), 21 deletions(-) diff --git a/files.py b/files.py index 9cafe7d..2275034 100644 --- a/files.py +++ b/files.py @@ -61,7 +61,7 @@ def files(): def scannow(): job=NewJob("scannow" ) st.SetAlert("success") - st.SetMessage("Created job to scan for new files") + st.SetMessage("scanning for new files in: Job #{} (Click the link to follow progress)".format( job.id, job.id) ) return render_template("base.html") ################################################################################ @@ -71,7 +71,7 @@ def scannow(): def forcescan(): job=NewJob("forcescan" ) st.SetAlert("success") - st.SetMessage("Created job to force scan & rebuild data for files") + st.SetMessage("force scan & rebuild data for files in: Job #{} (Click the link to follow progress)".format( job.id, job.id) ) return render_template("base.html") ################################################################################ diff --git a/job.py b/job.py index f505bb6..e19ce4f 100644 --- a/job.py +++ b/job.py @@ -7,6 +7,8 @@ from sqlalchemy.exc import SQLAlchemyError from status import st, Status from datetime import datetime, timedelta import pytz +import socket +from shared import PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT class Joblog(db.Model): id = db.Column(db.Integer, db.Sequence('ill_id_seq'), primary_key=True ) @@ -42,9 +44,8 @@ class Job(db.Model): # Utility classes for Jobs ################################################################################ def GetNumActiveJobs(): - ret = db.engine.execute("select num_active_jobs from pa_job_manager").first(); - if( ret != None ): - return ret.num_active_jobs + ret = db.engine.execute("select count(1) from job where pa_job_state != 'Completed'").first() + return ret.count ############################################################################### # NewJob takes a name (which will be matched in pa_job_manager.py to run @@ -57,6 +58,17 @@ def NewJob(name, num_passes="1", num_files="0", wait_for=None ): db.session.add(job) db.session.commit() + try: + print("Waking up PA Job Manager") + s=socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT)) + s.sendall(b'Hello, world') + s.close() + except Exception as e: + st.SetAlert("danger") + st.SetMessage("Failed to connect to job manager, has it crashed? Exception was:{}".format(e)) + return job + ################################################################################ # /jobs -> show current settings ################################################################################ diff --git a/pa_job_manager.py b/pa_job_manager.py index 923f4a5..809a256 100644 --- a/pa_job_manager.py +++ b/pa_job_manager.py @@ -16,7 +16,7 @@ from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, Dat from sqlalchemy.exc import SQLAlchemyError from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker -from shared import DB_URL +from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT from datetime import datetime, timedelta import pytz import time @@ -29,6 +29,8 @@ import exifread import base64 import numpy import cv2 +import socket +import threading # an Manager, which the Session will use for connection resources some_engine = create_engine(DB_URL) @@ -169,6 +171,7 @@ class FileData(): AddLogForJob(job, "Found new file: {}".format(fname) ) else: AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file ) + time.sleep(0.4) settings.last_import_date = time.time() session.commit() return self @@ -297,14 +300,17 @@ def RunJob(job): return def HandleJobs(): - print("PA job manager is scanning for jobs") + global pa_eng + + print("PA job manager is scanning for new jobs to process") pa_eng.state = 'Scanning Jobs' jobs=GetJobs() pa_eng.num_active_jobs=0 pa_eng.num_completed_jobs=0 for job in jobs: if job.pa_job_state != 'Completed': - RunJob(job) + threading.Thread(target=RunJob, args=(job,)).start() + print ("HandleJobs setting num_active jobs to +1") pa_eng.num_active_jobs = pa_eng.num_active_jobs + 1 else: pa_eng.num_completed_jobs = pa_eng.num_completed_jobs +1 @@ -346,4 +352,10 @@ if __name__ == "__main__": print( "Failed to initialise PA Job Manager: {}".format(e) ) session.rollback() HandleJobs() - print("Exiting for now: {}".format( pa_eng )) + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT)) + s.listen() + while True: + conn, addr = s.accept() + print("Connection from: {} so HandleJobs".format(addr)) + HandleJobs() diff --git a/shared.py b/shared.py index 11b35d2..ace1919 100644 --- a/shared.py +++ b/shared.py @@ -1 +1,3 @@ DB_URL = 'postgresql+psycopg2://pa:for_now_pa@mara.ddp.net:55432/pa' +PA_JOB_MANAGER_HOST="192.168.0.2" +PA_JOB_MANAGER_PORT=55430 diff --git a/tables.sql b/tables.sql index 90c4b18..6616657 100644 --- a/tables.sql +++ b/tables.sql @@ -56,13 +56,10 @@ insert into person_refimg_link values ( 2, 2 ); insert into person_refimg_link values ( 3, 3 ); insert into person_refimg_link values ( 4, 4 ); insert into settings values ( (select nextval('settings_id_seq')), '/home/ddp/src/photoassistant/images_to_process/#c:/Users/cam/Desktop/code/python/photoassistant/photos/#/home/ddp/src/photoassistant/new_img_dir/', 0 ); -insert into job values ( (select nextval('job_id_seq')), now(), now(), 'Full Import', 'Completed', 4, 4, 157, 157, 'last_fake_data.img', null, 'Completed' ); -insert into job values ( (select nextval('job_id_seq')), now(), now(), 'Full Import', 'Processing AI', 4, 3, 157, 45, 'fake_data.img', null, 'Running' ); -insert into job values ( (select nextval('job_id_seq')), now(), now(), 'Scan Files', 'New', 3, 0, 157, 0, '', null, 'New' ); -insert into job values ( (select nextval('job_id_seq')), now(), now(), 'Gen Hashes', 'New', 3, 0, 157, 0, '', 3, 'New' ); -insert into job values ( (select nextval('job_id_seq')), now(), now(), 'Process AI', 'New', 3, 0, 157, 0, '', 4, 'New' ); -insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Started Scanning Files' ); -insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Finished Scanning Files' ); -insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Started Generating Hashes and thumbnails' ); -insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Finished Generating Hashes and thumbnails' ); -insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Started Processing AI for "Cam"' ); +insert into job values ( (select nextval('job_id_seq')), now(), now(), 'Full Import', 'Completed', 3, 3, 157, 157, 'fake_data.img', null, 'Completed' ); +insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Pass 1: Started Scanning Files' ); +insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Pass 1: Finished Scanning Files' ); +insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Pass 2: Started Generating Hashes and thumbnails' ); +insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Pass 2: Finished Generating Hashes and thumbnails' ); +insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Pass 3: Started Processing AI' ); +insert into joblog values ( (select nextval('joblog_id_seq')), 1, now(), 'Pass 3: Finished Processing AI' ); diff --git a/templates/base.html b/templates/base.html index db98c89..83620bb 100644 --- a/templates/base.html +++ b/templates/base.html @@ -79,8 +79,9 @@ {% endblock main_content %} +{% block script_content %} + +{% endblock script_content %}