Files
photoassistant/files.py

843 lines
39 KiB
Python

from wtforms import SubmitField, StringField, HiddenField, validators, Form
from flask_wtf import FlaskForm
from flask import request, render_template, redirect, send_from_directory, url_for, jsonify, make_response
from main import db, app, ma
from sqlalchemy import Sequence
from sqlalchemy.exc import SQLAlchemyError
from status import st, Status
import os
import glob
from PIL import Image
from pymediainfo import MediaInfo
import hashlib
import exifread
import base64
import numpy
import cv2
import time
import re
from datetime import datetime
import pytz
from flask_login import login_required, current_user
from states import States, PA_UserState
################################################################################
# Local Class imports
################################################################################
from job import Job, JobExtra, Joblog, NewJob
from path import PathType, Path, MovePathDetails
from person import Refimg, Person, PersonRefimgLink
from settings import Settings, SettingsIPath, SettingsSPath, SettingsRBPath
from shared import SymlinkName
from dups import Duplicates
from face import Face, FaceFileLink, FaceRefimgLink, FaceOverrideType, FaceNoMatchOverride, FaceForceMatchOverride
# pylint: disable=no-member
################################################################################
# Class describing PathDirLink and in the DB (via sqlalchemy)
# connects the entry (dir) with a path
################################################################################
class PathDirLink(db.Model):
    """Link row (table "path_dir_link") connecting a dir entry with a path."""

    __tablename__ = "path_dir_link"

    # both columns together form the primary key of the link table
    path_id = db.Column(db.Integer, db.ForeignKey("path.id"), primary_key=True)
    dir_eid = db.Column(db.Integer, db.ForeignKey("dir.eid"), primary_key=True)

    def __repr__(self):
        return f"<path_id: {self.path_id}, dir_eid: {self.dir_eid}>"
################################################################################
# Class describing EntryDirLink and in the DB (via sqlalchemy)
# connects the (many) entries contained in a directory (which is also an entry)
################################################################################
class EntryDirLink(db.Model):
    """Link row (table "entry_dir_link") connecting an entry with the dir that contains it."""

    __tablename__ = "entry_dir_link"

    # both columns together form the primary key of the link table
    entry_id = db.Column(db.Integer, db.ForeignKey("entry.id"), primary_key=True)
    dir_eid = db.Column(db.Integer, db.ForeignKey("dir.eid"), primary_key=True)

    def __repr__(self):
        return f"<entry_id: {self.entry_id}, dir_eid: {self.dir_eid}>"
################################################################################
# Class describing Dir and in the DB (via sqlalchemy)
# rel_path: rest of dir after path, e.g. if path = /..../storage, then
# rel_path could be 2021/20210101-new-years-day-pics
# in_path: only in this structure, not DB, quick ref to the path this dir is in
################################################################################
class Dir(db.Model):
    """Directory-specific details of an entry (table "dir")."""

    __tablename__ = "dir"

    # eid is both the primary key and the reference back to the owning entry
    eid = db.Column(db.Integer, db.ForeignKey("entry.id"), primary_key=True)
    # remainder of the directory after the path prefix
    rel_path = db.Column(db.String, unique=True)
    # relationship via path_dir_link to the single Path this dir is in
    in_path = db.relationship("Path", secondary="path_dir_link", uselist=False)

    def __repr__(self):
        return f"<eid: {self.eid}, rel_path: {self.rel_path}, in_path: {self.in_path}>"
################################################################################
# Class describing Entry and in the DB (via sqlalchemy)
# an entry is the common bits between files and dirs
# type is a convenience var only in this class, not in DB
# {dir|file}_details are convenience data for the relevant details from the Dir
# or File class - not in DB
# in_dir - is the Dir that this entry is located in (convenience for class only)
# FullPathOnFS(): method to get path on the FS for this Entry
################################################################################
class Entry(db.Model):
    """Common part of files and directories (table "entry").

    type, dir_details, file_details and in_dir are relationships used for
    convenience when navigating from an entry; only id, name and type_id are
    declared as columns here.
    """

    __tablename__ = "entry"

    id = db.Column(db.Integer, db.Sequence('file_id_seq'), primary_key=True)
    name = db.Column(db.String, unique=False, nullable=False)
    type_id = db.Column(db.Integer, db.ForeignKey("file_type.id"))
    type = db.relationship("FileType")
    dir_details = db.relationship("Dir", uselist=False)
    file_details = db.relationship("File", uselist=False)
    # the Dir this entry is located in (None for a dir that is the root of a path)
    in_dir = db.relationship("Dir", secondary="entry_dir_link", uselist=False)

    def FullPathOnFS(self):
        """Return the path on the file system for this entry.

        For an entry inside a directory this is
        <path_prefix>/[<rel_path>/]<name>; for a directory that is the root of
        a path it is just that path's path_prefix.
        """
        if not self.in_dir:
            # this occurs when we have a dir that is the root of a path
            return self.dir_details.in_path.path_prefix

        full_path = self.in_dir.in_path.path_prefix + '/'
        # an empty rel_path means the containing dir is the root of the path
        if self.in_dir.rel_path:
            full_path += self.in_dir.rel_path + '/'
        return full_path + self.name

    def __repr__(self):
        # closing '>' added: it was missing, unlike every other model repr in this file
        return f"<id: {self.id}, name: {self.name}, type={self.type}, dir_details={self.dir_details}, file_details={self.file_details}, in_dir={self.in_dir}>"
