###
# This file controls the 'external' job control manager, that (periodically
# looks / somehow is pushed an event?) picks up new jobs, and processes them.
#
# It then stores the progress/status, etc. in job and joblog tables as needed
# via wrapper functions.
#
# The whole pa_job_manager is multi-threaded, and uses the database tables for
# state management and communication back to the pa web site
###
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT
from datetime import datetime, timedelta
import pytz
import time
import os
import glob
from PIL import Image
from pymediainfo import MediaInfo
import hashlib
import exifread
import base64
import numpy
import cv2
import socket
import threading

DEBUG = 0

# an Engine, which the Session will use for connection resources
some_engine = create_engine(DB_URL)

# create a configured "Session" class; scoped_session keeps one session per
# thread, which the (optionally) threaded job runner relies on
session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)
session = Session()

Base = declarative_base()

################################################################################
# Classes describing filesystem entries in the database, and via sqlalchemy,
# connected to the DB as well.  These have to match one-for-one the DB tables.
################################################################################
class EntryDirLink(Base):
    """Association table linking an Entry to the Dir that contains it."""
    __tablename__ = "entry_dir_link"
    entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True)
    dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True)

    def __repr__(self):
        # BUG FIX: the repr template was empty ("".format(...)), so repr()
        # silently returned "" and discarded its arguments
        return "<EntryDirLink(entry_id={}, dir_eid={})>".format(
            self.entry_id, self.dir_eid)


class Dir(Base):
    """A directory below an import path; eid links back to its Entry row."""
    __tablename__ = "dir"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True)
    path_prefix = Column(String, unique=False, nullable=False)
    num_files = Column(Integer)
    last_import_date = Column(Float)  # epoch seconds of the last import scan
    last_hash_date = Column(Float)    # epoch seconds of the last hash/thumbnail pass
    files = relationship("Entry", secondary="entry_dir_link")

    def __repr__(self):
        # BUG FIX: empty repr template (see EntryDirLink)
        return ("<Dir(eid={}, path_prefix={}, num_files={}, "
                "last_import_date={}, last_hash_date={})>").format(
            self.eid, self.path_prefix, self.num_files,
            self.last_import_date, self.last_hash_date)


class Entry(Base):
    """A filesystem entry (file or directory) under an import path."""
    __tablename__ = "entry"
    id = Column(Integer, Sequence('file_id_seq'), primary_key=True)
    name = Column(String, unique=True, nullable=False)
    type_id = Column(Integer, ForeignKey("file_type.id"))
    type = relationship("FileType")
    dir_details = relationship("Dir")
    file_details = relationship("File")
    in_dir = relationship("Dir", secondary="entry_dir_link")

    def __repr__(self):
        # BUG FIX: empty repr template (see EntryDirLink)
        return ("<Entry(id={}, name={}, type={}, dir_details={}, "
                "file_details={}, in_dir={})>").format(
            self.id, self.name, self.type, self.dir_details,
            self.file_details, self.in_dir)


class File(Base):
    """Per-file details (size, content hash, thumbnail) for an Entry."""
    __tablename__ = "file"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True)
    size_mb = Column(Integer, unique=False, nullable=False)
    hash = Column(Integer, unique=True, nullable=True)       # md5 hex digest (see md5())
    thumbnail = Column(String, unique=False, nullable=True)  # base64-encoded JPEG

    def __repr__(self):
        # BUG FIX: empty repr template (see EntryDirLink)
        return "<File(eid={}, size_mb={}, hash={})>".format(
            self.eid, self.size_mb, self.hash)


class FileType(Base):
    """Lookup table for entry types ('Directory', 'Image', 'Video', 'File')."""
    __tablename__ = "file_type"
    id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True)
    name = Column(String, unique=True, nullable=False)

    def __repr__(self):
        # BUG FIX: empty repr template (see EntryDirLink)
        return "<FileType(id={}, name={})>".format(self.id, self.name)


class Settings(Base):
    """Application settings; import_path is where new media is scanned from."""
    __tablename__ = "settings"
    id = Column(Integer, Sequence('settings_id_seq'), primary_key=True)
    import_path = Column(String)

    def __repr__(self):
        # BUG FIX: empty repr template (see EntryDirLink)
        return "<Settings(id={}, import_path={})>".format(
            self.id, self.import_path)


