Files
photoassistant/pa_job_manager.py

576 lines
22 KiB
Python

###
#
# This file controls the 'external' job control manager, that (periodically #
# looks / somehow is pushed an event?) picks up new jobs, and processes them.
#
# It then stores the progress/status, etc. in job and joblog tables as needed
# via wrapper functions.
#
# The whole pa_job_manager is multi-threaded, and uses the database tables for
# state management and communication back to the pa web site
#
###
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT
from datetime import datetime, timedelta
import pytz
import time
import os
import glob
from PIL import Image
from pymediainfo import MediaInfo
import hashlib
import exifread
import base64
import numpy
import cv2
import socket
import threading
DEBUG=1
# an Manager, which the Session will use for connection resources
some_engine = create_engine(DB_URL)
# create a configured "Session" class
Session = sessionmaker(bind=some_engine)
# create a Session
session = Session()
Base = declarative_base()
# global for us to keep state / let front-end know our state
pa_eng=None
################################################################################
# FileData class...
################################################################################
class FileData():
def getExif(self, file):
f = open(file, 'rb')
try:
tags = exifread.process_file(f)
except:
print('WARNING: NO EXIF TAGS?!?!?!?')
AddLogForJob(job, "WARNING: No EXIF TAF found for: {}".format(file))
f.close()
fthumbnail = base64.b64encode(tags['JPEGThumbnail'])
fthumbnail = str(fthumbnail)[2:-1]
return fthumbnail
def isVideo(self, file):
try:
fileInfo = MediaInfo.parse(file)
for track in fileInfo.tracks:
if track.track_type == "Video":
return True
return False
except Exception as e:
return False
# Converts linux paths into windows paths
# HACK: assumes c:, might be best to just look for [a-z]: ?
def FixPath(self, p):
if p.startswith('c:'):
p = p.replace('/', '\\')
return p
# Returns an md5 hash of the fnames' contents
def md5(self, fname):
hash_md5 = hashlib.md5()
with open(fname, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def isImage(self, file):
try:
img = Image.open(file)
return True
except:
return False
def generateVideoThumbnail(self, file):
#overall wrapper function for generating video thumbnails
vcap = cv2.VideoCapture(file)
res, im_ar = vcap.read()
while im_ar.mean() < 15 and res:
res, im_ar = vcap.read()
im_ar = cv2.resize(im_ar, (160, 90), 0, 0, cv2.INTER_LINEAR)
#save on a buffer for direct transmission
res, thumb_buf = cv2.imencode('.jpeg', im_ar)
# '.jpeg' etc are permitted
#get the bytes content
bt = thumb_buf.tostring()
fthumbnail = base64.b64encode(bt)
fthumbnail = str(fthumbnail)[2:-1]
return fthumbnail
##############################################################################
def ProcessImportDirs(self):
settings = session.query(Settings).first()
if settings == None:
raise Exception("Cannot create file data with no settings / import path is missing")
last_import_date = settings.last_import_date
paths = settings.import_path.split("#")
for path in paths:
# make new Job; HandleJobs will make them run later
jex=JobExtra( name="path", value=path )
job=Job(start_time='now()', last_update='now()', name="importdir", state="New", wait_for=None )
job.extra.append(jex)
session.add(job)
# force commit to make job.id be valid in use of wait_for later
session.commit()
jex2=JobExtra( name="path", value=path )
job2=Job(start_time='now()', last_update='now()', name="getfiledetails", state="New", wait_for=job.id )
job2.extra.append(jex2)
session.add(job2)
print ("adding job2 id={}, wait_for={}, job is: {}".format( job2.id, job2.wait_for, job.id ) )
return
################################################################################
# Class describing File in the database, and via sqlalchemy, connected to the DB as well
# This has to match one-for-one the DB table
################################################################################
class EntryDirLink(Base):
__tablename__ = "entry_dir_link"
entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True )
dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True )
def __repr__(self):
return "<entry_id: {}, dir_eid: {}>".format(self.entry_id, self.dir_eid)
class Dir(Base):
__tablename__ = "dir"
eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
path_prefix = Column(String, unique=False, nullable=False )
num_files = Column(Integer)
files = relationship("Entry", secondary="entry_dir_link")
def __repr__(self):
return "<eid: {}, path_prefix: {}>".format(self.eid, self.path_prefix)
class Entry(Base):
__tablename__ = "entry"
id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
name = Column(String, unique=True, nullable=False )
type_id = Column(Integer, ForeignKey("file_type.id"))
type=relationship("FileType")
dir_details = relationship( "Dir")
file_details = relationship( "New_File" )
in_dir = relationship ("Dir", secondary="entry_dir_link" )
def __repr__(self):
return "<id: {}, name: {}, type={}, dir_details={}, file_details={}, in_dir={}>".format(self.id, self.name, self.type, self.dir_details, self.file_details, self.in_dir)
class New_File(Base):
__tablename__ = "new_file"
eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
size_mb = Column(Integer, unique=False, nullable=False)
hash = Column(Integer, unique=True, nullable=True)
thumbnail = Column(String, unique=False, nullable=True)
def __repr__(self):
return "<eid: {}, size_mb={}, hash={}>".format(self.eid, self.size_mb, self.hash )
class FileType(Base):
__tablename__ = "file_type"
id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True )
name = Column(String, unique=True, nullable=False )
def __repr__(self):
return "<id: {}, name={}>".format(self.id, self.name )
class Settings(Base):
__tablename__ = "settings"
id = Column(Integer, Sequence('settings_id_seq'), primary_key=True )
import_path = Column(String)
last_import_date = Column(Float)
def __repr__(self):
return "<id: {}, import_path: {}, last_import_date: {}>".format(self.id, self.import_path, self.last_import_date)
### Initiatlise the file data set
filedata = FileData()
################################################################################
# classes for the job manager:
# PA_JobManager overall status tracking),
# Job (and Joblog) for each JOb, and
# PA_Jobmanager_fe_message (to pass messages to the front-end web)
################################################################################
class PA_JobManager(Base):
__tablename__ = "pa_job_manager"
id = Column(Integer, Sequence('pa_job_manager_id_seq'), primary_key=True)
state = Column(String)
num_active_jobs = Column(Integer)
num_completed_jobs = Column(Integer)
def __repr__(self):
return "<id={}, state={}, num_active_jobs={}, num_completed_jobs={}>".format( self.id, self.state, self.num_active_jobs, self.num_completed_jobs )
class Joblog(Base):
__tablename__ = "joblog"
id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True )
job_id = Column(Integer, ForeignKey('job.id') )
log_date = Column(DateTime(timezone=True))
log = Column(String)
def __repr__(self):
return "<id: {}, job_id: {}, log_date: {}, log: {}".format(self.id, self.job_id, self.log_date, self.log )
class JobExtra(Base):
__tablename__ = "jobextra"
id = Column(Integer, Sequence('jobextra_id_seq'), primary_key=True )
job_id = Column(Integer, ForeignKey('job.id') )
name = Column(String)
value = Column(String)
def __repr__(self):
return "<id: {}, job_id: {}, name: {}, value: {}>".format(self.id, self.job_id, self.name, self.value )
class Job(Base):
__tablename__ = "job"
id = Column(Integer, Sequence('job_id_seq'), primary_key=True )
start_time = Column(DateTime(timezone=True))
last_update = Column(DateTime(timezone=True))
name = Column(String)
state = Column(String)
num_passes = Column(Integer)
current_pass = Column(Integer)
num_files = Column(Integer)
current_file_num = Column(Integer)
current_file = Column(String)
wait_for = Column(Integer)
pa_job_state = Column(String)
logs = relationship( "Joblog")
extra = relationship( "JobExtra")
def __repr__(self):
return "<id: {}, start_time: {}, last_update: {}, name: {}, state: {}, num_passes: {}, current_pass: {}, num_files: {}, current_file_num: {}, current_file: {}, pa_job_state: {}, wait_for: {}, extra: {}, logs: {}>".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_passes, self.current_pass, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs)
class PA_JobManager_FE_Message(Base):
__tablename__ = "pa_job_manager_fe_message"
id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True )
job_id = Column(Integer, ForeignKey('job.id'), primary_key=True )
alert = Column(String)
message = Column(String)
def __repr__(self):
return "<id: {}, job_id: {}, alert: {}, message: {}".format(self.id, self.job_id, self.alert, self.message)
def MessageToFE( job_id, alert, message ):
msg = PA_JobManager_FE_Message( job_id=job_id, alert=alert, message=message)
session.add(msg)
session.commit()
def GetJobs():
return session.query(Job).all()
def InitialiseManager():
global pa_eng
pa_eng=session.query(PA_JobManager).first()
if( pa_eng == None ):
pa_eng = PA_JobManager(state='Initialising', num_active_jobs=0, num_completed_jobs=0 )
session.add(pa_eng)
return
def AddLogForJob(job, message, current_file=''):
now=datetime.now(pytz.utc)
log=Joblog( job_id=job.id, log=message, log_date=now )
job.last_update=now
job.current_file=current_file
session.add(log)
return
def RunJob(job):
if job.name =="scannow":
JobScanNow(job)
elif job.name =="forcescan":
JobForceScan(job)
elif job.name =="importdir":
JobImportDir(job)
elif job.name =="getfiledetails":
JobGetFileDetails(job)
else:
print("Requested to process unknown job type: {}".format(job.name))
return
def HandleJobs():
global pa_eng
print("PA job manager is scanning for new jobs to process")
pa_eng.state = 'Scanning Jobs'
jobs=GetJobs()
pa_eng.num_active_jobs=0
pa_eng.num_completed_jobs=0
for job in jobs:
if job.pa_job_state != 'Completed':
if job.wait_for != None:
j2 = session.query(Job).get(job.wait_for)
if not j2:
print ("WTF? job.wait_for ({}) does not exist in below? ".format( job.wait_for ))
for j in session.query(Job).all():
print ("j={}".format(j.id))
continue
if j2.pa_job_state != 'Completed':
continue
# use this to remove threads for easier debugging, and errors will stacktrace to the console
if DEBUG==1:
print("*************************************")
print("RUNNING job: id={} name={} wait_for={}".format(job.id, job.name, job.wait_for ))
RunJob(job)
else:
try:
threading.Thread(target=RunJob, args=(job,)).start()
except Exception as e:
try:
MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
except Exception as e2:
print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
print("PA job manager is waiting jobs")
pa_eng.state = 'Waiting for new Jobs'
return
def JobScanNow(job):
filedata.GenerateFileData(job)
job.state="Completed"
job.pa_job_state="Completed"
job.last_update=datetime.now(pytz.utc)
MessageToFE( job.id, "success", "Completed (scan for new files)" )
session.commit()
return
def JobForceScan(job):
session.query(File).delete()
settings = session.query(Settings).first()
if settings == None:
raise Exception("Cannot create file data with no settings / import path is missing")
settings.last_import_date = 0
session.commit()
filedata.GenerateFileData(job)
job.state="Completed"
job.pa_job_state="Completed"
job.last_update=datetime.now(pytz.utc)
MessageToFE( job.id, "success", "Completed (forced remove and recreation of all file data)" )
session.commit()
return
# to serve static content of the images, we create a symlink from inside the static subdir of each import_path that exists
def MakeSymlink(job,path):
symlink = FixPath('static/{}'.format( os.path.basename(path[0:-1])))
if not os.path.exists(symlink):
os.symlink(path, symlink)
return symlink
def AddDir(job, dirname, path_prefix, in_dir):
dir=Dir( path_prefix=path_prefix, num_files=0 )
dtype = session.query(FileType).filter(FileType.name=='Directory').first()
e=Entry( name=dirname, type=dtype )
e.dir_details.append(dir)
# this occurs when we Add the actual Dir for the import_path
if in_dir:
e.in_dir.append(in_dir)
AddLogForJob(job, "DEBUG: AddDir: {} in (dir_id={})".format(dirname, in_dir) )
session.add(e)
return dir
def AddFile(job, fname, type_str, fsize, in_dir ):
ftype = session.query(FileType).filter(FileType.name==type_str).first()
e=Entry( name=fname, type=ftype )
f=New_File( size_mb=fsize )
e.file_details.append(f)
e.in_dir.append(in_dir)
AddLogForJob(job, "Found new file: {}".format(fname) )
session.add(e)
return e
def JobImportDir(job):
print("DEBUG: Importing dir")
settings = session.query(Settings).first()
if settings == None:
raise Exception("Cannot create file data with no settings / import path is missing")
last_import_date = settings.last_import_date
file_cnt=0
for jex in job.extra:
if jex.name =="path":
path = FixPath( jex.value)
AddLogForJob(job, "Checking Import Directory: {}".format( path ) )
print("DEBUG: Checking Import Directory: {}".format( path ) )
if os.path.exists( path ):
symlink=MakeSymlink(job,path)
dir=AddDir(job, os.path.basename(path[0:-1]), symlink, None )
import_dir=dir
for file in sorted(glob.glob(path + '**', recursive=True)):
if file == path:
continue
fname=file.replace(path, "")
stat = os.stat(file)
if last_import_date == 0 or stat.st_ctime > last_import_date:
AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, last_import_date ) )
print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, last_import_date ) )
if os.path.isdir(file):
path_prefix=os.path.join(symlink,fname)
dir=AddDir( job, fname, path_prefix, dir )
else:
file_cnt=file_cnt+1
if isImage(file):
type_str = 'Image'
elif isVideo(file):
type_str = 'Video'
else:
type_str = 'File'
fsize = round(os.stat(file).st_size/(1024*1024))
e=AddFile( job, os.path.basename(fname), type_str, fsize, dir )
else:
AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file )
print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file )
import_dir.num_files=file_cnt
AddLogForJob(job, "Finished Importing: {} - Found {} new files".format( path, file_cnt ) )
job.pa_job_state = "Completed"
job.state = "Completed"
job.last_updated = datetime.now(pytz.utc)
# settings.last_import_date = time.time()
else:
AddLogForJob(job, "Finished Importing: {} -- Path does not exist".format( path) )
job.pa_job_state = "Completed"
job.state = "Failed"
job.last_updated = datetime.now(pytz.utc)
for j in session.query(Job).filter(Job.wait_for==job.id).all():
print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) )
j.pa_job_state = "Completed"
j.state = "Withdrawn"
j.last_updated = datetime.now(pytz.utc)
AddLogForJob(j, "Job has been withdrawn as the job being waited for failed")
session.commit()
return
def FilesInDir( path ):
d=session.query(Dir).filter(Dir.path_prefix==path).first()
return d.files
def ProcessFilesInDir(job, e):
print("files in dir - process: {}".format(e.name))
if e.type.name != 'Directory':
e.file_details[0].hash = md5( job, os.path.join( e.in_dir[0].path_prefix, e.name ) )
if e.type.name == 'Image':
e.file_details[0].thumbnail = GenImageThumbnail( job, os.path.join( e.in_dir[0].path_prefix, e.name ) )
elif e.type.name == 'Video':
e.file_details[0].thumbnail = GenVideoThumbnail( job, os.path.join( e.in_dir[0].path_prefix, e.name ) )
else:
print("need to better process: {}".format(e))
d=session.query(Dir).filter(Dir.eid==e.id).first()
for sub in d.files:
ProcessFilesInDir(job, sub )
def JobGetFileDetails(job):
print("JobGetFileDetails:")
for jex in job.extra:
if jex.name =="path":
path=jex.value
path=FixPath('static/{}'.format( os.path.basename(path[0:-1])))
print(" for path={}".format( path ) )
for e in FilesInDir( path ):
ProcessFilesInDir(job, e )
job.pa_job_state = "Completed"
job.state = "Completed"
job.last_updated = datetime.now(pytz.utc)
session.commit()
return
def isVideo(file):
try:
fileInfo = MediaInfo.parse(file)
for track in fileInfo.tracks:
if track.track_type == "Video":
return True
return False
except Exception as e:
return False
# Converts linux paths into windows paths
# HACK: assumes c:, might be best to just look for [a-z]: ?
def FixPath(p):
if p.startswith('c:'):
p = p.replace('/', '\\')
return p
# Returns an md5 hash of the fnames' contents
def md5(job, fname):
hash_md5 = hashlib.md5()
with open(fname, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
hash = hash_md5.hexdigest()
AddLogForJob( job, "Generated md5 hash: {} for file: {}".format( hash, fname ) )
return hash
def isImage(file):
try:
img = Image.open(file)
return True
except:
return False
def GenImageThumbnail(job, file):
AddLogForJob( job, "Generate Thumbnail from Image file: {}".format( file ) )
f = open(file, 'rb')
try:
tags = exifread.process_file(f)
except:
print('WARNING: NO EXIF TAGS?!?!?!?')
AddLogForJob(job, "WARNING: No EXIF TAF found for: {}".format(file))
f.close()
thumbnail = base64.b64encode(tags['JPEGThumbnail'])
thumbnail = str(thumbnail)[2:-1]
return thumbnail
def GenVideoThumbnail(job, file):
AddLogForJob( job, "Generate Thumbnail from Video file: {}".format( file ) )
vcap = cv2.VideoCapture(file)
res, im_ar = vcap.read()
while im_ar.mean() < 15 and res:
res, im_ar = vcap.read()
im_ar = cv2.resize(im_ar, (160, 90), 0, 0, cv2.INTER_LINEAR)
res, thumb_buf = cv2.imencode('.jpeg', im_ar)
bt = thumb_buf.tostring()
thumbnail = base64.b64encode(bt)
thumbnail = str(thumbnail)[2:-1]
return thumbnail
if __name__ == "__main__":
print("PA job manager starting")
try:
InitialiseManager()
filedata.ProcessImportDirs()
session.commit()
except Exception as e:
print( "Failed to initialise PA Job Manager: {}".format(e) )
session.rollback()
HandleJobs()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
s.listen()
while True:
conn, addr = s.accept()
print("Connection from: {} so HandleJobs".format(addr))
HandleJobs()