958 lines
44 KiB
Python
958 lines
44 KiB
Python
from flask_wtf import FlaskForm
|
|
from flask import request, render_template, redirect, send_from_directory, url_for, jsonify, make_response
|
|
from main import db, app, ma
|
|
from sqlalchemy import Sequence, text, select
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
import os
|
|
import glob
|
|
import json
|
|
from PIL import Image
|
|
from pymediainfo import MediaInfo
|
|
import hashlib
|
|
import exifread
|
|
import base64
|
|
import numpy
|
|
import cv2
|
|
import time
|
|
import re
|
|
from datetime import datetime, timedelta
|
|
import pytz
|
|
import html
|
|
from flask_login import login_required, current_user
|
|
from states import States, PA_UserState
|
|
from query import Query
|
|
|
|
# Local Class imports
|
|
################################################################################
|
|
from job import Job, JobExtra, Joblog, NewJob, SetFELog
|
|
from path import PathType, Path, MovePathDetails
|
|
from person import Refimg, Person, PersonRefimgLink
|
|
from settings import Settings, SettingsIPath, SettingsSPath, SettingsRBPath
|
|
from shared import SymlinkName
|
|
from dups import Duplicates
|
|
from face import Face, FaceFileLink, FaceRefimgLink, FaceOverrideType, FaceNoMatchOverride, FaceForceMatchOverride
|
|
|
|
# pylint: disable=no-member
|
|
|
|
################################################################################
|
|
# Class describing PathDirLink and in the DB (via sqlalchemy)
|
|
# connects the entry (dir) with a path
|
|
################################################################################
|
|
class PathDirLink(db.Model):
    """Association row of table ``path_dir_link`` tying a path to a dir entry."""

    __tablename__ = "path_dir_link"

    # composite primary key: (path.id, dir.eid)
    path_id = db.Column(
        db.Integer,
        db.ForeignKey("path.id"),
        primary_key=True,
    )
    dir_eid = db.Column(
        db.Integer,
        db.ForeignKey("dir.eid"),
        primary_key=True,
    )

    def __repr__(self):
        """Return a short debug string with both key columns."""
        return f"<path_id: {self.path_id}, dir_eid: {self.dir_eid}>"
|
|
|
|
################################################################################
|
|
# Class describing EntryDirLink and in the DB (via sqlalchemy)
|
|
# connects (many) entry contained in a directory (which is also an entry)
|
|
################################################################################
|
|
class EntryDirLink(db.Model):
    """Association row of table ``entry_dir_link`` tying an entry to a dir."""

    __tablename__ = "entry_dir_link"

    # composite primary key: (entry.id, dir.eid)
    entry_id = db.Column(
        db.Integer,
        db.ForeignKey("entry.id"),
        primary_key=True,
    )
    dir_eid = db.Column(
        db.Integer,
        db.ForeignKey("dir.eid"),
        primary_key=True,
    )

    def __repr__(self):
        """Return a short debug string with both key columns."""
        return f"<entry_id: {self.entry_id}, dir_eid: {self.dir_eid}>"
|
|
|
|
################################################################################
|
|
# Class describing Dir and in the DB (via sqlalchemy)
|
|
# rel_path: rest of dir after path, e.g. if path = /..../storage, then
|
|
# rel_path could be 2021/20210101-new-years-day-pics
|
|
# in_path: only in this structure, not DB, quick ref to the path this dir is in
|
|
################################################################################
|
|
class Dir(db.Model):
    """Directory-specific columns of an entry (table ``dir``)."""

    __tablename__ = "dir"

    # primary key doubles as the foreign key to the owning entry row
    eid = db.Column(
        db.Integer,
        db.ForeignKey("entry.id"),
        primary_key=True,
    )
    # remainder of the directory after the path prefix
    rel_path = db.Column(db.String, unique=True)
    # single Path reached through the path_dir_link association table
    in_path = db.relationship(
        "Path",
        secondary="path_dir_link",
        uselist=False,
    )

    def __repr__(self):
        """Return a debug string with eid, rel_path and the linked path."""
        return f"<eid: {self.eid}, rel_path: {self.rel_path}, in_path: {self.in_path}>"
|
|
|
|
################################################################################
|
|
# Class describing Entry and in the DB (via sqlalchemy)
|
|
# an entry is the common bits between files and dirs
|
|
# type is a convenience var only in this class, not in DB
|
|
# {dir|file}_details are convenience data for the relevant details from the Dir
|
|
# or File class - not in DB
|
|
# in_dir - is the Dir that this entry is located in (convenience for class only)
|
|
# FullPathOnFS(): method to get path on the FS for this Entry
|
|
################################################################################
|
|
class Entry(db.Model):
    """Common columns shared by files and directories (table ``entry``)."""

    __tablename__ = "entry"

    id = db.Column(db.Integer, db.Sequence('file_id_seq'), primary_key=True)
    name = db.Column(db.String, unique=False, nullable=False)
    type_id = db.Column(db.Integer, db.ForeignKey("file_type.id"))

    # relationships used as convenience accessors
    type = db.relationship("FileType")
    dir_details = db.relationship("Dir", uselist=False)
    file_details = db.relationship("File", uselist=False)
    # the Dir this entry is linked to through entry_dir_link (if any)
    in_dir = db.relationship("Dir", secondary="entry_dir_link", uselist=False)

    def FullPathOnFS(self):
        """Build the path string for this entry.

        When the entry has an ``in_dir`` the result is
        ``<path_prefix>/[<rel_path>/]<name>``; otherwise the entry is treated
        as the root dir of a path and its own ``dir_details`` path prefix is
        returned.
        """
        if not self.in_dir:
            # this occurs when we have a dir that is the root of a path
            return self.dir_details.in_path.path_prefix

        s = self.in_dir.in_path.path_prefix + '/'
        # an empty rel_path contributes no path component
        if len(self.in_dir.rel_path) > 0:
            s += self.in_dir.rel_path + '/'
        s += self.name
        return s

    def __repr__(self):
        """Return a debug string covering the columns and linked details."""
        # the closing '>' was missing, unlike every other model's repr
        return f"<id: {self.id}, name: {self.name}, type={self.type}, dir_details={self.dir_details}, file_details={self.file_details}, in_dir={self.in_dir}>"
|
|
|
|
################################################################################
|
|
# Class describing File and in the DB (via sqlalchemy)
|
|
# all files are entries, this is the extra bits only for a file, of note:
|
|
# hash is unique for files, and used to validate duplicates
|
|
# woy == week of year, all date fields are used to sort/show content. Date
|
|
# info can be from exif, or file system, or file name (rarely)
|
|
# faces: convenience field to show connected face(s) for this file
|
|
################################################################################
|
|
class File(db.Model):
    """File-specific columns of an entry (table ``file``)."""

    __tablename__ = "file"

    # primary key doubles as the foreign key to the owning entry row
    eid = db.Column(
        db.Integer,
        db.ForeignKey("entry.id"),
        primary_key=True,
    )
    size_mb = db.Column(db.Integer, unique=False, nullable=False)
    thumbnail = db.Column(db.String, unique=False, nullable=True)
    hash = db.Column(db.String)

    # date parts used for ordering (woy == week of year)
    year = db.Column(db.Integer)
    month = db.Column(db.Integer)
    day = db.Column(db.Integer)
    woy = db.Column(db.Integer)

    # faces reached through the face_file_link association table
    faces = db.relationship("Face", secondary="face_file_link")

    def __repr__(self):
        """Return a debug string with size, hash, date parts and faces."""
        return f"<eid: {self.eid}, size_mb={self.size_mb}, hash={self.hash}, year={self.year}, month={self.month}, day={self.day}, woy={self.woy}, faces={self.faces}>"
