From 4ed76961a81ec95c3f445d0e1d5b6cd1149f0c13 Mon Sep 17 00:00:00 2001 From: Damien De Paoli Date: Thu, 27 May 2021 18:34:08 +1000 Subject: [PATCH] Now have an AddPath (which calls AddDir if needed). This also fixes a bug where re-scanning, caused duplicate database entries. Also cleaned up comments, prints --- pa_job_manager.py | 71 ++++++++++++++++++++++++++++++----------------- 1 file changed, 46 insertions(+), 25 deletions(-) diff --git a/pa_job_manager.py b/pa_job_manager.py index d09df63..adbb814 100644 --- a/pa_job_manager.py +++ b/pa_job_manager.py @@ -1,3 +1,4 @@ +### # # This file controls the 'external' job control manager, that (periodically # # looks / somehow is pushed an event?) picks up new jobs, and processes them. @@ -81,7 +82,7 @@ class PathType(Base): name = Column(String, unique=True, nullable=False ) def __repr__(self): - return f"" + return f"" class Dir(Base): __tablename__ = "dir" @@ -421,9 +426,9 @@ def HandleJobs(): if job.wait_for != None: j2 = session.query(Job).get(job.wait_for) if not j2: - print ("WTF? job.wait_for ({}) does not exist in below? ".format( job.wait_for )) + print ("ERROR: job.wait_for ({}) does not exist in below? ".format( job.wait_for )) for j in session.query(Job).all(): - print ("j={}".format(j.id)) + print ("ERROR: j={}".format(j.id)) continue if j2.pa_job_state != 'Completed': continue @@ -441,7 +446,7 @@ def HandleJobs(): try: MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) ) except Exception as e2: - print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) ) + print("ERROR: Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) ) print("INFO: PA job manager is waiting for a job") return @@ -490,11 +495,25 @@ def CreateSymlink(job,ptype,path): path_type = session.query(PathType).get(ptype) symlink=SymlinkName(path_type.name, path, path) if not os.path.exists(symlink): - print( f"symlink does not exist - but is it missing path_type after static? s={symlink}" ) + print( f"INFO: symlink does not exist, actually creating it -- s={symlink}" ) os.makedirs( os.path.dirname(symlink), mode=0o777, exist_ok=True ) os.symlink(path, symlink) return symlink +# function to create or return a Path object basedon the path prefix (pp) and the type (the type id of Import, Storage, Bin) +# if the Path is created, it also creates the Dir object (which in turn creates the Entry object) +def AddPath(job, pp, type ): + path_obj=session.query(Path).filter(Path.path_prefix==pp).first() + if not path_obj: + path_obj=Path( path_prefix=pp, num_files=0, type_id=type ) + # if there is no path yet, then there is no Dir for it, so create that too (which creates the entry, etc.) + # Dir name is os.path.basename(pp), is not in another Dir (None), and its relative path is "", inside the path_obj we just created + dir=AddDir( job, os.path.basename(pp), None, "", path_obj ) + session.add(path_obj) + session.add(dir) + return path_obj + + ################################################################################################################################################################ # # Key function that runs as part of (usually) an import job. The name of the directory (dirname) is checked to see @@ -522,7 +541,7 @@ def AddDir(job, dirname, in_dir, rel_path, in_path ): if in_dir: e.in_dir=in_dir if DEBUG==1: - print(f"AddDir: created d={dirname}, rp={rel_path}") + print(f"DEBUG: AddDir: created d={dirname}, rp={rel_path}") AddLogForJob(job, f"DEBUG: Process new dir: {dirname}") session.add(e) return dir @@ -573,9 +592,12 @@ def RemoveFileFromFS( del_me ): dst=dst_dir + '/' + del_me.name os.replace( src, dst ) except Exception as e: - print( f"Failed to remove file from filesystem - which={src}, err: {e}") + print( f"ERROR: Failed to remove file from filesystem - which={src}, err: {e}") return +# Functoin that moves a file we are "deleting" to the recycle bin, it moves the +# file on the filesystem and then changes the database path from the import or +# storage path over to the Bin path def MoveFileToRecycleBin(job,del_me): try: settings = session.query(Settings).first() @@ -585,29 +607,22 @@ def MoveFileToRecycleBin(job,del_me): dst=dst_dir + '/' + del_me.name os.replace( src, dst ) except Exception as e: - print( f"Failed to remove file from filesystem - which={src}, err: {e}") + print( f"ERROR: Failed to remove file from filesystem - which={src}, err: {e}") bin_path=session.query(Path).join(PathType).filter(PathType.name=='Bin').first() - print( f"bin={bin}") - print( f"del_me={del_me}") new_rel_path=del_me.in_dir.in_path.path_prefix.replace('static/','') # if there is a relative path on this dir, add it to the new_rel_path as there is only ever 1 Bin path if len(del_me.in_dir.rel_path): new_rel_path += '/' + del_me.in_dir.rel_path - print( f"new_rel_path={new_rel_path}" ) - parent_dir=session.query(Dir).join(PathDirLink).filter(PathDirLink.path_id==bin_path.id).first() - print( f"parent_dir for path={parent_dir}" ) part_rel_path="" for dirname in new_rel_path.split("/"): part_rel_path += f"{dirname}" - print( f"AddDir( {dirname} in {parent_dir} with {part_rel_path} as pfx ) ") new_dir=AddDir( job, dirname, parent_dir, part_rel_path, bin_path ) parent_dir=new_dir part_rel_path += "/" - print( f"new_dir={new_dir}" ) del_me.in_dir = new_dir return @@ -619,6 +634,8 @@ def RemoveDirFromDB(id): session.query(Entry).filter(Entry.id==id).delete() return +# this routine is used when a scan finds files/dirs that have been removed +# underneath PA, so it just deletes them form the DB def HandleAnyFSDeletions(job): dtype=session.query(FileType).filter(FileType.name=='Directory').first() rms = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id!=dtype.id).all() @@ -635,6 +652,7 @@ def HandleAnyFSDeletions(job): rm_cnt+=1 return rm_cnt +# try several ways to work out date of file created (try exif, then filename, lastly filesystem) def GetDateFromFile(file, stat): # try exif try: @@ -684,26 +702,31 @@ def JobImportDir(job): return symlink=CreateSymlink(job,path_type,path) - path_obj=Path( path_prefix=symlink, num_files=0, type_id=path_type ) + # create/find the Path + path_obj=AddPath( job, symlink, path_type ) session.add(path_obj) # find all jobs waiting on me and their children, etc. and add a path_prefix jex to symlink, so we can just reference it form here on in, rather than recreate that string AddJexToDependantJobs(job,"path_prefix",symlink) ResetExistsOnFS(job, symlink) + # go through data once to work out file_cnt so progress bar works from first import walk=os.walk(path, topdown=True) ftree=list(walk) - - # go through data once to work out file_cnt so progress bar works from first import overall_file_cnt=0 for root, subdirs, files in ftree: overall_file_cnt+= len(subdirs) + len(files) path_obj.num_files=overall_file_cnt - parent_dir=None - # rel_path is always '' at the top of the path objects path_prefix for the first dir - dir=AddDir(job, os.path.basename(symlink), parent_dir, '', path_obj) + # this is needed so that later on, when we AddDir for any dirs we find via os.walk, they have a parent dir object that is the dir for the path we are in + parent_dir=path_obj.dir_obj + + # first time through we need dir to be the top level, we have a special if below to no recreate the top dir that AddPath created already + dir=parent_dir # session.add in case we already have imported this dir (as AddDir wont) & now we might have diff num of files to last time, - session.add(dir) + session.add(parent_dir) + + # if we set / then commit this now, the web page will know how many files + # to process as we then do the slow job of processing them job.num_files=overall_file_cnt AddLogForJob(job, f"Found {overall_file_cnt} file(s) to process") session.commit() @@ -714,7 +737,6 @@ def JobImportDir(job): if root != path: pp=SymlinkName( path_obj.type.name, path, root )+'/'+os.path.basename(root) rel_path=pp.replace(symlink+'/','') - print( f"pp={pp}, root={root}, symlink={symlink}, rel_path={rel_path}" ) dir=AddDir(job, os.path.basename(root), parent_dir, rel_path, path_obj) for basename in files: # commit every 100 files to see progress being made but not hammer the database @@ -909,7 +931,6 @@ def JobGetFileDetails(job): path=[jex.value for jex in job.extra if jex.name == "path"][0] path_prefix=[jex.value for jex in job.extra if jex.name == "path_prefix"][0] print( f"JobGetFileDetails({job}) -- pp={path_prefix}" ) -# path=SymlinkName( path_prefix, path, path ) if DEBUG==1: print("DEBUG: JobGetFileDetails for path={}".format( path_prefix ) ) p=session.query(Path).filter(Path.path_prefix==path_prefix).first() @@ -997,7 +1018,7 @@ def GenVideoThumbnail(job, file): def CheckForDups(job): AddLogForJob( job, f"Check for duplicates" ) - res = session.execute( f"select count(e1.name) as count from entry e1, file f1, dir d1, entry_dir_link edl1, entry e2, file f2, dir d2, entry_dir_link edl2 where e1.id = f1.eid and e2.id = f2.eid and d1.eid = edl1.dir_eid and edl1.entry_id = e1.id and edl2.dir_eid = d2.eid and edl2.entry_id = e2.id and f1.hash = f2.hash and e1.id != e2.id" ) + res = session.execute( "select count(e1.id) from entry e1, file f1, dir d1, entry_dir_link edl1, path_dir_link pdl1, path p1, entry e2, file f2, dir d2, entry_dir_link edl2, path_dir_link pdl2, path p2 where e1.id = f1.eid and e2.id = f2.eid and d1.eid = edl1.dir_eid and edl1.entry_id = e1.id and edl2.dir_eid = d2.eid and edl2.entry_id = e2.id and p1.type_id != (select id from path_type where name = 'Bin') and p1.id = pdl1.path_id and pdl1.dir_eid = d1.eid and p2.type_id != (select id from path_type where name = 'Bin') and p2.id = pdl2.path_id and pdl2.dir_eid = d2.eid and f1.hash = f2.hash and e1.id != e2.id and f1.size_mb = f2.size_mb" ) for row in res: if row.count > 0: AddLogForJob(job, f"Found duplicates, Creating Status message in front-end for attention")