###
#
# This file implements the 'external' job control manager, which periodically
# scans for (or is pushed an event about) new jobs, picks them up, and processes them.
#
# It then stores the progress/status, etc. in the job and joblog tables as needed
# via wrapper functions.
#
# The whole pa_job_manager is multi-threaded, and uses the database tables for
# state management and communication back to the PA web site.
#
###


# pylint: disable=no-member


# global debug setting
DEBUG=1

### SQLALCHEMY IMPORTS ###

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime, LargeBinary, Boolean
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session

### SQLALCHEMY IMPORTS ###

### LOCAL FILE IMPORTS ###

from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, THUMBSIZE, SymlinkName
from datetime import datetime, timedelta, date

### PYTHON LIB IMPORTS ###
import pytz
import time
import os
import glob
from PIL import Image, ImageOps
from pymediainfo import MediaInfo
import hashlib
import exifread
import base64
import numpy
import cv2
import socket
import threading
import io
import face_recognition
import re
import sys


# this is required to handle the recursive duplicate-processing code
sys.setrecursionlimit(50000)

# an Engine, which the Session will use for connection resources
some_engine = create_engine(DB_URL)

# create a configured "Session" class
#Session = sessionmaker(bind=some_engine)

# create a Session

session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)
session = Session()

Base = declarative_base()

################################################################################
# Classes describing the file data in the database, and via sqlalchemy, connected to the DB as well
# These have to match one-for-one the DB tables
################################################################################
class PathType(Base):
    __tablename__ = "path_type"
    id = Column(Integer, Sequence('path_type_id_seq'), primary_key=True )
    name = Column(String, unique=True, nullable=False )

    def __repr__(self):
        return f"<id: {self.id}, name={self.name}>"


class PathDirLink(Base):
    __tablename__ = "path_dir_link"
    path_id = Column(Integer, ForeignKey("path.id"), primary_key=True )
    dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True )

    def __repr__(self):
        return f"<path_id: {self.path_id}, dir_eid: {self.dir_eid}>"


class EntryDirLink(Base):
    __tablename__ = "entry_dir_link"
    entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True )

    def __repr__(self):
        return f"<entry_id: {self.entry_id}, dir_eid: {self.dir_eid}>"


class Path(Base):
    __tablename__ = "path"
    id = Column(Integer, Sequence('path_id_seq'), primary_key=True )
    type_id = Column(Integer, ForeignKey("path_type.id"))
    type = relationship("PathType")
    path_prefix = Column(String, unique=True, nullable=False )
    num_files = Column(Integer)
    dir_obj = relationship("Dir", secondary="path_dir_link", uselist=False)

    def __repr__(self):
        return f"<id: {self.id}, type={self.type}, path_prefix={self.path_prefix}, dir_obj={self.dir_obj}>"


class Dir(Base):
    __tablename__ = "dir"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    rel_path = Column(String, unique=True, nullable=False )
    in_path = relationship("Path", secondary="path_dir_link", uselist=False)
    last_import_date = Column(Float)
    files = relationship("Entry", secondary="entry_dir_link")

    def PathOnFS(self):
        return self.in_path.path_prefix+'/'+self.rel_path

    def __repr__(self):
        return f"<eid: {self.eid}, last_import_date: {self.last_import_date}, files: {self.files}>"


class Entry(Base):
    __tablename__ = "entry"
    id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
    name = Column(String, unique=False, nullable=False )
    type_id = Column(Integer, ForeignKey("file_type.id"))
    exists_on_fs=Column(Boolean)
    type=relationship("FileType")
    dir_details = relationship( "Dir", uselist=False )
    file_details = relationship( "File", uselist=False )
    in_dir = relationship ("Dir", secondary="entry_dir_link", uselist=False )

    def FullPathOnFS(self):
        s=self.in_dir.in_path.path_prefix + '/'
        if len(self.in_dir.rel_path) > 0:
            s += self.in_dir.rel_path + '/'
        s += self.name
        return s

    def __repr__(self):
        return f"<id: {self.id}, name: {self.name}, type={self.type}, exists_on_fs={self.exists_on_fs}, dir_details={self.dir_details}, file_details={self.file_details}, in_dir={self.in_dir}>"


class FileRefimgLink(Base):
    __tablename__ = "file_refimg_link"
    file_id = Column(Integer, ForeignKey('file.eid'), unique=True, nullable=False, primary_key=True)
    refimg_id = Column(Integer, ForeignKey('refimg.id'), unique=True, nullable=False, primary_key=True)
    when_processed = Column(Float)
    matched = Column(Boolean)

    def __repr__(self):
        return f"<file_id: {self.file_id}, refimg_id: {self.refimg_id} when_processed={self.when_processed}, matched={self.matched}>"


class File(Base):
    __tablename__ = "file"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    size_mb = Column(Integer, unique=False, nullable=False)
    hash = Column(Integer, unique=True, nullable=True)
    thumbnail = Column(String, unique=False, nullable=True)
    year = Column(Integer)
    month = Column(Integer)
    day = Column(Integer)
    woy = Column(Integer)
    last_hash_date = Column(Float)
    faces = Column( LargeBinary )
    faces_created_on = Column(Float)

    def __repr__(self):
        return f"<eid: {self.eid}, size_mb={self.size_mb}, hash={self.hash}, last_hash_date: {self.last_hash_date}>"


class FileType(Base):
    __tablename__ = "file_type"
    id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True )
    name = Column(String, unique=True, nullable=False )

    def __repr__(self):
        return f"<id: {self.id}, name={self.name}>"


class Settings(Base):
    __tablename__ = "settings"
    id = Column(Integer, Sequence('settings_id_seq'), primary_key=True )
    import_path = Column(String)
    storage_path = Column(String)
    recycle_bin_path = Column(String)

    def __repr__(self):
        return f"<id: {self.id}, import_path: {self.import_path}, recycle_bin_path: {self.recycle_bin_path}>"


class PersonRefimgLink(Base):
    __tablename__ = "person_refimg_link"
    person_id = Column(Integer, ForeignKey('person.id'), unique=True, nullable=False, primary_key=True)
    refimg_id = Column(Integer, ForeignKey('refimg.id'), unique=True, nullable=False, primary_key=True)

    def __repr__(self):
        return f"<person_id: {self.person_id}, refimg_id: {self.refimg_id}>"


class Person(Base):
    __tablename__ = "person"
    id = Column(Integer, Sequence('person_id_seq'), primary_key=True )
    tag = Column(String(48), unique=False, nullable=False)
    surname = Column(String(48), unique=False, nullable=False)
    firstname = Column(String(48), unique=False, nullable=False)
    refimg = relationship('Refimg', secondary=PersonRefimgLink.__table__)

    def __repr__(self):
        return f"<tag: {self.tag}, firstname: {self.firstname}, surname: {self.surname}, refimg: {self.refimg}>"


class Refimg(Base):
    __tablename__ = "refimg"
    id = Column(Integer, Sequence('refimg_id_seq'), primary_key=True )
    fname = Column(String(256), unique=True, nullable=False)
    encodings = Column(LargeBinary)
    created_on = Column(Float)

    def __repr__(self):
        return f"<id: {self.id}, fname: {self.fname}, created_on: {self.created_on}, encodings: {self.encodings}>"

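
# Illustrative note (not part of the schema): a full filesystem path is composed from the three
# objects above, exactly as Entry.FullPathOnFS() does. Using hypothetical values based on the
# worked example in the AddDir comment further below:
#   Path.path_prefix = "static/Import/images_to_process"
#   Dir.rel_path     = "0000/subtest"
#   Entry.name       = "IMG_1234.jpg"      (hypothetical filename)
#   => "static/Import/images_to_process/0000/subtest/IMG_1234.jpg"
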
################################################################################
|
|
# classes for the job manager:
|
|
# Job (and Joblog, JobExtra) for each Job, and
|
|
# PA_Jobmanager_fe_message (to pass messages to the front-end web)
|
|
################################################################################
|
|
class Joblog(Base):
|
|
__tablename__ = "joblog"
|
|
id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True )
|
|
job_id = Column(Integer, ForeignKey('job.id') )
|
|
log_date = Column(DateTime(timezone=True))
|
|
log = Column(String)
|
|
|
|
def __repr__(self):
|
|
return "<id: {}, job_id: {}, log_date: {}, log: {}".format(self.id, self.job_id, self.log_date, self.log )
|
|
|
|
class JobExtra(Base):
|
|
__tablename__ = "jobextra"
|
|
id = Column(Integer, Sequence('jobextra_id_seq'), primary_key=True )
|
|
job_id = Column(Integer, ForeignKey('job.id') )
|
|
name = Column(String)
|
|
value = Column(String)
|
|
|
|
def __repr__(self):
|
|
return "<id: {}, job_id: {}, name: {}, value: {}>".format(self.id, self.job_id, self.name, self.value )
|
|
|
|
class Job(Base):
|
|
__tablename__ = "job"
|
|
id = Column(Integer, Sequence('job_id_seq'), primary_key=True )
|
|
start_time = Column(DateTime(timezone=True))
|
|
last_update = Column(DateTime(timezone=True))
|
|
name = Column(String)
|
|
state = Column(String)
|
|
num_files = Column(Integer)
|
|
current_file_num = Column(Integer)
|
|
current_file = Column(String)
|
|
wait_for = Column(Integer)
|
|
pa_job_state = Column(String)
|
|
|
|
logs = relationship( "Joblog")
|
|
extra = relationship( "JobExtra")
|
|
|
|
def __repr__(self):
|
|
return "<id: {}, start_time: {}, last_update: {}, name: {}, state: {}, num_files: {}, current_file_num: {}, current_file: {}, pa_job_state: {}, wait_for: {}, extra: {}, logs: {}>".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs)
|
|
|
|
class PA_JobManager_FE_Message(Base):
|
|
__tablename__ = "pa_job_manager_fe_message"
|
|
id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True )
|
|
job_id = Column(Integer, ForeignKey('job.id') )
|
|
alert = Column(String)
|
|
message = Column(String)
|
|
def __repr__(self):
|
|
return "<id: {}, job_id: {}, alert: {}, message: {}".format(self.id, self.job_id, self.alert, self.message)
|
|
|
|
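
# A minimal sketch (an assumption about the web front-end, mirroring the commented-out example in
# __main__ at the bottom of this file): work is enqueued simply by inserting a Job row with
# pa_job_state="New"; this manager then picks it up on the next HandleJobs() pass. Kept commented
# out deliberately - it is illustration, not code this module runs.
#
#   now = datetime.now(pytz.utc)
#   job = Job(start_time=now, last_update=now, name="scannow", state="New",
#             wait_for=None, pa_job_state="New", current_file_num=0, num_files=0)
#   session.add(job)
#   session.commit()
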
##############################################################################
# Util (non Class) functions
##############################################################################
def MessageToFE( job_id, alert, message ):
    msg = PA_JobManager_FE_Message( job_id=job_id, alert=alert, message=message)
    session.add(msg)
    session.commit()
    return msg.id


def ProcessRecycleBinDir(parent_job):
    settings = session.query(Settings).first()
    if settings is None:
        raise Exception("Cannot create file data with no settings / recycle bin path is missing")
    paths = settings.recycle_bin_path.split("#")
    ptype = session.query(PathType).filter(PathType.name=='Bin').first().id
    JobsForPaths( parent_job, paths, ptype )


def ProcessStorageDirs(parent_job):
    settings = session.query(Settings).first()
    if settings is None:
        raise Exception("Cannot create file data with no settings / storage path is missing")
    paths = settings.storage_path.split("#")
    ptype = session.query(PathType).filter(PathType.name=='Storage').first().id
    JobsForPaths( parent_job, paths, ptype )
    return


def ProcessImportDirs(parent_job):
    settings = session.query(Settings).first()
    if settings is None:
        raise Exception("Cannot create file data with no settings / import path is missing")
    paths = settings.import_path.split("#")
    ptype = session.query(PathType).filter(PathType.name=='Import').first().id
    JobsForPaths( parent_job, paths, ptype )
    return

def JobsForPaths( parent_job, paths, ptype ):
    now=datetime.now(pytz.utc)
    # make new set of Jobs per path... HandleJobs will make them run later
    path_type = session.query(PathType).get(ptype)
    for path in paths:
        p=session.query(Path).filter(Path.path_prefix==SymlinkName(path_type.name,path,path+'/')).first()
        cfn=0
        if p:
            cfn=p.num_files

        jex=JobExtra( name="path", value=path )
        jex2=JobExtra( name="path_type", value=ptype )
        job=Job(start_time=now, last_update=now, name="importdir", state="New", wait_for=None, pa_job_state="New", current_file_num=0, num_files=cfn )
        job.extra.append(jex)
        job.extra.append(jex2)
        session.add(job)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a>".format( job.id, job.id, job.name ) )
        # force commit to make job.id be valid in use of wait_for later
        session.commit()
        jex2=JobExtra( name="path", value=path )
        job2=Job(start_time=now, last_update=now, name="getfiledetails", state="New", wait_for=job.id, pa_job_state="New", current_file_num=0 )
        job2.extra.append(jex2)
        session.add(job2)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job2.id, job2.id, job2.name, job2.wait_for ) )

        """
        jex3=JobExtra( name="path", value=path )
        job3=Job(start_time=now, last_update=now, name="processai", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 )
        job3.extra.append(jex3)
        session.add(job3)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job3.id, job3.id, job3.name, job3.wait_for ) )
        """

        job4=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=job2.id, pa_job_state="New", current_file_num=0 )
        session.add(job4)
        session.commit()
        if parent_job:
            AddLogForJob(parent_job, "adding <a href='/job/{}'>job id={} {}</a> (wait for: {})".format( job4.id, job4.id, job4.name, job4.wait_for ) )
    HandleJobs()
    return

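
# The net effect of JobsForPaths() per path is a small dependency chain, driven by wait_for:
#   importdir  ->  getfiledetails (wait_for=importdir.id)  ->  checkdups (wait_for=getfiledetails.id)
# (a processai job in the middle of the chain is currently commented out above)
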
def ProcessFileForJob(job, message, current_file):
    job.current_file=os.path.basename(current_file)
    if job.num_files:
        job.current_file_num=job.current_file_num+1
    AddLogForJob(job, message )
    return


def AddLogForJob(job, message):
    now=datetime.now(pytz.utc)
    log=Joblog( job_id=job.id, log=message, log_date=now )
    job.last_update=now
    session.add(log)
    return

def RunJob(job):
    # session = Session()
    job.start_time=datetime.now(pytz.utc)
    if job.name =="scannow":
        JobScanNow(job)
    elif job.name =="forcescan":
        JobForceScan(job)
    elif job.name =="scan_sp":
        JobScanStorageDir(job)
    elif job.name =="importdir":
        JobImportDir(job)
    elif job.name =="getfiledetails":
        JobGetFileDetails(job)
    elif job.name == "checkdups":
        CheckForDups(job)
    elif job.name == "rmdups":
        RemoveDups(job)
    elif job.name == "processai":
        JobProcessAI(job)
    else:
        print("ERROR: Requested to process unknown job type: {}".format(job.name))
    # okay, we finished a job, so check for any jobs that are dependent on this and run them...
    # session.close()
    if job.pa_job_state != "Completed":
        FinishJob(job, "PA Job Manager - This is a catchall to close off a Job; it should never be seen and implies a job did not actually complete", "Failed" )
    HandleJobs()
    return

def CancelJob(job,id):
    for j in session.query(Job).filter(Job.wait_for==id).all():
        if DEBUG==1:
            print("DEBUG: cancelling job: {} as it was waiting for this failed job: {}".format(j.id, job.id) )
        FinishJob(j, "Job has been withdrawn as the job being waited for failed", "Withdrawn" )
        CancelJob(j, j.id)
    return


def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
    job.state=state
    job.pa_job_state=pa_job_state
    job.last_update=datetime.now(pytz.utc)
    AddLogForJob(job, last_log)
    if job.state=="Failed":
        CancelJob(job,job.id)
    session.commit()
    return

def HandleJobs():
    if DEBUG==1:
        print("INFO: PA job manager is scanning for new jobs to process")
    for job in session.query(Job).all():
        if job.pa_job_state == 'New':
            if job.wait_for is not None:
                j2 = session.query(Job).get(job.wait_for)
                if not j2:
                    print ("ERROR: job.wait_for ({}) does not exist in below? ".format( job.wait_for ))
                    for j in session.query(Job).all():
                        print ("ERROR: j={}".format(j.id))
                    continue
                if j2.pa_job_state != 'Completed':
                    continue

            # use this to remove threads for easier debugging, and errors will stacktrace to the console
            if DEBUG==1:
                print("*************************************")
                print("RUNNING job: id={} name={} wait_for={}".format(job.id, job.name, job.wait_for ))
                RunJob(job)
            else:
                try:
                    RunJob(job)
                    # threading.Thread(target=RunJob, args=(job,)).start()
                except Exception as e:
                    try:
                        MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
                    except Exception as e2:
                        print("ERROR: Failed to let front-end know, but back-end failed to run job (id: {}, name: {} -- orig exception was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
    print("INFO: PA job manager is waiting for a job")
    return


def JobProgressState( job, state ):
    job.pa_job_state = "In Progress"
    job.state=state
    session.commit()
    return

def JobScanNow(job):
    JobProgressState( job, "In Progress" )
    ProcessImportDirs(job)
    FinishJob( job, "Completed (scan for new files)" )
    MessageToFE( job.id, "success", "Completed (scan for new files)" )
    session.commit()
    return


def JobScanStorageDir(job):
    JobProgressState( job, "In Progress" )
    ProcessStorageDirs(job)
    FinishJob( job, "Completed (scan for new files)" )
    MessageToFE( job.id, "success", "Completed (scan for new files)" )
    session.commit()
    return


def JobForceScan(job):
    JobProgressState( job, "In Progress" )
    session.query(FileRefimgLink).delete()
    session.query(EntryDirLink).delete()
    session.query(PathDirLink).delete()
    session.query(Path).delete()
    session.query(Dir).delete()
    session.query(File).delete()
    session.query(Entry).delete()
    session.commit()
    ProcessRecycleBinDir(job)
    ProcessImportDirs(job)
    ProcessStorageDirs(job)
    FinishJob(job, "Completed (forced remove and recreation of all file data)")
    MessageToFE( job.id, "success", "Completed (forced remove and recreation of all file data)" )
    session.commit()
    return

# to serve static content for the images, we create a symlink inside the static subdir pointing at each import_path that exists
def CreateSymlink(job,ptype,path):
    path_type = session.query(PathType).get(ptype)
    symlink=SymlinkName(path_type.name, path, path)
    if not os.path.exists(symlink):
        print( f"INFO: symlink does not exist, actually creating it -- s={symlink}" )
        os.makedirs( os.path.dirname(symlink), mode=0o777, exist_ok=True )
        os.symlink(path, symlink)
    return symlink

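
# Hedged note (illustration only, inferred from the worked example in the AddDir comment further
# below): SymlinkName() appears to map a real import path such as
#   /home/ddp/src/photoassistant/images_to_process/
# to a web-servable prefix such as
#   static/Import/images_to_process
# so the web front-end's static file handling can serve the originals via the symlink created above.
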

# function to create or return a Path object based on the path prefix (pp) and the type (the type id of Import, Storage, Bin)
# if the Path is created, it also creates the Dir object (which in turn creates the Entry object)
def AddPath(job, pp, type ):
    path_obj=session.query(Path).filter(Path.path_prefix==pp).first()
    if not path_obj:
        path_obj=Path( path_prefix=pp, num_files=0, type_id=type )
        # if there is no path yet, then there is no Dir for it, so create that too (which creates the entry, etc.)
        # Dir name is os.path.basename(pp), is not in another Dir (None), and its relative path is "", inside the path_obj we just created
        dir=AddDir( job, os.path.basename(pp), None, "", path_obj )
        session.add(path_obj)
        session.add(dir)
    return path_obj

################################################################################################################################################################
#
# Key function that runs as part of (usually) an import job. The name of the directory (dirname) is checked to see
# if it is already in the database (inside of in_dir in in_path). If it is,
# just return the db entry. If not, then we create a new row in Dir that has the name of dirname, has a parent directory
# of in_dir (DB object), has the rel_path set to the relative fs path from the actual fs path to this entry (including the dirname)
# and the in_path set to the overarching path (one of an Import, Storage or Recycle_bin path in the DB)
#
# e.g. path on FS: /home/ddp/src/photoassistant/images_to_process/ ... ends up in the DB as path_prefix="static/Import/images_to_process"
# and if we have a dir in /home/ddp/src/photoassistant/images_to_process/0000/subtest, then we call:
# AddDir( job, dirname='subtest', in_dir=Dir object for '0000', rel_path='0000/subtest', in_path=Path object for 'static/Import/images_to_process' )
#
################################################################################################################################################################
def AddDir(job, dirname, in_dir, rel_path, in_path ):
    dir=session.query(Dir).join(PathDirLink).join(Path).filter(Path.id==in_path.id).filter(Dir.rel_path==rel_path).first()
    if dir:
        e=session.query(Entry).get(dir.eid)
        e.exists_on_fs=True
        return dir
    dir=Dir( last_import_date=0, rel_path=rel_path, in_path=in_path )
    dtype=session.query(FileType).filter(FileType.name=='Directory').first()
    e=Entry( name=dirname, type=dtype, exists_on_fs=True )
    e.dir_details=dir
    # no in_dir occurs when we Add the actual Dir for the import_path (top of the tree)
    if in_dir:
        e.in_dir=in_dir
    if DEBUG==1:
        print(f"DEBUG: AddDir: created d={dirname}, rp={rel_path}")
    AddLogForJob(job, f"DEBUG: Process new dir: {dirname}")
    session.add(e)
    return dir


def AddFile(job, fname, type_str, fsize, in_dir, year, month, day, woy ):
    e=session.query(Entry).join(EntryDirLink).join(Dir).filter(Entry.name==fname,Dir.eid==in_dir.eid).first()
    if e:
        e.exists_on_fs=True
        return e
    ftype = session.query(FileType).filter(FileType.name==type_str).first()
    e=Entry( name=fname, type=ftype, exists_on_fs=True )
    f=File( size_mb=fsize, last_hash_date=0, faces_created_on=0, year=year, month=month, day=day, woy=woy )
    e.file_details = f
    e.in_dir=in_dir
    AddLogForJob(job, "Found new file: {}".format(fname) )
    session.add(e)
    return e

# reset exists_on_fs to False for everything in this import path; if we find it on the FS in the walk below, it goes back
# to True, and anything that is still False has been deleted
def ResetExistsOnFS(job, path):
    reset_dirs = session.query(Entry).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==path).all()
    for reset_dir in reset_dirs:
        reset_dir.exists_on_fs=False
        session.add(reset_dir)
        reset_files = session.query(Entry).join(EntryDirLink).filter(EntryDirLink.dir_eid==reset_dir.id).all()
        for reset_file in reset_files:
            reset_file.exists_on_fs=False
            session.add(reset_file)
    return


# Convenience function to remove a file from the database - and its associated links
# used when scanning and a file has been removed out from under PA, or
# when we remove duplicates
def RemoveFileFromDB(id):
    session.query(EntryDirLink).filter(EntryDirLink.entry_id==id).delete()
    session.query(File).filter(File.eid==id).delete()
    session.query(Entry).filter(Entry.id==id).delete()
    return

# Actually moves the physical file from its current real directory to a subdir of the recycle bin path
def RemoveFileFromFS( del_me ):
    try:
        settings = session.query(Settings).first()
        dst_dir=settings.recycle_bin_path + '/' + del_me.in_dir.in_path.path_prefix.replace('static/','') + '/' + del_me.in_dir.rel_path + '/'
        os.makedirs( dst_dir,mode=0o777, exist_ok=True )
        src=del_me.FullPathOnFS()
        dst=dst_dir + '/' + del_me.name
        os.replace( src, dst )
    except Exception as e:
        print( f"ERROR: Failed to remove file from filesystem - which={src}, err: {e}")
    return


# Function that moves a file we are "deleting" to the recycle bin: it moves the
# file on the filesystem and then changes the database path from the import or
# storage path over to the Bin path
def MoveFileToRecycleBin(job,del_me):
    try:
        settings = session.query(Settings).first()
        dst_dir=settings.recycle_bin_path + '/' + del_me.in_dir.in_path.path_prefix.replace('static/','') + '/' + del_me.in_dir.rel_path + '/'
        os.makedirs( dst_dir,mode=0o777, exist_ok=True )
        src=del_me.FullPathOnFS()
        dst=dst_dir + '/' + del_me.name
        os.replace( src, dst )
    except Exception as e:
        print( f"ERROR: Failed to remove file from filesystem - which={src}, err: {e}")
    bin_path=session.query(Path).join(PathType).filter(PathType.name=='Bin').first()
    new_rel_path=del_me.in_dir.in_path.path_prefix.replace('static/','')

    # if there is a relative path on this dir, add it to the new_rel_path as there is only ever 1 Bin path
    if len(del_me.in_dir.rel_path):
        new_rel_path += '/' + del_me.in_dir.rel_path

    parent_dir=session.query(Dir).join(PathDirLink).filter(PathDirLink.path_id==bin_path.id).first()
    part_rel_path=""
    for dirname in new_rel_path.split("/"):
        part_rel_path += f"{dirname}"
        new_dir=AddDir( job, dirname, parent_dir, part_rel_path, bin_path )
        parent_dir=new_dir
        part_rel_path += "/"

    del_me.in_dir = new_dir
    return


# Convenience function to remove a dir from the database - and its associated links
def RemoveDirFromDB(id):
    session.query(EntryDirLink).filter(EntryDirLink.entry_id==id).delete()
    session.query(Dir).filter(Dir.eid==id).delete()
    session.query(Entry).filter(Entry.id==id).delete()
    return

# this routine is used when a scan finds files/dirs that have been removed
# underneath PA, so it just deletes them from the DB
def HandleAnyFSDeletions(job):
    dtype=session.query(FileType).filter(FileType.name=='Directory').first()
    rms = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id!=dtype.id).all()
    rm_cnt=0
    for rm in rms:
        RemoveFileFromDB(rm.id)
        AddLogForJob( job, f"INFO: Removing {rm.name} from system as it is no longer on the file system")
        rm_cnt+=1

    rmdirs = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id==1).order_by(Entry.id.desc()).all()
    for rmdir in rmdirs:
        RemoveFileFromDB(rmdir.id)
        AddLogForJob( job, f"INFO: Removing {rmdir.name} from system as it is no longer on the file system")
        rm_cnt+=1
    return rm_cnt

# try several ways to work out date of file created (try exif, then filename, lastly filesystem)
def GetDateFromFile(file, stat):
    # try exif
    try:
        f = open(file, 'rb')
        tags = exifread.process_file(f)
        date_str, _ = str(tags["EXIF DateTimeOriginal"]).split(" ")
        year, month, day = date_str.split(":")
        year=int(year)
        month=int(month)
        day=int(day)
        check = datetime( year, month, day )
    except:
        # try parsing filename
        try:
            m=re.search( r'(\d{4})(\d{2})(\d{2})', file)
            year=int(m[1])
            month=int(m[2])
            day=int(m[3])
            check2 = datetime( year, month, day )
        # give up and use file sys date
        except:
            year, month, day, _, _, _, _, _, _ = datetime.fromtimestamp(stat.st_ctime).timetuple()
    c=date(year, month, day).isocalendar()
    woy=c[1]
    return year, month, day, woy

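
# Illustration of the filename fallback above (hypothetical filename, not from this codebase):
#   re.search(r'(\d{4})(\d{2})(\d{2})', "IMG_20190704_120000.jpg").groups()
#   -> ('2019', '07', '04')   i.e. year=2019, month=7, day=4
# Constructing datetime(year, month, day) is what rejects false positives (an 8-digit run that is
# not a plausible date raises, and we fall through to the filesystem ctime).
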
def AddJexToDependantJobs(job,name,value):
    if DEBUG==1:
        print( f"DEBUG: AddJexToDependantJobs({job}, {name}, {value}) ")
    for j in session.query(Job).filter(Job.wait_for==job.id).all():
        print( f"DEBUG: adding jex to this job.id == {j.id}" )
        jex=JobExtra( name=name, value=value )
        j.extra.append(jex)
        AddJexToDependantJobs(j, name, value)
    return

def JobImportDir(job):
    JobProgressState( job, "In Progress" )
    settings = session.query(Settings).first()
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    path_type=[jex.value for jex in job.extra if jex.name == "path_type"][0]
    AddLogForJob(job, f"Checking {path_type} Directory: {path}" )
    if DEBUG==1:
        print("DEBUG: Checking Directory: {}".format( path ) )
    if not os.path.exists( path ):
        FinishJob( job, f"Finished Importing: {path} -- Path does not exist", "Failed" )
        return
    symlink=CreateSymlink(job,path_type,path)

    # create/find the Path
    path_obj=AddPath( job, symlink, path_type )
    session.add(path_obj)
    # find all jobs waiting on me (and their children, etc.) and add a path_prefix jex set to symlink, so we can just reference it from here on in, rather than recreate that string
    AddJexToDependantJobs(job,"path_prefix",symlink)
    ResetExistsOnFS(job, symlink)

    # go through the data once to work out file_cnt so the progress bar works from the first import
    walk=os.walk(path, topdown=True)
    ftree=list(walk)
    overall_file_cnt=0
    for root, subdirs, files in ftree:
        overall_file_cnt+= len(subdirs) + len(files)
    path_obj.num_files=overall_file_cnt

    # this is needed so that later on, when we AddDir for any dirs we find via os.walk, they have a parent dir object that is the dir for the path we are in
    parent_dir=path_obj.dir_obj

    # first time through we need dir to be the top level; we have a special if below to not recreate the top dir that AddPath created already
    dir=parent_dir
    # session.add in case we have already imported this dir (as AddDir won't) & now we might have a diff num of files to last time
    session.add(parent_dir)

    # if we set / then commit this now, the web page will know how many files
    # to process as we then do the slow job of processing them
    job.num_files=overall_file_cnt
    AddLogForJob(job, f"Found {overall_file_cnt} file(s) to process")
    session.commit()

    # root == path of dir, files are in dir... subdirs are in dir
    for root, subdirs, files in ftree:
        # already created root above to work out num_files for the whole os.walk
        if root != path:
            pp=SymlinkName( path_obj.type.name, path, root )+'/'+os.path.basename(root)
            rel_path=pp.replace(symlink+'/','')
            dir=AddDir(job, os.path.basename(root), parent_dir, rel_path, path_obj)
        for basename in files:
            # commit every 100 files to see progress being made but not hammer the database
            if job.current_file_num % 100 == 0:
                session.commit()
            fname=dir.PathOnFS()+'/'+basename

            stat = os.stat(fname)
            if stat.st_ctime > dir.last_import_date:
                if DEBUG==1:
                    print("DEBUG: {} - {} is newer than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
                if isImage(fname):
                    type_str = 'Image'
                elif isVideo(fname):
                    type_str = 'Video'
                else:
                    type_str = 'Unknown'
                fsize = round(stat.st_size/(1024*1024))

                year, month, day, woy = GetDateFromFile(fname, stat)
                e=AddFile( job, basename, type_str, fsize, dir, year, month, day, woy )
            else:
                e=session.query(Entry).join(EntryDirLink).join(Dir).filter(Entry.name==basename,Dir.eid==dir.eid).first()
                e.exists_on_fs=True
                if DEBUG==1:
                    print("DEBUG: {} - {} is OLDER than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
            job.current_file=basename
            job.current_file_num+=1
        job.current_file_num += len(subdirs)
        dir.last_import_date = time.time()
        parent_dir=dir
    job.num_files=overall_file_cnt

    rm_cnt=HandleAnyFSDeletions(job)

    FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Removed {rm_cnt} file(s)")
    return

def RunFuncOnFilesInPath( job, path, file_func ):
    d = session.query(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==path).filter(Dir.rel_path=='').first()
    for e in d.files:
        ProcessFilesInDir(job, e, file_func)


def JobProcessAI(job):
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    path_prefix=[jex.value for jex in job.extra if jex.name == "path_prefix"][0]
    path = SymlinkName(path_prefix, path, '/')
    p = session.query(Path).filter(Path.path_prefix==path).first()
    job.num_files=p.num_files

    people = session.query(Person).all()
    for person in people:
        generateKnownEncodings(person)

    RunFuncOnFilesInPath( job, path, ProcessAI )

    FinishJob(job, "Finished Processing AI")
    return

def GenHashAndThumb(job, e):
    # commit every 100 files to see progress being made but not hammer the database
    if job.current_file_num % 100 == 0:
        session.commit()
    stat = os.stat( e.FullPathOnFS() )
    if stat.st_ctime < e.file_details.last_hash_date:
        if DEBUG==1:
            print(f"OPTIM: GenHashAndThumb {e.name} file is older than last hash, skip this")
        job.current_file_num+=1
        return

    e.file_details.hash = md5( job, e.FullPathOnFS() )
    if DEBUG==1:
        print( f"{e.name} - hash={e.file_details.hash}" )
    if e.type.name == 'Image':
        e.file_details.thumbnail = GenImageThumbnail( job, e.FullPathOnFS() )
    elif e.type.name == 'Video':
        e.file_details.thumbnail = GenVideoThumbnail( job, e.FullPathOnFS() )
    elif e.type.name == 'Unknown':
        job.current_file_num+=1
    e.file_details.last_hash_date = time.time()
    return

def ProcessAI(job, e):
    if e.type.name != 'Image':
        job.current_file_num+=1
        return

    file = e.FullPathOnFS()
    stat = os.stat(file)
    # find if file is newer than when we found faces before (fyi: first time faces_created_on == 0)
    if stat.st_ctime > e.file_details.faces_created_on:
        session.add(e)
        im_orig = Image.open(file)
        im = ImageOps.exif_transpose(im_orig)

        faces = generateUnknownEncodings(im)
        e.file_details.faces_created_on=time.time()
        if faces:
            flat_faces = numpy.array(faces)
            e.file_details.faces = flat_faces.tobytes()
        else:
            e.file_details.faces = None
            job.current_file_num+=1
            return
    else:
        if not e.file_details.faces:
            print("OPTIM: This image has no faces, skip it")
            job.current_file_num+=1
            return
        recover=numpy.frombuffer(e.file_details.faces,dtype=numpy.float64)
        real_recover=numpy.reshape(recover,(-1,128))
        l=[]
        for el in real_recover:
            l.append(numpy.array(el))
        faces = l
    people = session.query(Person).all()
    for unknown_encoding in faces:
        for person in people:
            lookForPersonInImage(job, person, unknown_encoding, e)
    ProcessFileForJob(job, f"Finished processing {e.name}", e.name )
    return

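
# Note on the storage format used above (stated here for clarity, derived from the code):
# face_recognition returns one 128-dimensional float64 encoding per detected face, so the
# File.faces LargeBinary column holds N*128 float64 values; numpy.frombuffer(...) followed by
# reshape(-1, 128) recovers one row per face, which is what the matching loop iterates over.
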

def lookForPersonInImage(job, person, unknown_encoding, e):
    for refimg in person.refimg:
        # let's see if we have tried this check before
        frl=session.query(FileRefimgLink).filter(FileRefimgLink.file_id==e.id, FileRefimgLink.refimg_id==refimg.id).first()
        if not frl:
            frl = FileRefimgLink(refimg_id=refimg.id, file_id=e.file_details.eid)
        else:
            stat=os.stat( e.FullPathOnFS() )
            # if neither the file nor the refimg is newer, then we don't need to check again
            if frl.matched and stat.st_ctime < frl.when_processed and refimg.created_on < frl.when_processed:
                print(f"OPTIM: lookForPersonInImage: file {e.name} has a previous match for: {refimg.fname}, and the file & refimg haven't changed")
                return

        session.add(frl)
        frl.matched=False
        frl.when_processed=time.time()
        deserialized_bytes = numpy.frombuffer(refimg.encodings, dtype=numpy.float64)
        results = compareAI(deserialized_bytes, unknown_encoding)
        if results[0]:
            print(f'Found a match between: {person.tag} and {e.name}')
            AddLogForJob(job, f'Found a match between: {person.tag} and {e.name}')
            frl.matched=True
    return


def generateUnknownEncodings(im):
    unknown_image = numpy.array(im)
    face_locations = face_recognition.face_locations(unknown_image)
    if not face_locations:
        return None
    unknown_encodings = face_recognition.face_encodings(unknown_image, known_face_locations=face_locations)
    return unknown_encodings


def generateKnownEncodings(person):
    for refimg in person.refimg:
        file = 'reference_images/'+refimg.fname
        stat = os.stat(file)
        if refimg.created_on and stat.st_ctime < refimg.created_on:
            print("OPTIM: skipping re-creating encoding for refimg because file has not changed")
            continue
        img = face_recognition.load_image_file(file)
        location = face_recognition.face_locations(img)
        encodings = face_recognition.face_encodings(img, known_face_locations=location)
        refimg.encodings = encodings[0].tobytes()
        refimg.created_on = time.time()
        session.add(refimg)
    session.commit()


def compareAI(known_encoding, unknown_encoding):
    results = face_recognition.compare_faces([known_encoding], unknown_encoding, tolerance=0.55)
    return results

def ProcessFilesInDir(job, e, file_func):
    if DEBUG==1:
        print("DEBUG: files in dir - process: {}".format(e.FullPathOnFS()) )
    if e.type.name != 'Directory':
        file_func(job, e)
    else:
        dir=session.query(Dir).filter(Dir.eid==e.id).first()
        job.current_file_num+=1
        for sub in dir.files:
            ProcessFilesInDir(job, sub, file_func)

def JobGetFileDetails(job):
    JobProgressState( job, "In Progress" )
    #### I think the fix here is to get JobImportDir (or whatever makes the PATH) to add a jex for path_prefix and just pull it here, and stop 're-creating' it via SymlinkName
    path=[jex.value for jex in job.extra if jex.name == "path"][0]
    path_prefix=[jex.value for jex in job.extra if jex.name == "path_prefix"][0]
    print( f"JobGetFileDetails({job}) -- pp={path_prefix}" )
    if DEBUG==1:
        print("DEBUG: JobGetFileDetails for path={}".format( path_prefix ) )
    p=session.query(Path).filter(Path.path_prefix==path_prefix).first()
    job.current_file_num = 0
    job.num_files = p.num_files
    session.commit()
    RunFuncOnFilesInPath( job, path_prefix, GenHashAndThumb )
    FinishJob(job, "File Details job finished")
    session.commit()
    return

def isVideo(file):
    try:
        fileInfo = MediaInfo.parse(file)
        for track in fileInfo.tracks:
            if track.track_type == "Video":
                return True
        return False
    except Exception:
        return False


# Returns an md5 hash of the fname's contents
def md5(job, fname):
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    hash = hash_md5.hexdigest()
    AddLogForJob( job, "Generated md5 hash: {} for file: {}".format( hash, fname ) )
    return hash


def isImage(file):
    try:
        Image.open(file)
        return True
    except:
        return False

def GenImageThumbnail(job, file):
    ProcessFileForJob( job, "Generate Thumbnail from Image file: {}".format( file ), file )
    try:
        im_orig = Image.open(file)
        im = ImageOps.exif_transpose(im_orig)
        bands = im.getbands()
        if 'A' in bands:
            im = im.convert('RGB')
        im.thumbnail((THUMBSIZE,THUMBSIZE))
        img_bytearray = io.BytesIO()
        im.save(img_bytearray, format='JPEG')
        img_bytearray = img_bytearray.getvalue()
        thumbnail = base64.b64encode(img_bytearray)
        thumbnail = str(thumbnail)[2:-1]
    except Exception as e:
        AddLogForJob(job, f"WARNING: Failed to generate thumbnail (possibly no EXIF tag) for: {file} - error={e}")
        return None
    return thumbnail

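
# Note (assumption about the consuming side, not verified in this module): the value stored in
# File.thumbnail is a base64-encoded JPEG string, so the web front-end can presumably embed it
# directly as a data URI, e.g. <img src="data:image/jpeg;base64,...thumbnail string...">.
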

def GenVideoThumbnail(job, file):
    ProcessFileForJob( job, "Generate Thumbnail from Video file: {}".format( file ), file )
    try:
        vcap = cv2.VideoCapture(file)
        res, frame = vcap.read()
        # if the mean pixel value is > 15, we have something worth making a sshot of (no black frame at start being the sshot)
        while frame.mean() < 15 and res:
            res, frame = vcap.read()
        w = vcap.get(cv2.cv2.CAP_PROP_FRAME_WIDTH)
        h = vcap.get(cv2.cv2.CAP_PROP_FRAME_HEIGHT)
        if w > h:
            factor = w / THUMBSIZE
        else:
            factor = h / THUMBSIZE
        new_h = int(h / factor)
        new_w = int(w / factor)
        frame = cv2.resize(frame, (new_w, new_h), 0, 0, cv2.INTER_LINEAR)

        res, thumb_buf = cv2.imencode('.jpeg', frame)
        bt = thumb_buf.tobytes()
        thumbnail = base64.b64encode(bt)
        thumbnail = str(thumbnail)[2:-1]
    except Exception as e:
        AddLogForJob( job, f"ERROR: Failed to Generate thumbnail for video file: {file} - error={e}" )
        return None
    return thumbnail

def CheckForDups(job):
    AddLogForJob( job, f"Check for duplicates" )

    res = session.execute( "select count(e1.id) from entry e1, file f1, dir d1, entry_dir_link edl1, path_dir_link pdl1, path p1, entry e2, file f2, dir d2, entry_dir_link edl2, path_dir_link pdl2, path p2 where e1.id = f1.eid and e2.id = f2.eid and d1.eid = edl1.dir_eid and edl1.entry_id = e1.id and edl2.dir_eid = d2.eid and edl2.entry_id = e2.id and p1.type_id != (select id from path_type where name = 'Bin') and p1.id = pdl1.path_id and pdl1.dir_eid = d1.eid and p2.type_id != (select id from path_type where name = 'Bin') and p2.id = pdl2.path_id and pdl2.dir_eid = d2.eid and f1.hash = f2.hash and e1.id != e2.id and f1.size_mb = f2.size_mb" )
    for row in res:
        if row.count > 0:
            AddLogForJob(job, f"Found duplicates, Creating Status message in front-end for attention")
            MessageToFE( job.id, "danger", f'Found duplicate(s), click <a href="javascript:document.body.innerHTML+=\'<form id=_fm method=POST action=/fix_dups></form>\'; document.getElementById(\'_fm\').submit();">here</a> to finalise import by removing duplicates' )
        else:
            FinishJob(job, f"No duplicates found")
    FinishJob(job, f"Finished looking for duplicates")
    return

def RemoveDups(job):
    AddLogForJob(job, f"INFO: Starting Remove Duplicates job...")
    # as checkdups covers all dups, delete all future dups messages, and Withdraw future checkdups jobs
    msgs=session.query(PA_JobManager_FE_Message).join(Job).filter(Job.name=='checkdups')
    for msg in msgs:
        session.query(PA_JobManager_FE_Message).filter(PA_JobManager_FE_Message.id==msg.id).delete()
    cd_jobs=session.query(Job).filter(Job.name=='checkdups').filter(Job.pa_job_state=='New').all()
    for j in cd_jobs:
        FinishJob(j, "Just removed duplicates - so no need to do any other checkdups, we will force 1 last one after the remove step", "Withdrawn")
    session.commit()

    dup_cnt=0
    for jex in job.extra:
        if 'kfid-' in jex.name:
            _, which = jex.name.split('-')
            hash=[jex.value for jex in job.extra if jex.name == f"kfhash-{which}"][0]
            AddLogForJob(job, f"deleting duplicate files with hash: {hash} but keeping file with DB id={jex.value}" )
            files=session.query(Entry).join(File).filter(File.hash==hash).all()
            keeping=jex.value
            found=None
            del_me_lst = []
            for f in files:
                if os.path.isfile( f.FullPathOnFS() ) == False:
                    AddLogForJob( job, f"ERROR: (per file del) file (DB id: {f.id} - {f.FullPathOnFS()}) does not exist? ignoring file")
                elif f.id == int(keeping):
                    found = f
                else:
                    del_me_lst.append(f)
            if found == None:
                AddLogForJob( job, f"ERROR: (per file dup) Cannot find file with hash={hash} to process - skipping it)" )
            else:
                AddLogForJob(job, f"Keep duplicate file: {found.FullPathOnFS()}" )
                for del_me in del_me_lst:
                    AddLogForJob(job, f"Remove duplicate (per file dup) file: {del_me.FullPathOnFS()}" )
                    MoveFileToRecycleBin(job,del_me)
                    # RemoveFileFromFS( del_me )
                    # RemoveFileFromDB(del_me.id)

        if 'kdid-' in jex.name:
            _, which = jex.name.split('-')
            hashes=[jex.value for jex in job.extra if jex.name == f"kdhash-{which}"][0]
            keeping=jex.value
            tmp=session.query(Dir).filter(Dir.eid==keeping).first()
            AddLogForJob(job, f"Keeping files in {tmp.PathOnFS()}" )
            for hash in hashes.split(","):
                files=session.query(Entry).join(File).filter(File.hash==hash).all()
                found=None
                del_me=None
                for f in files:
                    if os.path.isfile(f.FullPathOnFS()) == False:
                        AddLogForJob( job, f"ERROR: (per path del) file (DB id: {f.id} - {f.FullPathOnFS()}) does not exist? ignoring file")
                    if f.in_dir.eid == int(keeping):
                        found=f
                    else:
                        del_me=f

                if found == None:
                    AddLogForJob( job, f"ERROR: (per path dup - dir id={keeping}) Cannot find file with hash={hash} to process - skipping it)" )
                else:
                    AddLogForJob(job, f"Keep duplicate file: {found.FullPathOnFS()}" )
                    AddLogForJob(job, f"Remove duplicate (per path dup) file: {del_me.FullPathOnFS()}" )
                    RemoveFileFromFS( del_me )
                    RemoveFileFromDB(del_me.id)
                    dup_cnt += 1

    FinishJob(job, f"Finished removing {dup_cnt} duplicate files" )

    # Need to put another checkdups job in now to force / validate we have no dups
    now=datetime.now(pytz.utc)
    next_job=Job(start_time=now, last_update=now, name="checkdups", state="New", wait_for=None, pa_job_state="New", current_file_num=0 )
    session.add(next_job)
    session.commit()
    AddLogForJob(job, "adding <a href='/job/{}'>job id={} {}</a> to confirm there are no more duplicates".format( next_job.id, next_job.id, next_job.name ) )
    return

def ValidateSettingsPaths():
    settings = session.query(Settings).first()
    rbp_exists=0
    paths = settings.recycle_bin_path.split("#")
    for path in paths:
        if os.path.exists(path):
            rbp_exists=1
            break
    if not rbp_exists:
        print("ERROR: The bin path in settings does not exist - Please fix now")
    sp_exists=0
    paths = settings.storage_path.split("#")
    for path in paths:
        if os.path.exists(path):
            sp_exists=1
            break
    if not sp_exists:
        print("ERROR: None of the storage paths in the settings exist - Please fix now")
    ip_exists=0
    paths = settings.import_path.split("#")
    for path in paths:
        if os.path.exists(path):
            ip_exists=1
            break
    if not ip_exists:
        print("ERROR: None of the import paths in the settings exist - Please fix now")
    if not rbp_exists or not sp_exists or not ip_exists:
        print("ERROR: Exiting until the above errors are fixed by the paths being created or the settings being updated to valid paths" )
        exit(-1)
    return


if __name__ == "__main__":
    print("INFO: PA job manager starting - listening on {}:{}".format( PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT) )

    ValidateSettingsPaths()

    # now=datetime.now(pytz.utc)
    # job=Job(start_time=now, last_update=now, name="scannow", state="New", wait_for=None, pa_job_state="New", current_file_num=0, num_files=0 )
    # session.add(job)
    # session.commit()
    HandleJobs()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
        s.listen()
        while True:
            conn, addr = s.accept()
            HandleJobs()
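
# Hedged sketch (assumption about the caller, not part of this module): the accept() loop above
# never reads from the connection, so any client connect works purely as a "poke" that triggers
# another HandleJobs() pass over the job table. A caller could presumably do no more than:
#
#   import socket
#   from shared import PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT
#   with socket.create_connection((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT)):
#       pass  # connecting is enough; the job manager rescans the job table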