Hacky/quick fix to handle DB container restarts. Still NEED to remove the global session and replace it with an explicit 'sess' parameter, and then use Threads to finally allow parallelism - this fixes BUG-140
This commit is contained in:
@@ -20,6 +20,7 @@ from sqlalchemy.orm import relationship
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.orm import scoped_session
|
||||
from contextlib import contextmanager
|
||||
|
||||
### LOCAL FILE IMPORTS ###
|
||||
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, THUMBSIZE, SymlinkName, GenThumb, SECS_IN_A_DAY, PA_EXIF_ROTATER, PA
|
||||
@@ -66,21 +67,41 @@ override_tbls={ "face_no_match_override", "face_force_match_override", "disconne
|
||||
# this is required to handle the duplicate processing code
|
||||
sys.setrecursionlimit(50000)
|
||||
|
||||
# a Manager, which the Session will use for connection resources
|
||||
some_engine = create_engine(DB_URL)
|
||||
|
||||
# create a configured "Session" class
|
||||
#Session = sessionmaker(bind=some_engine)
|
||||
# 1. Add pool_pre_ping and pool_recycle here to handle db container disappearing underneath us
|
||||
some_engine = create_engine(
|
||||
DB_URL,
|
||||
pool_pre_ping=True, # check DB connection is still active before use
|
||||
pool_recycle=300, # churn connections regardless every 5 mins
|
||||
pool_size=20, # Parallel-ready base pool
|
||||
max_overflow=10 # Burst capacity for high socket traffic
|
||||
)
|
||||
|
||||
# create a Session
|
||||
session_factory = sessionmaker(bind=some_engine)
|
||||
Session = scoped_session(session_factory)
|
||||
session = Session()
|
||||
|
||||
# HACK: need to remove this and pass 'sess' as an actual param everywhere, but there are 200+ uses, so this is a quick fix until the global is retired
|
||||
session = Session
|
||||
|
||||
# this is a way to handle a session failing
|
||||
@contextmanager
def PA_db_session():
    """Yield a DB session; commit on success, roll back on any Exception.

    The session comes from the module-level ``Session`` registry.  When the
    ``with`` body finishes normally the session is committed.  If the body
    (or the commit itself) raises an ``Exception``, the session is rolled
    back and the exception is re-raised to the caller.  Either way,
    ``Session.remove()`` is called on exit.
    """
    # NOTE(review): Session is a scoped_session registry, so this call
    # presumably returns the registry's current session (creating one only
    # if none exists yet) rather than always a brand-new one -- confirm
    db_sess = Session()
    try:
        yield db_sess
        # commit stays inside the try so a failed commit is rolled back too
        db_sess.commit()
    except Exception:
        db_sess.rollback()
        raise
    finally:
        # drop the registry's session so its connection goes back to the pool
        Session.remove()
|
||||
|
||||
# this creates the Base (like db model in flask)
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
################################################################################
|
||||
# Class describing PathType & in the database (via sqlalchemy)
|
||||
# series of pre-defined types of paths (import, storage, bin)
|
||||
@@ -2751,7 +2772,13 @@ if __name__ == "__main__":
|
||||
|
||||
InitialValidationChecks()
|
||||
|
||||
HandleJobs(True)
|
||||
# Initial job run on startup (hence True in 1st param)
|
||||
try:
|
||||
with PA_db_session() as sess:
|
||||
HandleJobs(True,sess)
|
||||
except Exception as e:
|
||||
PAprint(f"ERROR: Initial job handle failed: {e}")
|
||||
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
|
||||
# force timeout every 1 day so we can run scheduled jobs
|
||||
@@ -2759,18 +2786,35 @@ if __name__ == "__main__":
|
||||
s.listen()
|
||||
while True:
|
||||
try:
|
||||
# 1. Wait for connection
|
||||
conn, addr = s.accept()
|
||||
if DEBUG:
|
||||
PAprint( f"accept finished, tout={s.timeout}" )
|
||||
PAprint(f"Connection accepted from {addr}")
|
||||
|
||||
# 2. Process Jobs after a successful socket connection
|
||||
with PA_db_session() as sess:
|
||||
HandleJobs(False, sess)
|
||||
# Check for scheduled tasks as well
|
||||
if ScheduledJobs(sess):
|
||||
HandleJobs(False, sess)
|
||||
|
||||
except socket.timeout:
|
||||
if DEBUG:
|
||||
PAprint( f"timeout occurred, tout={s.timeout}" )
|
||||
if ScheduledJobs():
|
||||
HandleJobs(False)
|
||||
PAprint("Socket timeout (Daily maintenance window) reached.")
|
||||
|
||||
# 3. Process Scheduled Jobs during the timeout
|
||||
try:
|
||||
with PA_db_session() as sess:
|
||||
if ScheduledJobs(sess):
|
||||
HandleJobs(False, sess)
|
||||
except sqlalchemy.exc.OperationalError:
|
||||
PAprint("DB Connection lost during scheduled task window. Retrying next cycle.")
|
||||
continue
|
||||
else:
|
||||
HandleJobs(False)
|
||||
# in case we constantly have jobs running, the '1 day' last import might be missed, so check it after each job too
|
||||
if ScheduledJobs():
|
||||
HandleJobs(False)
|
||||
|
||||
except (sqlalchemy.exc.OperationalError, sqlalchemy.exc.InterfaceError) as e:
|
||||
# This catches the DB container restart specifically
|
||||
PAprint(f"DATABASE ERROR: Connection lost. Retrying... {e}")
|
||||
time.sleep(5) # Brief pause before next socket listen
|
||||
|
||||
except Exception as e:
|
||||
PAprint(f"UNEXPECTED ERROR: {e}")
|
||||
|
||||
Reference in New Issue
Block a user