################################################################################
# classes for the job manager:
# Job (and Joblog, JobExtra) for each Job, and
# PA_Jobmanager_fe_message (to pass messages to the front-end web)
################################################################################
class Joblog(Base):
    """One log line attached to a Job."""
    __tablename__ = "joblog"
    id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True)
    job_id = Column(Integer, ForeignKey('job.id'))
    log_date = Column(DateTime(timezone=True))
    log = Column(String)

    def __repr__(self):
        # BUG FIX: the old repr referenced self.name / self.value, which do
        # not exist on Joblog (copy/paste from JobExtra) -> AttributeError
        return "<Joblog(id={}, job_id={}, log_date={}, log={})>".format(
            self.id, self.job_id, self.log_date, self.log)


class Job(Base):
    """A unit of work; state/progress columns are how the web FE sees us."""
    __tablename__ = "job"
    id = Column(Integer, Sequence('job_id_seq'), primary_key=True)
    start_time = Column(DateTime(timezone=True))
    last_update = Column(DateTime(timezone=True))
    name = Column(String)              # job type, dispatched in RunJob()
    state = Column(String)             # human-readable state
    num_files = Column(Integer)
    current_file_num = Column(Integer)
    current_file = Column(String)
    wait_for = Column(Integer)         # id of a Job that must complete first
    pa_job_state = Column(String)      # machine state: New / In Progress / Completed
    logs = relationship("Joblog")
    # NOTE(review): JobExtra is defined in a part of the file that was lost
    # from this copy of the source
    extra = relationship("JobExtra")

    def __repr__(self):
        # BUG FIX: empty repr template ("".format(...)) returned ""
        return ("<Job(id={}, start_time={}, last_update={}, name={}, "
                "state={}, num_files={}, current_file_num={}, "
                "current_file={}, pa_job_state={}, wait_for={}, "
                "extra={}, logs={})>").format(
            self.id, self.start_time, self.last_update, self.name,
            self.state, self.num_files, self.current_file_num,
            self.current_file, self.pa_job_state, self.wait_for,
            self.extra, self.logs)


class PA_JobManager_FE_Message(Base):
    """A one-shot alert/message for the web front-end about a job."""
    __tablename__ = "pa_job_manager_fe_message"
    id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True)
    job_id = Column(Integer, ForeignKey('job.id'), primary_key=True)
    alert = Column(String)    # bootstrap-style level, e.g. "success"/"danger"
    message = Column(String)

    def __repr__(self):
        # NOTE(review): the original repr was garbled in the recovered source
        # (it referenced a global 'job'); reconstructed from the columns
        return "<PA_JobManager_FE_Message(id={}, job_id={}, alert={}, message={})>".format(
            self.id, self.job_id, self.alert, self.message)


# NOTE(review): the head of this function (along with the JobExtra class,
# MessageToFE() and ProcessImportDirs(), which are referenced elsewhere in
# this file) was lost from the recovered source.  Only the tail below was
# visible: it commits the first job so job.id is valid, then queues a
# follow-up "getfiledetails" job that waits for it.  The signature and the
# first half are a best-effort reconstruction -- confirm against the
# original before relying on them.
def AddJob(path, parent_job=None):
    jex = JobExtra(name="path", value=path)
    job = Job(start_time='now()', last_update='now()', name="importdir",
              state="New", pa_job_state="New")
    job.extra.append(jex)
    session.add(job)
    # force commit to make job.id be valid in use of wait_for later
    session.commit()
    jex2 = JobExtra(name="path", value=path)
    job2 = Job(start_time='now()', last_update='now()', name="getfiledetails",
               state="New", wait_for=job.id, pa_job_state="New")
    job2.extra.append(jex2)
    session.add(job2)
    session.commit()
    if parent_job:
        # BUG FIX: the format string had 3 placeholders but was given 4
        # arguments (job2.id twice); the duplicate argument is dropped
        AddLogForJob(parent_job, "adding job id={} {} (wait for: {})".format(
            job2.id, job2.name, job2.wait_for))
    HandleJobs()
    return