################################################################################
# Class describing File and in the DB (via sqlalchemy)
# all files are entries, this is the extra bits only for a file, of note:
# hash is unique for files, and used to validate duplicates
# woy == week of year, all date fields are used to sort/show content. Date
# info can be from exif, or file system, or file name (rarely)
# faces: convenience field to show connected face(s) for this file
################################################################################
class File(db.Model):
    """File-specific details of an entry (table "file")."""

    __tablename__ = "file"

    # eid is both the primary key and the reference back to the owning entry
    eid = db.Column(db.Integer, db.ForeignKey("entry.id"), primary_key=True)
    size_mb = db.Column(db.Integer, unique=False, nullable=False)
    thumbnail = db.Column(db.String, unique=False, nullable=True)
    hash = db.Column(db.Integer)
    # date parts used to sort/show content (woy == week of year)
    year = db.Column(db.Integer)
    month = db.Column(db.Integer)
    day = db.Column(db.Integer)
    woy = db.Column(db.Integer)
    # face(s) connected to this file through face_file_link
    faces = db.relationship("Face", secondary="face_file_link")

    def __repr__(self):
        return f"<eid: {self.eid}, size_mb={self.size_mb}, hash={self.hash}, year={self.year}, month={self.month}, day={self.day}, woy={self.woy}, faces={self.faces}>"
################################################################################
# Class describing FileType and in the DB (via sqlalchemy)
# pre-defined list of file types (image, dir, etc.)
################################################################################
class FileType(db.Model):
    """Named file type (table "file_type"), referenced by Entry.type_id."""

    __tablename__ = "file_type"

    id = db.Column(db.Integer, db.Sequence('file_type_id_seq'), primary_key=True)
    name = db.Column(db.String, unique=True, nullable=False)

    def __repr__(self):
        return f"<id: {self.id}, name={self.name}>"
################################################################################
# util function to just update the current/first/last positions needed for
# viewing / using pa_user_state DB table
################################################################################
def UpdatePref( pref, OPT ):
    """Copy the positional values of OPT onto pref and commit it.

    pref: the pa_user_state row to update
    OPT:  options object providing current, first_eid, last_eid, num_entries

    Only values greater than 0 overwrite what pref already holds; last_used is
    always refreshed to the current UTC time.
    """
    stamp = datetime.now(pytz.utc)
    for field in ("current", "first_eid", "last_eid", "num_entries"):
        value = getattr(OPT, field)
        if value > 0:
            setattr(pref, field, value)
    pref.last_used = stamp
    db.session.add(pref)
    db.session.commit()
################################################################################
# GetEntriesInFlatView: func. to retrieve DB entries appropriate for flat view
################################################################################
def GetEntriesInFlatView( OPT, prefix ):
    """Return (entries, num_entries) for the flat view of the path with this prefix.

    OPT:    options object; order, last_order, offset, how_many, first_eid and
            last_eid are read, first_eid / last_eid may be set
    prefix: value compared against Path.path_prefix

    num_entries is only counted (and OPT.last_eid only worked out) while
    OPT.last_eid is still 0; otherwise 0 is returned for it.
    """
    base = ( Entry.query.join(File).join(EntryDirLink).join(Dir)
             .join(PathDirLink).join(Path).filter(Path.path_prefix==prefix) )

    # NOTE(review): eval is kept ONLY for the ordering clause, which is
    # presumably always one of the fixed strings built by SetOrderStrings.
    # offset / how_many are now passed as values instead of being spliced into
    # the evaluated source text, so request data is never executed as code.
    ordered = eval( f"base.{OPT.order}" )
    entries = ordered.offset(OPT.offset).limit(OPT.how_many).all()

    if OPT.first_eid == 0 and OPT.offset == 0 and entries:
        OPT.first_eid = entries[0].id

    num_entries = 0
    if OPT.last_eid == 0:
        num_entries = base.count()
        # the last entry is the first one when sorted in the reverse order
        last_entry = eval( f"base.{OPT.last_order}" ).limit(1).first()
        if last_entry:
            OPT.last_eid = last_entry.id
    return entries, num_entries
################################################################################
# GetEntriesInFolderView: func. to retrieve DB entries appropriate for folder view
# read inline comments to deal with variations / ordering...
################################################################################
def GetEntriesInFolderView( OPT, prefix ):
    """Return (entries, num_entries) for the folder view at OPT.cwd.

    OPT:    options object; cwd, noo, order, last_order, offset and how_many
            are read, first_eid / last_eid may be set
    prefix: value compared against Path.path_prefix

    At the (fake) root cwd only the root dir entry itself is returned. In a
    sub-directory the result is the sub-directories (sorted by name) followed
    by one page of the files in that directory; num_entries counts the files.
    """
    # the root cwd is fake, so treat it specially - its Dir has rel_path=''
    at_root = ( os.path.dirname(OPT.cwd) == 'static' )
    if at_root:
        rel = ''
    else:
        rel = OPT.cwd.replace( prefix, '' )
        # in subdirs, removing the prefix leaves a leading '/', drop it
        if rel.startswith('/'):
            rel = rel[1:]

    # NOTE: the local must stay named 'dir' - the eval'd query text below refers to dir.id
    dir = Entry.query.join(Dir).join(PathDirLink).join(Path).filter(Dir.rel_path==rel).filter(Path.path_prefix==prefix).order_by(Entry.name).first()
    # this can occur if the path in settings does not exist as it wont be in the DB
    if not dir:
        return [], 0

    if at_root:
        # although this is 1 entry, it comes back via all() so that it is iterable
        return Entry.query.filter(Entry.id==dir.id).all(), 0

    # dirs cant be sorted by date really, so fall back to name ordering:
    # descending for "Z to A" / "Newest", ascending for anything else
    subdirs = Entry.query.join(EntryDirLink).join(FileType).filter(EntryDirLink.dir_eid==dir.id).filter(FileType.name=='Directory')
    if OPT.noo in ("Z to A", "Newest"):
        subdirs = subdirs.order_by(Entry.name.desc())
    else:
        subdirs = subdirs.order_by(Entry.name)
    entries = subdirs.all()

    # add any files at the current CWD (based on dir_eid in DB)
    join="Entry.query.join(File).join(EntryDirLink).filter(EntryDirLink.dir_eid==dir.id)"
    file_entries = eval( f"{join}.{OPT.order}.offset(OPT.offset).limit(OPT.how_many).all()")
    if OPT.offset == 0 and file_entries:
        OPT.first_eid = file_entries[0].id
    num_entries = eval( f"{join}.count()" )
    last_entry = eval( f"{join}.{OPT.last_order}.limit(1).first()" )
    if last_entry:
        OPT.last_eid = last_entry.id
    return entries + file_entries, num_entries
