###
#
# This file controls the 'external' job control manager, that (periodically
# looks / somehow is pushed an event?) picks up new jobs, and processes them.
#
# It then stores the progress/status, etc. in job and joblog tables as needed
# via wrapper functions.
#
# The whole pa_job_manager is multi-threaded, and uses the database tables for
# state management and communication back to the pa web site
#
###
|
|
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
from shared import DB_URL
|
|
from datetime import datetime, timedelta
|
|
import pytz
|
|
import time
|
|
import os
|
|
import glob
|
|
from PIL import Image
|
|
from pymediainfo import MediaInfo
|
|
import hashlib
|
|
import exifread
|
|
import base64
|
|
import numpy
|
|
import cv2
|
|
|
|
# Engine that the Session factory will use for connection resources.
some_engine = create_engine(DB_URL)

# create a configured "Session" class
Session = sessionmaker(bind=some_engine)

# create a Session (module-wide; every helper below shares this one session)
session = Session()

# Declarative base class for all the ORM models defined below.
Base = declarative_base()

# global for us to keep state / let front-end know our state
# (the singleton PA_JobManager row, bound by InitialiseManager()).
pa_eng=None
|
|
|
|
|
|
################################################################################
|
|
# FileData class...
|
|
################################################################################
|
|
|
|
class FileData():
    """Helpers for scanning the configured import path(s) and turning each
    file found into a File row (type, size in MB, md5 hash, base64 thumbnail).
    """

    def getExif(self, file):
        """Return the embedded EXIF JPEG thumbnail of *file* as a base64
        string, or None when the image carries no 'JPEGThumbnail' tag.

        Raises whatever exifread raises if the file cannot be parsed at all.
        """
        # 'with' guarantees the handle is closed even when exifread raises
        # (the original closed it by hand before re-raising).
        with open(file, 'rb') as f:
            try:
                tags = exifread.process_file(f)
            except Exception:
                print('NO EXIF TAGS?!?!?!?')
                raise

        thumb = tags.get('JPEGThumbnail')
        if thumb is None:
            # ROBUSTNESS FIX: the original crashed with KeyError for images
            # that have EXIF data but no embedded thumbnail.
            return None
        # b64encode returns bytes; decode to the plain string stored in the
        # DB (equivalent to the old str(...)[2:-1] trick).
        return base64.b64encode(thumb).decode('ascii')

    def isVideo(self, file):
        """True when pymediainfo reports at least one video track in *file*."""
        try:
            fileInfo = MediaInfo.parse(file)
            return any(track.track_type == "Video" for track in fileInfo.tracks)
        except Exception:
            # Unreadable / unparseable files are simply "not a video".
            return False

    # Converts linux paths into windows paths
    # HACK: assumes c:, might be best to just look for [a-z]: ?
    def FixPath(self, p):
        if p.startswith('c:'):
            p = p.replace('/', '\\')
        return p

    # Returns an md5 hash of the fname's contents, streamed in 4 KiB chunks
    # so arbitrarily large files never need to fit in memory.
    def md5(self, fname):
        hash_md5 = hashlib.md5()
        with open(fname, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def isImage(self, file):
        """True when PIL can open *file* as an image."""
        try:
            # Context manager closes the handle; the original left the file
            # open until garbage collection.
            with Image.open(file):
                return True
        except Exception:
            return False

    def generateVideoThumbnail(self, file):
        """Grab the first non-dark frame of *file*, shrink it to 160x90 and
        return it as a base64-encoded JPEG string, or None when no frame
        could be decoded at all (corrupt / unsupported video)."""
        vcap = cv2.VideoCapture(file)
        try:
            res, im_ar = vcap.read()
            # Skip near-black leading frames (mean pixel value < 15).
            while res and im_ar is not None and im_ar.mean() < 15:
                res, im_ar = vcap.read()
        finally:
            # FIX: release the capture device (original never did).
            vcap.release()

        if im_ar is None:
            # ROBUSTNESS FIX: original crashed with AttributeError on
            # im_ar.mean() when the very first read failed.
            return None

        im_ar = cv2.resize(im_ar, (160, 90), 0, 0, cv2.INTER_LINEAR)
        # Encode straight into an in-memory buffer for direct transmission;
        # '.jpeg' etc are permitted.
        res, thumb_buf = cv2.imencode('.jpeg', im_ar)
        # tobytes() is the non-deprecated spelling of tostring().
        bt = thumb_buf.tobytes()
        return base64.b64encode(bt).decode('ascii')

    ##############################################################################
    # HACK: At present this only handles one path (need to re-factor if we have #
    # multiple valid paths in import_path)                                      #
    ##############################################################################
    def GenerateFileData(self):
        """Scan every '#'-separated directory in Settings.import_path and add
        a File row for anything created since the last import, then bump
        Settings.last_import_date and commit the session."""
        settings = session.query(Settings).first()
        if settings is None:
            # Nothing configured yet - nothing to scan.
            return

        last_import_date = settings.last_import_date
        paths = settings.import_path.split("#")

        for path in paths:
            print("GenerateFileData: Checking {}".format(path))
            path = self.FixPath(path)
            if not os.path.exists(path):
                continue

            # to serve static content of the images, we create a symlink
            # from inside the static subdir of each import_path that exists
            symlink = self.FixPath('static/{}'.format(os.path.basename(path[0:-1])))
            if not os.path.exists(symlink):
                os.symlink(path, symlink)

            for file in glob.glob(path + '**', recursive=True):
                if file == path:
                    continue
                stat = os.stat(file)
                if last_import_date != 0 and stat.st_ctime <= last_import_date:
                    print("{} - {} is OLDER than {}".format(file, stat.st_ctime, last_import_date))
                    continue
                print("{} - {} is newer than {}".format(file, stat.st_ctime, last_import_date))

                # Classify the entry; only media gets a thumbnail.
                fthumbnail = None
                if os.path.isdir(file):
                    ftype = 'Directory'
                elif self.isImage(file):
                    ftype = 'Image'
                    fthumbnail = self.getExif(file)
                elif self.isVideo(file):
                    ftype = 'Video'
                    fthumbnail = self.generateVideoThumbnail(file)
                else:
                    ftype = 'File'

                # Directories have no meaningful content hash.
                fhash = self.md5(file) if ftype != "Directory" else None

                fsize = round(os.stat(file).st_size / (1024 * 1024))
                fname = file.replace(path, "")
                path_prefix = symlink.replace(path, "")
                file_obj = File(name=fname, type=ftype, size_mb=fsize,
                                hash=fhash, path_prefix=path_prefix,
                                thumbnail=fthumbnail)
                session.add(file_obj)

        settings.last_import_date = time.time()
        session.commit()
        return self
|
|
|
|
################################################################################
|
|
# Class describing File in the database, and via sqlalchemy, connected to the DB as well
|
|
# This has to match one-for-one the DB table
|
|
################################################################################
|
|
class File(Base):
    # ORM model for one file discovered by FileData.GenerateFileData().
    # This has to match one-for-one the DB table.
    __tablename__ = "file"
    id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
    # Path of the file relative to its import path (must be unique).
    name = Column(String, unique=True, nullable=False )
    # One of 'Directory', 'Image', 'Video' or 'File' (set by GenerateFileData).
    type = Column(String, unique=False, nullable=False)
    # The 'static/...' symlink prefix used to serve the file over the web.
    path_prefix = Column(String, unique=False, nullable=False)
    size_mb = Column(Integer, unique=False, nullable=False)
    # hash might not be unique, this could be the source of dupe problems
    # NOTE(review): declared Integer but FileData.md5() stores a hex *string*;
    # looks like a type mismatch - confirm against the real DB schema.
    hash = Column(Integer, unique=True, nullable=True)
    # Base64-encoded JPEG thumbnail for images/videos, else NULL.
    thumbnail = Column(String, unique=False, nullable=True)

    def __repr__(self):
        # Short debug representation (id + name only).
        return "<id: {}, name: {}>".format(self.id, self.name )
|
|
|
|
class Settings(Base):
    """Application settings row: where to import from and when the last
    import ran."""
    __tablename__ = "settings"

    id = Column(Integer, Sequence('settings_id_seq'), primary_key=True)
    # '#'-separated list of directories to scan for new media.
    import_path = Column(String)
    # Unix timestamp (float) of the most recent completed scan.
    last_import_date = Column(Float)

    def __repr__(self):
        return f"<id: {self.id}, import_path: {self.import_path}, last_import_date: {self.last_import_date}>"
|
|
|
|
|
|
### Initialise the file data set (module-level singleton used by JobScanNow)
filedata = FileData()
|
|
|
|
################################################################################
|
|
# classes for the job manager:
|
|
# PA_JobManager overall status tracking),
|
|
# Job (and Joblog) for each JOb, and
|
|
# PA_Jobmanager_fe_message (to pass messages to the front-end web)
|
|
################################################################################
|
|
class PA_JobManager(Base):
    """Overall job-manager status row, read by the front-end web site."""
    __tablename__ = "pa_job_manager"

    id = Column(Integer, Sequence('pa_job_manager_id_seq'), primary_key=True)
    # Human-readable engine state, e.g. 'Initialising' / 'Scanning Jobs'.
    state = Column(String)
    num_active_jobs = Column(Integer)
    num_completed_jobs = Column(Integer)

    def __repr__(self):
        return (
            f"<id={self.id}, state={self.state}, "
            f"num_active_jobs={self.num_active_jobs}, "
            f"num_completed_jobs={self.num_completed_jobs}>"
        )
|
|
|
|
class Joblog(Base):
    """Per-job log lines, linked back to Job via job_id."""
    __tablename__ = "joblog"

    # NOTE(review): sequence name 'ill_id_seq' looks copy-pasted from another
    # project; kept as-is to match the existing DB.
    id = Column(Integer, Sequence('ill_id_seq'), primary_key=True)
    job_id = Column(Integer, ForeignKey('job.id'))
    log_date = Column(DateTime(timezone=True))
    log = Column(String)

    def __repr__(self):
        # BUG FIX: the original format string was missing the closing '>'.
        return "<id: {}, job_id: {}, log: {}>".format(self.id, self.job_id, self.log)
|
|
|
|
class Job(Base):
    """One queued/running/finished job, plus the progress fields the
    front-end polls to render status."""
    __tablename__ = "job"

    id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True)
    start_time = Column(DateTime(timezone=True))
    last_update = Column(DateTime(timezone=True))
    # Job type, e.g. 'scannow' / 'forcescan' (dispatched on in RunJob).
    name = Column(String)
    # Job-internal state string.
    state = Column(String)
    num_passes = Column(Integer)
    current_pass = Column(Integer)
    num_files = Column(Integer)
    current_file_num = Column(Integer)
    current_file = Column(String)
    wait_for = Column(Integer)
    # Manager-side lifecycle state (compared against 'complete' in HandleJobs).
    pa_job_state = Column(String)

    def __repr__(self):
        # BUG FIX: the original passed self.num_files twice, shifting the last
        # two placeholders: current_file_num displayed num_files and
        # current_file displayed current_file_num.
        return ("<id: {}, start_time: {}, last_update: {}, name: {}, state: {}, "
                "num_passes: {}, current_passes: {}, num_files: {}, "
                "current_file_num: {}, current_file: {}>").format(
            self.id, self.start_time, self.last_update, self.name, self.state,
            self.num_passes, self.current_pass, self.num_files,
            self.current_file_num, self.current_file)