def AddLogForJob(job, message, current_file=''):
    """Append a Joblog row to job and refresh its progress columns.

    When current_file is given, it also becomes the job's visible
    'current file' and bumps current_file_num (if num_files is tracked)."""
    now = datetime.now(pytz.utc)
    log = Joblog(job_id=job.id, log=message, log_date=now)
    job.last_update = now
    if current_file != '':
        job.current_file = os.path.basename(current_file)
        # this may not be set on an import job
        if job.num_files:
            job.current_file_num = job.current_file_num + 1
    session.add(log)
    session.commit()
    return


def RunJob(job):
    """Dispatch job to its handler based on job.name."""
    if job.name == "scannow":
        JobScanNow(job)
    elif job.name == "forcescan":
        JobForceScan(job)
    elif job.name == "importdir":
        JobImportDir(job)
    elif job.name == "getfiledetails":
        JobGetFileDetails(job)
    else:
        print("ERROR: Requested to process unknown job type: {}".format(job.name))
    # okay, we finished a job, so check for any jobs that are dependant on
    # this and run them...
    HandleJobs()
    return


def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
    """Mark job finished with the given states and write a final log line."""
    job.state = state
    job.pa_job_state = pa_job_state
    job.last_update = datetime.now(pytz.utc)
    AddLogForJob(job, last_log)
    return


def HandleJobs():
    """Scan the job table and run every 'New' job whose dependency is met."""
    print("INFO: PA job manager is scanning for new jobs to process")
    for job in session.query(Job).all():
        if job.pa_job_state == 'New':
            if job.wait_for != None:
                j2 = session.query(Job).get(job.wait_for)
                if not j2:
                    print("WTF? job.wait_for ({}) does not exist in below?".format(job.wait_for))
                    for j in session.query(Job).all():
                        print("j={}".format(j.id))
                    continue
                if j2.pa_job_state != 'Completed':
                    continue
            # use this to remove threads for easier debugging, and errors
            # will stacktrace to the console
            if DEBUG == 1:
                print("*************************************")
                print("RUNNING job: id={} name={} wait_for={}".format(
                    job.id, job.name, job.wait_for))
                RunJob(job)
            else:
                try:
                    RunJob(job)
                    # threading.Thread(target=RunJob, args=(job,)).start()
                except Exception as e:
                    # best-effort: tell the front-end the job failed; if even
                    # that fails, at least log both errors to the console
                    try:
                        MessageToFE(job.id, "danger",
                                    "Failed with: {} (try job log for details)".format(e))
                    except Exception as e2:
                        print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format(
                            job.id, job.name, e, e2))
    print("INFO: PA job manager is waiting jobs")
    return


def JobProgressState(job, state):
    """Mark job as 'In Progress' with a human-readable state string."""
    job.pa_job_state = "In Progress"
    job.state = state
    session.commit()
    return


def JobScanNow(job):
    """Scan all import dirs for new files."""
    JobProgressState(job, "In Progress")
    ProcessImportDirs(job)
    FinishJob(job, "Completed (scan for new files)")
    MessageToFE(job.id, "success", "Completed (scan for new files)")
    session.commit()
    return


def JobForceScan(job):
    """Drop all file/dir data and rebuild it from scratch."""
    JobProgressState(job, "In Progress")
    # wipe children before parents to keep the FK links consistent
    session.query(EntryDirLink).delete()
    session.query(Dir).delete()
    session.query(File).delete()
    session.query(Entry).delete()
    session.commit()
    ProcessImportDirs(job)
    FinishJob(job, "Completed (forced remove and recreation of all file data)")
    MessageToFE(job.id, "success", "Completed (forced remove and recreation of all file data)")
    session.commit()
    return


def SymlinkName(path, file):
    """Map an absolute file/dir path under import root 'path' to the
    'static/<root>/<parent-dirs>' prefix its containing Dir is stored under."""
    sig_bit = file.replace(path, "")
    last_dir = os.path.basename(path[0:-1])
    if sig_bit[-1] == os.path.sep:
        # BUG FIX: old code took dirname first and then chopped the last
        # character (dirname('album/')[0:-1] -> 'albu'); strip the trailing
        # separator BEFORE taking dirname
        last_bit = os.path.dirname(sig_bit[0:-1])
    else:
        last_bit = os.path.dirname(sig_bit)
    symlink = os.path.join('static', last_dir, last_bit)
    if symlink[-1] == os.path.sep:
        symlink = symlink[0:-1]
    return symlink
