fixed BUG-21: file structure broken after rescan; also the beginnings of a new job that checks for duplicate files and has the back-end job signal the front-end when duplicates are found. The basic route is in the f/e but not built yet.
@@ -233,8 +233,9 @@ def MessageToFE( job_id, alert, message ):
msg = PA_JobManager_FE_Message( job_id=job_id, alert=alert, message=message)
session.add(msg)
session.commit()
return

def ProcessImportDirs(parent_job=None):
def ProcessImportDirs(parent_job):
settings = session.query(Settings).first()
if settings == None:
raise Exception("Cannot create file data with no settings / import path is missing")
@@ -263,15 +264,23 @@ def ProcessImportDirs(parent_job=None):
session.commit()
if parent_job:
AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job2.id, job2.id, job2.name, job2.wait_for ) )
"""

jex3=JobExtra( name="path", value=path )
job3=Job(start_time=now, last_update=now, name="processai", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 )
job3=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 )
job3.extra.append(jex3)
session.add(job3)
session.commit()
if parent_job:
AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job3.id, job3.id, job3.name, job3.wait_for ) )
"""
jex4=JobExtra( name="path", value=path )
job4=Job(start_time=now, last_update=now, name="processai", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 )
job4.extra.append(jex4)
session.add(job4)
session.commit()
if parent_job:
AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job3.id, job3.id, job3.name, job3.wait_for ) )
"""
HandleJobs()
return
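Note: the new checkdups job above is chained to the import job through wait_for=job2.id. The diff does not show how HandleJobs() honours that field, so the following is only a minimal sketch of the kind of gating a scheduler like it might apply; the Job fields (state, wait_for) come from the diff, while the "Done" state value and the helper name are assumptions.

def runnable_jobs():
    # Hypothetical helper: yield only "New" jobs whose wait_for dependency
    # (if any) has already finished. "Done" is an assumed completion state.
    for job in session.query(Job).filter(Job.state == "New"):
        if job.wait_for is None:
            yield job
            continue
        dep = session.query(Job).filter(Job.id == job.wait_for).first()
        if dep is not None and dep.state == "Done":
            yield job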
@@ -298,6 +307,8 @@ def RunJob(job):
JobImportDir(job)
elif job.name =="getfiledetails":
JobGetFileDetails(job)
elif job.name == "checkdups":
CheckForDups(job)
elif job.name == "processai":
JobProcessAI(job)
else:
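Note: RunJob() simply grows another elif branch for the new job name. Purely as an illustration of an alternative layout (not something this commit does), the same dispatch could be table-driven; only the handler functions visible in this hunk are listed, and the dispatcher name is hypothetical.

JOB_HANDLERS = {
    "getfiledetails": JobGetFileDetails,
    "checkdups": CheckForDups,
    "processai": JobProcessAI,
    # the import-dir job's name string is not visible in this hunk, so it is omitted
}

def DispatchJob(job):
    # Hypothetical variant of RunJob(): look the handler up by job name.
    handler = JOB_HANDLERS.get(job.name)
    if handler is None:
        raise ValueError(f"unknown job name: {job.name}")
    handler(job)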
@@ -429,6 +440,7 @@ def AddDir(job, dirname, path_prefix, in_dir):
def AddFile(job, fname, type_str, fsize, in_dir, year, month, day, woy ):
e=session.query(Entry).join(EntryDirLink).join(Dir).filter(Entry.name==fname,Dir.eid==in_dir.eid).first()
if e:
print( f"################################################ FILE EXISTS ALREADY: {fname} -- {in_dir.path_prefix} {e}" )
e.exists_on_fs=True
return e
ftype = session.query(FileType).filter(FileType.name==type_str).first()
@@ -503,7 +515,6 @@ def GetDateFromFile(file, stat):
year, month, day, _, _, _, _, _, _ = datetime.fromtimestamp(stat.st_ctime).timetuple()
c=date(year, month, day).isocalendar()
woy=c[1]
print(f"DEL ME: year={year}, month={month}, day={day}")
return year, month, day, woy

@@ -536,7 +547,6 @@ def JobImportDir(job):
root=root[0:-1]

dir=AddDir(job, os.path.basename(root), pp, parent_dir)
parent_dir=dir
for basename in files:
# commit every 100 files to see progress being made but not hammer the database
if job.current_file_num % 100 == 0:
@@ -559,15 +569,15 @@ def JobImportDir(job):
year, month, day, woy = GetDateFromFile(fname, stat)
e=AddFile( job, basename, type_str, fsize, dir, year, month, day, woy )
else:
e=session.query(Entry).filter(Entry.name==basename).first()
e=session.query(Entry).join(EntryDirLink).join(Dir).filter(Entry.name==basename,Dir.eid==dir.eid).first()
e.exists_on_fs=True
if DEBUG==1:
print("DEBUG: {} - {} is OLDER than {}".format( basename, stat.st_ctime, dir.last_import_date ), basename )
job.current_file=basename
job.current_file_num+=1

dir.num_files=len(files)+len(subdirs)
dir.last_import_date = time.time()
parent_dir=dir
job.num_files=overall_file_cnt
job.current_file_num=overall_file_cnt
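Note: the replaced query line above is the heart of the BUG-21 fix. Matching an Entry by name alone meant that, on a rescan, a file could be attached to a same-named entry living in a different directory, breaking the file structure. The new query joins EntryDirLink and Dir so the match is restricted to the directory currently being walked. A small helper capturing that scoped lookup might look like this (the model and session names are taken from the diff; the helper itself is hypothetical):

def FindEntryInDir(name, dir_eid):
    # Look up an Entry by name, but only within the directory identified by
    # dir_eid, so same-named files elsewhere in the tree are never matched.
    return (session.query(Entry)
            .join(EntryDirLink)
            .join(Dir)
            .filter(Entry.name == name, Dir.eid == dir_eid)
            .first())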
@@ -814,9 +824,24 @@ def GenVideoThumbnail(job, file):
return None
return thumbnail

def CheckForDups(job):
path=[jex.value for jex in job.extra if jex.name == "path"][0]
path='static'+'/'+os.path.basename(path[0:-1])
AddLogForJob( job, f"Check for duplicates in import path: {path}" )
res = session.execute( f"select count(e1.name) as count from entry e1, file f1, dir d1, entry_dir_link edl1, entry e2, file f2, dir d2, entry_dir_link edl2 where e1.id = f1.eid and e2.id = f2.eid and d1.eid = edl1.dir_eid and edl1.entry_id = e1.id and edl2.dir_eid = d2.eid and edl2.entry_id = e2.id and d1.path_prefix like '%{path}%' and f1.hash = f2.hash and e1.id != e2.id" )
for row in res:
if row.count > 0:
MessageToFE( job.id, "danger", "Found duplicate(s), click <a href='/fix_dups'>here</a> to finalise import by removing duplicates" )
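Note: CheckForDups() interpolates path straight into the raw SQL string, so a path containing a quote would break the query. A hedged alternative, assuming SQLAlchemy's text() construct is acceptable in this codebase, keeps exactly the same join logic but passes the LIKE pattern as a bound parameter:

from sqlalchemy import text

DUP_COUNT_SQL = text(
    "select count(e1.name) as count "
    "from entry e1, file f1, dir d1, entry_dir_link edl1, "
    "entry e2, file f2, dir d2, entry_dir_link edl2 "
    "where e1.id = f1.eid and e2.id = f2.eid "
    "and d1.eid = edl1.dir_eid and edl1.entry_id = e1.id "
    "and edl2.dir_eid = d2.eid and edl2.entry_id = e2.id "
    "and d1.path_prefix like :path_pattern "
    "and f1.hash = f2.hash and e1.id != e2.id"
)

# Same duplicate count as the f-string version, with the path bound as a parameter.
res = session.execute(DUP_COUNT_SQL, {"path_pattern": f"%{path}%"})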

if __name__ == "__main__":
print("INFO: PA job manager starting - listening on {}:{}".format( PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT) )
ProcessImportDirs()

##### have to test the lines below (to force a scan on startup)
now=datetime.now(pytz.utc)
job=Job(start_time=now, last_update=now, name="scannow", state="New", wait_for=None, pa_job_state="New", current_file_num=0, num_files=0 )
session.add(job)
session.commit()
HandleJobs()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
s.listen()
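Note: the commit message says the /fix_dups route exists in the front-end but is not built yet. As a rough sketch only (assuming the front-end is a Flask app that shares the session and the PA_JobManager_FE_Message model seen in this diff; the route body and template name are hypothetical), the flow from MessageToFE() to the user could look like:

from flask import render_template

@app.route("/fix_dups")
def fix_dups():
    # Hypothetical stub: list any pending job-manager alerts so the user can
    # finalise the import by removing duplicates. "fix_dups.html" is made up.
    messages = session.query(PA_JobManager_FE_Message).all()
    return render_template("fix_dups.html", messages=messages)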