JobGetFileDetails now works (first pass). Handling of files in directories still needs improvement: I optimised by assuming the last directory seen is the one the current file is in, but that is not always true, so the actual directory needs to be set each time.
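A possible direction for that fix (a hedged sketch, not part of this commit): derive each file's real parent from its on-disk path and look the Dir row up by path_prefix, instead of reusing whichever Dir was created last. dirs_by_prefix and parent_dir_for are hypothetical names; session, Dir, the import path and the symlink prefix come from the diff below.

    # Hedged sketch: resolve the actual parent Dir for each walked file,
    # rather than assuming the most recently created Dir is the parent.
    dirs_by_prefix = {}  # hypothetical cache: path_prefix -> Dir row

    def parent_dir_for(file, path, symlink):
        # Map the on-disk parent of `file` back to its symlinked path_prefix.
        rel = os.path.relpath(os.path.dirname(file), path)
        prefix = symlink if rel == '.' else os.path.join(symlink, rel)
        if prefix not in dirs_by_prefix:
            dirs_by_prefix[prefix] = session.query(Dir).filter(
                Dir.path_prefix == prefix).first()
        return dirs_by_prefix[prefix]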
@@ -61,10 +61,8 @@ class FileData():
         try:
             tags = exifread.process_file(f)
         except:
-            print('NO EXIF TAGS?!?!?!?')
+            print('WARNING: NO EXIF TAGS?!?!?!?')
             AddLogForJob(job, "WARNING: No EXIF tag found for: {}".format(file))
             f.close()
-            raise
-        f.close()

         fthumbnail = base64.b64encode(tags['JPEGThumbnail'])
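With the raise gone, tags can be left unset (or lack a 'JPEGThumbnail' key) when EXIF parsing fails, and the b64encode line would then blow up. A minimal defensive sketch, assuming the surrounding names from the hunk above:

    # Defensive sketch: default tags so a failed parse cannot leave it unbound.
    tags = {}
    try:
        tags = exifread.process_file(f)
    except Exception:
        AddLogForJob(job, "WARNING: No EXIF tag found for: {}".format(file))
    data = tags.get('JPEGThumbnail')  # exifread returns a dict of tags
    fthumbnail = base64.b64encode(data).decode('ascii') if data else None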
@@ -132,6 +130,13 @@ class FileData():
     job=Job(start_time='now()', last_update='now()', name="importdir", state="New", wait_for=None )
     job.extra.append(jex)
     session.add(job)
+    # force commit to make job.id be valid in use of wait_for later
+    session.commit()
+    jex2=JobExtra( name="path", value=path )
+    job2=Job(start_time='now()', last_update='now()', name="getfiledetails", state="New", wait_for=job.id )
+    job2.extra.append(jex2)
+    session.add(job2)
+    print ("adding job2 id={}, wait_for={}, job is: {}".format( job2.id, job2.wait_for, job.id ) )
     return

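The forced commit above is what populates job.id: SQLAlchemy assigns sequence-backed primary keys when the pending row is flushed to the database. A flush alone would be enough if a full commit is undesirable at that point:

    session.add(job)
    session.flush()   # emits the INSERT, so job.id is now populated
    job2 = Job(start_time='now()', last_update='now()', name="getfiledetails",
               state="New", wait_for=job.id)

(Note that job2.id prints as None in the debug line above, since job2 itself has not been flushed yet.)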
@@ -151,6 +156,8 @@ class Dir(Base):
     __tablename__ = "dir"
     eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
     path_prefix = Column(String, unique=False, nullable=False )
+    num_files = Column(Integer)
+    files = relationship("Entry", secondary="entry_dir_link")

     def __repr__(self):
         return "<eid: {}, path_prefix: {}>".format(self.eid, self.path_prefix)
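The files relationship refers to an entry_dir_link association table defined elsewhere in the module. A hypothetical sketch of its shape, assuming the column names (Table comes from sqlalchemy):

    # Hypothetical sketch of the association table implied by
    # secondary="entry_dir_link"; the real definition lives elsewhere.
    entry_dir_link = Table(
        "entry_dir_link", Base.metadata,
        Column("entry_id", Integer, ForeignKey("entry.id"), primary_key=True),
        Column("dir_id", Integer, ForeignKey("dir.eid"), primary_key=True),
    )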
@@ -186,21 +193,6 @@ class FileType(Base):
     def __repr__(self):
         return "<id: {}, name={}>".format(self.id, self.name )

-class File(Base):
-    __tablename__ = "file"
-    id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
-    name = Column(String, unique=True, nullable=False )
-    type = Column(String, unique=False, nullable=False)
-    path_prefix = Column(String, unique=False, nullable=False)
-    size_mb = Column(Integer, unique=False, nullable=False)
-    # hash might not be unique, this could be the source of dupe problems
-    hash = Column(Integer, unique=True, nullable=True)
-    thumbnail = Column(String, unique=False, nullable=True)
-
-    def __repr__(self):
-        return "<id: {}, name: {}>".format(self.id, self.name )
-
-

 class Settings(Base):
     __tablename__ = "settings"
     id = Column(Integer, Sequence('settings_id_seq'), primary_key=True )
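An aside on the File model removed above: its own comment flags hash uniqueness as a dupe risk, and there is a second wrinkle: md5() later in this diff returns a hex string, while the column was declared Integer. SQLite's type affinity tolerates that, but a stricter backend would not. If the replacement model keeps a hash column, something like the following (a suggestion, not part of the commit) avoids the mismatch, and dropping unique=True sidesteps failures on genuinely duplicate files:

    hash = Column(String(32), unique=False, nullable=True)  # md5 hexdigest is 32 hex chars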
@@ -269,7 +261,7 @@ class Job(Base):
     extra = relationship( "JobExtra")

     def __repr__(self):
-        return "<id: {}, start_time: {}, last_update: {}, name: {}, state: {}, num_passes: {}, current_passes: {}, num_files: {}, current_file_num: {}, current_file: {}, extra: {}, logs: {}>".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_passes, self.current_pass, self.num_files, self.num_files, self.current_file_num, self.current_file, self.extra, self.logs)
+        return "<id: {}, start_time: {}, last_update: {}, name: {}, state: {}, num_passes: {}, current_pass: {}, num_files: {}, current_file_num: {}, current_file: {}, pa_job_state: {}, wait_for: {}, extra: {}, logs: {}>".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_passes, self.current_pass, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs)

 class PA_JobManager_FE_Message(Base):
     __tablename__ = "pa_job_manager_fe_message"
@@ -312,6 +304,8 @@ def RunJob(job):
         JobForceScan(job)
     elif job.name =="importdir":
         JobImportDir(job)
+    elif job.name =="getfiledetails":
+        JobGetFileDetails(job)
     else:
         print("Requested to process unknown job type: {}".format(job.name))
     return
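An equivalent dispatch-table sketch (an alternative shape, not what the commit does) that keeps RunJob flat as job types accumulate:

    JOB_HANDLERS = {
        "forcescan": JobForceScan,
        "importdir": JobImportDir,
        "getfiledetails": JobGetFileDetails,
    }

    def RunJob(job):
        handler = JOB_HANDLERS.get(job.name)
        if handler is None:
            print("Requested to process unknown job type: {}".format(job.name))
            return
        handler(job)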
@@ -326,8 +320,20 @@ def HandleJobs():
     pa_eng.num_completed_jobs=0
     for job in jobs:
         if job.pa_job_state != 'Completed':
+            if job.wait_for != None:
+                j2 = session.query(Job).get(job.wait_for)
+                if not j2:
+                    print ("WTF? job.wait_for ({}) does not exist in below? ".format( job.wait_for ))
+                    for j in session.query(Job).all():
+                        print ("j={}".format(j.id))
+                    continue
+                if j2.pa_job_state != 'Completed':
+                    continue

             # use this to remove threads for easier debugging, and errors will stacktrace to the console
             if DEBUG==1:
                 print("*************************************")
+                print("RUNNING job: id={} name={} wait_for={}".format(job.id, job.name, job.wait_for ))
                 RunJob(job)
             else:
                 try:
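One small API note on the lookup above: Query.get() is a legacy spelling as of SQLAlchemy 1.4; the modern equivalent is

    j2 = session.get(Job, job.wait_for)  # SQLAlchemy 1.4+; returns None if absent

and both forms return None when the id does not exist, which is what the not j2 branch relies on.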
@@ -373,10 +379,9 @@ def MakeSymlink(job,path):
     return symlink

 def AddDir(job, dirname, path_prefix, in_dir):
-    dir=Dir( path_prefix=path_prefix )
+    dir=Dir( path_prefix=path_prefix, num_files=0 )
     dtype = session.query(FileType).filter(FileType.name=='Directory').first()
     e=Entry( name=dirname, type=dtype )
-    print( dtype)
     e.dir_details.append(dir)
     # this occurs when we Add the actual Dir for the import_path
     if in_dir:
@@ -396,18 +401,21 @@ def AddFile(job, fname, type_str, fsize, in_dir ):
     return e

 def JobImportDir(job):
-    print("DEBUG: Importing dir: {}".format(job.id))
+    print("DEBUG: Importing dir")
     settings = session.query(Settings).first()
     if settings == None:
         raise Exception("Cannot create file data with no settings / import path is missing")
     last_import_date = settings.last_import_date
+    file_cnt=0
     for jex in job.extra:
         if jex.name =="path":
             path = FixPath( jex.value)
     AddLogForJob(job, "Checking Import Directory: {}".format( path ) )
+    print("DEBUG: Checking Import Directory: {}".format( path ) )
     if os.path.exists( path ):
         symlink=MakeSymlink(job,path)
         dir=AddDir(job, os.path.basename(path[0:-1]), symlink, None )
+        import_dir=dir
         for file in sorted(glob.glob(path + '**', recursive=True)):
             if file == path:
                 continue
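The walk above leans on sorted(glob.glob(path + '**', recursive=True)) so that directories are seen before their contents, which is what makes the "last dir" assumption mostly hold. A hedged alternative sketch using os.walk, which hands each directory its own files and sidesteps the assumption from the commit message entirely:

    # os.walk yields (directory, subdirs, files) tuples, so the parent
    # directory of every file is explicit rather than inferred from order.
    for root, dirnames, filenames in os.walk(path):
        for fname in sorted(filenames):
            file = os.path.join(root, fname)
            # ... same newer-than check and AddFile call as in the hunks below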
@@ -419,7 +427,9 @@ def JobImportDir(job):
                 if os.path.isdir(file):
                     path_prefix=os.path.join(symlink,fname)
                     dir=AddDir( job, fname, path_prefix, dir )
+                    print("DEBUG(adddir)")
                 else:
+                    file_cnt=file_cnt+1
                     if isImage(file):
                         type_str = 'Image'
                     elif isVideo(file):
@@ -428,14 +438,65 @@ def JobImportDir(job):
                         type_str = 'File'
                     fsize = round(os.stat(file).st_size/(1024*1024))
                     e=AddFile( job, os.path.basename(fname), type_str, fsize, dir )
+                    print("DEBUG(addfile)")
             else:
                 AddLogForJob(job, "DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file )
                 print("DEBUG: {} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ), file )
         #settings.last_import_date = time.time()
+        import_dir.num_files=file_cnt
+        AddLogForJob(job, "Finished Importing: {} - Found {} new files".format( path, file_cnt ) )
+        job.pa_job_state = "Completed"
+        job.state = "Completed"
+        job.last_update = datetime.now(pytz.utc)
         # settings.last_import_date = time.time()
+        print ("DEBUG-END: finished Job import dir: {}".format(job))
+    else:
+        AddLogForJob(job, "Finished Importing: {} -- Path does not exist".format( path) )
+        job.pa_job_state = "Completed"
+        job.state = "Failed"
+        job.last_update = datetime.now(pytz.utc)
+        for j in session.query(Job).filter(Job.wait_for==job.id).all():
+            print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(j.id, job.id) )
+            j.pa_job_state = "Completed"
+            j.state = "Withdrawn"
+            j.last_update = datetime.now(pytz.utc)
+            AddLogForJob(j, "Job has been withdrawn as the job being waited for failed")
     session.commit()
     print ("DEBUG: finished Job import dir")
     return

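The withdrawal loop above is one level deep: a job waiting on a job that has just been withdrawn would still run. A recursive sketch (hypothetical helper, not in the commit) that cascades the cancellation:

    def withdraw_dependents(failed_job):
        for j in session.query(Job).filter(Job.wait_for == failed_job.id).all():
            j.pa_job_state = "Completed"
            j.state = "Withdrawn"
            j.last_update = datetime.now(pytz.utc)
            AddLogForJob(j, "Job has been withdrawn as the job being waited for failed")
            withdraw_dependents(j)  # cascade to jobs waiting on this one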
+def FilesInDir( path ):
+    d=session.query(Dir).filter(Dir.path_prefix==path).first()
+    return d.files
+
+
+def ProcessFilesInDir(job, e):
+    print("files in dir - process: {}".format(e.name))
+    if e.type.name != 'Directory':
+        e.file_details[0].hash = md5( job, os.path.join( e.in_dir[0].path_prefix, e.name ) )
+        if e.type.name == 'Image':
+            e.file_details[0].thumbnail = GenImageThumbnail( job, os.path.join( e.in_dir[0].path_prefix, e.name ) )
+        elif e.type.name == 'Video':
+            e.file_details[0].thumbnail = GenVideoThumbnail( job, os.path.join( e.in_dir[0].path_prefix, e.name ) )
+        else:
+            print("need to better process: {}".format(e))
+    else:
+        d=session.query(Dir).filter(Dir.eid==e.id).first()
+        for sub in d.files:
+            ProcessFilesInDir(job, sub )
+
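FilesInDir returns d.files without checking the query result, so an unknown path raises AttributeError on None. A guarded variant (a sketch, assuming callers can cope with an empty list):

    def FilesInDir(path):
        d = session.query(Dir).filter(Dir.path_prefix == path).first()
        return d.files if d else []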
+def JobGetFileDetails(job):
+    print("JobGetFileDetails:")
+    for jex in job.extra:
+        if jex.name =="path":
+            path=jex.value
+    path=FixPath('static/{}'.format( os.path.basename(path[0:-1])))
+    print(" for path={}".format( path ) )
+    for e in FilesInDir( path ):
+        ProcessFilesInDir(job, e )
+    job.pa_job_state = "Completed"
+    job.state = "Completed"
+    job.last_update = datetime.now(pytz.utc)
+    session.commit()
+    return
+
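The path[0:-1] slice here (and in JobImportDir above) assumes the stored path ends with a separator, which FixPath presumably guarantees. os.path.normpath makes the intent explicit and works either way:

    base = os.path.basename(os.path.normpath(path))  # trailing '/' tolerated
    path = FixPath('static/{}'.format(base))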
 def isVideo(file):
     try:
         fileInfo = MediaInfo.parse(file)
@@ -454,12 +515,14 @@ def FixPath(p):
     return p

 # Returns an md5 hash of the fname's contents
-def md5(fname):
+def md5(job, fname):
     hash_md5 = hashlib.md5()
     with open(fname, "rb") as f:
         for chunk in iter(lambda: f.read(4096), b""):
             hash_md5.update(chunk)
-    return hash_md5.hexdigest()
+    hash = hash_md5.hexdigest()
+    AddLogForJob( job, "Generated md5 hash: {} for file: {}".format( hash, fname ) )
+    return hash

 def isImage(file):
     try:
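The 4096-byte chunked read keeps memory flat even for multi-gigabyte video files. On Python 3.11+ the loop can be replaced with hashlib.file_digest (an alternative, not what the commit uses):

    with open(fname, "rb") as f:
        digest = hashlib.file_digest(f, "md5").hexdigest()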
@@ -468,6 +531,34 @@ def isImage(file):
     except:
         return False

+def GenImageThumbnail(job, file):
+    AddLogForJob( job, "Generate Thumbnail from Image file: {}".format( file ) )
+    f = open(file, 'rb')
+    try:
+        tags = exifread.process_file(f)
+    except:
+        print('WARNING: NO EXIF TAGS?!?!?!?')
+        AddLogForJob(job, "WARNING: No EXIF tag found for: {}".format(file))
+        f.close()
+
+    thumbnail = base64.b64encode(tags['JPEGThumbnail'])
+    thumbnail = str(thumbnail)[2:-1]
+    return thumbnail
+
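GenImageThumbnail depends on an EXIF-embedded thumbnail: if parsing fails or the file has no 'JPEGThumbnail' tag, the b64encode line raises. A hedged fallback sketch using Pillow (an assumption; the project is not shown using PIL) that renders a thumbnail from the image itself:

    from io import BytesIO
    from PIL import Image

    def gen_pil_thumbnail(file):
        im = Image.open(file)
        im.thumbnail((160, 160))            # in place, preserves aspect ratio
        buf = BytesIO()
        im.convert('RGB').save(buf, format='JPEG')
        return base64.b64encode(buf.getvalue()).decode('ascii')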
+def GenVideoThumbnail(job, file):
+    AddLogForJob( job, "Generate Thumbnail from Video file: {}".format( file ) )
+    vcap = cv2.VideoCapture(file)
+    res, im_ar = vcap.read()
+    while im_ar.mean() < 15 and res:
+        res, im_ar = vcap.read()
+    im_ar = cv2.resize(im_ar, (160, 90), 0, 0, cv2.INTER_LINEAR)
+    res, thumb_buf = cv2.imencode('.jpeg', im_ar)
+    bt = thumb_buf.tostring()
+    thumbnail = base64.b64encode(bt)
+    thumbnail = str(thumbnail)[2:-1]
+    return thumbnail
+

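Two fragile spots in GenVideoThumbnail worth noting: if a read fails, im_ar is None and im_ar.mean() raises before res is ever checked, and numpy's tostring() is a deprecated alias for tobytes(). A reordered sketch:

    res, im_ar = vcap.read()
    while res and im_ar.mean() < 15:     # check res before touching the frame
        res, im_ar = vcap.read()
    if not res:
        return None                      # no usable frame found
    im_ar = cv2.resize(im_ar, (160, 90), interpolation=cv2.INTER_LINEAR)
    res, thumb_buf = cv2.imencode('.jpeg', im_ar)
    return base64.b64encode(thumb_buf.tobytes()).decode('ascii')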
 if __name__ == "__main__":
     print("PA job manager starting")
     try: