### # # This file controls the 'external' job control manager, that (periodically # # looks / somehow is pushed an event?) picks up new jobs, and processes them. # # It then stores the progress/status, etc. in job and joblog tables as needed # via wrapper functions. # # The whole pa_job_manager is multi-threaded, and uses the database tables for # state management and communication back to the pa web site # ### from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import relationship from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT from datetime import datetime, timedelta import pytz import time import os import glob from PIL import Image from pymediainfo import MediaInfo import hashlib import exifread import base64 import numpy import cv2 import socket import threading DEBUG=1 # an Manager, which the Session will use for connection resources some_engine = create_engine(DB_URL) # create a configured "Session" class Session = sessionmaker(bind=some_engine) # create a Session session = Session() Base = declarative_base() # global for us to keep state / let front-end know our state pa_eng=None ################################################################################ # Class describing File in the database, and via sqlalchemy, connected to the DB as well # This has to match one-for-one the DB table ################################################################################ class EntryDirLink(Base): __tablename__ = "entry_dir_link" entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True ) dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True ) def __repr__(self): return "".format(self.entry_id, self.dir_eid) class Dir(Base): __tablename__ = "dir" eid = Column(Integer, ForeignKey("entry.id"), 
primary_key=True ) path_prefix = Column(String, unique=False, nullable=False ) num_files = Column(Integer) last_import_date = Column(Float) files = relationship("Entry", secondary="entry_dir_link") def __repr__(self): return "".format(self.eid, self.path_prefix, self.num_files) class Entry(Base): __tablename__ = "entry" id = Column(Integer, Sequence('file_id_seq'), primary_key=True ) name = Column(String, unique=True, nullable=False ) type_id = Column(Integer, ForeignKey("file_type.id")) type=relationship("FileType") dir_details = relationship( "Dir") file_details = relationship( "File" ) in_dir = relationship ("Dir", secondary="entry_dir_link" ) def __repr__(self): return "".format(self.id, self.name, self.type, self.dir_details, self.file_details, self.in_dir) class File(Base): __tablename__ = "file" eid = Column(Integer, ForeignKey("entry.id"), primary_key=True ) size_mb = Column(Integer, unique=False, nullable=False) hash = Column(Integer, unique=True, nullable=True) thumbnail = Column(String, unique=False, nullable=True) def __repr__(self): return "".format(self.eid, self.size_mb, self.hash ) class FileType(Base): __tablename__ = "file_type" id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True ) name = Column(String, unique=True, nullable=False ) def __repr__(self): return "".format(self.id, self.name ) class Settings(Base): __tablename__ = "settings" id = Column(Integer, Sequence('settings_id_seq'), primary_key=True ) import_path = Column(String) def __repr__(self): return "".format(self.id, self.import_path ) ################################################################################ # classes for the job manager: # PA_JobManager overall status tracking), # Job (and Joblog) for each JOb, and # PA_Jobmanager_fe_message (to pass messages to the front-end web) ################################################################################ class PA_JobManager(Base): __tablename__ = "pa_job_manager" id = Column(Integer, 
Sequence('pa_job_manager_id_seq'), primary_key=True) state = Column(String) num_active_jobs = Column(Integer) num_completed_jobs = Column(Integer) def __repr__(self): return "".format( self.id, self.state, self.num_active_jobs, self.num_completed_jobs ) class Joblog(Base): __tablename__ = "joblog" id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True ) job_id = Column(Integer, ForeignKey('job.id') ) log_date = Column(DateTime(timezone=True)) log = Column(String) def __repr__(self): return "".format(self.id, self.job_id, self.name, self.value ) class Job(Base): __tablename__ = "job" id = Column(Integer, Sequence('job_id_seq'), primary_key=True ) start_time = Column(DateTime(timezone=True)) last_update = Column(DateTime(timezone=True)) name = Column(String) state = Column(String) num_files = Column(Integer) current_file_num = Column(Integer) current_file = Column(String) wait_for = Column(Integer) pa_job_state = Column(String) logs = relationship( "Joblog") extra = relationship( "JobExtra") def __repr__(self): return "".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs) class PA_JobManager_FE_Message(Base): __tablename__ = "pa_job_manager_fe_message" id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True ) job_id = Column(Integer, ForeignKey('job.id'), primary_key=True ) alert = Column(String) message = Column(String) def __repr__(self): return " keep_dirs[dirname].last_import_date: AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ) ) if os.path.isdir(file): path_prefix=os.path.join(symlink,fname) dir=AddDir( job, fname, path_prefix, dir ) fcnt[path_prefix]=0 keep_dirs[dir.path_prefix]=dir else: 
overall_file_cnt=overall_file_cnt+1 dirname=SymlinkName(path, file) fcnt[dirname]=fcnt[dirname]+1 if isImage(file): type_str = 'Image' elif isVideo(file): type_str = 'Video' else: type_str = 'File' fsize = round(os.stat(file).st_size/(1024*1024)) e=AddFile( job, os.path.basename(fname), type_str, fsize, dir ) else: AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file ) FinishJob(job, "Finished Importing: {} - Found {} new files".format( path, overall_file_cnt ) ) for d in keep_dirs: keep_dirs[d].num_files = fcnt[d] keep_dirs[d].last_import_date = time.time() # override this to be all the files in dir & its sub dirs... (used to know how many files in jobs for this import dir) import_dir.num_files=overall_file_cnt else: FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" ) for j in session.query(Job).filter(Job.wait_for==job.id).all(): print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) ) FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" ) session.commit() print("fcnt:", fcnt) return def FilesInDir( path ): d=session.query(Dir).filter(Dir.path_prefix==path).first() return d.files def ProcessFilesInDir(job, e): print("files in dir - process: {}".format(e.name)) if e.type.name != 'Directory': e.file_details[0].hash = md5( job, os.path.join( e.in_dir[0].path_prefix, e.name ) ) if e.type.name == 'Image': e.file_details[0].thumbnail = GenImageThumbnail( job, os.path.join( e.in_dir[0].path_prefix, e.name ) ) elif e.type.name == 'Video': e.file_details[0].thumbnail = GenVideoThumbnail( job, os.path.join( e.in_dir[0].path_prefix, e.name ) ) else: print("need to better process: {}".format(e)) d=session.query(Dir).filter(Dir.eid==e.id).first() for sub in d.files: 
ProcessFilesInDir(job, sub )
# NOTE(review): the statement above is the body of the `for sub in d.files:`
# loop that ends the previous physical line of this (flattened) file.  It is
# kept verbatim and has to be re-indented under that loop once the preceding
# line is reformatted.