# to serve static content of the images, we create a symlink from inside the
# static subdir of each import_path that exists
def CreateSymlink(job, path):
    """Create (if missing) a 'static/<basename>' symlink pointing at path.

    Returns the symlink path, which doubles as the Dir.path_prefix."""
    symlink = os.path.join('static', os.path.basename(path[0:-1]))
    if not os.path.exists(symlink):
        os.symlink(path, symlink)
    return symlink


def AddDir(job, dirname, path_prefix, in_dir):
    """Create a Dir (plus its Entry) named dirname; in_dir is the parent Dir
    (None when adding the import_path root itself).  Returns the new Dir."""
    dir = Dir(path_prefix=path_prefix, num_files=0, last_import_date=0, last_hash_date=0)
    dtype = session.query(FileType).filter(FileType.name == 'Directory').first()
    e = Entry(name=dirname, type=dtype)
    e.dir_details.append(dir)
    # this occurs when we Add the actual Dir for the import_path
    if in_dir:
        e.in_dir.append(in_dir)
    if DEBUG == 1:
        AddLogForJob(job, "DEBUG: AddDir: {} in (dir_id={})".format(dirname, in_dir))
    session.add(e)
    return dir


def AddFile(job, fname, type_str, fsize, in_dir):
    """Create an Entry + File row for fname inside Dir in_dir."""
    ftype = session.query(FileType).filter(FileType.name == type_str).first()
    e = Entry(name=fname, type=ftype)
    f = File(size_mb=fsize)
    e.file_details.append(f)
    e.in_dir.append(in_dir)
    AddLogForJob(job, "Found new file: {}".format(fname))
    session.add(e)
    return e


