"""Shared configuration, constants and template helper functions for the PA app."""

import socket
import os
import sys
import io
import base64
import subprocess

import face_recognition
import numpy as np
from PIL import Image, ImageOps


class PA:
    """Mixin giving DB model objects a compact, debug-friendly repr.

    Binary payloads (raw bytes / thumbnails) are elided because they are
    too big to be useful in a repr, and SQLAlchemy's internal state is
    skipped entirely.
    """

    def __repr__(self):
        fields = []
        for key, value in self.__dict__.items():
            # skip SQLAlchemy's internal bookkeeping state
            if key == "_sa_instance_state":
                continue
            if isinstance(value, (bytes, bytearray)) or key == "thumbnail":
                # elide bulky binary payloads
                fields.append(f"{key}=")
            else:
                fields.append(f"{key}={value!r}")
        # join() instead of trailing-comma + rstrip: rstrip(", ") could eat
        # characters off a value repr that legitimately ends in ',' or ' '
        return f"<{type(self).__name__}({', '.join(fields)})>"


hostname = socket.gethostname()

# dict to store name of icon in icons.svg so we can use by reference in html
ICON = {
    "Import": "import",
    "Storage": "db",
    "Bin": "trash",
}

SECS_IN_A_DAY = 86400
NEWEST_LOG_LIMIT = 15
OLDEST_LOG_LIMIT = 5

# check where we are running: if laptop, then run web server and db on localhost
if hostname == "lappy":
    PA_JOB_MANAGER_HOST = "localhost"
    DB_URL = 'postgresql+psycopg2://pa:for_now_pa@localhost:5432/pa'
    PA_EXIF_AUTOROTATE = './utils/pa_exifautotran'
    PA_EXIF_ROTATER = './utils/pa_rotate'
# if we don't set the env, or we are explicitly DEV/container, run web server
# on localhost & db on the padb_dev container
elif 'ENV' not in os.environ or os.environ['ENV'] in ("development", "container"):
    PA_JOB_MANAGER_HOST = "localhost"
    # DB_URL = 'postgresql+psycopg2://pa:for_now_pa@mara.ddp.net:65432/pa'
    DB_URL = 'postgresql+psycopg2://pa:for_now_pa@padb_dev/pa'
    PA_EXIF_AUTOROTATE = './utils/pa_exifautotran'
    PA_EXIF_ROTATER = './utils/pa_rotate'
# if we explicitly are on PROD, run web server on localhost (pa_web container)
# & db on mara (padb container) - only accessed via internal docker ports
elif os.environ['ENV'] == "production":
    PA_JOB_MANAGER_HOST = "localhost"
    DB_URL = 'postgresql+psycopg2://pa:for_now_pa@padb/pa'
    PA_EXIF_AUTOROTATE = '/code/utils/pa_exifautotran'
    PA_EXIF_ROTATER = '/code/utils/pa_rotate'
else:
    # Reaching here means ENV *is* set but is not a recognised value — the
    # unset case is already consumed by the development branch above, so the
    # old "no ENV variable set" re-check here was unreachable and is removed.
    print("ERROR: I do not know which environment (development, etc.) and which DB (on which host to use)")
    print(f"ERROR: ENV is {os.environ['ENV']}")
    sys.exit(-1)

# PORT number we connect to the pa_job_manager on - by default it runs on the
# same host as the web manager, but it can run wherever (as long as the file
# system is available in the same path)
PA_JOB_MANAGER_PORT = 55430

# default thumbnail size (width and height) for images
THUMBSIZE = 256


# Helper function used in html files to create a bootstrap'd select with options. With:
#   name:      the data field in the submitted form
#   selected:  chooses the option that should be selected
#   list:      for the options
#   js:        optional extra javascript to run onChange (so far used to reset
#              offset when choosing to change ordering of files being viewed)
#   add_class: some class overrides, usually to format margins/padding/format/text size, etc.
#   vals:      in case the value is not the same as the name in the list provided...
#              E.g. list={'yes', 'no'}, vals={'0':yes,'1':no}
def CreateSelect(name, selected, list, js="", add_class="", vals={}):
    # NOTE(review): the f-string body is empty in this copy of the file — the
    # <select> markup appears to have been lost in transit; recover it from
    # VCS. Also `list` shadows the builtin and `vals={}` is a mutable default
    # (harmless while unused here) — worth cleaning up alongside the markup.
    str = f''
    return str


# TODO: can this be collapsed into using above - probably if the 'selected'
# passed in was 'In Folder' or 'Flat View' -- but I think that isn't in a var???
# Helper function used in html files to create a bootstrap'd select with
# options. Same as CreateSelect() really, only contains hard-coded True/False
# around the if selected part, but with string based "True"/"False" in the
# vals={}, and list has "In Folders", "Flat View"
def CreateFoldersSelect(selected, js="", add_class=""):
    # NOTE(review): f-string body empty in this copy — see CreateSelect.
    str = f''
    return str


