diff --git a/TODO b/TODO
index 4632fcb..ce79fc3 100644
--- a/TODO
+++ b/TODO
@@ -3,6 +3,11 @@
 
 ### BACKEND
 * need a "batch" processing system that uses ionice to minimise load on mara and is threaded and used DB to interact with gunicorn'd pa
+  Might need a session per Thread, so maybe
+    sess=[]
+    sess[job.id]=Session()
+  etc
+
 DONE: pa_jobs (threaded python add separate from photoassistant)
 DONE: takes over files.py
 has ai.py
diff --git a/pa_job_manager.py b/pa_job_manager.py
index 02222fc..a693dd2 100644
--- a/pa_job_manager.py
+++ b/pa_job_manager.py
@@ -14,6 +14,7 @@
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime
 from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy.orm import relationship
 from sqlalchemy import create_engine
 from sqlalchemy.orm import sessionmaker
 from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT
@@ -32,6 +33,8 @@
 import cv2
 import socket
 import threading
 
+DEBUG=1
+
 # an Manager, which the Session will use for connection resources
 some_engine = create_engine(DB_URL)
@@ -116,10 +119,21 @@ class FileData():
         fthumbnail = str(fthumbnail)[2:-1]
         return fthumbnail
 
-    ##############################################################################
-    # HACK: At present this only handles one path (need to re-factor if we have #
-    # multiple valid paths in import_path)                                       #
+    def ProcessImportDirs(self):
+        settings = session.query(Settings).first()
+        if settings == None:
+            raise Exception("Cannot create file data with no settings / import path is missing")
+        last_import_date = settings.last_import_date
+        paths = settings.import_path.split("#")
+        for path in paths:
+            # make new Job; HandleJobs will make them run later
+            jex=JobExtra( name="path", value=path )
+            job=Job(start_time='now()', last_update='now()', name="importdir", state="New", wait_for=None )
+            job.extra.append(jex)
+            session.add(job)
+        return
+
     ##############################################################################
     def GenerateFileData(self, job):
         settings = session.query(Settings).first()
         if settings == None:
@@ -182,6 +196,49 @@ class FileData():
 # Class describing File in the database, and via sqlalchemy, connected to the DB as well
 # This has to match one-for-one the DB table
 ################################################################################
+class EntryDirLink(Base):
+    __tablename__ = "entry_dir_link"
+    entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True )
+    dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True )
+
+    def __repr__(self):
+        return "<EntryDirLink(entry_id='{}', dir_eid='{}')>".format(self.entry_id, self.dir_eid)
+
+class Dir(Base):
+    __tablename__ = "dir"
+    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
+    path_prefix = Column(String, unique=False, nullable=False )
+
+    def __repr__(self):
+        return "<Dir(eid='{}', path_prefix='{}')>".format(self.eid, self.path_prefix)
+
+class Entry(Base):
+    __tablename__ = "entry"
+    id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
+    name = Column(String, unique=True, nullable=False )
+    type = Column(String, unique=False, nullable=False)
+    dir_details = relationship( "Dir")
+    file_details = relationship( "New_File" )
+    in_dir = relationship ("Dir", secondary="entry_dir_link" )
+
+    def __repr__(self):
+        return "<Entry(id='{}', name='{}', type='{}', dir_details='{}', file_details='{}', in_dir='{}')>".format(self.id, self.name, self.type, self.dir_details, self.file_details, self.in_dir)
+
+class New_File(Base):
+    __tablename__ = "new_file"
+    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
+    size_mb = Column(Integer, unique=False, nullable=False)
+    hash = Column(Integer, unique=True, nullable=True)
+    thumbnail = Column(String, unique=False, nullable=True)
+
+    def __repr__(self):
+        return "<New_File(eid='{}', size_mb='{}', hash='{}')>".format(self.eid, self.size_mb, self.hash )
+
+class FileType(Base):
+    __tablename__ = "file_type"
+    id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True )
+    name = Column(String, unique=True, nullable=False )
+
 class File(Base):
     __tablename__ = "file"
     id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
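
Aside, not part of the patch: a minimal sketch of how the new Entry / Dir / New_File models are meant to hang together, mirroring what JobImportDir builds further down. It assumes pa_job_manager imports cleanly outside of __main__ and swaps DB_URL for a throwaway in-memory SQLite engine; the entry names and size below are made-up values.

    # hypothetical usage sketch of the new models (illustration only)
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    from pa_job_manager import Base, Entry, Dir, New_File

    engine = create_engine("sqlite://")        # stand-in for DB_URL
    Base.metadata.create_all(engine)
    db = sessionmaker(bind=engine)()

    # a directory entry keeps its path in dir_details
    d = Entry(name="images_to_process", type="1")
    d.dir_details.append(Dir(path_prefix="static/images_to_process"))
    db.add(d)
    db.flush()                                 # assigns d.id, and the Dir picks up its eid

    # a file entry keeps size/hash/thumbnail in file_details, and is tied back
    # to the directory through entry_dir_link via in_dir
    f = Entry(name="IMG_9289.JPG", type="2")
    f.file_details.append(New_File(size_mb=4))
    f.in_dir.append(d.dir_details[0])
    db.add(f)
    db.commit()

    print(db.query(Entry).all())               # both entries, with their details
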
@@ -196,6 +253,7 @@ class File(Base):
     def __repr__(self):
         return "<File(id='{}', name='{}')>".format(self.id, self.name )
 
+
 class Settings(Base):
     __tablename__ = "settings"
     id = Column(Integer, Sequence('settings_id_seq'), primary_key=True )
@@ -233,7 +291,17 @@ class Joblog(Base):
     log = Column(String)
 
     def __repr__(self):
         return "<Joblog(id='{}', job_id='{}', log_date='{}', log='{}')>".format(self.id, self.job_id, self.log_date, self.log )
+
+class JobExtra(Base):
+    __tablename__ = "jobextra"
+    id = Column(Integer, Sequence('jobextra_id_seq'), primary_key=True )
+    job_id = Column(Integer, ForeignKey("job.id"))
+    name = Column(String, unique=False, nullable=False )
+    value = Column(String, unique=False, nullable=True )
+
+    def __repr__(self):
+        return "<JobExtra(id='{}', job_id='{}', name='{}', value='{}')>".format(self.id, self.job_id, self.name, self.value )
 
 class Job(Base):
     __tablename__ = "job"
@@ -250,8 +318,11 @@ class Job(Base):
     wait_for = Column(Integer)
     pa_job_state = Column(String)
 
+    logs = relationship( "Joblog")
+    extra = relationship( "JobExtra")
+
     def __repr__(self):
-        return "<Job(id='{}', start_time='{}', last_update='{}', name='{}', state='{}', num_passes='{}', current_pass='{}', num_files='{}', num_files='{}', current_file_num='{}', current_file='{}')>".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_passes, self.current_pass, self.num_files, self.num_files, self.current_file_num, self.current_file)
+        return "<Job(id='{}', start_time='{}', last_update='{}', name='{}', state='{}', num_passes='{}', current_pass='{}', num_files='{}', num_files='{}', current_file_num='{}', current_file='{}', extra='{}', logs='{}')>".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_passes, self.current_pass, self.num_files, self.num_files, self.current_file_num, self.current_file, self.extra, self.logs)
 
 class PA_JobManager_FE_Message(Base):
     __tablename__ = "pa_job_manager_fe_message"
@@ -285,19 +356,29 @@ def AddLogForJob(job, message, current_file=''):
     job.last_update=now
     job.current_file=current_file
     session.add(log)
-    session.commit()
+    return
 
 def RunJob(job):
-    try:
+#    try:
         if job.name =="scannow":
             JobScanNow(job)
         elif job.name =="forcescan":
             JobForceScan(job)
+        elif job.name =="importdir":
+            JobImportDir(job)
         else:
             print("Requested to process unknown job type: {}".format(job.name))
-    except Exception as e:
-        MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
-    return
+#    except Exception as e:
+        if DEBUG==0:
+            try:
+                MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
+            except Exception as e:
+                print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- exception was: {})".format( job.id, job.name, e) )
+#        else:
+#            print("back-end Failed to run job (id: {}, name: {} -- exception was: {})".format( job.id, job.name, e) )
+#            exit(1)
+        return
+#    return
 
 def HandleJobs():
     global pa_eng
@@ -309,7 +390,12 @@ def HandleJobs():
     pa_eng.num_completed_jobs=0
     for job in jobs:
         if job.pa_job_state != 'Completed':
-            threading.Thread(target=RunJob, args=(job,)).start()
+            # use this to remove threads for easier debugging
+            if DEBUG==1:
+                RunJob(job)
+            else:
+                print ("WTF")
+                threading.Thread(target=RunJob, args=(job,)).start()
             print ("HandleJobs setting num_active jobs to +1")
             pa_eng.num_active_jobs = pa_eng.num_active_jobs + 1
         else:
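
Aside, not part of the patch: the TODO note above ("might need a session per Thread") matters again once DEBUG is turned off and HandleJobs goes back to one thread per job, because every job currently shares the single module-level session. One common SQLAlchemy answer is scoped_session, which hands each thread its own Session; the sketch below is only an assumption of how that could look (in-memory SQLite stands in for DB_URL, and run_job is a dummy worker).

    # hypothetical per-thread session sketch using scoped_session
    import threading
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session, sessionmaker

    engine = create_engine("sqlite://")            # stand-in for DB_URL
    Session = scoped_session(sessionmaker(bind=engine))

    def run_job(job_id):
        db = Session()                 # Session() returns a session local to this thread
        try:
            # ... load the job row, do the work ...
            db.commit()
        except Exception:
            db.rollback()
            raise
        finally:
            Session.remove()           # discard this thread's session

    threads = [threading.Thread(target=run_job, args=(i,)) for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
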
@@ -342,11 +428,110 @@ def JobForceScan(job):
     session.commit()
     return
 
+def JobImportDir(job):
+    print("Working on this - import dir: {}".format(job.id))
+    settings = session.query(Settings).first()
+    if settings == None:
+        raise Exception("Cannot create file data with no settings / import path is missing")
+    last_import_date = settings.last_import_date
+    for jex in job.extra:
+        if jex.name =="path":
+            print("Should be importing: {}".format(jex.value))
+            path=jex.value
+    AddLogForJob(job, "Checking Import Directory: {}".format( path ) )
+    path = FixPath(path)
+    if os.path.exists( path ):
+        # to serve static content of the images, we create a symlink
+        # from inside the static subdir of each import_path that exists
+        symlink = FixPath('static/{}'.format( os.path.basename(path[0:-1])))
+        if not os.path.exists(symlink):
+            os.symlink(path, symlink)
+        file_list=[]
+        file_list.append(glob.glob(path + '**', recursive=True))
+        dir=Dir( path_prefix=symlink )
+        dtype = FileType(name='Directory')
+        e=Entry( name=os.path.basename(path[0:-1]), type=dtype.id )
+        e.dir_details.append(dir)
+        session.add(e)
+        for file in file_list[0]:
+            if file == path:
+                continue
+            fname=file.replace(path, "")
+            stat = os.stat(file)
+            if last_import_date == 0 or stat.st_ctime > last_import_date:
+                AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, last_import_date ) )
+                print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, last_import_date ) )
+                if os.path.isdir(file):
+                    path_prefix=os.path.join(symlink,fname)
+                    e=Entry( name=fname, type=dtype.id )
+                    dir=Dir( path_prefix=path_prefix )
+                    e.dir_details.append(dir)
+                    print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, last_import_date ) )
+                    print("DEBUG: DIR- path={}, pp={}, sl={}".format( path, path_prefix, symlink ) )
+# DEBUG: DIR- path=/home/ddp/src/photoassistant/images_to_process/, pp=static/images_to_process, sl=static/images_to_process
+                else:
+                    if isImage(file):
+                        ftype = FileType(name='Image')
+                    elif isVideo(file):
+                        ftype = FileType(name='Video')
+                    else:
+                        ftype = FileType(name='File')
+                    fsize = round(os.stat(file).st_size/(1024*1024))
+                    e=Entry( name=os.path.basename(fname), type=ftype.id )
+                    f=New_File( size_mb=fsize )
+                    e.file_details.append(f)
+                    e.in_dir.append(dir)
+                    session.add(e)
+                    print( session.new )
+
+                    AddLogForJob(job, "Found new file: {}".format(fname) )
+                    print("Found new file: {}".format(fname) )
+            else:
+                AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file )
+                print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file )
+    #settings.last_import_date = time.time()
+    session.commit()
+    print( "Ending, list session new objects" )
+    print ("fake finished import dir")
+    return
+
+def isVideo(file):
+    try:
+        fileInfo = MediaInfo.parse(file)
+        for track in fileInfo.tracks:
+            if track.track_type == "Video":
+                return True
+        return False
+    except Exception as e:
+        return False
+
+# Converts linux paths into windows paths
+# HACK: assumes c:, might be best to just look for [a-z]: ?
+def FixPath(p):
+    if p.startswith('c:'):
+        p = p.replace('/', '\\')
+    return p
+
+# Returns an md5 hash of the fname's contents
+def md5(fname):
+    hash_md5 = hashlib.md5()
+    with open(fname, "rb") as f:
+        for chunk in iter(lambda: f.read(4096), b""):
+            hash_md5.update(chunk)
+    return hash_md5.hexdigest()
+
+def isImage(file):
+    try:
+        img = Image.open(file)
+        return True
+    except:
+        return False
 
 if __name__ == "__main__":
     print("PA job manager starting")
     try:
         InitialiseManager()
+        filedata.ProcessImportDirs()
         session.commit()
     except Exception as e:
         print( "Failed to initialise PA Job Manager: {}".format(e) )
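
Aside, not part of the patch: the heart of JobImportDir above is the "only pick up things newer than last_import_date" test (glob plus st_ctime). Isolated, it looks roughly like the sketch below; the path and cutoff are made-up values.

    # standalone sketch of the newness filter JobImportDir applies
    import glob
    import os

    def entries_newer_than(path, last_import_date):
        for name in glob.glob(path + '**', recursive=True):
            if name == path:
                continue
            if last_import_date == 0 or os.stat(name).st_ctime > last_import_date:
                yield name

    if __name__ == "__main__":
        for name in entries_newer_than('/tmp/', 0):
            print(name)
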
diff --git a/tables.sql b/tables.sql
index f6da8e8..3ba7a10 100644
--- a/tables.sql
+++ b/tables.sql
@@ -2,11 +2,43 @@ ALTER DATABASE pa SET timezone TO 'Australia/Victoria';
 
 create table settings( id integer, import_path varchar, last_import_date float, constraint pk_settings_id primary key(id) );
 
-create table file_type ( id integer, name varchar(32) unique, constraint pk_file_type_id primary key(id) );
-insert into file_type values ( 1, 'Directory' );
-insert into file_type values ( 2, 'Image' );
-insert into file_type values ( 3, 'Video' );
-insert into file_type values ( 4, 'Unknown' );
+create table FILE_TYPE ( ID integer, NAME varchar(32) unique, constraint PK_FILE_TYPE_ID primary key(ID) );
+
+create table ENTRY( ID integer, NAME varchar(128), TYPE integer, constraint PK_ENTRY_ID primary key(ID) );
+
+create table NEW_FILE ( EID integer, SIZE_MB integer, HASH varchar(34), THUMBNAIL varchar,
+    constraint PK_FILE_ID primary key(EID),
+    constraint FK_FILE_ENTRY_ID foreign key (EID) references ENTRY(ID) );
+
+create table DIR ( EID integer, PATH_PREFIX varchar(256),
+    constraint PK_DIR_ID primary key(EID),
+    constraint FK_DIR_ENTRY_ID foreign key (EID) references ENTRY(ID) );
+
+create table ENTRY_DIR_LINK ( entry_id integer, dir_eid integer,
+    constraint PK_EDL_entry_id_dir_eid primary key (entry_id, dir_eid),
+    constraint FK_EDL_ENTRY_ID foreign key (ENTRY_ID) references ENTRY(ID),
+    constraint FK_EDL_DIR_ID foreign key (DIR_EID) references DIR(EID) );
+
+-- ENTRY( 1, 'images_to_process', D )
+-- ENTRY( 2, 'IMG_9289.JPG', F )
+-- ENTRY( 3, 'IMG_9289.JPG', F )
+-- ENTRY( 4, 'TEST', D )
+-- ENTRY( 5, 'a.jpg', F ) ; -- in TEST/
+-- ENTRY( 6, 'new_img_dir/', F )
+-- ENTRY( 7, 'b.jpg', F ) ; -- in new_img_dir/
+--
+-- DIR( 1, 'static/images_to_process/' )
+-- NEW_FILE( 3, size, hash, thumb )
+-- DIR( 4, 'static/images_to_process/TEST/' )
+-- NEW_FILE( 5, size, hash, thumb )
+-- DIR( 6, 'static/new_img_dir/' )
+-- NEW_FILE( 7, size, hash, thumb )
+--
+-- DIR, ENTRY ( 1, 2 )
+-- DIR, ENTRY ( 1, 3 )
+-- DIR, ENTRY ( 1, 4 )
+-- DIR, ENTRY ( 4, 5 )
+-- DIR, ENTRY ( 6, 7 )
 
 create table file( id integer, name varchar(128), size_MB integer, type varchar(20), path_prefix varchar(256), hash varchar(34), thumbnail varchar,
     constraint pk_photos_id primary key(id) );
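
Aside, not part of the patch: the commented rows above show what ENTRY_DIR_LINK records. On the Python side that link is walked through Entry.in_dir, so "which entries live under this directory" could be queried roughly as below; the path literal is just the example value from the comments, and it assumes the module-level session in pa_job_manager.py.

    # hypothetical query sketch against the models in pa_job_manager.py
    from pa_job_manager import session, Entry, Dir

    wanted = "static/images_to_process/"
    for entry in session.query(Entry).filter(Entry.in_dir.any(Dir.path_prefix == wanted)):
        print(entry.name, entry.file_details)
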
@@ -32,6 +64,10 @@ create table job (
     num_files integer, current_file_num integer, current_file varchar(256), wait_for integer, pa_job_state varchar(48),
     constraint pk_job_id primary key(id) );
 
+-- used to pass / keep extra values, e.g. num_files for jobs that have sets of files, or out* for adding output from jobs that you want to pass to the next job in the chain
+create table jobextra ( id integer, job_id integer, name varchar(32), value varchar(1024),
+    constraint pk_jobextra_id primary key(id), constraint fk_jobextra_job_id foreign key(job_id) references job(id) );
+
 create table joblog ( id integer, job_id integer, log_date timestamptz, log varchar,
     constraint pk_jl_id primary key(id), constraint fk_jl_job_id foreign key(job_id) references job(id) );
 
@@ -40,6 +76,8 @@ create table pa_job_manager ( id integer, state varchar(128), num_active_jobs in
 create table pa_job_manager_fe_message ( id integer, job_id integer, alert varchar(16), message varchar(1024), constraint pa_job_manager_fe_acks_id primary key(id), constraint fk_pa_job_manager_fe_message_job_id foreign key(job_id) references job(id) );
 
 create sequence file_id_seq;
+create sequence file_type_id_seq;
+create sequence jobextra_id_seq;
 create sequence joblog_id_seq;
 create sequence job_id_seq;
 create sequence person_id_seq;
@@ -48,6 +86,11 @@ create sequence settings_id_seq;
 create sequence pa_job_manager_id_seq;
 create sequence pa_job_manager_fe_message_id_seq;
 
+insert into FILE_TYPE values ( (select nextval('file_type_id_seq')), 'Directory' );
+insert into FILE_TYPE values ( (select nextval('file_type_id_seq')), 'Image' );
+insert into FILE_TYPE values ( (select nextval('file_type_id_seq')), 'Video' );
+insert into FILE_TYPE values ( (select nextval('file_type_id_seq')), 'Unknown' );
+
 -- fake data only for making testing easier
 insert into person values ( (select nextval('person_id_seq')), 'dad', 'Damien', 'De Paoli' );
 insert into person values ( (select nextval('person_id_seq')), 'mum', 'Mandy', 'De Paoli' );
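
Aside, not part of the patch: the jobextra comment above mentions passing values such as num_files or out* outputs along to the next job in the chain. With the Job.extra relationship added in pa_job_manager.py, that could look roughly like the sketch below; the "/data/incoming" path and the "out_files" name/value are made up, only "path", "importdir", "scannow" and wait_for come from the patch.

    # hypothetical job-chaining sketch using Job / JobExtra
    from pa_job_manager import session, Job, JobExtra

    first = Job(start_time='now()', last_update='now()', name="importdir", state="New", wait_for=None)
    first.extra.append(JobExtra(name="path", value="/data/incoming"))
    session.add(first)
    session.flush()                          # first.id now has a value

    # the second job waits for the first one to complete
    second = Job(start_time='now()', last_update='now()', name="scannow", state="New", wait_for=first.id)
    session.add(second)
    session.commit()

    # later, the first job could record output for the next job to pick up
    first.extra.append(JobExtra(name="out_files", value="42"))
    session.commit()
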