################################################################################
# GetEntriesInSearchView: func. to retrieve DB entries appropriate for Search view
# Default search is for any matching filename, contents of any matching dirname
# and any match with AI / face for that term. An explicit, AI-only match is
# requested via the AI:<tag> syntax
################################################################################
def GetEntriesInSearchView( OPT ):
    """Return the entries matching OPT.orig_search_term for the search view.

    A plain term matches on file name, on the rel_path of the containing
    directory and on the tag of a person linked to a face in the file. A term
    containing 'AI:' only uses the person-tag match (with 'AI:' removed).

    OPT: options object; orig_search_term, order, last_order_raw, offset,
         how_many, first_eid, last_eid, path_type and orig_ptype are read;
         first_eid, last_eid and num_entries may be set.

    While OPT.last_eid is still 0 the total count and the last entry id are
    worked out with raw SQL and stored via UpdatePref.
    """
    # function-scope import so this block does not depend on the file's import list
    from sqlalchemy import text

    ai_only = 'AI:' in OPT.orig_search_term
    search_term = OPT.orig_search_term
    if ai_only:
        search_term = search_term.replace('AI:','')
    # the user's term is only ever used as a bound VALUE from here on; it was
    # previously spliced into eval'd Python source and into raw SQL text
    pattern = f"%{search_term}%"

    def _page( query ):
        # NOTE(review): eval is kept ONLY for the ordering clause, which is
        # presumably always one of the fixed strings built by SetOrderStrings
        return eval( f"query.{OPT.order}" ).offset(OPT.offset).limit(OPT.how_many).all()

    ai_query = ( Entry.query.join(File).distinct().join(FaceFileLink).join(Face)
                 .join(FaceRefimgLink).join(Refimg).join(PersonRefimgLink).join(Person)
                 .filter(Person.tag.ilike(pattern)) )

    if ai_only:
        all_entries = _page( ai_query )
    else:
        file_data = _page( Entry.query.join(File).filter(Entry.name.ilike(pattern)) )
        dir_data = _page( Entry.query.join(File).join(EntryDirLink).join(Dir).filter(Dir.rel_path.ilike(pattern)) )
        ai_data = _page( ai_query )
        # remove any duplicates from combined data - keyed on the entry id, as
        # two different files may share a name, and the same file may come
        # back from more than one of the three queries
        all_entries = []
        seen_ids = set()
        for entry in file_data + dir_data + ai_data:
            if entry.id not in seen_ids:
                seen_ids.add( entry.id )
                all_entries.append( entry )

    # for all searches first_eid is worked out when not set yet & offset is 0 and we have some entries
    if OPT.first_eid == 0 and OPT.offset == 0 and all_entries:
        OPT.first_eid = all_entries[0].id

    if OPT.last_eid == 0:
        by_fname = "select e.id from entry e where e.name ilike :term"
        by_dirname = "select e.id from entry e, entry_dir_link edl where edl.entry_id = e.id and edl.dir_eid in ( select d.eid from dir d where d.rel_path ilike :term )"
        by_ai = "select e.id from entry e, face_file_link ffl, face_refimg_link frl, person_refimg_link prl, person p where e.id = ffl.file_eid and frl.face_id = ffl.face_id and frl.refimg_id = prl.refimg_id and prl.person_id = p.id and p.tag ilike :term"
        # an AI: search only counts / orders the AI matches, a plain search the union of all three
        if ai_only:
            matching_ids = by_ai
        else:
            matching_ids = f"{by_fname} union {by_dirname} union {by_ai}"
        sel_no_order = f"select e.*, f.* from entry e, file f where e.id=f.eid and e.id in ( {matching_ids} ) "

        # num_entries
        num_e_sql = f"select count(1) from ( {matching_ids} ) as foo"
        OPT.num_entries = db.engine.execute( text(num_e_sql), term=pattern ).scalar()

        # last_order_raw is the reverse of the display order, so the first row is the last entry
        last_entry_sql = f"{sel_no_order} order by {OPT.last_order_raw} limit 1"
        last_row = db.engine.execute( text(last_entry_sql), term=pattern ).first()
        if last_row is not None:
            OPT.last_eid = last_row.id

        # store first/last eid into prefs
        pref=PA_UserState.query.filter(PA_UserState.pa_user_dn==current_user.dn,PA_UserState.path_type==OPT.path_type,PA_UserState.orig_ptype==OPT.orig_ptype,PA_UserState.orig_search_term==OPT.orig_search_term).first()
        UpdatePref( pref, OPT )
    return all_entries