|
|
|
|
################################################################################
|
|
# Class describing FileType and in the DB (via sqlalchemy)
|
|
# pre-defined list of file types (image, dir, etc.)
|
|
################################################################################
|
|
class FileType(db.Model):
    """Lookup row of table ``file_type`` (id plus unique name)."""

    __tablename__ = "file_type"

    id = db.Column(
        db.Integer,
        db.Sequence('file_type_id_seq'),
        primary_key=True,
    )
    name = db.Column(db.String, unique=True, nullable=False)

    def __repr__(self):
        """Return a debug string with id and name."""
        return f"<id: {self.id}, name={self.name}>"
|
|
|
|
|
|
################################################################################
|
|
# this is how we order all queries based on value of 'noo' - used with
|
|
# access *order_map.get(OPT.noo)
|
|
################################################################################
|
|
# Maps each 'noo' choice to the ORDER BY columns for that choice.
# Every value is a tuple so callers can splat it: order_by(*order_map[...]).
order_map = {
    "Newest": (
        File.year.desc(),
        File.month.desc(),
        File.day.desc(),
        Entry.name.desc(),
    ),
    "Oldest": (
        File.year,
        File.month,
        File.day,
        Entry.name,
    ),
    # single-column orderings still need the trailing comma to stay tuples
    "Z to A": (
        Entry.name.desc(),
    ),
    "A to Z": (
        Entry.name.asc(),
    ),
}
|
|
|
|
################################################################################
|
|
|
|
################################################################################
|
|
# Schemas for Path, FileType, File, Dir - used in EntrySchema
|
|
################################################################################
|
|
class PathType(ma.SQLAlchemyAutoSchema):
    """Auto-generated marshmallow schema for the PathType model.

    NOTE(review): this class reuses the name of the imported ``PathType``
    model; ``Meta.model`` below still resolves to the model because the
    schema name is only bound once the class body has finished executing.
    """

    class Meta:
        model = PathType

    # NOTE(review): set on the schema class itself, not inside Meta
    load_instance = True
|
|
|
|
class PathSchema(ma.SQLAlchemyAutoSchema):
    """Auto-generated marshmallow schema for the Path model."""

    class Meta:
        model = Path

    # NOTE(review): set on the schema class itself, not inside Meta
    load_instance = True
    # nest the path's type using the schema named PathType in this module
    type = ma.Nested(PathType)
|
|
|
|
class FileTypeSchema(ma.SQLAlchemyAutoSchema):
    """Auto-generated marshmallow schema for the FileType model."""

    class Meta:
        model = FileType

    # NOTE(review): set on the schema class itself, not inside Meta
    load_instance = True
|
|
|
|
class FileSchema(ma.SQLAlchemyAutoSchema):
    """Auto-generated marshmallow schema for the File model."""

    class Meta:
        model = File

    # NOTE(review): set on the schema class itself, not inside Meta
    load_instance = True
|
|
|
|
class DirSchema(ma.SQLAlchemyAutoSchema):
    """Auto-generated marshmallow schema for the Dir model."""

    class Meta:
        model = Dir

    # NOTE(review): set on the schema class itself, not inside Meta
    load_instance = True
    # nest the path this dir is linked to
    in_path = ma.Nested(PathSchema)
|
|
|
|
################################################################################
|
|
# Schema for Entry so we can json for data to the client
|
|
################################################################################
|
|
class EntrySchema(ma.SQLAlchemyAutoSchema):
    """Marshmallow schema for Entry, with its related objects nested."""

    # the auto schema gives id, name, type_id
    class Meta:
        model = Entry

    # NOTE(review): set on the schema class itself, not inside Meta
    load_instance = True

    type = ma.Nested(FileTypeSchema)
    file_details = ma.Nested(FileSchema)
    # noting dir_details needs in_path to work
    dir_details = ma.Nested(DirSchema)
    # noting in_dir needs in_path and in_path.type to work
    in_dir = ma.Nested(DirSchema)
|
|
|
|
################################################################################
|
|
# util function to just update the current/first/last positions needed for
|
|
# viewing / using pa_user_state DB table
|
|
################################################################################
|
|
def UpdatePref( pref, OPT ):
    """Copy the positive position values from OPT onto pref and commit it.

    ``current``, ``first_eid``, ``last_eid`` and ``num_entries`` are each
    copied only when the OPT value is greater than zero; ``last_used`` is
    always set to the UTC time taken at the start of the call.
    """
    stamp = datetime.now(pytz.utc)

    # same order as the fields are checked one by one
    for field in ("current", "first_eid", "last_eid", "num_entries"):
        value = getattr(OPT, field)
        if value > 0:
            setattr(pref, field, value)

    pref.last_used = stamp
    db.session.add(pref)
    db.session.commit()
|
|
|
|
################################################################################
|
|
# GetEntriesInFlatView: func. to retrieve DB entries appropriate for flat view
|
|
################################################################################
|
|
def GetEntriesInFlatView( OPT, prefix ):
    """Return one page of file entries under ``prefix`` for the flat view.

    Params:
        OPT: options object; reads ``order``, ``last_order``, ``offset``,
            ``how_many``, ``first_eid`` and ``last_eid``; may set
            ``first_eid`` and ``last_eid``.
        prefix: value compared against ``Path.path_prefix``.

    Returns:
        ``(entries, num_entries)``; ``num_entries`` is only counted while
        ``OPT.last_eid`` is still 0, otherwise it is 0.
    """
    # Build the query as an object: offset/limit/prefix are passed as real
    # values rather than being pasted into a string that is then eval()'d.
    base = (
        Entry.query.join(File).join(EntryDirLink).join(Dir)
        .join(PathDirLink).join(Path)
        .filter(Path.path_prefix == prefix)
    )

    # NOTE(security): eval() is kept only for the order_by(...) fragments held
    # in OPT.order / OPT.last_order (SetOrderStrings assigns fixed literals).
    # These must never be derived from request data.
    entries = eval(f"base.{OPT.order}").offset(OPT.offset).limit(OPT.how_many).all()

    # first page and no first entry recorded yet: remember it
    if OPT.first_eid == 0 and OPT.offset == 0 and entries:
        OPT.first_eid = entries[0].id

    num_entries = 0
    if OPT.last_eid == 0:
        num_entries = base.count()
        # first row in the reversed order is the last entry of the listing
        last_entry = eval(f"base.{OPT.last_order}").limit(1).first()
        if last_entry:
            OPT.last_eid = last_entry.id

    return entries, num_entries
