Alternate database structure with entry, (new_)file and dir now works in terms of the DB and SQLAlchemy; tomorrow we rewrite the jobs to really use the new structure

This commit is contained in:
2021-01-18 21:40:46 +11:00
parent 4053919def
commit 5285bf66ab
3 changed files with 249 additions and 16 deletions

View File

@@ -14,6 +14,7 @@
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT
@@ -32,6 +33,8 @@ import cv2
import socket
import threading
DEBUG=1
# an Engine, which the Session will use for connection resources
some_engine = create_engine(DB_URL)
@@ -116,10 +119,21 @@ class FileData():
fthumbnail = str(fthumbnail)[2:-1]
return fthumbnail
##############################################################################
# HACK: At present this only handles one path (need to re-factor if we have #
# multiple valid paths in import_path) #
def ProcessImportDirs(self):
    """Queue one "importdir" job per configured import path.

    Reads the first Settings row, splits its import_path on "#" and, for
    every non-empty segment, adds a new Job (state "New") carrying the path
    in a JobExtra named "path". The jobs are only added to the session
    here; committing and running them is left to the caller.

    Raises:
        Exception: if there is no Settings row.
    """
    settings = session.query(Settings).first()
    if settings is None:
        raise Exception("Cannot create file data with no settings / import path is missing")
    for path in settings.import_path.split("#"):
        # a leading/trailing/doubled '#' yields empty segments; do not
        # queue a pointless job for them
        if not path:
            continue
        # make new Job; HandleJobs will make them run later
        job = Job(start_time='now()', last_update='now()', name="importdir", state="New", wait_for=None)
        job.extra.append(JobExtra(name="path", value=path))
        session.add(job)
##############################################################################
def GenerateFileData(self, job):
settings = session.query(Settings).first()
@@ -182,6 +196,49 @@ class FileData():
# Class describing File in the database, and via sqlalchemy, connected to the DB as well
# This has to match one-for-one the DB table
################################################################################
class EntryDirLink(Base):
    """Association table between entries and the directories holding them.

    Used as the `secondary` table of the Entry.in_dir relationship.
    """
    __tablename__ = "entry_dir_link"

    # composite primary key: (entry_id, dir_eid)
    entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True)
    dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True)

    def __repr__(self):
        template = "<entry_id: {}, dir_eid: {}>"
        return template.format(self.entry_id, self.dir_eid)
class Dir(Base):
    """Directory-specific details for an entry (row in table "dir")."""
    __tablename__ = "dir"

    # eid is both the primary key and a foreign key onto entry.id
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True)
    path_prefix = Column(String, unique=False, nullable=False)

    def __repr__(self):
        template = "<eid: {}, path_prefix: {}>"
        return template.format(self.eid, self.path_prefix)
class Entry(Base):
    """A named entry (row in table "entry") with optional dir/file details.

    dir_details and file_details are relationships to the Dir and New_File
    rows keyed on this entry's id; in_dir lists the Dir rows linked to this
    entry through the entry_dir_link association table.
    """
    __tablename__ = "entry"

    id = Column(Integer, Sequence('file_id_seq'), primary_key=True)
    name = Column(String, unique=True, nullable=False)
    type = Column(String, unique=False, nullable=False)

    dir_details = relationship("Dir")
    file_details = relationship("New_File")
    in_dir = relationship("Dir", secondary="entry_dir_link")

    def __repr__(self):
        template = "<id: {}, name: {}, type={}, dir_details={}, file_details={}, in_dir={}>"
        fields = (
            self.id,
            self.name,
            self.type,
            self.dir_details,
            self.file_details,
            self.in_dir,
        )
        return template.format(*fields)
class New_File(Base):
    """File-specific details for an entry (row in table "new_file")."""
    __tablename__ = "new_file"

    # eid is both the primary key and a foreign key onto entry.id
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True)
    size_mb = Column(Integer, unique=False, nullable=False)
    # NOTE(review): md5() in this file returns a hex string; confirm that an
    # Integer column is really what is intended for the hash
    hash = Column(Integer, unique=True, nullable=True)
    thumbnail = Column(String, unique=False, nullable=True)

    def __repr__(self):
        # thumbnail is deliberately left out of the representation
        template = "<eid: {}, size_mb={}, hash={}>"
        return template.format(self.eid, self.size_mb, self.hash)
class FileType(Base):
    """Lookup row naming a kind of entry (table "file_type")."""
    __tablename__ = "file_type"

    id = Column(
        Integer,
        Sequence('file_type_id_seq'),
        primary_key=True,
    )
    name = Column(
        String,
        unique=True,
        nullable=False,
    )
class File(Base):
__tablename__ = "file"
id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
@@ -196,6 +253,7 @@ class File(Base):
def __repr__(self):
    """Short debug representation showing only id and name."""
    template = "<id: {}, name: {}>"
    return template.format(self.id, self.name)
class Settings(Base):
__tablename__ = "settings"
id = Column(Integer, Sequence('settings_id_seq'), primary_key=True )
@@ -233,7 +291,17 @@ class Joblog(Base):
log = Column(String)
def __repr__(self):
    """Debug representation of a job log row.

    Only one return statement is kept (the second one was unreachable) and
    the representation is now closed with '>' like the other models.
    NOTE(review): log_date is assumed to be a column declared earlier in
    this class, outside the visible part of the file -- confirm.
    """
    return "<id: {}, job_id: {}, log_date: {}, log: {}>".format(self.id, self.job_id, self.log_date, self.log)
class JobExtra(Base):
    """A name/value pair attached to a job (row in table "jobextra").

    Reached from a Job through its `extra` relationship.
    """
    __tablename__ = "jobextra"

    id = Column(Integer, Sequence('jobextra_id_seq'), primary_key=True)
    job_id = Column(Integer, ForeignKey('job.id'))
    name = Column(String)
    value = Column(String)

    def __repr__(self):
        template = "<id: {}, job_id: {}, name: {}, value: {}>"
        return template.format(self.id, self.job_id, self.name, self.value)
class Job(Base):
__tablename__ = "job"
@@ -250,8 +318,11 @@ class Job(Base):
wait_for = Column(Integer)
pa_job_state = Column(String)
logs = relationship( "Joblog")
extra = relationship( "JobExtra")
def __repr__(self):
    """Debug representation of a job, including its extras and logs.

    Each placeholder is paired with exactly one attribute: num_files used
    to be passed twice, which shifted every later value under the wrong
    label. Only one return statement is kept (the second was unreachable).
    """
    template = (
        "<id: {}, start_time: {}, last_update: {}, name: {}, state: {}, "
        "num_passes: {}, current_passes: {}, num_files: {}, "
        "current_file_num: {}, current_file: {}, extra: {}, logs: {}>"
    )
    return template.format(
        self.id,
        self.start_time,
        self.last_update,
        self.name,
        self.state,
        self.num_passes,
        self.current_pass,
        self.num_files,
        self.current_file_num,
        self.current_file,
        self.extra,
        self.logs,
    )
class PA_JobManager_FE_Message(Base):
__tablename__ = "pa_job_manager_fe_message"
@@ -285,19 +356,29 @@ def AddLogForJob(job, message, current_file=''):
job.last_update=now
job.current_file=current_file
session.add(log)
session.commit()
return
def RunJob(job):
    """Dispatch *job* to the handler matching job.name.

    Known names are "scannow", "forcescan" and "importdir"; any other name
    is only reported on stdout.

    When DEBUG is non-zero a handler failure is re-raised so the traceback
    is visible. Otherwise the failure is reported to the front-end through
    MessageToFE, and if that report itself fails the problem is printed.
    """
    try:
        if job.name == "scannow":
            JobScanNow(job)
        elif job.name == "forcescan":
            JobForceScan(job)
        elif job.name == "importdir":
            JobImportDir(job)
        else:
            print("Requested to process unknown job type: {}".format(job.name))
    except Exception as e:
        if DEBUG != 0:
            # debugging: let the exception surface with its traceback
            raise
        try:
            MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
        except Exception:
            # the front-end could not be told; report the job failure here
            print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- exception was: {})".format( job.id, job.name, e) )
    return
def HandleJobs():
global pa_eng
@@ -309,7 +390,12 @@ def HandleJobs():
pa_eng.num_completed_jobs=0
for job in jobs:
if job.pa_job_state != 'Completed':
threading.Thread(target=RunJob, args=(job,)).start()
# use this to remove threads for easier debugging
if DEBUG==1:
RunJob(job)
else:
print ("WTF")
threading.Thread(target=RunJob, args=(job,)).start()
print ("HandleJobs setting num_active jobs to +1")
pa_eng.num_active_jobs = pa_eng.num_active_jobs + 1
else:
@@ -342,11 +428,110 @@ def JobForceScan(job):
session.commit()
return
def _EntryTypeName(file):
    """Return the Entry.type label ("Image", "Video" or "File") for a non-directory path."""
    if isImage(file):
        return "Image"
    if isVideo(file):
        return "Video"
    return "File"


