Files
photoassistant/pa_job_manager.py

644 lines
26 KiB
Python

###
#
# This file controls the 'external' job control manager, which (periodically
# looks / is somehow pushed an event?) picks up new jobs and processes them.
#
# It then stores the progress/status, etc. in job and joblog tables as needed
# via wrapper functions.
#
# The whole pa_job_manager is multi-threaded, and uses the database tables for
# state management and communication back to the pa web site
#
###
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT
from datetime import datetime, timedelta
import pytz
import time
import os
import glob
from PIL import Image
from pymediainfo import MediaInfo
import hashlib
import exifread
import base64
import numpy
import cv2
import socket
import threading
# DEBUG==1 makes HandleJobs run jobs without the try/except wrapper (so errors
# stack-trace to the console) and turns on the extra DEBUG prints / job logs
DEBUG=1
# an Engine, which the Session will use for connection resources
some_engine = create_engine(DB_URL)
# session_factory is the configured sessionmaker; Session wraps it in a
# scoped_session registry
session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)
# the single module-level session used by every function in this file
session = Session()
# declarative base class that all of the table classes below inherit from
Base = declarative_base()
################################################################################
# Class describing File in the database, and via sqlalchemy, connected to the DB as well
# This has to match one-for-one the DB table
################################################################################
class EntryDirLink(Base):
    """Association row tying an entry (entry.id) to a directory (dir.eid).

    The two foreign keys together form the composite primary key; this table
    is used as the ``secondary`` of Dir.files and Entry.in_dir.
    """

    __tablename__ = "entry_dir_link"

    entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True)
    dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True)

    def __repr__(self):
        template = "<entry_id: {}, dir_eid: {}>"
        return template.format(self.entry_id, self.dir_eid)
class Dir(Base):
    """Directory-specific details of an Entry (keyed by the entry's id).

    path_prefix is the path used to reach the directory, num_files a file
    count maintained by the import jobs, and last_import_date /
    last_hash_date are time.time() stamps written by the import and
    hash/thumbnail jobs.
    """

    __tablename__ = "dir"

    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True)
    path_prefix = Column(String, unique=False, nullable=False)
    num_files = Column(Integer)
    last_import_date = Column(Float)
    last_hash_date = Column(Float)
    # entries contained in this directory, via the entry_dir_link table
    files = relationship("Entry", secondary="entry_dir_link")

    def __repr__(self):
        shown = (
            self.eid,
            self.path_prefix,
            self.num_files,
            self.last_import_date,
            self.last_hash_date,
        )
        return "<eid: {}, path_prefix: {}, num_files: {}, last_import_date: {}, last_hash_date: {}>".format(*shown)
class Entry(Base):
    """A named filesystem entry: either a file or a directory.

    The kind is given by ``type`` (a FileType row); the kind-specific data is
    reachable through ``dir_details`` or ``file_details``, and ``in_dir``
    lists the directory(ies) containing the entry via entry_dir_link.
    """

    __tablename__ = "entry"

    id = Column(Integer, Sequence('file_id_seq'), primary_key=True)
    name = Column(String, unique=True, nullable=False)
    type_id = Column(Integer, ForeignKey("file_type.id"))
    type = relationship("FileType")
    dir_details = relationship("Dir")
    file_details = relationship("File")
    in_dir = relationship("Dir", secondary="entry_dir_link")

    def __repr__(self):
        shown = (
            self.id,
            self.name,
            self.type,
            self.dir_details,
            self.file_details,
            self.in_dir,
        )
        return "<id: {}, name: {}, type={}, dir_details={}, file_details={}, in_dir={}>".format(*shown)
class File(Base):
    """File-specific details of an Entry: size in MB, content hash, thumbnail."""

    __tablename__ = "file"

    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True)
    size_mb = Column(Integer, unique=False, nullable=False)
    # NOTE(review): md5() in this file returns a hex *string*, while this
    # column is declared Integer -- confirm against the real DB schema
    hash = Column(Integer, unique=True, nullable=True)
    # base64 text of a JPEG thumbnail (None when none could be generated)
    thumbnail = Column(String, unique=False, nullable=True)

    def __repr__(self):
        template = "<eid: {}, size_mb={}, hash={}>"
        return template.format(self.eid, self.size_mb, self.hash)