|
|
|
|
class PA_JobManager_FE_Message(Base):
    """Message rows passed from the job manager to the front-end web site."""
    __tablename__ = "pa_job_manager_fe_message"

    id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True)
    # NOTE(review): composite primary key with id - confirm this is intended.
    job_id = Column(Integer, ForeignKey('job.id'), primary_key=True)
    # Alert level shown by the front-end; callers pass 'success' / 'danger'.
    alert = Column(String)
    message = Column(String)

    def __repr__(self):
        # BUG FIX: the original format string was missing the closing '>'.
        return "<id: {}, job_id: {}, alert: {}, message: {}>".format(
            self.id, self.job_id, self.alert, self.message)
|
|
|
|
def MessageToFE( job_id, alert, message ):
    """Queue a message row for the front-end web site and commit it
    immediately so the web process can see it."""
    fe_message = PA_JobManager_FE_Message(job_id=job_id, alert=alert, message=message)
    session.add(fe_message)
    session.commit()
|
|
|
|
def GetJobs():
    """Fetch every Job row (in any state) from the database."""
    all_jobs = session.query(Job)
    return all_jobs.all()
|
|
|
|
def InitialiseManager():
    """Load (or create on first ever run) the singleton PA_JobManager row and
    bind it to the module-global pa_eng.  The caller commits the session."""
    global pa_eng

    pa_eng = session.query(PA_JobManager).first()
    print(pa_eng)
    # FIX: compare with 'is None' rather than '== None' (PEP 8 idiom).
    if pa_eng is None:
        # First run: seed the status row with zeroed counters.
        pa_eng = PA_JobManager(state='Initialising', num_active_jobs=0, num_completed_jobs=0)
        session.add(pa_eng)
    return