def JobImportDir(job):
    """Import the directory named by each "path" extra of *job*.

    For every existing path this creates (once) a symlink under static/,
    adds an Entry + Dir for the import root, then walks the tree and adds
    an Entry for every sub-directory and file whose ctime is newer than
    settings.last_import_date (everything when that date is 0 or unset).
    Each new entry is linked through in_dir to the Dir of its parent.

    Entry.type is a non-nullable String column, so the type *name*
    ("Directory", "Image", "Video", "File") is stored directly; the id of
    a FileType object that was never flushed would be None.

    Raises:
        Exception: if there is no Settings row.
    """
    print("Working on this - import dir: {}".format(job.id))
    settings = session.query(Settings).first()
    if settings is None:
        raise Exception("Cannot create file data with no settings / import path is missing")
    last_import_date = settings.last_import_date

    for jex in job.extra:
        if jex.name != "path":
            continue
        print("Should be importing: {}".format(jex.value))
        AddLogForJob(job, "Checking Import Directory: {}".format( jex.value ) )
        path = FixPath(jex.value)
        if not os.path.exists( path ):
            continue
        # the glob pattern and the relative-name slicing below both rely on
        # a trailing separator; add one when the setting lacks it
        path = os.path.join(path, "")
        root_name = os.path.basename(path[0:-1])

        # to serve static content of the images, we create a symlink
        # from inside the static subdir of each import_path that exists
        symlink = FixPath('static/{}'.format( root_name ))
        if not os.path.exists(symlink):
            os.symlink(path, symlink)

        root_dir = Dir( path_prefix=symlink )
        root_entry = Entry( name=root_name, type="Directory" )
        root_entry.dir_details.append(root_dir)
        session.add(root_entry)

        # relative directory path -> Dir created during this run, used to
        # find the parent of each entry ("" is the import root itself)
        dirs_by_relpath = { "": root_dir }

        for file in glob.glob(path + '**', recursive=True):
            if file == path:
                continue
            fname = file[len(path):]
            stat = os.stat(file)
            if not last_import_date or stat.st_ctime > last_import_date:
                AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, last_import_date ) )
                print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, last_import_date ) )
                # TODO: a parent directory skipped as OLDER has no Dir in
                # this run; such entries fall back to the import root
                parent_dir = dirs_by_relpath.get(os.path.dirname(fname), root_dir)
                if os.path.isdir(file):
                    path_prefix = os.path.join(symlink, fname)
                    e = Entry( name=fname, type="Directory" )
                    sub_dir = Dir( path_prefix=path_prefix )
                    e.dir_details.append(sub_dir)
                    dirs_by_relpath[fname] = sub_dir
                    print("DEBUG: DIR- path={}, pp={}, sl={}".format( path, path_prefix, symlink ) )
                else:
                    fsize = round(stat.st_size/(1024*1024))
                    e = Entry( name=os.path.basename(fname), type=_EntryTypeName(file) )
                    e.file_details.append( New_File( size_mb=fsize ) )
                e.in_dir.append(parent_dir)
                session.add(e)
                print( session.new )
                AddLogForJob(job, "Found new file: {}".format(fname) )
                print("Found new file: {}".format(fname) )
            else:
                AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file )
                print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file )
    #settings.last_import_date = time.time()
    session.commit()
    print( "Ending, list session new objects" )
    print ("fake finished import dir")
    return
def isVideo(file):
    """Return True when MediaInfo reports a "Video" track in *file*.

    Any failure while parsing the file is treated as "not a video".
    """
    try:
        tracks = MediaInfo.parse(file).tracks
        return any(track.track_type == "Video" for track in tracks)
    except Exception:
        return False
# Converts linux-style paths into windows paths when they start with a
# drive letter (any of a-z / A-Z followed by ':'); other paths are untouched
def FixPath(p):
    """Return *p* with '/' replaced by '\\' if it begins with a drive letter.

    Paths without a leading "<letter>:" (including the empty string) are
    returned unchanged.
    """
    drive_letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    if len(p) >= 2 and p[0] in drive_letters and p[1] == ':':
        p = p.replace('/', '\\')
    return p
# Returns the md5 hex digest of the contents of the file called fname
def md5(fname):
    """Hash the file at *fname* in 4096-byte reads and return the hex digest."""
    digest = hashlib.md5()
    with open(fname, "rb") as stream:
        while True:
            block = stream.read(4096)
            if block == b"":
                break
            digest.update(block)
    return digest.hexdigest()
def isImage(file):
    """Return True when *file* can be opened as an image via Image.open.

    The image is closed again straight away so no file handle is leaked.
    Any ordinary failure means "not an image"; KeyboardInterrupt and
    SystemExit are no longer swallowed.
    """
    try:
        with Image.open(file):
            return True
    except Exception:
        return False
if __name__ == "__main__":
print("PA job manager starting")
try:
InitialiseManager()
filedata.ProcessImportDirs()
session.commit()
except Exception as e:
print( "Failed to initialise PA Job Manager: {}".format(e) )