class FileType(Base):
    """Lookup table of entry kinds; this file queries it by name
    ('Directory', 'Image', 'Video', 'Unknown')."""

    __tablename__ = "file_type"

    id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True)
    name = Column(String, unique=True, nullable=False)

    def __repr__(self):
        template = "<id: {}, name={}>"
        return template.format(self.id, self.name)
class Settings(Base):
    """Application settings row; import_path holds the import directories as
    one string, split on '#' by ProcessImportDirs."""

    __tablename__ = "settings"

    id = Column(Integer, Sequence('settings_id_seq'), primary_key=True)
    import_path = Column(String)

    def __repr__(self):
        template = "<id: {}, import_path: {}>"
        return template.format(self.id, self.import_path)
################################################################################
# classes for the job manager:
# Job (and Joblog, JobExtra) for each Job, and
# PA_Jobmanager_fe_message (to pass messages to the front-end web)
################################################################################
class Joblog(Base):
    """One timestamped log line belonging to a Job."""

    __tablename__ = "joblog"

    id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True)
    job_id = Column(Integer, ForeignKey('job.id'))
    log_date = Column(DateTime(timezone=True))
    log = Column(String)

    def __repr__(self):
        # closing '>' added so the repr is balanced like every other model's
        return "<id: {}, job_id: {}, log_date: {}, log: {}>".format(self.id, self.job_id, self.log_date, self.log)
class JobExtra(Base):
    """Extra name/value parameter attached to a Job (e.g. name="path")."""

    __tablename__ = "jobextra"

    id = Column(Integer, Sequence('jobextra_id_seq'), primary_key=True)
    job_id = Column(Integer, ForeignKey('job.id'))
    name = Column(String)
    value = Column(String)

    def __repr__(self):
        shown = (self.id, self.job_id, self.name, self.value)
        return "<id: {}, job_id: {}, name: {}, value: {}>".format(*shown)
class Job(Base):
    """A unit of work for the job manager.

    ``name`` selects the handler in RunJob; ``pa_job_state`` is what
    HandleJobs inspects ('New' jobs are candidates to run) while ``state`` is
    the outcome written by FinishJob / JobProgressState.  ``wait_for`` is the
    id of a job that must reach pa_job_state 'Completed' first (or None).
    """

    __tablename__ = "job"

    id = Column(Integer, Sequence('job_id_seq'), primary_key=True)
    start_time = Column(DateTime(timezone=True))
    last_update = Column(DateTime(timezone=True))
    name = Column(String)
    state = Column(String)
    num_files = Column(Integer)
    current_file_num = Column(Integer)
    current_file = Column(String)
    wait_for = Column(Integer)
    pa_job_state = Column(String)
    logs = relationship("Joblog")
    extra = relationship("JobExtra")

    def __repr__(self):
        shown = (
            self.id,
            self.start_time,
            self.last_update,
            self.name,
            self.state,
            self.num_files,
            self.current_file_num,
            self.current_file,
            self.pa_job_state,
            self.wait_for,
            self.extra,
            self.logs,
        )
        return "<id: {}, start_time: {}, last_update: {}, name: {}, state: {}, num_files: {}, current_file_num: {}, current_file: {}, pa_job_state: {}, wait_for: {}, extra: {}, logs: {}>".format(*shown)
class PA_JobManager_FE_Message(Base):
    """Message row written by the job manager for the front-end web site:
    an alert level plus message text, tied to a job."""

    __tablename__ = "pa_job_manager_fe_message"

    id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True)
    job_id = Column(Integer, ForeignKey('job.id'), primary_key=True)
    alert = Column(String)
    message = Column(String)

    def __repr__(self):
        # closing '>' added so the repr is balanced like every other model's
        return "<id: {}, job_id: {}, alert: {}, message: {}>".format(self.id, self.job_id, self.alert, self.message)
##############################################################################
# Util (non Class) functions
##############################################################################
def MessageToFE( job_id, alert, message ):
    """Store a front-end message (alert level + text) for job_id and commit it."""
    fe_row = PA_JobManager_FE_Message(
        job_id=job_id,
        alert=alert,
        message=message,
    )
    session.add(fe_row)
    session.commit()