|
|
|
|
def RunJob(job):
    """Dispatch *job* to its handler based on job.name; on failure notify the
    front-end rather than letting the job manager die.

    Known job names: 'scannow' (handled), 'forcescan' (not yet implemented).
    """
    print("Run job: {}, pa_eng state: {}, internal job state: {}".format( job.name, job.pa_job_state, job.state) )
    try:
        if job.name == "scannow":
            JobScanNow(job)
        elif job.name == "forcescan":
            print("force scan not being handled yet")
        else:
            print("Requested to process unknown job type: {}".format(job.name))
    except Exception as e:
        # FIX: the original bare 'except:' also swallowed KeyboardInterrupt /
        # SystemExit and hid the actual error entirely; catch Exception only
        # and log what went wrong before telling the front-end.
        print("Job {} ({}) failed: {}".format(job.id, job.name, e))
        MessageToFE( job.id, "danger", "Failed (see log for details)" )
    return
|
|
|
|
def HandleJobs():
    """One scan pass over the job table: run every job not yet marked
    complete, and update the manager's state/counters for the front-end."""
    print("PA job manager is scanning for jobs")
    pa_eng.state = 'Scanning Jobs'
    jobs = GetJobs()
    for job in jobs:
        # BUG FIX: JobScanNow marks jobs "Completed" but this test compared
        # against 'complete', so finished jobs were re-run on every pass.
        # Accept both spellings, case-insensitively (and tolerate NULL state).
        if (job.pa_job_state or '').lower() not in ('complete', 'completed'):
            RunJob(job)
            pa_eng.num_active_jobs += 1
        else:
            pa_eng.num_completed_jobs += 1
    print("PA job manager is waiting jobs")
    pa_eng.state = 'Waiting for new Jobs'
    return
|
|
|
|
def JobScanNow(job):
    """Handle a 'scannow' job: scan the import paths for new files, then mark
    the job finished, notify the front-end and commit."""
    filedata.GenerateFileData()
    job.state = "Completed"
    job.pa_job_state = "Completed"
    job.last_update = datetime.now(pytz.utc)
    MessageToFE( job.id, "success", "Completed (scan for new files)" )
    session.commit()
|
|
|
|
if __name__ == "__main__":
    print("PA job manager starting")
    try:
        # Create (or load) the singleton PA_JobManager status row.
        InitialiseManager()
        session.commit()
    except Exception as e:
        print( "Failed to initialise PA Job Manager: {}".format(e) )
        session.rollback()
    # Single pass over the job table (no polling loop yet -
    # presumably re-run externally / periodically; TODO confirm).
    HandleJobs()
    print("Exiting for now: {}".format( pa_eng ))
|