Path in the DB is complete, still have hacks around displaying folders (hardcoded path name)
This commit is contained in:
@@ -1,4 +1,3 @@
|
||||
|
||||
#
|
||||
# This file controls the 'external' job control manager, that (periodically #
|
||||
# looks / somehow is pushed an event?) picks up new jobs, and processes them.
|
||||
@@ -11,9 +10,10 @@
|
||||
#
|
||||
###
|
||||
|
||||
### SQLALCHEMY IMPORTS ###
|
||||
# pylint: disable=no-member
|
||||
|
||||
### SQLALCHEMY IMPORTS ###
|
||||
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime, LargeBinary, Boolean
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
@@ -28,6 +28,8 @@ from sqlalchemy.orm import scoped_session
|
||||
|
||||
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, THUMBSIZE, SymlinkName
|
||||
from datetime import datetime, timedelta, date
|
||||
|
||||
### PYTHON LIB IMPORTS ###
|
||||
import pytz
|
||||
import time
|
||||
import os
|
||||
@@ -47,10 +49,12 @@ import re
|
||||
import sys
|
||||
|
||||
|
||||
DEBUG=0
|
||||
# global debug setting
|
||||
DEBUG=1
|
||||
# this is required to handle the duplicate processing code
|
||||
sys.setrecursionlimit(50000)
|
||||
|
||||
# an Manager, which the Session will use for connection resources
|
||||
# a Manager, which the Session will use for connection resources
|
||||
some_engine = create_engine(DB_URL)
|
||||
|
||||
# create a configured "Session" class
|
||||
@@ -69,6 +73,14 @@ Base = declarative_base()
|
||||
# Class describing File in the database, and via sqlalchemy, connected to the DB as well
|
||||
# This has to match one-for-one the DB table
|
||||
################################################################################
|
||||
class PathDirLink(Base):
|
||||
__tablename__ = "path_dir_link"
|
||||
path_id = Column(Integer, ForeignKey("path.id"), primary_key=True )
|
||||
dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True )
|
||||
|
||||
def __repr__(self):
|
||||
return f"<path_id: {self.path_id}, dir_eid: {self.dir_eid}>"
|
||||
|
||||
class EntryDirLink(Base):
|
||||
__tablename__ = "entry_dir_link"
|
||||
entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True )
|
||||
@@ -77,19 +89,25 @@ class EntryDirLink(Base):
|
||||
def __repr__(self):
|
||||
return f"<entry_id: {self.entry_id}, dir_eid: {self.dir_eid}>"
|
||||
|
||||
class Path(Base):
|
||||
__tablename__ = "path"
|
||||
id = Column(Integer, Sequence('path_id_seq'), primary_key=True )
|
||||
path_prefix = Column(String, unique=True, nullable=False )
|
||||
num_files = Column(Integer)
|
||||
|
||||
class Dir(Base):
|
||||
__tablename__ = "dir"
|
||||
eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
|
||||
path_prefix = Column(String, unique=True, nullable=False )
|
||||
num_files = Column(Integer)
|
||||
rel_path = Column(String, unique=True, nullable=False )
|
||||
in_path = relationship("Path", secondary="path_dir_link", uselist=False)
|
||||
last_import_date = Column(Float)
|
||||
files = relationship("Entry", secondary="entry_dir_link")
|
||||
|
||||
def PathOnFS(self):
|
||||
return self.path_prefix
|
||||
return self.in_path.path_prefix+'/'+self.rel_path
|
||||
|
||||
def __repr__(self):
|
||||
return f"<eid: {self.eid}, path_prefix: {self.path_prefix}, num_files: {self.num_files}, last_import_date: {self.last_import_date}, files: {self.files}>"
|
||||
return f"<eid: {self.eid}, last_import_date: {self.last_import_date}, files: {self.files}>"
|
||||
|
||||
class Entry(Base):
|
||||
__tablename__ = "entry"
|
||||
@@ -103,7 +121,12 @@ class Entry(Base):
|
||||
in_dir = relationship ("Dir", secondary="entry_dir_link", uselist=False )
|
||||
|
||||
def FullPathOnFS(self):
|
||||
return self.in_dir.path_prefix + '/' + self.name
|
||||
print( f"(FullPathOnFS: pp={self.in_dir.in_path.path_prefix}, rp={self.in_dir.rel_path}, n={self.name}" )
|
||||
s=self.in_dir.in_path.path_prefix + '/'
|
||||
if len(self.in_dir.rel_path) > 0:
|
||||
s += self.in_dir.rel_path + '/'
|
||||
s += self.name
|
||||
return s
|
||||
|
||||
def __repr__(self):
|
||||
return f"<id: {self.id}, name: {self.name}, type={self.type}, exists_on_fs={self.exists_on_fs}, dir_details={self.dir_details}, file_details={self.file_details}, in_dir={self.in_dir}>"
|
||||
@@ -266,10 +289,10 @@ def JobsForPaths( parent_job, paths ):
|
||||
now=datetime.now(pytz.utc)
|
||||
# make new set of Jobs per path... HandleJobs will make them run later
|
||||
for path in paths:
|
||||
d=session.query(Dir).filter(Dir.path_prefix==SymlinkName(path,path+'/')).first()
|
||||
p=session.query(Path).filter(Path.path_prefix==SymlinkName(path,path+'/')).first()
|
||||
cfn=0
|
||||
if d:
|
||||
cfn=d.num_files
|
||||
if p:
|
||||
cfn=p.num_files
|
||||
|
||||
jex=JobExtra( name="path", value=path )
|
||||
job=Job(start_time=now, last_update=now, name="importdir", state="New", wait_for=None, pa_job_state="New", current_file_num=0, num_files=cfn )
|
||||
@@ -423,6 +446,8 @@ def JobForceScan(job):
|
||||
JobProgressState( job, "In Progress" )
|
||||
session.query(FileRefimgLink).delete()
|
||||
session.query(EntryDirLink).delete()
|
||||
session.query(PathDirLink).delete()
|
||||
session.query(Path).delete()
|
||||
session.query(Dir).delete()
|
||||
session.query(File).delete()
|
||||
session.query(Entry).delete()
|
||||
@@ -441,13 +466,13 @@ def CreateSymlink(job,path):
|
||||
os.symlink(path, symlink)
|
||||
return symlink
|
||||
|
||||
def AddDir(job, dirname, path_prefix, in_dir):
|
||||
dir=session.query(Dir).filter(Dir.path_prefix==path_prefix).first()
|
||||
def AddDir(job, dirname, in_dir, rel_path, in_path ):
|
||||
dir=session.query(Dir).join(PathDirLink).join(Path).filter(Path.id==in_path.id).filter(Dir.rel_path==rel_path).first()
|
||||
if dir:
|
||||
e=session.query(Entry).get(dir.eid)
|
||||
e.exists_on_fs=True
|
||||
return dir
|
||||
dir=Dir( path_prefix=path_prefix, num_files=0, last_import_date=0 )
|
||||
dir=Dir( last_import_date=0, rel_path=rel_path, in_path=in_path )
|
||||
dtype=session.query(FileType).filter(FileType.name=='Directory').first()
|
||||
e=Entry( name=dirname, type=dtype, exists_on_fs=True )
|
||||
e.dir_details=dir
|
||||
@@ -455,7 +480,7 @@ def AddDir(job, dirname, path_prefix, in_dir):
|
||||
if in_dir:
|
||||
e.in_dir=in_dir
|
||||
if DEBUG==1:
|
||||
print(f"AddDir: created d={dirname}, pp={path_prefix}")
|
||||
print(f"AddDir: created d={dirname}, rp={rel_path}")
|
||||
AddLogForJob(job, f"DEBUG: Process new dir: {dirname}")
|
||||
session.add(e)
|
||||
return dir
|
||||
@@ -477,7 +502,7 @@ def AddFile(job, fname, type_str, fsize, in_dir, year, month, day, woy ):
|
||||
# reset exists_on_fs to False for everything in this import path, if we find it on the FS in the walk below, it goes back to True, anything that
|
||||
# is still false, has been deleted
|
||||
def ResetExistsOnFS(job, path):
|
||||
reset_dirs = session.query(Entry).join(EntryDirLink).join(Dir).filter(Dir.path_prefix.ilike(path+'%')).all()
|
||||
reset_dirs = session.query(Entry).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==path).all()
|
||||
for reset_dir in reset_dirs:
|
||||
reset_dir.exists_on_fs=False
|
||||
session.add(reset_dir)
|
||||
@@ -558,6 +583,9 @@ def JobImportDir(job):
|
||||
FinishJob( job, "Finished Importing: {} -- Path does not exist".format( path), "Failed" )
|
||||
return
|
||||
symlink=CreateSymlink(job,path)
|
||||
|
||||
path_obj=Path( path_prefix=symlink, num_files=0 )
|
||||
session.add(path_obj)
|
||||
ResetExistsOnFS(job, symlink)
|
||||
|
||||
walk=os.walk(path, topdown=True)
|
||||
@@ -567,10 +595,11 @@ def JobImportDir(job):
|
||||
overall_file_cnt=0
|
||||
for root, subdirs, files in ftree:
|
||||
overall_file_cnt+= len(subdirs) + len(files)
|
||||
path_obj.num_files=overall_file_cnt
|
||||
|
||||
parent_dir=None
|
||||
dir=AddDir(job, os.path.basename(symlink), symlink, parent_dir)
|
||||
dir.num_files=overall_file_cnt
|
||||
# rel_path is always '' at the top of the path objects path_prefix for the first dir
|
||||
dir=AddDir(job, os.path.basename(symlink), parent_dir, '', path_obj)
|
||||
# session.add in case we already have imported this dir (as AddDir wont) & now we might have diff num of files to last time,
|
||||
session.add(dir)
|
||||
job.num_files=overall_file_cnt
|
||||
@@ -582,7 +611,9 @@ def JobImportDir(job):
|
||||
# already create root above to work out num_files for whole os.walk
|
||||
if root != path:
|
||||
pp=SymlinkName( path, root )+'/'+os.path.basename(root)
|
||||
dir=AddDir(job, os.path.basename(root), pp, parent_dir)
|
||||
print( F"pp={pp}, root={root}, symlink={symlink}" )
|
||||
rel_path=pp.replace(symlink+'/','')
|
||||
dir=AddDir(job, os.path.basename(root), parent_dir, rel_path, path_obj)
|
||||
for basename in files:
|
||||
# commit every 100 files to see progress being made but not hammer the database
|
||||
if job.current_file_num % 100 == 0:
|
||||
@@ -612,8 +643,6 @@ def JobImportDir(job):
|
||||
job.current_file_num+=1
|
||||
job.current_file_num += len(subdirs)
|
||||
dir.last_import_date = time.time()
|
||||
if parent_dir != None:
|
||||
dir.num_files=len(files)+len(subdirs)
|
||||
parent_dir=dir
|
||||
job.num_files=overall_file_cnt
|
||||
|
||||
@@ -623,7 +652,7 @@ def JobImportDir(job):
|
||||
return
|
||||
|
||||
def RunFuncOnFilesInPath( job, path, file_func ):
|
||||
d=session.query(Dir).filter(Dir.path_prefix==path).first()
|
||||
d = session.query(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==path).filter(Dir.rel_path=='').first()
|
||||
for e in d.files:
|
||||
ProcessFilesInDir(job, e, file_func)
|
||||
|
||||
@@ -631,8 +660,8 @@ def RunFuncOnFilesInPath( job, path, file_func ):
|
||||
def JobProcessAI(job):
|
||||
path=[jex.value for jex in job.extra if jex.name == "path"][0]
|
||||
path = SymlinkName(path, '/')
|
||||
d=session.query(Dir).filter(Dir.path_prefix==path).first()
|
||||
job.num_files=d.num_files
|
||||
p = session.query(Path).filter(Path.path_prefix==path).first()
|
||||
job.num_files=p.num_files
|
||||
|
||||
people = session.query(Person).all()
|
||||
for person in people:
|
||||
@@ -777,9 +806,9 @@ def JobGetFileDetails(job):
|
||||
path=SymlinkName( path, path )
|
||||
if DEBUG==1:
|
||||
print("DEBUG: JobGetFileDetails for path={}".format( path ) )
|
||||
dir=session.query(Dir).filter(Dir.path_prefix==path).first()
|
||||
p=session.query(Path).filter(Path.path_prefix==path).first()
|
||||
job.current_file_num = 0
|
||||
job.num_files = dir.num_files
|
||||
job.num_files = p.num_files
|
||||
session.commit()
|
||||
RunFuncOnFilesInPath( job, path, GenHashAndThumb )
|
||||
FinishJob(job, "File Details job finished")
|
||||
@@ -875,8 +904,7 @@ def CheckForDups(job):
|
||||
def RemoveFileFromFS( del_me ):
|
||||
try:
|
||||
settings = session.query(Settings).first()
|
||||
m=re.search( r'^static/(.+)', del_me.in_dir.path_prefix)
|
||||
dst_dir=settings.recycle_bin_path + m[1] + '/'
|
||||
dst_dir=settings.recycle_bin_path + '/' + del_me.in_dir.in_path.path_prefix.replace('static/','') + '/' + del_me.in_dir.rel_path + '/'
|
||||
os.makedirs( dst_dir,mode=0o777, exist_ok=True )
|
||||
src=del_me.FullPathOnFS()
|
||||
dst=dst_dir + '/' + del_me.name
|
||||
|
||||
Reference in New Issue
Block a user