# pa_job_manager -- external job control manager
###############################################################################
#
# This file controls the 'external' job control manager, that (periodically
# looks / somehow is pushed an event?) picks up new jobs, and processes them.
#
# It then stores the progress/status, etc. in job and joblog tables as needed
# via wrapper functions.
#
# The whole pa_job_manager is multi-threaded, and uses the database tables for
# state management and communication back to the pa web site.
#
###############################################################################
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from sqlalchemy.orm import relationship
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT
|
|
from datetime import datetime, timedelta
|
|
import pytz
|
|
import time
|
|
import os
|
|
import glob
|
|
from PIL import Image
|
|
from pymediainfo import MediaInfo
|
|
import hashlib
|
|
import exifread
|
|
import base64
|
|
import numpy
|
|
import cv2
|
|
import socket
|
|
import threading
|
|
|
|
DEBUG=1
|
|
|
|
# an Manager, which the Session will use for connection resources
|
|
some_engine = create_engine(DB_URL)
|
|
|
|
# create a configured "Session" class
|
|
Session = sessionmaker(bind=some_engine)
|
|
|
|
# create a Session
|
|
session = Session()
|
|
|
|
Base = declarative_base()
|
|
|
|
|
|
# global for us to keep state / let front-end know our state
|
|
pa_eng=None
|
|
|
|
|
|
################################################################################
|
|
# FileData class...
|
|
################################################################################
|
|
|
|
class FileData():
|
|
def getExif(self, file):
|
|
f = open(file, 'rb')
|
|
try:
|
|
tags = exifread.process_file(f)
|
|
except:
|
|
print('NO EXIF TAGS?!?!?!?')
|
|
AddLogForJob(job, "WARNING: No EXIF TAF found for: {}".format(file))
|
|
f.close()
|
|
raise
|
|
f.close()
|
|
|
|
fthumbnail = base64.b64encode(tags['JPEGThumbnail'])
|
|
fthumbnail = str(fthumbnail)[2:-1]
|
|
return fthumbnail
|
|
|
|
def isVideo(self, file):
|
|
try:
|
|
fileInfo = MediaInfo.parse(file)
|
|
for track in fileInfo.tracks:
|
|
if track.track_type == "Video":
|
|
return True
|
|
return False
|
|
except Exception as e:
|
|
return False
|
|
|
|
# Converts linux paths into windows paths
|
|
# HACK: assumes c:, might be best to just look for [a-z]: ?
|
|
def FixPath(self, p):
|
|
if p.startswith('c:'):
|
|
p = p.replace('/', '\\')
|
|
return p
|
|
|
|
# Returns an md5 hash of the fnames' contents
|
|
def md5(self, fname):
|
|
hash_md5 = hashlib.md5()
|
|
with open(fname, "rb") as f:
|
|
for chunk in iter(lambda: f.read(4096), b""):
|
|
hash_md5.update(chunk)
|
|
return hash_md5.hexdigest()
|
|
|
|
def isImage(self, file):
|
|
try:
|
|
img = Image.open(file)
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
def generateVideoThumbnail(self, file):
|
|
#overall wrapper function for generating video thumbnails
|
|
vcap = cv2.VideoCapture(file)
|
|
res, im_ar = vcap.read()
|
|
while im_ar.mean() < 15 and res:
|
|
res, im_ar = vcap.read()
|
|
im_ar = cv2.resize(im_ar, (160, 90), 0, 0, cv2.INTER_LINEAR)
|
|
#save on a buffer for direct transmission
|
|
res, thumb_buf = cv2.imencode('.jpeg', im_ar)
|
|
# '.jpeg' etc are permitted
|
|
#get the bytes content
|
|
bt = thumb_buf.tostring()
|
|
fthumbnail = base64.b64encode(bt)
|
|
fthumbnail = str(fthumbnail)[2:-1]
|
|
return fthumbnail
|
|
|
|
##############################################################################
|
|
def ProcessImportDirs(self):
|
|
settings = session.query(Settings).first()
|
|
if settings == None:
|
|
raise Exception("Cannot create file data with no settings / import path is missing")
|
|
last_import_date = settings.last_import_date
|
|
paths = settings.import_path.split("#")
|
|
for path in paths:
|
|
# make new Job; HandleJobs will make them run later
|
|
jex=JobExtra( name="path", value=path )
|
|
job=Job(start_time='now()', last_update='now()', name="importdir", state="New", wait_for=None )
|
|
job.extra.append(jex)
|
|
session.add(job)
|
|
return
|
|
|
|
|
|
################################################################################
|
|
# Class describing File in the database, and via sqlalchemy, connected to the DB as well
|
|
# This has to match one-for-one the DB table
|
|
################################################################################
|
|
class EntryDirLink(Base):
|
|
__tablename__ = "entry_dir_link"
|
|
entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True )
|
|
dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True )
|
|
|
|
def __repr__(self):
|
|
return "<entry_id: {}, dir_eid: {}>".format(self.entry_id, self.dir_eid)
|
|
|
|
class Dir(Base):
|
|
__tablename__ = "dir"
|
|
eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
|
|
path_prefix = Column(String, unique=False, nullable=False )
|
|
|
|
def __repr__(self):
|
|
return "<eid: {}, path_prefix: {}>".format(self.eid, self.path_prefix)
|
|
|
|
class Entry(Base):
|
|
__tablename__ = "entry"
|
|
id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
|
|
name = Column(String, unique=True, nullable=False )
|
|
type = Column(String, unique=False, nullable=False)
|
|
dir_details = relationship( "Dir")
|
|
file_details = relationship( "New_File" )
|
|
in_dir = relationship ("Dir", secondary="entry_dir_link" )
|
|
|
|
def __repr__(self):
|
|
return "<id: {}, name: {}, type={}, dir_details={}, file_details={}, in_dir={}>".format(self.id, self.name, self.type, self.dir_details, self.file_details, self.in_dir)
|
|
|
|
class New_File(Base):
|
|
__tablename__ = "new_file"
|
|
eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
|
|
size_mb = Column(Integer, unique=False, nullable=False)
|
|
hash = Column(Integer, unique=True, nullable=True)
|
|
thumbnail = Column(String, unique=False, nullable=True)
|
|
|
|
def __repr__(self):
|
|
return "<eid: {}, size_mb={}, hash={}>".format(self.eid, self.size_mb, self.hash )
|
|
|
|
class FileType(Base):
|
|
__tablename__ = "file_type"
|
|
id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True )
|
|
name = Column(String, unique=True, nullable=False )
|
|
|
|
class File(Base):
|
|
__tablename__ = "file"
|
|
id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
|
|
name = Column(String, unique=True, nullable=False )
|
|
type = Column(String, unique=False, nullable=False)
|
|
path_prefix = Column(String, unique=False, nullable=False)
|
|
size_mb = Column(Integer, unique=False, nullable=False)
|
|
# hash might not be unique, this could be the source of dupe problems
|
|
hash = Column(Integer, unique=True, nullable=True)
|
|
thumbnail = Column(String, unique=False, nullable=True)
|
|
|
|
def __repr__(self):
|
|
return "<id: {}, name: {}>".format(self.id, self.name )
|
|
|
|
|
|
class Settings(Base):
|
|
__tablename__ = "settings"
|
|
id = Column(Integer, Sequence('settings_id_seq'), primary_key=True )
|
|
import_path = Column(String)
|
|
last_import_date = Column(Float)
|
|
|
|
def __repr__(self):
|
|
return "<id: {}, import_path: {}, last_import_date: {}>".format(self.id, self.import_path, self.last_import_date)
|
|
|
|
|
|
### Initiatlise the file data set
|
|
filedata = FileData()
|
|
|
|
################################################################################
|
|
# classes for the job manager:
|
|
# PA_JobManager overall status tracking),
|
|
# Job (and Joblog) for each JOb, and
|
|
# PA_Jobmanager_fe_message (to pass messages to the front-end web)
|
|
################################################################################
|
|
class PA_JobManager(Base):
|
|
__tablename__ = "pa_job_manager"
|
|
id = Column(Integer, Sequence('pa_job_manager_id_seq'), primary_key=True)
|
|
state = Column(String)
|
|
num_active_jobs = Column(Integer)
|
|
num_completed_jobs = Column(Integer)
|
|
|
|
def __repr__(self):
|
|
return "<id={}, state={}, num_active_jobs={}, num_completed_jobs={}>".format( self.id, self.state, self.num_active_jobs, self.num_completed_jobs )
|
|
|
|
class Joblog(Base):
|
|
__tablename__ = "joblog"
|
|
id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True )
|
|
job_id = Column(Integer, ForeignKey('job.id') )
|
|
log_date = Column(DateTime(timezone=True))
|
|
log = Column(String)
|
|
|
|
def __repr__(self):
|
|
return "<id: {}, job_id: {}, log_date: {}, log: {}".format(self.id, self.job_id, self.log_date, self.log )
|
|
|
|
class JobExtra(Base):
|
|
__tablename__ = "jobextra"
|
|
id = Column(Integer, Sequence('jobextra_id_seq'), primary_key=True )
|
|
job_id = Column(Integer, ForeignKey('job.id') )
|
|
name = Column(String)
|
|
value = Column(String)
|
|
|
|
def __repr__(self):
|
|
return "<id: {}, job_id: {}, name: {}, value: {}>".format(self.id, self.job_id, self.name, self.value )
|
|
|
|
class Job(Base):
|
|
__tablename__ = "job"
|
|
id = Column(Integer, Sequence('job_id_seq'), primary_key=True )
|
|
start_time = Column(DateTime(timezone=True))
|
|
last_update = Column(DateTime(timezone=True))
|
|
name = Column(String)
|
|
state = Column(String)
|
|
num_passes = Column(Integer)
|
|
current_pass = Column(Integer)
|
|
num_files = Column(Integer)
|
|
current_file_num = Column(Integer)
|
|
current_file = Column(String)
|
|
wait_for = Column(Integer)
|
|
pa_job_state = Column(String)
|
|
|
|
logs = relationship( "Joblog")
|
|
extra = relationship( "JobExtra")
|
|
|
|
def __repr__(self):
|
|
return "<id: {}, start_time: {}, last_update: {}, name: {}, state: {}, num_passes: {}, current_passes: {}, num_files: {}, current_file_num: {}, current_file: {}, extra: {}, logs: {}>".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_passes, self.current_pass, self.num_files, self.num_files, self.current_file_num, self.current_file, self.extra, self.logs)
|
|
|
|
class PA_JobManager_FE_Message(Base):
|
|
__tablename__ = "pa_job_manager_fe_message"
|
|
id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True )
|
|
job_id = Column(Integer, ForeignKey('job.id'), primary_key=True )
|
|
alert = Column(String)
|
|
message = Column(String)
|
|
def __repr__(self):
|
|
return "<id: {}, job_id: {}, alert: {}, message: {}".format(self.id, self.job_id, self.alert, self.message)
|
|
|
|
def MessageToFE( job_id, alert, message ):
|
|
msg = PA_JobManager_FE_Message( job_id=job_id, alert=alert, message=message)
|
|
session.add(msg)
|
|
session.commit()
|
|
|
|
def GetJobs():
|
|
return session.query(Job).all()
|
|
|
|
def InitialiseManager():
|
|
global pa_eng
|
|
|
|
pa_eng=session.query(PA_JobManager).first()
|
|
if( pa_eng == None ):
|
|
pa_eng = PA_JobManager(state='Initialising', num_active_jobs=0, num_completed_jobs=0 )
|
|
session.add(pa_eng)
|
|
return
|
|
|
|
def AddLogForJob(job, message, current_file=''):
|
|
now=datetime.now(pytz.utc)
|
|
log=Joblog( job_id=job.id, log=message, log_date=now )
|
|
job.last_update=now
|
|
job.current_file=current_file
|
|
session.add(log)
|
|
return
|
|
|
|
def RunJob(job):
|
|
if job.name =="scannow":
|
|
JobScanNow(job)
|
|
elif job.name =="forcescan":
|
|
JobForceScan(job)
|
|
elif job.name =="importdir":
|
|
JobImportDir(job)
|
|
else:
|
|
print("Requested to process unknown job type: {}".format(job.name))
|
|
return
|
|
|
|
def HandleJobs():
|
|
global pa_eng
|
|
|
|
print("PA job manager is scanning for new jobs to process")
|
|
pa_eng.state = 'Scanning Jobs'
|
|
jobs=GetJobs()
|
|
pa_eng.num_active_jobs=0
|
|
pa_eng.num_completed_jobs=0
|
|
for job in jobs:
|
|
if job.pa_job_state != 'Completed':
|
|
# use this to remove threads for easier debugging, and errors will stacktrace to the console
|
|
if DEBUG==1:
|
|
RunJob(job)
|
|
else:
|
|
try:
|
|
threading.Thread(target=RunJob, args=(job,)).start()
|
|
except Exception as e:
|
|
try:
|
|
MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
|
|
except Exception as e2:
|
|
print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
|
|
print("PA job manager is waiting jobs")
|
|
pa_eng.state = 'Waiting for new Jobs'
|
|
return
|
|
|
|
def JobScanNow(job):
|
|
filedata.GenerateFileData(job)
|
|
job.state="Completed"
|
|
job.pa_job_state="Completed"
|
|
job.last_update=datetime.now(pytz.utc)
|
|
MessageToFE( job.id, "success", "Completed (scan for new files)" )
|
|
session.commit()
|
|
return
|
|
|
|
def JobForceScan(job):
|
|
session.query(File).delete()
|
|
settings = session.query(Settings).first()
|
|
if settings == None:
|
|
raise Exception("Cannot create file data with no settings / import path is missing")
|
|
settings.last_import_date = 0
|
|
session.commit()
|
|
filedata.GenerateFileData(job)
|
|
job.state="Completed"
|
|
job.pa_job_state="Completed"
|
|
job.last_update=datetime.now(pytz.utc)
|
|
MessageToFE( job.id, "success", "Completed (forced remove and recreation of all file data)" )
|
|
session.commit()
|
|
return
|
|
|
|
# to serve static content of the images, we create a symlink from inside the static subdir of each import_path that exists
|
|
def MakeSymlink(job,path):
|
|
symlink = FixPath('static/{}'.format( os.path.basename(path[0:-1])))
|
|
if not os.path.exists(symlink):
|
|
os.symlink(path, symlink)
|
|
return symlink
|
|
|
|
def AddDir(job, dirname, path_prefix, in_dir):
|
|
dir=Dir( path_prefix=path_prefix )
|
|
dtype = FileType(name='Directory')
|
|
e=Entry( name=dirname, type=dtype.id )
|
|
e.dir_details.append(dir)
|
|
# this occurs when we Add the actual Dir for the import_path
|
|
if in_dir:
|
|
e.in_dir.append(in_dir)
|
|
AddLogForJob(job, "DEBUG: AddDir: {} in (dir_id={})".format(dirname, in_dir) )
|
|
session.add(e)
|
|
return dir
|
|
|
|
def AddFile(job, fname, ftype, fsize, in_dir ):
|
|
e=Entry( name=fname, type=ftype )
|
|
f=New_File( size_mb=fsize )
|
|
e.file_details.append(f)
|
|
e.in_dir.append(in_dir)
|
|
AddLogForJob(job, "Found new file: {}".format(fname) )
|
|
session.add(e)
|
|
return e
|
|
|
|
def JobImportDir(job):
|
|
print("DEBUG: Importing dir: {}".format(job.id))
|
|
settings = session.query(Settings).first()
|
|
if settings == None:
|
|
raise Exception("Cannot create file data with no settings / import path is missing")
|
|
last_import_date = settings.last_import_date
|
|
for jex in job.extra:
|
|
if jex.name =="path":
|
|
path = FixPath( jex.value)
|
|
AddLogForJob(job, "Checking Import Directory: {}".format( path ) )
|
|
if os.path.exists( path ):
|
|
symlink=MakeSymlink(job,path)
|
|
dir=AddDir(job, os.path.basename(path[0:-1]), symlink, None )
|
|
for file in glob.glob(path + '**', recursive=True):
|
|
if file == path:
|
|
continue
|
|
fname=file.replace(path, "")
|
|
stat = os.stat(file)
|
|
if last_import_date == 0 or stat.st_ctime > last_import_date:
|
|
AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, last_import_date ) )
|
|
print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, last_import_date ) )
|
|
if os.path.isdir(file):
|
|
path_prefix=os.path.join(symlink,fname)
|
|
dir=AddDir( job, fname, path_prefix, dir )
|
|
else:
|
|
if isImage(file):
|
|
ftype = FileType(name='Image')
|
|
elif isVideo(file):
|
|
ftype = FileType(name='Video')
|
|
else:
|
|
ftype = FileType('File')
|
|
fsize = round(os.stat(file).st_size/(1024*1024))
|
|
e=AddFile( job, os.path.basename(fname), ftype.id, fsize, dir )
|
|
else:
|
|
AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file )
|
|
print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file )
|
|
#settings.last_import_date = time.time()
|
|
session.commit()
|
|
print ("DEBUG: finished Job import dir")
|
|
return
|
|
|
|
def isVideo(file):
|
|
try:
|
|
fileInfo = MediaInfo.parse(file)
|
|
for track in fileInfo.tracks:
|
|
if track.track_type == "Video":
|
|
return True
|
|
return False
|
|
except Exception as e:
|
|
return False
|
|
|
|
# Converts linux paths into windows paths
|
|
# HACK: assumes c:, might be best to just look for [a-z]: ?
|
|
def FixPath(p):
|
|
if p.startswith('c:'):
|
|
p = p.replace('/', '\\')
|
|
return p
|
|
|
|
# Returns an md5 hash of the fnames' contents
|
|
def md5(fname):
|
|
hash_md5 = hashlib.md5()
|
|
with open(fname, "rb") as f:
|
|
for chunk in iter(lambda: f.read(4096), b""):
|
|
hash_md5.update(chunk)
|
|
return hash_md5.hexdigest()
|
|
|
|
def isImage(file):
|
|
try:
|
|
img = Image.open(file)
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
if __name__ == "__main__":
|
|
print("PA job manager starting")
|
|
try:
|
|
InitialiseManager()
|
|
filedata.ProcessImportDirs()
|
|
session.commit()
|
|
except Exception as e:
|
|
print( "Failed to initialise PA Job Manager: {}".format(e) )
|
|
session.rollback()
|
|
HandleJobs()
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
|
s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
|
|
s.listen()
|
|
while True:
|
|
conn, addr = s.accept()
|
|
print("Connection from: {} so HandleJobs".format(addr))
|
|
HandleJobs()
|