diff --git a/BUGs b/BUGs index d4a4a4a..3e260fe 100644 --- a/BUGs +++ b/BUGs @@ -1,6 +1,5 @@ ### Next: 147 BUG-146: with an empty DB, I See 'No files in Path!' twice (for file*, except for files_rbp) -BUG-140: When db is restarted underneath PA, it crashes job mgr... It should just accept timeouts, and keep trying to reconnect every 2? mins BUG-118: can move files from Bin path, but it leaves the del_file entry for it - need to remove it BUG-117: when search returns files that can be deleted and/or restored, the icon stays as delete and tries to delete! BUG-106: cant add trudy /pat? as refimgs via FaceDBox diff --git a/pa_job_manager.py b/pa_job_manager.py index bbe67b4..a671af7 100644 --- a/pa_job_manager.py +++ b/pa_job_manager.py @@ -20,6 +20,7 @@ from sqlalchemy.orm import relationship from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import scoped_session +from contextlib import contextmanager ### LOCAL FILE IMPORTS ### from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, THUMBSIZE, SymlinkName, GenThumb, SECS_IN_A_DAY, PA_EXIF_ROTATER, PA @@ -66,21 +67,41 @@ override_tbls={ "face_no_match_override", "face_force_match_override", "disconne # this is required to handle the duplicate processing code sys.setrecursionlimit(50000) -# a Manager, which the Session will use for connection resources -some_engine = create_engine(DB_URL) -# create a configured "Session" class -#Session = sessionmaker(bind=some_engine) +# 1. Add pool_pre_ping and pool_recycle here to handle db container disappearing underneath us +some_engine = create_engine( + DB_URL, + pool_pre_ping=True, # check DB connection is still active before use + pool_recycle=300, # churn connections regardless every 5 mins + pool_size=20, # Parallel-ready base pool + max_overflow=10 # Burst capacity for high socket traffic +) -# create a Session session_factory = sessionmaker(bind=some_engine) Session = scoped_session(session_factory) -session = Session() + +# HACK: need to remove this and use 'sess' as an actual param everywhere, butt here are 200+ so quick fix until retired +session = Session + +# this is a way to handle a session failing +@contextmanager +def PA_db_session(): + """Provide a transactional scope around a series of operations.""" + # This creates a NEW session from the registry + s = Session() + try: + yield s + s.commit() + except Exception: + s.rollback() + raise + finally: + # This destroys the session and returns connection to pool + Session.remove() # this creates the Base (like db model in flask) Base = declarative_base() - ################################################################################ # Class describing PathType & in the database (via sqlalchemy) # series of pre-defined types of paths (import, storage, bin) @@ -2751,7 +2772,13 @@ if __name__ == "__main__": InitialValidationChecks() - HandleJobs(True) + # Initial job run on startup (hence True in 1st param) + try: + with PA_db_session() as sess: + HandleJobs(True,sess) + except Exception as e: + PAprint(f"ERROR: Initial job handle failed: {e}") + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT)) # force timeout every 1 day so we can run scheduled jobs @@ -2759,18 +2786,35 @@ if __name__ == "__main__": s.listen() while True: try: + # 1. Wait for connection conn, addr = s.accept() if DEBUG: - PAprint( f"accept finished, tout={s.timeout}" ) + PAprint(f"Connection accepted from {addr}") + + # 2. Process Jobs after a successful socket connection + with PA_db_session() as sess: + HandleJobs(False, sess) + # Check for scheduled tasks as well + if ScheduledJobs(sess): + HandleJobs(False, sess) except socket.timeout: if DEBUG: - PAprint( f"timeout occurred, tout={s.timeout}" ) - if ScheduledJobs(): - HandleJobs(False) + PAprint("Socket timeout (Daily maintenance window) reached.") + + # 3. Process Scheduled Jobs during the timeout + try: + with PA_db_session() as sess: + if ScheduledJobs(sess): + HandleJobs(False, sess) + except sqlalchemy.exc.OperationalError: + PAprint("DB Connection lost during scheduled task window. Retrying next cycle.") continue - else: - HandleJobs(False) - # in case we constantly have jobs running, the '1 day' last import might be missed, so check it after each job too - if ScheduledJobs(): - HandleJobs(False) + + except (sqlalchemy.exc.OperationalError, sqlalchemy.exc.InterfaceError) as e: + # This catches the DB container restart specifically + PAprint(f"DATABASE ERROR: Connection lost. Retrying... {e}") + time.sleep(5) # Brief pause before next socket listen + + except Exception as e: + PAprint(f"UNEXPECTED ERROR: {e}")