def ProcessImportDirs(parent_job=None):
    """Queue an import + file-details job pair for every configured import path.

    For each '#'-separated path in Settings.import_path an "importdir" job is
    created, followed by a "getfiledetails" job that waits for it.  When
    parent_job is given, a log line linking to each new job is added to it.
    Finally HandleJobs() is called so the new jobs get picked up.

    Raises:
        Exception: when there is no settings row or no import path configured.
    """
    settings = session.query(Settings).first()
    # an empty / NULL import_path is as unusable as a missing settings row
    if settings is None or not settings.import_path:
        raise Exception("Cannot create file data with no settings / import path is missing")

    for path in settings.import_path.split("#"):
        # skip empty segments (e.g. trailing or doubled '#') rather than
        # queueing jobs that can only fail
        if not path:
            continue

        # make new Job; HandleJobs will make them run later
        import_job = Job(start_time='now()', last_update='now()', name="importdir", state="New", wait_for=None, pa_job_state="New")
        import_job.extra.append(JobExtra(name="path", value=path))
        session.add(import_job)
        # commit so import_job.id is valid for use in wait_for below
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a>".format(import_job.id, import_job.id, import_job.name))

        details_job = Job(start_time='now()', last_update='now()', name="getfiledetails", state="New", wait_for=import_job.id, pa_job_state="New")
        details_job.extra.append(JobExtra(name="path", value=path))
        session.add(details_job)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format(details_job.id, details_job.id, details_job.name, details_job.wait_for))

    HandleJobs()
    return
def AddLogForJob(job, message, current_file=''):
    """Append a log line to job, refresh its last_update and commit.

    When current_file is given, the job's current_file is set to its basename
    and -- if the job has a non-zero num_files -- current_file_num is advanced
    by one.
    """
    now = datetime.now(pytz.utc)
    log = Joblog(job_id=job.id, log=message, log_date=now)
    job.last_update = now
    if current_file != '':
        job.current_file = os.path.basename(current_file)
        # num_files may not be set on an import job
        if job.num_files:
            # current_file_num can still be None (never initialised) even
            # when num_files is set; treat that as 0 instead of raising
            job.current_file_num = (job.current_file_num or 0) + 1
    session.add(log)
    session.commit()
    return
def RunJob(job):
    """Run the handler matching job.name, then re-scan for runnable jobs.

    A job with an unknown name is finished as "Failed".  Without that it
    would stay in pa_job_state 'New', and the HandleJobs() call below would
    pick it up again and recurse back into RunJob without end.
    """
    handlers = {
        "scannow": JobScanNow,
        "forcescan": JobForceScan,
        "importdir": JobNewImportDir,
        "getfiledetails": JobGetFileDetails,
    }
    handler = handlers.get(job.name)
    if handler is None:
        problem = "ERROR: Requested to process unknown job type: {}".format(job.name)
        print(problem)
        FinishJob(job, problem, "Failed")
    else:
        handler(job)
    # okay, we finished a job, so check for any jobs that are dependant on this and run them...
    HandleJobs()
    return
def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
    """Record the final state of job and write last_log as its closing log line.

    Note that callers in this file pass only ``state`` (e.g. "Failed",
    "Withdrawn"), which leaves pa_job_state at its "Completed" default.
    """
    job.last_update = datetime.now(pytz.utc)
    job.pa_job_state = pa_job_state
    job.state = state
    # AddLogForJob also commits the session
    AddLogForJob(job, last_log)
def HandleJobs():
    """Scan all jobs and run every 'New' one whose wait_for dependency (if any)
    has reached pa_job_state 'Completed'."""
    print("INFO: PA job manager is scanning for new jobs to process")
    for job in session.query(Job).all():
        if job.pa_job_state != 'New':
            continue

        if job.wait_for is not None:
            blocker = session.query(Job).get(job.wait_for)
            if not blocker:
                print ("WTF? job.wait_for ({}) does not exist in below? ".format( job.wait_for ))
                for known in session.query(Job).all():
                    print ("j={}".format(known.id))
                continue
            if blocker.pa_job_state != 'Completed':
                continue

        # DEBUG runs the job unguarded so errors stack-trace to the console
        if DEBUG == 1:
            print("*************************************")
            print("RUNNING job: id={} name={} wait_for={}".format(job.id, job.name, job.wait_for ))
            RunJob(job)
            continue

        try:
            RunJob(job)
        except Exception as e:
            # tell the front-end; if even that fails, fall back to the console
            try:
                MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
            except Exception as e2:
                print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
    print("INFO: PA job manager is waiting jobs")
