from wtforms import SubmitField, StringField, HiddenField, validators, Form from flask_wtf import FlaskForm from flask import request, render_template, redirect, send_from_directory from main import db, app, ma from sqlalchemy import Sequence from sqlalchemy.exc import SQLAlchemyError from status import st, Status import os import glob from PIL import Image from pymediainfo import MediaInfo import hashlib import exifread import base64 import numpy import cv2 import time import re import json from flask_login import login_required, current_user ################################################################################ # Local Class imports ################################################################################ from job import Job, JobExtra, Joblog, NewJob from path import PathType, Path from person import Refimg, Person, PersonRefimgLink from settings import Settings from shared import SymlinkName from dups import Duplicates from face import Face, FaceFileLink, FaceRefimgLink # pylint: disable=no-member ################################################################################ # Class describing PathDirLink and in the DB (via sqlalchemy) # connects the entry (dir) with a path ################################################################################ class PathDirLink(db.Model): __tablename__ = "path_dir_link" path_id = db.Column(db.Integer, db.ForeignKey("path.id"), primary_key=True ) dir_eid = db.Column(db.Integer, db.ForeignKey("dir.eid"), primary_key=True ) def __repr__(self): return f"" ################################################################################ # Class describing EntryDirLInk and in the DB (via sqlalchemy) # connects (many) entry contained in a directory (which is also an entry) ################################################################################ class EntryDirLink(db.Model): __tablename__ = "entry_dir_link" entry_id = db.Column(db.Integer, db.ForeignKey("entry.id"), primary_key=True ) dir_eid = db.Column(db.Integer, db.ForeignKey("dir.eid"), primary_key=True ) def __repr__(self): return f"" ################################################################################ # Class describing Dir and in the DB (via sqlalchemy) # rel_path: rest of dir after path, e.g. if path = /..../storage, then # rel_path could be 2021/20210101-new-years-day-pics # in_path: only in this structure, not DB, quick ref to the path this dir is in ################################################################################ class Dir(db.Model): __tablename__ = "dir" eid = db.Column(db.Integer, db.ForeignKey("entry.id"), primary_key=True ) rel_path = db.Column(db.String, unique=True ) in_path = db.relationship("Path", secondary="path_dir_link", uselist=False) def __repr__(self): return f"" ################################################################################ # Class describing Entry and in the DB (via sqlalchemy) # an entry is the common bits between files and dirs # type is a convenience var only in this class, not in DB # {dir|file}_etails are convenience data for the relevant details from the Dir # or File class - not in DB # in_dir - is the Dir that this entry is located in (convenience for class only) ################################################################################ class Entry(db.Model): __tablename__ = "entry" id = db.Column(db.Integer, db.Sequence('file_id_seq'), primary_key=True ) name = db.Column(db.String, unique=False, nullable=False ) type_id = db.Column(db.Integer, db.ForeignKey("file_type.id")) type = db.relationship("FileType") dir_details = db.relationship( "Dir", uselist=False ) file_details = db.relationship( "File", uselist=False ) in_dir = db.relationship ("Dir", secondary="entry_dir_link", uselist=False ) def __repr__(self): return f"" ################################################################################ # Class describing FileType and in the DB (via sqlalchemy) # pre-defined list of file types (image, dir, etc.) ################################################################################ class FileType(db.Model): __tablename__ = "file_type" id = db.Column(db.Integer, db.Sequence('file_type_id_seq'), primary_key=True ) name = db.Column(db.String, unique=True, nullable=False ) def __repr__(self): return "".format(self.id, self.name ) ################################################################################ # Class describing PA_JobManager_Message and in the DB (via sqlalchemy) # the job manager can send a message back to the front end (this code) via the # DB. has to be about a specific job_id and is success/danger, etc. (alert) # and a message ################################################################################ class PA_JobManager_Message(db.Model): __tablename__ = "pa_job_manager_fe_message" id = db.Column(db.Integer, db.Sequence('pa_job_manager_fe_message_id_seq'), primary_key=True ) job_id = db.Column(db.Integer, db.ForeignKey('job.id') ) alert = db.Column(db.String) message = db.Column(db.String) def __repr__(self): return " show detailed file list of files from import_path(s) ################################################################################ @app.route("/file_list_ip", methods=["GET","POST"]) @login_required def file_list_ip(): noo, grouping, how_many, offset, size, folders, cwd, root = ViewingOptions( request ) entries=[] settings=Settings.query.first() paths = settings.import_path.split("#") for path in paths: prefix = SymlinkName("Import",path,path+'/') entries+=GetEntriesInFlatView( cwd, prefix, noo, offset, how_many ) return render_template("file_list.html", page_title='View File Details (Import Path)', entry_data=entries, noo=noo, how_many=how_many, offset=offset ) ################################################################################ # /files -> show thumbnail view of files from import_path(s) ################################################################################ @app.route("/files_ip", methods=["GET", "POST"]) @login_required def files_ip(): noo, grouping, how_many, offset, size, folders, cwd, root = ViewingOptions( request ) entries=[] people = Person.query.all() # per import path, add entries to view settings=Settings.query.first() paths = settings.import_path.split("#") for path in paths: if not os.path.exists(path): continue prefix = SymlinkName("Import",path,path+'/') if folders: entries+=GetEntriesInFolderView( cwd, prefix, noo, offset, how_many ) else: entries+=GetEntriesInFlatView( cwd, prefix, noo, offset, how_many ) return render_template("files.html", page_title='View Files (Import Path)', entry_data=entries, noo=noo, grouping=grouping, how_many=how_many, offset=offset, size=size, folders=folders, cwd=cwd, root=root, people=people ) ################################################################################ # /files -> show thumbnail view of files from storage_path ################################################################################ @app.route("/files_sp", methods=["GET", "POST"]) @login_required def files_sp(): noo, grouping, how_many, offset, size, folders, cwd, root = ViewingOptions( request ) entries=[] people = Person.query.all() # per storage path, add entries to view settings=Settings.query.first() paths = settings.storage_path.split("#") for path in paths: if not os.path.exists(path): continue prefix = SymlinkName("Storage",path,path+'/') if folders: entries+=GetEntriesInFolderView( cwd, prefix, noo, offset, how_many ) else: entries+=GetEntriesInFlatView( cwd, prefix, noo, offset, how_many ) return render_template("files.html", page_title='View Files (Storage Path)', entry_data=entries, noo=noo, grouping=grouping, how_many=how_many, offset=offset, size=size, folders=folders, cwd=cwd, root=root, people=people ) ################################################################################ # /files -> show thumbnail view of files from storage_path ################################################################################ @app.route("/files_rbp", methods=["GET", "POST"]) @login_required def files_rbp(): noo, grouping, how_many, offset, size, folders, cwd, root = ViewingOptions( request ) entries=[] # per recyle bin path, add entries to view settings=Settings.query.first() paths = settings.recycle_bin_path.split("#") for path in paths: if not os.path.exists(path): continue prefix = SymlinkName("Bin",path,path+'/') if folders: entries+=GetEntriesInFolderView( cwd, prefix, noo, offset, how_many ) else: entries+=GetEntriesInFlatView( cwd, prefix, noo, offset, how_many ) return render_template("files.html", page_title='View Files (Bin Path)', entry_data=entries, noo=noo, grouping=grouping, how_many=how_many, offset=offset, size=size, folders=folders, cwd=cwd, root=root ) ################################################################################ # /search -> show thumbnail view of files from import_path(s) ################################################################################ @app.route("/search", methods=["GET","POST"]) @login_required def search(): noo, grouping, how_many, offset, size, folders, cwd, root = ViewingOptions( request ) # always show flat results for search to start with folders=False term=request.form['term'] if 'AI:' in term: term = term.replace('AI:','') all_entries = Entry.query.join(File).join(FaceFileLink).join(Face).join(FaceRefimgLink).join(Refimg).join(PersonRefimgLink).join(Person).filter(Person.tag.ilike(f"%{term}%")).order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name).offset(offset).limit(how_many).all() else: file_data=Entry.query.join(File).filter(Entry.name.ilike(f"%{request.form['term']}%")).order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name).offset(offset).limit(how_many).all() dir_data=Entry.query.join(File).join(EntryDirLink).join(Dir).filter(Dir.rel_path.ilike(f"%{request.form['term']}%")).order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name).offset(offset).limit(how_many).all() ai_data=Entry.query.join(File).join(FaceFileLink).join(Face).join(FaceRefimgLink).join(Refimg).join(PersonRefimgLink).join(Person).filter(Person.tag.ilike(f"%{request.form['term']}%")).order_by(File.year.desc(),File.month.desc(),File.day.desc(),Entry.name).offset(offset).limit(how_many).all() all_entries = file_data + dir_data + ai_data return render_template("files.html", page_title='View Files', search_term=request.form['term'], entry_data=all_entries, noo=noo, grouping=grouping, how_many=how_many, offset=offset, size=size, folders=folders, cwd=cwd, root=root ) ################################################################################ # /files/scannow -> allows us to force a check for new files ################################################################################ @app.route("/files/scannow", methods=["GET"]) @login_required def scannow(): job=NewJob("scannow" ) st.SetAlert("success") st.SetMessage("scanning for new files in: Job #{} (Click the link to follow progress)".format( job.id, job.id) ) return render_template("base.html") ################################################################################ # /files/forcescan -> deletes old data in DB, and does a brand new scan ################################################################################ @app.route("/files/forcescan", methods=["GET"]) @login_required def forcescan(): job=NewJob("forcescan" ) st.SetAlert("success") st.SetMessage("force scan & rebuild data for files in: Job #{} (Click the link to follow progress)".format( job.id, job.id) ) return render_template("base.html") ################################################################################ # /files/scan_sp -> allows us to force a check for new files ################################################################################ @app.route("/files/scan_sp", methods=["GET"]) @login_required def scan_sp(): job=NewJob("scan_sp" ) st.SetAlert("success") st.SetMessage("scanning for new files in: Job #{} (Click the link to follow progress)".format( job.id, job.id) ) return render_template("base.html") ################################################################################ # /fix_dups -> use sql to find duplicates based on same hash, different # filenames, or directories. Pass this straight through to the job manager # as job extras to a new job. ################################################################################ @app.route("/fix_dups", methods=["POST"]) @login_required def fix_dups(): rows = db.engine.execute( "select e1.id as id1, f1.hash, d1.rel_path as rel_path1, d1.eid as did1, e1.name as fname1, p1.id as path1, p1.type_id as path_type1, e2.id as id2, d2.rel_path as rel_path2, d2.eid as did2, e2.name as fname2, p2.id as path2, p2.type_id as path_type2 from entry e1, file f1, dir d1, entry_dir_link edl1, path_dir_link pdl1, path p1, entry e2, file f2, dir d2, entry_dir_link edl2, path_dir_link pdl2, path p2 where e1.id = f1.eid and e2.id = f2.eid and d1.eid = edl1.dir_eid and edl1.entry_id = e1.id and edl2.dir_eid = d2.eid and edl2.entry_id = e2.id and p1.type_id != (select id from path_type where name = 'Bin') and p1.id = pdl1.path_id and pdl1.dir_eid = d1.eid and p2.type_id != (select id from path_type where name = 'Bin') and p2.id = pdl2.path_id and pdl2.dir_eid = d2.eid and f1.hash = f2.hash and e1.id != e2.id and f1.size_mb = f2.size_mb order by path1, rel_path1, fname1"); if rows.returns_rows == False: st.SetAlert("success") st.SetMessage(f"Err, no dups - should now clear the FE 'danger' message?") return render_template("base.html") if 'pagesize' not in request.form: # default to 10, see if we have a larger value as someone reset it in the gui, rather than first time invoked pagesize = 10 jexes = JobExtra.query.join(Job).filter(Job.name=='checkdups').filter(Job.pa_job_state=='New').all() jexes.append( JobExtra( name="pagesize", value=pagesize ) ) else: pagesize=int(request.form['pagesize']) DD=Duplicates() for row in rows: DD.AddDup( row ) DD.SecondPass() # DD.Dump() return render_template("dups.html", DD=DD, pagesize=pagesize ) ################################################################################ # /rm_dups -> f/e that shows actual duplicates so that we can delete some dups # this code creates a new job with extras that have hashes/ids to allow removal ################################################################################ @app.route("/rm_dups", methods=["POST"]) @login_required def rm_dups(): jex=[] for el in request.form: if 'kfhash-' in el: # get which row/number kf it is... _, which = el.split('-') jex.append( JobExtra( name=f"kfid-{which}", value=request.form['kfid-'+which] ) ) jex.append( JobExtra( name=f"kfhash-{which}", value=request.form[el] ) ) if 'kdhash-' in el: # get which row/number kd it is... _, which = el.split('-') jex.append( JobExtra( name=f"kdid-{which}", value=request.form['kdid-'+which] ) ) jex.append( JobExtra( name=f"kdhash-{which}", value=request.form[el] ) ) jex.append( JobExtra( name="pagesize", value=10 ) ) job=NewJob( "rmdups", 0, None, jex ) st.SetAlert("success") st.SetMessage( f"Created Job #{job.id} to delete duplicate files") return render_template("base.html") ################################################################################ # /restore_files -> create a job to restore files for the b/e to process ################################################################################ @app.route("/restore_files", methods=["POST"]) @login_required def restore_files(): jex=[] for el in request.form: jex.append( JobExtra( name=f"{el}", value=request.form[el] ) ) job=NewJob( "restore_files", 0, None, jex ) st.SetAlert("success") st.SetMessage( f"Created Job #{job.id} to restore selected file(s)") return render_template("base.html") ################################################################################ # /delete_files -> create a job to delete files for the b/e to process ################################################################################ @app.route("/delete_files", methods=["POST"]) @login_required def delete_files(): jex=[] for el in request.form: jex.append( JobExtra( name=f"{el}", value=request.form[el] ) ) job=NewJob( "delete_files", 0, None, jex ) st.SetAlert("success") st.SetMessage( f"Created Job #{job.id} to delete selected file(s)") return render_template("base.html") ################################################################################ # /move_files -> create a job to move files for the b/e to process ################################################################################ @app.route("/move_files", methods=["POST"]) @login_required def move_files(): jex=[] for el in request.form: jex.append( JobExtra( name=f"{el}", value=request.form[el] ) ) job=NewJob( "move_files", 0, None, jex ) st.SetAlert("success") st.SetMessage( f"Created Job #{job.id} to move selected file(s)") return render_template("base.html") ################################################################################ # /viewnext -> moves to the next entry and grabs data from DB and views it ################################################################################ @app.route("/viewnext", methods=["GET","POST"]) @login_required def viewnext(): sels={} sels['fname']='true' sels['faces']='true' sels['distance']='true' if request.method=="POST": id = request.form['current'] eids=request.form['eids'] sels['fname']=request.form['fname'] sels['faces']=request.form['faces'] sels['distance']=request.form['distance'] lst = eids.split(',') new_id = lst[lst.index(id)+1] obj = Entry.query.join(File).filter(Entry.id==new_id).first() # put locn data back into array format for face in obj.file_details.faces: face.locn = json.loads(face.locn) return render_template("viewer.html", obj=obj, eids=eids, sels=sels ) ################################################################################ # /viewprev -> moves to the prev entry and grabs data from DB and views it ################################################################################ @app.route("/viewprev", methods=["GET","POST"]) @login_required def viewprev(): sels={} sels['fname']='true' sels['faces']='true' sels['distance']='true' if request.method=="POST": id = request.form['current'] eids=request.form['eids'] sels['fname']=request.form['fname'] sels['faces']=request.form['faces'] sels['distance']=request.form['distance'] lst = eids.split(',') new_id = lst[lst.index(id)-1] obj = Entry.query.join(File).filter(Entry.id==new_id).first() # put locn data back into array format for face in obj.file_details.faces: face.locn = json.loads(face.locn) return render_template("viewer.html", obj=obj, eids=eids, sels=sels ) ################################################################################ # /view/id -> grabs data from DB and views it ################################################################################ @app.route("/view/", methods=["GET","POST"]) @login_required def view_img(id): obj = Entry.query.join(File).filter(Entry.id==id).first() # put locn data back into array format for face in obj.file_details.faces: face.locn = json.loads(face.locn) if request.method=="POST": eids=request.form['eids'] else: eids='' sels={} sels['fname']='true' sels['faces']='true' sels['distance']='true' return render_template("viewer.html", obj=obj, eids=eids, sels=sels ) # route called from front/end - if multiple images are being rotated, each rotation == a separate call # to this route (and therefore a separate rotate job. Each reponse allows the f/e to check the # specific rotation job is finished (/checkrotatejob) which will be called (say) every 1 sec. from f/e # with a spinning wheel, then when pa_job_mgr has finished it will return the rotated thumb @app.route("/rotate", methods=["POST"]) @login_required def rotate(): id = request.form['id'] amt = request.form['amt'] print( f"rotate called with id={id}, amt={amt}") jex=[] for el in request.form: jex.append( JobExtra( name=f"{el}", value=request.form[el] ) ) job=NewJob( "rotate_image", 0, None, jex ) resp={} resp['job_id']=job.id # TODO: make this return data with the job number, then the f/e can poll checkrotatejob return resp ################################################################################ # /checkrotatejob -> URL that is called repeatedly by front-end waiting for the # b/e to finish the rotate job. Once done, the new / now # rotated image's thumbnail is returned so the f/e can # update with it ################################################################################ @app.route("/checkrotatejob", methods=["POST"]) @login_required def checkrotatejob(): job_id = request.form['job_id'] job = Job.query.get(job_id) resp={} resp['finished']=False if job.pa_job_state == 'Completed': id=[jex.value for jex in job.extra if jex.name == "id"][0] e=Entry.query.join(File).filter(Entry.id==id).first() resp['thumbnail']=e.file_details.thumbnail resp['finished']=True return resp ################################################################################ # /include -> return contents on /include and does not need a login, so we # can get the icon, and potentially any js, bootstrap, etc. needed for the login page ################################################################################ @app.route("/internal/") def internal(filename): print( filename ) return send_from_directory("internal/", filename) ################################################################################ # /static -> returns the contents of any file referenced inside /static. # we create/use symlinks in static/ to reference the images to show ################################################################################ @app.route("/static/") @login_required def custom_static(filename): return send_from_directory("static/", filename) ############################################################################### # This func creates a new filter in jinja2 to test to see if the Dir being # checked, is a top-level folder of 'cwd' ################################################################################ @app.template_filter('TopLevelFolderOf') def _jinja2_filter_toplevelfolderof(path, cwd): if os.path.dirname(path) == cwd: return True else: return False ############################################################################### # This func creates a new filter in jinja2 to test to hand back the parent path # from a given path ################################################################################ @app.template_filter('ParentPath') def _jinja2_filter_parentpath(path): return os.path.dirname(path)