from flask_wtf import FlaskForm
from flask import request, render_template, redirect, send_from_directory, url_for, jsonify, make_response
from marshmallow import Schema, fields
from main import db, app, ma
from sqlalchemy import Sequence, text, select, union, or_
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import joinedload
import os
import glob
import json
from PIL import Image
from pymediainfo import MediaInfo
import hashlib
import exifread
import base64
import numpy
import cv2
import time
import re
from datetime import datetime, timedelta
import pytz
import html
from flask_login import login_required, current_user
from types import SimpleNamespace
from amend import EntryAmendment, AmendmentType

# Local Class imports
################################################################################
from states import States, PA_UserState
from query import Query
from job import Job, JobExtra, Joblog, NewJob, SetFELog
from path import PathType, Path
from person import Refimg, Person, PersonRefimgLink
from settings import Settings, SettingsIPath, SettingsSPath, SettingsRBPath
from shared import SymlinkName, ICON, PA
from dups import Duplicates
from amend import getAmendments
from face import Face, FaceFileLink, FaceRefimgLink, FaceOverrideType, FaceNoMatchOverride, FaceForceMatchOverride

# pylint: disable=no-member

################################################################################
# PathDirLink: association table (via sqlalchemy) tying a dir (which is an
# entry) to the path it belongs to
################################################################################
class PathDirLink(PA,db.Model):
    __tablename__ = "path_dir_link"
    path_id = db.Column(db.Integer, db.ForeignKey("path.id"), primary_key=True )
    dir_eid = db.Column(db.Integer, db.ForeignKey("dir.eid"), primary_key=True )

################################################################################
# EntryDirLink: association table (via sqlalchemy) tying the (many) entries
# contained in a directory to that directory (which is itself an entry)
################################################################################
class EntryDirLink(PA,db.Model):
    __tablename__ = "entry_dir_link"
    entry_id = db.Column(db.Integer, db.ForeignKey("entry.id"), primary_key=True )
    dir_eid = db.Column(db.Integer, db.ForeignKey("dir.eid"), primary_key=True )

################################################################################
# Dir: the directory-only part of an entry, stored in the DB (via sqlalchemy)
#   rel_path: remainder of the dir after the path, e.g. if path = /..../storage,
#             then rel_path could be 2021/20210101-new-years-day-pics
#   in_path:  relationship only (no column of its own) - quick ref to the Path
#             this dir is in, resolved through path_dir_link
################################################################################
class Dir(PA,db.Model):
    __tablename__ = "dir"
    eid = db.Column(db.Integer, db.ForeignKey("entry.id"), primary_key=True )
    rel_path = db.Column(db.String, unique=True )
    in_path = db.relationship("Path", secondary="path_dir_link", uselist=False)

################################################################################
# Entry: the bits that files and dirs have in common, stored in the DB
# (via sqlalchemy)
#   type:            convenience relationship to the FileType row
#   dir_details /
#   file_details:    convenience relationships to the matching Dir or File row
#   in_dir:          the Dir this entry is located in (via entry_dir_link)
#   FullPathOnFS():  builds the path on the file system for this Entry
################################################################################
class Entry(PA,db.Model):
    __tablename__ = "entry"
    id = db.Column(db.Integer, db.Sequence('file_id_seq'), primary_key=True )
    name = db.Column(db.String, unique=False, nullable=False )
    type_id = db.Column(db.Integer, db.ForeignKey("file_type.id"))
    type = db.relationship("FileType")
    dir_details = db.relationship( "Dir", uselist=False )
    file_details = db.relationship( "File", uselist=False )
    in_dir = db.relationship ("Dir", secondary="entry_dir_link", uselist=False )

    def FullPathOnFS(self):
        """Return '<path_prefix>/[<rel_path>/]<name>', or just the path prefix for a root dir."""
        # no containing dir: this entry is the dir at the root of a path
        if not self.in_dir:
            return self.dir_details.in_path.path_prefix

        parent = self.in_dir
        pieces = [parent.in_path.path_prefix]
        # a dir sitting directly at the top of the path has an empty rel_path
        if len(parent.rel_path) > 0:
            pieces.append(parent.rel_path)
        pieces.append(self.name)
        return '/'.join(pieces)