def JobGetFileDetails(job):
    """Hash and thumbnail every entry below the directory named by the job.

    The directory comes from the job's extra whose name is "path".  The job is
    marked "In Progress", its file counters are initialised from the matching
    Dir row, each entry returned by FilesInDir() is handed to
    ProcessFilesInDir(), and the job is then finished.

    If the job has no "path" extra, or no Dir row matches the derived path,
    the job is finished with state "Failed" instead of raising.
    """
    JobProgressState( job, "In Progress" )

    # if several "path" extras exist the last one wins
    path = None
    for jex in job.extra:
        if jex.name == "path":
            path = jex.value
    if not path:
        FinishJob( job, "Failed to get file details -- job has no 'path' extra", "Failed" )
        session.commit()
        return

    # NOTE(review): path[0:-1] drops the final character, which presumably is
    # a trailing path separator -- confirm against the code creating the job.
    path = FixPath('static/{}'.format( os.path.basename(path[0:-1])))
    print("DEBUG: JobGetFileDetails for path={}".format( path ) )

    import_dir = session.query(Dir).filter(Dir.path_prefix==path).first()
    if import_dir is None:
        FinishJob( job, "Failed to get file details -- no directory found for: {}".format( path ), "Failed" )
        session.commit()
        return

    job.current_file_num = 0
    job.num_files = import_dir.num_files
    session.commit()
    for e in FilesInDir( path ):
        ProcessFilesInDir(job, e )
    FinishJob(job, "DEBUG: File Details processed")
    session.commit()
    return