def JobImportDir(job):
    """Walk the job's 'path' extra and record every new file/dir in the DB.

    Skips the whole tree when the root hasn't changed since the last import;
    on a missing path, fails the job and withdraws jobs waiting on it."""
    JobProgressState(job, "In Progress")
    settings = session.query(Settings).first()
    if settings == None:
        raise Exception("Cannot create file data with no settings / import path is missing")
    overall_file_cnt = 0
    fcnt = {}       # path_prefix -> count of files directly in that dir
    keep_dirs = {}  # path_prefix -> Dir object, for parent lookups below
    for jex in job.extra:
        if jex.name == "path":
            path = FixPath(jex.value)
            AddLogForJob(job, "Checking Import Directory: {}".format(path))
            if DEBUG == 1:
                print("DEBUG: Checking Import Directory: {}".format(path))
            if os.path.exists(path):
                symlink = CreateSymlink(job, path)
                # dont want to do add a Dir, if this already exists
                dir = session.query(Dir).filter(Dir.path_prefix == symlink).first()
                if dir != None:
                    stat = os.stat(symlink)
                    # check any modificaiton on fs, since last import, if none we are done
                    if dir.last_import_date > 0 and stat.st_ctime < dir.last_import_date:
                        if DEBUG == 1:
                            print("DEBUG: Directory has not been altered since the last import, just return")
                        job.current_file_num = dir.num_files
                        job.num_files = dir.num_files
                        FinishJob(job, "No new files in directory since the last import")
                        return
                else:
                    dir = AddDir(job, os.path.basename(path[0:-1]), symlink, None)
                    session.commit()
                keep_dirs[dir.path_prefix] = dir
                import_dir = dir
                fcnt[symlink] = 0
                files = sorted(glob.glob(path + '**', recursive=True))
                job.current_file_num = 0
                # reduce this by 1, becasuse we skip file == path below
                job.num_files = len(files) - 1
                # BUG FIX: the format string was passed as a plain print
                # argument and printed literally; call .format()
                print("len={}, files={}".format(len(files), files))
                session.commit()
                # reuse the already-sorted listing instead of globbing again
                for file in files:
                    if file == path:
                        continue
                    fname = file.replace(path, "")
                    stat = os.stat(file)
                    dirname = SymlinkName(path, file)
                    if stat.st_ctime > keep_dirs[dirname].last_import_date:
                        if DEBUG == 1:
                            AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format(
                                file, stat.st_ctime, keep_dirs[dirname].last_import_date), file)
                            print("DEBUG: {} - {} is newer than {}".format(
                                file, stat.st_ctime, keep_dirs[dirname].last_import_date))
                        if os.path.isdir(file):
                            path_prefix = os.path.join(symlink, fname)
                            # BUG FIX: the parent must be the Dir this entry
                            # lives in (keep_dirs[dirname]); the old code
                            # passed whatever Dir was created last, which
                            # mis-parents siblings that follow a nested subtree
                            dir = AddDir(job, fname, path_prefix, keep_dirs[dirname])
                            fcnt[path_prefix] = 0
                            keep_dirs[dir.path_prefix] = dir
                        else:
                            overall_file_cnt = overall_file_cnt + 1
                            fcnt[dirname] = fcnt[dirname] + 1
                            if isImage(file):
                                type_str = 'Image'
                            elif isVideo(file):
                                type_str = 'Video'
                            else:
                                type_str = 'File'
                            fsize = round(os.stat(file).st_size / (1024 * 1024))
                            # BUG FIX: attach the file to its containing Dir,
                            # not the most recently created one
                            e = AddFile(job, os.path.basename(fname), type_str,
                                        fsize, keep_dirs[dirname])
                    else:
                        if DEBUG == 1:
                            AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format(
                                file, stat.st_ctime, keep_dirs[dirname].last_import_date), file)
                            print("DEBUG: {} - {} is OLDER than {}".format(
                                file, stat.st_ctime, keep_dirs[dirname].last_import_date))
                FinishJob(job, "Finished Importing: {} - Found {} new files".format(
                    path, overall_file_cnt))
                for d in keep_dirs:
                    keep_dirs[d].num_files = fcnt[d]
                    keep_dirs[d].last_import_date = time.time()
                # override this to be all the files in dir & its sub dirs...
                # (used to know how many files in jobs for this import dir)
                import_dir.num_files = overall_file_cnt
            else:
                FinishJob(job, "Finished Importing: {} -- Path does not exist".format(path), "Failed")
                # withdraw anything that was queued behind this failed import
                for j in session.query(Job).filter(Job.wait_for == job.id).all():
                    if DEBUG == 1:
                        print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id))
                    FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn")
    session.commit()
    return


def FilesInDir(path):
    """Return the Entry objects contained in the Dir whose path_prefix is path."""
    d = session.query(Dir).filter(Dir.path_prefix == path).first()
    return d.files