def JobProgressState( job, state ):
    """Mark job as picked up (pa_job_state "In Progress"), set its visible
    state to ``state`` and commit."""
    job.state = state
    job.pa_job_state = "In Progress"
    session.commit()
def JobScanNow(job):
    """Handler for "scannow" jobs: queue import jobs for all import paths,
    then finish this job and notify the front-end."""
    done_text = "Completed (scan for new files)"
    JobProgressState(job, "In Progress")
    ProcessImportDirs(job)
    FinishJob(job, done_text)
    MessageToFE(job.id, "success", done_text)
    session.commit()
def JobForceScan(job):
    """Handler for "forcescan" jobs: delete all stored file data, re-queue the
    import jobs, then finish this job and notify the front-end."""
    done_text = "Completed (forced remove and recreation of all file data)"
    JobProgressState(job, "In Progress")
    # delete in this order: link rows first, then Dir and File, then Entry
    for model in (EntryDirLink, Dir, File, Entry):
        session.query(model).delete()
    session.commit()
    ProcessImportDirs(job)
    FinishJob(job, done_text)
    MessageToFE(job.id, "success", done_text)
    session.commit()
def SymlinkName(path, file):
    """Return the 'static/...' path of the directory that contains file.

    path is an import directory (expected to end with '/'; its last character
    is dropped before taking the basename) and file a path underneath it.
    The result is 'static/<import dir name>[/<sub dirs of file>]' with no
    trailing '/'.  Passing file == path yields 'static/<import dir name>'.
    """
    # strip only the leading occurrence of path: a plain replace() would also
    # eat later repeats of the same text inside the relative part
    sig_bit = file.replace(path, "", 1)
    last_dir = os.path.basename(path[0:-1])
    last_bit = os.path.dirname(sig_bit)
    # endswith() (unlike sig_bit[-1]) is safe when sig_bit is empty
    if sig_bit.endswith('/'):
        # NOTE(review): kept from the original -- dirname() has already
        # dropped the trailing '/', so this removes one more character;
        # confirm that is intended
        last_bit = last_bit[0:-1]
    symlink = 'static' + '/' + last_dir + '/' + last_bit
    if symlink.endswith('/'):
        symlink = symlink[0:-1]
    return symlink
# to serve static content of the images, we create a symlink from inside the static subdir of each import_path that exists
def CreateSymlink(job,path):
    """Make sure 'static/<last component of path>' exists as a symlink to path
    and return that (relative) symlink name.

    path is expected to end with '/': its last character is dropped before
    the basename is taken.  job is not used.
    """
    link_name = os.path.basename(path[0:-1])
    symlink = 'static/{}'.format(link_name)
    if os.path.exists(symlink):
        return symlink
    os.symlink(path, symlink)
    return symlink
def AddDir(job, dirname, path_prefix, in_dir):
    """Return the Dir row for path_prefix, creating it (and its Entry) if needed.

    Args:
        job: job to log against.
        dirname: name stored on the new Entry.
        path_prefix: path stored on the Dir row; also the lookup key.
        in_dir: parent Dir, or None for the top of an import tree.
    """
    # see if this exists already -- Dir rows are stored under path_prefix, so
    # that (not dirname) is what has to be matched here
    existing = session.query(Dir).filter(Dir.path_prefix == path_prefix).first()
    if existing:
        if DEBUG == 1:
            print("Found {} returning DB object".format(dirname))
        return existing

    new_dir = Dir(path_prefix=path_prefix, num_files=0, last_import_date=0, last_hash_date=0)
    dtype = session.query(FileType).filter(FileType.name == 'Directory').first()
    e = Entry(name=dirname, type=dtype)
    e.dir_details.append(new_dir)
    # no in_dir occurs when we Add the actual Dir for the import_path (top of the tree)
    if in_dir:
        e.in_dir.append(in_dir)
    if DEBUG == 1:
        print("AddDir: created {}".format(dirname))
        AddLogForJob(job, "DEBUG: AddDir: {} in (dir_id={})".format(dirname, in_dir))
    session.add(e)
    return new_dir