################################################################################
# set up "order strings" to use in ORM and raw queries as needed for
# GetEntries*Search*, GetEntries*Flat*, GetEntries*Fold*
################################################################################
def SetOrderStrings( OPT ):
    """Set OPT.order, OPT.last_order and OPT.last_order_raw from OPT.noo.

    order is the ORM order_by text for the requested ordering, last_order the
    ORM text for the opposite ordering and last_order_raw the raw-SQL form of
    that opposite ordering. Any value of OPT.noo other than "Newest",
    "Oldest" or "Z to A" is treated as "A to Z". Returns None.
    """
    date_desc = "order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name.desc())"
    date_asc = "order_by(File.year,File.month,File.day,Entry.name)"
    name_desc = "order_by(Entry.name.desc())"
    name_asc = "order_by(Entry.name)"

    # noo -> (order, last_order, last_order_raw)
    choices = {
        "Newest": ( date_desc, date_asc, "f.year, f.month, f.day, e.name" ),
        "Oldest": ( date_asc, date_desc, "f.year desc, f.month desc, f.day desc, e.name desc" ),
        "Z to A": ( name_desc, name_asc, "e.name" ),
    }
    a_to_z = ( name_asc, name_desc, "e.name desc" )

    OPT.order, OPT.last_order, OPT.last_order_raw = choices.get( OPT.noo, a_to_z )
################################################################################
# /GetEntries -> helper function that Gets Entries for required files to show
# for several routes (files_ip, files_sp, files_rbp, search, viewlist)
################################################################################
def GetEntries( OPT ):
    """Return the entries to show for the view described by OPT.

    Searches (or a View that originated from a search) are delegated to
    GetEntriesInSearchView. Otherwise the settings path for OPT.path_type
    ('Storage', 'Import' or 'Bin') is turned into a prefix via SymlinkName and
    the folder or flat view is queried, depending on OPT.folders.

    Side effects: sets the order strings on OPT, may reset OPT.path_type to
    OPT.orig_ptype for a View, and stores positions via UpdatePref when
    OPT.last_eid was still 0 on entry or when in folder view.

    Raises ValueError if OPT.path_type is not one of the supported types.
    """
    SetOrderStrings( OPT )
    if OPT.path_type == 'Search' or (OPT.path_type == 'View' and OPT.orig_ptype=='Search'):
        return GetEntriesInSearchView( OPT )

    # if we are a view, then it will be of something else, e.g. a list of
    # import, storage, or bin images, reset OPT.path_type so the lookup below works
    if 'View' in OPT.path_type:
        OPT.path_type = OPT.orig_ptype

    if OPT.path_type == 'Storage':
        path = SettingsSPath()
    elif OPT.path_type == 'Import':
        path = SettingsIPath()
    elif OPT.path_type == 'Bin':
        path = SettingsRBPath()
    else:
        # previously this fell through and failed later on the unbound 'path'
        raise ValueError( f"GetEntries: unsupported path_type '{OPT.path_type}'" )

    # decide this BEFORE querying: the view functions below may set OPT.last_eid
    update_last_eid = bool( OPT.last_eid == 0 or OPT.folders )

    prefix = SymlinkName(OPT.path_type,path,path+'/')
    if OPT.folders:
        entries, num_entries = GetEntriesInFolderView( OPT, prefix )
    else:
        entries, num_entries = GetEntriesInFlatView( OPT, prefix )

    if update_last_eid:
        # find pref... via path_type if we are here
        OPT.num_entries=num_entries
        pref=PA_UserState.query.filter(PA_UserState.pa_user_dn==current_user.dn,PA_UserState.path_type==OPT.path_type).first()
        UpdatePref( pref, OPT )
    return entries
################################################################################
# /clear_jm_msg -> with a dismissable error (ie. anything not success, that is
# not showing duplicates (so rare errors) - allow them to be dismissed
################################################################################
@app.route("/clear_jm_msg/<id>", methods=["POST"])
@login_required
def clear_jm_msg(id):
    """Clear the message with this id via ClearJM_Message, then redirect to the main page."""
    ClearJM_Message(id)
    main_url = url_for("main_page")
    return redirect( main_url )
@app.route("/ChangeFileOpts", methods=["POST"])
@login_required
def ChangeFileOpts():
    """Reset the options from the posted form, then redirect back to the referring page."""
    # States() is built only for its effect of resetting the options from the
    # form post; the object itself is not needed here
    States( request )
    # go back to the original page with a GET, to allow the back button to work
    came_from = request.referrer
    return redirect( came_from )
################################################################################
# /file_list -> show detailed file list of files from import_path(s)
################################################################################
@app.route("/file_list_ip", methods=["GET", "POST"])
@login_required
def file_list_ip():
    """Show the detailed file list of files from the import path(s).

    A POST only serves to hand the new options to States(); it is answered
    with a redirect to the GET version of this URL so that the browser
    back/forward buttons are safe to use.
    """
    OPT=States( request )
    if request.method=='POST':
        # the redirect has to be returned - without 'return' it was built and
        # thrown away, and the page was rendered straight from the POST
        return redirect("/file_list_ip")
    entries=GetEntries( OPT )
    return render_template("file_list.html", page_title='View File Details (Import Path)', entry_data=entries, OPT=OPT )
################################################################################
# /files -> show thumbnail view of files from import_path(s)
################################################################################
@app.route("/files_ip", methods=["GET", "POST"])
@login_required
def files_ip():
    """Show the thumbnail view of files from the import path(s).

    A POST only serves to hand the new options to States(); it is answered
    with a redirect to the GET version of this URL so that the browser
    back/forward buttons are safe to use.
    """
    OPT=States( request )
    if request.method=='POST':
        # the redirect has to be returned - without 'return' it was built and
        # thrown away, and the page was rendered straight from the POST
        return redirect("/files_ip")
    entries=GetEntries( OPT )
    people = Person.query.all()
    move_paths = MovePathDetails()
    return render_template("files.html", page_title=f"View Files ({OPT.path_type} Path)", entry_data=entries, OPT=OPT, people=people, move_paths=move_paths )