################################################################################
# File: the file-only part of an entry, stored in the DB (via sqlalchemy)
#   hash:  used to validate duplicates
#   woy:   week of year; together with year/month/day it is used to sort/show
#          content. Date info can be from exif, or file system, or file name
#          (rarely)
#   faces: convenience relationship to the face(s) connected to this file
################################################################################
class File(PA,db.Model):
    __tablename__ = "file"
    eid = db.Column(db.Integer, db.ForeignKey("entry.id"), primary_key=True )
    size_mb = db.Column(db.Integer, unique=False, nullable=False)
    thumbnail = db.Column(db.String, unique=False, nullable=True)
    hash = db.Column(db.String)
    year = db.Column(db.Integer)
    month = db.Column(db.Integer)
    day = db.Column(db.Integer)
    woy = db.Column(db.Integer)
    faces = db.relationship ("Face", secondary="face_file_link" )

################################################################################
# Class describing FileType and in the DB (via sqlalchemy)
# pre-defined list of file types (image, dir, etc.)
################################################################################
class FileType(PA,db.Model):
    __tablename__ = "file_type"
    id = db.Column(db.Integer, db.Sequence('file_type_id_seq'), primary_key=True )
    name = db.Column(db.String, unique=True, nullable=False )

################################################################################
# this is how we order all queries based on value of 'noo' - used with
# access *_order_clauses(OPT.noo)
################################################################################
order_map = {
    "Newest": (File.year.desc(),File.month.desc(),File.day.desc(),Entry.name.desc()),
    "Oldest": (File.year,File.month,File.day,Entry.name),
    # careful, these need to be tuples, so with a , at the end
    "Z to A": (Entry.name.desc(),),
    "A to Z": (Entry.name.asc(),),
}

# ordering used when the client sends none, or one we do not know about
DEFAULT_ORDER = "A to Z"


def _order_clauses(noo):
    """Return the ORDER BY tuple for *noo*.

    An unknown value falls back to DEFAULT_ORDER; unpacking the None that
    order_map.get() returns for a missing key would raise TypeError.
    """
    return order_map.get(noo, order_map[DEFAULT_ORDER])

################################################################################

