removed filedata*, moved ProcessImportDir out into its own function, used it, also in scannow job, ensure job states are set appropriately to enforce all of the jobs are processed right
This commit is contained in:
@@ -50,96 +50,6 @@ Base = declarative_base()
|
|||||||
# global for us to keep state / let front-end know our state
|
# global for us to keep state / let front-end know our state
|
||||||
pa_eng=None
|
pa_eng=None
|
||||||
|
|
||||||
|
|
||||||
################################################################################
|
|
||||||
# FileData class...
|
|
||||||
################################################################################
|
|
||||||
|
|
||||||
class FileData():
|
|
||||||
def getExif(self, file):
|
|
||||||
f = open(file, 'rb')
|
|
||||||
try:
|
|
||||||
tags = exifread.process_file(f)
|
|
||||||
except:
|
|
||||||
print('WARNING: NO EXIF TAGS?!?!?!?')
|
|
||||||
AddLogForJob(job, "WARNING: No EXIF TAF found for: {}".format(file))
|
|
||||||
f.close()
|
|
||||||
|
|
||||||
fthumbnail = base64.b64encode(tags['JPEGThumbnail'])
|
|
||||||
fthumbnail = str(fthumbnail)[2:-1]
|
|
||||||
return fthumbnail
|
|
||||||
|
|
||||||
def isVideo(self, file):
|
|
||||||
try:
|
|
||||||
fileInfo = MediaInfo.parse(file)
|
|
||||||
for track in fileInfo.tracks:
|
|
||||||
if track.track_type == "Video":
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
except Exception as e:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Converts linux paths into windows paths
|
|
||||||
# HACK: assumes c:, might be best to just look for [a-z]: ?
|
|
||||||
def FixPath(self, p):
|
|
||||||
if p.startswith('c:'):
|
|
||||||
p = p.replace('/', '\\')
|
|
||||||
return p
|
|
||||||
|
|
||||||
# Returns an md5 hash of the fnames' contents
|
|
||||||
def md5(self, fname):
|
|
||||||
hash_md5 = hashlib.md5()
|
|
||||||
with open(fname, "rb") as f:
|
|
||||||
for chunk in iter(lambda: f.read(4096), b""):
|
|
||||||
hash_md5.update(chunk)
|
|
||||||
return hash_md5.hexdigest()
|
|
||||||
|
|
||||||
def isImage(self, file):
|
|
||||||
try:
|
|
||||||
img = Image.open(file)
|
|
||||||
return True
|
|
||||||
except:
|
|
||||||
return False
|
|
||||||
|
|
||||||
def generateVideoThumbnail(self, file):
|
|
||||||
#overall wrapper function for generating video thumbnails
|
|
||||||
vcap = cv2.VideoCapture(file)
|
|
||||||
res, im_ar = vcap.read()
|
|
||||||
while im_ar.mean() < 15 and res:
|
|
||||||
res, im_ar = vcap.read()
|
|
||||||
im_ar = cv2.resize(im_ar, (160, 90), 0, 0, cv2.INTER_LINEAR)
|
|
||||||
#save on a buffer for direct transmission
|
|
||||||
res, thumb_buf = cv2.imencode('.jpeg', im_ar)
|
|
||||||
# '.jpeg' etc are permitted
|
|
||||||
#get the bytes content
|
|
||||||
bt = thumb_buf.tostring()
|
|
||||||
fthumbnail = base64.b64encode(bt)
|
|
||||||
fthumbnail = str(fthumbnail)[2:-1]
|
|
||||||
return fthumbnail
|
|
||||||
|
|
||||||
##############################################################################
|
|
||||||
def ProcessImportDirs(self):
|
|
||||||
settings = session.query(Settings).first()
|
|
||||||
if settings == None:
|
|
||||||
raise Exception("Cannot create file data with no settings / import path is missing")
|
|
||||||
last_import_date = settings.last_import_date
|
|
||||||
paths = settings.import_path.split("#")
|
|
||||||
for path in paths:
|
|
||||||
# make new Job; HandleJobs will make them run later
|
|
||||||
jex=JobExtra( name="path", value=path )
|
|
||||||
job=Job(start_time='now()', last_update='now()', name="importdir", state="New", wait_for=None )
|
|
||||||
job.extra.append(jex)
|
|
||||||
session.add(job)
|
|
||||||
# force commit to make job.id be valid in use of wait_for later
|
|
||||||
session.commit()
|
|
||||||
jex2=JobExtra( name="path", value=path )
|
|
||||||
job2=Job(start_time='now()', last_update='now()', name="getfiledetails", state="New", wait_for=job.id )
|
|
||||||
job2.extra.append(jex2)
|
|
||||||
session.add(job2)
|
|
||||||
print ("adding job2 id={}, wait_for={}, job is: {}".format( job2.id, job2.wait_for, job.id ) )
|
|
||||||
return
|
|
||||||
|
|
||||||
|
|
||||||
################################################################################
|
################################################################################
|
||||||
# Class describing File in the database, and via sqlalchemy, connected to the DB as well
|
# Class describing File in the database, and via sqlalchemy, connected to the DB as well
|
||||||
# This has to match one-for-one the DB table
|
# This has to match one-for-one the DB table
|
||||||
@@ -157,10 +67,11 @@ class Dir(Base):
|
|||||||
eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
|
eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
|
||||||
path_prefix = Column(String, unique=False, nullable=False )
|
path_prefix = Column(String, unique=False, nullable=False )
|
||||||
num_files = Column(Integer)
|
num_files = Column(Integer)
|
||||||
|
last_import_date = Column(Float)
|
||||||
files = relationship("Entry", secondary="entry_dir_link")
|
files = relationship("Entry", secondary="entry_dir_link")
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "<eid: {}, path_prefix: {}>".format(self.eid, self.path_prefix)
|
return "<eid: {}, path_prefix: {}, num_files: {}>".format(self.eid, self.path_prefix, self.num_files)
|
||||||
|
|
||||||
class Entry(Base):
|
class Entry(Base):
|
||||||
__tablename__ = "entry"
|
__tablename__ = "entry"
|
||||||
@@ -197,15 +108,11 @@ class Settings(Base):
|
|||||||
__tablename__ = "settings"
|
__tablename__ = "settings"
|
||||||
id = Column(Integer, Sequence('settings_id_seq'), primary_key=True )
|
id = Column(Integer, Sequence('settings_id_seq'), primary_key=True )
|
||||||
import_path = Column(String)
|
import_path = Column(String)
|
||||||
last_import_date = Column(Float)
|
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "<id: {}, import_path: {}, last_import_date: {}>".format(self.id, self.import_path, self.last_import_date)
|
return "<id: {}, import_path: {}>".format(self.id, self.import_path )
|
||||||
|
|
||||||
|
|
||||||
### Initiatlise the file data set
|
|
||||||
filedata = FileData()
|
|
||||||
|
|
||||||
################################################################################
|
################################################################################
|
||||||
# classes for the job manager:
|
# classes for the job manager:
|
||||||
# PA_JobManager overall status tracking),
|
# PA_JobManager overall status tracking),
|
||||||
@@ -249,8 +156,6 @@ class Job(Base):
|
|||||||
last_update = Column(DateTime(timezone=True))
|
last_update = Column(DateTime(timezone=True))
|
||||||
name = Column(String)
|
name = Column(String)
|
||||||
state = Column(String)
|
state = Column(String)
|
||||||
num_passes = Column(Integer)
|
|
||||||
current_pass = Column(Integer)
|
|
||||||
num_files = Column(Integer)
|
num_files = Column(Integer)
|
||||||
current_file_num = Column(Integer)
|
current_file_num = Column(Integer)
|
||||||
current_file = Column(String)
|
current_file = Column(String)
|
||||||
@@ -261,7 +166,7 @@ class Job(Base):
|
|||||||
extra = relationship( "JobExtra")
|
extra = relationship( "JobExtra")
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "<id: {}, start_time: {}, last_update: {}, name: {}, state: {}, num_passes: {}, current_pass: {}, num_files: {}, current_file_num: {}, current_file: {}, pa_job_state: {}, wait_for: {}, extra: {}, logs: {}>".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_passes, self.current_pass, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs)
|
return "<id: {}, start_time: {}, last_update: {}, name: {}, state: {}, num_files: {}, current_file_num: {}, current_file: {}, pa_job_state: {}, wait_for: {}, extra: {}, logs: {}>".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs)
|
||||||
|
|
||||||
class PA_JobManager_FE_Message(Base):
|
class PA_JobManager_FE_Message(Base):
|
||||||
__tablename__ = "pa_job_manager_fe_message"
|
__tablename__ = "pa_job_manager_fe_message"
|
||||||
@@ -272,14 +177,12 @@ class PA_JobManager_FE_Message(Base):
|
|||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "<id: {}, job_id: {}, alert: {}, message: {}".format(self.id, self.job_id, self.alert, self.message)
|
return "<id: {}, job_id: {}, alert: {}, message: {}".format(self.id, self.job_id, self.alert, self.message)
|
||||||
|
|
||||||
|
##############################################################################
|
||||||
def MessageToFE( job_id, alert, message ):
|
def MessageToFE( job_id, alert, message ):
|
||||||
msg = PA_JobManager_FE_Message( job_id=job_id, alert=alert, message=message)
|
msg = PA_JobManager_FE_Message( job_id=job_id, alert=alert, message=message)
|
||||||
session.add(msg)
|
session.add(msg)
|
||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
def GetJobs():
|
|
||||||
return session.query(Job).all()
|
|
||||||
|
|
||||||
def InitialiseManager():
|
def InitialiseManager():
|
||||||
global pa_eng
|
global pa_eng
|
||||||
|
|
||||||
@@ -289,12 +192,45 @@ def InitialiseManager():
|
|||||||
session.add(pa_eng)
|
session.add(pa_eng)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def ProcessImportDirs(parent_job=None):
|
||||||
|
settings = session.query(Settings).first()
|
||||||
|
if settings == None:
|
||||||
|
raise Exception("Cannot create file data with no settings / import path is missing")
|
||||||
|
paths = settings.import_path.split("#")
|
||||||
|
for path in paths:
|
||||||
|
# make new Job; HandleJobs will make them run later
|
||||||
|
jex=JobExtra( name="path", value=path )
|
||||||
|
job=Job(start_time='now()', last_update='now()', name="importdir", state="New", wait_for=None, pa_job_state="New" )
|
||||||
|
job.extra.append(jex)
|
||||||
|
session.add(job)
|
||||||
|
session.commit()
|
||||||
|
if parent_job:
|
||||||
|
AddLogForJob(parent_job, "adding job id={}, job is: {}".format( job.id, job.name ) )
|
||||||
|
# force commit to make job.id be valid in use of wait_for later
|
||||||
|
session.commit()
|
||||||
|
jex2=JobExtra( name="path", value=path )
|
||||||
|
job2=Job(start_time='now()', last_update='now()', name="getfiledetails", state="New", wait_for=job.id, pa_job_state="New" )
|
||||||
|
job2.extra.append(jex2)
|
||||||
|
session.add(job2)
|
||||||
|
session.commit()
|
||||||
|
if parent_job:
|
||||||
|
AddLogForJob(parent_job, "adding job2 id={}, wait_for={}, job is: {}".format( job2.id, job2.wait_for, job2.name ) )
|
||||||
|
print("about to handleJobs")
|
||||||
|
HandleJobs()
|
||||||
|
return
|
||||||
|
|
||||||
def AddLogForJob(job, message, current_file=''):
|
def AddLogForJob(job, message, current_file=''):
|
||||||
now=datetime.now(pytz.utc)
|
now=datetime.now(pytz.utc)
|
||||||
log=Joblog( job_id=job.id, log=message, log_date=now )
|
log=Joblog( job_id=job.id, log=message, log_date=now )
|
||||||
job.last_update=now
|
job.last_update=now
|
||||||
job.current_file=current_file
|
if current_file != '':
|
||||||
|
job.current_file=os.path.basename(current_file)
|
||||||
|
# this may not be set on an import job
|
||||||
|
if job.num_files:
|
||||||
|
job.current_file_num=job.current_file_num+1
|
||||||
|
print("cfm for job: {} is now {}".format(job.id, job.current_file_num));
|
||||||
session.add(log)
|
session.add(log)
|
||||||
|
session.commit()
|
||||||
return
|
return
|
||||||
|
|
||||||
def RunJob(job):
|
def RunJob(job):
|
||||||
@@ -310,16 +246,21 @@ def RunJob(job):
|
|||||||
print("Requested to process unknown job type: {}".format(job.name))
|
print("Requested to process unknown job type: {}".format(job.name))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
|
||||||
|
job.state=state
|
||||||
|
job.pa_job_state=pa_job_state
|
||||||
|
job.last_update=datetime.now(pytz.utc)
|
||||||
|
AddLogForJob(job, last_log)
|
||||||
|
return
|
||||||
|
|
||||||
def HandleJobs():
|
def HandleJobs():
|
||||||
global pa_eng
|
global pa_eng
|
||||||
|
|
||||||
print("PA job manager is scanning for new jobs to process")
|
print("PA job manager is scanning for new jobs to process")
|
||||||
pa_eng.state = 'Scanning Jobs'
|
pa_eng.state = 'Scanning Jobs'
|
||||||
jobs=GetJobs()
|
for job in session.query(Job).all():
|
||||||
pa_eng.num_active_jobs=0
|
print("processing: {}".format(job))
|
||||||
pa_eng.num_completed_jobs=0
|
if job.pa_job_state == 'New':
|
||||||
for job in jobs:
|
|
||||||
if job.pa_job_state != 'Completed':
|
|
||||||
if job.wait_for != None:
|
if job.wait_for != None:
|
||||||
j2 = session.query(Job).get(job.wait_for)
|
j2 = session.query(Job).get(job.wait_for)
|
||||||
if not j2:
|
if not j2:
|
||||||
@@ -347,39 +288,55 @@ def HandleJobs():
|
|||||||
pa_eng.state = 'Waiting for new Jobs'
|
pa_eng.state = 'Waiting for new Jobs'
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def JobProgressState( job, state ):
|
||||||
|
job.pa_job_state = "In Progress"
|
||||||
|
job.state=state
|
||||||
|
session.commit()
|
||||||
|
return
|
||||||
|
|
||||||
def JobScanNow(job):
|
def JobScanNow(job):
|
||||||
filedata.GenerateFileData(job)
|
JobProgressState( job, "In Progress" )
|
||||||
job.state="Completed"
|
ProcessImportDirs(job)
|
||||||
job.pa_job_state="Completed"
|
FinishJob( job, "Completed (scan for new files)" )
|
||||||
job.last_update=datetime.now(pytz.utc)
|
|
||||||
MessageToFE( job.id, "success", "Completed (scan for new files)" )
|
MessageToFE( job.id, "success", "Completed (scan for new files)" )
|
||||||
session.commit()
|
session.commit()
|
||||||
return
|
return
|
||||||
|
|
||||||
def JobForceScan(job):
|
def JobForceScan(job):
|
||||||
|
return
|
||||||
|
JobProgressState( job, "In Progress" )
|
||||||
session.query(File).delete()
|
session.query(File).delete()
|
||||||
settings = session.query(Settings).first()
|
settings = session.query(Settings).first()
|
||||||
if settings == None:
|
if settings == None:
|
||||||
raise Exception("Cannot create file data with no settings / import path is missing")
|
raise Exception("Cannot create file data with no settings / import path is missing")
|
||||||
settings.last_import_date = 0
|
settings.last_import_date = 0
|
||||||
session.commit()
|
FinishJob(job, "Completed (forced remove and recreation of all file data)")
|
||||||
filedata.GenerateFileData(job)
|
|
||||||
job.state="Completed"
|
|
||||||
job.pa_job_state="Completed"
|
|
||||||
job.last_update=datetime.now(pytz.utc)
|
|
||||||
MessageToFE( job.id, "success", "Completed (forced remove and recreation of all file data)" )
|
MessageToFE( job.id, "success", "Completed (forced remove and recreation of all file data)" )
|
||||||
session.commit()
|
session.commit()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def SymlinkName(path, file):
|
||||||
|
sig_bit=file.replace(path, "")
|
||||||
|
last_dir=format( os.path.basename(path[0:-1]))
|
||||||
|
|
||||||
|
if sig_bit[-1] == os.path.sep:
|
||||||
|
last_bit = os.path.dirname(sig_bit)[0:-1]
|
||||||
|
else:
|
||||||
|
last_bit = os.path.dirname(sig_bit)
|
||||||
|
symlink = os.path.join('static', last_dir, last_bit )
|
||||||
|
if symlink[-1] == os.path.sep:
|
||||||
|
symlink=symlink[0:-1]
|
||||||
|
return symlink
|
||||||
|
|
||||||
# to serve static content of the images, we create a symlink from inside the static subdir of each import_path that exists
|
# to serve static content of the images, we create a symlink from inside the static subdir of each import_path that exists
|
||||||
def MakeSymlink(job,path):
|
def CreateSymlink(job,path):
|
||||||
symlink = FixPath('static/{}'.format( os.path.basename(path[0:-1])))
|
symlink = FixPath('static/{}'.format( os.path.basename(path[0:-1])))
|
||||||
if not os.path.exists(symlink):
|
if not os.path.exists(symlink):
|
||||||
os.symlink(path, symlink)
|
os.symlink(path, symlink)
|
||||||
return symlink
|
return symlink
|
||||||
|
|
||||||
def AddDir(job, dirname, path_prefix, in_dir):
|
def AddDir(job, dirname, path_prefix, in_dir):
|
||||||
dir=Dir( path_prefix=path_prefix, num_files=0 )
|
dir=Dir( path_prefix=path_prefix, num_files=0, last_import_date=0 )
|
||||||
dtype = session.query(FileType).filter(FileType.name=='Directory').first()
|
dtype = session.query(FileType).filter(FileType.name=='Directory').first()
|
||||||
e=Entry( name=dirname, type=dtype )
|
e=Entry( name=dirname, type=dtype )
|
||||||
e.dir_details.append(dir)
|
e.dir_details.append(dir)
|
||||||
@@ -401,34 +358,52 @@ def AddFile(job, fname, type_str, fsize, in_dir ):
|
|||||||
return e
|
return e
|
||||||
|
|
||||||
def JobImportDir(job):
|
def JobImportDir(job):
|
||||||
|
JobProgressState( job, "In Progress" )
|
||||||
print("DEBUG: Importing dir")
|
print("DEBUG: Importing dir")
|
||||||
settings = session.query(Settings).first()
|
settings = session.query(Settings).first()
|
||||||
if settings == None:
|
if settings == None:
|
||||||
raise Exception("Cannot create file data with no settings / import path is missing")
|
raise Exception("Cannot create file data with no settings / import path is missing")
|
||||||
last_import_date = settings.last_import_date
|
overall_file_cnt=0
|
||||||
file_cnt=0
|
fcnt={}
|
||||||
|
keep_dirs={}
|
||||||
for jex in job.extra:
|
for jex in job.extra:
|
||||||
if jex.name =="path":
|
if jex.name =="path":
|
||||||
path = FixPath( jex.value)
|
path = FixPath(jex.value)
|
||||||
AddLogForJob(job, "Checking Import Directory: {}".format( path ) )
|
AddLogForJob(job, "Checking Import Directory: {}".format( path ) )
|
||||||
print("DEBUG: Checking Import Directory: {}".format( path ) )
|
print("DEBUG: Checking Import Directory: {}".format( path ) )
|
||||||
if os.path.exists( path ):
|
if os.path.exists( path ):
|
||||||
symlink=MakeSymlink(job,path)
|
symlink=CreateSymlink(job,path)
|
||||||
dir=AddDir(job, os.path.basename(path[0:-1]), symlink, None )
|
# DDP: dont want to do add a Dir, if this already exists (and if so, check any modificaiton on fs, since last import)!!!!
|
||||||
|
dir=session.query(Dir).filter(Dir.path_prefix==symlink).first()
|
||||||
|
if dir != None:
|
||||||
|
stat = os.stat( os.path.basename(path[0:-1]) )
|
||||||
|
if stat.st_ctime < dir.last_import_date:
|
||||||
|
print( "DEBUG: Directory has not been altered since the last import, just return" )
|
||||||
|
FinishJob( job, "No new files in directory since the last import")
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
dir=AddDir(job, os.path.basename(path[0:-1]), symlink, None )
|
||||||
|
keep_dirs[dir.path_prefix]=dir
|
||||||
import_dir=dir
|
import_dir=dir
|
||||||
|
fcnt[symlink]=0
|
||||||
for file in sorted(glob.glob(path + '**', recursive=True)):
|
for file in sorted(glob.glob(path + '**', recursive=True)):
|
||||||
if file == path:
|
if file == path:
|
||||||
continue
|
continue
|
||||||
fname=file.replace(path, "")
|
fname=file.replace(path, "")
|
||||||
stat = os.stat(file)
|
stat = os.stat(file)
|
||||||
if last_import_date == 0 or stat.st_ctime > last_import_date:
|
dirname=SymlinkName(path, file)
|
||||||
AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, last_import_date ) )
|
if keep_dirs[dirname].last_import_date == 0 or stat.st_ctime > keep_dirs[dirname].last_import_date:
|
||||||
print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, last_import_date ) )
|
AddLogForJob(job, "DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
|
||||||
|
print("DEBUG: {} - {} is newer than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ) )
|
||||||
if os.path.isdir(file):
|
if os.path.isdir(file):
|
||||||
path_prefix=os.path.join(symlink,fname)
|
path_prefix=os.path.join(symlink,fname)
|
||||||
dir=AddDir( job, fname, path_prefix, dir )
|
dir=AddDir( job, fname, path_prefix, dir )
|
||||||
|
fcnt[path_prefix]=0
|
||||||
|
keep_dirs[dir.path_prefix]=dir
|
||||||
else:
|
else:
|
||||||
file_cnt=file_cnt+1
|
overall_file_cnt=overall_file_cnt+1
|
||||||
|
dirname=SymlinkName(path, file)
|
||||||
|
fcnt[dirname]=fcnt[dirname]+1
|
||||||
if isImage(file):
|
if isImage(file):
|
||||||
type_str = 'Image'
|
type_str = 'Image'
|
||||||
elif isVideo(file):
|
elif isVideo(file):
|
||||||
@@ -438,26 +413,21 @@ def JobImportDir(job):
|
|||||||
fsize = round(os.stat(file).st_size/(1024*1024))
|
fsize = round(os.stat(file).st_size/(1024*1024))
|
||||||
e=AddFile( job, os.path.basename(fname), type_str, fsize, dir )
|
e=AddFile( job, os.path.basename(fname), type_str, fsize, dir )
|
||||||
else:
|
else:
|
||||||
AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file )
|
AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
|
||||||
print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file )
|
print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, keep_dirs[dirname].last_import_date ), file )
|
||||||
import_dir.num_files=file_cnt
|
FinishJob(job, "Finished Importing: {} - Found {} new files".format( path, overall_file_cnt ) )
|
||||||
AddLogForJob(job, "Finished Importing: {} - Found {} new files".format( path, file_cnt ) )
|
for d in keep_dirs:
|
||||||
job.pa_job_state = "Completed"
|
keep_dirs[d].num_files = fcnt[d]
|
||||||
job.state = "Completed"
|
keep_dirs[d].last_import_date = time.time()
|
||||||
job.last_updated = datetime.now(pytz.utc)
|
# override this to be all the files in dir & its sub dirs... (used to know how many files in jobs for this import dir)
|
||||||
# settings.last_import_date = time.time()
|
import_dir.num_files=overall_file_cnt
|
||||||
else:
|
else:
|
||||||
AddLogForJob(job, "Finished Importing: {} -- Path does not exist".format( path) )
|
FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" )
|
||||||
job.pa_job_state = "Completed"
|
|
||||||
job.state = "Failed"
|
|
||||||
job.last_updated = datetime.now(pytz.utc)
|
|
||||||
for j in session.query(Job).filter(Job.wait_for==job.id).all():
|
for j in session.query(Job).filter(Job.wait_for==job.id).all():
|
||||||
print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) )
|
print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(job.id, j.id) )
|
||||||
j.pa_job_state = "Completed"
|
FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" )
|
||||||
j.state = "Withdrawn"
|
|
||||||
j.last_updated = datetime.now(pytz.utc)
|
|
||||||
AddLogForJob(j, "Job has been withdrawn as the job being waited for failed")
|
|
||||||
session.commit()
|
session.commit()
|
||||||
|
print("fcnt:", fcnt)
|
||||||
return
|
return
|
||||||
|
|
||||||
def FilesInDir( path ):
|
def FilesInDir( path ):
|
||||||
@@ -466,6 +436,7 @@ def FilesInDir( path ):
|
|||||||
|
|
||||||
|
|
||||||
def ProcessFilesInDir(job, e):
|
def ProcessFilesInDir(job, e):
|
||||||
|
time.sleep(0.3)
|
||||||
print("files in dir - process: {}".format(e.name))
|
print("files in dir - process: {}".format(e.name))
|
||||||
if e.type.name != 'Directory':
|
if e.type.name != 'Directory':
|
||||||
e.file_details[0].hash = md5( job, os.path.join( e.in_dir[0].path_prefix, e.name ) )
|
e.file_details[0].hash = md5( job, os.path.join( e.in_dir[0].path_prefix, e.name ) )
|
||||||
@@ -480,17 +451,21 @@ def ProcessFilesInDir(job, e):
|
|||||||
ProcessFilesInDir(job, sub )
|
ProcessFilesInDir(job, sub )
|
||||||
|
|
||||||
def JobGetFileDetails(job):
|
def JobGetFileDetails(job):
|
||||||
|
JobProgressState( job, "In Progress" )
|
||||||
print("JobGetFileDetails:")
|
print("JobGetFileDetails:")
|
||||||
for jex in job.extra:
|
for jex in job.extra:
|
||||||
if jex.name =="path":
|
if jex.name =="path":
|
||||||
path=jex.value
|
path=jex.value
|
||||||
path=FixPath('static/{}'.format( os.path.basename(path[0:-1])))
|
path=FixPath('static/{}'.format( os.path.basename(path[0:-1])))
|
||||||
print(" for path={}".format( path ) )
|
print(" for path={}".format( path ) )
|
||||||
|
dir=session.query(Dir).filter(Dir.path_prefix==path).first()
|
||||||
|
job.current_file_num = 0
|
||||||
|
job.num_files = dir.num_files
|
||||||
|
print("cfm set to 0 for job: {}, num_files: {}".format(job.id, job.num_files));
|
||||||
|
session.commit()
|
||||||
for e in FilesInDir( path ):
|
for e in FilesInDir( path ):
|
||||||
ProcessFilesInDir(job, e )
|
ProcessFilesInDir(job, e )
|
||||||
job.pa_job_state = "Completed"
|
FinishJob(job, "File Details processed")
|
||||||
job.state = "Completed"
|
|
||||||
job.last_updated = datetime.now(pytz.utc)
|
|
||||||
session.commit()
|
session.commit()
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -529,7 +504,7 @@ def isImage(file):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def GenImageThumbnail(job, file):
|
def GenImageThumbnail(job, file):
|
||||||
AddLogForJob( job, "Generate Thumbnail from Image file: {}".format( file ) )
|
AddLogForJob( job, "Generate Thumbnail from Image file: {}".format( file ), file )
|
||||||
f = open(file, 'rb')
|
f = open(file, 'rb')
|
||||||
try:
|
try:
|
||||||
tags = exifread.process_file(f)
|
tags = exifread.process_file(f)
|
||||||
@@ -543,7 +518,7 @@ def GenImageThumbnail(job, file):
|
|||||||
return thumbnail
|
return thumbnail
|
||||||
|
|
||||||
def GenVideoThumbnail(job, file):
|
def GenVideoThumbnail(job, file):
|
||||||
AddLogForJob( job, "Generate Thumbnail from Video file: {}".format( file ) )
|
AddLogForJob( job, "Generate Thumbnail from Video file: {}".format( file ), file )
|
||||||
vcap = cv2.VideoCapture(file)
|
vcap = cv2.VideoCapture(file)
|
||||||
res, im_ar = vcap.read()
|
res, im_ar = vcap.read()
|
||||||
while im_ar.mean() < 15 and res:
|
while im_ar.mean() < 15 and res:
|
||||||
@@ -560,12 +535,10 @@ if __name__ == "__main__":
|
|||||||
print("PA job manager starting")
|
print("PA job manager starting")
|
||||||
try:
|
try:
|
||||||
InitialiseManager()
|
InitialiseManager()
|
||||||
filedata.ProcessImportDirs()
|
ProcessImportDirs()
|
||||||
session.commit()
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print( "Failed to initialise PA Job Manager: {}".format(e) )
|
print( "Failed to initialise PA Job Manager: {}".format(e) )
|
||||||
session.rollback()
|
session.rollback()
|
||||||
HandleJobs()
|
|
||||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||||
s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
|
s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
|
||||||
s.listen()
|
s.listen()
|
||||||
|
|||||||
Reference in New Issue
Block a user