def isVideo(file):
    """Return True if MediaInfo reports a track of type "Video" for `file`.

    Any exception raised while parsing is treated as "not a video".
    """
    try:
        fileInfo = MediaInfo.parse(file)
        return any( track.track_type == "Video" for track in fileInfo.tracks )
    except Exception:
        return False


# Converts linux paths into windows paths
# HACK: assumes c:, might be best to just look for [a-z]: ?
def FixPath(p):
    """Return `p` with '/' replaced by a backslash when it starts with 'c:'."""
    if p.startswith('c:'):
        p = p.replace('/', '\\')
    return p


# Returns an md5 hash of the fnames' contents
def md5(job, fname):
    """Return the hex md5 digest of the contents of `fname`, logging it on `job`."""
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        # read in 4096-byte chunks so the whole file is never held in memory
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    digest = hash_md5.hexdigest()
    AddLogForJob( job, "Generated md5 hash: {} for file: {}".format( digest, fname ) )
    return digest


def isImage(file):
    """Return True if PIL can open `file` as an image, otherwise False."""
    try:
        # the context manager closes the handle that Image.open() keeps open
        with Image.open(file):
            return True
    except Exception:
        return False


def GenImageThumbnail(job, file):
    """Return the embedded EXIF JPEG thumbnail of `file` as a base64 string.

    Returns None (after logging a warning on `job`) when the EXIF data cannot
    be read or contains no 'JPEGThumbnail' tag.
    """
    AddLogForJob( job, "Generate Thumbnail from Image file: {}".format( file ), file )
    tags = {}
    with open(file, 'rb') as f:
        try:
            tags = exifread.process_file(f)
        except Exception:
            print('WARNING: NO EXIF TAGS?!?!?!?')
            AddLogForJob(job, "WARNING: No EXIF TAF found for: {}".format(file))
    if 'JPEGThumbnail' not in tags:
        AddLogForJob(job, "WARNING: No embedded thumbnail found for: {}".format(file))
        return None
    # base64 output is pure ASCII, so decoding yields the bare text
    return base64.b64encode(tags['JPEGThumbnail']).decode('ascii')


def GenVideoThumbnail(job, file):
    """Return a 160x90 JPEG thumbnail of `file` as a base64 string.

    Frames are read until one has a mean pixel value of at least 15; if the
    video ends first, the last frame that was read is used.  Returns None
    (after logging a warning on `job`) when no frame can be read or encoded.
    """
    AddLogForJob( job, "Generate Thumbnail from Video file: {}".format( file ), file )
    vcap = cv2.VideoCapture(file)
    try:
        res, im_ar = vcap.read()
        if not res:
            AddLogForJob(job, "WARNING: Could not read a frame from: {}".format(file))
            return None
        # skip dark leading frames, but never touch a frame that failed to read
        while im_ar.mean() < 15:
            res, next_frame = vcap.read()
            if not res:
                break
            im_ar = next_frame
    finally:
        vcap.release()

    im_ar = cv2.resize(im_ar, (160, 90), interpolation=cv2.INTER_LINEAR)
    res, thumb_buf = cv2.imencode('.jpeg', im_ar)
    if not res:
        AddLogForJob(job, "WARNING: Could not encode thumbnail for: {}".format(file))
        return None
    return base64.b64encode(thumb_buf.tobytes()).decode('ascii')


if __name__ == "__main__":
    print("PA job manager starting")
    try:
        InitialiseManager()
        ProcessImportDirs()
    except Exception as e:
        # start-up problems are reported and rolled back; the manager still
        # goes on to listen for connections below
        print( "Failed to initialise PA Job Manager: {}".format(e) )
        session.rollback()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
        s.listen()
        while True:
            conn, addr = s.accept()
            # nothing is read from the connection; it only triggers
            # HandleJobs(), and is closed afterwards rather than leaked
            with conn:
                print("Connection from: {} so HandleJobs".format(addr))
                HandleJobs()