################################################################################
# Schemas for Path, FileType, File, Dir - used in EntrySchema
################################################################################
# NOTE(review): this schema class reuses the name of the PathType model imported
# from path. Meta.model below is evaluated before the class name is rebound, so
# it still refers to the model; after this statement the module-level name
# PathType is the schema. Renaming would change what importers of this module
# get, so it is left as-is.
class PathType(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = PathType
        load_instance = True


class PathSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = Path
        load_instance = True

    type = ma.Nested(PathType)
    root_dir = fields.Method("get_root_dir")
    icon_url = fields.Method("get_icon_url")

    def get_icon_url(self, obj):
        """Return the icons.svg URL with the fragment for this path's type."""
        return url_for("internal", filename="icons.svg") + "#" + ICON[obj.type.name]

    def get_root_dir(self, obj):
        """Return path_prefix with its first two '/'-separated parts dropped."""
        parts = obj.path_prefix.split('/')
        # NOTE(review): remaining parts are concatenated with no separator -
        # confirm that is intended for prefixes more than three parts deep
        return ''.join(parts[2:])


class FileTypeSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = FileType
        load_instance = True


class DirSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = Dir
        load_instance = True

    eid = ma.auto_field()  # Explicitly include eid
    in_path = ma.Nested(PathSchema)


class FaceFileLinkSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = FaceFileLink
        load_instance = True

    model_used = ma.auto_field()


class PersonSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model=Person
        load_instance = True


class RefimgSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = Refimg
        exclude = ('face',)
        load_instance = True

    person = ma.Nested(PersonSchema)


class FaceRefimgLinkSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = FaceRefimgLink
        load_instance = True


class FaceOverrideTypeSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = FaceOverrideType
        load_instance = True


class FaceNoMatchOverrideSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = FaceNoMatchOverride
        load_instance = True

    type = ma.Nested(FaceOverrideTypeSchema)


class FaceForceMatchOverrideSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = FaceForceMatchOverride
        load_instance = True

    person = ma.Nested(PersonSchema)


class FaceSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model=Face
        exclude = ('face',)
        load_instance = True

    refimg = ma.Nested(RefimgSchema,allow_none=True)
    # faces have to come with a file connection
    facefile_lnk = ma.Nested(FaceFileLinkSchema)
    refimg_lnk = ma.Nested(FaceRefimgLinkSchema,allow_none=True)
    fnmo = ma.Nested( FaceNoMatchOverrideSchema, allow_none=True, many=True )
    ffmo = ma.Nested( FaceForceMatchOverrideSchema, allow_none=True, many=True )


class FileSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = File
        load_instance = True

    faces = ma.Nested(FaceSchema,many=True,allow_none=True)


class AmendmentTypeSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = AmendmentType
        load_instance = True


class EntryAmendmentSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = EntryAmendment
        load_instance = True

    eid = ma.auto_field()
    type = ma.Nested(AmendmentTypeSchema)

################################################################################
# Schema for Entry so we can json for data to the client
################################################################################
class EntrySchema(ma.SQLAlchemyAutoSchema):
    # gives id, name, type_id
    class Meta:
        model = Entry
        load_instance = True

    type = ma.Nested(FileTypeSchema)
    file_details = ma.Nested(FileSchema,allow_none=True)
    # noting dir_details needs in_path to work
    dir_details = ma.Nested(DirSchema)
    # noting in_dir needs in_path and in_path.type to work
    in_dir = ma.Nested(DirSchema)
    # allow us to use FullPathOnFS()
    FullPathOnFS = fields.Method("get_full_path")

    def get_full_path(self, obj):
        """Expose Entry.FullPathOnFS() as a serialised field."""
        return obj.FullPathOnFS()

# global - this will be use more than once below, so do it once for efficiency
entries_schema = EntrySchema(many=True)
FOT_Schema = FaceOverrideTypeSchema(many=True)
path_Schema = PathSchema(many=True)
person_Schema = PersonSchema(many=True)
et_schema = AmendmentTypeSchema(many=True)
ea_schema = EntryAmendmentSchema(many=True)

################################################################################
# /get_entries_by_ids -> route where we supply list of entry ids (for next/prev
# page of data we want to show). Returns json of all matching entries
################################################################################
@app.route('/get_entries_by_ids', methods=['POST'])
@login_required
def process_ids():
    """Return json with the entries named in the posted 'ids' (in that order) plus all entry amendments."""
    data = request.get_json()       # Parse JSON body
    ids = data.get('ids', [])       # Extract list of ids

    # Query DB for matching entries
    stmt = (
        select(Entry)
        .options(
            joinedload(Entry.file_details).joinedload(File.faces).joinedload(Face.refimg).joinedload(Refimg.person),
            joinedload(Entry.file_details).joinedload(File.faces).joinedload(Face.refimg_lnk),
            joinedload(Entry.file_details).joinedload(File.faces).joinedload(Face.facefile_lnk),
            joinedload(Entry.file_details).joinedload(File.faces).joinedload(Face.fnmo).joinedload(FaceNoMatchOverride.type),
            joinedload(Entry.file_details).joinedload(File.faces).joinedload(Face.ffmo).joinedload(FaceForceMatchOverride.person),
        )
        .where(Entry.id.in_(ids))
    )

    # unique as the ORM query returns a Cartesian product for the joins. E.g if file has 3 faces,
    # the result has 3 rows of the same entry and file data, but different face data
    entries = db.session.execute(stmt).unique().scalars().all()

    # entries are in whatever order the DB returned them, so map id -> entry
    # and re-order in python to follow the order of the ids we were given
    entry_map = {entry.id: entry for entry in entries}
    sorted_data = [entry_map[id_] for id_ in ids if id_ in entry_map]

    # get the entry amendments (the select is not restricted to the ids above)
    stmt = select(EntryAmendment).join(AmendmentType)
    ea = db.session.execute(stmt).unique().scalars().all()
    ea_data = ea_schema.dump(ea)

    return jsonify(entries=entries_schema.dump(sorted_data), amend=ea_data)

################################################################################
# /get_dir_eids:
#   -> if back is false - returns list of eids inside this dir
#   -> if back is true  - returns list of eids inside the parent of this dir
################################################################################
@app.route("/get_dir_eids", methods=["POST"])
@login_required
def get_dir_entries():
    """Return json {valid, entry_list} with the ordered entry ids of a dir (or of its parent when 'back' is set)."""
    data = request.get_json()                   # Parse JSON body
    dir_id = data.get('dir_id')                 # Extract the dir's entry id
    back = data.get('back', False)              # Extract back boolean
    noo = data.get('noo', DEFAULT_ORDER)        # Extract noo ordering

    # if we are going back, find the parent id and use that instead
    if back:
        stmt = select(EntryDirLink.dir_eid).filter(EntryDirLink.entry_id==dir_id)
        dir_id = db.session.execute(stmt).scalars().one_or_none()

    # no dir id supplied, or no parent found: return valid as false, we need to
    # let user know this is not an empty dir, it does not exist
    if not dir_id:
        return jsonify( valid=False, entry_list=[] )

    # Just double-check this is still in the DB, in case it got deleted since client made view
    stmt = select(Entry.id).where(Entry.id==dir_id)
    ent_id = db.session.execute(stmt).scalars().one_or_none()
    if not ent_id:
        return jsonify( valid=False, entry_list=[] )

    # get content of dir_id
    # NOTE(review): the date orderings reference File columns that this select
    # does not join - confirm they are never requested for a folder listing
    stmt = select(Entry.id).join(EntryDirLink).filter(EntryDirLink.dir_eid==dir_id)
    stmt = stmt.order_by(*_order_clauses(noo))
    return jsonify( valid=True, entry_list=db.session.execute(stmt).scalars().all() )

# get Face override details
def getFOT():
    """Return every FaceOverrideType row, dumped through FOT_Schema."""
    fot = db.session.execute(select(FaceOverrideType)).scalars().all()
    return FOT_Schema.dump(fot)

# get import/storage path details for move dbox
def getMoveDetails():
    """Return the Paths whose type is named Import or Storage, dumped through path_Schema."""
    stmt = select(Path).where( or_( Path.type.has(name="Import"), Path.type.has(name="Storage")))
    mp = db.session.execute(stmt).scalars().all()
    return path_Schema.dump(mp)

# get people data for the menu for AI matching (of person.tag)
def getPeople():
    """Return every Person row, dumped through person_Schema."""
    people = db.session.execute(select(Person)).scalars().all()
    return person_Schema.dump(people)

def initQueryData():
    """Return a fresh query_data dict: empty entry list plus the lookup data loaded above."""
    return {
        'entry_list': None,
        'root_eid': 0,
        'NMO': getFOT(),
        'move_paths': getMoveDetails(),
        'people': getPeople(),
        'amendTypes': et_schema.dump( getAmendments() ),
    }

################################################################################
# Get all relevant Entry.ids based on search_term passed in and OPT visuals
################################################################################
def GetSearchQueryData(OPT):
    """Return query_data whose entry_list holds the entry ids matching OPT.search_term.

    A term containing 'AI:' matches only on Person.tag; any other term matches
    person tag, dir rel_path and file name.
    """
    query_data = initQueryData()
    is_ai_search = 'AI:' in OPT.search_term

    # turn * wildcard into sql wildcard of %
    search_term = OPT.search_term.replace('*', '%')
    if is_ai_search:
        search_term = search_term.replace('AI:', '')
    order = _order_clauses(OPT.noo)

    # AI searches are for specific ppl/joins in the DB AND we do them for ALL types of searches, define this once
    ai_query = (
        select(Entry.id)
        .join(File).join(FaceFileLink).join(Face).join(FaceRefimgLink).join(Refimg).join(PersonRefimgLink).join(Person)
        .where(Person.tag == search_term)
        .order_by(*order)
    )

    if is_ai_search:
        all_entries = db.session.execute(ai_query).scalars().all()
    else:
        # match name of File
        file_query = select(Entry.id).join(File).where(Entry.name.ilike(f'%{search_term}%')).order_by(*order)
        # match name of Dir
        dir_query = select(Entry.id).join(File).join(EntryDirLink).join(Dir).where(Dir.rel_path.ilike(f'%{search_term}%')).order_by(*order)

        ai_entries = db.session.execute(ai_query).scalars().all()
        file_entries = db.session.execute(file_query).scalars().all()
        dir_entries = db.session.execute(dir_query).scalars().all()

        # Combine ai, dir & file matches (in that order); dict.fromkeys() drops
        # repeated ids while keeping the first position each id was seen at
        all_entries = list(dict.fromkeys(ai_entries + dir_entries + file_entries))

    query_data['entry_list'] = all_entries
    return query_data

#################################################################################
# Get all relevant Entry.ids based on files_ip/files_sp/files_rbp and OPT visuals
#################################################################################
def GetQueryData( OPT ):
    """Return query_data for the Path whose prefix is OPT.prefix.

    root_eid is the entry id of the path's top dir (0 when not found).
    entry_list is the content of that top dir when OPT.folders is set,
    otherwise every File in the path.
    """
    query_data = initQueryData()

    # always get the top of the (OPT.prefix) Path's eid and keep it for OPT.folders toggling/use
    dir_stmt = (
        select(Entry.id)
        .join(Dir).join(PathDirLink).join(Path)
        .where(Dir.rel_path == '').where(Path.path_prefix==OPT.prefix)
    )
    # this should return the 1 Dir (that we want to see the content of) - and with only 1, no need to worry about order
    dir_arr = db.session.execute(dir_stmt).scalars().all()
    dir_id = dir_arr[0] if dir_arr else 0
    # used to know the parent/root (in folder view), in flat view - just ignore/safe though
    query_data['root_eid'] = dir_id

    if OPT.folders:
        # start folder view with only the root folder
        # NOTE(review): the date orderings reference File columns that this
        # select does not join - confirm they are never used in folder view
        stmt = select(Entry.id).join(EntryDirLink).filter(EntryDirLink.dir_eid==dir_id)
    else:
        # get every File that is in the OPT.prefix Path
        stmt = (
            select(Entry.id)
            .join(File).join(EntryDirLink).join(Dir).join(PathDirLink).join(Path)
            .where(Path.path_prefix == OPT.prefix)
        )

    stmt = stmt.order_by(*_order_clauses(OPT.noo))
    query_data['entry_list'] = db.session.execute(stmt).scalars().all()
    return query_data

################################################################################
# /change_file_opts -> allow sort order, how_many per page, etc. to change, and
# then send back the new query_data to update entryList
################################################################################
@app.route("/change_file_opts", methods=["POST"])
@login_required
def change_file_opts():
    """Rebuild query_data from the posted options and return it as json."""
    data = request.get_json()       # Parse JSON body
    # allow dot-notation for OPT
    OPT = SimpleNamespace(**data)

    # the client sends folders as the string 'True'; a JSON boolean true is accepted as well
    folders = getattr(OPT, 'folders', False)
    OPT.folders = folders is True or folders == 'True'

    # so create a new entryList, and handle that on the client
    # (request.referrer is None when the browser sends no Referer header)
    if 'search' in (request.referrer or ''):
        query_data = GetSearchQueryData( OPT )
    else:
        query_data = GetQueryData( OPT )
    return make_response( jsonify( query_data=query_data ) )

################################################################################
# /file_list_ip -> show detailed file list of files from import_path(s)
################################################################################
@app.route("/file_list_ip", methods=["GET"])
@login_required
def file_list_ip():
    """Render file_list.html for the path described by the request's States."""
    OPT = States( request )
    query_data = GetQueryData( OPT )
    js_vers = getVersions()
    return render_template("file_list.html", page_title='View File Details (Import Path)', query_data=query_data, OPT=OPT, js_vers=js_vers )

################################################################################
# /files_ip -> show thumbnail view of files from import_path(s)
################################################################################
@app.route("/files_ip", methods=["GET"])
@login_required
def files_ip():
    """Render files.html for the path described by the request's States."""
    OPT = States( request )
    query_data = GetQueryData( OPT )
    js_vers = getVersions()
    return render_template("files.html", page_title=f"View Files ({OPT.path_type} Path)", OPT=OPT, query_data=query_data, js_vers=js_vers )
################################################################################
# /files_sp -> thumbnail view of the files found in the storage_path
################################################################################
@app.route("/files_sp", methods=["GET"])
@login_required
def files_sp():
    """Render files.html for the path described by the request's States."""
    opts = States( request )
    context = {
        "query_data": GetQueryData( opts ),
        "js_vers": getVersions(),
        "OPT": opts,
    }
    return render_template("files.html", page_title=f"View Files ({opts.path_type} Path)", **context )

################################################################################
# /files_rbp -> thumbnail view of the files found in the recycle_bin_path
################################################################################
@app.route("/files_rbp", methods=["GET"])
@login_required
def files_rbp():
    """Render files.html for the path described by the request's States."""
    opts = States( request )
    context = {
        "query_data": GetQueryData( opts ),
        "js_vers": getVersions(),
        "OPT": opts,
    }
    return render_template("files.html", page_title=f"View Files ({opts.path_type} Path)", **context )

################################################################################
# search -> GET version -> has search_term in the URL and is therefore able to
# be used even if the user hits the front/back buttons in the browser.
# func shows thumbnails of matching files.
################################################################################
# the <search_term> part of the rule is what supplies the view's argument
@app.route("/search/<search_term>", methods=["GET", "POST"])
@login_required
def search(search_term):
    """Render files.html with the entries matching *search_term* (taken from the URL)."""
    OPT = States( request )
    OPT.search_term = search_term
    query_data = GetSearchQueryData( OPT )
    js_vers = getVersions()
    return render_template("files.html", page_title='View Files', search_term=search_term, query_data=query_data, OPT=OPT, js_vers=js_vers )

################################################################################
# /files/scan_ip -> allows us to force a check for new files
################################################################################
@app.route("/files/scan_ip", methods=["GET"])
@login_required
def scan_ip():
    """Create a scan_ip job, then redirect to /jobs."""
    NewJob( name="scan_ip", num_files=0, wait_for=None, jex=None, desc="scan for new files in import path" )
    return redirect("/jobs")

################################################################################
# /files/force_scan -> deletes old data in DB, and does a brand new scan
################################################################################
@app.route("/files/force_scan", methods=["GET"])
@login_required
def force_scan():
    """Create a force_scan job, then redirect to /jobs."""
    NewJob( name="force_scan", num_files=0, wait_for=None, jex=None, desc="remove data and rescan import & storage paths" )
    return redirect("/jobs")

################################################################################
# /files/scan_sp -> allows us to force a check for new files
################################################################################
@app.route("/files/scan_sp", methods=["GET"])
@login_required
def scan_sp():
    """Create a scan_sp job, then redirect to /jobs."""
    NewJob( name="scan_sp", num_files=0, wait_for=None, jex=None, desc="scan for new files in storage path" )
    return redirect("/jobs")

################################################################################
# /fix_dups -> use sql to find duplicates based on same hash, different
# filenames, or directories.
# Pass this straight through to the job manager
# as job extras to a new job.
################################################################################
@app.route("/fix_dups", methods=["POST"])
@login_required
def fix_dups():
    """Find pairs of non-Bin files with equal hash and size_mb and render them in dups.html.

    Redirects to / with a warning when the query finds no duplicates.
    The form may carry 'pagesize'; it defaults to 10.
    """
    with db.engine.connect() as conn:
        # fetch everything while the connection is still open
        rows = conn.execute( text(
            "select e1.id as id1, f1.hash, d1.rel_path as rel_path1, d1.eid as did1, e1.name as fname1, "
            "p1.id as path1, p1.type_id as path_type1, "
            "e2.id as id2, d2.rel_path as rel_path2, d2.eid as did2, e2.name as fname2, "
            "p2.id as path2, p2.type_id as path_type2 "
            "from entry e1, file f1, dir d1, entry_dir_link edl1, path_dir_link pdl1, path p1, "
            "entry e2, file f2, dir d2, entry_dir_link edl2, path_dir_link pdl2, path p2 "
            "where e1.id = f1.eid and e2.id = f2.eid "
            "and d1.eid = edl1.dir_eid and edl1.entry_id = e1.id "
            "and edl2.dir_eid = d2.eid and edl2.entry_id = e2.id "
            "and p1.type_id != (select id from path_type where name = 'Bin') "
            "and p1.id = pdl1.path_id and pdl1.dir_eid = d1.eid "
            "and p2.type_id != (select id from path_type where name = 'Bin') "
            "and p2.id = pdl2.path_id and pdl2.dir_eid = d2.eid "
            "and f1.hash = f2.hash and e1.id != e2.id and f1.size_mb = f2.size_mb "
            "order by path1, rel_path1, fname1"
        ) ).all()

    # returns_rows is true for any SELECT, even one that matched nothing, so
    # "no duplicates" has to be detected by looking at the rows themselves
    if not rows:
        SetFELog("Err, No more duplicates? Old link followed, or something is wrong!", "warning")
        return redirect("/")

    # default to 10 unless the gui sent a value
    if 'pagesize' not in request.form:
        pagesize = 10
    else:
        pagesize = int(request.form['pagesize'])

    DD = Duplicates()
    for row in rows:
        DD.AddDup( row )
    DD.SecondPass()
    # DD.Dump()

    return render_template("dups.html", DD=DD, pagesize=pagesize )

################################################################################
# /rm_dups -> f/e that shows actual duplicates so that we can delete some dups
# this code creates a new job with extras that have hashes/ids to allow removal
################################################################################
@app.route("/rm_dups", methods=["POST"])
@login_required
def rm_dups():
    """Create an rm_dups job from the kf*/kd* id and hash form fields, then redirect to /jobs."""
    jex = []
    for el in request.form:
        # kf* and kd* fields come as <kind>hash-<n> with a matching <kind>id-<n>
        for kind in ("kf", "kd"):
            if f"{kind}hash-" in el:
                # get which row/number it is...
                _, which = el.split('-')
                jex.append( JobExtra( name=f"{kind}id-{which}", value=str(request.form[f"{kind}id-{which}"]) ) )
                jex.append( JobExtra( name=f"{kind}hash-{which}", value=str(request.form[el]) ) )

    jex.append( JobExtra( name="pagesize", value="10" ) )

    NewJob( name="rm_dups", num_files=0, wait_for=None, jex=jex, desc="to delete duplicate files" )
    return redirect("/jobs")

def _form_job_extras():
    """Return one JobExtra per posted form field (name = field name, value = str of its value)."""
    return [ JobExtra( name=f"{el}", value=str(request.form[el]) ) for el in request.form ]

################################################################################
# /restore_files -> create a job to restore files for the b/e to process
################################################################################
@app.route("/restore_files", methods=["POST"])
@login_required
def restore_files():
    """Create a restore_files job carrying the posted form fields, then redirect to /jobs."""
    NewJob( name="restore_files", num_files=0, wait_for=None, jex=_form_job_extras(), desc="to restore selected file(s)" )
    return redirect("/jobs")

################################################################################
# /delete_files -> create a job to delete files for the b/e to process
################################################################################
@app.route("/delete_files", methods=["POST"])
@login_required
def delete_files():
    """Create a delete_files job carrying the posted form fields, then redirect to /jobs."""
    NewJob( name="delete_files", num_files=0, wait_for=None, jex=_form_job_extras(), desc="to delete selected file(s)" )
    return redirect("/jobs")

################################################################################
# /move_files -> create a job to move files for the b/e to process
################################################################################
@app.route("/move_files", methods=["POST"])
@login_required
def move_files():
    """Create a move_files job carrying the posted form fields and return its id as json."""
    job = NewJob( name="move_files", num_files=0, wait_for=None, jex=_form_job_extras(), desc="to move selected file(s)" )
    # data is not used, but send response to trigger CheckForJobs()
    return make_response( jsonify( job_id=job.id ) )

# @app.route must be the outermost decorator: it registers whatever function it
# is handed, so login_required has to be applied before it to protect the route
@app.route("/view/", methods=["POST"])
@login_required
def view():
    """Return json for the single entry whose id is posted as 'eid', with its faces, refimgs and people loaded."""
    data = request.get_json()       # Parse JSON body
    eid = data.get('eid', 0)        # Extract the entry id

    stmt = (
        select(Entry)
        .options(
            joinedload(Entry.file_details).joinedload(File.faces),
            joinedload(Entry.file_details).joinedload(File.faces).joinedload(Face.refimg).joinedload(Refimg.person)
        )
        .where(Entry.id == eid)
    )

    # this needs unique() because:
    #   entry (one row per id)
    #   file (one row, since file_details is a one-to-one relationship)
    #   face (many rows, since a file can have many faces)
    #   refimg and person (one row per face, via the link tables)
    # The SQL query returns a Cartesian product for the joins involving collections (like faces). For example, if your file has 3 faces,
    # the result set will have 3 rows, each with the same entry and file data, but different face, refimg, and person data.
    data = db.session.execute(stmt).unique().scalars().all()
    return jsonify(entries_schema.dump(data))

# route called from front/end - if multiple images are being transformed, each transform == a separate call
# to this route (and therefore a separate transform job. Each response allows the f/e to check the
# specific transform job is finished (/check_transform_job) which will be called (say) every 1 sec.
# from f/e
# with a spinning wheel, then when pa_job_mgr has finished it will return the transformed thumb
@app.route("/transform", methods=["POST"])
@login_required
def transform():
    """Create a transform_image job carrying the posted form fields and return its id as json."""
    # 'id' and 'amt' are not used here, but indexing the form keeps the
    # original behaviour of failing the request when either one is missing
    _entry_id = request.form['id']
    _amt = request.form['amt']

    jex = [ JobExtra( name=f"{el}", value=str(request.form[el]) ) for el in request.form ]

    job = NewJob( name="transform_image", num_files=0, wait_for=None, jex=jex, desc="to transform selected file(s)" )
    return make_response( jsonify( job_id=job.id ) )

################################################################################
# /check_transform_job -> URL that is called repeatedly by front-end waiting for the
# b/e to finish the transform job. Once done, the new / now
# transformed image's thumbnail is returned so the f/e can
# update with it
################################################################################
@app.route("/check_transform_job", methods=["POST"])
@login_required
def check_transform_job():
    """Return json {finished} for the posted job_id, adding 'thumbnail' once the job is Completed.

    An unknown job_id gets a 404 with finished=False.
    """
    job_id = request.form['job_id']
    stmt = select(Job).where(Job.id==job_id)
    job = db.session.execute(stmt).scalars().one_or_none()

    # one_or_none() gives None for an unknown id - answer that explicitly
    # rather than failing on job.pa_job_state below
    if job is None:
        return make_response( jsonify( finished=False ), 404 )

    if job.pa_job_state != 'Completed':
        return make_response( jsonify( finished=False ) )

    # the job extra named "id" holds the id of the entry that was transformed
    entry_id = [jex.value for jex in job.extra if jex.name == "id"][0]
    e = Entry.query.join(File).filter(Entry.id==entry_id).first()
    return make_response( jsonify( finished=True, thumbnail=e.file_details.thumbnail ) )

################################################################################
# /internal -> return contents of internal/ and does not need a login, so we
# can get the icon, and potentially any js, bootstrap, etc. needed for the login page
################################################################################
# the path converter lets filename contain '/', e.g. js/files_support.js
@app.route("/internal/<path:filename>")
def internal(filename):
    """Serve *filename* from the internal/ directory."""
    return send_from_directory("internal/", filename)

################################################################################
# /static -> returns the contents of any file referenced inside /static.
# we create/use symlinks in static/ to reference the images to show
################################################################################
# the path converter lets filename contain '/'
@app.route("/static/<path:filename>")
@login_required
def custom_static(filename):
    """Serve *filename* from the static/ directory."""
    return send_from_directory("static/", filename)

###############################################################################
# This func creates a new filter in jinja2 to test to see if the Dir being
# checked, is a top-level folder of 'OPT.cwd'
################################################################################
@app.template_filter('TopLevelFolderOf')
def _jinja2_filter_toplevelfolderof(path, cwd):
    """Return True when the directory part of *path* is exactly *cwd*."""
    return os.path.dirname(path) == cwd

###############################################################################
# route to allow the Move Dialog Box to pass a date (YYYYMMDD) and returns a
# json list of existing dir names that could be near it in time. Starting
# simple, by looking at every day from 14 days before to 14 days after the date
###############################################################################
@app.route("/get_existing_paths/<dt>", methods=["POST"])
@login_required
def get_existing_paths(dt):
    """Return a json list of {prefix, suffix, ptype} for dirs dated within 14 days of *dt*.

    A dir qualifies when its rel_path, or the name of a non-directory entry
    inside it, contains one of the dates. An unparsable *dt* gives '[]'.
    """
    try:
        base_dtime = datetime.strptime(dt, "%Y%m%d")
    except ValueError:
        # this is not a date, so we cant work out possible dirs, just
        # return an empty set
        return make_response( '[]' )

    dir_ft = FileType.query.filter(FileType.name=='Directory').first()

    # a set, so a dir that matches on several days is only reported once
    dirs = set()
    for delta in range(-14, 15):
        try:
            new_dtime = base_dtime + timedelta(days=delta)
        except OverflowError:
            # date too close to the min/max datetime to step around it
            return make_response( '[]' )
        new_dt = new_dtime.strftime('%Y%m%d')
        # find dirs named with this date
        dirs.update( Dir.query.filter(Dir.rel_path.ilike('%'+new_dt+'%')).all() )
        # find dirs with non-dirs (files) with this date
        dirs.update( Dir.query.join(EntryDirLink).join(Entry).filter(Entry.type_id!=dir_ft.id).filter(Entry.name.ilike('%'+new_dt+'%')).all() )

    # turn DB output into json and return it to the f/e
    existing = []
    for found in dirs:
        # this can occur if there is a file with this date name in the top-level of the path, its legit, but only really happens in DEV
        # regardless, it cant be used for a existpath button in the F/E, ignore it
        if found.rel_path == '':
            continue
        # split on the first dash only, so the suffix can contain dashes; with
        # no dash at all, sep and suffix are '' and prefix is the whole rel_path
        prefix, sep, suffix = found.rel_path.partition('-')
        existing.append( { "prefix": prefix + sep, "suffix": suffix, "ptype": found.in_path.type.name } )

    # json.dumps escapes quotes/backslashes in dir names, which hand-built
    # string concatenation would not
    return make_response( json.dumps(existing) )

# quick helper func to return timestamps of jscript files
# we use this as a quick/hacky way of versioning them
def getVersions():
    """Return a dict of short key -> integer mtime for the js and image files under internal/."""
    assets = {
        'fs': 'js/files_support.js',
        'vs': 'js/view_support.js',
        'ft': 'js/files_transform.js',
        'ic': 'icons.svg',
        'r180': 'rot180.png',
        'r270': 'rot270.png',
        'r90': 'rot90.png',
        'th': 'throbber.gif',
    }
    # the url for an internal file, prefixed with '.', is used as its path on disk
    return {
        key: int(os.path.getmtime( "."+url_for( 'internal', filename=fname ) ))
        for key, fname in assets.items()
    }