|
|
|
|
################################################################################
|
|
# GetEntriesInFolderView: func. to retrieve DB entries appropriate for folder view
|
|
# read inline comments to deal with variations / ordering...
|
|
################################################################################
|
|
def GetEntriesInFolderView( OPT, prefix ):
    """Return the entries to show for ``OPT.cwd`` in folder view.

    Params:
        OPT: options object; reads ``cwd``, ``noo``, ``order``,
            ``last_order``, ``offset`` and ``how_many``; may set
            ``first_eid`` and ``last_eid``.
        prefix: value compared against ``Path.path_prefix``.

    Returns:
        ``(entries, num_entries)``. For the root cwd only the root dir entry
        is returned (count 0). Otherwise: all sub-directories, followed by
        one page of files, with ``num_entries`` the file count of the dir.

    NOTE(review): the source this was recovered from had lost its
    indentation; the file listing is assumed to belong to the non-root
    branch only - confirm.
    """
    entries = []
    num_entries = 0

    def _find_dir_entry(rel_path):
        # Entry of the Dir with this rel_path inside the given path prefix
        return (
            Entry.query.join(Dir).join(PathDirLink).join(Path)
            .filter(Dir.rel_path == rel_path)
            .filter(Path.path_prefix == prefix)
            .order_by(Entry.name)
            .first()
        )

    # okay the root cwd is fake, so treat it specially - its Dir can be found
    # by path with dir.rel_path=''
    if os.path.dirname(OPT.cwd) == 'static':
        root_entry = _find_dir_entry('')
        # this can occur if the path in settings does not exist as it wont be in the DB
        if not root_entry:
            return entries, num_entries
        # although this is 1 entry, needs to come back via all() to be iterable
        entries += Entry.query.filter(Entry.id == root_entry.id).all()
        return entries, num_entries

    rel_path = OPT.cwd.replace(prefix, '')
    # when in subdirs, replacing prefix will leave the first char as /, get rid of it
    if len(rel_path) and rel_path[0] == '/':
        rel_path = rel_path[1:]

    cwd_entry = _find_dir_entry(rel_path)
    # this can occur if the path in settings does not exist as it wont be in the DB
    if not cwd_entry:
        return entries, 0

    # dirs cant be sorted by date really, so Newest shares Z to A ordering and
    # everything else (including no valid option) is A to Z
    subdirs = (
        Entry.query.join(EntryDirLink).join(FileType)
        .filter(EntryDirLink.dir_eid == cwd_entry.id)
        .filter(FileType.name == 'Directory')
    )
    if OPT.noo == "Z to A" or OPT.noo == "Newest":
        subdirs = subdirs.order_by(Entry.name.desc())
    else:
        subdirs = subdirs.order_by(Entry.name)
    entries += subdirs.all()

    # add any files at the current CWD (based on dir_eid in DB)
    files_q = Entry.query.join(File).join(EntryDirLink).filter(EntryDirLink.dir_eid == cwd_entry.id)

    # NOTE(security): eval() is kept only for the order_by(...) fragments held
    # in OPT.order / OPT.last_order (SetOrderStrings assigns fixed literals).
    # These must never be derived from request data.
    file_entries = eval(f"files_q.{OPT.order}").offset(OPT.offset).limit(OPT.how_many).all()

    if OPT.offset == 0 and len(file_entries):
        OPT.first_eid = file_entries[0].id
    num_entries = files_q.count()
    # first row in the reversed order is the last file of the listing
    last_entry = eval(f"files_q.{OPT.last_order}").limit(1).first()
    if last_entry:
        OPT.last_eid = last_entry.id

    entries += file_entries
    return entries, num_entries
|
|
|
|
|
|
################################################################################
|
|
# GetEntriesInSearchView: func. to retrieve DB entries appropriate for Search view
|
|
# Defaults search is for any matching filename, contents of any matching dirname
|
|
# and any match with AI / face for that term. Explicit, only AI match via
|
|
# AI:<tag> syntax
|
|
################################################################################
|
|
def GetEntriesInSearchView( OPT ):
    """Return one page of entries matching ``OPT.orig_search_term``.

    A term containing ``AI:`` matches only on ``Person.tag`` (exact match
    after ``AI:`` is removed). Any other term is matched case-insensitively
    as a substring of the file name, of the containing dir's ``rel_path``,
    or of a linked person's tag. ``*`` in the term is turned into the SQL
    wildcard ``%``.

    Side effects: may set ``OPT.first_eid``, ``OPT.last_eid`` and
    ``OPT.num_entries``, and stores them through ``UpdatePref``.

    Returns:
        list of Entry rows (empty list when nothing matches).
    """
    ai_only = 'AI:' in OPT.orig_search_term

    # turn * wildcard into sql wildcard of %
    search_term = OPT.orig_search_term.replace('*', '%')
    if ai_only:
        search_term = search_term.replace('AI:', '')
    pattern = f"%{search_term}%"

    def _page(query):
        # NOTE(security): eval() is kept only for the order_by(...) fragment
        # in OPT.order (SetOrderStrings assigns fixed literals). The search
        # term is never part of an eval()'d string any more - it arrived from
        # the URL and allowed arbitrary code execution when interpolated.
        return eval(f"query.{OPT.order}").offset(OPT.offset).limit(OPT.how_many).all()

    ai_query = (
        Entry.query.join(File).join(FaceFileLink).join(Face)
        .join(FaceRefimgLink).join(Refimg).join(PersonRefimgLink).join(Person)
    )

    if ai_only:
        all_entries = _page(ai_query.filter(Person.tag == search_term))
    else:
        file_data = _page(Entry.query.join(File).filter(Entry.name.ilike(pattern)))
        dir_data = _page(
            Entry.query.join(File).join(EntryDirLink).join(Dir)
            .filter(Dir.rel_path.ilike(pattern))
        )
        ai_data = _page(ai_query.filter(Person.tag.ilike(pattern)))

        # remove any duplicates from combined data: a dir/AI hit is dropped
        # when a file-name hit with the same name is already present
        file_names = {f.name for f in file_data}
        all_entries = list(file_data)
        all_entries += [d for d in dir_data if d.name not in file_names]
        all_entries += [a for a in ai_data if a.name not in file_names]

    # nothing found, just return now
    if not all_entries:
        OPT.num_entries = 0
        return []

    # for all searches first_entry is worked out when first_eid not set yet & offset is 0
    if OPT.first_eid == 0 and OPT.offset == 0:
        OPT.first_eid = all_entries[0].id

    if OPT.last_eid == 0:
        # user supplied text is passed as bound parameters, never formatted
        # into the SQL text
        by_fname = "select e.id from entry e where e.name ilike :pattern"
        by_dirname = (
            "select e.id from entry e, entry_dir_link edl where edl.entry_id = e.id"
            " and edl.dir_eid in ( select d.eid from dir d where d.rel_path ilike :pattern )"
        )
        by_ai_base = (
            "select e.id from entry e, face_file_link ffl, face_refimg_link frl,"
            " person_refimg_link prl, person p where e.id = ffl.file_eid"
            " and frl.face_id = ffl.face_id and frl.refimg_id = prl.refimg_id"
            " and prl.person_id = p.id"
        )

        if ai_only:
            # exact tag match, same as the ORM query above
            id_sql = f"{by_ai_base} and p.tag = :tag"
            params = {"tag": search_term}
        else:
            # tag is matched with ilike here as well so the count / last entry
            # describe the same result set the ORM queries return
            id_sql = f"{by_fname} union {by_dirname} union {by_ai_base} and p.tag ilike :pattern"
            params = {"pattern": pattern}

        # count and last entry are both derived from the same id sub-select
        num_e_sql = f"select count(1) from ( {id_sql} ) as foo"
        # OPT.last_order_raw is one of the fixed literals from SetOrderStrings
        last_entry_sql = (
            f"select e.*, f.* from entry e, file f where e.id=f.eid and e.id in ( {id_sql} ) "
            f" order by {OPT.last_order_raw} limit 1"
        )

        with db.engine.connect() as conn:
            # scalar() reads the single count value without relying on a row
            # attribute called 'count'
            OPT.num_entries = conn.execute(text(num_e_sql), params).scalar()
            if OPT.num_entries == 0:
                return []
            last_row = conn.execute(text(last_entry_sql), params).first()

        if last_row is not None:
            OPT.last_eid = last_row.id

    # store first/last eid into prefs
    pref = PA_UserState.query.filter(
        PA_UserState.pa_user_dn == current_user.dn,
        PA_UserState.path_type == OPT.path_type,
        PA_UserState.orig_ptype == OPT.orig_ptype,
        PA_UserState.orig_search_term == OPT.orig_search_term,
    ).first()
    UpdatePref( pref, OPT )
    return all_entries