def AddFile(job, fname, type_str, fsize, in_dir ):
    """Create an Entry + File for fname inside in_dir, log it on job, add it
    to the session and return the new Entry (the caller commits)."""
    ftype = session.query(FileType).filter(FileType.name==type_str).first()
    entry = Entry(name=fname, type=ftype)
    entry.file_details.append(File(size_mb=fsize))
    entry.in_dir.append(in_dir)
    AddLogForJob(job, "Found new file: {}".format(fname) )
    session.add(entry)
    return entry
def JobNewImportDir(job):
    """Handler for "importdir" jobs: walk the job's path and record new content.

    Every directory found by os.walk gets a Dir row (via AddDir); files whose
    ctime is newer than their directory's last_import_date are added with
    AddFile.  If the path does not exist the job is finished as "Failed" and
    every job waiting on it is finished as "Withdrawn".

    Raises:
        Exception: when there is no settings row.
    """
    JobProgressState( job, "In Progress" )
    settings = session.query(Settings).first()
    if settings is None:
        raise Exception("Cannot create file data with no settings / import path is missing")
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    AddLogForJob(job, "Checking Import Directory: {}".format( path ) )
    if DEBUG==1:
        print("DEBUG: Checking Import Directory: {}".format( path ) )
    if not os.path.exists( path ):
        FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" )
        for j in session.query(Job).filter(Job.wait_for==job.id).all():
            if DEBUG==1:
                # j is the waiting job being cancelled, job is the failed one
                print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(j.id, job.id) )
            FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" )
        return
    symlink=CreateSymlink(job,path)
    overall_file_cnt=0
    # Dir row of every directory walked so far, keyed by normalised fs path,
    # so each sub-directory is attached to its real parent rather than to
    # whichever directory happened to be walked just before it
    dirs_by_root={}
    # root == path of dir, files are in dir... subdirs are in dir
    for root, subdirs, files in os.walk(path, topdown=True):
        overall_file_cnt+= len(subdirs) + len(files)
        if root == path:
            pp = symlink
            parent_dir = None
        else:
            pp=SymlinkName( path, root )+'/'+os.path.basename(root)
            # topdown walk guarantees the parent was registered already
            parent_dir = dirs_by_root.get(os.path.normpath(os.path.dirname(root)))
        dir=AddDir(job, os.path.basename(root), pp, parent_dir)
        dirs_by_root[os.path.normpath(root)]=dir
        stat = os.stat( dir.path_prefix )
        # check any modificaiton on fs, since last import, if none we are done
        if dir.last_import_date > 0 and stat.st_ctime < dir.last_import_date:
            if DEBUG==1:
                print( "DEBUG: Directory has not been altered since the last import, just ignore contents" )
            job.current_file_num=dir.num_files
            # num_files is not initialised on freshly queued jobs
            job.num_files=(job.num_files or 0)+(dir.num_files or 0)
            continue
        for basename in files:
            fname=dir.path_prefix+'/'+basename
            stat = os.stat(fname)
            if stat.st_ctime > dir.last_import_date:
                if DEBUG==1:
                    AddLogForJob(job, "DEBUG: {} - is new/updated".format( basename ), basename )
                    print("DEBUG: {} - {} is newer than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
                if isImage(fname):
                    type_str = 'Image'
                elif isVideo(fname):
                    type_str = 'Video'
                else:
                    type_str = 'Unknown'
                fsize = round(stat.st_size/(1024*1024))
                e=AddFile( job, basename, type_str, fsize, dir )
            else:
                if DEBUG==1:
                    AddLogForJob(job, "DEBUG: {} - is unchanged".format( basename, basename ) )
                    print("DEBUG: {} - {} is OLDER than {}".format( basename, stat.st_ctime, dir.last_import_date ), basename )
        dir.num_files=len(files)+len(subdirs)
        dir.last_import_date = time.time()
    job.num_files=overall_file_cnt
    job.current_file_num=overall_file_cnt
    FinishJob(job, "Finished Importing: {} - Found {} new files".format( path, overall_file_cnt ) )
    ####### NEED TO FIX THIS BASED ON os.walk contents
    import_dir=session.query(Dir).filter(Dir.path_prefix==symlink).first()
    import_dir.num_files=overall_file_cnt
    session.commit()
    return
# Older glob-based importer for an "importdir" job.
# NOTE(review): RunJob in this file dispatches "importdir" to JobNewImportDir,
# so this function looks unused from here -- confirm before relying on it.
#
# For the job's "path" extra: ensures the static symlink and top-level Dir
# exist, then goes through every path under it (recursive glob), adding Dir /
# File entries for anything whose ctime is newer than its directory's
# last_import_date.  A missing path finishes the job as "Failed" and finishes
# the jobs waiting on it as "Withdrawn".
def JobImportDir(job):
    JobProgressState( job, "In Progress" )
    settings = session.query(Settings).first()
    if settings == None:
        raise Exception("Cannot create file data with no settings / import path is missing")
    overall_file_cnt=0
    # fcnt: number of files found per directory, keyed by Dir.path_prefix
    fcnt={}
    # keep_dirs: Dir objects seen in this run, keyed by Dir.path_prefix
    keep_dirs={}
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    AddLogForJob(job, "Checking Import Directory: {}".format( path ) )
    if DEBUG==1:
        print("DEBUG: Checking Import Directory: {}".format( path ) )
    if os.path.exists( path ):
        symlink=CreateSymlink(job,path)
        # dont want to do add a Dir, if this already exists
        dir=session.query(Dir).filter(Dir.path_prefix==symlink).first()
        if dir != None:
            stat = os.stat( symlink )
            # check any modificaiton on fs, since last import, if none we are done
            if dir.last_import_date > 0 and stat.st_ctime < dir.last_import_date:
                if DEBUG==1:
                    print( "DEBUG: Directory has not been altered since the last import, just return" )
                job.current_file_num=dir.num_files
                job.num_files=dir.num_files
                FinishJob( job, "No new files in directory since the last import")
                return
        else:
            dir=AddDir(job, os.path.basename(path[0:-1]), symlink, None )
            session.commit()
        keep_dirs[dir.path_prefix]=dir
        import_dir=dir
        fcnt[symlink]=0
        files = sorted(glob.glob(path + '**', recursive=True))
        job.current_file_num=0
        # reduce this by 1, becasuse we skip file == path below
        job.num_files=len(files)-1
        session.commit()
        # NOTE(review): the same glob is evaluated a second time here instead
        # of reusing 'files'
        for file in sorted(glob.glob(path + '**', recursive=True)):
            if file == path:
                continue
            # fname is file relative to the import path
            fname=file.replace(path, "")
            stat = os.stat(file)
            dirname=SymlinkName(path, file)
            # NOTE(review): a missing key raises KeyError on the next line
            # before the print can run, and the print itself references
            # 'keep_dir', which is not defined in this function
            if not keep_dirs[dirname]:
                print("ERROR: dirname={}, keep_dir={}, fname={}, path={}, symlink=symlink", dirname, keep_dir, fname, path, symlink )
            if stat.st_ctime > keep_dirs[dirname].last_import_date:
                if DEBUG==1:
                    AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
                    print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ) )
                if os.path.isdir(file):
                    path_prefix=symlink+'/'+fname
                    dir=AddDir( job, fname, path_prefix, dir )
                    fcnt[path_prefix]=0
                    keep_dirs[dir.path_prefix]=dir
                else:
                    overall_file_cnt=overall_file_cnt+1
                    dirname=SymlinkName(path, file)
                    fcnt[dirname]=fcnt[dirname]+1
                    if isImage(file):
                        type_str = 'Image'
                    elif isVideo(file):
                        type_str = 'Video'
                    else:
                        type_str = 'Unknown'
                    # size in whole MB
                    fsize = round(os.stat(file).st_size/(1024*1024))
                    dir=keep_dirs[dirname]
                    e=AddFile( job, os.path.basename(fname), type_str, fsize, dir )
            else:
                if DEBUG==1:
                    AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
                    print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
        FinishJob(job, "Finished Importing: {} - Found {} new files".format( path, overall_file_cnt ) )
        # stamp every directory seen in this run with its count and import time
        for d in keep_dirs:
            keep_dirs[d].num_files = fcnt[d]
            keep_dirs[d].last_import_date = time.time()
        # override this to be all the files in dir & its sub dirs... (used to know how many files in jobs for this import dir)
        import_dir.num_files=overall_file_cnt
    else:
        FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" )
        for j in session.query(Job).filter(Job.wait_for==job.id).all():
            if DEBUG==1:
                # NOTE(review): the ids look swapped -- j is the waiting job,
                # job is the one that failed
                print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) )
            FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" )
    session.commit()
    return
