from flask import Flask, render_template, request, redirect, url_for, render_template_string from flask_compress import Compress from flask_sqlalchemy import SQLAlchemy from sqlalchemy.exc import SQLAlchemyError from flask_marshmallow import Marshmallow from flask_bootstrap import Bootstrap from wtforms import SubmitField, StringField, HiddenField, SelectField, IntegerField, TextAreaField, validators from flask_wtf import FlaskForm import glob from datetime import datetime import os import re import socket from shared import CreateSelect, CreateFoldersSelect, LocationIcon, DB_URL, PROD_HOST, OLDEST_LOG_LIMIT # for ldap auth from flask_ldap3_login import LDAP3LoginManager from flask_login import LoginManager, login_user, login_required, UserMixin, current_user, logout_user from flask_ldap3_login.forms import LDAPLoginForm # pylint: disable=no-member ####################################### Flask App globals ####################################### hostname = socket.gethostname() print( "Running on: {}".format( hostname) ) app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = DB_URL app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['ENV'] = os.environ['ENV'] app.config['SECRET_KEY'] = b'my_insecure_PA_token_with_random_2134876adsfjhlkasdf87' app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 31536000 # ldap config vars: (the last one is required, or python ldap freaks out) app.config['LDAP_HOST'] = 'mara.ddp.net' app.config['LDAP_BASE_DN'] = 'dc=depaoli,dc=id,dc=au' app.config['LDAP_USER_DN'] = 'ou=users' app.config['LDAP_GROUP_DN'] = 'ou=groups' app.config['LDAP_USER_RDN_ATTR'] = 'cn' app.config['LDAP_USER_LOGIN_ATTR'] = 'uid' app.config['LDAP_BIND_USER_DN'] = None app.config['LDAP_BIND_USER_PASSWORD'] = None app.config['LDAP_GROUP_OBJECT_FILTER'] = '(objectclass=posixGroup)' db = SQLAlchemy(app) # create the (flask) sqlalchemy connection ma = Marshmallow(app) # set up Marshmallow - data marshalling / serialising Bootstrap(app) # set up Bootstrap - used in flask-forms (TODO: CONFIRM THIS IS NEEDED - sometimes I do boostrap by hand anyway) login_manager = LoginManager(app) # Setup a Flask-Login Manager ldap_manager = LDAP3LoginManager(app) # Setup a LDAP3 Login Manager. login_manager.login_view = "login" # default login route, failed with url_for, so hard-coded Compress(app) ################################# Now, import separated class files ################################### from ai import aistats from files import Entry from person import Person from settings import Settings from user import PAUser ####################################### GLOBALS ####################################### # allow jinja2 to call these python functions directly app.jinja_env.add_extension('jinja2.ext.loopcontrols') app.jinja_env.globals['CreateSelect'] = CreateSelect app.jinja_env.globals['CreateFoldersSelect'] = CreateFoldersSelect app.jinja_env.globals['LocationIcon'] = LocationIcon app.jinja_env.globals['OLDEST_LOG_LIMIT'] = OLDEST_LOG_LIMIT app.jinja_env.globals['current_user'] = current_user # Declare a User Loader for Flask-Login. # Returns the User if it exists in our 'database', otherwise returns None. @login_manager.user_loader def load_user(id): pau=PAUser.query.filter(PAUser.dn==id).first() return pau # Declare The User Saver for Flask-Ldap3-Login # This method is called whenever a LDAPLoginForm() successfully validates. # store the user details / session in the DB if it is not in there already @ldap_manager.save_user def save_user(dn, username, data, memberships): pau=PAUser.query.filter(PAUser.dn==dn).first() # if we already have a valid user/session, and say the web has restarted, just re-use it, dont make more users if pau: return pau pau=PAUser(dn=dn, default_import_noo="Oldest", default_storage_noo="A to Z", default_search_noo="Newest", default_grouping="None", default_how_many=50, default_size=128, default_import_folders=False, default_storage_folders=True ) db.session.add(pau) db.session.commit() return pau # default page, just the navbar @app.route("/", methods=["GET"]) @login_required def main_page(): # Redirect users who are not logged in. if not current_user or current_user.is_anonymous: return redirect(url_for('login')) return render_template("base.html") # route for the login page/box # POST is when user submits pwd & uses flask-login to hit ldap, validate pwd # if valid, then we save user/session into the DB via login_user() -> calls save_user() @app.route('/login', methods=['GET', 'POST']) def login(): # Instantiate a LDAPLoginForm which has a validator to check if the user # exists in LDAP. form = LDAPLoginForm() form.submit.label.text="Login" # the re matches on any special LDAP chars, we dont want someone # ldap-injecting our username, so send them back to the login page instead if request.method == 'POST' and re.search( r'[()\\*&!]', request.form['username']): print( f"WARNING: Detected special LDAP chars in username: {request.form['username']}") return redirect(url_for('login')) if form.validate_on_submit(): # Successfully logged in, We can now access the saved user object via form.user. login_user(form.user, remember=True) # Tell flask-login to log them in. next = request.args.get("next") if next: return redirect(next) # Send them back where they came from else: return redirect( url_for('main_page') ) return render_template("login.html", form=form) ################################################################################ # simple about route/page that shows build date and because Prod version is # Docker built, it picks up a git log for displaying recent commits ################################################################################ @app.route('/about') @login_required def about(): commits=[] try: with open( 'internal/git-log.txt', 'r' ) as fp: can_add=False for l in fp: if l.startswith( 'commit' ) or l.startswith( 'Author:' ): pass elif l.startswith( 'Date:' ): if can_add: commits.append(o) o={} o['date']=l.replace("Date: ", "").split("+")[0] o['str']='' can_add=True else: o['str']=o['str']+l except FileNotFoundError: most_recent=0 for file in glob.glob("./*"): timestamp = os.path.getctime(file) if timestamp < most_recent: continue most_recent = timestamp o={} o['date']="No known date" o['str']=f'No commits to display (if this is DEV then last change was {datetime.fromtimestamp(most_recent).replace(microsecond=0)})' # add last commit (or fake commit if no file) commits.append(o) bugs=[] try: with open( 'internal/BUGs', 'r' ) as fp: can_add=False for l in fp: if not l.startswith( 'BUG' ): if can_add: b['str']=b['str']+'
'+l else: pass else: if can_add: bugs.append(b) b={} b['str']=l can_add=True except FileNotFoundError: b={} b['str']="No BUGs defined - cannot find BUGs file!?" bugs.append(b) try: with open( 'internal/TODO', 'r' ) as fp: todo=fp.read() except FileNotFoundError: todo="No TODOs defined - cannot find TODO file!?" try: with open( 'internal/build-date.txt', 'r' ) as fp: if 'ENV' in os.environ: build_info=os.environ['ENV'] else: build_info="Development" build_info = build_info + " version built on: " + fp.read() except FileNotFoundError: if 'ENV' not in os.environ or os.environ['ENV'] == "development": build_info="Development version" else: build_info="No build info found!?" return render_template("about.html", commits=commits, bugs=bugs, todo=todo, build_info=build_info) @app.route('/logout') @login_required def logout(): logout_user() return redirect('/login') ############################################################################### # main to be called via Flask/Gunicorn ############################################################################### def main(): if hostname == PROD_HOST: app.run(ssl_context=('/etc/letsencrypt/live/pa.depaoli.id.au/cert.pem', '/etc/letsencrypt/live/pa.depaoli.id.au/privkey.pem'), host="0.0.0.0", debug=False) else: app.run(host="0.0.0.0", debug=True) ############################################################################### # This func creates a new filter in jinja2 to test to hand back the username # from the ldap dn ################################################################################ @app.template_filter('Username') def _jinja2_filter_parentpath(dn): # pull apart a dn (uid=xxx,dn=yyy,etc), and return the xxx username=str(dn) s=username.index('=') s+=1 f=username.index(',') return username[s:f]