|
|
|
|
################################################################################
|
|
# set up "order strings" to use in ORM and raw queries as needed for
|
|
# GetEntries*Search*, GetEntries*Flat*, GetEntries*Fold*
|
|
################################################################################
|
|
def SetOrderStrings( OPT ):
    """Set ``OPT.order``, ``OPT.last_order`` and ``OPT.last_order_raw``.

    The three strings are picked from ``OPT.noo``: ``order`` is the ORM
    order_by fragment for the listing, ``last_order`` is the reversed
    fragment, and ``last_order_raw`` is the reversed ordering as raw SQL.
    Any value other than "Newest", "Oldest" or "Z to A" gets the A to Z
    strings. Returns None.
    """
    by_date_desc = "order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name.desc())"
    by_date_asc = "order_by(File.year,File.month,File.day,Entry.name)"
    by_name_desc = "order_by(Entry.name.desc())"
    by_name_asc = "order_by(Entry.name)"

    # noo -> (order, last_order, last_order_raw)
    choices = {
        "Newest": (by_date_desc, by_date_asc, "f.year, f.month, f.day, e.name"),
        "Oldest": (by_date_asc, by_date_desc, "f.year desc, f.month desc, f.day desc, e.name desc"),
        "Z to A": (by_name_desc, by_name_asc, "e.name"),
    }
    # A to Z
    fallback = (by_name_asc, by_name_desc, "e.name desc")

    OPT.order, OPT.last_order, OPT.last_order_raw = choices.get(OPT.noo, fallback)
    return
|
|
|
|
################################################################################
|
|
# /get_entries_by_ids -> route where we supply list of entry ids (for next/prev
|
|
# page of data we want to show). Returns json of all matching entries
|
|
################################################################################
|
|
@app.route('/get_entries_by_ids', methods=['POST'])
@login_required
def process_ids():
    """Return, as JSON, every Entry whose id is in the posted ``ids`` list."""
    payload = request.get_json()        # JSON body of the POST
    wanted_ids = payload.get('ids', []) # missing key -> no ids

    # DDP: debate here, do I get query_id, do I validate whether we are asking
    # for ids not in the query? OR, dont even make/store/have query?

    matching = Entry.query.filter(Entry.id.in_(wanted_ids)).all()

    # marshmallow turns the rows into the json shape the client needs
    return jsonify(EntrySchema(many=True).dump(matching))
|
|
|
|
###
|
|
# query_data = { 'entry_lst': entry_lst, 'query_id': query_id, ... }
|
|
###
|
|
# Call this ONCE on first menu choice of View files, or search box submission
|
|
def GetQueryData( OPT ):
    """Build the entry id list for a view and store it as a Query row.

    Call this ONCE on first menu choice of View files, or search box
    submission.

    Returns:
        dict with ``entry_list`` (list of entry ids, or None) and
        ``query_id`` (id of the stored Query row, or None when nothing was
        stored: search views, or no entries found).
    """
    query_data = {'query_id': None, 'entry_list': None}

    # set up the sql order strings (back in OPT) based on value of noo
    # FIXME: remove this for all last/first eid usage AND use order_map
    SetOrderStrings( OPT )

    if OPT.path_type == 'Search':
        print ("NOT YET")
        return query_data

    if OPT.folders:
        # 'prefix' was an undefined name here; use OPT.prefix like the flat
        # branch below does
        entries, _ = GetEntriesInFolderView( OPT, OPT.prefix )
        # NOTE(review): ids of whatever the folder view returned (it pages
        # the files) - confirm this is the list wanted for folder views
        query_data['entry_list'] = [entry.id for entry in entries]
    else:
        # unknown noo falls back to A to Z, matching SetOrderStrings
        order_cols = order_map.get(OPT.noo, order_map["A to Z"])
        stmt = (
            select(Entry.id).join(File).join(EntryDirLink).join(Dir)
            .join(PathDirLink).join(Path)
            .filter(Path.path_prefix == OPT.prefix)
            .order_by(*order_cols)
        )
        query_data['entry_list'] = db.session.execute(stmt).scalars().all()

    # nothing to store: indexing [0] below would fail on an empty list
    if not query_data['entry_list']:
        return query_data

    # first time we get the data q_offset is 0, current=first one, search never gets here, so search_term=''
    # FIXME: Doubt we need cwd -- I only need originals to either invalidate this list, or recreate it... need to think about that a lot more
    query = Query(
        path_type=OPT.path_type, noo=OPT.noo, q_offset=0, folder=OPT.folders,
        grouping=OPT.grouping, root=OPT.root, cwd=OPT.cwd, search_term='',
        entry_list=query_data['entry_list'], current=query_data['entry_list'][0],
        created=datetime.now(pytz.utc),
    )
    db.session.add(query)
    db.session.commit()

    query_data['query_id'] = query.id
    return query_data