def FilesInDir( path ):
    """Return the entries (Dir.files) of the Dir whose path_prefix equals path.

    No check is made that such a Dir exists.
    """
    return session.query(Dir).filter(Dir.path_prefix==path).first().files
def GenHashAndThumb(job, e):
    """Store the md5 hash of file entry e and, for images and videos, its
    thumbnail.

    For 'Unknown' files no thumbnail is generated, so the job's
    current_file_num is advanced here directly.
    """
    full_path = e.in_dir[0].path_prefix+'/'+ e.name
    details = e.file_details[0]
    details.hash = md5( job, full_path )
    kind = e.type.name
    if kind == 'Image':
        details.thumbnail = GenImageThumbnail( job, full_path )
    elif kind == 'Video':
        details.thumbnail = GenVideoThumbnail( job, full_path )
    elif kind == 'Unknown':
        job.current_file_num+=1
def HashAndThumbDirHasNew(dir, job=None):
    """Decide whether a directory needs (re-)hashing and stamp last_hash_date.

    Args:
        dir: Dir row to check; it is added to the session.
        job: optional job to log the skip against.  The original body
            referenced an undefined ``job`` and raised NameError on the skip
            path; it is now an optional parameter so that the existing
            one-argument call keeps working.

    Returns:
        0 when the directory's ctime is older than its last_hash_date (skip
        it), 1 otherwise.  In both cases last_hash_date is set to now.
    """
    session.add(dir)
    stat = os.stat( dir.path_prefix )
    # check any modificaiton on fs, since last hashing, if none we are done
    unchanged = stat.st_ctime < dir.last_hash_date
    dir.last_hash_date = time.time()
    if not unchanged:
        return 1
    if job is not None:
        AddLogForJob(job, "skip {} as it has not changed since last hashing".format(dir.path_prefix))
    if DEBUG==1:
        print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(dir.path_prefix))
    return 0
