#
#
# This file controls the 'external' job control manager, which periodically
# looks for (or is somehow pushed an event about?) new jobs, picks them up,
# and processes them.
#
# It then stores the progress/status, etc. in the job and joblog tables as
# needed, via wrapper functions.
#
# The whole pa_job_manager is multi-threaded, and uses the database tables for
# state management and communication back to the PA web site.
#
###

# pylint: disable=no-member

### SQLALCHEMY IMPORTS ###
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Sequence, Float, ForeignKey, DateTime, LargeBinary, Boolean, func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session

### LOCAL FILE IMPORTS ###
from shared import DB_URL, PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT, THUMBSIZE, SymlinkName, GenThumb, SECS_IN_A_DAY
from datetime import datetime, timedelta, date

### PYTHON LIB IMPORTS ###
import pytz
import time
import os
import glob
from PIL import Image, ImageOps
from pymediainfo import MediaInfo
import hashlib
import exifread
import base64
import numpy
import cv2
import socket
import threading
import io
import face_recognition
import re
import sys
import ffmpeg
import uuid

# global debug setting
if 'FLASK_ENV' not in os.environ or os.environ['FLASK_ENV'] != "production":
    DEBUG=True
else:
    DEBUG=False

# global list of override tables to allow enumeration over them ...
override_tbls={ "face_no_match_override", "face_force_match_override", "disconnected_no_match_override", "disconnected_force_match_override" }

# this is required to handle the duplicate processing code
sys.setrecursionlimit(50000)

# an Engine, which the Session will use for connection resources
some_engine = create_engine(DB_URL)

# create a configured "Session" class
#Session = sessionmaker(bind=some_engine)

# create a Session
session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)
session = Session()

# this creates the Base (like db model in flask)
Base = declarative_base()

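# Illustrative note (added for clarity, not part of the original logic):
# because Session is a scoped_session, a per-thread session can be obtained
# with s = Session() and released with Session.remove(); this module mostly
# works through the single module-level 'session' created above, e.g.:
#
#   s = Session()              # thread-local session (same object per thread)
#   s.query(Job).all()         # illustrative query only
#   Session.remove()
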
################################################################################
# Class describing PathType & in the database (via sqlalchemy)
# series of pre-defined types of paths (import, storage, bin)
################################################################################
class PathType(Base):
    __tablename__ = "path_type"
    id = Column(Integer, Sequence('path_type_id_seq'), primary_key=True )
    name = Column(String, unique=True, nullable=False )

    def __repr__(self):
        return f"<id: {self.id}, name={self.name}>"

################################################################################
# Class describing PathDirLink & in the database (via sqlalchemy)
# connects a path with its matching entry (dir)
################################################################################
class PathDirLink(Base):
    __tablename__ = "path_dir_link"
    path_id = Column(Integer, ForeignKey("path.id"), primary_key=True )
    dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True )

    def __repr__(self):
        return f"<path_id: {self.path_id}, dir_eid: {self.dir_eid}>"

################################################################################
# Class describing EntryDirLink & in the database (via sqlalchemy)
# connects an entry with the dir it is located in
################################################################################
class EntryDirLink(Base):
    __tablename__ = "entry_dir_link"
    entry_id = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    dir_eid = Column(Integer, ForeignKey("dir.eid"), primary_key=True )

    def __repr__(self):
        return f"<entry_id: {self.entry_id}, dir_eid: {self.dir_eid}>"

################################################################################
# Class describing Path & in the database (via sqlalchemy)
################################################################################
class Path(Base):
    __tablename__ = "path"
    id = Column(Integer, Sequence('path_id_seq'), primary_key=True )
    type_id = Column(Integer, ForeignKey("path_type.id"))
    type = relationship("PathType")
    path_prefix = Column(String, unique=True, nullable=False )
    num_files = Column(Integer)

    def __repr__(self):
        return f"<id: {self.id}, type={self.type}, path_prefix={self.path_prefix}>"

################################################################################
# Class describing Dir & in the database (via sqlalchemy)
# rel_path: rest of dir after path, e.g. if path = /..../storage, then
# rel_path could be 2021/20210101-new-years-day-pics
# in_path: only in this structure, not DB, quick ref to the path this dir is in
# PathOnFS(): method to get path on the FS for this dir
################################################################################
class Dir(Base):
    __tablename__ = "dir"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    rel_path = Column(String, unique=True, nullable=False )
    in_path = relationship("Path", secondary="path_dir_link", uselist=False)
    last_import_date = Column(Float)

    def PathOnFS(self):
        return self.in_path.path_prefix+'/'+self.rel_path

    def __repr__(self):
        return f"<eid: {self.eid}, rel_path: {self.rel_path}, in_path={self.in_path}, last_import_date: {self.last_import_date}>"

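# Illustrative example (values assumed, not taken from a real DB): if
# in_path.path_prefix is "static/Storage/photos" and rel_path is
# "2021/20210101-new-years-day-pics", then PathOnFS() returns
# "static/Storage/photos/2021/20210101-new-years-day-pics".
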
################################################################################
# Class describing Entry and in the DB (via sqlalchemy)
# an entry is the common bits between files and dirs
# type is a convenience var only in this class, not in DB
# {dir|file}_details are convenience data for the relevant details from the Dir
# or File class - not in DB
# in_dir - is the Dir that this entry is located in (convenience for class only)
# FullPathOnFS(): method to get path on the FS for this Entry
################################################################################
class Entry(Base):
    __tablename__ = "entry"
    id = Column(Integer, Sequence('file_id_seq'), primary_key=True )
    name = Column(String, unique=False, nullable=False )
    type_id = Column(Integer, ForeignKey("file_type.id"))
    exists_on_fs=Column(Boolean)
    type=relationship("FileType")
    dir_details = relationship( "Dir", uselist=False )
    file_details = relationship( "File", uselist=False )
    in_dir = relationship ("Dir", secondary="entry_dir_link", uselist=False )

    # return the full path of this entry on the filesystem
    def FullPathOnFS(self):
        if self.in_dir:
            s=self.in_dir.in_path.path_prefix + '/'
            if len(self.in_dir.rel_path) > 0:
                s += self.in_dir.rel_path + '/'
            s += self.name
        # this occurs when we have a dir that is the root of a path
        else:
            s=self.dir_details.in_path.path_prefix
        return s

    def __repr__(self):
        return f"<id: {self.id}, name: {self.name}, type={self.type}, exists_on_fs={self.exists_on_fs}, dir_details={self.dir_details}, file_details={self.file_details}, in_dir={self.in_dir}>"

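# Illustrative example (assumed values): for a file entry named "img_0001.jpg"
# whose in_dir has rel_path "2021/xmas" under a path with path_prefix
# "static/Storage/photos", FullPathOnFS() returns
# "static/Storage/photos/2021/xmas/img_0001.jpg"; for the Dir entry at the
# root of a path it falls back to the path_prefix itself.
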
################################################################################
# Class describing File and in the DB (via sqlalchemy)
# all files are entries, this is the extra bits only for a file, of note:
# hash is unique for files, and used to validate duplicates
# woy == week of year, all date fields are used to sort/show content. Date
# info can be from exif, or file system, or file name (rarely)
# faces: convenience field to show connected face(s) for this file
################################################################################
class File(Base):
    __tablename__ = "file"
    eid = Column(Integer, ForeignKey("entry.id"), primary_key=True )
    size_mb = Column(Integer, unique=False, nullable=False)
    hash = Column(Integer, unique=True, nullable=True)
    thumbnail = Column(String, unique=False, nullable=True)
    year = Column(Integer)
    month = Column(Integer)
    day = Column(Integer)
    woy = Column(Integer)
    last_hash_date = Column(Float)
    last_ai_scan = Column(Float)
    faces_created_on = Column(Float)

    def __repr__(self):
        return f"<eid: {self.eid}, size_mb={self.size_mb}, hash={self.hash}, last_hash_date: {self.last_hash_date}>"

################################################################################
# Class describing DelFile and in the DB (via sqlalchemy)
# used to track deleted files so they can be restored
# and keep all associated data intact, e.g. faces, etc. are connected to file_eid
# and orig_path_prefix allows restoration to the original path/dir
################################################################################
class DelFile(Base):
    __tablename__ = "del_file"
    file_eid = Column(Integer, ForeignKey("file.eid"), primary_key=True )
    orig_path_prefix = Column(String, unique=False )

    def __repr__(self):
        return f"<file_eid: {self.file_eid}, orig_path_prefix={self.orig_path_prefix}>"

################################################################################
# Class describing FileType and in the DB (via sqlalchemy)
# pre-defined list of file types (image, dir, etc.)
################################################################################
class FileType(Base):
    __tablename__ = "file_type"
    id = Column(Integer, Sequence('file_type_id_seq'), primary_key=True )
    name = Column(String, unique=True, nullable=False )

    def __repr__(self):
        return f"<id: {self.id}, name={self.name}>"

################################################################################
# Class describing Settings and in the DB (via sqlalchemy)
################################################################################
class Settings(Base):
    __tablename__ = "settings"
    id = Column(Integer, Sequence('settings_id_seq'), primary_key=True )
    base_path = Column(String)
    import_path = Column(String)
    storage_path = Column(String)
    recycle_bin_path = Column(String)
    metadata_path = Column(String)
    auto_rotate = Column(Boolean)
    default_refimg_model = Column(Integer,ForeignKey('ai_model.id'), unique=True, nullable=False)
    default_scan_model = Column(Integer,ForeignKey('ai_model.id'), unique=True, nullable=False)
    default_threshold = Column(Integer)
    face_size_limit = Column(Integer)
    scheduled_import_scan = Column(Integer)
    scheduled_storage_scan = Column(Integer)
    scheduled_bin_cleanup = Column(Integer)
    bin_cleanup_file_age = Column(Integer)

    def __repr__(self):
        return f"<id: {self.id}, import_path: {self.import_path}, storage_path: {self.storage_path}, recycle_bin_path: {self.recycle_bin_path}, auto_rotate: {self.auto_rotate}, default_refimg_model: {self.default_refimg_model}, default_scan_model: {self.default_scan_model}, default_threshold: {self.default_threshold}, face_size_limit: {self.face_size_limit}, scheduled_import_scan:{self.scheduled_import_scan}, scheduled_storage_scan: {self.scheduled_storage_scan}, scheduled_bin_cleanup: {self.scheduled_bin_cleanup}, bin_cleanup_file_age: {self.bin_cleanup_file_age}>"

################################################################################
# Class describing Person to Refimg link in DB via sqlalchemy
################################################################################
class PersonRefimgLink(Base):
    __tablename__ = "person_refimg_link"
    person_id = Column(Integer, ForeignKey('person.id'), unique=True, nullable=False, primary_key=True)
    refimg_id = Column(Integer, ForeignKey('refimg.id'), unique=True, nullable=False, primary_key=True)

    def __repr__(self):
        return f"<person_id: {self.person_id}, refimg_id: {self.refimg_id}>"

################################################################################
# Class describing Person in DB via sqlalchemy
################################################################################
class Person(Base):
    __tablename__ = "person"
    id = Column(Integer, Sequence('person_id_seq'), primary_key=True )
    tag = Column(String(48), unique=False, nullable=False)
    surname = Column(String(48), unique=False, nullable=False)
    firstname = Column(String(48), unique=False, nullable=False)
    refimg = relationship('Refimg', secondary=PersonRefimgLink.__table__)

    def __repr__(self):
        return f"<tag: {self.tag}, firstname: {self.firstname}, surname: {self.surname}, refimg: {self.refimg}>"

################################################################################
# Class describing Refimg in DB via sqlalchemy
# fname: original file name of refimg
# face: actual binary of numpy data for this refimg's face (always only 1)
# orig*: width/height of original image, because when we show it in person, it gets scaled
# thumbnail: image data of actual img. once we load refimg, we only store this data, not the orig file
# model_used: which AI model (cnn or hog) was used to create the face
# person: read-only convenience field not in DB, just used in html
################################################################################
class Refimg(Base):
    __tablename__ = "refimg"
    id = Column(Integer, Sequence('refimg_id_seq'), primary_key=True )
    fname = Column(String(256), unique=True, nullable=False)
    face = Column(LargeBinary, unique=True, nullable=False)
    thumbnail = Column(String, unique=False, nullable=True)
    created_on = Column(Float)
    orig_w = Column(Integer)
    orig_h = Column(Integer)
    face_top = Column(Integer)
    face_right = Column(Integer)
    face_bottom = Column(Integer)
    face_left = Column(Integer)
    model_used = Column(Integer, ForeignKey("ai_model.id") )

    def __repr__(self):
        return f"<id: {self.id}, fname: {self.fname}, created_on: {self.created_on}>"

################################################################################
# Class describing AIModel in DB via sqlalchemy
# pre-defined list of models (cnn, hog)
################################################################################
class AIModel(Base):
    __tablename__ = "ai_model"
    id = Column(Integer, primary_key=True )
    name = Column(String)

    def __repr__(self):
        return f"<id: {self.id}, name: {self.name}>"

################################################################################
# Class describing Face in the DB (via sqlalchemy)
# - face contains the binary version of the numpy array so we don't need to recalc it
################################################################################
class Face(Base):
    __tablename__ = "face"
    id = Column(Integer, Sequence('face_id_seq'), primary_key=True )
    face = Column( LargeBinary )
    face_top = Column(Integer)
    face_right = Column(Integer)
    face_bottom = Column(Integer)
    face_left = Column(Integer)
    w = Column(Integer)
    h = Column(Integer)

    def __repr__(self):
        return f"<id: {self.id}, face={self.face}>"

################################################################################
# Class describing FaceFileLink in the DB (via sqlalchemy)
# each face comes from a file and used a model to find the face
# this is not perfect, each face in the same file is always found with the same
# model - so really should have ModelFileLink or something, in the long run
# this might even be better as ScanDetailsFileLink and ScanDetails
################################################################################
class FaceFileLink(Base):
    __tablename__ = "face_file_link"
    face_id = Column(Integer, ForeignKey("face.id"), primary_key=True )
    file_eid = Column(Integer, ForeignKey("file.eid"), primary_key=True )
    model_used = Column(Integer, ForeignKey("ai_model.id") )

    def __repr__(self):
        return f"<face_id: {self.face_id}, file_eid={self.file_eid}>"

################################################################################
# Class describing FaceRefimgLink in the DB (via sqlalchemy)
# connects / implies a face has matched a refimg and we keep the distance too
# distance is mainly for debugging for now and shown in viewer
################################################################################
class FaceRefimgLink(Base):
    __tablename__ = "face_refimg_link"
    face_id = Column(Integer, ForeignKey("face.id"), primary_key=True )
    refimg_id = Column(Integer, ForeignKey("refimg.id"), primary_key=True )
    face_distance = Column(Float)

    def __repr__(self):
        return f"<face_id: {self.face_id}, refimg_id={self.refimg_id}>"

################################################################################
# Class describing types of non-match overrides for faces
################################################################################
class FaceOverrideType(Base):
    __tablename__ = "face_override_type"
    id = Column(Integer, Sequence('face_override_type_id_seq'), primary_key=True )
    name = Column( String )

    def __repr__(self):
        return f"<id: {self.id}, name={self.name}>"

################################################################################
# Class containing which faces are not ever allowed to match (and the
# type/reason why)
################################################################################
class FaceNoMatchOverride(Base):
    __tablename__ = "face_no_match_override"
    id = Column(Integer, Sequence('face_override_id_seq'), primary_key=True )
    face_id = Column(Integer, ForeignKey("face.id"), primary_key=True )
    type_id = Column(Integer, ForeignKey("face_override_type.id"))
    type = relationship("FaceOverrideType")

    def __repr__(self):
        return f"<id: {self.id}, face_id={self.face_id}, type: {self.type}>"

################################################################################
# Class containing manual / forced matches of a face in a file to a person
################################################################################
class FaceForceMatchOverride(Base):
    __tablename__ = "face_force_match_override"
    id = Column(Integer, Sequence('face_override_id_seq'), primary_key=True )
    face_id = Column(Integer, ForeignKey("face.id"), primary_key=True )
    person_id = Column(Integer, ForeignKey("person.id"), primary_key=True )
    person = relationship("Person")

    def __repr__(self):
        return f"<id: {self.id}, face_id={self.face_id}, person_id={self.person_id}>"

################################################################################
# Class describing DisconnectedNoMatchOverride in the DB (via sqlalchemy)
# Used when a face with an override is deleted from the DB to keep
# the raw data so that we can reconnect the override if we ever scan that same
# file/face again (think delete/undelete file, rebuild DB from file sys/from
# scratch, etc)
# used specifically for a face that should not ever be a match
################################################################################
class DisconnectedNoMatchOverride(Base):
    __tablename__ = "disconnected_no_match_override"
    face = Column( LargeBinary, primary_key=True )
    type_id = Column(Integer, ForeignKey("face_override_type.id"))

    def __repr__(self):
        return f"<face: {self.face}, type_id={self.type_id}>"

################################################################################
# Class describing DisconnectedForceMatchOverride in the DB (via sqlalchemy)
# Used when a face with an override is deleted from the DB to keep
# the raw data so that we can reconnect the override if we ever scan that same
# file/face again (think delete/undelete file, rebuild DB from file sys/from
# scratch, etc)
# used specifically for a match that was forced between a face and a person
################################################################################
class DisconnectedForceMatchOverride(Base):
    __tablename__ = "disconnected_force_match_override"
    face = Column( LargeBinary, primary_key=True )
    person_id = Column(Integer, ForeignKey('person.id'))

    def __repr__(self):
        return f"<face: {self.face}, person_id={self.person_id}>"

################################################################################
# Class describing logs for each job, connected to the DB via sqlalchemy
################################################################################
class Joblog(Base):
    __tablename__ = "joblog"
    id = Column(Integer, Sequence('joblog_id_seq'), primary_key=True )
    job_id = Column(Integer, ForeignKey('job.id') )
    log_date = Column(DateTime(timezone=True))
    log = Column(String)

    def __repr__(self):
        return "<id: {}, job_id: {}, log_date: {}, log: {}>".format(self.id, self.job_id, self.log_date, self.log )

################################################################################
# Class describing extra/specific info for a Job, connected to the DB via sqlalchemy
################################################################################
class JobExtra(Base):
    __tablename__ = "jobextra"
    id = Column(Integer, Sequence('jobextra_id_seq'), primary_key=True )
    job_id = Column(Integer, ForeignKey('job.id') )
    name = Column(String)
    value = Column(String)

    def __repr__(self):
        return "<id: {}, job_id: {}, name: {}, value: {}>".format(self.id, self.job_id, self.name, self.value )

################################################################################
# Class describing a Job, connected to the DB via sqlalchemy
################################################################################
class Job(Base):
    __tablename__ = "job"
    id = Column(Integer, Sequence('job_id_seq'), primary_key=True )
    start_time = Column(DateTime(timezone=True))
    last_update = Column(DateTime(timezone=True))
    name = Column(String)
    state = Column(String)
    num_files = Column(Integer)
    current_file_num = Column(Integer)
    current_file = Column(String)
    wait_for = Column(Integer)
    pa_job_state = Column(String)

    logs = relationship( "Joblog")
    extra = relationship( "JobExtra")

    def __repr__(self):
        return "<id: {}, start_time: {}, last_update: {}, name: {}, state: {}, num_files: {}, current_file_num: {}, current_file: {}, pa_job_state: {}, wait_for: {}, extra: {}, logs: {}>".format(self.id, self.start_time, self.last_update, self.name, self.state, self.num_files, self.current_file_num, self.current_file, self.pa_job_state, self.wait_for, self.extra, self.logs)

################################################################################
# Class describing PA_JobManager_FE_Message and in the DB (via sqlalchemy)
# the job manager (this code) can send a message back to the front end via the
# DB. It has to be about a specific job_id and carries an alert level
# (success/danger, etc.) and a message
################################################################################
class PA_JobManager_FE_Message(Base):
    __tablename__ = "pa_job_manager_fe_message"
    id = Column(Integer, Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True )
    job_id = Column(Integer, ForeignKey('job.id') )
    alert = Column(String)
    message = Column(String)

    def __repr__(self):
        return "<id: {}, job_id: {}, alert: {}, message: {}>".format(self.id, self.job_id, self.alert, self.message)

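################################################################################
# Class describing PA_UserState and in the DB (via sqlalchemy)
# (descriptive comment added for clarity: judging by the fields below, this
# appears to hold the per-user browsing/view state for the front end - the
# current path type, grouping/sort options, paging offsets, and the original
# search / view context so the user can resume where they left off)
################################################################################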
class PA_UserState(Base):
    __tablename__ = "pa_user_state"
    id = Column(Integer, Sequence('pa_user_state_id_seq'), primary_key=True )
    pa_user_dn = Column(String, ForeignKey('pa_user.dn'), primary_key=True )
    last_used = Column(DateTime(timezone=True))
    path_type = Column(String, primary_key=True, unique=False, nullable=False )
    noo = Column(String, unique=False, nullable=False )
    grouping = Column(String, unique=False, nullable=False )
    how_many = Column(Integer, unique=False, nullable=False )
    st_offset = Column(Integer, unique=False, nullable=False )
    size = Column(Integer, unique=False, nullable=False )
    folders = Column(Boolean, unique=False, nullable=False )
    root = Column(String, unique=False, nullable=False )
    cwd = Column(String, unique=False, nullable=False )
    ## for now being lazy and not doing a separate table until I settle on needed fields and when
    # only used if ptype == View
    view_eid = Column(Integer, unique=False, nullable=False )
    orig_ptype = Column(String, unique=False, nullable=False )
    # only used if view and orig_ptype was search
    orig_search_term = Column(String, unique=False, nullable=False )
    orig_url = Column(String, unique=False, nullable=False )
    current = Column(Integer)
    first_eid = Column(Integer)
    last_eid = Column(Integer)
    num_entries = Column(Integer)

    def __repr__(self):
        return f"<pa_user_dn: {self.pa_user_dn}, path_type: {self.path_type}, noo: {self.noo}, grouping: {self.grouping}, how_many: {self.how_many}, st_offset: {self.st_offset}, size: {self.size}, folders: {self.folders}, root: {self.root}, cwd: {self.cwd}, view_eid: {self.view_eid}, orig_ptype: {self.orig_ptype}, orig_search_term: {self.orig_search_term}, orig_url: {self.orig_url}, current={self.current}, first_eid={self.first_eid}, last_eid={self.last_eid}, num_entries={self.num_entries}>"

##############################################################################
# NewJob(): convenience function to create a job, appropriately
##############################################################################
def NewJob(name, num_files=0, wait_for=None, jex=None, parent_job=None ):
    job=Job( name=name, current_file_num=0, current_file='', num_files=num_files,
             wait_for=wait_for, state="New", pa_job_state="New", start_time=None,
             last_update=datetime.now(pytz.utc) )
    if jex != None:
        for e in jex:
            job.extra.append(e)

    session.add(job)
    session.commit()
    if parent_job:
        log_str=f"adding <a href='/job/{job.id}'>job id={job.id} {job.name}</a>"
        if job.wait_for:
            log_str+=f" (wait for: {job.wait_for})"
        AddLogForJob(parent_job, log_str )
    return job

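# Illustrative usage (not executed here; the JobExtra values are assumptions
# for the example, following the patterns used in JobsForPath() below):
#
#   jex = [ JobExtra( name="path", value="/some/import/path" ) ]
#   j1  = NewJob( "importdir", num_files=0, wait_for=None, jex=jex )
#   j2  = NewJob( "getfiledetails", 0, j1.id, jex )   # runs only after j1 completes
#   HandleJobs(False)                                 # pick up and run the new jobs
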
##############################################################################
# MessageToFE(): sends a specific alert/message for a given job via the DB to
# the front end
##############################################################################
def MessageToFE( job_id, alert, message ):
    msg = PA_JobManager_FE_Message( job_id=job_id, alert=alert, message=message)
    session.add(msg)
    session.commit()
    return msg.id

##############################################################################
# SettingsRBPath(): return the recycle bin path (take recycle_bin_path and
# prepend base_path if it is not absolute)
##############################################################################
def SettingsRBPath():
    settings = session.query(Settings).first()
    if settings == None:
        print("Cannot create file data with no settings / recycle bin path is missing")
        return
    # path setting is an absolute path, just use it, otherwise prepend base_path first
    p=settings.recycle_bin_path
    if p[0] == '/':
        return p
    else:
        return settings.base_path+p

##############################################################################
# ProcessRecycleBinDir(): create Path/symlink if needed (func called once on
# startup)
##############################################################################
def ProcessRecycleBinDir(job):
    path = SettingsRBPath()
    if not os.path.exists( path ):
        AddLogForJob( job, f"Not Importing {path} -- Path does not exist" )
        return

    ptype = session.query(PathType).filter(PathType.name=='Bin').first()
    # check/create if needed
    symlink=CreateSymlink(job,ptype.id,path)
    # create the Path (and Dir objects for the Bin)
    AddPath( job, symlink, ptype.id )
    return

##############################################################################
# SettingsSPath(): return the storage path (take storage_path and prepend
# base_path if it is not absolute)
##############################################################################
def SettingsSPath():
    settings = session.query(Settings).first()
    if settings == None or settings.storage_path == "":
        print("Cannot create file data with no settings / storage path is missing")
        return
    p=settings.storage_path
    if p[0] == '/':
        return p
    else:
        return settings.base_path+p

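# Illustrative example (values assumed): with base_path="/home/pa/" and
# storage_path="photos/storage", SettingsSPath() returns
# "/home/pa/photos/storage"; with storage_path="/mnt/storage" (absolute) it is
# returned unchanged. SettingsIPath()/SettingsMPath()/SettingsRBPath() resolve
# their respective settings the same way.
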
##############################################################################
# ProcessStorageDirs(): wrapper func to call passed in job for each
# storage path defined in Settings - called via scan storage job
##############################################################################
def ProcessStorageDirs(parent_job):
    path = SettingsSPath()
    ptype = session.query(PathType).filter(PathType.name=='Storage').first()
    JobsForPath( parent_job, path, ptype )
    return

##############################################################################
# SettingsIPath(): return import path (abs or add base_path if needed)
##############################################################################
def SettingsIPath():
    settings = session.query(Settings).first()
    if settings == None or settings.import_path == "":
        print("Cannot create file data with no settings / import path is missing")
        return
    p=settings.import_path
    if p[0] == '/':
        return p
    else:
        return settings.base_path+p

##############################################################################
# SettingsMPath(): return path to actual metadata path from settings
##############################################################################
def SettingsMPath():
    settings = session.query(Settings).first()
    if not settings or settings.metadata_path == "":
        print ("WARNING: no Settings for metadata path")
        return None
    p=settings.metadata_path
    if p[0] == '/':
        return p
    else:
        return settings.base_path+p

##############################################################################
# ProcessImportDirs(): wrapper func to call passed in job for each
# import path defined in Settings - called via scan import job
##############################################################################
def ProcessImportDirs(parent_job):
    path = SettingsIPath()
    ptype = session.query(PathType).filter(PathType.name=='Import').first()
    JobsForPath( parent_job, path, ptype )
    return

##############################################################################
# JobsForPath(): wrapper func to create jobs for passed in parent_job for
# path, with path_type passed in - used by Process{Storage|Import}Dirs() above
##############################################################################
def JobsForPath( parent_job, path, ptype ):

    # try to find top of 'path' and use num_files for job later on
    cfn=0
    p=session.query(Path).filter(Path.path_prefix==SymlinkName(ptype.name,path,path+'/')).first()
    if p:
        cfn=p.num_files

    # start with import dir
    jex=[]
    jex.append( JobExtra( name="path", value=path ) )
    jex.append( JobExtra( name="path_type", value=ptype.id ) )
    job1=NewJob( "importdir", cfn, None, jex, parent_job )

    # then get file details (hash/thumbs)
    jex=[]
    jex.append( JobExtra( name="path", value=path ) )
    job2=NewJob("getfiledetails", 0, job1.id, jex, parent_job )

    # can start straight after importdir - job1, does not need details (job2)
    jex=[]
    jex.append( JobExtra( name="person", value="all" ) )
    jex.append( JobExtra( name="path_type", value=ptype.id ) )
    job3=NewJob("run_ai_on_path", 0, job1.id, jex, parent_job )

    # careful here, wait for getfiledetails (job2), the ai job cannot cause a dup
    # but it can fail - in which case the checkdup will be withdrawn
    job4=NewJob( "checkdups", 0, job2.id, None, parent_job )

    # okay, now process all the new jobs
    HandleJobs(False)
    return

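# For reference, the wait_for chain created above looks like this (job4 waits
# on job2, which in turn waits on job1; job3 only needs job1):
#
#   job1 "importdir" --+-- job2 "getfiledetails" -- job4 "checkdups"
#                      +-- job3 "run_ai_on_path"
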
##############################################################################
# ProcessFileForJob(): will set current_file in job for display in f/e job
# status, and if num_files is set for this job, then increment current file num
# to keep track of how far through the job is (for display in f/e)
# and then add a log to say we have processed this file
##############################################################################
def ProcessFileForJob(job, message, current_file):
    job.current_file=current_file
    if job.num_files:
        job.current_file_num=job.current_file_num+1
    AddLogForJob(job, message )
    return

##############################################################################
# CleanFileFromBin(): remove a single file from the Recycle Bin (FS and DB) if
# it is older than Settings.bin_cleanup_file_age days - called per file by
# JobCleanBin() below
##############################################################################
def CleanFileFromBin(job, e):
    # commit every 100 files to see progress being made but not hammer the database
    if job.current_file_num % 100 == 0:
        session.commit()

    settings = session.query(Settings).first()
    fname=e.FullPathOnFS()
    try:
        stat = os.stat(e.FullPathOnFS())
        now=time.time()
        # use ctime as that will be when the file was moved into the Bin path
        if (now - stat.st_ctime)/SECS_IN_A_DAY >= settings.bin_cleanup_file_age:
            try:
                os.remove( fname )
            except Exception as ex:
                AddLogForJob(job, f"ERROR: Tried to delete old file: {ex}" )
            RemoveFileFromDB( job, e, f"INFO: Removing file: {e.name} from system as it is older than {settings.bin_cleanup_file_age} - Age in days: {int((now - stat.st_ctime)/SECS_IN_A_DAY)}" )
    # if the file is no longer on the FS, then remove it.
    except FileNotFoundError:
        RemoveFileFromDB( job, e, f"INFO: Removing file: {e.name} from DB as it has already been removed from the filesystem" )
    # some other exception, just log and bail
    except Exception as ex:
        AddLogForJob(job, f"ERROR: Failed to find/stat old file - NOT removing it - {ex}" )

    return

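# Worked example of the age check above (illustrative numbers): if a file's
# st_ctime is 10 days ago, then (now - st_ctime) is ~864000 seconds, and
# 864000 / SECS_IN_A_DAY = 10.0 (assuming SECS_IN_A_DAY is 86400, as the name
# suggests); with bin_cleanup_file_age = 7 the file is deleted, with 14 it is
# kept.
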
##############################################################################
# JobCleanBin(job): Job that checks to see if there are any files that are old
# and removes them for good from the Bin. Only triggered on schedule from
# Settings.scheduled_bin_cleanup days elapsed since last 'clean_bin' job
##############################################################################
def JobCleanBin(job):
    JobProgressState( job, "In Progress" )
    settings = session.query(Settings).first()
    bin_path=session.query(Path).join(PathType).filter(PathType.name=='Bin').first()
    RunFuncOnFilesInPath( job, bin_path.path_prefix, CleanFileFromBin, True )
    FinishJob(job, f"Finished clean up of files older than {settings.bin_cleanup_file_age} days from Recycle Bin")
    return

##############################################################################
# JobMetadata(job): is called when we add/remove an individual override
# and in future for 'notes' per file -- This function writes an 'extra' copy
# out to the filesystem. This allows a full delete/rebuild of the PA data
# and we won't lose any manual overrides
##############################################################################
def JobMetadata(job):
    JobProgressState( job, "In Progress" )
    which=[jex.value for jex in job.extra if jex.name == "which"][0]
    face_id=[jex.value for jex in job.extra if jex.name == "face_id"][0]
    f=session.query(Face).get(face_id)
    try:
        if which == 'add_force_match_override' or which=='remove_force_match_override':
            person_id=[jex.value for jex in job.extra if jex.name == "person_id"][0]
            p=session.query(Person).get(person_id)
            os.makedirs( f"{SettingsMPath()}force_match_overrides", mode=0o777, exist_ok=True )
            fname=f"{SettingsMPath()}force_match_overrides/{face_id}_{p.tag}"
        elif which == 'add_no_match_override' or which == 'remove_no_match_override':
            type_id=[jex.value for jex in job.extra if jex.name == "type_id"][0]
            t=session.query(FaceOverrideType).get(type_id)
            os.makedirs( f"{SettingsMPath()}no_match_overrides", mode=0o777, exist_ok=True )
            fname=f"{SettingsMPath()}no_match_overrides/{face_id}_{t.name}"
        else:
            AddLogForJob(job, f"ERROR: Failed to process metadata (which={which})" )
            return
        if which.startswith('add_'):
            file_h=open(fname, 'wb')
            file_h.write(f.face)
            file_h.close()
        else:
            os.remove( fname )
    except Exception as ex:
        AddLogForJob(job, f"ERROR: Error with metadata file '{fname}': {ex}" )
    FinishJob(job, f"Finished metadata job {which}" )
    return

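# Illustrative example (tag/type names assumed): forcing face 123 to match a
# person whose tag is "alice" writes the raw face bytes to
# "<metadata_path>force_match_overrides/123_alice", and a no-match override of
# type "stranger" for face 456 is written to
# "<metadata_path>no_match_overrides/456_stranger"; the matching 'remove_*'
# jobs delete those files again.
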
##############################################################################
# AddLogForJob(): add a log line to joblog; if the last time we wrote a log
# was over 5 seconds ago, then commit the log to the db, so in f/e we see
# progress no matter the speed of job log output
# also when DEBUG is set, print a debug log too
##############################################################################
def AddLogForJob(job, message):
    now=datetime.now(pytz.utc)
    log=Joblog( job_id=job.id, log=message, log_date=now )
    job.last_update=datetime.now(pytz.utc)
    session.add(log)
    # some logs have DEBUG: in front, so clean that up
    message = message.replace("DEBUG:", "" )
    # if it's been more than 5 seconds since our last log, then commit to the DB to show some progress
    if hasattr(job, 'last_commit'):
        if (now - job.last_commit).seconds > 5:
            job.last_commit=now
            session.commit()
    else:
        job.last_commit = now
    if DEBUG:
        print( f"DEBUG: {message}" )
    return

##############################################################################
# RunJob(): runs the actual job; based on job.name we call the appropriate function
# if job.name is not 'known', then we fall through to a catch-all FinishJob to
# make sure it does not cause the job manager to choke on the unmatched job
##############################################################################
def RunJob(job):
    # session = Session()
    # only update start_time if we have never set it - stops restarts resetting start_time
    if not job.start_time:
        job.start_time=datetime.now(pytz.utc)
    if job.name =="scannow":
        JobScanNow(job)
    elif job.name =="forcescan":
        JobForceScan(job)
    elif job.name =="scan_sp":
        JobScanStorageDir(job)
    elif job.name =="importdir":
        JobImportDir(job)
    elif job.name =="getfiledetails":
        JobGetFileDetails(job)
    elif job.name == "checkdups":
        JobCheckForDups(job)
    elif job.name == "rmdups":
        JobRemoveDups(job)
    elif job.name == "delete_files":
        JobDeleteFiles(job)
    elif job.name == "move_files":
        JobMoveFiles(job)
    elif job.name == "restore_files":
        JobRestoreFiles(job)
    elif job.name == "run_ai_on":
        JobRunAIOn(job)
    elif job.name == "run_ai_on_path":
        JobRunAIOnPath(job)
    elif job.name == "transform_image":
        JobTransformImage(job)
    elif job.name == "clean_bin":
        JobCleanBin(job)
    elif job.name == "metadata":
        JobMetadata(job)
    else:
        FinishJob(job, f"ERROR: Requested to process unknown job type: {job.name}", "Failed")
    # okay, we finished a job, so check for any jobs that are dependent on this and run them...
    # session.close()
    if job.pa_job_state != "Completed":
        FinishJob(job, "PA Job Manager - This is a catchall to close off a Job, this should never be seen and implies a job did not actually complete?", "Failed" )
    HandleJobs(False)
    return

##############################################################################
# FinishJob(): finish this job off; unless the state args are overridden it is
# just marked Completed
##############################################################################
def FinishJob(job, last_log, state="Completed", pa_job_state="Completed"):
    job.state=state
    job.pa_job_state=pa_job_state
    if not job.start_time:
        job.start_time=datetime.now(pytz.utc)
    job.last_update=datetime.now(pytz.utc)
    AddLogForJob(job, last_log)
    if job.state=="Failed":
        WithdrawDependantJobs( job, job.id, "failed" )
    session.commit()
    if DEBUG:
        print( f"DEBUG: {last_log}" )
    return

##############################################################################
# HandleJobs(first_run): go through each job, if it is New, then tackle it --
# if first_run is True, then we are restarting the job manager and any job
# that was "In Progress" is stale, and should be handled -- mark it as Stale
# and that allows the user in the F/E to cancel or restart it
##############################################################################
def HandleJobs(first_run=False):
    if first_run:
        print("INFO: PA job manager is starting up - check for stale jobs" )
    else:
        if DEBUG:
            print("DEBUG: PA job manager is scanning for new jobs to process")
    for job in session.query(Job).filter(Job.pa_job_state != 'Completed').all():
        if first_run and job.pa_job_state == 'In Progress':
            print( f"INFO: Found stale job#{job.id} - {job.name}" )
            job.pa_job_state = 'Stale'
            session.add(job)
            AddLogForJob( job, "ERROR: Job has been marked stale as it did not complete" )
            MessageToFE( job.id, "danger", f'Stale job, click <a href="javascript:document.body.innerHTML+=\'<form id=_fm method=GET action=/stale_jobs></form>\'; document.getElementById(\'_fm\').submit();">here</a> to restart or cancel' )
            session.commit()
            continue
        if job.pa_job_state == 'New':
            if job.wait_for != None:
                j2 = session.query(Job).get(job.wait_for)
                if not j2:
                    AddLogForJob( job, f"ERROR: waiting for a job#({job.wait_for}) that does not exist? ")
                    print(f"ERROR: job.wait_for ({job.wait_for}) does not exist in below? " )
                    for j in session.query(Job).all():
                        print(f"ERROR: j={j.id}")
                    continue
                if j2.pa_job_state != 'Completed':
                    continue

            # use this to remove threads for easier debugging, and errors will stacktrace to the console
            if DEBUG:
                print("*************************************")
                print("RUNNING job: id={} name={} wait_for={}".format(job.id, job.name, job.wait_for ))
                RunJob(job)
            else:
                try:
                    RunJob(job)
                    # threading.Thread(target=RunJob, args=(job,)).start()
                except Exception as e:
                    try:
                        MessageToFE( job.id, "danger", "Failed with: {} (try job log for details)".format(e) )
                    except Exception as e2:
                        print("ERROR: Failed to let front-end know, but back-end Failed to run job (id: {}, name: {} -- orig exception was: {}, this exception was: {})".format( job.id, job.name, e, e2) )
    print("INFO: PA job manager is waiting for a job")
    return

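# For reference, the pa_job_state values used by the manager are: "New" (just
# created, waiting to be picked up here), "In Progress" (set via
# JobProgressState()), "Completed" / "Failed" (set via FinishJob()), and
# "Stale" (set above on restart when a job never finished). The separate
# job.state column is free-form and only used for display in the front end.
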
##############################################################################
# JobProgressState(): set pa_job_state to 'In Progress', and state to whatever
# is passed in (so the state in the f/e can be whatever, but the job mgr only
# has accurate pa_job_states)
##############################################################################
def JobProgressState( job, state ):
    job.pa_job_state = "In Progress"
    job.state=state
    session.commit()
    return

##############################################################################
# JobScanNow(): start and process the job to start scanning now (import paths)
##############################################################################
def JobScanNow(job):
    JobProgressState( job, "In Progress" )
    ProcessImportDirs(job)
    FinishJob( job, "Completed (scan for new files)" )
    MessageToFE( job.id, "success", "Completed (scan for new files)" )
    return

##############################################################################
# JobScanStorageDir(): start and process the job to start scanning now (storage paths)
##############################################################################
def JobScanStorageDir(job):
    JobProgressState( job, "In Progress" )
    ProcessStorageDirs(job)
    FinishJob( job, "Completed (scan for new files)" )
    MessageToFE( job.id, "success", "Completed (scan for new files)" )
    return

##############################################################################
# DisconnectSingleNoMatchOverride( job, o ): takes a single NoMatch override
# and moves it over to the Disconnected version in the DB, and moves the
# metadata on the filesystem from the NMO naming to the disco* naming: the
# file is renamed to use 0 for the face_id with a unique suffix on the end
##############################################################################
def DisconnectSingleNoMatchOverride( job, o ):
    f=session.query(Face).get(o.face_id)
    ot=session.query(FaceOverrideType).get(o.type_id)
    d=session.query(DisconnectedNoMatchOverride).filter(
        DisconnectedNoMatchOverride.type_id==o.type_id, DisconnectedNoMatchOverride.face==f.face ).first()
    # jic, check its not already there - shouldn't occur, but FS and DB can get out of sync
    # no unique keys in Disco*, so just being over-cautious
    if not d:
        session.add( DisconnectedNoMatchOverride( face=f.face, type_id=o.type_id ) )

    # now deal with 'renaming' the metadata on FS
    mpath=f'{SettingsMPath()}/no_match_overrides/'
    fname=f'{mpath}{o.face_id}_{ot.name}'
    new_fname=f'{mpath}0_{ot.name}_{uuid.uuid4()}'
    try:
        if os.path.exists( fname ):
            os.replace( fname, new_fname )
        else:
            file_h=open( new_fname, 'wb')
            file_h.write(f.face)
            file_h.close()
    except Exception as e:
        AddLogForJob( job, f"ERROR: Failed to move an override to a 'DisconnectedNoMatchOverride' override in metadata: {e}")

    session.query(FaceNoMatchOverride).filter( FaceNoMatchOverride.face_id==o.face_id, FaceNoMatchOverride.type_id==o.type_id).delete()
    # force commit here as we have now added the Disco* row, removed the Override and made the FS metadata match
    session.commit()
    return

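# Illustrative example (names assumed): a no-match override file such as
# "<metadata_path>/no_match_overrides/123_stranger" becomes something like
# "<metadata_path>/no_match_overrides/0_stranger_<uuid4>" once the face row is
# deleted, so the raw face bytes survive a DB rebuild.
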
##############################################################################
# DisconnectSingleForceMatchOverride( job, o ): takes a single ForceMatch
# override and moves it over to the Disconnected version in the DB, and moves
# the metadata on the filesystem from the force-match naming to the disco*
# naming: the file is renamed to use 0 for the face_id with a unique suffix
# on the end
##############################################################################
def DisconnectSingleForceMatchOverride( job, o ):
    f=session.query(Face).get(o.face_id)
    p=session.query(Person).get(o.person_id)
    d=session.query(DisconnectedForceMatchOverride).filter(
        DisconnectedForceMatchOverride.person_id==o.person_id, DisconnectedForceMatchOverride.face==f.face ).first()
    # jic, check its not already there - shouldn't occur, but FS and DB can get out of sync
    # no unique keys in Disco*, so just being over-cautious
    if not d:
        session.add( DisconnectedForceMatchOverride( face=f.face, person_id=o.person_id ) )

    # now deal with 'renaming' the metadata on FS
    path=f'{SettingsMPath()}/force_match_overrides/'
    fname=f'{path}{o.face_id}_{p.tag}'
    new_fname=f'{path}0_{p.tag}_{uuid.uuid4()}'
    try:
        if os.path.exists( fname ):
            os.replace( fname, new_fname )
        else:
            file_h=open( new_fname, 'wb')
            file_h.write(f.face)
            file_h.close()
    except Exception as e:
        AddLogForJob( job, f"ERROR: Failed to move an override to a 'DisconnectedForceMatchOverride' override in metadata: {e}")

    session.query(FaceForceMatchOverride).filter( FaceForceMatchOverride.face_id==o.face_id, FaceForceMatchOverride.person_id==o.person_id).delete()
    # force commit here as we have now added the Disco* row, removed the Override and made the FS metadata match
    session.commit()
    return

##############################################################################
# Face Overrides should not just be deleted - they should be disconnected
# from the file (in face_file_link) while keeping the raw face data, so
# that a face that is found in a future scan can still keep the override
# connection
##############################################################################
def DisconnectAllOverrides(job):
    overrides=session.query(FaceNoMatchOverride).all()
    for o in overrides:
        DisconnectSingleNoMatchOverride(job, o )

    overrides=session.query(FaceForceMatchOverride).all()
    for o in overrides:
        DisconnectSingleForceMatchOverride( job, o )
    return

##############################################################################
# JobForceScan(): start and process the job to force scanning now - so delete
# all attached data in DB, then scan import and storage paths
##############################################################################
def JobForceScan(job):
    JobProgressState( job, "In Progress" )
    DisconnectAllOverrides(job)
    session.query(PA_UserState).delete()
    session.query(FaceFileLink).delete()
    session.query(FaceRefimgLink).delete()
    session.query(Face).delete()
    session.query(DelFile).delete()
    session.query(EntryDirLink).delete()
    session.query(PathDirLink).delete()
    session.query(Path).delete()
    session.query(Dir).delete()
    session.query(File).delete()
    session.query(Entry).delete()
    session.commit()
    ProcessRecycleBinDir(job)
    ProcessImportDirs(job)
    ProcessStorageDirs(job)
    FinishJob(job, "Completed (forced remove and recreation of all file data)")
    MessageToFE( job.id, "success", "Completed (forced remove and recreation of all file data)" )
    return

##############################################################################
# CreateSymlink(): to serve static content of the images, we create a symlink
# from inside the static subdir of each path that exists
##############################################################################
def CreateSymlink(job,ptype,path):
    path_type = session.query(PathType).get(ptype)
    symlink=SymlinkName(path_type.name, path, path)
    if not os.path.exists(symlink):
        print( f"INFO: symlink does not exist, actually creating it -- s={symlink}" )
        try:
            os.makedirs( os.path.dirname(symlink), mode=0o777, exist_ok=True )
            os.symlink(path, symlink)
        except Exception as e:
            AddLogForJob( job, f"ERROR: Failed to create symlink - tried to link {symlink} -> {path}: {e}")
    return symlink

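# Illustrative example (based on the AddDir() banner further below, so the
# exact form depends on SymlinkName() in shared): an Import path such as
# /home/ddp/src/photoassistant/images_to_process appears to end up with a
# symlink / path_prefix like "static/Import/images_to_process", which is what
# the web front end serves the images from.
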
##############################################################################
# AddPath(): create or return a Path object based on the path prefix (pp) and
# the type (the type id of Import, Storage, Bin); if the Path is created, it
# also creates the Dir object (which in turn creates the Entry object)
##############################################################################
def AddPath(job, pp, type ):
    path_obj=session.query(Path).filter(Path.path_prefix==pp).first()
    if not path_obj:
        path_obj=Path( path_prefix=pp, num_files=0, type_id=type )
        # if there is no path yet, then there is no Dir for it, so create that too (which creates the entry, etc.)
        # Dir name is os.path.basename(pp), is not in another Dir (None), and its relative path is "", inside the path_obj we just created
        dir=AddDir( job, os.path.basename(pp), None, "", path_obj )
        session.add(path_obj)
        session.add(dir)
        session.commit()
    return path_obj

####################################################################################################################################
# Key function that runs as part of (usually) an import job. The name of the directory (dirname) is checked to see
# if it already is in the database (inside of in_dir in in_path). If it is,
# just return the db entry. If not, then we create a new row in Dir, that has name of dirname, has a parent directory
# of in_dir (DB object), has the rel_path set to the relative fs path from the actual fs path to this entry (including the dirname)
# and the in_path set to the overarching path (one of an Import, Storage or Recycle_bin path in the DB)
#
# e.g. path on FS: /home/ddp/src/photoassistant/images_to_process/ ... ends in DB as path_prefix="static/Import/images_to_process"
# and we have a dir in /home/ddp/src/photoassistant/images_to_process/0000/subtest, then we call:
# AddDir( job, dirname='subtest', in_dir=Dir object for '0000', rel_path='0000/subtest',
#         in_path=Path object for 'static/Import/images_to_process' )
####################################################################################################################################
def AddDir(job, dirname, in_dir, rel_path, in_path ):
    dir=session.query(Dir).join(PathDirLink).join(Path).filter(Path.id==in_path.id).filter(Dir.rel_path==rel_path).first()
    if dir:
        e=session.query(Entry).get(dir.eid)
        e.exists_on_fs=True
        return dir
    dir=Dir( last_import_date=0, rel_path=rel_path, in_path=in_path )
    dtype=session.query(FileType).filter(FileType.name=='Directory').first()
    e=Entry( name=dirname, type=dtype, exists_on_fs=True )
    e.dir_details=dir
    # no in_dir occurs when we Add the actual Dir for the Path (top of the tree)
    if in_dir:
        e.in_dir=in_dir
    if DEBUG:
        AddLogForJob(job, f"DEBUG: Process new dir: {dirname}, rel_path={rel_path}")
    session.add(e)
    return dir

####################################################################################################################################
# AddFile(): adds a file into the given dir (in_dir) with dates provided. If
# it is already in the DB, just return the entry
####################################################################################################################################
def AddFile(job, fname, type_str, fsize, in_dir, year, month, day, woy ):
    e=session.query(Entry).join(EntryDirLink).join(Dir).filter(Entry.name==fname,Dir.eid==in_dir.eid).first()
    if e:
        e.exists_on_fs=True
        return e
    ftype = session.query(FileType).filter(FileType.name==type_str).first()
    e=Entry( name=fname, type=ftype, exists_on_fs=True )
    f=File( size_mb=fsize, last_hash_date=0, faces_created_on=0, year=year, month=month, day=day, woy=woy )
    e.file_details = f
    e.in_dir=in_dir
    AddLogForJob(job, "Found new file: {}".format(fname) )
    session.add(e)
    return e

####################################################################################################################################
|
|
# reset exists_on_fs to False for everything in this import path, if we find it on the FS in the walk below, it goes back to True,
|
|
# anything that is still false, has been deleted (and will be deleted out of dir in other func as needed)
|
|
####################################################################################################################################
|
|
def ResetExistsOnFS(job, path):
|
|
reset_dirs = session.query(Entry).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==path).all()
|
|
for reset_dir in reset_dirs:
|
|
reset_dir.exists_on_fs=False
|
|
session.add(reset_dir)
|
|
reset_files = session.query(Entry).join(EntryDirLink).filter(EntryDirLink.dir_eid==reset_dir.id).all()
|
|
for reset_file in reset_files:
|
|
reset_file.exists_on_fs=False
|
|
session.add(reset_file)
|
|
return
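
# A minimal sketch of the mark-and-sweep pattern this supports (path value hypothetical; JobImportDir() below is the real flow):
#
#   ResetExistsOnFS( job, 'static/Import/images_to_process' )   # mark everything in the path as missing
#   # ... the os.walk() pass flips exists_on_fs back to True via AddDir()/AddFile() for anything still present ...
#   rm_cnt = HandleAnyFSDeletions( job )                        # sweep whatever is still marked missing out of the DB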
|
|
|
|
####################################################################################################################################
|
|
# Convenience functions to remove an empty directory - first from the filesystem, then from the database (and its associated links)
# used by CleanUpDirInDB() below when deleting files leaves a directory empty
|
|
####################################################################################################################################
|
|
def RemoveEmtpyDirFromFS( job, del_me ):
|
|
try:
|
|
os.rmdir( del_me.FullPathOnFS() )
|
|
except FileNotFoundError:
|
|
AddLogForJob( job, f"INFO: Dir already removed -- {del_me.FullPathOnFS()}")
|
|
except Exception as e:
|
|
AddLogForJob( job, f"ERROR: Failed to remove dir from filesystem - which={del_me.FullPathOnFS()}, err: {e}")
|
|
return
|
|
|
|
def RemoveEmptyDirFromDB( job, del_me ):
|
|
session.query(EntryDirLink).filter(EntryDirLink.entry_id==del_me.id).delete()
|
|
session.query(PathDirLink).filter(PathDirLink.dir_eid==del_me.id).delete()
|
|
session.query(Dir).filter(Dir.eid==del_me.id).delete()
|
|
session.query(Entry).filter(Entry.id==del_me.id).delete()
|
|
AddLogForJob( job, f"INFO: Removing {del_me.FullPathOnFS()} from system as removing files has left it empty" )
|
|
return
|
|
|
|
####################################################################################################################################
|
|
# Convenience function called after we delete an entry from the DB (always starts as a file deletion). If the deletion leaves a dir
# empty, this func deletes the dir and then checks the parent dir recursively, so a single file deletion can trigger a cascading
# removal of empty dirs up the hierarchy. If the dir still has content, just finish.
|
|
####################################################################################################################################
|
|
def CleanUpDirInDB(job, e):
|
|
session.commit()
|
|
print( f"CleanUpDirInDB(): checking dir: {e.FullPathOnFS()} ({e.id})" )
|
|
content = session.query(Entry).join(EntryDirLink).filter(EntryDirLink.dir_eid==e.id).first()
|
|
if not content:
|
|
print( f" Dir {e.FullPathOnFS()} - {e.id} is empty - removing it" )
|
|
print( f" Entry {e}" )
|
|
# if no in_dir, we are at the root of the path, STOP
|
|
if not e.in_dir:
|
|
print( " Parent is empty, so NEVER delete this entry, returning" )
|
|
return
|
|
# okay remove this empty dir
|
|
RemoveEmtpyDirFromFS( job, e )
|
|
RemoveEmptyDirFromDB( job, e )
|
|
# get an Entry from DB (in_dir is a Dir/we need the ORM entry for code to work)
|
|
parent_dir = session.query(Entry).get(e.in_dir.eid)
|
|
print( f" Dir {e.FullPathOnFS()} is in {parent_dir.FullPathOnFS()} ({parent_dir.id}) -> check next" )
|
|
# check to see if removing the empty dir has left the parent dir empty
|
|
CleanUpDirInDB(job, parent_dir)
|
|
else:
|
|
print( f"There is content (first entry: {content.name}) in {e.FullPathOnFS()} - finished for this dir" )
|
|
return
|
|
|
|
####################################################################################################################################
|
|
# Convenience function to remove a file from the database - and its associated links
|
|
# used when scanning and a file has been removed out from under PA, or when we remove duplicates
|
|
####################################################################################################################################
|
|
def RemoveFileFromDB(job, del_me, msg):
|
|
parent_dir_e=session.query(Entry).get(del_me.in_dir.eid)
|
|
session.query(EntryDirLink).filter(EntryDirLink.entry_id==del_me.id).delete()
|
|
connected_faces=session.query(FaceFileLink).filter(FaceFileLink.file_eid==del_me.id).all()
|
|
for ffl in connected_faces:
|
|
session.query(FaceRefimgLink).filter(FaceRefimgLink.face_id==ffl.face_id).delete()
|
|
session.query(Face).filter(Face.id==ffl.face_id).delete()
|
|
# this might be a file removed from the Bin path, so also get rid of its DelFile
|
|
session.query(DelFile).filter(DelFile.file_eid==del_me.id).delete()
|
|
session.query(File).filter(File.eid==del_me.id).delete()
|
|
session.query(Entry).filter(Entry.id==del_me.id).delete()
|
|
AddLogForJob( job, msg )
|
|
CleanUpDirInDB(job, parent_dir_e)
|
|
return
|
|
|
|
|
|
####################################################################################################################################
|
|
# Function that restores a file that was deleted (moved into the Bin)
|
|
# it moves file on the filesystem back to its original path and then changes the database path from the Bin path
|
|
# to the original import or storage path and appropriate dir
|
|
####################################################################################################################################
|
|
def RestoreFile(job,restore_me):
|
|
try:
|
|
# rel_path for a file in the Bin, is like 'Import/images_to_process/1111', so just prepend static/
|
|
dst_dir='static/' + restore_me.in_dir.rel_path + '/'
|
|
os.makedirs( dst_dir,mode=0o777, exist_ok=True )
|
|
src=restore_me.FullPathOnFS()
|
|
dst=dst_dir + '/' + restore_me.name
|
|
os.replace( src, dst )
|
|
except Exception as e:
|
|
AddLogForJob( job, f"ERROR: Failed to restores (mv) file on filesystem - which={src} to {dst}, err: {e}")
|
|
|
|
# need these for AddDir calls below to work
|
|
orig_file_details = session.query(DelFile).get(restore_me.id)
|
|
orig_path = session.query(Path).filter(Path.path_prefix==orig_file_details.orig_path_prefix).first()
|
|
parent_dir=session.query(Dir).join(PathDirLink).filter(PathDirLink.path_id==orig_path.id).first()
|
|
|
|
# in case our new_rel_path is just in the top level of the path, make
|
|
# new_dir = parent_dir so restore_me.in_dir = new_dir after the for loop works
|
|
new_dir=parent_dir
|
|
|
|
# e.g. restore_me's rel_path 'Import/images_to_process/1111', orig_path was 'static/Import/images_to_process', need new rel_path to be just the 1111 bit...
|
|
new_rel_path='static/'+restore_me.in_dir.rel_path
|
|
new_rel_path=new_rel_path.replace(orig_file_details.orig_path_prefix, '')
|
|
if len(new_rel_path) > 0 and new_rel_path[-1] == '/':
|
|
new_rel_path=new_rel_path[0:-1]
|
|
if len(new_rel_path) > 0 and new_rel_path[0] == '/':
|
|
new_rel_path=new_rel_path[1:]
|
|
|
|
# okay, go through new relative path and AddDir any missing subdirs of this
|
|
new_dir=CreateFSLocation( job, orig_path, new_rel_path )
|
|
|
|
# keep orig parent dir, in recycle bin, if this restore moves last file out, then its now empty and can be deleted
|
|
orig_parent_dir_e = session.query(Entry).get(restore_me.in_dir.eid)
|
|
|
|
# reset restored file into its new dir
|
|
restore_me.in_dir = new_dir
|
|
AddLogForJob(job, f"Restored file: {restore_me.name} to {os.path.dirname(restore_me.FullPathOnFS())}" )
|
|
|
|
# remove DelFile entry for this restored file
|
|
session.query(DelFile).filter(DelFile.file_eid==restore_me.id).delete()
|
|
|
|
# remove if now empty
|
|
CleanUpDirInDB(job, orig_parent_dir_e)
|
|
session.commit()
|
|
return
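
# Worked example of the rel_path rewrite above (values hypothetical): a Bin file whose dir rel_path is
# 'Import/images_to_process/1111' and whose DelFile recorded orig_path_prefix='static/Import/images_to_process' goes:
#
#   'static/' + 'Import/images_to_process/1111'        -> 'static/Import/images_to_process/1111'
#   .replace('static/Import/images_to_process', '')    -> '/1111'
#   strip the leading '/'                               -> '1111'
#
# so CreateFSLocation() only needs to (re)create the '1111' subdir under the original path before the file is re-homed.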
|
|
|
|
####################################################################################################################################
|
|
# Function that moves a file we are "deleting" to the recycle bin, it moves the file on the filesystem and then changes the
|
|
# database path from the import or storage path over to the Bin path
|
|
####################################################################################################################################
|
|
def MoveFileToRecycleBin(job,del_me):
|
|
try:
|
|
settings = session.query(Settings).first()
|
|
dst_dir= SettingsRBPath() + '/' + del_me.in_dir.in_path.path_prefix.replace('static/','') + '/' + del_me.in_dir.rel_path + '/'
|
|
os.makedirs( dst_dir,mode=0o777, exist_ok=True )
|
|
src=del_me.FullPathOnFS()
|
|
dst=dst_dir + '/' + del_me.name
|
|
os.replace( src, dst )
|
|
if DEBUG:
|
|
print( f"MoveFileToRecycleBin({job.id},{del_me.name}): os.replace {src} with {dst} " )
|
|
except Exception as e:
|
|
AddLogForJob( job, f"ERROR: Failed to remove file from filesystem - which={src}, err: {e}")
|
|
|
|
# need these for AddDir calls below to work
|
|
bin_path=session.query(Path).join(PathType).filter(PathType.name=='Bin').first()
|
|
parent_dir=session.query(Dir).join(PathDirLink).filter(PathDirLink.path_id==bin_path.id).first()
|
|
|
|
# if we ever need to restore, lets remember this file's original path
|
|
# (use a string in case the dir/path is ever deleted from FS (and then DB) and we need to recreate)
|
|
del_file_details = DelFile( file_eid=del_me.id, orig_path_prefix=del_me.in_dir.in_path.path_prefix )
|
|
session.add( del_file_details )
|
|
|
|
# remove static from rel path, as it will move to static/Bin anyway...
|
|
new_rel_path=del_me.in_dir.in_path.path_prefix.replace('static/','')
|
|
# if there is a relative path on this dir, add it to the new_rel_path as there is only ever 1 Bin path
|
|
if len(del_me.in_dir.rel_path):
|
|
new_rel_path += '/' + del_me.in_dir.rel_path
|
|
|
|
# okay, go through new relative path and AddDir any missing subdirs of this
|
|
new_dir=CreateFSLocation( job, bin_path, new_rel_path )
|
|
|
|
# keep original parent_dir Entry before we 'move' it, as we need to see if its now empty, if so del it
|
|
orig_parent_dir_e = session.query(Entry).get(del_me.in_dir.eid)
|
|
del_me.in_dir = new_dir
|
|
AddLogForJob(job, f"Deleted file: {del_me.name} - (moved to {os.path.dirname(del_me.FullPathOnFS())})" )
|
|
CleanUpDirInDB(job, orig_parent_dir_e)
|
|
return
|
|
|
|
|
|
####################################################################################################################################
|
|
# move_me: (single) ENTRY being moved to a different folder. It might be a FILE or a DIR
|
|
# dst_storage_path: the path move_me is going into (import/storage, etc.)
|
|
# dst_rel_path: the relative path in the (new?) path, that is the new location
|
|
# (might contain any combo of existing/new dirs in the location)
|
|
#
|
|
# possible scenarios:
|
|
# 1: move_me is a File and it is being moved to a new folder (and maybe a new PATH)
|
|
# 2: move_me is a File and it is being moved to an existing folder
|
|
# need to just make sure that there is not a duplicate name for this file in existing folder
|
|
# 3: move_me is a Directory and it is being moved to a new folder (and maybe a new PATH)
|
|
# need to 'create' new Dir in new location (really just a logical move of the DB entry)
|
|
# This then requires all sub_dirs of move_me to have their path/rel_path's reset
|
|
# 4: move_me is a Directory and it is being moved to an existing folder
|
|
# need to move old Dir INTO existing folder
|
|
# This then requires all sub_dirs of move_me to have their path/rel_path's reset
|
|
#
|
|
####################################################################################################################################
|
|
def MoveEntriesToOtherFolder(job, move_me, dst_storage_path, dst_rel_path):
|
|
if DEBUG:
|
|
print( f"DEBUG: MoveEntriesToOtherFolder( job={job.id}, move_me={move_me.name}, dst_storage_path={dst_storage_path.id}, dst_rel_path={dst_rel_path})")
|
|
orig_name=move_me.name
|
|
orig_fs_pos=move_me.FullPathOnFS()
|
|
|
|
if move_me.type.name == "Directory":
|
|
# see if there is an existing dir of new dst_rel_path already
|
|
parent_dir=session.query(Entry).join(Dir).join(PathDirLink).join(Path).filter(Path.id==dst_storage_path.id).filter(Dir.rel_path==dst_rel_path).first()
|
|
if parent_dir:
|
|
# scen 4 move move_me into existing dst_dir of requested location
|
|
dst_dir=parent_dir.dir_details
|
|
# remember, we are moving (move_me - a dir) into this existing dir (dst_dir), so add the name to dst_dir's rel_path...
|
|
move_me.dir_details.rel_path = dst_dir.rel_path + '/' + move_me.name
|
|
move_me.dir_details.in_path = dst_dir.in_path
|
|
move_me.in_dir = dst_dir
|
|
move_me.in_path = dst_dir.in_path
|
|
session.add(move_me)
|
|
# we use the new path to this new Dir with the full location (the old dir is put into the new location)
|
|
ResetAnySubdirPaths( move_me, dst_storage_path, move_me.dir_details.rel_path )
|
|
# move the actual dir to its new location
|
|
try:
|
|
os.replace( orig_fs_pos, move_me.FullPathOnFS() )
|
|
except Exception as e:
|
|
AddLogForJob( job, f"ERROR: Failed to move dir: {orig_fs_pos} into {move_me.FullPathOnFS()}, err: {e}")
|
|
return
|
|
AddLogForJob( job, f"INFO: move {orig_fs_pos} -> {move_me.FullPathOnFS()}" )
|
|
return
|
|
else:
|
|
# scen 3: rename dir -- the last component of dst_rel_path is the new name for move_me, so only create the path up to (but not
# including) that last component (os.path.dirname); we then just change move_me into that last dir -> renaming the dir
|
|
dst_dir=CreateFSLocation( job, dst_storage_path, os.path.dirname(dst_rel_path) )
|
|
|
|
move_me.dir_details.rel_path = dst_rel_path
|
|
move_me.dir_details.in_path = dst_dir.in_path
|
|
move_me.in_dir = dst_dir
|
|
move_me.in_path = dst_dir.in_path
|
|
move_me.name = os.path.basename(dst_rel_path)
|
|
session.add(move_me)
|
|
ResetAnySubdirPaths( move_me, dst_storage_path, dst_rel_path )
|
|
try:
|
|
os.replace( orig_fs_pos, move_me.FullPathOnFS() )
|
|
except Exception as e:
|
|
AddLogForJob( job, f"ERROR: Failed to rename dir: {orig_fs_pos} -> {move_me.FullPathOnFS()}, err: {e}")
|
|
return
|
|
AddLogForJob( job, f"INFO: rename {orig_fs_pos} -> {move_me.FullPathOnFS()}" )
|
|
return
|
|
else:
|
|
# make (any needed) Dir for the new destination FS location
|
|
dst_dir=CreateFSLocation( job, dst_storage_path, dst_rel_path )
|
|
|
|
# check for duplicate name? (scen 2)
|
|
e=session.query(Entry).join(EntryDirLink).join(Dir).filter(Entry.name==move_me.name,Dir.eid==dst_dir.eid).first()
|
|
if e:
|
|
AddLogForJob( job, f"INFO: Moving {move_me.name} and it is a duplicate of a another file (by name), prepending 'Move of'" )
|
|
# as we have original fs_pos, we can just rename new to be Move of XXX, and it will move it to that new name
|
|
move_me.name = 'Move of ' + move_me.name
|
|
|
|
# its a unique file in this new structure, so just make sure it is in the right DIR
|
|
orig_dir_eid = move_me.in_dir.eid
|
|
move_me.in_dir = dst_dir
|
|
move_me.in_path = dst_dir.in_path
|
|
session.add(move_me)
|
|
# move the actual file to its new location
|
|
AddLogForJob( job, f"DEBUG: move of FILE - {orig_fs_pos} -> {move_me.FullPathOnFS()}" )
|
|
try:
|
|
os.replace( orig_fs_pos, move_me.FullPathOnFS() )
|
|
except Exception as e:
|
|
AddLogForJob( job, f"ERROR: Failed to move file: {orig_fs_pos} -> {move_me.FullPathOnFS()}, err: {e}")
|
|
return
|
|
|
|
old_dir = session.query(Entry).filter(Entry.id==orig_dir_eid).first()
|
|
CleanUpDirInDB(job, old_dir)
|
|
return
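
# Illustrative call for the simplest case (scenario 2 - a single file into an existing storage folder); the path prefix,
# folder name and some_entry_id below are all hypothetical placeholders:
#
#   dst_path = session.query(Path).filter(Path.path_prefix=='static/Storage/photos').first()
#   move_me  = session.query(Entry).get(some_entry_id)
#   MoveEntriesToOtherFolder( job, move_me, dst_path, '2019/holiday' )
#   session.commit()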
|
|
|
|
####################################################################################################################################
|
|
# For a job, take a location in a path, traverse it making any new Dir's in the
|
|
# DB, and then make the dirs on the file system
|
|
####################################################################################################################################
|
|
def CreateFSLocation( job, dst_path, dst_locn ):
|
|
part_rel_path=""
|
|
parent_dir=session.query(Dir).join(PathDirLink).join(Path).filter(Path.id==dst_path.id).filter(Dir.rel_path=='').first()
|
|
for dirname in dst_locn.split("/"):
|
|
part_rel_path += f"{dirname}"
|
|
parent_dir=AddDir( job, dirname, parent_dir, part_rel_path, dst_path )
|
|
part_rel_path += "/"
|
|
try:
|
|
os.makedirs( dst_path.path_prefix + '/' + dst_locn, mode=0o777, exist_ok=True )
|
|
except Exception as e:
|
|
AddLogForJob( job, f"ERROR: Failed to makedirs: {dst_path.path_prefix + '/' + dst_locn} Err: {e}")
|
|
return parent_dir
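
# Usage sketch (names hypothetical): create/find the Dir rows and on-disk dirs for 'static/Storage/photos/2019/holiday',
# getting back the Dir object for the deepest component:
#
#   dst_path = session.query(Path).filter(Path.path_prefix=='static/Storage/photos').first()
#   new_dir  = CreateFSLocation( job, dst_path, '2019/holiday' )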
|
|
|
|
|
|
####################################################################################################################################
|
|
# take a dir that is being moved, and reset its own and any sub dirs rel_paths,
|
|
# to the new PATH and relevant rel_path
|
|
####################################################################################################################################
|
|
def ResetAnySubdirPaths( moving_dir, dst_storage_path, parent_rel_path ):
|
|
if DEBUG:
|
|
print( f"ResetAnySubdirPaths( {moving_dir.name}, {dst_storage_path.path_prefix}, {parent_rel_path} )" )
|
|
sub_dirs = session.query(Entry).join(FileType).join(EntryDirLink).filter(EntryDirLink.dir_eid==moving_dir.id).filter(FileType.name=='Directory').all()
|
|
for sub in sub_dirs:
|
|
if DEBUG:
|
|
print( f"ResetAnySubdirPaths: WAS sub={sub.name}, ip={sub.in_dir.in_path.path_prefix}, rp={sub.dir_details.rel_path}" )
|
|
sub.in_path = dst_storage_path
|
|
sub.dir_details.in_path = dst_storage_path
|
|
sub.dir_details.rel_path = parent_rel_path + '/' + sub.name
|
|
if DEBUG:
|
|
print( f"ResetAnySubdirPaths: NOW sub={sub.name}, ip={sub.in_dir.in_path.path_prefix}, rp={sub.dir_details.rel_path}" )
|
|
ResetAnySubdirPaths( sub, dst_storage_path, sub.dir_details.rel_path )
|
|
return
|
|
|
|
####################################################################################################################################
|
|
# Convenience function to remove a dir from the database - and its associated links
|
|
####################################################################################################################################
|
|
def RemoveDirFromDB(job, del_me):
|
|
session.query(PathDirLink).filter(PathDirLink.dir_eid==del_me.id).delete()
|
|
session.query(EntryDirLink).filter(EntryDirLink.entry_id==del_me.id).delete()
|
|
session.query(Dir).filter(Dir.eid==del_me.id).delete()
|
|
session.query(Entry).filter(Entry.id==del_me.id).delete()
|
|
AddLogForJob( job, f"INFO: Removing dir: id={del_me.name} from system as it is no longer on the file system")
|
|
return
|
|
|
|
####################################################################################################################################
|
|
# this routine is used when a scan finds files/dirs that have been removed
|
|
# underneath PA, so it just deletes them from the DB
|
|
####################################################################################################################################
|
|
def HandleAnyFSDeletions(job):
|
|
dtype=session.query(FileType).filter(FileType.name=='Directory').first()
|
|
rms = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id!=dtype.id).all()
|
|
rm_cnt=0
|
|
for rm in rms:
|
|
DelFacesForFile( job, rm.id )
|
|
RemoveFileFromDB(job, rm, f"INFO: Removing file: {rm.name} from system as it is no longer on the file system")
|
|
rm_cnt+=1
|
|
|
|
rmdirs = session.query(Entry).filter(Entry.exists_on_fs==False,Entry.type_id==dtype.id).order_by(Entry.id.desc()).all()
|
|
for rmdir in rmdirs:
|
|
RemoveDirFromDB(job, rmdir)
|
|
rm_cnt+=1
|
|
return rm_cnt
|
|
|
|
####################################################################################################################################
|
|
# try several ways to work out date of file created (try exif, then filename, lastly filesystem)
|
|
####################################################################################################################################
|
|
def GetDateFromFile(file, stat):
|
|
# try exif
|
|
try:
|
|
with open(file, 'rb') as f:
    tags = exifread.process_file(f)
|
|
date_str, _ = str(tags["EXIF DateTimeOriginal"]).split(" ")
|
|
year, month, day = date_str.split(":")
|
|
year=int(year)
|
|
month=int(month)
|
|
day=int(day)
|
|
check = datetime( year, month, day )
|
|
except:
|
|
# try parsing filename
|
|
try:
|
|
m=re.search( r'(\d{4})(\d{2})(\d{2})', file)
|
|
year=int(m[1])
|
|
month=int(m[2])
|
|
day=int(m[3])
|
|
check2 = datetime( year, month, day )
|
|
# give up and use file sys date
|
|
except:
|
|
# use mtime - probably best to based date from when the content was created/changed
|
|
year, month, day, _, _, _, _, _, _ = datetime.fromtimestamp(stat.st_mtime).timetuple()
|
|
c=date(year, month, day).isocalendar()
|
|
woy=c[1]
|
|
return year, month, day, woy
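
# Worked example of the fallback chain (filename hypothetical): with no usable EXIF data, the filename regex supplies the date:
#
#   GetDateFromFile( 'static/Import/images_to_process/IMG_20190614_123456.jpg', stat )   # -> (2019, 6, 14, 24) - 24 is the ISO week
#
# only if both the EXIF tags and the filename fail does the file's mtime get used.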
|
|
|
|
####################################################################################################################################
|
|
# AddJexToDependantJobs(): if a parent job has jex, then copy them down into dependant jobs so we can just use the data
|
|
####################################################################################################################################
|
|
def AddJexToDependantJobs(job,name,value):
|
|
for j in session.query(Job).filter(Job.wait_for==job.id).all():
|
|
jex=JobExtra( name=name, value=value )
|
|
j.extra.append(jex)
|
|
AddJexToDependantJobs(j, name, value)
|
|
return
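
# Sketch of how the scan job below uses this (value hypothetical): the importdir job knows the symlinked path prefix, so it
# pushes it down to every job waiting on it (and their dependants in turn):
#
#   AddJexToDependantJobs( job, "path_prefix", "static/Import/images_to_process" )
#
# after which a dependant job (e.g. getfiledetails) can simply read the "path_prefix" jex out of its own job.extra.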
|
|
|
|
####################################################################################################################################
|
|
# WithdrawDependantJobs(): cancel/withdraw this dependant job, and if any other job is waiting on it then
|
|
# cancel it too (this is done recursively)
|
|
####################################################################################################################################
|
|
def WithdrawDependantJobs( job, id, reason ):
|
|
for j in session.query(Job).filter(Job.wait_for==id).all():
|
|
FinishJob(j, f"Job (#{j.id}) has been withdrawn -- #{job.id} {reason}", "Withdrawn" )
|
|
WithdrawDependantJobs(j, j.id, reason)
|
|
return
|
|
|
|
|
|
####################################################################################################################################
|
|
# next 3 funcs used to optimise whether to do dependant jobs (i.e. if no new files were found, dont keep re-doing file details / ai scans)
|
|
# find last successful importdir job for this path
|
|
####################################################################################################################################
|
|
def find_last_time_new_files_found(job):
|
|
path=[jex.value for jex in job.extra if jex.name == "path"][0]
|
|
jobs = session.execute( f"select j.* from job j, jobextra jex1, jobextra jex2 where j.id = jex1.job_id and j.id = jex2.job_id and jex1.name ='path' and jex1.value = '{path}' and jex2.name = 'new_files'")
|
|
|
|
for j in jobs:
|
|
return j.last_update.timestamp()
|
|
return 0
|
|
|
|
####################################################################################################################################
|
|
# find time of last getfiledetails job for this path
|
|
####################################################################################################################################
|
|
def find_last_successful_gfd_job(job):
|
|
path=[jex.value for jex in job.extra if jex.name == "path"][0]
|
|
jobs=session.query(Job).join(JobExtra).filter(Job.name=="getfiledetails").filter(JobExtra.value==path).filter(Job.state=='Completed').order_by(Job.id.desc()).limit(1).all()
|
|
for j in jobs:
|
|
return j.last_update.timestamp()
|
|
return 0
|
|
|
|
####################################################################################################################################
|
|
# find time of last run_ai_on_path job for this path
|
|
####################################################################################################################################
|
|
def find_last_successful_ai_scan(job):
|
|
path_type=[jex.value for jex in job.extra if jex.name == "path_type"][0]
|
|
jobs=session.query(Job).join(JobExtra).filter(Job.name=="run_ai_on_path").filter(JobExtra.name=='path_type',JobExtra.value==path_type).filter(Job.state=='Completed').order_by(Job.id.desc()).limit(1).all()
|
|
for j in jobs:
|
|
return j.last_update.timestamp()
|
|
return 0
|
|
|
|
####################################################################################################################################
|
|
# when an import job actually finds new files, then the pa_user_state caches will become invalid (offsets are now wrong)
|
|
####################################################################################################################################
|
|
def DeleteOldPA_UserState(job):
|
|
# clear them out for now - this is 'dumb', just delete ALL. Eventually, can do this based on just the path &/or whether the last_used is
|
|
# newer than this delete moment (only would be a race condition between an import changing things and someone simultaneously viewing)
|
|
# path=[jex.value for jex in job.extra if jex.name == "path"][0]
|
|
session.query(PA_UserState).delete()
|
|
return
|
|
|
|
|
|
|
|
####################################################################################################################################
|
|
# JobImportDir(): job that scans the import dir and processes entries in there - key function that uses os.walk() to traverse the
|
|
# file system and calls AddFile()/AddDir() as necessary
|
|
####################################################################################################################################
|
|
def JobImportDir(job):
|
|
JobProgressState( job, "In Progress" )
|
|
settings = session.query(Settings).first()
|
|
path=[jex.value for jex in job.extra if jex.name == "path"][0]
|
|
path_type=[jex.value for jex in job.extra if jex.name == "path_type"][0]
|
|
ptype = session.query(PathType).get(path_type)
|
|
AddLogForJob(job, f"Checking {ptype.name} Directory: {path}" )
|
|
if DEBUG:
|
|
print( f"DEBUG: Checking Directory: {path}" )
|
|
if not os.path.exists( path ):
|
|
WithdrawDependantJobs( job, job.id, "scan job found no new files to process" )
|
|
FinishJob( job, f"Finished Importing: {path} -- Path does not exist", "Failed" )
|
|
return
|
|
symlink=SymlinkName(ptype.name, path, path)
|
|
|
|
# create/find the Path
|
|
path_obj=AddPath( job, symlink, path_type )
|
|
# for recycle bin path, we dont want to import content, just create the path/dir vars (above) in the DB
|
|
bin_path=session.query(Path).join(PathType).filter(PathType.name=='Bin').first()
|
|
if bin_path != None and path_type == bin_path.type.id:
|
|
return
|
|
|
|
# find all jobs waiting on me and their children, etc. and add a path_prefix jex set to symlink, so we can just reference it from here on in, rather than recreate that string
|
|
AddJexToDependantJobs(job,"path_prefix",symlink)
|
|
ResetExistsOnFS(job, symlink)
|
|
|
|
# go through data once to work out file_cnt so progress bar works from first import
|
|
try:
|
|
walk=os.walk(path, topdown=True)
|
|
except Exception as e:
|
|
WithdrawDependantJobs( job, job.id, "scan job FAILED" )
|
|
FinishJob(job, f"ERROR: Failed to 'walk' the filesystem at: {path} Err: {e}", "Failed" )
|
|
return
|
|
|
|
ftree=list(walk)
|
|
overall_file_cnt=0
|
|
for root, subdirs, files in ftree:
|
|
overall_file_cnt+= len(subdirs) + len(files)
|
|
path_obj.num_files=overall_file_cnt
|
|
|
|
# find the dir we created with AddPath to use for entries at top-level of path
|
|
dir=session.query(Dir).join(PathDirLink).join(Path).filter(Path.id==path_obj.id,Dir.rel_path=='').first()
|
|
# session.add in case we have already imported this dir (AddDir wont re-add it) & we might now have a diff num of files to last time
|
|
session.add(dir)
|
|
|
|
orig_last_import = dir.last_import_date
|
|
|
|
# if we set / then commit this now, the web page will know how many files
|
|
# to process as we then do the slow job of processing them
|
|
job.num_files=overall_file_cnt
|
|
AddLogForJob(job, f"Found {overall_file_cnt} file(s) to process")
|
|
session.commit()
|
|
|
|
# we will store this into the job extras as the final result of whether this job found any new files,
|
|
# if it did not, then the dependant jobs dont need to really run - use to optimise them
|
|
found_new_files=0
|
|
# root == path of dir, files are in dir... subdirs are in dir
|
|
for root, subdirs, files in ftree:
|
|
# the Dir for the root of the path was already created above (by AddPath), so only AddDir subdirectories here
|
|
if root != path:
|
|
pp=SymlinkName( path_obj.type.name, path, root )+'/'+os.path.basename(root)
|
|
rel_path=pp.replace(symlink+'/','')
|
|
parent_dir=session.query(Dir).join(PathDirLink).join(Path).filter(Path.id==path_obj.id,Dir.rel_path==os.path.dirname(rel_path)).first()
|
|
dir=AddDir(job, os.path.basename(root), parent_dir, rel_path, path_obj)
|
|
for basename in files:
|
|
# commit every 100 files to see progress being made but not hammer the database
|
|
if job.current_file_num % 100 == 0:
|
|
session.commit()
|
|
fname=dir.PathOnFS()+'/'+basename
|
|
|
|
try:
|
|
stat = os.stat(fname)
|
|
except Exception as e:
|
|
AddLogForJob(job, f"failed to stat file - was it removed from the underlying filesystem while PA was scanning? Err: {e}" )
|
|
continue
|
|
|
|
# use ctime as even a metadata change (mv'd file on the fs, or a perms change) needs to be checked
|
|
if stat.st_ctime > dir.last_import_date:
|
|
if DEBUG:
|
|
print("DEBUG: {} - {} is newer than {}".format( basename, stat.st_ctime, dir.last_import_date ) )
|
|
if isImage(fname):
|
|
type_str = 'Image'
|
|
elif isVideo(fname):
|
|
type_str = 'Video'
|
|
else:
|
|
type_str = 'Unknown'
|
|
fsize = round(stat.st_size/(1024*1024))
|
|
|
|
year, month, day, woy = GetDateFromFile(fname, stat)
|
|
e=AddFile( job, basename, type_str, fsize, dir, year, month, day, woy )
|
|
found_new_files += 1
|
|
else:
|
|
if DEBUG:
|
|
print( f"DEBUG: { basename} - {stat.st_ctime} is OLDER than {dir.last_import_date}" )
|
|
e=session.query(Entry).join(EntryDirLink).join(Dir).filter(Entry.name==basename,Dir.eid==dir.eid).first()
|
|
e.exists_on_fs=True
|
|
job.current_file=basename
|
|
job.current_file_num+=1
|
|
job.current_file_num += len(subdirs)
|
|
dir.last_import_date = time.time()
|
|
job.num_files=overall_file_cnt
|
|
if found_new_files:
|
|
job.extra.append( JobExtra( name="new_files", value=found_new_files ) )
|
|
session.add(job)
|
|
# this will invalidate pa_user_state for this path's contents (offsets are now wrong), clear them out
|
|
DeleteOldPA_UserState(job)
|
|
|
|
rm_cnt=HandleAnyFSDeletions(job)
|
|
|
|
if found_new_files == 0:
|
|
last_scan=find_last_time_new_files_found(job)
|
|
last_file_details=find_last_successful_gfd_job(job)
|
|
last_ai_scan=find_last_successful_ai_scan(job)
|
|
|
|
for j in session.query(Job).filter(Job.wait_for==job.id).all():
|
|
if j.name == "getfiledetails" and last_file_details > last_scan:
|
|
FinishJob(j, f"Job (#{j.id}) has been withdrawn -- #{job.id} (scan job) did not find new files", "Withdrawn" )
|
|
# scan found no new files and last ai scan was after the last file scan
|
|
if j.name == "run_ai_on_path" and last_ai_scan > last_scan:
|
|
newest_refimg = session.query(Refimg).order_by(Refimg.created_on.desc()).limit(1).all()
|
|
# IF we also have no new refimgs since last scan, then no need to run any AI again
|
|
if newest_refimg and newest_refimg[0].created_on < last_scan:
|
|
FinishJob(j, f"Job (#{j.id}) has been withdrawn -- scan did not find new files, and no new reference images since last scan", "Withdrawn" )
|
|
# IF we also have no new refimgs since last AI scan, then no need to run any AI again
|
|
elif newest_refimg and newest_refimg[0].created_on < last_ai_scan:
|
|
FinishJob(j, f"Job (#{j.id}) has been withdrawn -- scan did not find new files, and no new reference images since last scan", "Withdrawn" )
|
|
FinishJob(job, f"Finished Importing: {path} - Processed {overall_file_cnt} files, Found {found_new_files} new files, Removed {rm_cnt} file(s)")
|
|
return
|
|
|
|
####################################################################################################################################
|
|
# RunFuncOnFilesInPath(): take a path, find the Dir Entry for it, then go through each file in the dir (use ORM here)
|
|
####################################################################################################################################
|
|
def RunFuncOnFilesInPath( job, path, file_func, count_dirs ):
|
|
d = session.query(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==path).filter(Dir.rel_path=='').first()
|
|
files = session.query(Entry).join(EntryDirLink).filter(EntryDirLink.dir_eid==d.eid).all()
|
|
for e in files:
|
|
ProcessFilesInDir(job, e, file_func, count_dirs)
|
|
return
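
# Usage sketch (path prefix hypothetical): run a cheap counting func first so the progress bar has a total, then the real
# per-file work - JobRunAIOn() below uses this two-pass pattern via ProcessFilesInDir(), while JobGetFileDetails() takes its
# total from Path.num_files and only needs the working pass:
#
#   RunFuncOnFilesInPath( job, 'static/Import/images_to_process', AddToJobImageCount, False )
#   job.current_file_num = 0
#   session.commit()
#   RunFuncOnFilesInPath( job, 'static/Import/images_to_process', GenHashAndThumb, True )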
|
|
|
|
|
|
####################################################################################################################################
|
|
# WrapperForScanFileForPerson(): take an entry and scan it for a person (if it is an image), update job file counts as needed too
|
|
####################################################################################################################################
|
|
def WrapperForScanFileForPerson(job, entry):
|
|
if entry.type.name == 'Image':
|
|
ScanFileForPerson( job, entry, force=False)
|
|
return
|
|
|
|
####################################################################################################################################
|
|
# Simple function that allows us to count image files (allows us to know AI job's total # of files)
|
|
####################################################################################################################################
|
|
def AddToJobImageCount(job, entry ):
|
|
if entry.type.name == 'Image':
|
|
job.num_files += 1
|
|
return
|
|
|
|
|
|
####################################################################################################################################
|
|
# JobRunAIOnPath(): job that uses path type (Import or Storage) to add to the job extras - dir_eid of the root Dir of the Path
|
|
# so we can then just call JobRunAIOn()
|
|
####################################################################################################################################
|
|
def JobRunAIOnPath(job):
|
|
JobProgressState( job, "In Progress" )
|
|
path_type=[jex.value for jex in job.extra if jex.name == "path_type"][0]
|
|
paths=session.query(Path).join(PathType).filter(PathType.id==path_type).all()
|
|
path_cnt=0
|
|
for p in paths:
|
|
d = session.query(Dir).join(PathDirLink).filter(PathDirLink.path_id==p.id).filter(Dir.rel_path=='').first()
|
|
DelMatchesForDir( job, d.eid )
|
|
# small chance we are restarting a job (this is the one most likely to go 'stale' and be restarted), so accommodate
# that by not adding the jex twice -- a duplicate only really throws the count out and makes it re-process, but if this is,
# say, the storage path, that can be another 10s of thousands of files to re-AI over...
|
|
already_there=False
|
|
for ex in job.extra:
|
|
if ex.name == f"eid-{path_cnt}":
|
|
already_there=True
|
|
break
|
|
if not already_there:
|
|
job.extra.append( JobExtra( name=f"eid-{path_cnt}", value=f"{d.eid}" ) )
|
|
path_cnt+=1
|
|
JobRunAIOn(job)
|
|
return
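
# Sketch of the extras a run_ai_on_path job carries into JobRunAIOn() (values hypothetical): the front-end supplies "person"
# and "path_type", and the loop above appends one "eid-N" per path's root Dir, e.g.:
#
#   person = "all",  path_type = "2",  eid-0 = "17",  eid-1 = "243"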
|
|
|
|
####################################################################################################################################
|
|
# JobRunAIOn(): job that grabs relevant refimgs to scan against the given set of entries (from seln made in F/E)
|
|
####################################################################################################################################
|
|
def JobRunAIOn(job):
|
|
# TODO: need to use JobInProgress... (here and other jobs?) -- so should I move this to HandleJob???
|
|
JobProgressState( job, "In Progress" )
|
|
AddLogForJob(job, f"INFO: Starting job to look for faces in files...")
|
|
which_person=[jex.value for jex in job.extra if jex.name == "person"][0]
|
|
if which_person == "all":
|
|
job.refimgs = session.query(Refimg).all()
|
|
else:
|
|
job.refimgs=session.query(Refimg).join(PersonRefimgLink).join(Person).filter(Person.tag==which_person).all()
|
|
|
|
if not job.refimgs:
|
|
FinishJob(job, "Failed Processesing AI - the person(s) you chose has no reference images!", "Failed" )
|
|
return
|
|
|
|
# start by working out how many images in this selection we will need face match on
|
|
job.num_files = 0
|
|
for jex in job.extra:
|
|
if 'eid-' in jex.name:
|
|
entry=session.query(Entry).get(jex.value)
|
|
if entry.type.name == 'Directory':
|
|
# False in last param says, dont count dirs (we won't AI a dir entry itself)
|
|
ProcessFilesInDir( job, entry, AddToJobImageCount, False )
|
|
elif entry.type.name == 'Image':
|
|
job.num_files += 1
|
|
# update job, so file count UI progress bar will work
|
|
# remember that ProcessFilesInDir updates the current_file_num so zero it out so we can start again
|
|
job.current_file_num = 0
|
|
session.commit()
|
|
|
|
for jex in job.extra:
|
|
if 'eid-' in jex.name:
|
|
entry=session.query(Entry).get(jex.value)
|
|
if entry.type.name == 'Directory':
|
|
# False in last param says, dont count dirs (we won't AI a dir entry itself)
|
|
ProcessFilesInDir( job, entry, WrapperForScanFileForPerson, False )
|
|
elif entry.type.name == 'Image':
|
|
which_file=session.query(Entry).join(File).filter(Entry.id==jex.value).first()
|
|
ScanFileForPerson( job, which_file, force=False)
|
|
else:
|
|
AddLogForJob( job, f'Not processing Entry: {entry.name} - not an image' )
|
|
FinishJob(job, "Finished Processesing AI")
|
|
return
|
|
|
|
####################################################################################################################################
|
|
# JobTransformImage(): transform an image by the amount requested (can also flip horizontal or vertical)
|
|
# TODO: should be JobTransformImage() ;)
|
|
####################################################################################################################################
|
|
def JobTransformImage(job):
|
|
JobProgressState( job, "In Progress" )
|
|
AddLogForJob(job, f"INFO: Starting rotation/flip of image file...")
|
|
id=[jex.value for jex in job.extra if jex.name == "id"][0]
|
|
amt=[jex.value for jex in job.extra if jex.name == "amt"][0]
|
|
e=session.query(Entry).join(File).filter(Entry.id==id).first()
|
|
print( f"JobTransformImage: job={job.id}, id={id}, amt={amt}" )
|
|
im = Image.open( e.FullPathOnFS() )
|
|
|
|
if amt == "fliph":
|
|
AddLogForJob(job, f"INFO: Flipping {e.FullPathOnFS()} horizontally" )
|
|
out = im.transpose(Image.FLIP_LEFT_RIGHT)
|
|
elif amt == "flipv":
|
|
AddLogForJob(job, f"INFO: Flipping {e.FullPathOnFS()} vertically" )
|
|
out = im.transpose(Image.FLIP_TOP_BOTTOM)
|
|
else:
|
|
AddLogForJob(job, f"INFO: Rotating {e.FullPathOnFS()} by {amt} degrees" )
|
|
if im.format == 'JPEG':
|
|
im=ImageOps.exif_transpose(im)
|
|
out = im.rotate(int(amt), expand=True)
|
|
out.save( e.FullPathOnFS() )
|
|
print( f"JobTransformImage DONE transform: job={job.id}, id={id}, amt={amt}" )
|
|
settings = session.query(Settings).first()
|
|
e.file_details.thumbnail, _ , _ = GenThumb( e.FullPathOnFS(), settings.auto_rotate )
|
|
e.file_details.hash = md5( job, e )
|
|
print( f"JobTransformImage DONE thumb: job={job.id}, id={id}, amt={amt}" )
|
|
session.add(e)
|
|
FinishJob(job, "Finished Processesing image rotation/flip")
|
|
return
|
|
|
|
####################################################################################################################################
|
|
# GenHashAndThumb(): calc. MD5 hash on given entry and generate image thumbnail for image, or video thumbnail for video
|
|
####################################################################################################################################
|
|
def GenHashAndThumb(job, e):
|
|
# commit every 100 files to see progress being made but not hammer the database
|
|
if job.current_file_num % 100 == 0:
|
|
session.commit()
|
|
try:
|
|
stat = os.stat( e.FullPathOnFS() )
|
|
except Exception as e:
|
|
AddLogForJob(job, f"failed to stat file - was it removed from the underlying filesystem while PA was scanning? Err: {e}" )
|
|
job.current_file_num+=1
|
|
return
|
|
# use mtime as only if the content is different do we need to redo the hash
|
|
if stat.st_mtime < e.file_details.last_hash_date:
|
|
if DEBUG:
|
|
print(f"OPTIM: GenHashAndThumb {e.name} file is older than last hash, skip this")
|
|
job.current_file_num+=1
|
|
return
|
|
|
|
new_hash = md5( job, e )
|
|
# same hash and we already have a thumbnail-> just return
|
|
if new_hash == e.file_details.hash and e.file_details.thumbnail:
|
|
if DEBUG:
|
|
print(f"OPTIM: GenHashAndThumb {e.name} md5 is same - likely a mv on filesystem so skip md5/thumb")
|
|
job.current_file_num+=1
|
|
return
|
|
e.file_details.hash = new_hash
|
|
if e.type.name == 'Image':
|
|
e.file_details.thumbnail = GenImageThumbnail( job, e.FullPathOnFS() )
|
|
elif e.type.name == 'Video':
|
|
e.file_details.thumbnail = GenVideoThumbnail( job, e.FullPathOnFS() )
|
|
elif e.type.name == 'Unknown':
|
|
job.current_file_num+=1
|
|
return
|
|
|
|
####################################################################################################################################
|
|
# ProcessFilesInDir(): take an entry, if its a Dir recurse into it, if its a
|
|
# File, call file_func on it
|
|
####################################################################################################################################
|
|
def ProcessFilesInDir(job, e, file_func, count_dirs):
|
|
if DEBUG:
|
|
print( f"DEBUG: ProcessFilesInDir: {e.FullPathOnFS()}")
|
|
if e.type.name != 'Directory':
|
|
file_func(job, e)
|
|
else:
|
|
d=session.query(Dir).filter(Dir.eid==e.id).first()
|
|
if count_dirs:
|
|
job.current_file_num+=1
|
|
files = session.query(Entry).join(EntryDirLink).filter(EntryDirLink.dir_eid==d.eid).all()
|
|
for sub in files:
|
|
ProcessFilesInDir(job, sub, file_func, count_dirs)
|
|
return
|
|
|
|
####################################################################################################################################
|
|
# JobGetFileDetails(): job to get the file details (hashes & thumbnails)
|
|
####################################################################################################################################
|
|
def JobGetFileDetails(job):
|
|
JobProgressState( job, "In Progress" )
|
|
path=[jex.value for jex in job.extra if jex.name == "path"][0]
|
|
path_prefix=[jex.value for jex in job.extra if jex.name == "path_prefix"][0]
|
|
if DEBUG:
|
|
print( f"DEBUG: JobGetFileDetails for path={path_prefix}" )
|
|
p=session.query(Path).filter(Path.path_prefix==path_prefix).first()
|
|
job.current_file_num = 0
|
|
job.num_files = p.num_files
|
|
session.commit()
|
|
RunFuncOnFilesInPath( job, path_prefix, GenHashAndThumb, True )
|
|
FinishJob(job, "File Details job finished")
|
|
session.commit()
|
|
return
|
|
|
|
####################################################################################################################################
|
|
# isVideo(): use MediaInfo python lib to see if the file is a video or not
|
|
####################################################################################################################################
|
|
def isVideo(file):
|
|
try:
|
|
fileInfo = MediaInfo.parse(file)
|
|
for track in fileInfo.tracks:
|
|
if track.track_type == "Video":
|
|
return True
|
|
return False
|
|
except Exception:
|
|
return False
|
|
|
|
####################################################################################################################################
|
|
# Returns an md5 hash of the given entry's file contents (and updates its last_hash_date)
|
|
####################################################################################################################################
|
|
def md5(job, e):
|
|
hash_md5 = hashlib.md5()
|
|
with open(e.FullPathOnFS(), "rb") as f:
|
|
for chunk in iter(lambda: f.read(4096), b""):
|
|
hash_md5.update(chunk)
|
|
hash = hash_md5.hexdigest()
|
|
AddLogForJob( job, "Generated md5 hash: {} for file: {}".format( hash, e.FullPathOnFS() ) )
|
|
e.file_details.last_hash_date = time.time()
|
|
return hash
|
|
|
|
####################################################################################################################################
|
|
# isImage(): use python PIL library, if we can open it as an image, it is one, so return True
|
|
####################################################################################################################################
|
|
def isImage(file):
|
|
try:
|
|
Image.open(file)
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
####################################################################################################################################
|
|
# GenImageThumbnail(): log and then generate the thumb for a file
|
|
# TODO: this is now sort of pointless with moving the body to shared...
|
|
####################################################################################################################################
|
|
def GenImageThumbnail(job, file):
|
|
ProcessFileForJob( job, "Generate Thumbnail from Image file: {}".format( file ), file )
|
|
settings = session.query(Settings).first()
|
|
thumb, _, _ = GenThumb(file, settings.auto_rotate)
|
|
return thumb
|
|
|
|
####################################################################################################################################
|
|
# GenVideoThumbnail(): log and then generate the thumb for a video - this probes the video for its duration, uses ffmpeg to
# grab a single frame from the middle of the video scaled to THUMBSIZE height, and then makes the thumbnail from that frame
|
|
####################################################################################################################################
|
|
def GenVideoThumbnail( job, fname):
|
|
ProcessFileForJob( job, f"Generate Thumbnail from Video file: {fname}", fname )
|
|
height = THUMBSIZE
|
|
try:
|
|
probe = ffmpeg.probe(fname)
|
|
time = float(probe['streams'][0]['duration']) // 2
|
|
tmp_fname='/tmp/pa_tmp_'+os.path.basename(fname.split('.')[0])+'.jpg'
|
|
(
|
|
ffmpeg
|
|
.input(fname, ss=time)
|
|
.filter('scale', -1, height)
|
|
.output(tmp_fname, vframes=1)
|
|
.overwrite_output()
|
|
.run(capture_stdout=True, capture_stderr=True)
|
|
)
|
|
thumbnail, w, h = GenThumb( tmp_fname, False )
|
|
os.remove( tmp_fname )
|
|
except ffmpeg.Error as e:
|
|
AddLogForJob( job, f"ERROR: Failed to Generate thumbnail for video file: {fname} - error={e}" )
|
|
return None
|
|
|
|
return thumbnail
|
|
|
|
####################################################################################################################################
|
|
# utility function to clear any other future Duplicate messages, called if we
|
|
# either create a "new" CheckDups (often del/restore related), OR because we
|
|
# are actually handling the dups now from a front-end click through to
|
|
# /removedups, but some other job has since created another dup message...
|
|
####################################################################################################################################
|
|
def ClearOtherDupMessagesAndJobs():
|
|
msgs=session.query(PA_JobManager_FE_Message).join(Job).filter(Job.name=='checkdups')
|
|
for msg in msgs:
|
|
session.query(PA_JobManager_FE_Message).filter(PA_JobManager_FE_Message.id==msg.id).delete()
|
|
cd_jobs=session.query(Job).filter(Job.name=='checkdups').filter(Job.pa_job_state=='New').all()
|
|
for j in cd_jobs:
|
|
FinishJob(j, "New CheckForDups job/removal supercedes this job, withdrawing it", "Withdrawn")
|
|
session.commit()
|
|
return
|
|
|
|
####################################################################################################################################
|
|
# JobCheckForDups(): job to dig into the DB with sql, find duplicates - if there are any, pop a F/E status to say so
|
|
####################################################################################################################################
|
|
def JobCheckForDups(job):
|
|
JobProgressState( job, "In Progress" )
|
|
AddLogForJob( job, f"Check for duplicates" )
|
|
ClearOtherDupMessagesAndJobs()
|
|
|
|
res = session.execute( "select count(e1.id) from entry e1, file f1, dir d1, entry_dir_link edl1, path_dir_link pdl1, path p1, entry e2, file f2, dir d2, entry_dir_link edl2, path_dir_link pdl2, path p2 where e1.id = f1.eid and e2.id = f2.eid and d1.eid = edl1.dir_eid and edl1.entry_id = e1.id and edl2.dir_eid = d2.eid and edl2.entry_id = e2.id and p1.type_id != (select id from path_type where name = 'Bin') and p1.id = pdl1.path_id and pdl1.dir_eid = d1.eid and p2.type_id != (select id from path_type where name = 'Bin') and p2.id = pdl2.path_id and pdl2.dir_eid = d2.eid and f1.hash = f2.hash and e1.id != e2.id and f1.size_mb = f2.size_mb" )
|
|
for row in res:
|
|
if row.count > 0:
|
|
AddLogForJob(job, f"Found duplicates, Creating Status message in front-end for attention")
|
|
MessageToFE( job.id, "danger", f'Found duplicate(s), click <a href="javascript:document.body.innerHTML+=\'<form id=_fm method=POST action=/fix_dups></form>\'; document.getElementById(\'_fm\').submit();">here</a> to finalise import by removing duplicates' )
|
|
else:
|
|
FinishJob(job, f"No duplicates found")
|
|
FinishJob(job, f"Finished looking for duplicates")
|
|
return
|
|
|
|
####################################################################################################################################
|
|
# JobRemoveDups(): job to go through the f/e nominated keep/del data and then put a new CheckDups() in, as the user may have only
|
|
# partially addressed them (e.g. the first 10 dups...)
|
|
####################################################################################################################################
|
|
def JobRemoveDups(job):
|
|
JobProgressState( job, "In Progress" )
|
|
AddLogForJob(job, f"INFO: Starting Remove Duplicates job...")
|
|
# as checkdups covers all dups, delete all future dups messages, and Withdraw future checkdups jobs
|
|
ClearOtherDupMessagesAndJobs()
|
|
|
|
dup_cnt=0
|
|
for jex in job.extra:
|
|
if 'kfid-' in jex.name:
|
|
_, which = jex.name.split('-')
|
|
hash=[jex.value for jex in job.extra if jex.name == f"kfhash-{which}"][0]
|
|
AddLogForJob(job, f"deleting duplicate files with hash: {hash} but keeping file with DB id={jex.value}" )
|
|
files=session.query(Entry).join(File).filter(File.hash==hash).all()
|
|
keeping=jex.value
|
|
found=None
|
|
del_me_lst = []
|
|
for f in files:
|
|
if os.path.isfile( f.FullPathOnFS() ) == False:
|
|
AddLogForJob( job, f"ERROR: (per file del) file (DB id: {f.id} - {f.FullPathOnFS()}) does not exist? ignorning file")
|
|
elif f.id == int(keeping):
|
|
found = f
|
|
else:
|
|
del_me_lst.append(f)
|
|
if found == None:
|
|
AddLogForJob( job, f"ERROR: (per file dup) Cannot find file with hash={hash} to process - skipping it)" )
|
|
else:
|
|
AddLogForJob(job, f"Keep duplicate file: {found.FullPathOnFS()}" )
|
|
for del_me in del_me_lst:
|
|
AddLogForJob(job, f"Remove duplicate (per file dup) file: {del_me.FullPathOnFS()}" )
|
|
MoveFileToRecycleBin(job,del_me)
|
|
|
|
if 'kdid-' in jex.name:
|
|
_, which = jex.name.split('-')
|
|
hashes=[jex.value for jex in job.extra if jex.name == f"kdhash-{which}"][0]
|
|
keeping=jex.value
|
|
tmp=session.query(Dir).filter(Dir.eid==keeping).first()
|
|
AddLogForJob(job, f"Keeping files in {tmp.PathOnFS()}" )
|
|
for hash in hashes.split(","):
|
|
files=session.query(Entry).join(File).filter(File.hash==hash).all()
|
|
found=None
|
|
del_me=None
|
|
for f in files:
|
|
if os.path.isfile(f.FullPathOnFS()) == False:
|
|
AddLogForJob( job, f"ERROR: (per path del) file (DB id: {f.id} - {f.FullPathOnFS()}) does not exist? ignorning file")
|
|
elif f.in_dir.eid == int(keeping):
|
|
found=f
|
|
else:
|
|
del_me=f
|
|
|
|
if found == None:
|
|
AddLogForJob( job, f"ERROR: (per path dup - dir id={keeping}) Cannot find file with hash={hash} to process - skipping it)" )
|
|
else:
|
|
AddLogForJob(job, f"Keep duplicate file: {found.FullPathOnFS()}" )
|
|
AddLogForJob(job, f"Remove duplicate (per path dup) file: {del_me.FullPathOnFS()}" )
|
|
MoveFileToRecycleBin(job,del_me)
|
|
dup_cnt += 1
|
|
|
|
FinishJob(job, f"Finished removing {dup_cnt} duplicate files" )
|
|
|
|
# Need to put another checkdups job in now to force / validate we have no dups
|
|
next_job=NewJob( "checkdups" )
|
|
AddLogForJob(job, "adding <a href='/job/{}'>job id={} {}</a> to confirm there are no more duplicates".format( next_job.id, next_job.id, next_job.name ) )
|
|
return
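
# Sketch of the job extras this expects from the front-end (ids and hashes hypothetical): per-file decisions arrive as paired
# "kfid-N"/"kfhash-N" extras (keep this Entry id for this hash), per-path decisions as "kdid-N"/"kdhash-N", where kdhash-N is a
# comma-separated list of hashes to resolve in favour of the Dir eid given in kdid-N:
#
#   kfid-0 = "1021"   kfhash-0 = "d41d8cd98f00b204e9800998ecf8427e"
#   kdid-1 = "88"     kdhash-1 = "9e107d9d372bb6826bd81d3542a419d6,e4d909c290d0fb1ca068ffaddf22cbd0"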
|
|
|
|
####################################################################################################################################
|
|
# JobMoveFiles(): moves files from a specified location (usually in the import dir) to a specified location (usually a brand new
|
|
# folder in the storage dir)
|
|
####################################################################################################################################
|
|
def JobMoveFiles(job):
|
|
AddLogForJob(job, f"INFO: Starting Move Files job...")
|
|
JobProgressState( job, "In Progress" )
|
|
prefix=[jex.value for jex in job.extra if jex.name == "prefix"][0]
|
|
suffix=[jex.value for jex in job.extra if jex.name == "suffix"][0]
|
|
# Sanity check, if prefix starts with /, reject it -> no /etc/shadow potentials
|
|
# Sanity check, if .. in prefix or suffix, reject it -> no ../../etc/shadow potentials
|
|
# Sanity check, if // in prefix or suffix, reject it -> not sure code wouldnt try to make empty dirs, and I dont want to chase /////// cases, any 2 in a row is enough to reject
|
|
if '..' in prefix or '..' in suffix or (prefix and prefix[0] == '/') or '//' in prefix or '//' in suffix:
|
|
FinishJob( job, f"ERROR: Not processing move as the paths contain illegal chars", "Failed" )
|
|
return
|
|
# also remove unnecessary slashes, jic
|
|
prefix=prefix.rstrip('/')
|
|
suffix=suffix.lstrip('/').rstrip('/')
|
|
path_type=[jex.value for jex in job.extra if jex.name == "move_path_type"][0]
|
|
rel_path=[jex.value for jex in job.extra if jex.name == "rel_path"][0]
|
|
dst_storage_path = session.query(Path).filter(Path.path_prefix=='static/' + path_type + '/'+ rel_path).first()
|
|
|
|
for jex in job.extra:
|
|
if 'eid-' in jex.name:
|
|
move_me=session.query(Entry).get(jex.value)
|
|
MoveEntriesToOtherFolder( job, move_me, dst_storage_path, f"{prefix}{suffix}" )
|
|
next_job=NewJob( "checkdups" )
|
|
MessageToFE( job.id, "success", "Completed (move of selected files)" )
|
|
FinishJob(job, f"Finished move selected file(s)")
|
|
return
|
|
|
|
####################################################################################################################################
|
|
# JobDeleteFiles(): job to delete specified files (chosen in f/e)
|
|
####################################################################################################################################
|
|
def JobDeleteFiles(job):
|
|
AddLogForJob(job, f"INFO: Starting Delete Files job...")
|
|
JobProgressState( job, "In Progress" )
|
|
for jex in job.extra:
|
|
if 'eid-' in jex.name:
|
|
del_me=session.query(Entry).join(File).filter(Entry.id==jex.value).first()
|
|
MoveFileToRecycleBin(job,del_me)
|
|
next_job=NewJob( "checkdups" )
|
|
MessageToFE( job.id, "success", "Completed (delete of selected files)" )
|
|
FinishJob(job, f"Finished deleting selected file(s)")
|
|
return
|
|
|
|
####################################################################################################################################
|
|
# JobRestoreFiles(): if in the Bin path the user can restore specified files (chosen in f/e)
|
|
####################################################################################################################################
|
|
def JobRestoreFiles(job):
|
|
AddLogForJob(job, f"INFO: Starting Restore Files job...")
|
|
JobProgressState( job, "In Progress" )
|
|
for jex in job.extra:
|
|
if 'eid-' in jex.name:
|
|
restore_me=session.query(Entry).join(File).filter(Entry.id==jex.value).first()
|
|
RestoreFile(job,restore_me)
|
|
next_job=NewJob( "checkdups" )
|
|
MessageToFE( job.id, "success", "Completed (restore of selected files)" )
|
|
FinishJob(job, f"Finished restoring selected file(s)")
|
|
return
|
|
|
|
|
|
####################################################################################################################################
|
|
# CopyOverrides(): copies the overrides from 4 override tbls into tmp_<tbl>s
|
|
# Metadata is only going to be used in cases where the DB does not have the
|
|
# overrides that were once put in by hand - we are extra-careful processing
|
|
# these, so we check there is a metadata path, that we aren't in the middle of
|
|
# processing metadata when we Init (which will show up as tmp_<tbl> still
|
|
# existing)
|
|
####################################################################################################################################
|
|
def CopyOverrides():
|
|
try:
|
|
for tbl in override_tbls:
|
|
session.execute( f"select * into tmp_{tbl} from {tbl}")
|
|
# force a commit here - I want to fail before I delete override content
|
|
session.commit()
|
|
# now take all 4 override tables in DB and clear them out
|
|
for tbl in override_tbls:
|
|
session.execute( f"delete from {tbl}" )
|
|
session.commit()
|
|
except Exception as ex:
|
|
print( f"ERROR: there are existing tmp tables when processing metadata. This SHOULD NEVER HAPPEN - manual intervention needed" )
|
|
print( f"ERROR: most likely the job manager was killed during processing metadata - you may want to manually put" )
|
|
print( f"ERROR: the contents of the 'tmp_*' tables back into their corresponding official metadata tables " )
|
|
print( f"ERROR: and try to restart the job manager" )
|
|
exit( 1 )
|
|
return
|
|
|
|
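####################################################################################################################################
# Illustrative sketch (editor's addition, not called anywhere): the error path above asks the operator to copy the
# 'tmp_*' tables back by hand. A minimal version of that recovery, assuming the tmp_<tbl> tables still exist and
# have the same columns as their originals, would look roughly like this.
####################################################################################################################################
def _example_restore_overrides_from_tmp():
    for tbl in override_tbls:
        # put the saved rows back, then drop the temporary copy
        session.execute( f"insert into {tbl} select * from tmp_{tbl}" )
        session.execute( f"drop table tmp_{tbl}" )
    session.commit()
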
####################################################################################################################################
# GetFaceInMetadata(fname): quick wrapper to return a face as binary data from
# metadata file 'fname'
####################################################################################################################################
def GetFaceInMetadata(fname):
    try:
        with open(fname, "rb") as file_h:
            face_data=file_h.read()
    except Exception as ex:
        print( f"ERROR: FATAL tried to read in override data and can't read content" )
        print( f"ERROR: manual intervention needed - exc={ex}" )
        exit(1)
    return face_data

####################################################################################################################################
# ReloadMetadata(): reads in any metadata and puts it back into the DB (if needed).
# Metadata will be disconnected overrides & eventually actual metadata we store per file.
# See https://wiki.depaoli.id.au/en/shared/photoassistant/metadata for a detailed
# explanation of this function
####################################################################################################################################
def ReloadMetadata(job):
    AddLogForJob(job, f"INFO: Loading/Retrieving any Metadata...")

    # no path, then no metadata (probably the first ever run)
    mpath = SettingsMPath()
    if not mpath:
        FinishJob( job, "No metadata path - skipping" )
        return False

    # copy overrides into tmp tables
    CopyOverrides()

    # process Metadata on the FS for no_match_overrides (disconnected ones will have 0 as the face_id)
    fnames = glob.glob( f'{mpath}/no_match_overrides/*' )
    for fname in fnames:
        # type is derived from the fname (e.g. 0_Too Young_<uuid>, 1_Too Young, 2_Ignore Face, etc.)
        match=re.search( r'(\d+)_([^_\.]+)', os.path.basename(fname) )
        face_id=int(match.group(1))
        type_name=match.group(2)
        otype = session.query(FaceOverrideType).filter(FaceOverrideType.name==type_name).one()
        face_data=GetFaceInMetadata(fname)
        if DEBUG:
            print( f"Found metadata showing Override of type: {type_name}" )

        # check that both the id and data match - if so make a new FaceNoMatchOverride, otherwise a DisconnectedNoMatchOverride
        face=session.query( Face ).filter( Face.id==face_id ).filter( Face.face == face_data ).first()
        if face:
            session.add( FaceNoMatchOverride( face_id=face_id, type_id=otype.id ) )
        else:
            session.add( DisconnectedNoMatchOverride( face=face_data, type_id=otype.id ) )
            # if face_id > 0, we need to move the FS copy to a disconnected name (0_ prefix)
            if face_id:
                try:
                    os.replace( fname, f'{mpath}/no_match_overrides/0_{otype.name}_{uuid.uuid4()}' )
                except Exception as ex:
                    print( f"ERROR: renaming no-match metadata on filesystem failed: {ex}" )

    # process Metadata on the FS for force_match_overrides (disconnected ones will have 0 as the face_id)
    fnames = glob.glob( f'{mpath}/force_match_overrides/*' )
    for fname in fnames:
        # person is derived from the fname (e.g. 0_ddp_<uuid>, 1_ddp, 2_mich, etc.)
        match=re.search( r'(\d+)_([^_]+)', os.path.basename(fname) )
        face_id=int(match.group(1))
        person_tag=match.group(2)
        p = session.query(Person).filter(Person.tag==person_tag).first()
        if not p:
            print( f"There is a metadata override on the file system for person: {person_tag} - but they are no longer in the DB - skip" )
            continue
        face_data=GetFaceInMetadata(fname)
        if DEBUG:
            print( f"Found metadata showing Override match for person: {person_tag}" )

        # check that both the id and data match - if so make a new FaceForceMatchOverride, otherwise a DisconnectedForceMatchOverride
        face=session.query( Face ).filter( Face.id==face_id ).filter( Face.face == face_data ).first()
        if face:
            session.add( FaceForceMatchOverride( face_id=face_id, person_id=p.id ) )
        else:
            session.add( DisconnectedForceMatchOverride( face=face_data, person_id=p.id ) )
            # if face_id > 0, we need to move the FS copy to a disconnected name (0_ prefix)
            if face_id:
                try:
                    os.replace( fname, f'{mpath}/force_match_overrides/0_{p.tag}_{uuid.uuid4()}' )
                except Exception as ex:
                    print( f"ERROR: renaming force-match metadata on filesystem failed: {ex}" )

    # now process each of the tmp tables for anything that was in the DB but not on the FS (e.g. rm'd metadata)
    overrides=session.execute( "select face_id, type_id from tmp_face_no_match_override" )
    for o in overrides:
        print( f"F No Match: o.face_id={o.face_id}" )
        print( f"F No Match: o.type_id={o.type_id}" )
        nmo=session.query(FaceNoMatchOverride).filter(FaceNoMatchOverride.face_id==o.face_id).filter(FaceNoMatchOverride.type_id==o.type_id).first()
        if not nmo:
            session.add( FaceNoMatchOverride( face_id=o.face_id, type_id=o.type_id ) )

    overrides=session.execute( "select face_id, person_id from tmp_face_force_match_override" )
    for o in overrides:
        print( f"F Force Match: o.face_id={o.face_id}" )
        print( f"F Force Match: o.person_id={o.person_id}" )
        fmo=session.query(FaceForceMatchOverride).filter(FaceForceMatchOverride.face_id==o.face_id,FaceForceMatchOverride.person_id==o.person_id).first()
        if not fmo:
            session.add( FaceForceMatchOverride( face_id=o.face_id, person_id=o.person_id ) )

    overrides=session.execute( "select face, type_id from tmp_disconnected_no_match_override" )
    for o in overrides:
        print( f"D No Match: o.type_id={o.type_id}" )
        dnmo=session.query(DisconnectedNoMatchOverride).filter(DisconnectedNoMatchOverride.face==o.face).filter(DisconnectedNoMatchOverride.type_id==o.type_id).first()
        if not dnmo:
            session.add( DisconnectedNoMatchOverride( face=o.face, type_id=o.type_id ) )

    overrides=session.execute( "select face, person_id from tmp_disconnected_force_match_override" )
    for o in overrides:
        print( f"D Force Match: o.person_id={o.person_id}" )
        dfmo=session.query(DisconnectedForceMatchOverride).filter(DisconnectedForceMatchOverride.face==o.face).filter(DisconnectedForceMatchOverride.person_id==o.person_id).first()
        if not dfmo:
            session.add( DisconnectedForceMatchOverride( face=o.face, person_id=o.person_id ) )

    # finally, drop the tmp tables
    for tbl in override_tbls:
        session.execute( f"drop table tmp_{tbl}" )

    # ok, finally commit all these changes - don't do this until now. Worst case, if we crash/fail the overrides should still be in tmp_<tbl>
    session.commit()

    return True

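####################################################################################################################################
# Illustrative sketch (editor's addition, not called anywhere): shows how the override metadata filenames used by
# ReloadMetadata() are assumed to be laid out and parsed - '<face_id>_<type name or person tag>_<uuid>', where a
# face_id of 0 marks a disconnected override. The sample filenames are hypothetical.
####################################################################################################################################
def _example_parse_override_fname():
    samples = [ "17_Too Young",                                             # connected no-match override on face 17
                "0_ddp_1f2e3d4c-0000-0000-0000-000000000000",               # disconnected force-match override for tag 'ddp'
              ]
    parsed = []
    for fname in samples:
        match = re.search( r'(\d+)_([^_\.]+)', os.path.basename(fname) )
        parsed.append( ( int(match.group(1)), match.group(2) ) )
    return parsed   # -> [(17, 'Too Young'), (0, 'ddp')]
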
####################################################################################################################################
# InitialValidationChecks(): checks that the paths (and dirs) exist in the DB on first run.
# If a path from settings does not exist - log it.
# If there is content in the Bin already, it logs this too - mostly useful when testing
####################################################################################################################################
def InitialValidationChecks():
    now=datetime.now(pytz.utc)
    job=NewJob( "init" )
    job.start_time=datetime.now(pytz.utc)
    JobProgressState( job, "In Progress" )
    AddLogForJob(job, f"INFO: Starting Initial Validation checks...")

    path=SettingsRBPath()
    rbp_exists=0
    if os.path.exists(path):
        try:
            root, dirs, files = next(os.walk(path))
            if len(dirs) + len(files) > 0:
                AddLogForJob(job, "INFO: the bin path contains content, cannot work out where the original deletes were from - skipping content!" )
                AddLogForJob(job, "TODO: could be smart about what is known in the DB vs on the FS, and change below to an ERROR if it is one")
                AddLogForJob(job, "WARNING: IF the files in the bin are in the DB (succeeded from GUI deletes) then this is okay, otherwise you should delete the contents from the recycle bin and restart the job manager" )
            # create symlink and Path/Dir if needed
            ProcessRecycleBinDir(job)
            rbp_exists=1
        except Exception as ex:
            print( f"FATAL ERROR: Failed to walk the recycle bin at {path} Err:{ex}" )
    else:
        AddLogForJob(job, "ERROR: The bin path in settings does not exist - Please fix now")

    sp_exists=0
    path = SettingsSPath()
    if os.path.exists(path):
        sp_exists=1
        ptype = session.query(PathType).filter(PathType.name=='Storage').first().id
        symlink=CreateSymlink(job,ptype,path)
    if not sp_exists:
        AddLogForJob(job, "ERROR: the storage path in the settings does not exist - Please fix now")

    ip_exists=0
    path = SettingsIPath()
    if os.path.exists(path):
        ip_exists=1
        ptype = session.query(PathType).filter(PathType.name=='Import').first().id
        symlink=CreateSymlink(job,ptype,path)
    if not ip_exists:
        AddLogForJob(job, "ERROR: the import path in the settings does not exist - Please fix now")

    path=SettingsMPath()
    mp_exists=0
    if os.path.exists(path):
        mp_exists=1
        ptype = session.query(PathType).filter(PathType.name=='Metadata').first().id
        symlink=CreateSymlink(job,ptype,path)
    if not mp_exists:
        AddLogForJob(job, "ERROR: The metadata path in settings does not exist - Please fix now")

    if not rbp_exists or not sp_exists or not ip_exists or not mp_exists:
        FinishJob(job,"ERROR: Job manager EXITing until the above errors are fixed, by the paths being created or the settings being updated to valid paths", "Failed" )
        exit(-1)

    ReloadMetadata(job)
    FinishJob(job,"Finished Initial Validation Checks")
    return

####################################################################################################################################
# AddFaceToFile(): adds the specified face, location & model_used to the specified file.
# locn_data is the (top, right, bottom, left) tuple returned by face_recognition.face_locations(),
# face_data is the encoding for that face
####################################################################################################################################
def AddFaceToFile( locn_data, face_data, file_eid, model_id, settings ):
    # width/height of the face box: right - left and bottom - top
    w = locn_data[1] - locn_data[3]
    h = locn_data[2] - locn_data[0]

    # skip faces that are too small to be useful
    if w < settings.face_size_limit or h < settings.face_size_limit:
        return

    face = Face( face=face_data.tobytes(), w=w, h=h,
                 face_top=locn_data[0], face_right=locn_data[1], face_bottom=locn_data[2], face_left=locn_data[3] )
    session.add(face)
    session.commit()
    ffl = FaceFileLink( face_id=face.id, file_eid=file_eid, model_used=model_id )
    session.add(ffl)
    session.commit()

    # See if this face is included in any Disconnected overrides - if so, copy it
    # back to an override connected to this new face_id for the same old face :)
    dfmo=session.query(DisconnectedForceMatchOverride).filter(DisconnectedForceMatchOverride.face==face.face).first()
    if dfmo:
        session.add( FaceForceMatchOverride( face_id=face.id, person_id=dfmo.person_id ) )
        session.query(DisconnectedForceMatchOverride).filter(DisconnectedForceMatchOverride.face==dfmo.face).delete()
        # move the metadata from Disconnected to Normal
        p=session.query(Person).get(dfmo.person_id)
        path=f'{SettingsMPath()}/force_match_overrides/'
        try:
            # can only be 1 match, with the * being a UUID
            fname=glob.glob( f'{path}0_{p.tag}_*' )[0]
            new_fname=f'{path}{face.id}_{p.tag}'
            os.replace( fname, new_fname )
        except Exception as ex:
            print( f"ERROR: AddFaceToFile - face connects to 'disconnected-force-match' metadata, but fixing the filesystem metadata failed: {ex}" )

    dnmo=session.query(DisconnectedNoMatchOverride).filter(DisconnectedNoMatchOverride.face==face.face).first()
    if dnmo:
        session.add( FaceNoMatchOverride( face_id=face.id, type_id=dnmo.type_id ) )
        session.query(DisconnectedNoMatchOverride).filter(DisconnectedNoMatchOverride.face==dnmo.face).delete()
        # move the metadata from Disconnected to Normal
        t=session.query(FaceOverrideType).get(dnmo.type_id)
        path=f'{SettingsMPath()}/no_match_overrides/'
        try:
            # can only be 1 match, with the * being a UUID
            fname=glob.glob( f'{path}0_{t.name}_*' )[0]
            new_fname=f'{path}{face.id}_{t.name}'
            os.replace( fname, new_fname )
        except Exception as ex:
            print( f"ERROR: AddFaceToFile - face connects to 'disconnected-no-match' metadata, but fixing the filesystem metadata failed: {ex}" )

    return

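####################################################################################################################################
# Illustrative sketch (editor's addition, not called anywhere): a worked example of the width/height maths used in
# AddFaceToFile(). face_recognition reports a face location as (top, right, bottom, left) in pixel coordinates, so
# for a hypothetical location of (50, 210, 170, 90): w = 210 - 90 = 120 and h = 170 - 50 = 120, and the face would
# only be kept if both are >= settings.face_size_limit.
####################################################################################################################################
def _example_face_box_size():
    locn_data = (50, 210, 170, 90)              # (top, right, bottom, left) - made-up values
    w = locn_data[1] - locn_data[3]             # right - left  -> 120
    h = locn_data[2] - locn_data[0]             # bottom - top  -> 120
    return w, h
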
####################################################################################################################################
# DelMatchesForDir(): quick func to delete any matched faces in the specified dir
####################################################################################################################################
def DelMatchesForDir( job, eid ):
    dir_entry=session.query(Entry).get(eid)
    ProcessFilesInDir( job, dir_entry, DelMatchesForFile, False )
    session.commit()
    return


####################################################################################################################################
# DelMatchesForFile(): quick func to delete any matched faces associated with the specified file
####################################################################################################################################
def DelMatchesForFile( job, ent ):
    if DEBUG:
        AddLogForJob(job, f'Remove any old matches in {ent.name}')

    session.execute( f"delete from face_refimg_link where face_id in (select face_id from face_file_link where file_eid = {ent.id})" )
    ent.file_details.last_ai_scan=0
    session.add(ent)
    return


####################################################################################################################################
# DelFacesForFile(job, eid): quick func to delete any faces associated with the specified file
####################################################################################################################################
def DelFacesForFile( job, eid ):
    ffl=session.query(FaceFileLink).filter(FaceFileLink.file_eid==eid).all()

    for link in ffl:
        # find any force-match overrides on this face (before we delete it) and put them into the disconnected table
        o=session.query(FaceForceMatchOverride).filter(FaceForceMatchOverride.face_id==link.face_id).first()
        if o:
            DisconnectSingleForceMatchOverride( job, o )

        # find any no-match overrides on this face (before we delete it) and put them into the disconnected table
        o=session.query(FaceNoMatchOverride).filter(FaceNoMatchOverride.face_id==link.face_id).first()
        if o:
            DisconnectSingleNoMatchOverride( job, o )

    session.execute( f"delete from face where id in (select face_id from face_file_link where file_eid = {eid})" )

    session.commit()
    return

####################################################################################################################################
# MatchRefimgToFace(): take the specified refimg, face & distance and connect them in the DB
# (i.e. mark this face as matching this refimg with this face distance)
####################################################################################################################################
def MatchRefimgToFace( refimg_id, face_id, face_dist ):
    # remove any match to this face from previous attempts, and 'replace' it with the new one
    session.query(FaceRefimgLink).filter(FaceRefimgLink.face_id==face_id).delete()
    rfl = FaceRefimgLink( refimg_id = refimg_id, face_id = face_id, face_distance=face_dist )
    session.add(rfl)
    return


####################################################################################################################################
# Util function to remove the matching face number (which_f) from the dist array (which has dist[who][<face_num>])...
####################################################################################################################################
def RemoveFaceNumFromDist( dist, which_f ):
    for who in dist:
        if which_f in dist[who]:
            del dist[who][which_f]
    return


####################################################################################################################################
# Go through the dist array and find the best single match (lowest face distance),
# returning the matching refimg (which_r), face num (which_f), and distance (which_fd)
####################################################################################################################################
def FindBestFaceMatch( dist, threshold ):
    which_r=None
    which_f=None
    which_fd=None
    lowest=1.0
    for who in dist:
        for fid in dist[who]:
            if dist[who][fid][0] < lowest and dist[who][fid][0] <= threshold:
                which_r=who
                which_f=fid
                which_fd=dist[who][fid][0]
                lowest=which_fd
    return which_r, which_f, which_fd


####################################################################################################################################
# Go through the dist array and find the best single match via FindBestFaceMatch(),
# then record that match and remove that refimg/person and that face number
# from the dist array - rinse/repeat until we have no more faces to match,
# or the 'best' match is > threshold, so no more matches...
####################################################################################################################################
def ProcessFaceMatches( job, dist, threshold, e, name ):
    while True:
        which_r, which_f, which_fd = FindBestFaceMatch( dist, threshold )
        if which_r is not None:
            MatchRefimgToFace( which_r, which_f, which_fd )
            AddLogForJob(job, f'WE MATCHED: {name[which_r]} with file: {e.name} - face distance of {which_fd}')
            # remove this refimg completely, there can't be 2 of this person matched
            del dist[which_r]
            # remove this face id completely, this face can't be matched by someone else
            RemoveFaceNumFromDist( dist, which_f )
        else:
            return

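####################################################################################################################################
# Illustrative sketch (editor's addition, not called anywhere): a toy run of the greedy matching above. The dist
# layout ({refimg_id: {face_id: [distance]}}) mirrors how ScanFileForPerson() builds it; the ids and distances are
# made up. With a threshold of 0.6, refimg 1 takes face 20 (0.31), which removes refimg 1 and face 20, then refimg
# 2 takes face 21 (0.44); face 22 ends up unmatched since each refimg can only be matched once.
####################################################################################################################################
def _example_greedy_face_matching():
    dist = { 1: { 20: [0.31], 21: [0.52], 22: [0.70] },
             2: { 20: [0.35], 21: [0.44], 22: [0.65] } }
    threshold = 0.6
    matches = []
    while True:
        which_r, which_f, which_fd = FindBestFaceMatch( dist, threshold )
        if which_r is None:
            return matches   # -> [(1, 20, 0.31), (2, 21, 0.44)]
        matches.append( (which_r, which_f, which_fd) )
        del dist[which_r]
        RemoveFaceNumFromDist( dist, which_f )
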
####################################################################################################################################
# ScanFileForPerson(): for a file, check to see if a person is matched via face_recognition.
#
# NOTE: force can be passed into this, but there is no f/e to trip it yet.
# If we do not have (any) faces for this file, go get them and their locations and store them in the DB associated with this file,
# then for each face (already known/matched or not), build a new array 'dist[refimg][face]' and run the face_recognition code
# to calculate the distance between each refimg and each face, then call ProcessFaceMatches() to
# go through the best matches (1-by-1) until no more faces match, storing the matching faces in the DB
####################################################################################################################################
def ScanFileForPerson( job, e, force=False ):
    # get default_scan_model and threshold from settings
    settings = session.query(Settings).first()
    model=session.query(AIModel).get(settings.default_scan_model)
    threshold = settings.default_threshold

    # add log, set current_file and increment file_num in the job
    if DEBUG:
        ProcessFileForJob( job, f'DEBUG: processing File: {e.name} and threshold face distance of {threshold}', e.name )
    else:
        job.current_file=e.name
        if job.num_files:
            job.current_file_num=job.current_file_num+1

    file_h = session.query(File).get( e.id )

    # if we are forcing this, delete any old faces (this will also delete linked tables), and reset faces_created_on to 0
    if force:
        AddLogForJob( job, f'INFO: force is true, so deleting old face information for {e.name}' )
        DelFacesForFile( job, e.id )
        file_h.faces_created_on = 0

    # optimisation: only generate faces if we have not already done so for this file
    if file_h.faces_created_on == 0:
        if DEBUG:
            AddLogForJob( job, f"DEBUG: {e.name} is missing unknown faces, generating them with model {model.name}" )
        im = face_recognition.load_image_file(e.FullPathOnFS())
        face_locations = face_recognition.face_locations(im, model=model.name )
        unknown_encodings = face_recognition.face_encodings(im, known_face_locations=face_locations)
        for locn, face in zip( face_locations, unknown_encodings ):
            AddFaceToFile( locn, face, e.id, model.id, settings )
        file_h.faces_created_on = time.time()

    faces = session.query(Face).join(FaceFileLink).filter(FaceFileLink.file_eid==e.id).all()
    # if there are no faces for this file, then don't go any further
    if not faces:
        file_h.last_ai_scan = time.time()
        return

    ri_newest = session.query(func.max(Refimg.created_on)).first()[0]

    # ri_newest has to exist - with no refimgs we don't process files - but if we have never scanned before, last_ai_scan will be None.
    # if last_ai_scan is newer than the most recently created refimg, there is no need to look again - we checked those refimgs in the last scan - so just skip this file
    if file_h.last_ai_scan and file_h.last_ai_scan > ri_newest:
        return

    dist={}
    name={}
    for r in job.refimgs:
        dist[r.id]={}
        name[r.id]=r.fname
    for face in faces:
        for r in job.refimgs:
            unknown_face_data = numpy.frombuffer(face.face, dtype=numpy.float64)
            refimg_face_data = numpy.frombuffer(r.face, dtype=numpy.float64)
            # face_distance() returns a 1-element array here, hence the [0] indexing in FindBestFaceMatch()
            dist[r.id][face.id] = face_recognition.face_distance(unknown_face_data, [refimg_face_data])

    # record matches in the DB...
    ProcessFaceMatches( job, dist, threshold, e, name )
    file_h.last_ai_scan = time.time()
    return

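####################################################################################################################################
# Illustrative sketch (editor's addition, not called anywhere): the bare face_recognition pipeline that
# ScanFileForPerson() is built around - locate faces, encode them, then compare each unknown encoding against the
# known reference encodings. The image path, model name and threshold below are hypothetical stand-ins.
####################################################################################################################################
def _example_face_recognition_pipeline( image_path="/tmp/example.jpg", known_encodings=None, threshold=0.6 ):
    im = face_recognition.load_image_file( image_path )
    locations = face_recognition.face_locations( im, model="hog" )                     # list of (top, right, bottom, left)
    encodings = face_recognition.face_encodings( im, known_face_locations=locations )  # one 128-d vector per face
    if not known_encodings:
        return []
    matches = []
    for unknown in encodings:
        # distance of this unknown face to every known reference encoding
        distances = face_recognition.face_distance( known_encodings, unknown )
        best = distances.argmin()
        if distances[best] <= threshold:
            matches.append( (int(best), float(distances[best])) )
    return matches
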
####################################################################################################################################
# DaysSinceLastScan(): 'which' is "Import" or "Storage" and is used to match the path_type, and then the Dir entry for the root Dir
# of that path. Then calc the days since the last scan and return the oldest across all Dir(s) in that Path (as scans work on a path, not a dir)
####################################################################################################################################
def DaysSinceLastScan(which):
    now=time.time()
    oldest=0
    dirs=session.query(Dir).join(PathDirLink).join(Path).join(PathType).filter(PathType.name==which).filter(Dir.rel_path=='').all()
    for d in dirs:
        last_scan_days_ago = (now - d.last_import_date) / SECS_IN_A_DAY
        if last_scan_days_ago > oldest:
            oldest=last_scan_days_ago
    return oldest

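####################################################################################################################################
# Illustrative sketch (editor's addition, not called anywhere): the age calculation used by DaysSinceLastScan() on
# made-up numbers - with SECS_IN_A_DAY assumed to be 86400, a last_import_date 3.5 days in the past comes out as
# 3.5, which would trigger a scheduled scan if the corresponding settings value is 3 or less.
####################################################################################################################################
def _example_days_since_last_scan():
    now = time.time()
    last_import_date = now - (3.5 * SECS_IN_A_DAY)     # pretend the last scan finished 3.5 days ago
    return (now - last_import_date) / SECS_IN_A_DAY    # -> 3.5
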
####################################################################################################################################
# CheckAndRunBinClean(): calc the days since the last job ran to clean out the Recycle Bin, and queue a new one if it is due
####################################################################################################################################
def CheckAndRunBinClean():
    created_jobs=False

    # get the most recent clean_bin job
    j = session.query(Job).filter(Job.name=='clean_bin').order_by(Job.id.desc()).first()
    settings = session.query(Settings).first()

    now=datetime.now(pytz.utc)
    if not j or (now-j.last_update).days >= settings.scheduled_bin_cleanup:
        print( f"INFO: Should force a clean up of the bin path, deleting files older than {settings.bin_cleanup_file_age} days old" )
        job=NewJob( "clean_bin" )
        created_jobs=True
    return created_jobs


####################################################################################################################################
# ScheduledJobs() is triggered when any job is run, or when the socket times out once a day. It uses the settings to find any time
# based jobs that should run (e.g. last scanned a path X day(s) ago, then scan now), etc. X is defined in settings
####################################################################################################################################
def ScheduledJobs():
    print("DEBUG: Time to check for any scheduled jobs needing to be run" )

    created_jobs=False

    ndays_since_last_im_scan = DaysSinceLastScan( "Import" )
    ndays_since_last_st_scan = DaysSinceLastScan( "Storage" )

    settings = session.query(Settings).first()
    now=datetime.now(pytz.utc)

    if ndays_since_last_im_scan >= settings.scheduled_import_scan:
        print( f"INFO: Time to force an import scan, last scan was {ndays_since_last_im_scan} days ago" )
        job=NewJob( "scannow" )
        created_jobs=True

    if ndays_since_last_st_scan >= settings.scheduled_storage_scan:
        print( f"INFO: Time to force a storage scan, last scan was {ndays_since_last_st_scan} days ago" )
        job=NewJob( "scan_sp" )
        created_jobs=True

    if CheckAndRunBinClean():
        created_jobs=True

    return created_jobs

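####################################################################################################################################
# Illustrative sketch (editor's addition, not called anywhere): the main loop below only uses the TCP connection as
# a wake-up nudge - it accepts and then goes off to HandleJobs(). Assuming that is all the pa web site does too, a
# minimal "poke the job manager" client would look roughly like this.
####################################################################################################################################
def _example_wake_job_manager():
    try:
        # simply connecting is enough to break the accept()/timeout wait in the main loop
        with socket.create_connection( (PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT), timeout=5 ) as conn:
            pass
        return True
    except OSError as ex:
        print( f"WARNING: could not reach the job manager: {ex}" )
        return False
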
####################################################################################################################################
# MAIN - start with validation, then grab any jobs already in the DB to process, then
# go into waiting on a socket to be woken up (and if woken, go back into HandleJobs())
####################################################################################################################################
if __name__ == "__main__":
    print( f"INFO: PA job manager starting - listening on {PA_JOB_MANAGER_HOST}:{PA_JOB_MANAGER_PORT}" )

    InitialValidationChecks()

    HandleJobs(True)

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((PA_JOB_MANAGER_HOST, PA_JOB_MANAGER_PORT))
        # force a timeout every 1 day so we can run scheduled jobs
        s.settimeout(SECS_IN_A_DAY)
        s.listen()
        while True:
            try:
                conn, addr = s.accept()
                # the connection is only a wake-up nudge - we never read from it, so close it straight away
                conn.close()
                if DEBUG:
                    print( f"accept finished, tout={s.timeout}" )

            except socket.timeout:
                if DEBUG:
                    print( f"timeout occurred, tout={s.timeout}" )
                if ScheduledJobs():
                    HandleJobs(False)
                continue
            else:
                HandleJobs(False)
                # in case we constantly have jobs running, the '1 day' last-import check might be missed, so check it after each job too
                if ScheduledJobs():
                    HandleJobs(False)