from wtforms import SubmitField, StringField, HiddenField, validators, Form
from flask_wtf import FlaskForm
from flask import request, render_template, redirect, send_from_directory
from main import db, app, ma
from sqlalchemy import Sequence
from sqlalchemy.exc import SQLAlchemyError
from status import st, Status
import os
import glob
from PIL import Image
from pymediainfo import MediaInfo
import hashlib
import exifread
import base64
import numpy
import cv2
import time

################################################################################
# Local Class imports
################################################################################
from settings import Settings
from job import Job, Joblog


class FileData():
    """Scans the configured import path(s) and records new files in the DB.

    ``view_list`` holds the result of ``File.query.all()`` from the most
    recent successful call to ``GenerateFileData``.
    """

    def __init__(self):
        self.view_list = []

    def getExif(self, file):
        """Return the embedded EXIF JPEG thumbnail of *file* as base64 text.

        Returns None when the file carries no 'JPEGThumbnail' tag, so that a
        single image without an embedded thumbnail does not abort a scan.
        Any exception raised by exifread itself is reported and re-raised.
        """
        with open(file, 'rb') as f:
            try:
                tags = exifread.process_file(f)
            except Exception:
                print('NO EXIF TAGS?!?!?!?')
                raise
        raw_thumbnail = tags.get('JPEGThumbnail')
        if raw_thumbnail is None:
            return None
        # base64 output is pure ASCII, so decoding gives the plain text form
        return base64.b64encode(raw_thumbnail).decode('ascii')

    def isVideo(self, file):
        """Return True if MediaInfo reports at least one "Video" track."""
        try:
            fileInfo = MediaInfo.parse(file)
            return any(track.track_type == "Video" for track in fileInfo.tracks)
        except Exception:
            # any failure to parse the file is treated as "not a video"
            return False

    # Converts linux paths into windows paths
    # HACK: assumes c:, might be best to just look for [a-z]: ?
    def FixPath(self, p):
        if p.startswith('c:'):
            p = p.replace('/', '\\')
        return p

    # Returns an md5 hash of the fnames' contents
    def md5(self, fname):
        hash_md5 = hashlib.md5()
        with open(fname, "rb") as f:
            # read in small chunks so large files are never held in memory
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def isImage(self, file):
        """Return True if PIL can open *file* as an image, else False."""
        try:
            # the context manager closes the underlying file handle again
            with Image.open(file):
                return True
        except Exception:
            return False

    def generateVideoThumbnail(self, file):
        """Return a 160x90 JPEG thumbnail of *file* as base64 text.

        Leading dark frames (mean pixel value below 15) are skipped.  If every
        readable frame is dark, the last readable frame is used.  Returns None
        when no frame can be read or the frame cannot be JPEG-encoded.
        """
        vcap = cv2.VideoCapture(file)
        try:
            ok, frame = vcap.read()
            if not ok or frame is None:
                return None
            while frame.mean() < 15:
                ok, next_frame = vcap.read()
                if not ok or next_frame is None:
                    # ran out of frames: keep the last one we managed to read
                    break
                frame = next_frame
        finally:
            vcap.release()

        frame = cv2.resize(frame, (160, 90), interpolation=cv2.INTER_LINEAR)
        # save on a buffer for direct transmission ('.jpeg' etc are permitted)
        ok, thumb_buf = cv2.imencode('.jpeg', frame)
        if not ok:
            return None
        return base64.b64encode(thumb_buf.tobytes()).decode('ascii')

    ##############################################################################
    # HACK: At present this only handles one path (need to re-factor if we have  #
    # multiple valid paths in import_path)                                       #
    ##############################################################################
    def GenerateFileData(self):
        """Add a File row for every entry newer than the last import.

        Reads the first Settings row, walks each '#'-separated import path
        recursively, and adds a File row for each entry whose ctime is newer
        than ``last_import_date`` (or for every entry when that date is 0).
        Afterwards ``last_import_date`` is set to now, the session is
        committed and ``view_list`` is reloaded.

        Returns None when there are no Settings rows, otherwise self.
        """
        settings = Settings.query.all()
        if not settings:
            return
        last_import_date = settings[0].last_import_date
        paths = settings[0].import_path.split("#")
        for path in paths:
            print( "GenerateFileData: Checking {}".format( path ) )
            path = self.FixPath(path)
            if not os.path.exists( path ):
                continue
            # to serve static content of the images, we create a symlink
            # from inside the static subdir of each import_path that exists
            # (path[0:-1] drops the last character before taking the basename,
            # i.e. this expects path to end with a separator)
            symlink = self.FixPath('static/{}'.format( os.path.basename(path[0:-1])))
            if not os.path.exists(symlink):
                os.symlink(path, symlink)
            path_prefix = symlink.replace(path, "")

            for file in glob.glob(path + '**', recursive=True):
                if file == path:
                    continue
                stat = os.stat(file)
                is_new = last_import_date == 0 or stat.st_ctime > last_import_date
                if not is_new:
                    print( "{} - {} is OLDER than {}".format( file, stat.st_ctime, last_import_date ) )
                    continue
                print( "{} - {} is newer than {}".format( file, stat.st_ctime, last_import_date ) )

                fthumbnail = None
                if os.path.isdir(file):
                    ftype = 'Directory'
                elif self.isImage(file):
                    ftype = 'Image'
                    fthumbnail = self.getExif(file)
                elif self.isVideo(file):
                    ftype = 'Video'
                    fthumbnail = self.generateVideoThumbnail(file)
                else:
                    ftype = 'File'

                # directories have no content to hash
                fhash = self.md5(file) if ftype != "Directory" else None
                fsize = round(stat.st_size/(1024*1024))
                fname = file.replace(path, "")
                file_obj = File( name=fname, type=ftype, size_mb=fsize, hash=fhash, path_prefix=path_prefix, thumbnail=fthumbnail )
                db.session.add(file_obj)
        settings[0].last_import_date = time.time()
        db.session.commit()
        self.view_list = File.query.all()
        return self