def ProcessFilesInDir(job, e, file_func, go_into_dir_func):
    """Apply file_func(job, entry) to entry e, recursing into directories.

    For a directory entry the job's current_file_num is advanced, then
    go_into_dir_func(dir) is asked whether to descend; a falsy answer skips
    the directory's contents.
    """
    if DEBUG==1:
        print("DEBUG: files in dir - process: {} {}".format(e.name, e.in_dir[0].path_prefix))

    if e.type.name != 'Directory':
        file_func(job, e)
        return

    dir_row = session.query(Dir).filter(Dir.eid==e.id).first()
    job.current_file_num+=1
    if go_into_dir_func(dir_row):
        for sub in dir_row.files:
            ProcessFilesInDir(job, sub, file_func, go_into_dir_func)
def JobGetFileDetails(job):
    """Handler for "getfiledetails" jobs: hash and thumbnail everything under
    the static symlink of the job's import path.

    Finishes early when the top directory's ctime is older than its
    last_hash_date.
    """
    JobProgressState( job, "In Progress" )
    import_path = [jex.value for jex in job.extra if jex.name == "path"][0]
    # same symlink name that CreateSymlink builds for this import path
    path = 'static'+'/'+os.path.basename(import_path[0:-1])
    if DEBUG==1:
        print("DEBUG: JobGetFileDetails for path={}".format( path ) )
    top_dir = session.query(Dir).filter(Dir.path_prefix==path).first()
    stat = os.stat( path )

    if stat.st_ctime < top_dir.last_hash_date:
        session.add(top_dir)
        top_dir.last_hash_date = time.time()
        FinishJob(job, "{} has not changed since last hashing - finished job".format(top_dir.path_prefix))
        if DEBUG==1:
            print ("DEBUG: skip this dir {} as it has not changed since last hashing".format(top_dir.path_prefix))
        return

    top_dir.last_hash_date = time.time()
    job.current_file_num = 0
    job.num_files = top_dir.num_files
    session.commit()
    for entry in FilesInDir( path ):
        ProcessFilesInDir(job, entry, GenHashAndThumb, HashAndThumbDirHasNew )
    FinishJob(job, "File Details job finished")
    session.commit()
def isVideo(file):
    """Return True when MediaInfo reports at least one "Video" track for
    file; any error while parsing counts as "not a video"."""
    try:
        tracks = MediaInfo.parse(file).tracks
        return any(track.track_type == "Video" for track in tracks)
    except Exception:
        return False
