Now have an AddPath (which calls AddDir if needed). This also fixes a bug where re-scanning caused duplicate database entries. Also cleaned up comments and prints.
@@ -1,3 +1,4 @@
###
#
# This file controls the 'external' job control manager, that (periodically
# looks / somehow is pushed an event?) picks up new jobs, and processes them.
@@ -81,7 +82,7 @@ class PathType(Base):
    name = Column(String, unique=True, nullable=False )

    def __repr__(self):
-        return f"<id: {self.id}, name={self.name}"
+        return f"<id: {self.id}, name={self.name}>"


class PathDirLink(Base):
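For orientation, a minimal self-contained sketch of the declarative model this hunk is editing; the table name and `id` column are assumptions, since only `name` and `__repr__` appear in the diff:

```python
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class PathType(Base):
    __tablename__ = "path_type"  # assumed name; only the class name is visible above

    id = Column(Integer, primary_key=True)  # assumed: the repr references self.id
    name = Column(String, unique=True, nullable=False)

    def __repr__(self):
        # without the closing '>', reprs like "<id: 1, name=Import" look truncated
        return f"<id: {self.id}, name={self.name}>"

print(PathType(id=1, name="Import"))   # -> <id: 1, name=Import>
```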
@@ -107,6 +108,10 @@ class Path(Base):
    type = relationship("PathType")
    path_prefix = Column(String, unique=True, nullable=False )
    num_files = Column(Integer)
    dir_obj = relationship("Dir", secondary="path_dir_link", uselist=False)

    def __repr__(self):
        return f"<id: {self.id}, type={self.type}, path_prefix={self.path_prefix}, dir_obj={self.dir_obj}>"

class Dir(Base):
    __tablename__ = "dir"
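`dir_obj` reaches Dir through the `path_dir_link` association table with `uselist=False`, so each Path resolves to at most one top-level Dir. A runnable sketch of that pattern, with the schema pared down to assumed minimal columns:

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, relationship, Session

Base = declarative_base()

class PathDirLink(Base):
    __tablename__ = "path_dir_link"
    path_id = Column(Integer, ForeignKey("path.id"), primary_key=True)
    dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True)

class Dir(Base):
    __tablename__ = "dir"
    eid = Column(Integer, primary_key=True)   # the diff's SQL refers to dir rows by eid
    name = Column(String)

class Path(Base):
    __tablename__ = "path"
    id = Column(Integer, primary_key=True)
    path_prefix = Column(String, unique=True, nullable=False)
    # uselist=False: treat the many-to-many as "at most one Dir per Path"
    dir_obj = relationship("Dir", secondary="path_dir_link", uselist=False)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as s:
    d = Dir(name="photos")
    p = Path(path_prefix="static/import/photos", dir_obj=d)
    s.add(p)
    s.commit()
    print(p.dir_obj.name)   # -> photos
```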
@@ -421,9 +426,9 @@ def HandleJobs():
        if job.wait_for != None:
            j2 = session.query(Job).get(job.wait_for)
            if not j2:
-                print ("WTF? job.wait_for ({}) does not exist in below? ".format( job.wait_for ))
+                print ("ERROR: job.wait_for ({}) does not exist in below? ".format( job.wait_for ))
                for j in session.query(Job).all():
-                    print ("j={}".format(j.id))
+                    print ("ERROR: j={}".format(j.id))
                continue
            if j2.pa_job_state != 'Completed':
                continue
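The gate this hunk cleans up, skip a job until the job named in `wait_for` reports 'Completed', can be seen in isolation below; the Job shape and state string come from the hunk, the rest is a stand-in:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Job:
    id: int
    pa_job_state: str = "Pending"
    wait_for: Optional[int] = None   # id of a prerequisite job, if any

def runnable(job: Job, jobs_by_id: dict[int, Job]) -> bool:
    """Mirror of the hunk's gate: run only once the prerequisite is done."""
    if job.wait_for is None:
        return True
    prereq = jobs_by_id.get(job.wait_for)
    if prereq is None:
        print(f"ERROR: job.wait_for ({job.wait_for}) does not exist")
        return False
    return prereq.pa_job_state == "Completed"

jobs = {1: Job(1, "Completed"), 2: Job(2, wait_for=1), 3: Job(3, wait_for=99)}
print([j.id for j in jobs.values() if runnable(j, jobs)])   # -> [1, 2]
```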
@@ -441,7 +446,7 @@ def HandleJobs():
        try:
            MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
        except Exception as e2:
-            print("Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exep was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
+            print("ERROR: Failed to let front-end know, but back-end failed to run job (id: {}, name: {} -- orig exception was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
    print("INFO: PA job manager is waiting for a job")
    return

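The surrounding pattern, try to tell the front-end a job failed and fall back to a plain log line if even that fails, in a hedged sketch; `MessageToFE` is stubbed to always fail so the fallback path runs:

```python
def MessageToFE(job_id: int, level: str, text: str) -> None:
    # stand-in for the real front-end notifier; assume it can itself fail
    raise ConnectionError("front-end unreachable")

def report_failure(job_id: int, job_name: str, e: Exception) -> None:
    try:
        MessageToFE(job_id, "danger", f"Failed with: {e} (try job log for details)")
    except Exception as e2:
        # last resort: make sure the failure is at least visible in the manager's log
        print(f"ERROR: Failed to let front-end know, but back-end failed to run job "
              f"(id: {job_id}, name: {job_name} -- orig exception was: {e}, this exception was: {e2})")

report_failure(7, "ImportDir", RuntimeError("disk full"))
```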
@@ -490,11 +495,25 @@ def CreateSymlink(job,ptype,path):
    path_type = session.query(PathType).get(ptype)
    symlink=SymlinkName(path_type.name, path, path)
    if not os.path.exists(symlink):
-        print( f"symlink does not exist - but is it missing path_type after static? s={symlink}" )
+        print( f"INFO: symlink does not exist, actually creating it -- s={symlink}" )
        os.makedirs( os.path.dirname(symlink), mode=0o777, exist_ok=True )
        os.symlink(path, symlink)
    return symlink

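CreateSymlink's create-if-missing dance, isolated as a runnable sketch; `SymlinkName` is the project's own naming helper, so the link location is passed in directly here:

```python
import os
import tempfile

def ensure_symlink(target: str, symlink: str) -> str:
    """Create symlink -> target if it does not already exist; idempotent."""
    if not os.path.exists(symlink):
        print(f"INFO: symlink does not exist, actually creating it -- s={symlink}")
        os.makedirs(os.path.dirname(symlink), mode=0o777, exist_ok=True)
        os.symlink(target, symlink)
    return symlink

with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "real")
    os.mkdir(target)
    link = ensure_symlink(target, os.path.join(tmp, "static", "import", "real"))
    print(os.path.islink(link))   # -> True; calling again is a no-op
```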
# function to create or return a Path object based on the path prefix (pp) and the type (the type id of Import, Storage, or Bin)
# if the Path is created, it also creates the Dir object (which in turn creates the Entry object)
def AddPath(job, pp, type ):
    path_obj=session.query(Path).filter(Path.path_prefix==pp).first()
    if not path_obj:
        path_obj=Path( path_prefix=pp, num_files=0, type_id=type )
        # if there is no Path yet, then there is no Dir for it either, so create that too (which creates the Entry, etc.)
        # the Dir's name is os.path.basename(pp), it is not inside another Dir (None), and its relative path is "", inside the path_obj we just created
        dir=AddDir( job, os.path.basename(pp), None, "", path_obj )
        session.add(path_obj)
        session.add(dir)
    return path_obj

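This get-or-create shape is what fixes the re-scan duplication called out in the commit message: a second scan of the same prefix finds the existing row instead of inserting another. A standalone sketch of the idea, with the schema cut down to the one column that matters:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

class Path(Base):
    __tablename__ = "path"
    id = Column(Integer, primary_key=True)
    path_prefix = Column(String, unique=True, nullable=False)

def add_path(session: Session, pp: str) -> Path:
    # look before you insert: a re-scan of the same prefix returns the existing row
    path_obj = session.query(Path).filter(Path.path_prefix == pp).first()
    if not path_obj:
        path_obj = Path(path_prefix=pp)
        session.add(path_obj)
    return path_obj

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as s:
    a = add_path(s, "static/import/photos")
    s.commit()
    b = add_path(s, "static/import/photos")   # second "scan" of the same path
    s.commit()
    print(a.id == b.id, s.query(Path).count())   # -> True 1
```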
################################################################################################################################################################
#
# Key function that runs as part of (usually) an import job. The name of the directory (dirname) is checked to see
@@ -522,7 +541,7 @@ def AddDir(job, dirname, in_dir, rel_path, in_path ):
    if in_dir:
        e.in_dir=in_dir
    if DEBUG==1:
-        print(f"AddDir: created d={dirname}, rp={rel_path}")
+        print(f"DEBUG: AddDir: created d={dirname}, rp={rel_path}")
        AddLogForJob(job, f"DEBUG: Process new dir: {dirname}")
    session.add(e)
    return dir
@@ -573,9 +592,12 @@ def RemoveFileFromFS( del_me ):
        dst=dst_dir + '/' + del_me.name
        os.replace( src, dst )
    except Exception as e:
-        print( f"Failed to remove file from filesystem - which={src}, err: {e}")
+        print( f"ERROR: Failed to remove file from filesystem - which={src}, err: {e}")
    return

# Function that moves a file we are "deleting" to the recycle bin: it moves the
# file on the filesystem and then changes the database path from the Import or
# Storage path over to the Bin path
def MoveFileToRecycleBin(job,del_me):
    try:
        settings = session.query(Settings).first()
@@ -585,29 +607,22 @@ def MoveFileToRecycleBin(job,del_me):
        dst=dst_dir + '/' + del_me.name
        os.replace( src, dst )
    except Exception as e:
-        print( f"Failed to remove file from filesystem - which={src}, err: {e}")
+        print( f"ERROR: Failed to remove file from filesystem - which={src}, err: {e}")
    bin_path=session.query(Path).join(PathType).filter(PathType.name=='Bin').first()
    print( f"bin={bin_path}")
    print( f"del_me={del_me}")
    new_rel_path=del_me.in_dir.in_path.path_prefix.replace('static/','')

    # if there is a relative path on this dir, add it to new_rel_path, as there is only ever 1 Bin path
    if len(del_me.in_dir.rel_path):
        new_rel_path += '/' + del_me.in_dir.rel_path

    print( f"new_rel_path={new_rel_path}" )

    parent_dir=session.query(Dir).join(PathDirLink).filter(PathDirLink.path_id==bin_path.id).first()
    print( f"parent_dir for path={parent_dir}" )
    part_rel_path=""
    for dirname in new_rel_path.split("/"):
        part_rel_path += f"{dirname}"
        print( f"AddDir( {dirname} in {parent_dir} with {part_rel_path} as pfx ) ")
        new_dir=AddDir( job, dirname, parent_dir, part_rel_path, bin_path )
        parent_dir=new_dir
        part_rel_path += "/"

    print( f"new_dir={new_dir}" )
    del_me.in_dir = new_dir
    return

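The loop above rebuilds the file's directory chain under the single Bin path one segment at a time. The same accumulation, with `AddDir` replaced by a stand-in that just records what would be created:

```python
def make_bin_chain(new_rel_path: str) -> list[tuple[str, str]]:
    """Return (dirname, partial_rel_path) pairs in creation order."""
    chain = []
    part_rel_path = ""
    for dirname in new_rel_path.split("/"):
        part_rel_path += dirname
        # stand-in for AddDir(job, dirname, parent_dir, part_rel_path, bin_path)
        chain.append((dirname, part_rel_path))
        part_rel_path += "/"
    return chain

print(make_bin_chain("import/photos/2023"))
# -> [('import', 'import'), ('photos', 'import/photos'), ('2023', 'import/photos/2023')]
```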
@@ -619,6 +634,8 @@ def RemoveDirFromDB(id):
    session.query(Entry).filter(Entry.id==id).delete()
    return

# this routine is used when a scan finds files/dirs that have been removed
# underneath PA, so it just deletes them from the DB
def HandleAnyFSDeletions(job):
    dtype=session.query(FileType).filter(FileType.name=='Directory').first()
    rms = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id!=dtype.id).all()
@@ -635,6 +652,7 @@ def HandleAnyFSDeletions(job):
        rm_cnt+=1
    return rm_cnt

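`HandleAnyFSDeletions` pairs with `ResetExistsOnFS` further down as a mark-and-sweep: a scan marks what it still sees, and anything left unmarked is deleted. A tiny in-memory sketch of that reconciliation, inferred from the `exists_on_fs` flag:

```python
db_entries = {"a.jpg": False, "b.jpg": False, "c.jpg": False}   # False = not seen this scan

# "scan": mark everything actually present on the filesystem
for name in ["a.jpg", "c.jpg"]:
    db_entries[name] = True

# "HandleAnyFSDeletions": whatever is still unmarked has gone away underneath us
removed = [n for n, seen in db_entries.items() if not seen]
for n in removed:
    del db_entries[n]

print(removed, sorted(db_entries))   # -> ['b.jpg'] ['a.jpg', 'c.jpg']
```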
# try several ways to work out the date a file was created (try exif, then filename, lastly filesystem)
def GetDateFromFile(file, stat):
    # try exif
    try:
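The fallback order the comment describes, EXIF first, then a date-looking filename, then filesystem mtime, as a standalone sketch; no EXIF library is assumed, so that step is a stub you would replace with a real tag lookup:

```python
import os
import re
from datetime import datetime

def get_date_from_file(path: str) -> datetime:
    # 1) EXIF -- stubbed here; real code would read the image's DateTimeOriginal tag
    exif_date = None
    if exif_date:
        return exif_date
    # 2) filename -- many cameras/phones embed YYYYMMDD in the name
    m = re.search(r"(20\d{2})(\d{2})(\d{2})", os.path.basename(path))
    if m:
        try:
            return datetime(int(m.group(1)), int(m.group(2)), int(m.group(3)))
        except ValueError:
            pass   # e.g. a match like 20231399 that is not a real date
    # 3) filesystem -- last resort, the file's mtime
    return datetime.fromtimestamp(os.stat(path).st_mtime)

print(get_date_from_file("IMG_20230514_123456.jpg"))   # -> 2023-05-14 00:00:00
```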
@@ -684,26 +702,31 @@ def JobImportDir(job):
        return
    symlink=CreateSymlink(job,path_type,path)

-    path_obj=Path( path_prefix=symlink, num_files=0, type_id=path_type )
+    # create/find the Path
+    path_obj=AddPath( job, symlink, path_type )
    session.add(path_obj)
    # find all jobs waiting on me and their children, etc. and add a path_prefix jex set to symlink, so we can just reference it from here on in, rather than recreate that string
    AddJexToDependantJobs(job,"path_prefix",symlink)
    ResetExistsOnFS(job, symlink)

    walk=os.walk(path, topdown=True)
    ftree=list(walk)

    # go through the data once to work out file_cnt so the progress bar works from the first import
    overall_file_cnt=0
    for root, subdirs, files in ftree:
        overall_file_cnt+= len(subdirs) + len(files)
    path_obj.num_files=overall_file_cnt

    parent_dir=None
    # rel_path is always '' for the first dir at the top of the path object's path_prefix
-    dir=AddDir(job, os.path.basename(symlink), parent_dir, '', path_obj)
    # this is needed so that later on, when we AddDir for any dirs we find via os.walk, they have a parent dir object that is the dir for the path we are in
    parent_dir=path_obj.dir_obj

    # first time through we need dir to be the top level; there is a special if below so we do not recreate the top dir that AddPath created already
    dir=parent_dir
    # session.add in case we have already imported this dir (as AddDir won't), and we might now have a different number of files to last time
    session.add(dir)
    session.add(parent_dir)

    # if we set / then commit this now, the web page will know how many files
    # to process as we then do the slow job of processing them
    job.num_files=overall_file_cnt
    AddLogForJob(job, f"Found {overall_file_cnt} file(s) to process")
    session.commit()
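Materialising `os.walk` into a list and summing entry counts up front is what lets the progress bar show a total before the slow per-file work starts. The counting pass on its own:

```python
import os
import tempfile

def count_walk_entries(path: str) -> int:
    """One cheap pass over the tree: total dirs + files, for a progress total."""
    ftree = list(os.walk(path, topdown=True))
    overall_file_cnt = 0
    for root, subdirs, files in ftree:
        overall_file_cnt += len(subdirs) + len(files)
    return overall_file_cnt

with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "a", "b"))
    open(os.path.join(tmp, "a", "x.jpg"), "w").close()
    print(count_walk_entries(tmp))   # -> 3 (dirs a and b, plus file x.jpg)
```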
@@ -714,7 +737,6 @@ def JobImportDir(job):
        if root != path:
            pp=SymlinkName( path_obj.type.name, path, root )+'/'+os.path.basename(root)
            rel_path=pp.replace(symlink+'/','')
-            print( f"pp={pp}, root={root}, symlink={symlink}, rel_path={rel_path}" )
            dir=AddDir(job, os.path.basename(root), parent_dir, rel_path, path_obj)
        for basename in files:
            # commit every 100 files to see progress being made but not hammer the database
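The 'commit every 100 files' idea, batching writes so progress stays visible without hammering the database, in the abstract; the session is stubbed with a commit counter:

```python
class FakeSession:
    """Stand-in for the SQLAlchemy session, just counting commits."""
    commits = 0
    def commit(self):
        FakeSession.commits += 1

session = FakeSession()
files = [f"img_{i:04}.jpg" for i in range(250)]

for i, basename in enumerate(files, start=1):
    # ... per-file processing would happen here ...
    if i % 100 == 0:        # commit every 100 files: progress visible, DB not hammered
        session.commit()
session.commit()            # final commit for the remainder

print(FakeSession.commits)  # -> 3 (after 100, 200, and the tail of 50)
```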
@@ -909,7 +931,6 @@ def JobGetFileDetails(job):
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    path_prefix=[jex.value for jex in job.extra if jex.name == "path_prefix"][0]
-    print( f"JobGetFileDetails({job}) -- pp={path_prefix}" )
    # path=SymlinkName( path_prefix, path, path )
    if DEBUG==1:
        print("DEBUG: JobGetFileDetails for path={}".format( path_prefix ) )
    p=session.query(Path).filter(Path.path_prefix==path_prefix).first()
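The first two lines pull named values out of `job.extra` with a list comprehension; the same pattern stands alone below, with `Jex` as an assumed minimal stand-in for the job-extra rows (note it raises IndexError if the name is missing):

```python
from dataclasses import dataclass

@dataclass
class Jex:
    name: str
    value: str

extra = [Jex("path", "/mnt/photos"), Jex("path_prefix", "static/import/photos")]

# same shape as the hunk: first matching jex wins; IndexError if the name is absent
path = [jex.value for jex in extra if jex.name == "path"][0]
path_prefix = [jex.value for jex in extra if jex.name == "path_prefix"][0]
print(path, path_prefix)
```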
@@ -997,7 +1018,7 @@ def GenVideoThumbnail(job, file):
def CheckForDups(job):
    AddLogForJob( job, f"Check for duplicates" )

-    res = session.execute( f"select count(e1.name) as count from entry e1, file f1, dir d1, entry_dir_link edl1, entry e2, file f2, dir d2, entry_dir_link edl2 where e1.id = f1.eid and e2.id = f2.eid and d1.eid = edl1.dir_eid and edl1.entry_id = e1.id and edl2.dir_eid = d2.eid and edl2.entry_id = e2.id and f1.hash = f2.hash and e1.id != e2.id" )
+    res = session.execute( "select count(e1.id) as count from entry e1, file f1, dir d1, entry_dir_link edl1, path_dir_link pdl1, path p1, entry e2, file f2, dir d2, entry_dir_link edl2, path_dir_link pdl2, path p2 where e1.id = f1.eid and e2.id = f2.eid and d1.eid = edl1.dir_eid and edl1.entry_id = e1.id and edl2.dir_eid = d2.eid and edl2.entry_id = e2.id and p1.type_id != (select id from path_type where name = 'Bin') and p1.id = pdl1.path_id and pdl1.dir_eid = d1.eid and p2.type_id != (select id from path_type where name = 'Bin') and p2.id = pdl2.path_id and pdl2.dir_eid = d2.eid and f1.hash = f2.hash and e1.id != e2.id and f1.size_mb = f2.size_mb" )
    for row in res:
        if row.count > 0:
            AddLogForJob(job, f"Found duplicates, creating status message in front-end for attention")
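The reworked query joins each entry out to its Path so files already in the Bin are ignored, and checks size alongside hash. The core self-join, flag any two distinct entries sharing a content hash, demoed in miniature on plain SQLite:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    create table entry (id integer primary key, name text);
    create table file (eid integer references entry(id), hash text, size_mb real);
""")
rows = [(1, "a.jpg", "h1", 2.0), (2, "b.jpg", "h1", 2.0), (3, "c.jpg", "h2", 5.0)]
for eid, name, h, sz in rows:
    con.execute("insert into entry values (?, ?)", (eid, name))
    con.execute("insert into file values (?, ?, ?)", (eid, h, sz))

# same shape as the hunk's check: self-join on hash (and size) across distinct entries
(count,) = con.execute("""
    select count(e1.id) as count
    from entry e1, file f1, entry e2, file f2
    where e1.id = f1.eid and e2.id = f2.eid
      and f1.hash = f2.hash and f1.size_mb = f2.size_mb
      and e1.id != e2.id
""").fetchone()
print(count)   # -> 2 (the a.jpg/b.jpg pair is counted in both directions)
```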