|
|
|
|
################################################################################
|
|
# /GetEntries -> helper function that Gets Entries for required files to show
|
|
# for several routes (files_ip, files_sp, files_rbp, search, view_list)
|
|
################################################################################
|
|
def GetEntries( OPT ):
    """Return the entries to show for the view described by OPT.

    Search views are delegated to GetEntriesInSearchView. Otherwise the
    path for ``OPT.path_type`` ('Storage', 'Import' or 'Bin') is resolved
    and the folder or flat view helper is used, depending on
    ``OPT.folders``.

    Side effects: sets the order strings on OPT; a 'View' path_type is
    replaced by ``OPT.orig_ptype``; when ``OPT.last_eid`` was 0 or folders
    are shown, ``OPT.num_entries`` is set and stored through UpdatePref.

    Raises:
        ValueError: if the path type is not one of the three known types
            (previously this surfaced as an unbound local variable error).
    """
    SetOrderStrings( OPT )
    if OPT.path_type == 'Search' or (OPT.path_type == 'View' and OPT.orig_ptype=='Search'):
        return GetEntriesInSearchView( OPT )

    # if we are a view, then it will be of something else, e.g. a list of
    # import, storage, or bin images, reset OPT.path_type so the lookup below works
    if 'View' in OPT.path_type:
        OPT.path_type = OPT.orig_ptype

    path_getters = {
        'Storage': SettingsSPath,
        'Import': SettingsIPath,
        'Bin': SettingsRBPath,
    }
    if OPT.path_type not in path_getters:
        raise ValueError(f"unknown path type: {OPT.path_type!r}")
    path = path_getters[OPT.path_type]()

    # decide this before the helpers run: they may set OPT.last_eid themselves
    update_last_eid = bool(OPT.last_eid == 0 or OPT.folders)

    prefix = SymlinkName(OPT.path_type, path, path+'/')
    if OPT.folders:
        entries, num_entries = GetEntriesInFolderView( OPT, prefix )
    else:
        entries, num_entries = GetEntriesInFlatView( OPT, prefix )

    if update_last_eid:
        # find pref... via path_type if we are here
        OPT.num_entries = num_entries
        pref = PA_UserState.query.filter(
            PA_UserState.pa_user_dn == current_user.dn,
            PA_UserState.path_type == OPT.path_type,
        ).first()
        UpdatePref( pref, OPT )

    return entries
|
|
|
|
@app.route("/change_file_opts", methods=["POST"])
@login_required
def change_file_opts():
    """Apply the posted options, then redirect back to the referring page."""
    # reset options based on form post, then redirect back to orig page (with a GET to allow back button to work)
    States( request )
    # request.referrer is None when the client sends no Referer header;
    # fall back to the site root instead of failing inside redirect()
    return redirect( request.referrer or "/" )
|
|
|
|
################################################################################
|
|
# /file_list -> show detailed file list of files from import_path(s)
|
|
################################################################################
|
|
@app.route("/file_list_ip", methods=["GET", "POST"])
@login_required
def file_list_ip():
    """Show the detailed file list for the import path.

    A POST only updates the stored options (via States) and is then
    redirected to the GET version of this page.
    """
    OPT=States( request )
    # now we have reset the offset, etc. into the prefs, we can use a GET and this will be back/forward browser button safe
    if request.method=='POST':
        # the redirect response has to be returned, otherwise it is discarded
        return redirect("/file_list_ip")
    entries=GetEntries( OPT )
    return render_template("file_list.html", page_title='View File Details (Import Path)', entry_data=entries, OPT=OPT )
|
|
|
|
################################################################################
|
|
# /files -> show thumbnail view of files from import_path(s)
|
|
################################################################################
|
|
@app.route("/files_ip", methods=["GET", "POST"])
@login_required
def files_ip():
    """Show the thumbnail view of files from the import path.

    A POST only updates the stored options (via States) and is then
    redirected to the GET version of this page.
    """
    OPT=States( request )
    # now we have reset the offset, etc. into the prefs, we can use a GET and this will be back/forward browser button safe
    if request.method=='POST':
        # the redirect response has to be returned, otherwise it is discarded
        return redirect("/files_ip")
    entries=GetEntries( OPT )
    people = Person.query.all()
    move_paths = MovePathDetails()
    query_data = GetQueryData( OPT )
    return render_template("files.html", page_title=f"View Files ({OPT.path_type} Path)", entry_data=entries, OPT=OPT, people=people, move_paths=move_paths, query_data=query_data )
|
|
|
|
################################################################################
|
|
# /files -> show thumbnail view of files from storage_path
|
|
################################################################################
|
|
@app.route("/files_sp", methods=["GET", "POST"])
@login_required
def files_sp():
    """Show the thumbnail view of files from the storage path.

    A POST only updates the stored options (via States) and is then
    redirected to the GET version of this page.
    """
    OPT=States( request )
    # now we have reset the offset, etc. into the prefs, we can use a GET and this will be back/forward browser button safe
    if request.method=='POST':
        # the redirect response has to be returned, otherwise it is discarded
        return redirect("/files_sp")
    entries=GetEntries( OPT )
    people = Person.query.all()
    move_paths = MovePathDetails()
    return render_template("files.html", page_title=f"View Files ({OPT.path_type} Path)", entry_data=entries, OPT=OPT, people=people, move_paths=move_paths )
|
|
|
|
|
|
################################################################################
|
|
# /files -> show thumbnail view of files from recycle_bin_path
|
|
################################################################################
|
|
@app.route("/files_rbp", methods=["GET", "POST"])
@login_required
def files_rbp():
    """Show the thumbnail view of files from the recycle bin path.

    A POST only updates the stored options (via States) and is then
    redirected to the GET version of this page.
    """
    OPT=States( request )
    # now we have reset the offset, etc. into the prefs, we can use a GET and this will be back/forward browser button safe
    if request.method=='POST':
        # the redirect response has to be returned, otherwise it is discarded
        return redirect("/files_rbp")
    entries=GetEntries( OPT )
    # no people list is handed to the template for this view, so the Person
    # table is not queried here
    move_paths = MovePathDetails()
    return render_template("files.html", page_title=f"View Files ({OPT.path_type} Path)", entry_data=entries, OPT=OPT, move_paths=move_paths )
|
|
|
|
################################################################################
|
|
# search -> GET version -> has search_term in the URL and is therefore able to
|
|
# be used even if the user hits the front/back buttons in the browser.
|
|
# func shows thumbnails of matching files.
|
|
################################################################################
|
|
@app.route("/search/<search_term>", methods=["GET", "POST"])
@login_required
def search(search_term):
    """Show thumbnails of the files matching ``search_term``.

    The term is part of the URL so the page works with the browser's
    back/forward buttons. A POST only updates the stored options (via
    States) and is then redirected to the GET version of the same URL.
    """
    OPT=States( request )

    # if we posted to get here, its a change in State, so save it to pa_user_state, and go back to the GET version or URL
    if request.method=="POST":
        # the redirect response has to be returned, otherwise it is discarded
        return redirect("/search/"+search_term)
    OPT.search_term = search_term
    # always show flat results for search to start with
    OPT.folders=False
    entries=GetEntries( OPT )
    move_paths = MovePathDetails()
    return render_template("files.html", page_title='View Files', search_term=search_term, entry_data=entries, OPT=OPT, move_paths=move_paths )