################################################################################
# /files -> show thumbnail view of files from storage_path
################################################################################
@app.route("/files_sp", methods=["GET", "POST"])
@login_required
def files_sp():
    """Show the thumbnail view of files from the storage path.

    A POST only serves to hand the new options to States(); it is answered
    with a redirect to the GET version of this URL so that the browser
    back/forward buttons are safe to use.
    """
    OPT=States( request )
    if request.method=='POST':
        # the redirect has to be returned - without 'return' it was built and
        # thrown away, and the page was rendered straight from the POST
        return redirect("/files_sp")
    entries=GetEntries( OPT )
    people = Person.query.all()
    move_paths = MovePathDetails()
    return render_template("files.html", page_title=f"View Files ({OPT.path_type} Path)", entry_data=entries, OPT=OPT, people=people, move_paths=move_paths )
################################################################################
# /files -> show thumbnail view of files from recycle_bin_path
################################################################################
@app.route("/files_rbp", methods=["GET", "POST"])
@login_required
def files_rbp():
    """Show the thumbnail view of files from the recycle bin path.

    A POST only serves to hand the new options to States(); it is answered
    with a redirect to the GET version of this URL so that the browser
    back/forward buttons are safe to use.
    """
    OPT=States( request )
    if request.method=='POST':
        # the redirect has to be returned - without 'return' it was built and
        # thrown away, and the page was rendered straight from the POST
        return redirect("/files_rbp")
    entries=GetEntries( OPT )
    # NOTE(review): people is fetched but, unlike files_ip / files_sp, not
    # handed to the template - confirm whether that is intentional for the bin
    people = Person.query.all()
    move_paths = MovePathDetails()
    return render_template("files.html", page_title=f"View Files ({OPT.path_type} Path)", entry_data=entries, OPT=OPT, move_paths=move_paths )
################################################################################
# search -> GET version -> has search_term in the URL and is therefore able to
# be used even if the user hits the front/back buttons in the browser.
# func shows thumbnails of matching files.
################################################################################
@app.route("/search/<search_term>", methods=["GET", "POST"])
@login_required
def search(search_term):
    """Show thumbnails of the files matching search_term.

    search_term: the term taken from the URL

    A POST is a change in state: it is handed to States() and then answered
    with a redirect to the GET version of the URL, which is safe to use with
    the browser back/forward buttons.
    """
    OPT=States( request )
    if request.method=="POST":
        # the redirect has to be returned - without 'return' it was built and
        # thrown away. url_for also takes care of quoting the term for the URL
        return redirect( url_for("search", search_term=search_term) )
    OPT.search_term = search_term
    # always show flat results for search to start with
    OPT.folders=False
    entries=GetEntries( OPT )
    move_paths = MovePathDetails()
    return render_template("files.html", page_title='View Files', search_term=search_term, entry_data=entries, OPT=OPT, move_paths=move_paths )
################################################################################
# /files/scannow -> allows us to force a check for new files
################################################################################
@app.route("/files/scannow", methods=["GET"])
@login_required
def scannow():
    """Create a "scannow" job, set a status message linking to it and redirect to /jobs."""
    new_job = NewJob("scannow")
    template = "scanning for new files in:&nbsp;<a href=/job/{}>Job #{}</a>&nbsp;(Click the link to follow progress)"
    st.SetMessage( template.format( new_job.id, new_job.id ) )
    return redirect("/jobs")
################################################################################
# /files/forcescan -> deletes old data in DB, and does a brand new scan
################################################################################
@app.route("/files/forcescan", methods=["GET"])
@login_required
def forcescan():
    """Create a "forcescan" job, set a status message linking to it and redirect to /jobs."""
    new_job = NewJob("forcescan")
    template = "force scan & rebuild data for files in:&nbsp;<a href=/job/{}>Job #{}</a>&nbsp;(Click the link to follow progress)"
    st.SetMessage( template.format( new_job.id, new_job.id ) )
    return redirect("/jobs")
################################################################################
# /files/scan_sp -> allows us to force a check for new files
################################################################################
@app.route("/files/scan_sp", methods=["GET"])
@login_required
def scan_sp():
    """Create a "scan_sp" job, set a status message linking to it and redirect to /jobs."""
    new_job = NewJob("scan_sp")
    template = "scanning for new files in:&nbsp;<a href=/job/{}>Job #{}</a>&nbsp;(Click the link to follow progress)"
    st.SetMessage( template.format( new_job.id, new_job.id ) )
    return redirect("/jobs")
################################################################################
# /fix_dups -> use sql to find duplicates based on same hash, different
# filenames, or directories. Pass this straight through to the job manager
# as job extras to a new job.
################################################################################
@app.route("/fix_dups", methods=["POST"])
@login_required
def fix_dups():
    """Find duplicate files with SQL and render them in dups.html.

    Two file entries are reported as duplicates when they are different
    entries with the same hash and size_mb, and neither lives in a path of
    type 'Bin'. The optional form field 'pagesize' sets the page size passed
    to the template (default 10).

    When there are no duplicate rows a message is set and the user is
    redirected to "/".
    """
    result = db.engine.execute( "select e1.id as id1, f1.hash, d1.rel_path as rel_path1, d1.eid as did1, e1.name as fname1, p1.id as path1, p1.type_id as path_type1, e2.id as id2, d2.rel_path as rel_path2, d2.eid as did2, e2.name as fname2, p2.id as path2, p2.type_id as path_type2 from entry e1, file f1, dir d1, entry_dir_link edl1, path_dir_link pdl1, path p1, entry e2, file f2, dir d2, entry_dir_link edl2, path_dir_link pdl2, path p2 where e1.id = f1.eid and e2.id = f2.eid and d1.eid = edl1.dir_eid and edl1.entry_id = e1.id and edl2.dir_eid = d2.eid and edl2.entry_id = e2.id and p1.type_id != (select id from path_type where name = 'Bin') and p1.id = pdl1.path_id and pdl1.dir_eid = d1.eid and p2.type_id != (select id from path_type where name = 'Bin') and p2.id = pdl2.path_id and pdl2.dir_eid = d2.eid and f1.hash = f2.hash and e1.id != e2.id and f1.size_mb = f2.size_mb order by path1, rel_path1, fname1")
    # returns_rows only says that the statement CAN return rows (always the
    # case for a select), so the rows are fetched and tested for emptiness
    rows = result.fetchall()
    if not rows:
        st.SetMessage(f"Err, no dups - should now clear the FE 'danger' message?")
        return redirect("/")

    if 'pagesize' in request.form:
        pagesize = int(request.form['pagesize'])
    else:
        # NOTE(review): the original also queried the extras of 'New'
        # checkdups jobs here but never used the result; the stated intent was
        # to pick up a page size that had been reset in the gui - TODO confirm
        pagesize = 10

    DD=Duplicates()
    for row in rows:
        DD.AddDup( row )
    DD.SecondPass()
    return render_template("dups.html", DD=DD, pagesize=pagesize )