def ProcessFilesInDir(job, e):
    """Hash (and thumbnail) entry e; recurse into it when it is a Directory."""
    if DEBUG == 1:
        print("DEBUG: files in dir - process: {}".format(e.name))
    if e.type.name != 'Directory':
        full = os.path.join(e.in_dir[0].path_prefix, e.name)
        e.file_details[0].hash = md5(job, full)
        if e.type.name == 'Image':
            e.file_details[0].thumbnail = GenImageThumbnail(job, full)
        elif e.type.name == 'Video':
            e.file_details[0].thumbnail = GenVideoThumbnail(job, full)
    else:
        dir = session.query(Dir).filter(Dir.eid == e.id).first()
        stat = os.stat(dir.path_prefix)
        # check any modificaiton on fs, since last hashing; if none, skip subtree
        if stat.st_ctime < dir.last_hash_date:
            session.add(dir)
            dir.last_hash_date = time.time()
            AddLogForJob(job, "skip {} as it has not changed since last hashing".format(dir.path_prefix))
            if DEBUG == 1:
                print("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
            return
        dir.last_hash_date = time.time()
        for sub in dir.files:
            ProcessFilesInDir(job, sub)


def JobGetFileDetails(job):
    """Hash and thumbnail every file below the job's 'path' extra."""
    JobProgressState(job, "In Progress")
    for jex in job.extra:
        if jex.name == "path":
            path = jex.value
            path = os.path.join('static', os.path.basename(path[0:-1]))
            if DEBUG == 1:
                print("DEBUG: JobGetFileDetails for path={}".format(path))
            dir = session.query(Dir).filter(Dir.path_prefix == path).first()
            stat = os.stat(path)
            if stat.st_ctime < dir.last_hash_date:
                session.add(dir)
                dir.last_hash_date = time.time()
                FinishJob(job, "{} has not changed since last hashing - finished job".format(dir.path_prefix))
                if DEBUG == 1:
                    print("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
                return
            dir.last_hash_date = time.time()
            job.current_file_num = 0
            job.num_files = dir.num_files
            session.commit()
            for e in FilesInDir(path):
                ProcessFilesInDir(job, e)
    FinishJob(job, "File Details job finished")
    session.commit()
    return


def isVideo(file):
    """True when pymediainfo reports a video track in file; False otherwise."""
    try:
        fileInfo = MediaInfo.parse(file)
        for track in fileInfo.tracks:
            if track.track_type == "Video":
                return True
        return False
    except Exception:
        # deliberately best-effort: unreadable/unparsable files are "not video"
        return False


# Converts linux paths into windows paths
# HACK: assumes c:, might be best to just look for [a-z]: ?
def FixPath(p):
    """Normalise a 'c:'-prefixed path to use backslashes; others unchanged."""
    if p.startswith('c:'):
        p = p.replace('/', '\\')
    return p


# Returns an md5 hash of the fnames' contents
def md5(job, fname):
    """Return the md5 hex digest of fname's contents, logging to the job."""
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    hash = hash_md5.hexdigest()
    AddLogForJob(job, "Generated md5 hash: {} for file: {}".format(hash, fname))
    return hash


def isImage(file):
    """True when PIL can open file as an image; False otherwise."""
    try:
        # close the handle instead of leaking it
        with Image.open(file):
            return True
    except Exception:
        return False


def GenImageThumbnail(job, file):
    """Return file's embedded EXIF JPEG thumbnail, base64-encoded.

    Returns None when the file has no readable EXIF thumbnail (previously
    this crashed: 'tags' was unbound after an exifread failure, and a
    missing 'JPEGThumbnail' key raised KeyError)."""
    AddLogForJob(job, "Generate Thumbnail from Image file: {}".format(file), file)
    tags = {}
    with open(file, 'rb') as f:
        try:
            tags = exifread.process_file(f)
        except Exception:
            print('WARNING: NO EXIF TAGS?!?!?!?')
            AddLogForJob(job, "WARNING: No EXIF TAG found for: {}".format(file))
    if 'JPEGThumbnail' not in tags:
        AddLogForJob(job, "WARNING: No EXIF thumbnail found for: {}".format(file))
        return None
    thumbnail = base64.b64encode(tags['JPEGThumbnail'])
    # b64encode returns bytes; decode to the plain string stored in the DB
    return thumbnail.decode('ascii')


def GenVideoThumbnail(job, file):
    """Grab the first reasonably bright frame of file and return it as a
    160x90 base64-encoded JPEG (None when no frame could be grabbed)."""
    AddLogForJob(job, "Generate Thumbnail from Video file: {}".format(file), file)
    vcap = cv2.VideoCapture(file)
    try:
        res, im_ar = vcap.read()
        # BUG FIX: check res BEFORE touching im_ar -- the old order crashed
        # with AttributeError (im_ar is None) when a read failed
        while res and im_ar.mean() < 15:  # skip near-black leading frames
            res, im_ar = vcap.read()
        if not res or im_ar is None:
            AddLogForJob(job, "WARNING: could not grab a frame from: {}".format(file))
            return None
        im_ar = cv2.resize(im_ar, (160, 90), 0, 0, cv2.INTER_LINEAR)
        res, thumb_buf = cv2.imencode('.jpeg', im_ar)
        if not res:
            return None
        bt = thumb_buf.tobytes()  # tostring() is deprecated in numpy
        return base64.b64encode(bt).decode('ascii')
    finally:
        # BUG FIX: release the capture handle instead of leaking it
        vcap.release()


if __name__ == "__main__":
    print("INFO: PA job manager starting")
    # pick up anything that appeared while we were down
    ProcessImportDirs()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
        s.listen()
        while True:
            # any connection from the web front-end is just a "wake up" ping
            conn, addr = s.accept()
            conn.close()  # BUG FIX: accepted connections were never closed (fd leak)
            HandleJobs()