|
|
|
|
################################################################################
|
|
# /files/scan_ip -> allows us to force a check for new files
|
|
################################################################################
|
|
@app.route("/files/scan_ip", methods=["GET"])
@login_required
def scan_ip():
    """Create a ``scan_ip`` job, then send the browser to the jobs page."""
    NewJob(
        name="scan_ip",
        num_files=0,
        wait_for=None,
        jex=None,
        desc="scan for new files in import path",
    )
    return redirect("/jobs")
|
|
|
|
################################################################################
|
|
# /files/force_scan -> deletes old data in DB, and does a brand new scan
|
|
################################################################################
|
|
@app.route("/files/force_scan", methods=["GET"])
@login_required
def force_scan():
    """Create a ``force_scan`` job, then send the browser to the jobs page."""
    NewJob(
        name="force_scan",
        num_files=0,
        wait_for=None,
        jex=None,
        desc="remove data and rescan import & storage paths",
    )
    return redirect("/jobs")
|
|
|
|
################################################################################
|
|
# /files/scan_sp -> allows us to force a check for new files
|
|
################################################################################
|
|
@app.route("/files/scan_sp", methods=["GET"])
@login_required
def scan_sp():
    """Create a ``scan_sp`` job, then send the browser to the jobs page."""
    NewJob(
        name="scan_sp",
        num_files=0,
        wait_for=None,
        jex=None,
        desc="scan for new files in storage path",
    )
    return redirect("/jobs")
|
|
|
|
|
|
################################################################################
|
|
# /fix_dups -> use sql to find duplicates based on same hash, different
|
|
# filenames, or directories. Pass this straight through to the job manager
|
|
# as job extras to a new job.
|
|
################################################################################
|
|
@app.route("/fix_dups", methods=["POST"])
@login_required
def fix_dups():
    """Find duplicate files by SQL and render them with ``dups.html``.

    Two file entries outside the Bin path type count as duplicates when
    they share hash and size_mb but have different entry ids. Every row is
    fed to a Duplicates object. ``pagesize`` comes from the posted form and
    defaults to 10. When the query finds no rows a warning is logged and
    the browser is redirected to "/".
    """
    with db.engine.connect() as conn:
        # fetch everything while the connection is still open; the rows are
        # used after the with-block has closed it
        rows = conn.execute( text( "select e1.id as id1, f1.hash, d1.rel_path as rel_path1, d1.eid as did1, e1.name as fname1, p1.id as path1, p1.type_id as path_type1, e2.id as id2, d2.rel_path as rel_path2, d2.eid as did2, e2.name as fname2, p2.id as path2, p2.type_id as path_type2 from entry e1, file f1, dir d1, entry_dir_link edl1, path_dir_link pdl1, path p1, entry e2, file f2, dir d2, entry_dir_link edl2, path_dir_link pdl2, path p2 where e1.id = f1.eid and e2.id = f2.eid and d1.eid = edl1.dir_eid and edl1.entry_id = e1.id and edl2.dir_eid = d2.eid and edl2.entry_id = e2.id and p1.type_id != (select id from path_type where name = 'Bin') and p1.id = pdl1.path_id and pdl1.dir_eid = d1.eid and p2.type_id != (select id from path_type where name = 'Bin') and p2.id = pdl2.path_id and pdl2.dir_eid = d2.eid and f1.hash = f2.hash and e1.id != e2.id and f1.size_mb = f2.size_mb order by path1, rel_path1, fname1" ) ).fetchall()

    # a SELECT always "returns rows", so test for an empty result instead
    if not rows:
        SetFELog(f"Err, No more duplicates? Old link followed, or something is wrong!", "warning")
        return redirect("/")

    if 'pagesize' not in request.form:
        # default to 10
        # NOTE(review): a lookup of JobExtra rows of 'New' check_dups jobs
        # used to happen here but its result was never used; the intent
        # (pick up a larger pagesize set in the gui?) still needs doing
        pagesize = 10
    else:
        pagesize = int(request.form['pagesize'])

    DD = Duplicates()
    for row in rows:
        DD.AddDup( row )

    DD.SecondPass()
    # DD.Dump()

    return render_template("dups.html", DD=DD, pagesize=pagesize )
|
|
|
|
################################################################################
|
|
# /rm_dups -> f/e that shows actual duplicates so that we can delete some dups
|
|
# this code creates a new job with extras that have hashes/ids to allow removal
|
|
################################################################################
|
|
@app.route("/rm_dups", methods=["POST"])
@login_required
def rm_dups():
    """Create an ``rm_dups`` job from the posted keep-file / keep-dir fields.

    For each form field containing ``kfhash-`` (or ``kdhash-``) two job
    extras are added: the matching ``kfid-<n>`` (``kdid-<n>``) form value and
    the hash field itself. A ``pagesize`` extra of "10" is always appended.
    """
    form = request.form
    # (marker found in the hash field name, prefix of the matching id field)
    field_kinds = (("kfhash-", "kfid-"), ("kdhash-", "kdid-"))

    jex = []
    for field in form:
        for hash_marker, id_prefix in field_kinds:
            if hash_marker not in field:
                continue
            # get which row/number it is...
            _, which = field.split('-')
            id_field = id_prefix + which
            jex.append( JobExtra( name=id_field, value=str(form[id_field]) ) )
            jex.append( JobExtra( name=f"{hash_marker}{which}", value=str(form[field]) ) )

    jex.append( JobExtra( name="pagesize", value="10" ) )

    NewJob( name="rm_dups", num_files=0, wait_for=None, jex=jex, desc="to delete duplicate files" )

    return redirect("/jobs")
|
|
|
|
################################################################################
|
|
# /restore_files -> create a job to restore files for the b/e to process
|
|
################################################################################
|
|
@app.route("/restore_files", methods=["POST"])
@login_required
def restore_files():
    """Create a restore_files job carrying every posted form field.

    Each form field becomes one JobExtra (name = field name, value = the
    field's value as a string); the job is created via NewJob() and the
    browser is redirected to /jobs.
    """
    extras = [
        JobExtra(name=str(field), value=str(request.form[field]))
        for field in request.form
    ]
    NewJob(name="restore_files", num_files=0, wait_for=None, jex=extras, desc="to restore selected file(s)")
    return redirect("/jobs")
|
|
|
|
################################################################################
|
|
# /delete_files -> create a job to delete files for the b/e to process
|
|
################################################################################
|
|
@app.route("/delete_files", methods=["POST"])
@login_required
def delete_files():
    """Create a delete_files job carrying every posted form field.

    Each form field becomes one JobExtra (name = field name, value = the
    field's value as a string); the job is created via NewJob() and the
    browser is redirected to /jobs.
    """
    extras = [
        JobExtra(name=str(field), value=str(request.form[field]))
        for field in request.form
    ]
    NewJob(name="delete_files", num_files=0, wait_for=None, jex=extras, desc="to delete selected file(s)")
    return redirect("/jobs")