################################################################################
# /rm_dups -> f/e that shows actual duplicates so that we can delete some dups
# this code creates a new job with extras that have hashes/ids to allow removal
################################################################################
@app.route("/rm_dups", methods=["POST"])
@login_required
def rm_dups():
    """Create an "rmdups" job from the posted kfhash-N / kdhash-N form fields.

    For every field whose name contains 'kfhash-' (or 'kdhash-') two job
    extras are added: the matching kfid-N (kdid-N) form value and the hash
    itself. A pagesize extra of 10 is always appended. Redirects to /jobs.
    """
    form = request.form
    jex = []
    for field in form:
        # 'kf' is handled before 'kd' for each field, as the extras are order sensitive
        for kind in ('kf', 'kd'):
            if f"{kind}hash-" not in field:
                continue
            # get which row/number it is...
            _, which = field.split('-')
            jex.append( JobExtra( name=f"{kind}id-{which}", value=form[f"{kind}id-" + which] ) )
            jex.append( JobExtra( name=f"{kind}hash-{which}", value=form[field] ) )
    jex.append( JobExtra( name="pagesize", value=10 ) )

    job = NewJob( "rmdups", 0, None, jex )
    st.SetMessage( f"Created&nbsp;<a href=/job/{job.id}>Job #{job.id}</a>&nbsp;to delete duplicate files")
    return redirect("/jobs")
################################################################################
# /restore_files -> create a job to restore files for the b/e to process
################################################################################
@app.route("/restore_files", methods=["POST"])
@login_required
def restore_files():
    """Create a "restore_files" job with one job extra per posted form field, then go to /jobs."""
    form = request.form
    extras = [ JobExtra( name=f"{field}", value=form[field] ) for field in form ]
    job = NewJob( "restore_files", 0, None, extras )
    st.SetMessage( f"Created&nbsp;<a href=/job/{job.id}>Job #{job.id}</a>&nbsp;to restore selected file(s)")
    return redirect("/jobs")
################################################################################
# /delete_files -> create a job to delete files for the b/e to process
################################################################################
@app.route("/delete_files", methods=["POST"])
@login_required
def delete_files():
    """Create a "delete_files" job with one job extra per posted form field, then go to /jobs."""
    form = request.form
    extras = [ JobExtra( name=f"{field}", value=form[field] ) for field in form ]
    job = NewJob( "delete_files", 0, None, extras )
    st.SetMessage( f"Created&nbsp;<a href=/job/{job.id}>Job #{job.id}</a>&nbsp;to delete selected file(s)")
    return redirect("/jobs")
################################################################################
# /move_files -> create a job to move files for the b/e to process
################################################################################
@app.route("/move_files", methods=["POST"])
@login_required
def move_files():
    """Create a "move_files" job with one job extra per posted form field.

    Returns a JSON response carrying the job id and the message details for
    the front-end to show.
    """
    form = request.form
    extras = [ JobExtra( name=f"{field}", value=form[field] ) for field in form ]
    job = NewJob( "move_files", 0, None, extras )
    payload = jsonify(
        job_id=job.id,
        message=f"Created&nbsp;<a class='link-light' href=/job/{job.id}>Job #{job.id}</a>&nbsp;to move selected file(s)",
        level="success", alert="success", persistent=False, cant_close=False )
    return make_response( payload )
@app.route("/viewlist", methods=["POST"])
@login_required
def viewlist():
    """Return the next/prev set of viewable entries as a dict for the viewer.

    The response holds 'objs' (per entry id: url, name, type and, when the
    file has faces, face_model and a faces list), 'eids' (comma separated
    ids), 'current', 'offset', 'last_eid' and, when known, 'first_eid'.
    The new current / first / last positions are stored via UpdatePref.
    """
    # NOTE: app.route has to be the outermost decorator - with login_required
    # above it, the function registered as the view was the unprotected one
    OPT=States( request )
    # Get next/prev set of data - e.g. if next set, then it will use orig_url
    # to go forward how_many from offset and then use viewer.html to show that
    # first obj of the new list of entries
    entries=GetEntries( OPT )
    # this occurs when we went from the last image on a page (with how_many on
    # it) and it just happened to also be the last in the DB...
    if not entries:
        print("DDP: DONT think this can happen anymore")
        # undo the skip by how_many and getentries again
        OPT.offset -= int(OPT.how_many)
        entries=GetEntries( OPT )

    resp = { 'objs': {} }
    file_ids = []
    for e in entries:
        # directories have no file details and are not viewable
        if not e.file_details:
            continue
        file_ids.append( e.id )
        obj = { 'url': e.FullPathOnFS(), 'name': e.name, 'type': e.type.name }
        faces = e.file_details.faces
        if faces:
            # model is used for whole file, so set it at that level (based on first face)
            obj['face_model'] = faces[0].facefile_lnk.model_used
            # put face data back into array format (for js processing)
            obj['faces'] = []
            for face in faces:
                fd = { 'x': face.face_left, 'y': face.face_top, 'w': face.w, 'h': face.h }
                if face.refimg:
                    fd['who'] = face.refimg.person.tag
                    fd['distance'] = round(face.refimg_lnk.face_distance,2)
                obj['faces'].append(fd)
        resp['objs'][e.id] = obj

    eids = ",".join( f"{eid}" for eid in file_ids )
    # with no viewable entries there is nothing to move to; the original
    # failed on int('') in that case
    if file_ids:
        if 'next' in request.form:
            OPT.current = int(file_ids[0])
        if 'prev' in request.form:
            OPT.current = int(file_ids[-1])

    resp['current']=OPT.current
    # OPT.first_eid can still be 0 IF we have gone past the first page, I could
    # better set this in states rather than kludge this if... think about it
    if OPT.first_eid>0:
        resp['first_eid']=OPT.first_eid
    resp['last_eid']=OPT.last_eid
    resp['eids']=eids
    resp['offset']=OPT.offset
    # save pref to keep the new current value, first/last
    pref=PA_UserState.query.filter(PA_UserState.pa_user_dn==current_user.dn,PA_UserState.orig_ptype==OPT.orig_ptype,PA_UserState.view_eid==OPT.view_eid).first()
    UpdatePref( pref, OPT )
    return resp
