Files
photoassistant/files.py

485 lines
22 KiB
Python

from wtforms import SubmitField, StringField, HiddenField, validators, Form
from flask_wtf import FlaskForm
from flask import request, render_template, redirect, send_from_directory
from main import db, app, ma
from sqlalchemy import Sequence
from sqlalchemy.exc import SQLAlchemyError
from status import st, Status
import os
import glob
from PIL import Image
from pymediainfo import MediaInfo
import hashlib
import exifread
import base64
import numpy
import cv2
import time
import re
from flask_login import login_required, current_user
################################################################################
# Local Class imports
################################################################################
from job import Job, JobExtra, Joblog, NewJob
from path import PathType, Path
from person import Person, PersonRefimgLink
from refimg import Refimg
from settings import Settings
from shared import SymlinkName
from dups import Duplicates
# pylint: disable=no-member
################################################################################
# Class describing File in the database, and via sqlalchemy, connected to the DB as well
# This has to match one-for-one the DB table
################################################################################
class PathDirLink(db.Model):
__tablename__ = "path_dir_link"
path_id = db.Column(db.Integer, db.ForeignKey("path.id"), primary_key=True )
dir_eid = db.Column(db.Integer, db.ForeignKey("dir.eid"), primary_key=True )
def __repr__(self):
return f"<path_id: {self.path_id}, dir_eid: {self.dir_eid}>"
class EntryDirLink(db.Model):
__tablename__ = "entry_dir_link"
entry_id = db.Column(db.Integer, db.ForeignKey("entry.id"), primary_key=True )
dir_eid = db.Column(db.Integer, db.ForeignKey("dir.eid"), primary_key=True )
def __repr__(self):
return f"<entry_id: {self.entry_id}, dir_eid: {self.dir_eid}>"
class Dir(db.Model):
__tablename__ = "dir"
eid = db.Column(db.Integer, db.ForeignKey("entry.id"), primary_key=True )
rel_path = db.Column(db.String, unique=True )
in_path = db.relationship("Path", secondary="path_dir_link", uselist=False)
def __repr__(self):
return f"<eid: {self.eid}, rel_path: {self.rel_path}, in_path: {self.in_path}>"
class Entry(db.Model):
__tablename__ = "entry"
id = db.Column(db.Integer, db.Sequence('file_id_seq'), primary_key=True )
name = db.Column(db.String, unique=False, nullable=False )
type_id = db.Column(db.Integer, db.ForeignKey("file_type.id"))
type = db.relationship("FileType")
dir_details = db.relationship( "Dir", uselist=False )
file_details = db.relationship( "File", uselist=False )
in_dir = db.relationship ("Dir", secondary="entry_dir_link", uselist=False )
def __repr__(self):
return "<id: {}, name: {}, type={}, dir_details={}, file_details={}, in_dir={}>".format(self.id, self.name, self.type, self.dir_details, self.file_details, self.in_dir)
class FileRefimgLink(db.Model):
__tablename__ = "file_refimg_link"
file_id = db.Column(db.Integer, db.ForeignKey('file.eid'), unique=True, nullable=False, primary_key=True)
refimg_id = db.Column(db.Integer, db.ForeignKey('refimg.id'), unique=True, nullable=False, primary_key=True)
when_processed = db.Column(db.Float)
matched = db.Column(db.Boolean)
def __repr__(self):
return f"<file_id: {self.file_id}, refimg_id: {self.refimg_id} when_processed={self.when_processed}, matched={self.matched}"
class File(db.Model):
__tablename__ = "file"
eid = db.Column(db.Integer, db.ForeignKey("entry.id"), primary_key=True )
size_mb = db.Column(db.Integer, unique=False, nullable=False)
thumbnail = db.Column(db.String, unique=False, nullable=True)
hash = db.Column(db.Integer)
year = db.Column(db.Integer)
month = db.Column(db.Integer)
day = db.Column(db.Integer)
woy = db.Column(db.Integer)
def __repr__(self):
return f"<eid: {self.eid}, size_mb={self.size_mb}, hash={self.hash}, year={self.year}, month={self.month}, day={self.day}, woy={self.woy}>"
class FileType(db.Model):
__tablename__ = "file_type"
id = db.Column(db.Integer, db.Sequence('file_type_id_seq'), primary_key=True )
name = db.Column(db.String, unique=True, nullable=False )
def __repr__(self):
return "<id: {}, name={}>".format(self.id, self.name )
class PA_JobManager_Message(db.Model):
__tablename__ = "pa_job_manager_fe_message"
id = db.Column(db.Integer, db.Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True )
job_id = db.Column(db.Integer, db.ForeignKey('job.id') )
alert = db.Column(db.String)
message = db.Column(db.String)
def __repr__(self):
return "<id: {}, job_id: {}, alert: {}, message: {}".format(self.id, self.job_id, self.alert, self.message)
################################################################################
# Local utility functions
################################################################################
def GetJM_Message():
msg=PA_JobManager_Message.query.first()
return msg
def ClearJM_Message(id):
PA_JobManager_Message.query.filter(PA_JobManager_Message.id==id).delete()
db.session.commit()
return
def ViewingOptions( request ):
noo="Oldest"
grouping="None"
how_many="50"
offset=0
size=128
if 'files_sp' in request.path:
noo="A to Z"
folders=True
cwd='static/Storage'
elif 'files_rbp' in request.path:
folders=True
cwd='static/Bin'
else:
folders=False
cwd='static/Import'
root=cwd
# the above are defaults, if we are here, then we have current values, use them instead
if request.method=="POST":
noo=request.form['noo']
how_many=request.form['how_many']
offset=int(request.form['offset'])
grouping=request.form['grouping']
size = request.form['size']
# seems html cant do boolean, but uses strings so convert
if request.form['folders'] == "False":
folders=False
if request.form['folders'] == "True":
folders=True
# have to force grouping to None if we flick to folders from a flat
# view with grouping (otherwise we print out group headings for
# child content that is not in the CWD)
grouping=None
cwd = request.form['cwd']
if 'prev' in request.form:
offset -= int(how_many)
if offset < 0:
offset=0
if 'next' in request.form:
offset += int(how_many)
return noo, grouping, how_many, offset, size, folders, cwd, root
def GetEntriesInFlatView( cwd, prefix, noo, offset, how_many ):
entries=[]
if noo == "Oldest":
entries+=Entry.query.join(File).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==prefix).order_by(File.year,File.month,File.day,Entry.name).offset(offset).limit(how_many).all()
elif noo == "Newest":
entries+=Entry.query.join(File).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==prefix).order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name).offset(offset).limit(how_many).all()
elif noo == "Z to A":
entries+=Entry.query.join(File).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==prefix).order_by(Entry.name.desc()).offset(offset).limit(how_many).all()
else:
entries+=Entry.query.join(File).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path).filter(Path.path_prefix==prefix).order_by(Entry.name).offset(offset).limit(how_many).all()
return entries
def GetEntriesInFolderView( cwd, prefix, noo, offset, how_many ):
entries=[]
# okay the root cwd is fake, so treat it specially - its Dir can be found by path with dir.rel_path=''
if os.path.dirname(cwd) == 'static':
dir=Entry.query.join(Dir).join(PathDirLink).join(Path).filter(Dir.rel_path=='').filter(Path.path_prefix==prefix).order_by(Entry.name).first()
# this can occur if the path in settings does not exist as it wont be in # the DB
if not dir:
return entries
# although this is 1 entry, needs to come back via all() to be iterable
entries+= Entry.query.filter(Entry.id==dir.id).all()
else:
rp = cwd.replace( prefix, '' )
# when in subdirs, replacing prefix will leave the first char as /, get rid of it
if len(rp) and rp[0] == '/':
rp=rp[1:]
dir=Entry.query.join(Dir).join(PathDirLink).join(Path).filter(Dir.rel_path==rp).filter(Path.path_prefix==prefix).order_by(Entry.name).first()
# this can occur if the path in settings does not exist as it wont be in # the DB
if not dir:
return entries
if noo == "Z to A" or "Newest":
entries+= Entry.query.join(EntryDirLink).join(FileType).filter(EntryDirLink.dir_eid==dir.id).filter(FileType.name=='Directory').order_by(Entry.name.desc()).all()
# just do A to Z / Oldest by default or if no valid option
else:
entries+= Entry.query.join(EntryDirLink).join(FileType).filter(EntryDirLink.dir_eid==dir.id).filter(FileType.name=='Directory').order_by(Entry.name).all()
# add any files at the current CWD (based on dir_eid in DB)
if noo == "Oldest":
entries+=Entry.query.join(File).join(EntryDirLink).filter(EntryDirLink.dir_eid==dir.id).order_by(File.year,File.month,File.day,Entry.name).offset(offset).limit(how_many).all()
elif noo == "Newest":
entries+=Entry.query.join(File).join(EntryDirLink).filter(EntryDirLink.dir_eid==dir.id).order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name).offset(offset).limit(how_many).all()
elif noo == "Z to A":
entries+=Entry.query.join(File).join(EntryDirLink).filter(EntryDirLink.dir_eid==dir.id).order_by(Entry.name.desc()).offset(offset).limit(how_many).all()
# just do A to Z by default or if no valid option
else:
entries+=Entry.query.join(File).join(EntryDirLink).filter(EntryDirLink.dir_eid==dir.id).order_by(Entry.name).offset(offset).limit(how_many).all()
return entries
################################################################################
# /file_list -> show detailed file list of files from import_path(s)
################################################################################
@app.route("/file_list_ip", methods=["GET","POST"])
@login_required
def file_list_ip():
noo, grouping, how_many, offset, size, folders, cwd, root = ViewingOptions( request )
entries=[]
settings=Settings.query.first()
paths = settings.import_path.split("#")
for path in paths:
prefix = SymlinkName("Import",path,path+'/')
entries+=GetEntriesInFlatView( cwd, prefix, noo, offset, how_many )
return render_template("file_list.html", page_title='View File Details (Import Path)', entry_data=entries, noo=noo, how_many=how_many, offset=offset )
################################################################################
# /files -> show thumbnail view of files from import_path(s)
################################################################################
@app.route("/files_ip", methods=["GET", "POST"])
@login_required
def files_ip():
noo, grouping, how_many, offset, size, folders, cwd, root = ViewingOptions( request )
entries=[]
people = Person.query.all()
# per import path, add entries to view
settings=Settings.query.first()
paths = settings.import_path.split("#")
for path in paths:
if not os.path.exists(path):
continue
prefix = SymlinkName("Import",path,path+'/')
if folders:
entries+=GetEntriesInFolderView( cwd, prefix, noo, offset, how_many )
else:
entries+=GetEntriesInFlatView( cwd, prefix, noo, offset, how_many )
return render_template("files.html", page_title='View Files (Import Path)', entry_data=entries, noo=noo, grouping=grouping, how_many=how_many, offset=offset, size=size, folders=folders, cwd=cwd, root=root, people=people )
################################################################################
# /files -> show thumbnail view of files from storage_path
################################################################################
@app.route("/files_sp", methods=["GET", "POST"])
@login_required
def files_sp():
noo, grouping, how_many, offset, size, folders, cwd, root = ViewingOptions( request )
entries=[]
people = Person.query.all()
# per storage path, add entries to view
settings=Settings.query.first()
paths = settings.storage_path.split("#")
for path in paths:
if not os.path.exists(path):
continue
prefix = SymlinkName("Storage",path,path+'/')
if folders:
entries+=GetEntriesInFolderView( cwd, prefix, noo, offset, how_many )
else:
entries+=GetEntriesInFlatView( cwd, prefix, noo, offset, how_many )
return render_template("files.html", page_title='View Files (Storage Path)', entry_data=entries, noo=noo, grouping=grouping, how_many=how_many, offset=offset, size=size, folders=folders, cwd=cwd, root=root, people=people )
################################################################################
# /files -> show thumbnail view of files from storage_path
################################################################################
@app.route("/files_rbp", methods=["GET", "POST"])
@login_required
def files_rbp():
noo, grouping, how_many, offset, size, folders, cwd, root = ViewingOptions( request )
entries=[]
# per recyle bin path, add entries to view
settings=Settings.query.first()
paths = settings.recycle_bin_path.split("#")
for path in paths:
if not os.path.exists(path):
continue
prefix = SymlinkName("Bin",path,path+'/')
if folders:
entries+=GetEntriesInFolderView( cwd, prefix, noo, offset, how_many )
else:
entries+=GetEntriesInFlatView( cwd, prefix, noo, offset, how_many )
return render_template("files.html", page_title='View Files (Bin Path)', entry_data=entries, noo=noo, grouping=grouping, how_many=how_many, offset=offset, size=size, folders=folders, cwd=cwd, root=root )
################################################################################
# /search -> show thumbnail view of files from import_path(s)
################################################################################
@app.route("/search", methods=["GET","POST"])
@login_required
def search():
noo, grouping, how_many, offset, size, folders, cwd, root = ViewingOptions( request )
# always show flat results for search to start with
folders=False
file_data=Entry.query.join(File).filter(Entry.name.ilike(f"%{request.form['term']}%")).order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name).offset(offset).limit(how_many).all()
dir_data=Entry.query.join(File).join(EntryDirLink).join(Dir).filter(Dir.rel_path.ilike(f"%{request.form['term']}%")).order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name).offset(offset).limit(how_many).all()
ai_data=Entry.query.join(File).join(FileRefimgLink).join(Refimg).join(PersonRefimgLink).join(Person).filter(FileRefimgLink.matched==True).filter(Person.tag.ilike(f"%{request.form['term']}%")).order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name).offset(offset).limit(how_many).all()
all_entries = file_data + dir_data + ai_data
return render_template("files.html", page_title='View Files', search_term=request.form['term'], entry_data=all_entries, noo=noo, grouping=grouping, how_many=how_many, offset=offset, size=size, folders=folders, cwd=cwd, root=root )
################################################################################
# /files/scannow -> allows us to force a check for new files
################################################################################
@app.route("/files/scannow", methods=["GET"])
@login_required
def scannow():
job=NewJob("scannow" )
st.SetAlert("success")
st.SetMessage("scanning for new files in:&nbsp;<a href=/job/{}>Job #{}</a>&nbsp;(Click the link to follow progress)".format( job.id, job.id) )
return render_template("base.html")
################################################################################
# /files/forcescan -> deletes old data in DB, and does a brand new scan
################################################################################
@app.route("/files/forcescan", methods=["GET"])
@login_required
def forcescan():
job=NewJob("forcescan" )
st.SetAlert("success")
st.SetMessage("force scan & rebuild data for files in:&nbsp;<a href=/job/{}>Job #{}</a>&nbsp;(Click the link to follow progress)".format( job.id, job.id) )
return render_template("base.html")
################################################################################
# /files/scan_sp -> allows us to force a check for new files
################################################################################
@app.route("/files/scan_sp", methods=["GET"])
@login_required
def scan_sp():
job=NewJob("scan_sp" )
st.SetAlert("success")
st.SetMessage("scanning for new files in:&nbsp;<a href=/job/{}>Job #{}</a>&nbsp;(Click the link to follow progress)".format( job.id, job.id) )
return render_template("base.html")
@app.route("/fix_dups", methods=["POST"])
@login_required
def fix_dups():
rows = db.engine.execute( "select e1.id as id1, f1.hash, d1.rel_path as rel_path1, d1.eid as did1, e1.name as fname1, p1.id as path1, p1.type_id as path_type1, e2.id as id2, d2.rel_path as rel_path2, d2.eid as did2, e2.name as fname2, p2.id as path2, p2.type_id as path_type2 from entry e1, file f1, dir d1, entry_dir_link edl1, path_dir_link pdl1, path p1, entry e2, file f2, dir d2, entry_dir_link edl2, path_dir_link pdl2, path p2 where e1.id = f1.eid and e2.id = f2.eid and d1.eid = edl1.dir_eid and edl1.entry_id = e1.id and edl2.dir_eid = d2.eid and edl2.entry_id = e2.id and p1.type_id != (select id from path_type where name = 'Bin') and p1.id = pdl1.path_id and pdl1.dir_eid = d1.eid and p2.type_id != (select id from path_type where name = 'Bin') and p2.id = pdl2.path_id and pdl2.dir_eid = d2.eid and f1.hash = f2.hash and e1.id != e2.id and f1.size_mb = f2.size_mb order by path1, rel_path1, fname1");
if rows.returns_rows == False:
st.SetAlert("success")
st.SetMessage(f"Err, no dups - should now clear the FE 'danger' message?")
return render_template("base.html")
if 'pagesize' not in request.form:
# default to 10, see if we have a larger value as someone reset it in the gui, rather than first time invoked
pagesize = 10
jexes = JobExtra.query.join(Job).filter(Job.name=='checkdups').filter(Job.pa_job_state=='New').all()
jexes.append( JobExtra( name="pagesize", value=pagesize ) )
else:
pagesize=int(request.form['pagesize'])
DD=Duplicates()
for row in rows:
DD.AddDup( row )
DD.SecondPass()
# DD.Dump()
return render_template("dups.html", DD=DD, pagesize=pagesize )
@app.route("/rm_dups", methods=["POST"])
@login_required
def rm_dups():
jex=[]
for el in request.form:
if 'kfhash-' in el:
# get which row/number kf it is...
_, which = el.split('-')
jex.append( JobExtra( name=f"kfid-{which}", value=request.form['kfid-'+which] ) )
jex.append( JobExtra( name=f"kfhash-{which}", value=request.form[el] ) )
if 'kdhash-' in el:
# get which row/number kd it is...
_, which = el.split('-')
jex.append( JobExtra( name=f"kdid-{which}", value=request.form['kdid-'+which] ) )
jex.append( JobExtra( name=f"kdhash-{which}", value=request.form[el] ) )
jex.append( JobExtra( name="pagesize", value=10 ) )
job=NewJob( "rmdups", 0, None, jex )
st.SetAlert("success")
st.SetMessage( f"Created&nbsp;<a href=/job/{job.id}>Job #{job.id}</a>&nbsp;to delete duplicate files")
return render_template("base.html")
@app.route("/restore_files", methods=["POST"])
@login_required
def restore_files():
jex=[]
for el in request.form:
jex.append( JobExtra( name=f"{el}", value=request.form[el] ) )
job=NewJob( "restore_files", 0, None, jex )
st.SetAlert("success")
st.SetMessage( f"Created&nbsp;<a href=/job/{job.id}>Job #{job.id}</a>&nbsp;to restore selected file(s)")
return render_template("base.html")
@app.route("/delete_files", methods=["POST"])
@login_required
def delete_files():
jex=[]
for el in request.form:
jex.append( JobExtra( name=f"{el}", value=request.form[el] ) )
job=NewJob( "delete_files", 0, None, jex )
st.SetAlert("success")
st.SetMessage( f"Created&nbsp;<a href=/job/{job.id}>Job #{job.id}</a>&nbsp;to delete selected file(s)")
return render_template("base.html")
@app.route("/move_files", methods=["POST"])
@login_required
def move_files():
jex=[]
for el in request.form:
jex.append( JobExtra( name=f"{el}", value=request.form[el] ) )
job=NewJob( "move_files", 0, None, jex )
st.SetAlert("success")
st.SetMessage( f"Created&nbsp;<a href=/job/{job.id}>Job #{job.id}</a>&nbsp;to move selected file(s)")
return render_template("base.html")
################################################################################
# /static -> returns the contents of any file referenced inside /static.
# we create/use symlinks in static/ to reference the images to show
################################################################################
@app.route("/static/<filename>")
@login_required
def custom_static(filename):
return send_from_directory("static/", filename)
###############################################################################
# This func creates a new filter in jinja2 to test to see if the Dir being
# checked, is a top-level folder of 'cwd'
################################################################################
@app.template_filter('TopLevelFolderOf')
def _jinja2_filter_toplevelfolderof(path, cwd):
if os.path.dirname(path) == cwd:
return True
else:
return False
###############################################################################
# This func creates a new filter in jinja2 to test to hand back the parent path
# from a given path
################################################################################
@app.template_filter('ParentPath')
def _jinja2_filter_parentpath(path):
return os.path.dirname(path)