|
|
|
|
################################################################################
|
|
# /move_files -> create a job to move files for the b/e to process
|
|
################################################################################
|
|
@app.route("/move_files", methods=["POST"])
@login_required
def move_files():
    """Create a move_files job carrying every posted form field.

    Each form field becomes one JobExtra (name = field name, value = the
    field's value as a string). Returns a JSON response holding the new
    job's id.
    """
    extras = [
        JobExtra(name=str(field), value=str(request.form[field]))
        for field in request.form
    ]
    new_job = NewJob(name="move_files", num_files=0, wait_for=None, jex=extras, desc="to move selected file(s)")
    # data is not used, but send response to trigger CheckForJobs()
    payload = jsonify(job_id=new_job.id)
    return make_response(payload)
|
|
|
|
@app.route("/view_list", methods=["POST"])
@login_required
def view_list():
    """Return the next/prev set of viewable entries for the viewer as JSON.

    Builds the view state from the request (States), fetches the matching
    entries (GetEntries) and returns a response with:
      objs      - per entry id: url, name, type and, when the file has faces,
                  face_model plus a list of face boxes (x, y, w, h and, when
                  matched to a refimg, pid / who / distance)
      eids      - comma separated ids of the returned (non-dir) entries
      current   - id of the entry to show (first on 'next', last on 'prev')
      first_eid - only present when OPT.first_eid > 0
      last_eid, offset - copied from the view state
    The updated state is handed to UpdatePref() before returning.

    NOTE: app.route must be the outermost decorator; with login_required
    above it, Flask registers the undecorated function and the login check
    never runs for this URL.
    """
    OPT = States( request )
    # Get next/prev set of data - e.g. if next set, then it will use orig_url
    # to go forward how_many from offset and then use viewer.html to show that
    # first obj of the new list of entries
    entries = GetEntries( OPT )
    # this occurs when we went from the last image on a page (with how_many on
    # it) and it just happened to also be the last in the DB...
    if not entries:
        SetFELog( message="DDP: DONT think this can happen anymore", level="danger", job=None, persistent=True, cant_close=True )

        # undo the skip by how_many and getentries again
        OPT.offset -= int(OPT.how_many)
        entries = GetEntries( OPT )

    eid_list = []
    resp = { 'objs': {} }
    for e in entries:
        # entries without file details (e.g. dirs) are not viewable, skip them
        if not e.file_details:
            continue
        eid_list.append( f"{e.id}" )
        obj = {}
        obj['url'] = e.FullPathOnFS()
        obj['name'] = e.name
        obj['type'] = e.type.name
        faces = e.file_details.faces
        if faces:
            # model is used for whole file, so set it at that level (based on first face)
            obj['face_model'] = faces[0].facefile_lnk.model_used
            obj['faces'] = []

            # put face data back into array format (for js processing)
            for face in faces:
                fd = {}
                fd['x'] = face.face_left
                fd['y'] = face.face_top
                fd['w'] = face.w
                fd['h'] = face.h
                if face.refimg:
                    fd['pid'] = face.refimg.person.id
                    fd['who'] = face.refimg.person.tag
                    fd['distance'] = round(face.refimg_lnk.face_distance,2)
                obj['faces'].append(fd)
        resp['objs'][e.id] = obj

    eids = ",".join( eid_list )
    # only move 'current' when there is something to move to; with no viewable
    # entries the old code tried int('') and failed the request
    if eid_list:
        if 'next' in request.form:
            OPT.current = int(eid_list[0])
        if 'prev' in request.form:
            OPT.current = int(eid_list[-1])

    resp['current'] = OPT.current
    # OPT.first_eid can still be 0 IF we have gone past the first page, I could
    # better set this in states rather than kludge this if... think about it
    if OPT.first_eid>0:
        resp['first_eid'] = OPT.first_eid
    resp['last_eid'] = OPT.last_eid
    resp['eids'] = eids
    resp['offset'] = OPT.offset
    # save pref to keep the new current value, first/last
    pref = PA_UserState.query.filter(PA_UserState.pa_user_dn==current_user.dn,PA_UserState.orig_ptype==OPT.orig_ptype,PA_UserState.view_eid==OPT.view_eid).first()
    UpdatePref( pref, OPT )

    return make_response( resp )
|
|
|
|
################################################################################
|
|
# /view/id -> grabs data from DB and views it (GET)
|
|
################################################################################
|
|
@app.route("/view/<id>", methods=["GET"])
@login_required
def view(id):
    """Render viewer.html for entry <id> within the current list of entries.

    id -- entry id taken from the URL (a string)

    Builds the view state from the request (States), fetches the entries
    (GetEntries) and attaches any no-match / force-match face overrides to
    each face. If <id> is not one of the fetched entry ids, an error is
    logged to the front-end, (in production) all PA_UserState rows are
    deleted, and the user is redirected to "/". Otherwise viewer.html is
    rendered with the entries, override types and the configured paths.

    NOTE: app.route must be the outermost decorator; with login_required
    above it, Flask registers the undecorated function and the login check
    never runs for this URL.
    """
    OPT = States( request )
    objs = {}
    entries = GetEntries( OPT )
    eid_list = []
    for e in entries:
        objs[e.id] = e
        eid_list.append( f"{e.id}" )
        # if this is a dir, we wont view it with a click anyway, so move on...
        if not e.file_details:
            continue
        # process any overrides
        for face in e.file_details.faces:
            # now get any relevant override and store it in objs...
            fnmo = FaceNoMatchOverride.query.filter(FaceNoMatchOverride.face_id==face.id).first()
            if fnmo:
                face.no_match_override = fnmo
            mo = FaceForceMatchOverride.query.filter(FaceForceMatchOverride.face_id==face.id).first()
            if mo:
                mo.type = FaceOverrideType.query.filter( FaceOverrideType.name== 'Manual match to existing person' ).first()
                face.manual_override = mo

    eids = ",".join( eid_list )
    # jic, sometimes we trip this, and rather than show broken pages / destroy
    # compare against whole ids: a substring test on the joined string would
    # accept e.g. id "12" when only "112" is in the list
    if id not in eid_list:
        SetFELog( message=f"ERROR: viewing an id, but its not in eids OPT={OPT}, id={id}, eids={eids}", level="danger", persistent=True, cant_close=False)
        msg = "Sorry, viewing data is confused, cannot view this image now"
        # .get(): an unset ENV must not turn this recovery path into a KeyError
        if os.environ.get('ENV') == "production":
            msg += ". Clearing out all states. This means browser back buttons will not work, please start a new tab and try again"
            PA_UserState.query.delete()
            db.session.commit()
        SetFELog( msg, "warning", persistent=True, cant_close=False )
        return redirect("/")

    NMO_data = FaceOverrideType.query.all()
    setting = Settings.query.first()
    imp_path = setting.import_path
    st_path = setting.storage_path
    bin_path = setting.recycle_bin_path
    return render_template("viewer.html", current=int(id), eids=eids, objs=objs, OPT=OPT, NMO_data=NMO_data, imp_path=imp_path, st_path=st_path, bin_path=bin_path )