################################################################################
# /view/id -> grabs data from DB and views it (GET)
################################################################################
@app.route("/view/<id>", methods=["GET"])
@login_required
def view(id):
    """Render viewer.html for the entry with this id within its list of entries.

    id: entry id taken from the URL (a string)

    Any face overrides found for the faces of each file are attached to the
    face objects before rendering. If id is not among the entries for the
    current state, a warning is set and the user is redirected to "/"; when
    the ENV environment variable is "production" all PA_UserState rows are
    deleted as well.
    """
    # NOTE: app.route has to be the outermost decorator - with login_required
    # above it, the function registered as the view was the unprotected one
    OPT=States( request )
    objs = {}
    eid_list = []
    for e in GetEntries( OPT ):
        objs[e.id]=e
        eid_list.append( f"{e.id}" )
        # if this is a dir, we wont view it with a click anyway, so move on...
        if not e.file_details:
            continue
        # process any overrides
        for face in e.file_details.faces:
            # now get any relevant override and store it on the face...
            fnmo = FaceNoMatchOverride.query.filter(FaceNoMatchOverride.face_id==face.id).first()
            if fnmo:
                face.no_match_override=fnmo
            mo = FaceForceMatchOverride.query.filter(FaceForceMatchOverride.face_id==face.id).first()
            if mo:
                mo.type = FaceOverrideType.query.filter( FaceOverrideType.name== 'Manual match to existing person' ).first()
                face.manual_override=mo
    eids = ",".join( eid_list )

    # jic, sometimes we trip this, and rather than show broken pages / destroy
    # the test is against the list of ids: a substring test on the joined
    # string would wrongly accept e.g. id 12 when only 312 is present
    if id not in eid_list:
        print( f"ERROR: viewing an id, but its not in eids OPT={OPT}, id={id}, eids={eids}")
        msg="Sorry, viewing data is confused, cannot view this image now"
        # .get so that a missing ENV does not turn this warning into a KeyError
        if os.environ.get('ENV') == "production":
            msg += "Clearing out all states. This means browser back buttons will not work, please start a new tab and try again"
            PA_UserState.query.delete()
            db.session.commit()
        st.SetMessage( msg, "warning" )
        return redirect("/")

    NMO_data = FaceOverrideType.query.all()
    setting = Settings.query.first()
    imp_path = setting.import_path
    st_path = setting.storage_path
    bin_path = setting.recycle_bin_path
    return render_template("viewer.html", current=int(id), eids=eids, objs=objs, OPT=OPT, NMO_data=NMO_data, imp_path=imp_path, st_path=st_path, bin_path=bin_path )
##################################################################################
# /view/id -> grabs data from DB and views it (POST -> set state, redirect to GET)
##################################################################################
@app.route("/view/<id>", methods=["POST"])
@login_required
def view_img_post(id):
    """Set the pa_user_states from the POST, then redirect to the GET version of /view/<id>."""
    # States() is built only for its effect of setting pa_user_states...
    States( request )
    # ...then use the back-button friendly URL (pa_user_states is used to view
    # the right image in the right list)
    target = "/view/" + id
    return redirect( target )
# route called from the front-end - if multiple images are being transformed, each transform == a separate call
# to this route (and therefore a separate transform job). Each response allows the f/e to check whether the
# specific transform job is finished (/checktransformjob), which will be called (say) every 1 sec. from the f/e
# with a spinning wheel, then when pa_job_mgr has finished it will return the transformed thumb
@app.route("/transform", methods=["POST"])
@login_required
def transform():
    """Create a "transform_image" job from the POSTed form fields.

    Every field of the submitted form is copied into a JobExtra (name = the
    field name, value = the field's value) and handed to NewJob.

    Returns a dict with the new job's id under 'job_id'.
    """
    form = request.form
    # Neither value is used below, but indexing (rather than .get) means a
    # request lacking 'id' or 'amt' is rejected here, before a job is created.
    _entry_id = form['id']
    _amount = form['amt']

    extras = [JobExtra(name=f"{field}", value=form[field]) for field in form]
    new_job = NewJob("transform_image", 0, None, extras)
    return {'job_id': new_job.id}