# wrapper function to return the path to an icon based on this object's type -
# just for convenience/shortening in the html
def LocationIcon(obj):
    return ICON[obj.in_dir.in_path.type.name]
# translate a path type, a full FS path and a file into the full FS path of the
# symlink on the file system. To note, this is used with file == path when we
# want to just get the symlink to the path itself.
### FIXME: I think this is way over-complicated, want to revisit one day, with
# what params are passed in, what we need to get out -- I think the overloaded
# file/path use case, and why sometimes we trail with a / or not and then
# concat last_dir and bit before last_dir, etc. this feels a bit too
# complicated for what it does OR we comment this much better
def SymlinkName(ptype, path, file):
    """Map (path type, base path, file) to its app-relative symlink location.

    ptype: path-type name; becomes the directory under static/
    path:  base FS path, expected to end with '/'
    file:  full FS path of a file (or directory) under `path`; pass
           file == path to get the symlink of the base path itself

    Returns 'static/<ptype>/<last dir of path>/<dirs of file below path>'
    with no trailing '/'.
    """
    # Portion of `file` below `path`; empty when file == path.
    # NOTE(review): replace() strips *every* occurrence of `path`, not just a
    # leading one — assumed callers never hit that case; confirm.
    remainder = file.replace(path, "")

    # Deepest directory name of the base path (drop its trailing '/' first).
    anchor = os.path.basename(path[:-1])

    # Directory portion of the remainder; when the remainder itself names a
    # directory (trailing '/'), additionally drop the last character, as the
    # original concatenation scheme requires.
    below = os.path.dirname(remainder)
    if remainder.endswith('/'):
        below = below[:-1]

    link = f"static/{ptype}/{anchor}/{below}"
    return link[:-1] if link.endswith('/') else link
# generates the thumbnail for an image - uses THUMBSIZE, and deals with non
# RGB images, and rotated images (based on exif)
def GenThumb(fname, auto_rotate):
    """Generate a base64-encoded JPEG thumbnail for an image file.

    fname:       path to the image file
    auto_rotate: when True and the file is a JPEG, losslessly re-orient the
                 file ON DISK via the external PA_EXIF_AUTOROTATE tool (a
                 no-op if already upright) before thumbnailing; when False,
                 the file is untouched and only the in-memory copy is
                 transposed according to its EXIF orientation.

    Returns (thumbnail, orig_w, orig_h): the thumbnail as a base64 str and
    the pre-thumbnail dimensions (stored in the DB; used when re-scaling
    viewed thumbs, e.g. refimgs on the person page), or (None, None, None)
    on any failure.
    """
    try:
        im = Image.open(fname)
        if im.format == 'JPEG':
            if auto_rotate:
                # re-orient the JPEG in place, then reopen to pick up the
                # rotated pixels (dropped the unused `p =` result binding)
                subprocess.run([PA_EXIF_AUTOROTATE, fname])
                im = Image.open(fname)
            else:
                # leave the original file alone but orient the in-memory
                # copy the right way
                im = ImageOps.exif_transpose(im)
        # thumbnail()/JPEG save fail on non-RGB modes (P, RGBA, CMYK, ...),
        # so force RGB if needed
        if im.mode != "RGB":
            im = im.convert('RGB')
        orig_w, orig_h = im.size
        im.thumbnail((THUMBSIZE, THUMBSIZE))
        buf = io.BytesIO()
        im.save(buf, format='JPEG')
        # base64 output is pure ASCII: decode() replaces the old
        # str(...)[2:-1] hack that sliced the b'...' repr
        thumbnail = base64.b64encode(buf.getvalue()).decode('ascii')
        return thumbnail, orig_w, orig_h
    except Exception as e:
        print(f"GenThumb failed: {e}")
        return None, None, None


# generate Face data (and location) - wrapper func of face_recognition library
# used to store refimg data into the DB
def GenFace(fname, model):
    """Return (encoding_bytes, location) for the first face found in fname.

    fname: path to the image file
    model: detection model name passed through to
           face_recognition.face_locations (presumably "hog" or "cnn" —
           not validated here)

    Returns (None, None) when no face is found or on any error; the final
    return sits outside the try/except so the no-face path always yields a
    2-tuple.
    """
    try:
        img = face_recognition.load_image_file(fname)
        location = face_recognition.face_locations(img, model=model)
        encodings = face_recognition.face_encodings(img, known_face_locations=location)
        if encodings and location:
            return encodings[0].tobytes(), location[0]
    except Exception as e:
        print(f"GenFace failed: {e}")
    return None, None