|
|
|
|
##################################################################################
|
|
# /view/id -> grabs data from DB and views it (POST -> set state, redirect to GET)
|
|
##################################################################################
|
|
@app.route("/view/<id>", methods=["POST"])
@login_required
def view_img_post(id):
    """POST variant of /view/<id>: record the state, then redirect to the GET URL.

    States(request) is constructed only for its effect of setting
    pa_user_states (per the original comment); its result is not used here.
    """
    # set pa_user_states...
    States(request)
    # then use back-button friendly URL (and use pa_user_states to view the
    # right image in the right list)
    target = f"/view/{id}"
    return redirect(target)
|
|
|
|
# route called from front/end - if multiple images are being transformed, each transform == a separate call
|
|
# to this route (and therefore a separate transform job). Each response allows the f/e to check the
|
|
# specific transform job is finished (/check_transform_job) which will be called (say) every 1 sec. from f/e
|
|
# with a spinning wheel, then when pa_job_mgr has finished it will return the transformed thumb
|
|
@app.route("/transform", methods=["POST"])
@login_required
def transform():
    """Create a transform_image job from the posted form and return its id.

    The form must contain 'id' and 'amt' (they are looked up before anything
    else, so the request fails if either is missing). Every form field is
    copied into a JobExtra; the response is JSON holding the new job's id.
    """
    # values are not used below; the lookups only enforce that both fields exist
    _entry_id = request.form['id']
    _amount = request.form['amt']

    extras = [
        JobExtra(name=str(field), value=str(request.form[field]))
        for field in request.form
    ]

    new_job = NewJob(name="transform_image", num_files=0, wait_for=None, jex=extras, desc="to transform selected file(s)")
    return make_response(jsonify(job_id=new_job.id))
|
|
|
|
################################################################################
|
|
# /check_transform_job -> URL that is called repeatedly by front-end waiting for the
|
|
# b/e to finish the transform job. Once done, the new / now
|
|
# transformed image's thumbnail is returned so the f/e can
|
|
# update with it
|
|
################################################################################
|
|
@app.route("/check_transform_job", methods=["POST"])
@login_required
def check_transform_job():
    """Report whether the transform job named in the form has completed.

    Reads 'job_id' from the form. While the job's pa_job_state is not
    'Completed' the JSON reply is {finished: False}; once completed, the
    entry id stored in the job's "id" extra is used to load the entry and
    the reply is {finished: True, thumbnail: <entry's file thumbnail>}.
    """
    job = Job.query.get(request.form['job_id'])
    if job.pa_job_state != 'Completed':
        return make_response(jsonify(finished=False))

    # the job's "id" extra holds the id of the entry that was transformed
    id_values = [extra.value for extra in job.extra if extra.name == "id"]
    entry_id = id_values[0]
    entry = Entry.query.join(File).filter(Entry.id == entry_id).first()
    return make_response(jsonify(finished=True, thumbnail=entry.file_details.thumbnail))
|
|
|
|
################################################################################
|
|
# /internal -> returns the contents of /internal and does not need a login, so we
|
|
# can get the icon, and potentially any js, bootstrap, etc. needed for the login page
|
|
################################################################################
|
|
@app.route("/internal/<path:filename>")
def internal(filename):
    """Serve <filename> from the internal/ directory (no login required)."""
    served_from = "internal/"
    return send_from_directory(served_from, filename)
|
|
|
|
################################################################################
|
|
# /static -> returns the contents of any file referenced inside /static.
|
|
# we create/use symlinks in static/ to reference the images to show
|
|
################################################################################
|
|
@app.route("/static/<filename>")
@login_required
def custom_static(filename):
    """Serve <filename> from the static/ directory to logged-in users."""
    served_from = "static/"
    return send_from_directory(served_from, filename)
|
|
|
|
###############################################################################
|
|
# This func creates a new filter in jinja2 to test to see if the Dir being
|
|
# checked, is a top-level folder of 'OPT.cwd'
|
|
################################################################################
|
|
@app.template_filter('TopLevelFolderOf')
def _jinja2_filter_toplevelfolderof(path, cwd):
    """Jinja2 filter: True when the parent directory of `path` is exactly `cwd`."""
    parent = os.path.dirname(path)
    return parent == cwd
|
|
|
|
###############################################################################
|
|
# This func creates a new filter in jinja2 to test to hand back the parent path
|
|
# from a given path
|
|
################################################################################
|
|
@app.template_filter('ParentPath')
def _jinja2_filter_parentpath(path):
    """Jinja2 filter: hand back the parent path (os.path.dirname) of `path`."""
    parent = os.path.dirname(path)
    return parent
|
|
|
|
###############################################################################
|
|
# route to allow the Move Dialog Box to pass a date (YYYYMMDD) and returns a
|
|
# json list of existing dir names that could be near it in time. Starting
|
|
# simple, by using YYYYMM-1, YYYYMM, YYYYMM+1 dirs
|
|
###############################################################################
|
|
@app.route("/get_existing_paths/<dt>", methods=["POST"])
@login_required
def get_existing_paths(dt):
    """Return, as a JSON list, existing dirs that are near the date `dt`.

    dt -- date string from the URL, expected as YYYYMMDD

    Every day from 14 days before to 14 days after `dt` is checked: a dir is
    included when its rel_path contains that day's YYYYMMDD string, or when
    it holds a non-directory entry whose name contains it. Each result is
    an object {"prefix", "suffix", "ptype"} where rel_path is split on its
    first '-' (the dash stays on the prefix; suffix is "" when there is no
    dash) and ptype is the name of the dir's path type.

    If `dt` is not a valid date (or the +/-14 day window cannot be
    represented) an empty list '[]' is returned.
    """
    try:
        # parse once, then derive the whole window of day strings
        base_day = datetime.strptime(dt, "%Y%m%d")
        nearby_days = [ (base_day + timedelta(days=delta)).strftime('%Y%m%d') for delta in range(-14, 15) ]
    except (ValueError, OverflowError):
        # this is not a (usable) date, so we cant work out possible dirs, just
        # return an empty set
        return make_response( '[]' )

    dir_ft = FileType.query.filter(FileType.name=='Directory').first()

    # a set removes dirs found more than once
    dirs = set()
    for day in nearby_days:
        pattern = '%' + day + '%'
        # find dirs named with this date
        dirs.update( Dir.query.filter(Dir.rel_path.ilike(pattern)).all() )
        # find dirs with non-dirs (files) with this date
        dirs.update( Dir.query.join(EntryDirLink).join(Entry).filter(Entry.type_id!=dir_ft.id).filter(Entry.name.ilike(pattern)).all() )

    found = []
    for existing in dirs:
        # this can occur if there is a file with this date name in the top-level of the path, its legit, but only really happens in DEV
        # regardless, it cant be used for a existpath button in the F/E, ignore it
        if existing.rel_path == '':
            continue
        # split on the first dash only, so the suffix can itself contain dashes
        prefix, dash, suffix = existing.rel_path.partition('-')
        found.append( { "prefix": prefix + dash, "suffix": suffix, "ptype": existing.in_path.type.name } )

    # json.dumps escapes quotes/backslashes in names, which the previous
    # hand-concatenated string did not
    return make_response( json.dumps( found ) )
|