862 lines
42 KiB
Python
862 lines
42 KiB
Python
from wtforms import SubmitField, StringField, HiddenField, validators, Form
|
|
from flask_wtf import FlaskForm
|
|
from flask import request, render_template, redirect, send_from_directory, url_for, jsonify
|
|
from path import MovePathDetails
|
|
from main import db, app, ma
|
|
from sqlalchemy import Sequence
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from status import st, Status
|
|
import os
|
|
import glob
|
|
from PIL import Image
|
|
from pymediainfo import MediaInfo
|
|
import hashlib
|
|
import exifread
|
|
import base64
|
|
import numpy
|
|
import cv2
|
|
import time
|
|
import re
|
|
import json
|
|
from datetime import datetime
|
|
import pytz
|
|
from flask_login import login_required, current_user
|
|
from states import States, PA_UserState
|
|
|
|
################################################################################
|
|
# Local Class imports
|
|
################################################################################
|
|
from job import Job, JobExtra, Joblog, NewJob
|
|
from path import PathType, Path
|
|
from person import Refimg, Person, PersonRefimgLink
|
|
from settings import Settings, SettingsIPath, SettingsSPath, SettingsRBPath
|
|
from shared import SymlinkName
|
|
from dups import Duplicates
|
|
from face import Face, FaceFileLink, FaceRefimgLink
|
|
|
|
# pylint: disable=no-member
|
|
|
|
################################################################################
|
|
# Class describing PathDirLink and in the DB (via sqlalchemy)
|
|
# connects the entry (dir) with a path
|
|
################################################################################
|
|
class PathDirLink(db.Model):
|
|
__tablename__ = "path_dir_link"
|
|
path_id = db.Column(db.Integer, db.ForeignKey("path.id"), primary_key=True )
|
|
dir_eid = db.Column(db.Integer, db.ForeignKey("dir.eid"), primary_key=True )
|
|
|
|
def __repr__(self):
|
|
return f"<path_id: {self.path_id}, dir_eid: {self.dir_eid}>"
|
|
|
|
################################################################################
|
|
# Class describing EntryDirLInk and in the DB (via sqlalchemy)
|
|
# connects (many) entry contained in a directory (which is also an entry)
|
|
################################################################################
|
|
class EntryDirLink(db.Model):
|
|
__tablename__ = "entry_dir_link"
|
|
entry_id = db.Column(db.Integer, db.ForeignKey("entry.id"), primary_key=True )
|
|
dir_eid = db.Column(db.Integer, db.ForeignKey("dir.eid"), primary_key=True )
|
|
|
|
def __repr__(self):
|
|
return f"<entry_id: {self.entry_id}, dir_eid: {self.dir_eid}>"
|
|
|
|
################################################################################
|
|
# Class describing Dir and in the DB (via sqlalchemy)
|
|
# rel_path: rest of dir after path, e.g. if path = /..../storage, then
|
|
# rel_path could be 2021/20210101-new-years-day-pics
|
|
# in_path: only in this structure, not DB, quick ref to the path this dir is in
|
|
################################################################################
|
|
class Dir(db.Model):
|
|
__tablename__ = "dir"
|
|
eid = db.Column(db.Integer, db.ForeignKey("entry.id"), primary_key=True )
|
|
rel_path = db.Column(db.String, unique=True )
|
|
in_path = db.relationship("Path", secondary="path_dir_link", uselist=False)
|
|
|
|
def __repr__(self):
|
|
return f"<eid: {self.eid}, rel_path: {self.rel_path}, in_path: {self.in_path}>"
|
|
|
|
################################################################################
|
|
# Class describing Entry and in the DB (via sqlalchemy)
|
|
# an entry is the common bits between files and dirs
|
|
# type is a convenience var only in this class, not in DB
|
|
# {dir|file}_etails are convenience data for the relevant details from the Dir
|
|
# or File class - not in DB
|
|
# in_dir - is the Dir that this entry is located in (convenience for class only)
|
|
# FullPathOnFS(): method to get path on the FS for this Entry
|
|
################################################################################
|
|
class Entry(db.Model):
|
|
__tablename__ = "entry"
|
|
id = db.Column(db.Integer, db.Sequence('file_id_seq'), primary_key=True )
|
|
name = db.Column(db.String, unique=False, nullable=False )
|
|
type_id = db.Column(db.Integer, db.ForeignKey("file_type.id"))
|
|
type = db.relationship("FileType")
|
|
dir_details = db.relationship( "Dir", uselist=False )
|
|
file_details = db.relationship( "File", uselist=False )
|
|
in_dir = db.relationship ("Dir", secondary="entry_dir_link", uselist=False )
|
|
|
|
def FullPathOnFS(self):
|
|
if self.in_dir:
|
|
s=self.in_dir.in_path.path_prefix + '/'
|
|
if len(self.in_dir.rel_path) > 0:
|
|
s += self.in_dir.rel_path + '/'
|
|
s += self.name
|
|
# this occurs when we have a dir that is the root of a path
|
|
else:
|
|
s=self.dir_details.in_path.path_prefix
|
|
return s
|
|
|
|
def __repr__(self):
|
|
return f"<id: {self.id}, name: {self.name}, type={self.type}, dir_details={self.dir_details}, file_details={self.file_details}, in_dir={self.in_dir}"
|
|
|
|
################################################################################
|
|
# Class describing File and in the DB (via sqlalchemy)
|
|
# all files are entries, this is the extra bits only for a file, of note:
|
|
# hash is unique for files, and used to validate duplicates
|
|
# woy == week of year, all date fields are used to sort/show content. Date
|
|
# info can be from exif, or file system, or file name (rarely)
|
|
# faces: convenience field to show connected face(s) for this file
|
|
################################################################################
|
|
class File(db.Model):
|
|
__tablename__ = "file"
|
|
eid = db.Column(db.Integer, db.ForeignKey("entry.id"), primary_key=True )
|
|
size_mb = db.Column(db.Integer, unique=False, nullable=False)
|
|
thumbnail = db.Column(db.String, unique=False, nullable=True)
|
|
hash = db.Column(db.Integer)
|
|
year = db.Column(db.Integer)
|
|
month = db.Column(db.Integer)
|
|
day = db.Column(db.Integer)
|
|
woy = db.Column(db.Integer)
|
|
faces = db.relationship ("Face", secondary="face_file_link" )
|
|
|
|
def __repr__(self):
|
|
return f"<eid: {self.eid}, size_mb={self.size_mb}, hash={self.hash}, year={self.year}, month={self.month}, day={self.day}, woy={self.woy}, faces={self.faces}>"
|
|
|
|
################################################################################
|
|
# Class describing FileType and in the DB (via sqlalchemy)
|
|
# pre-defined list of file types (image, dir, etc.)
|
|
################################################################################
|
|
class FileType(db.Model):
|
|
__tablename__ = "file_type"
|
|
id = db.Column(db.Integer, db.Sequence('file_type_id_seq'), primary_key=True )
|
|
name = db.Column(db.String, unique=True, nullable=False )
|
|
|
|
def __repr__(self):
|
|
return f"<id: {self.id}, name={self.name}>"
|
|
|
|
################################################################################
|
|
# Class describing PA_JobManager_Message and in the DB (via sqlalchemy)
|
|
# the job manager can send a message back to the front end (this code) via the
|
|
# DB. has to be about a specific job_id and is success/danger, etc. (alert)
|
|
# and a message
|
|
################################################################################
|
|
class PA_JobManager_Message(db.Model):
|
|
__tablename__ = "pa_job_manager_fe_message"
|
|
id = db.Column(db.Integer, db.Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True )
|
|
job_id = db.Column(db.Integer, db.ForeignKey('job.id') )
|
|
alert = db.Column(db.String)
|
|
message = db.Column(db.String)
|
|
job = db.relationship ("Job" )
|
|
def __repr__(self):
|
|
return f"<id: {self.id}, job_id: {self.job_id}, alert: {self.alert}, message: {self.message}, job: {self.job}"
|
|
|
|
|
|
################################################################################
|
|
# GetJM_Message: used in html to display any message for this front-end
|
|
################################################################################
|
|
def GetJM_Message():
|
|
msg=PA_JobManager_Message.query.first()
|
|
return msg
|
|
|
|
################################################################################
|
|
# ClearJM_Message: used in html to clear any message just displayed
|
|
################################################################################
|
|
def ClearJM_Message(id):
|
|
PA_JobManager_Message.query.filter(PA_JobManager_Message.id==id).delete()
|
|
db.session.commit()
|
|
return
|
|
|
|
################################################################################
|
|
# util function to just update the current/first/last positions needed for
|
|
# viewing / using pa_user_state DB table
|
|
################################################################################
|
|
def UpdatePref( pref, OPT ):
|
|
last_used=datetime.now(pytz.utc)
|
|
if OPT.current>0:
|
|
pref.current=OPT.current
|
|
if OPT.first_eid>0:
|
|
pref.first_eid=OPT.first_eid
|
|
if OPT.last_eid>0:
|
|
pref.last_eid=OPT.last_eid
|
|
if OPT.num_entries>0:
|
|
pref.num_entries=OPT.num_entries
|
|
pref.last_used=last_used
|
|
db.session.add(pref)
|
|
db.session.commit()
|
|
|
|
################################################################################
|
|
# GetEntriesInFlatView: func. to retrieve DB entries appropriate for flat view
|
|
################################################################################
|
|
def GetEntriesInFlatView( OPT, prefix ):
|
|
entries=[]
|
|
num_entries=0
|
|
|
|
if OPT.noo == "Oldest":
|
|
entries+=Entry.query.join(File).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==prefix).order_by(File.year,File.month,File.day,Entry.name).offset(OPT.offset).limit(OPT.how_many).all()
|
|
last_entry=Entry.query.join(File).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==prefix).order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name.desc()).limit(1)
|
|
elif OPT.noo == "Newest":
|
|
entries+=Entry.query.join(File).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==prefix).order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name).offset(OPT.offset).limit(OPT.how_many).all()
|
|
last_entry=Entry.query.join(File).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==prefix).order_by(File.year,File.month,File.day,Entry.name).limit(1)
|
|
elif OPT.noo == "Z to A":
|
|
entries+=Entry.query.join(File).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==prefix).order_by(Entry.name.desc()).offset(OPT.offset).limit(OPT.how_many).all()
|
|
last_entry=Entry.query.join(File).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==prefix).order_by(Entry.name).limit(1)
|
|
else:
|
|
entries+=Entry.query.join(File).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==prefix).order_by(Entry.name).offset(OPT.offset).limit(OPT.how_many).all()
|
|
last_entry=Entry.query.join(File).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==prefix).order_by(Entry.name.desc()).limit(1)
|
|
if OPT.first_eid == 0 and OPT.offset == 0 and len(entries):
|
|
OPT.first_eid = entries[0].id
|
|
|
|
if OPT.last_eid==0:
|
|
num_entries=Entry.query.join(File).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==prefix).count()
|
|
le=last_entry.all()
|
|
if len(le):
|
|
OPT.last_eid = le[0].id
|
|
|
|
return entries, num_entries
|
|
|
|
################################################################################
|
|
# GetEntriesInFolderView: func. to retrieve DB entries appropriate for folder view
|
|
# read inline comments to deal with variations / ordering...
|
|
################################################################################
|
|
def GetEntriesInFolderView( OPT, prefix ):
|
|
entries=[]
|
|
num_entries=0
|
|
# okay the root cwd is fake, so treat it specially - its Dir can be found by path with dir.rel_path=''
|
|
if os.path.dirname(OPT.cwd) == 'static':
|
|
dir=Entry.query.join(Dir).join(PathDirLink).join(Path).filter(Dir.rel_path=='').filter(Path.path_prefix==prefix).order_by(Entry.name).first()
|
|
# this can occur if the path in settings does not exist as it wont be in # the DB
|
|
if not dir:
|
|
return entries, num_entries
|
|
# although this is 1 entry, needs to come back via all() to be iterable
|
|
entries+= Entry.query.filter(Entry.id==dir.id).all()
|
|
else:
|
|
rp = OPT.cwd.replace( prefix, '' )
|
|
# when in subdirs, replacing prefix will leave the first char as /, get rid of it
|
|
if len(rp) and rp[0] == '/':
|
|
rp=rp[1:]
|
|
dir=Entry.query.join(Dir).join(PathDirLink).join(Path).filter(Dir.rel_path==rp).filter(Path.path_prefix==prefix).order_by(Entry.name).first()
|
|
# this can occur if the path in settings does not exist as it wont be in # the DB
|
|
if not dir:
|
|
return entries, 0
|
|
if OPT.noo == "Z to A" or OPT.noo == "Newest":
|
|
entries+= Entry.query.join(EntryDirLink).join(FileType).filter(EntryDirLink.dir_eid==dir.id).filter(FileType.name=='Directory').order_by(Entry.name.desc()).all()
|
|
# just do A to Z / Oldest by default or if no valid option
|
|
else:
|
|
entries+= Entry.query.join(EntryDirLink).join(FileType).filter(EntryDirLink.dir_eid==dir.id).filter(FileType.name=='Directory').order_by(Entry.name).all()
|
|
|
|
# add any files at the current CWD (based on dir_eid in DB)
|
|
if OPT.noo == "Oldest":
|
|
file_entries=Entry.query.join(File).join(EntryDirLink).filter(EntryDirLink.dir_eid==dir.id).order_by(File.year,File.month,File.day,Entry.name).offset(OPT.offset).limit(OPT.how_many).all()
|
|
last_entry=Entry.query.join(File).join(EntryDirLink).filter(EntryDirLink.dir_eid==dir.id).order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name)
|
|
elif OPT.noo == "Newest":
|
|
file_entries=Entry.query.join(File).join(EntryDirLink).filter(EntryDirLink.dir_eid==dir.id).order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name).offset(OPT.offset).limit(OPT.how_many).all()
|
|
last_entry=Entry.query.join(File).join(EntryDirLink).filter(EntryDirLink.dir_eid==dir.id).order_by(File.year,File.month,File.day,Entry.name)
|
|
elif OPT.noo == "Z to A":
|
|
file_entries=Entry.query.join(File).join(EntryDirLink).filter(EntryDirLink.dir_eid==dir.id).order_by(Entry.name.desc()).offset(OPT.offset).limit(OPT.how_many).all()
|
|
last_entry=Entry.query.join(File).join(EntryDirLink).filter(EntryDirLink.dir_eid==dir.id).order_by(Entry.name)
|
|
# just do A to Z by default or if no valid option
|
|
else:
|
|
file_entries=Entry.query.join(File).join(EntryDirLink).filter(EntryDirLink.dir_eid==dir.id).order_by(Entry.name).offset(OPT.offset).limit(OPT.how_many).all()
|
|
last_entry=Entry.query.join(File).join(EntryDirLink).filter(EntryDirLink.dir_eid==dir.id).order_by(Entry.name.desc())
|
|
if OPT.offset == 0 and len(file_entries):
|
|
OPT.first_eid = file_entries[0].id
|
|
if len(file_entries):
|
|
num_entries=last_entry.count()
|
|
le=last_entry.limit(1).all()
|
|
if len(le):
|
|
OPT.last_eid = le[0].id
|
|
|
|
entries += file_entries;
|
|
return entries, num_entries
|
|
|
|
################################################################################
|
|
# /GetEntries -> helper function that Gets Entries for required files to show
|
|
# for several routes (files_ip, files_sp, files_rbp, search, viewlist)
|
|
################################################################################
|
|
def GetEntries( OPT ):
|
|
entries=[]
|
|
if OPT.path_type == 'Search' or (OPT.path_type == 'View' and OPT.orig_ptype=='Search'):
|
|
search_term=OPT.orig_search_term
|
|
if 'AI:' in search_term:
|
|
search_term = search_term.replace('AI:','')
|
|
all_entries = Entry.query.join(File).distinct().join(FaceFileLink).join(Face).join(FaceRefimgLink).join(Refimg).join(PersonRefimgLink).join(Person).filter(Person.tag.ilike(f"%{search_term}%")).order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name).offset(OPT.offset).limit(OPT.how_many).all()
|
|
if OPT.last_eid == 0:
|
|
OPT.num_entries = Entry.query.join(File).distinct().join(FaceFileLink).join(Face).join(FaceRefimgLink).join(Refimg).join(PersonRefimgLink).join(Person).filter(Person.tag.ilike(f"%{search_term}%")).count()
|
|
last_entry=Entry.query.join(File).distinct().join(FaceFileLink).join(Face).join(FaceRefimgLink).join(Refimg).join(PersonRefimgLink).join(Person).filter(Person.tag.ilike(f"%{search_term}%")).order_by(File.year,File.month,File.day,Entry.name.desc()).limit(1).all()
|
|
if len(last_entry):
|
|
OPT.last_eid = last_entry[0].id
|
|
else:
|
|
file_data=Entry.query.join(File).filter(Entry.name.ilike(f"%{search_term}%")).order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name).offset(OPT.offset).limit(OPT.how_many).all()
|
|
dir_data=Entry.query.join(File).join(EntryDirLink).join(Dir).filter(Dir.rel_path.ilike(f"%{search_term}%")).order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name).offset(OPT.offset).limit(OPT.how_many).all()
|
|
ai_data=Entry.query.join(File).join(FaceFileLink).join(Face).join(FaceRefimgLink).join(Refimg).join(PersonRefimgLink).join(Person).filter(Person.tag.ilike(f"%{search_term}%")).order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name).offset(OPT.offset).limit(OPT.how_many).all()
|
|
|
|
# remove any duplicates from combined data
|
|
all_entries = []
|
|
for f in file_data:
|
|
all_entries.append(f)
|
|
for d in dir_data:
|
|
add_it=1
|
|
for f in file_data:
|
|
if d.name == f.name:
|
|
add_it=0
|
|
break
|
|
if add_it:
|
|
all_entries.append(d)
|
|
for a in ai_data:
|
|
add_it=1
|
|
for f in file_data:
|
|
if a.name == f.name:
|
|
add_it=0
|
|
break
|
|
if add_it:
|
|
all_entries.append(a)
|
|
|
|
# for all searches first_entry is worked out when first_eid not set yet & offset is 0 and we have some entries
|
|
if OPT.first_eid == 0 and OPT.offset == 0 and len(all_entries):
|
|
OPT.first_eid = all_entries[0].id
|
|
if OPT.last_eid == 0:
|
|
by_fname= f"select e.id from entry e where e.name ilike '%%{search_term}%%'"
|
|
by_dirname=f"select e.id from entry e, entry_dir_link edl where edl.entry_id = e.id and edl.dir_eid in ( select d.eid from dir d where d.rel_path ilike '%%{search_term}%%' )"
|
|
by_ai =f"select e.id from entry e, face_file_link ffl, face_refimg_link frl, person_refimg_link prl, person p where e.id = ffl.file_eid and frl.face_id = ffl.face_id and frl.refimg_id = prl.refimg_id and prl.person_id = p.id and p.tag ilike '%%{search_term}%%'"
|
|
|
|
sel_no_order=f"select e.*, f.* from entry e, file f where e.id=f.eid and e.id in ( {by_fname} union {by_dirname} union {by_ai} ) "
|
|
order_desc=f"f.year desc, f.month desc, f.day desc, e.name"
|
|
order_asc=f"f.year, f.month, f.day, e.name desc"
|
|
|
|
#num_entries
|
|
num_e_sql = f"select count(1) from ( {by_fname} union {by_dirname} union {by_ai} ) as foo"
|
|
num_e_result = db.engine.execute( num_e_sql )
|
|
for res in num_e_result:
|
|
OPT.num_entries=res.count
|
|
|
|
last_entry_sql= f"{sel_no_order} order by {order_asc} limit 1"
|
|
last_entry=db.engine.execute( last_entry_sql )
|
|
# can only be 1 due to limit above
|
|
for l in last_entry:
|
|
OPT.last_eid = l.id
|
|
# store first/last eid into prefs
|
|
pref=PA_UserState.query.filter(PA_UserState.pa_user_dn==current_user.dn,PA_UserState.path_type==OPT.path_type,PA_UserState.orig_ptype==OPT.orig_ptype,PA_UserState.orig_search_term==search_term).first()
|
|
UpdatePref( pref, OPT )
|
|
|
|
return all_entries
|
|
|
|
# if we are a view, then it will be of something else, e.g. a list of
|
|
# import, storage, or bin images, reset OPT.path_type so that the paths array below works
|
|
if 'View' in OPT.path_type:
|
|
eid = OPT.url[6:]
|
|
OPT.path_type= OPT.orig_ptype
|
|
|
|
paths = []
|
|
if OPT.path_type == 'Storage':
|
|
paths = SettingsSPath()
|
|
elif OPT.path_type == 'Import':
|
|
paths = SettingsIPath()
|
|
elif OPT.path_type == 'Bin':
|
|
paths.append(SettingsRBPath())
|
|
|
|
num_paths = len(paths)
|
|
|
|
num_entries=0
|
|
path_cnt=1
|
|
|
|
# if we have not set last_eid yet, then we need to 'reset' it during the
|
|
# path loop below (if we have more than one dir in (say) Import path)
|
|
if OPT.last_eid == 0 or OPT.folders:
|
|
update_last_eid = True
|
|
else:
|
|
update_last_eid = False
|
|
for path in paths:
|
|
if not os.path.exists(path):
|
|
continue
|
|
prefix = SymlinkName(OPT.path_type,path,path+'/')
|
|
if OPT.folders:
|
|
tmp_ents, tmp_num_ents = GetEntriesInFolderView( OPT, prefix )
|
|
else:
|
|
tmp_ents, tmp_num_ents = GetEntriesInFlatView( OPT, prefix )
|
|
entries += tmp_ents
|
|
num_entries += tmp_num_ents
|
|
# if we have another path, keep adding num_etnries, and last_eid is the last path, not this one, so reset to 0
|
|
if update_last_eid and path_cnt < num_paths:
|
|
OPT.last_eid=0
|
|
path_cnt += 1
|
|
|
|
if update_last_eid:
|
|
# find pref... via path_type if we are here
|
|
OPT.num_entries=num_entries
|
|
pref=PA_UserState.query.filter(PA_UserState.pa_user_dn==current_user.dn,PA_UserState.path_type==OPT.path_type).first()
|
|
UpdatePref( pref, OPT )
|
|
return entries
|
|
|
|
################################################################################
|
|
# /clear_jm_msg -> with a dismissable error (ie. anything not success, that is
|
|
# not showing duplicates (so rare errors) - allow them to be dismissed
|
|
################################################################################
|
|
@app.route("/clear_jm_msg/<id>", methods=["POST"])
|
|
@login_required
|
|
def clear_jm_msg(id):
|
|
ClearJM_Message(id)
|
|
return redirect( url_for("main_page") )
|
|
|
|
@app.route("/ChangeFileOpts", methods=["POST"])
|
|
@login_required
|
|
def ChangeFileOpts():
|
|
# reset options based on form post, then redirect back to orig page (with a GET to allow back button to work)
|
|
OPT=States( request )
|
|
return redirect( request.referrer )
|
|
|
|
################################################################################
|
|
# /file_list -> show detailed file list of files from import_path(s)
|
|
################################################################################
|
|
@app.route("/file_list_ip", methods=["GET", "POST"])
|
|
@login_required
|
|
def file_list_ip():
|
|
OPT=States( request )
|
|
# now we have reset the offset, etc. into the prefs, we can use a GET and this will be back/forward browser button safe
|
|
if request.method=='POST':
|
|
redirect("/file_list_ip")
|
|
entries=GetEntries( OPT )
|
|
return render_template("file_list.html", page_title='View File Details (Import Path)', entry_data=entries, OPT=OPT )
|
|
|
|
################################################################################
|
|
# /files -> show thumbnail view of files from import_path(s)
|
|
################################################################################
|
|
@app.route("/files_ip", methods=["GET", "POST"])
|
|
@login_required
|
|
def files_ip():
|
|
OPT=States( request )
|
|
# now we have reset the offset, etc. into the prefs, we can use a GET and this will be back/forward browser button safe
|
|
if request.method=='POST':
|
|
redirect("/files_ip")
|
|
entries=GetEntries( OPT )
|
|
people = Person.query.all()
|
|
move_paths = MovePathDetails()
|
|
return render_template("files.html", page_title=f"View Files ({OPT.path_type} Path)", entry_data=entries, OPT=OPT, people=people, move_paths=move_paths )
|
|
|
|
################################################################################
|
|
# /files -> show thumbnail view of files from storage_path
|
|
################################################################################
|
|
@app.route("/files_sp", methods=["GET", "POST"])
|
|
@login_required
|
|
def files_sp():
|
|
OPT=States( request )
|
|
# now we have reset the offset, etc. into the prefs, we can use a GET and this will be back/forward browser button safe
|
|
if request.method=='POST':
|
|
redirect("/files_sp")
|
|
entries=GetEntries( OPT )
|
|
people = Person.query.all()
|
|
move_paths = MovePathDetails()
|
|
return render_template("files.html", page_title=f"View Files ({OPT.path_type} Path)", entry_data=entries, OPT=OPT, people=people, move_paths=move_paths )
|
|
|
|
|
|
################################################################################
|
|
# /files -> show thumbnail view of files from recycle_bin_path
|
|
################################################################################
|
|
@app.route("/files_rbp", methods=["GET", "POST"])
|
|
@login_required
|
|
def files_rbp():
|
|
OPT=States( request )
|
|
# now we have reset the offset, etc. into the prefs, we can use a GET and this will be back/forward browser button safe
|
|
if request.method=='POST':
|
|
redirect("/files_rbp")
|
|
entries=GetEntries( OPT )
|
|
people = Person.query.all()
|
|
move_paths = MovePathDetails()
|
|
return render_template("files.html", page_title=f"View Files ({OPT.path_type} Path)", entry_data=entries, OPT=OPT, move_paths=move_paths )
|
|
|
|
################################################################################
|
|
# search -> GET version -> has search_term in the URL and is therefore able to
|
|
# be used even if the user hits the front/back buttons in the browser.
|
|
# func shows thumbnails of matching files.
|
|
################################################################################
|
|
@app.route("/search/<search_term>", methods=["GET", "POST"])
|
|
@login_required
|
|
def search(search_term):
|
|
OPT=States( request )
|
|
# if we posted to get here, its a change in State, so save it to pa_user_state, and go back to the GET version or URL
|
|
if request.method=="POST":
|
|
redirect("/search/"+search_term)
|
|
OPT.search_term = search_term
|
|
# always show flat results for search to start with
|
|
OPT.folders=False
|
|
entries=GetEntries( OPT )
|
|
move_paths = MovePathDetails()
|
|
return render_template("files.html", page_title='View Files', search_term=search_term, entry_data=entries, OPT=OPT, move_paths=move_paths )
|
|
|
|
################################################################################
|
|
# /search -> POST version -> only used on form submit when you hit return. This
|
|
# form just redirects to a GET of /search/<search_term> to trip route above
|
|
################################################################################
|
|
@app.route("/search", methods=["POST"])
|
|
@login_required
|
|
def search_post():
|
|
return redirect( "/search/"+request.form['search_term'] )
|
|
|
|
################################################################################
|
|
# /files/scannow -> allows us to force a check for new files
|
|
################################################################################
|
|
@app.route("/files/scannow", methods=["GET"])
|
|
@login_required
|
|
def scannow():
|
|
job=NewJob("scannow" )
|
|
st.SetMessage("scanning for new files in: <a href=/job/{}>Job #{}</a> (Click the link to follow progress)".format( job.id, job.id) )
|
|
return render_template("base.html")
|
|
|
|
################################################################################
|
|
# /files/forcescan -> deletes old data in DB, and does a brand new scan
|
|
################################################################################
|
|
@app.route("/files/forcescan", methods=["GET"])
|
|
@login_required
|
|
def forcescan():
|
|
job=NewJob("forcescan" )
|
|
st.SetMessage("force scan & rebuild data for files in: <a href=/job/{}>Job #{}</a> (Click the link to follow progress)".format( job.id, job.id) )
|
|
return render_template("base.html")
|
|
|
|
################################################################################
|
|
# /files/scan_sp -> allows us to force a check for new files
|
|
################################################################################
|
|
@app.route("/files/scan_sp", methods=["GET"])
|
|
@login_required
|
|
def scan_sp():
|
|
job=NewJob("scan_sp" )
|
|
st.SetMessage("scanning for new files in: <a href=/job/{}>Job #{}</a> (Click the link to follow progress)".format( job.id, job.id) )
|
|
return render_template("base.html")
|
|
|
|
|
|
################################################################################
|
|
# /fix_dups -> use sql to find duplicates based on same hash, different
|
|
# filenames, or directories. Pass this straight through to the job manager
|
|
# as job extras to a new job.
|
|
################################################################################
|
|
@app.route("/fix_dups", methods=["POST"])
|
|
@login_required
|
|
def fix_dups():
|
|
rows = db.engine.execute( "select e1.id as id1, f1.hash, d1.rel_path as rel_path1, d1.eid as did1, e1.name as fname1, p1.id as path1, p1.type_id as path_type1, e2.id as id2, d2.rel_path as rel_path2, d2.eid as did2, e2.name as fname2, p2.id as path2, p2.type_id as path_type2 from entry e1, file f1, dir d1, entry_dir_link edl1, path_dir_link pdl1, path p1, entry e2, file f2, dir d2, entry_dir_link edl2, path_dir_link pdl2, path p2 where e1.id = f1.eid and e2.id = f2.eid and d1.eid = edl1.dir_eid and edl1.entry_id = e1.id and edl2.dir_eid = d2.eid and edl2.entry_id = e2.id and p1.type_id != (select id from path_type where name = 'Bin') and p1.id = pdl1.path_id and pdl1.dir_eid = d1.eid and p2.type_id != (select id from path_type where name = 'Bin') and p2.id = pdl2.path_id and pdl2.dir_eid = d2.eid and f1.hash = f2.hash and e1.id != e2.id and f1.size_mb = f2.size_mb order by path1, rel_path1, fname1");
|
|
|
|
if rows.returns_rows == False:
|
|
st.SetMessage(f"Err, no dups - should now clear the FE 'danger' message?")
|
|
return redirect("/")
|
|
|
|
if 'pagesize' not in request.form:
|
|
# default to 10, see if we have a larger value as someone reset it in the gui, rather than first time invoked
|
|
pagesize = 10
|
|
jexes = JobExtra.query.join(Job).filter(Job.name=='checkdups').filter(Job.pa_job_state=='New').all()
|
|
jexes.append( JobExtra( name="pagesize", value=pagesize ) )
|
|
else:
|
|
pagesize=int(request.form['pagesize'])
|
|
DD=Duplicates()
|
|
for row in rows:
|
|
DD.AddDup( row )
|
|
|
|
DD.SecondPass()
|
|
# DD.Dump()
|
|
|
|
return render_template("dups.html", DD=DD, pagesize=pagesize )
|
|
|
|
################################################################################
|
|
# /rm_dups -> f/e that shows actual duplicates so that we can delete some dups
|
|
# this code creates a new job with extras that have hashes/ids to allow removal
|
|
################################################################################
|
|
@app.route("/rm_dups", methods=["POST"])
|
|
@login_required
|
|
def rm_dups():
|
|
|
|
jex=[]
|
|
for el in request.form:
|
|
if 'kfhash-' in el:
|
|
# get which row/number kf it is...
|
|
_, which = el.split('-')
|
|
jex.append( JobExtra( name=f"kfid-{which}", value=request.form['kfid-'+which] ) )
|
|
jex.append( JobExtra( name=f"kfhash-{which}", value=request.form[el] ) )
|
|
if 'kdhash-' in el:
|
|
# get which row/number kd it is...
|
|
_, which = el.split('-')
|
|
jex.append( JobExtra( name=f"kdid-{which}", value=request.form['kdid-'+which] ) )
|
|
jex.append( JobExtra( name=f"kdhash-{which}", value=request.form[el] ) )
|
|
|
|
jex.append( JobExtra( name="pagesize", value=10 ) )
|
|
|
|
job=NewJob( "rmdups", 0, None, jex )
|
|
st.SetMessage( f"Created <a href=/job/{job.id}>Job #{job.id}</a> to delete duplicate files")
|
|
|
|
return redirect("/jobs")
|
|
|
|
################################################################################
|
|
# /restore_files -> create a job to restore files for the b/e to process
|
|
################################################################################
|
|
@app.route("/restore_files", methods=["POST"])
|
|
@login_required
|
|
def restore_files():
|
|
jex=[]
|
|
for el in request.form:
|
|
jex.append( JobExtra( name=f"{el}", value=request.form[el] ) )
|
|
|
|
job=NewJob( "restore_files", 0, None, jex )
|
|
st.SetMessage( f"Created <a href=/job/{job.id}>Job #{job.id}</a> to restore selected file(s)")
|
|
return redirect("/jobs")
|
|
|
|
################################################################################
|
|
# /delete_files -> create a job to delete files for the b/e to process
|
|
################################################################################
|
|
@app.route("/delete_files", methods=["POST"])
|
|
@login_required
|
|
def delete_files():
|
|
jex=[]
|
|
for el in request.form:
|
|
jex.append( JobExtra( name=f"{el}", value=request.form[el] ) )
|
|
|
|
job=NewJob( "delete_files", 0, None, jex )
|
|
st.SetMessage( f"Created <a href=/job/{job.id}>Job #{job.id}</a> to delete selected file(s)")
|
|
return redirect("/jobs")
|
|
|
|
################################################################################
|
|
# /move_files -> create a job to move files for the b/e to process
|
|
################################################################################
|
|
@app.route("/move_files", methods=["POST"])
|
|
@login_required
|
|
def move_files():
|
|
|
|
jex=[]
|
|
for el in request.form:
|
|
jex.append( JobExtra( name=f"{el}", value=request.form[el] ) )
|
|
job=NewJob( "move_files", 0, None, jex )
|
|
st.SetMessage( f"Created <a href=/job/{job.id}>Job #{job.id}</a> to move selected file(s)")
|
|
return redirect("/jobs")
|
|
|
|
@login_required
|
|
@app.route("/viewlist", methods=["POST"])
|
|
def viewlist():
|
|
OPT=States( request )
|
|
# Get next/prev set of data - e.g. if next set, then it will use orig_url
|
|
# to go forward how_many from offset and then use viewer.html to show that
|
|
# first obj of the new list of entries
|
|
entries=GetEntries( OPT )
|
|
# this occurs when we went from the last image on a page (with how_many on
|
|
# it) and it just happened to also be the last in the DB...
|
|
if not entries:
|
|
print("DDP: DONT think this can happen anymore")
|
|
|
|
# undo the skip by how_many and getentries again
|
|
OPT.offset -= int(OPT.how_many)
|
|
entries=GetEntries( OPT )
|
|
# now flag we are at the last in db, to reset current below
|
|
objs = {}
|
|
eids=""
|
|
resp={}
|
|
resp['objs']={}
|
|
for e in entries:
|
|
if not e.file_details:
|
|
continue
|
|
eids=eids+f"{e.id},"
|
|
resp['objs'][e.id]={}
|
|
resp['objs'][e.id]['url'] = e.FullPathOnFS()
|
|
resp['objs'][e.id]['name'] = e.name
|
|
resp['objs'][e.id]['type'] = e.type.name
|
|
if e.file_details.faces:
|
|
resp['objs'][e.id]['face_model'] = e.file_details.faces[0].facefile_lnk.model_used
|
|
resp['objs'][e.id]['faces'] = []
|
|
|
|
# put locn data back into array format
|
|
fid=0
|
|
for face in e.file_details.faces:
|
|
tmp_locn = json.loads(face.locn)
|
|
fd= {}
|
|
fd['x'] = tmp_locn[3]
|
|
fd['y'] = tmp_locn[0]
|
|
fd['w'] = tmp_locn[1]-tmp_locn[3]
|
|
fd['h'] = tmp_locn[2]-tmp_locn[0]
|
|
if face.refimg:
|
|
fd['who'] = face.refimg.person.tag
|
|
fd['distance'] = round(face.refimg_lnk.face_distance,2)
|
|
resp['objs'][e.id]['faces'].append(fd)
|
|
fid+=1
|
|
|
|
eids=eids.rstrip(",")
|
|
lst = eids.split(',')
|
|
if 'next' in request.form:
|
|
OPT.current = int(lst[0])
|
|
if 'prev' in request.form:
|
|
OPT.current = int(lst[-1])
|
|
|
|
resp['current']=OPT.current
|
|
# OPT.first_eid can still be 0 IF we have gone past the first page, I could
|
|
# better set this in states rather than kludge this if... think about it
|
|
if OPT.first_eid>0:
|
|
resp['first_eid']=OPT.first_eid
|
|
resp['last_eid']=OPT.last_eid
|
|
resp['eids']=eids
|
|
resp['offset']=OPT.offset
|
|
# save pref to keep the new current value, first/last
|
|
pref=PA_UserState.query.filter(PA_UserState.pa_user_dn==current_user.dn,PA_UserState.orig_ptype==OPT.orig_ptype,PA_UserState.view_eid==OPT.view_eid).first()
|
|
UpdatePref( pref, OPT )
|
|
|
|
return resp
|
|
|
|
################################################################################
|
|
# /view/id -> grabs data from DB and views it (GET)
|
|
################################################################################
|
|
@login_required
|
|
@app.route("/view/<id>", methods=["GET"])
|
|
def view(id):
|
|
OPT=States( request )
|
|
objs = {}
|
|
entries=GetEntries( OPT )
|
|
eids=""
|
|
for e in entries:
|
|
objs[e.id]=e
|
|
eids += f"{e.id},"
|
|
# if this is a dir, we wont view it with a click anyway, so move on...
|
|
if not e.file_details:
|
|
continue
|
|
# put locn data back into array format
|
|
for face in e.file_details.faces:
|
|
face.locn = json.loads(face.locn)
|
|
eids=eids.rstrip(",")
|
|
# jic, sometimes we trip this, and rather than show broken pages / destroy
|
|
# face locn data, just warn & redirect
|
|
if id not in eids:
|
|
print( f"ERROR: viewing an id, but its not in eids OPT={OPT}, id={id}, eids={eids}")
|
|
st.SetMessage("Sorry, viewing data is confused, cannot view this image now", "warning" )
|
|
return redirect("/")
|
|
else:
|
|
return render_template("viewer.html", current=int(id), eids=eids, objs=objs, OPT=OPT )
|
|
|
|
##################################################################################
|
|
# /view/id -> grabs data from DB and views it (POST -> set state, redirect to GET)
|
|
##################################################################################
|
|
@app.route("/view/<id>", methods=["POST"])
|
|
@login_required
|
|
def view_img_post(id):
|
|
# set pa_user_states...
|
|
OPT=States( request )
|
|
# then use back-button friendly URL (and use pa_user_states to view the right image in the right list
|
|
return redirect( "/view/" + id );
|
|
|
|
# route called from front/end - if multiple images are being transformed, each transorm == a separate call
|
|
# to this route (and therefore a separate transorm job. Each reponse allows the f/e to check the
|
|
# specific transorm job is finished (/checktransformjob) which will be called (say) every 1 sec. from f/e
|
|
# with a spinning wheel, then when pa_job_mgr has finished it will return the transformed thumb
|
|
@app.route("/transform", methods=["POST"])
|
|
@login_required
|
|
def transform():
|
|
id = request.form['id']
|
|
amt = request.form['amt']
|
|
|
|
jex=[]
|
|
for el in request.form:
|
|
jex.append( JobExtra( name=f"{el}", value=request.form[el] ) )
|
|
|
|
job=NewJob( "transform_image", 0, None, jex )
|
|
|
|
resp={}
|
|
resp['job_id']=job.id
|
|
|
|
return resp
|
|
|
|
################################################################################
|
|
# /checktransformjob -> URL that is called repeatedly by front-end waiting for the
|
|
# b/e to finish the transform job. Once done, the new / now
|
|
# transformed image's thumbnail is returned so the f/e can
|
|
# update with it
|
|
################################################################################
|
|
@app.route("/checktransformjob", methods=["POST"])
|
|
@login_required
|
|
def checktransformjob():
|
|
job_id = request.form['job_id']
|
|
job = Job.query.get(job_id)
|
|
resp={}
|
|
resp['finished']=False
|
|
if job.pa_job_state == 'Completed':
|
|
id=[jex.value for jex in job.extra if jex.name == "id"][0]
|
|
e=Entry.query.join(File).filter(Entry.id==id).first()
|
|
resp['thumbnail']=e.file_details.thumbnail
|
|
resp['finished']=True
|
|
return resp
|
|
|
|
################################################################################
|
|
# /include -> return contents on /include and does not need a login, so we
|
|
# can get the icon, and potentially any js, bootstrap, etc. needed for the login page
|
|
################################################################################
|
|
@app.route("/internal/<path:filename>")
|
|
def internal(filename):
|
|
return send_from_directory("internal/", filename)
|
|
|
|
################################################################################
|
|
# /static -> returns the contents of any file referenced inside /static.
|
|
# we create/use symlinks in static/ to reference the images to show
|
|
################################################################################
|
|
@app.route("/static/<filename>")
|
|
@login_required
|
|
def custom_static(filename):
|
|
return send_from_directory("static/", filename)
|
|
|
|
###############################################################################
|
|
# This func creates a new filter in jinja2 to test to see if the Dir being
|
|
# checked, is a top-level folder of 'OPT.cwd'
|
|
################################################################################
|
|
@app.template_filter('TopLevelFolderOf')
|
|
def _jinja2_filter_toplevelfolderof(path, cwd):
|
|
if os.path.dirname(path) == cwd:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
###############################################################################
|
|
# This func creates a new filter in jinja2 to test to hand back the parent path
|
|
# from a given path
|
|
################################################################################
|
|
@app.template_filter('ParentPath')
|
|
def _jinja2_filter_parentpath(path):
|
|
return os.path.dirname(path)
|
|
|
|
###############################################################################
|
|
# route to allow the Move Dialog Box to pass a date (YYYYMMDD) and returns a
|
|
# json? list of existing dir names that could be near it in time. Starting
|
|
# simple, by using YYYYMM-1, YYYYMM, YYYYMM+1 dirs
|
|
###############################################################################
|
|
@app.route("/getexistingpaths/<dt>", methods=["POST"])
|
|
@login_required
|
|
def GetExistingPathsAsDiv(dt):
|
|
dir_ft=FileType.query.filter(FileType.name=='Directory').first()
|
|
dirs_arr=[]
|
|
for delta in range(-7, 8):
|
|
try:
|
|
new_dtime=datetime.datetime.strptime(dt, "%Y%m%d") + datetime.timedelta(days=delta)
|
|
except:
|
|
# this is not a date, so we cant work out possible dirs, just
|
|
# return an empty set
|
|
return "[]"
|
|
new_dt=new_dtime.strftime('%Y%m%d')
|
|
dirs_arr+=Dir.query.distinct(Dir.rel_path).filter(Dir.rel_path.ilike('%'+new_dt+'%')).all();
|
|
dirs_arr+=Dir.query.distinct(Dir.rel_path).join(EntryDirLink).join(Entry).filter(Entry.type_id!=dir_ft.id).filter(Entry.name.ilike('%'+new_dt+'%')).all()
|
|
|
|
# remove duplicates from array
|
|
dirs = set(dirs_arr)
|
|
|
|
# turn DB output into json and return it to the f/e
|
|
ret='[ '
|
|
first_dir=1
|
|
for dir in dirs:
|
|
if not first_dir:
|
|
ret +=", "
|
|
bits=dir.rel_path.split('-')
|
|
|
|
ret+= '{ '
|
|
ret+= '"prefix":"' + bits[0] + '", '
|
|
if len(bits)>1:
|
|
ret+= '"suffix":"' + bits[1] + '"'
|
|
else:
|
|
ret+= '"suffix":"''"'
|
|
ret+= ' } '
|
|
first_dir=0
|
|
ret+= ' ]'
|
|
return ret
|