# Converts linux paths into windows paths
# HACK: assumes c:, might be best to just look for [a-z]: ?
def FixPath(p):
    """Return p with '/' turned into backslashes when it starts with 'c:'
    (case-sensitive); any other path is returned unchanged."""
    return p.replace('/', '\\') if p.startswith('c:') else p
# Returns an md5 hash of the fnames' contents
def md5(job, fname):
    """Return the md5 hex digest of fname's contents (read in 4096-byte
    chunks) and log it against job."""
    hasher = hashlib.md5()
    with open(fname, "rb") as stream:
        while True:
            block = stream.read(4096)
            if block == b"":
                break
            hasher.update(block)
    digest = hasher.hexdigest()
    AddLogForJob( job, "Generated md5 hash: {} for file: {}".format( digest, fname ) )
    return digest
def isImage(file):
    """Return True when PIL can open file as an image, False otherwise."""
    try:
        # the context manager closes the file handle that Image.open keeps
        # open, instead of leaking one per probed file
        with Image.open(file):
            return True
    except Exception:
        # was a bare except, which would also swallow KeyboardInterrupt / SystemExit
        return False
def GenImageThumbnail(job, file):
    """Return the image's embedded EXIF JPEG thumbnail as base64 text.

    Returns None (after logging a warning on job) when the EXIF data cannot
    be read or holds no 'JPEGThumbnail'.  Failure to open the file itself
    still raises, as before.
    """
    thumbnail = None
    AddLogForJob( job, "Generate Thumbnail from Image file: {}".format( file ), file )
    # 'with' guarantees the file is closed on every path
    with open(file, 'rb') as f:
        try:
            tags = exifread.process_file(f)
            if '20210121_223307.jpg' in file:
                # leftover debug output for one specific file; .get() so a
                # missing tag cannot abort thumbnail generation
                print("Tag: img orientation={}".format( tags.get('Image Orientation') ) )
                print("Tag: GPS GPSLatitude={}".format( tags.get('GPS GPSLatitude') ) )
            # base64 output is pure ASCII, so decode() gives the same text
            # the old str(bytes)[2:-1] slicing produced
            thumbnail = base64.b64encode(tags['JPEGThumbnail']).decode('ascii')
        except Exception:
            # was a bare except, which would also swallow KeyboardInterrupt / SystemExit
            print('WARNING: NO EXIF TAGS?!?!?!?')
            AddLogForJob(job, "WARNING: No EXIF TAF found for: {}".format(file))
    return thumbnail
def GenVideoThumbnail(job, file):
    """Return a 160x90 JPEG thumbnail of a video as base64 text.

    The first frame whose mean pixel value is at least 15 is used, to skip
    dark lead-in frames; if every frame is darker, the last readable frame is
    used instead.  Returns None (after logging a warning on job) when no
    frame can be read or encoded.
    """
    AddLogForJob( job, "Generate Thumbnail from Video file: {}".format( file ), file )
    vcap = cv2.VideoCapture(file)
    frame = None
    try:
        res, im_ar = vcap.read()
        # test res first: on a failed read im_ar is None and has no .mean()
        while res:
            frame = im_ar
            if im_ar.mean() >= 15:
                break
            res, im_ar = vcap.read()
    finally:
        # always release the capture handle
        vcap.release()

    if frame is None:
        AddLogForJob(job, "WARNING: could not read any frame from: {}".format(file))
        return None

    frame = cv2.resize(frame, (160, 90), 0, 0, cv2.INTER_LINEAR)
    res, thumb_buf = cv2.imencode('.jpeg', frame)
    if not res:
        AddLogForJob(job, "WARNING: could not encode thumbnail for: {}".format(file))
        return None
    # tobytes() replaces the deprecated ndarray.tostring()
    return base64.b64encode(thumb_buf.tobytes()).decode('ascii')
# Entry point: queue import jobs for the configured paths, then block on a
# TCP socket.  Each incoming connection is only a "wake up" signal: it
# triggers a HandleJobs() scan and is then closed.
if __name__ == "__main__":
    print("INFO: PA job manager starting - listening on {}:{}".format( PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT) )
    ProcessImportDirs()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
        s.listen()
        while True:
            conn, addr = s.accept()
            # close every accepted connection deterministically instead of
            # leaving it open until the name is rebound by the next accept()
            with conn:
                HandleJobs()