################################################################################
# /checktransformjob -> URL that is called repeatedly by front-end waiting for the
# b/e to finish the transform job. Once done, the new / now
# transformed image's thumbnail is returned so the f/e can
# update with it
################################################################################
@app.route("/checktransformjob", methods=["POST"])
@login_required
def checktransformjob():
    """Report whether a transform job has finished and, if so, its new thumbnail.

    Reads 'job_id' from the POSTed form.

    Returns:
        {'finished': False} while the job's pa_job_state is not 'Completed'.
        {'finished': True, 'thumbnail': ...} once it is, where the thumbnail
        comes from the file_details of the Entry named by the job's "id" extra.
        ({'finished': False, 'error': ...}, 404) when the job does not exist,
        or when a completed job carries no "id" extra or its Entry cannot be
        found. Previously these cases raised AttributeError / IndexError.
    """
    job_id = request.form['job_id']
    job = Job.query.get(job_id)
    if job is None:
        # Query.get() hands back None for an unknown primary key
        return {'finished': False, 'error': 'no such job'}, 404

    resp = {'finished': False}
    if job.pa_job_state == 'Completed':
        # first JobExtra called "id" holds the id of the transformed entry
        entry_id = next((jex.value for jex in job.extra if jex.name == "id"), None)
        entry = None
        if entry_id is not None:
            entry = Entry.query.join(File).filter(Entry.id == entry_id).first()
        if entry is None:
            return {'finished': False, 'error': 'transformed entry not found'}, 404
        resp['thumbnail'] = entry.file_details.thumbnail
        resp['finished'] = True
    return resp
################################################################################
# /internal -> returns the contents of files under internal/ and does not need a login, so we
# can get the icon, and potentially any js, bootstrap, etc. needed for the login page
################################################################################
@app.route("/internal/<path:filename>")
def internal(filename):
    """Serve `filename` out of the internal/ directory.

    filename: the remainder of the URL after /internal/ (may contain slashes,
              as the route uses the `path` converter).

    This route is not wrapped in @login_required.
    """
    internal_dir = "internal/"
    return send_from_directory(internal_dir, filename)
################################################################################
# /static -> returns the contents of any file referenced inside /static.
# we create/use symlinks in static/ to reference the images to show
################################################################################
@app.route("/static/<filename>")
@login_required
def custom_static(filename):
    """Serve `filename` out of the static/ directory to a logged-in user.

    filename: the single URL segment following /static/.
    """
    static_dir = "static/"
    return send_from_directory(static_dir, filename)
###############################################################################
# This func creates a new filter in jinja2 to test to see if the Dir being
# checked, is a top-level folder of 'OPT.cwd'
################################################################################
@app.template_filter('TopLevelFolderOf')
def _jinja2_filter_toplevelfolderof(path, cwd):
    """Return True when the parent directory of `path` is exactly `cwd`.

    path: the path being tested.
    cwd:  the directory that `path` should sit directly inside.

    The comparison is a plain string equality on os.path.dirname(path), so
    `cwd` must be spelled the same way as the parent part of `path`
    (e.g. no trailing slash).
    """
    return os.path.dirname(path) == cwd
###############################################################################
# This func creates a new filter in jinja2 to hand back the parent path
# of a given path
################################################################################
@app.template_filter('ParentPath')
def _jinja2_filter_parentpath(path):
    """Return the directory portion of `path`, as given by os.path.dirname."""
    parent = os.path.dirname(path)
    return parent
###############################################################################
# route to allow the Move Dialog Box to pass a date (YYYYMMDD); returns a
# JSON list of existing dir names that could be near it in time. Starting
# simple, by matching the YYYYMMDD of each day from 7 days before to 7 days after
###############################################################################
@app.route("/getexistingpaths/<dt>", methods=["POST"])
@login_required
def GetExistingPathsAsDiv(dt):
    """Return a JSON array (as a string) describing existing dirs dated near `dt`.

    dt: date taken from the URL, expected in YYYYMMDD form.

    For each day from 7 days before to 7 days after `dt`, collects:
      * Dir rows whose rel_path contains that day's YYYYMMDD string, and
      * Dir rows joined via EntryDirLink to an Entry that is not of the
        'Directory' FileType and whose name contains that string.
    Each distinct Dir becomes one {"prefix": ..., "suffix": ...} object,
    where prefix and suffix are the first and second '-'-separated pieces of
    rel_path (suffix is "" when rel_path holds no '-').

    Returns "[]" when `dt` cannot be turned into a date.
    """
    # Function-scope imports: this file binds the name `datetime` to the class
    # (from datetime import datetime), so the former `datetime.datetime.strptime`
    # and `datetime.timedelta` lookups raised AttributeError, which a bare
    # except swallowed -- every request was answered with "[]".
    import datetime as dt_module
    import json

    try:
        centre = dt_module.datetime.strptime(dt, "%Y%m%d")
        day_stamps = [
            (centre + dt_module.timedelta(days=delta)).strftime('%Y%m%d')
            for delta in range(-7, 8)
        ]
    except (ValueError, OverflowError):
        # not a date (or too close to the min/max representable date), so we
        # cannot work out candidate dirs -- hand back an empty set
        return "[]"

    dir_ft = FileType.query.filter(FileType.name == 'Directory').first()
    dirs_arr = []
    for stamp in day_stamps:
        pattern = '%' + stamp + '%'
        dirs_arr += Dir.query.distinct(Dir.rel_path).filter(Dir.rel_path.ilike(pattern)).all()
        dirs_arr += (
            Dir.query.distinct(Dir.rel_path)
            .join(EntryDirLink)
            .join(Entry)
            .filter(Entry.type_id != dir_ft.id)
            .filter(Entry.name.ilike(pattern))
            .all()
        )

    # remove duplicates while keeping the order in which they were found
    unique_dirs = dict.fromkeys(dirs_arr)

    found = []
    for found_dir in unique_dirs:
        bits = found_dir.rel_path.split('-')
        found.append({"prefix": bits[0], "suffix": bits[1] if len(bits) > 1 else ""})

    # json.dumps escapes quotes / backslashes in rel_path, which the previous
    # hand-built string did not
    return json.dumps(found)