182 lines
6.9 KiB
Python
182 lines
6.9 KiB
Python
from wtforms import SubmitField, StringField, HiddenField, validators, Form
|
|
from flask_wtf import FlaskForm
|
|
from flask import request, render_template, redirect, send_from_directory
|
|
from main import db, app, ma
|
|
from sqlalchemy import Sequence
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from status import st, Status
|
|
import os
|
|
import glob
|
|
from PIL import Image
|
|
from pymediainfo import MediaInfo
|
|
import hashlib
|
|
import exifread
|
|
import base64
|
|
import numpy
|
|
import cv2
|
|
|
|
################################################################################
|
|
# Local Class imports
|
|
################################################################################
|
|
from settings import Settings
|
|
|
|
class FileData():
|
|
def __init__(self):
|
|
self.view_path=''
|
|
self.view_list=[]
|
|
self.symlink=''
|
|
self.file_list=[]
|
|
|
|
################################################################################
|
|
# Utility Functions for Files
|
|
################################################################################
|
|
|
|
def getExif(self, file):
|
|
f = open(file, 'rb')
|
|
try:
|
|
tags = exifread.process_file(f)
|
|
except:
|
|
print('NO EXIF TAGS?!?!?!?')
|
|
f.close()
|
|
raise
|
|
f.close()
|
|
|
|
fthumbnail = base64.b64encode(tags['JPEGThumbnail'])
|
|
fthumbnail = str(fthumbnail)[2:-1]
|
|
return fthumbnail
|
|
|
|
def isVideo(self, file):
|
|
try:
|
|
fileInfo = MediaInfo.parse(file)
|
|
for track in fileInfo.tracks:
|
|
if track.track_type == "Video":
|
|
return True
|
|
return False
|
|
except Exception as e:
|
|
return False
|
|
|
|
# Converts linux paths into windows paths
|
|
# HACK: assumes c:, might be best to just look for [a-z]: ?
|
|
def FixPath(self, p):
|
|
if p.startswith('c:'):
|
|
p = p.replace('/', '\\')
|
|
return p
|
|
|
|
# Returns an md5 hash of the fnames' contents
|
|
def md5(self, fname):
|
|
hash_md5 = hashlib.md5()
|
|
with open(fname, "rb") as f:
|
|
for chunk in iter(lambda: f.read(4096), b""):
|
|
hash_md5.update(chunk)
|
|
return hash_md5.hexdigest()
|
|
|
|
def isImage(self, file):
|
|
try:
|
|
img = Image.open(file)
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
def generateVideoThumbnail(self, file):
|
|
#overall wrapper function for generating video thumbnails
|
|
vcap = cv2.VideoCapture(file)
|
|
res, im_ar = vcap.read()
|
|
while im_ar.mean() < 15 and res:
|
|
res, im_ar = vcap.read()
|
|
im_ar = cv2.resize(im_ar, (160, 90), 0, 0, cv2.INTER_LINEAR)
|
|
#save on a buffer for direct transmission
|
|
res, thumb_buf = cv2.imencode('.jpeg', im_ar)
|
|
# '.jpeg' etc are permitted
|
|
#get the bytes content
|
|
bt = thumb_buf.tostring()
|
|
fthumbnail = base64.b64encode(bt)
|
|
fthumbnail = str(fthumbnail)[2:-1]
|
|
print(fthumbnail)
|
|
return fthumbnail
|
|
|
|
|
|
##############################################################################
|
|
# HACK: At present this only handles one path (need to re-factor if we have #
|
|
# multiple valid paths in import_path) #
|
|
##############################################################################
|
|
def GenerateFileData(self):
|
|
sets = Settings.query.filter(Settings.name=="import_path").all()
|
|
paths= sets[0].value.split("#")
|
|
|
|
for path in paths:
|
|
path = self.FixPath(path)
|
|
if os.path.exists( path ):
|
|
self.view_path = path
|
|
# to serve static content of the images, we create a symlink
|
|
# from inside the static subdir of each import_path that exists
|
|
self.symlink = self.FixPath('static/{}'.format( os.path.basename(path[0:-1])))
|
|
if not os.path.exists(self.symlink):
|
|
os.symlink(path, self.symlink)
|
|
|
|
self.file_list.append(glob.glob(self.view_path + '**', recursive=True))
|
|
for file in self.file_list[0]:
|
|
fthumbnail = None
|
|
if file == path:
|
|
continue
|
|
if os.path.isdir(file):
|
|
ftype = 'Directory'
|
|
elif self.isImage(file):
|
|
ftype = 'Image'
|
|
fthumbnail = self.getExif(file)
|
|
elif self.isVideo(file):
|
|
ftype = 'Video'
|
|
fthumbnail = self.generateVideoThumbnail(file)
|
|
else:
|
|
ftype = 'File'
|
|
|
|
if ftype != "Directory":
|
|
fhash=self.md5(file)
|
|
else:
|
|
fhash=None
|
|
|
|
fsize = round(os.stat(file).st_size/(1024*1024))
|
|
fname=file.replace(path, "")
|
|
self.view_list.append( Files( name=fname, type=ftype, size_mb=fsize, hash=fhash, thumbnail=fthumbnail ))
|
|
return self
|
|
|
|
################################################################################
|
|
# Class describing Files in the database, and via sqlalchemy, connected to the DB as well
|
|
# This has to match one-for-one the DB table
|
|
################################################################################
|
|
class Files(db.Model):
|
|
id = db.Column(db.Integer, db.Sequence('files_id_seq'), primary_key=True )
|
|
name = db.Column(db.String, unique=True, nullable=False )
|
|
type = db.Column(db.String, unique=False, nullable=False)
|
|
size_mb = db.Column(db.Integer, unique=False, nullable=False)
|
|
# hash might not be unique, this could be the source of dupe problems
|
|
hash = db.Column(db.Integer, unique=True, nullable=True)
|
|
thumbnail = db.Column(db.LargeBinary, unique=False, nullable=True)
|
|
|
|
def __repr__(self):
|
|
return "<id: {}, name: {}>".format(self.id, self.name )
|
|
|
|
|
|
|
|
################################################################################
|
|
# /file_list -> show detailed file list of files from import_path(s)
|
|
################################################################################
|
|
@app.route("/file_list", methods=["GET"])
|
|
def file_list():
|
|
filedata = FileData()
|
|
file_data=filedata.GenerateFileData()
|
|
return render_template("file_list.html", page_title='View Files (details)', file_data=file_data, alert=st.GetAlert(), message=st.GetMessage() )
|
|
|
|
################################################################################
|
|
# /files -> show thumbnail view of files from import_path(s)
|
|
################################################################################
|
|
@app.route("/files", methods=["GET"])
|
|
def files():
|
|
filedata = FileData()
|
|
file_data=filedata.GenerateFileData()
|
|
return render_template("files.html", page_title='View Files', file_data=file_data, alert=st.GetAlert(), message=st.GetMessage() )
|
|
|
|
|
|
@app.route("/static/<filename>")
|
|
def custom_static(filename):
|
|
return send_from_directory("static/", filename)
|