################################################################################
# Class describing File in the database, and via sqlalchemy, connected to the DB as well
# This has to match one-for-one the DB table
################################################################################
class File(db.Model):
    id = db.Column(db.Integer, db.Sequence('file_id_seq'), primary_key=True )
    name = db.Column(db.String, unique=True, nullable=False )
    type = db.Column(db.String, unique=False, nullable=False)
    path_prefix = db.Column(db.String, unique=False, nullable=False)
    size_mb = db.Column(db.Integer, unique=False, nullable=False)
    # hash might not be unique, this could be the source of dupe problems
    # NOTE(review): FileData.md5() yields a hex string but this column is
    # declared Integer -- left unchanged to match the DB table; confirm schema
    hash = db.Column(db.Integer, unique=True, nullable=True)
    thumbnail = db.Column(db.String, unique=False, nullable=True)

    def __repr__(self):
        return "<id: {}, name: {}>".format(self.id, self.name )


### Initialise the file data set (GenerateFileData is clever enough to not
### re-process files when we run twice in quick succession, e.g.
### when running Flask in DEBUG mode
filedata = FileData()
filedata.GenerateFileData()


################################################################################
# /file_list -> show detailed file list of files from import_path(s)
################################################################################
@app.route("/file_list", methods=["GET"])
def file_list():
    """Render the detailed list view of the scanned files."""
    return render_template("file_list.html", page_title='View Files (details)', file_data=filedata, alert=st.GetAlert(), message=st.GetMessage() )


################################################################################
# /files -> show thumbnail view of files from import_path(s)
################################################################################
@app.route("/files", methods=["GET"])
def files():
    """Render the thumbnail view of the scanned files."""
    return render_template("files.html", page_title='View Files', file_data=filedata, alert=st.GetAlert(), message=st.GetMessage() )


################################################################################
# /files/scannow -> allows us to force a check for new files
################################################################################
@app.route("/files/scannow", methods=["GET"])
def scannow():
    """Run a scan for new files, then render the base page with a notice."""
    filedata.GenerateFileData()
    return render_template("base.html", page_title='Forced look for new items', file_data=filedata, alert="success", message="Scanned for new files" )


################################################################################
# /files/forcescan -> deletes old data in DB, and does a brand new scan
################################################################################
@app.route("/files/forcescan", methods=["GET"])
def forcescan():
    """Delete all File rows, reset last_import_date and rescan from scratch."""
    File.query.delete()
    settings = Settings.query.all()
    # with no Settings row there is no import date to reset; skip the reset
    # rather than failing with an IndexError
    if settings:
        settings[0].last_import_date = 0
    db.session.commit()
    filedata.GenerateFileData()
    return render_template("base.html", page_title='Forced look for new items', file_data=filedata, alert="success", message="Forced remove and recreation of all file data" )
################################################################################
# /static -> returns the contents of any file referenced inside /static.
# we create/use symlinks in static/ to reference the images to show
################################################################################
@app.route("/static/<path:filename>")
def custom_static(filename):
    """Serve *filename* from the static/ directory.

    The rule uses the ``path`` converter so that *filename* may contain
    slashes, i.e. files in sub-directories below the symlinks in static/.
    """
    return send_from_directory("static/", filename)