from flask import Flask, render_template, request, redirect, jsonify, url_for
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import desc, text
from sqlalchemy.exc import SQLAlchemyError
from flask_marshmallow import Marshmallow
from flask_bootstrap import Bootstrap
from wtforms import SubmitField, StringField, HiddenField, SelectField, IntegerField, TextAreaField, validators, ValidationError
from flask_wtf import FlaskForm
#from flask_compress import Compress
from status import st, Status
# for ldap auth
from flask_ldap3_login import LDAP3LoginManager
from flask_login import LoginManager, login_user, login_required, UserMixin, current_user, logout_user
from flask_ldap3_login.forms import LDAPLoginForm
import re
import os
import contextlib

####################################### Flask App globals #######################################
app = Flask(__name__)

# local DB conn string
# FLASK_ENV may legitimately be unset (plain local run); fall back to the
# development settings instead of raising KeyError while importing the module.
_flask_env = os.environ.get('FLASK_ENV', 'development')
# NOTE(review): the DB password and SECRET_KEY below are hard-coded in source;
# consider moving them to the environment / a secrets store.
if _flask_env == "production":
    DB_URL = 'postgresql+psycopg2://ddp:blahdeblah@bookdb:5432/library'
    app.config['FLASK_ENV']="production"
elif _flask_env == "container":
    app.config['FLASK_ENV']="container"
    DB_URL = 'postgresql+psycopg2://ddp:blahdeblah@bookdb_dev:5432/library'
else:
    # any other value is treated as development
    app.config['FLASK_ENV']="development"
    DB_URL = 'postgresql+psycopg2://ddp:blahdeblah@127.0.0.1:55432/library'

app.config['SQLALCHEMY_DATABASE_URI'] = DB_URL
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config.from_mapping( SECRET_KEY=b'\xd6\x04\xbdj\xfe\xed$c\x1e@\xad\x0f\x13,@G')

# ldap config vars: (the last one is required, or python ldap freaks out)
app.config['LDAP_HOST'] = 'mara.ddp.net'
app.config['LDAP_BASE_DN'] = 'dc=depaoli,dc=id,dc=au'
app.config['LDAP_USER_DN'] = 'ou=users'
app.config['LDAP_GROUP_DN'] = 'ou=groups'
app.config['LDAP_USER_RDN_ATTR'] = 'cn'
app.config['LDAP_USER_LOGIN_ATTR'] = 'uid'
app.config['LDAP_BIND_USER_DN'] = None
app.config['LDAP_BIND_USER_PASSWORD'] = None
app.config['LDAP_GROUP_OBJECT_FILTER'] = '(objectclass=posixGroup)'

db = SQLAlchemy(app)
ma = Marshmallow(app)
Bootstrap(app)

# setup ldap for auth
login_manager = LoginManager(app)       # Setup a Flask-Login Manager
ldap_manager = LDAP3LoginManager(app)   # Setup a LDAP3 Login Manager.
login_manager.login_view = "login"      # default login route, failed with url_for, so hard-coded

# enable compression for http / speed
#Compress(app)

################################# Now, import non-book classes ###################################
# These imports come after db/ma are created (see the banner above).
from author import Author, AuthorForm, AuthorSchema, GetAuthors
from publisher import Publisher, PublisherForm, PublisherSchema, GetPublisherById
from genre import Genre, GenreForm, GenreSchema, GetGenres
from condition import Condition, ConditionForm, ConditionSchema, GetConditionById
from covertype import Covertype, CovertypeForm, CovertypeSchema, GetCovertypeById
from owned import Owned, OwnedForm, OwnedSchema, GetOwnedById
from rating import Rating, RatingForm, RatingSchema, GetRatingById
from loan import Loan, LoanForm, LoanSchema
from series import Series, SeriesForm, SeriesSchema, ListOfSeriesWithMissingBooks, CalcAvgRating
from user import BDBUser

# hacky constants (FIX THIS)
ON_WISHLIST=2
COVERTYPE_NOT_APPLICABLE=4
CONDITION_NOT_APPLICABLE=4

####################################### CLASSES / DB model #######################################
class QuickParentBook:
    """Plain (non-DB) holder handed to book.html when creating a sub-book.

    `parent` is a dict (the routes fill it with 'id' and 'title'); the other
    attributes start as -1 / '' placeholders.
    """
    def __init__(self):
        self.parent={}
        self.publisher=-1
        self.owned=-1
        self.covertype=-1
        self.condition=-1
        self.blurb=''

    def __repr__(self):
        # the template used to be empty, so repr() always returned ""
        return "<parent: {}, publisher: {}, owned: {}, covertype: {}, condition: {}, blurb: {}>".format(self.parent, self.publisher, self.owned, self.covertype, self.condition, self.blurb )

class Book_Genre_Link(db.Model):
    """Association row linking a book to a genre."""
    __tablename__ = "book_genre_link"
    book_id = db.Column( db.Integer, db.ForeignKey('book.id'), primary_key=True)
    genre_id = db.Column( db.Integer, db.ForeignKey('genre.id'), primary_key=True)

class Book_Author_Link(db.Model):
    """Association row linking a book to an author; author_num is used to order a book's authors."""
    __tablename__ = "book_author_link"
    book_id = db.Column( db.Integer, db.ForeignKey('book.id'), primary_key=True)
    author_id = db.Column( db.Integer, db.ForeignKey('author.id'), primary_key=True)
    author_num = db.Column( db.Integer, primary_key=True)
    author = db.relationship('Author' )

    def __repr__(self):
        return f"<book_id: {self.book_id}, author_id: {self.author_id}, author_num: {self.author_num}>"

# NOTE(review): the three link models below mark each half of a composite
# primary key unique=True on its own; confirm that matches the real DB schema.
class Book_Loan_Link(db.Model):
    """Association row linking a book to a loan."""
    __tablename__ = "book_loan_link"
    book_id = db.Column(db.Integer, db.ForeignKey('book.id'), unique=True, nullable=False, primary_key=True)
    loan_id = db.Column(db.Integer, db.ForeignKey('loan.id'), unique=True, nullable=False, primary_key=True)

    def __repr__(self):
        return "<book_id: {}, loan_id: {}>".format(self.book_id, self.loan_id)

class Book_Series_Link(db.Model):
    """Association row linking a book to a series, with its position (book_num) in that series."""
    __tablename__ = "book_series_link"
    book_id = db.Column(db.Integer, db.ForeignKey('book.id'), unique=True, nullable=False, primary_key=True)
    series_id = db.Column(db.Integer, db.ForeignKey('series.id'), unique=True, nullable=False, primary_key=True)
    book_num = db.Column(db.Integer)

    def __repr__(self):
        return "<book_id: {}, series_id: {}, book_num: {}>".format(self.book_id, self.series_id, self.book_num)

def SeriesBookNum(series_id, book_id):
    """Return the book_num of book_id within series_id, or 0 (with a printed warning) when no link row exists."""
    bsl=Book_Series_Link.query.filter( Book_Series_Link.book_id==book_id, Book_Series_Link.series_id==series_id ).first()
    if bsl:
        return bsl.book_num
    print( f"WARNING: tried to find book number in this series for: book_id={book_id}, series_id={series_id} but no db data for it" )
    return 0

class Book_Sub_Book_Link(db.Model):
    """Association row linking a parent book to one of its sub-books, ordered by sub_book_num."""
    __tablename__ = "book_sub_book_link"
    book_id = db.Column(db.Integer, db.ForeignKey('book.id'), unique=True, nullable=False, primary_key=True)
    sub_book_id = db.Column(db.Integer, db.ForeignKey('book.id'), unique=True, nullable=False, primary_key=True)
    sub_book_num = db.Column(db.Integer)

    def __repr__(self):
        return "<book_id: {}, sub_book_id: {}, sub_book_num: {}>".format(self.book_id, self.sub_book_id, self.sub_book_num)

class Book(db.Model):
    """A book row plus its genre / loan / series / author / parent / sub-book relationships."""
    id = db.Column(db.Integer, db.Sequence('book_id_seq'), primary_key=True )
    title = db.Column(db.String(100), unique=True, nullable=False)
    publisher = db.Column(db.Integer, db.ForeignKey('publisher.id'))
    genre = db.relationship('Genre', secondary=Book_Genre_Link.__table__ )
    loan = db.relationship('Loan', secondary=Book_Loan_Link.__table__)
    series = db.relationship('Series', secondary=Book_Series_Link.__table__,order_by=Series.num_books)
    bsl = db.relationship('Book_Series_Link', overlaps="series" )
    year_published = db.Column(db.Integer)
    condition = db.Column(db.Integer, db.ForeignKey('condition.id'))
    covertype = db.Column(db.Integer, db.ForeignKey('covertype.id'))
    owned = db.Column(db.Integer, db.ForeignKey('owned.id'))
    rating = db.Column(db.Integer, db.ForeignKey('rating.id'))
    notes = db.Column(db.Text)
    blurb = db.Column(db.Text)
    created = db.Column(db.DateTime)
    modified = db.Column(db.DateTime)

    # take actual parent book as there is no real associated sub_book_num data and can just use it
    parent = db.relationship('Book', secondary=Book_Sub_Book_Link.__table__, primaryjoin="Book.id==Book_Sub_Book_Link.sub_book_id", secondaryjoin="Book.id==Book_Sub_Book_Link.book_id", uselist=False )
    # but use child_ref as sub_book_num is per book, and I can't connect an empty array of sub_book_nums to a child book array in "child"
    child_ref = db.relationship('Book_Sub_Book_Link', secondary=Book_Sub_Book_Link.__table__, primaryjoin="Book.id==Book_Sub_Book_Link.book_id", secondaryjoin="Book.id==Book_Sub_Book_Link.sub_book_id", order_by="Book_Sub_Book_Link.sub_book_num", overlaps="parent" )
    # use this to manage author now that author_num is used to order them
    bals = db.relationship('Book_Author_Link', order_by="Book_Author_Link.author_num" )

    def IsParent(self):
        """Return True when this book has at least one sub-book link."""
        return len(self.child_ref) > 0

    def IsChild(self):
        """Return True when this book has a parent book."""
        return bool(self.parent)

    def FirstSubBookNum(self):
        """Return the sub_book_num of the first sub-book, or 1 for a book without sub-books."""
        if self.IsParent():
            return self.child_ref[0].sub_book_num
        return 1

    def LastSubBookNum(self):
        """Return the sub_book_num of the last sub-book, or 1 for a book without sub-books."""
        if self.IsParent():
            # -1 subscript returns the last one
            return self.child_ref[-1].sub_book_num
        return 1

    def NumSubBooks(self):
        """Return how many sub-books this book has; a book without sub-books counts as 1."""
        if self.IsParent():
            return len(self.child_ref)
        return 1

    def MoveBookInSeries( self, series_id, amt ):
        """Shift this book's book_num in series_id by amt and commit.

        For a parent book every sub-book is shifted instead. A book that has
        no link row in the series is skipped with a printed warning (this used
        to raise IndexError).

        Returns None on success. On a database error the session is rolled
        back, the status alert/message are set, and a redirect response to the
        series page is returned.
        """
        try:
            if self.IsParent():
                # if parent book, move all sub books instead
                tmp_book=book_schema.dump(self)
                moving_ids=[ GetBookIdFromBookSubBookLinkByIdAndSubBookNum( self.id, ref['sub_book_num'] ) for ref in tmp_book['child_ref'] ]
            else:
                moving_ids=[ self.id ]

            for moving_id in moving_ids:
                bsl=Book_Series_Link.query.filter( Book_Series_Link.book_id==moving_id, Book_Series_Link.series_id==series_id ).first()
                if bsl is None:
                    print( f"WARNING: cannot move book_id={moving_id} in series_id={series_id}, it has no link to that series" )
                    continue
                bsl.book_num=bsl.book_num+amt
            db.session.commit()
            return
        except SQLAlchemyError as e:
            # leave the session usable for the rest of the request
            db.session.rollback()
            st.SetAlert( "danger" )
            # only DBAPI-level errors carry .orig; fall back to the error itself
            st.SetMessage( getattr(e, 'orig', e) )
            return redirect( '/series/{}'.format(series_id) )

    def __repr__(self):
        # the template used to be empty, so repr() always returned ""
        return "<id: {}, title: {}, year_published: {}, rating: {}, condition: {}, owned: {}, covertype: {}, notes: {}, blurb: {}, created: {}, modified: {}, publisher: {}, genre: {}, parent: {}>".format(self.id, self.title, self.year_published, self.rating, self.condition, self.owned, self.covertype, self.notes, self.blurb, self.created, self.modified, self.publisher, self.genre, self.parent )

class Book_Sub_Book_LinkSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = Book_Sub_Book_Link

class Book_Series_LinkSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = Book_Series_Link

class Book_Author_LinkSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = Book_Author_Link
    author = ma.Nested(AuthorSchema )

# Note not ordering this in the code below as, I can't work out how to use
# jinja2 to iterate over orderedDict - seems it doesnt support it?
# so I just hacked a list of keys in book.html
class BookSchema(ma.SQLAlchemyAutoSchema):
    """Marshmallow schema for Book with its related collections nested."""
    author = ma.Nested(AuthorSchema, many=True)
    bals = ma.Nested(Book_Author_LinkSchema, many=True)
    genre = ma.Nested(GenreSchema, many=True)
    loan = ma.Nested(LoanSchema, many=True)
    series = ma.Nested(SeriesSchema, many=True)
    child_ref = ma.Nested(Book_Sub_Book_LinkSchema, many=True)
    class Meta:
        model = Book

class BookForm(FlaskForm):
    """Create / edit form for a book (authors, genres and series are built by hand in the template)."""
    id = HiddenField()
    title = StringField('Title:', [validators.DataRequired()])
    # author built by hand
    # genre built by hand
    publisher = SelectField( 'publisher' )
    owned = SelectField( 'owned' )
    covertype = SelectField( 'covertype' )
    condition = SelectField( 'condition' )
    year_published = IntegerField('Year Published:', validators=[validators.Optional(), validators.NumberRange(min=1850, max=2100)] )
    rating = SelectField( 'rating' )
    notes = TextAreaField('Notes:')
    blurb = TextAreaField('Blurb:')
    submit = SubmitField('Save' )
    delete = SubmitField('Delete' )
    add_sub = SubmitField('Add Sub-Book' )
    rem_sub = SubmitField('Remove Sub-Book from Parent' )

    def validate(self, extra_validators=None):
        """Return True when the submitted book data is acceptable.

        A wish-list book is always accepted. Otherwise covertype and condition
        must not be the N/A choices and year_published must lie strictly
        between 1850 and 2100. The per-field validators declared above are not
        run by this method. Missing or non-numeric select values yield False
        rather than raising. extra_validators is accepted only for signature
        compatibility with FlaskForm and is ignored.
        """
        try:
            # if on wish list, just accept year_published
            if int(self.owned.data) == ON_WISHLIST:
                return True
            covertype = int(self.covertype.data)
            condition = int(self.condition.data)
        except (TypeError, ValueError):
            return False

        # we own/sold this, so covertype and condition cant be N/A
        if covertype == COVERTYPE_NOT_APPLICABLE or condition == CONDITION_NOT_APPLICABLE:
            return False

        # otherwise lets check that there is data and its in the right range
        year = self.year_published.data
        return bool(year) and 1850 < year < 2100

################################# helper functions ###################################
def GetBookIdFromSeriesByBookNum( series_id, book_num ):
    """Return the id of the book at position book_num in series_id, or None when there is none."""
    link = Book_Series_Link.query.filter(Book_Series_Link.series_id==series_id,Book_Series_Link.book_num==book_num).first()
    if link is None:
        return None
    return link.book_id

def GetBookNumBySeriesAndBookId( series_id, book_id ):
    """Return the book_num of book_id in series_id; raises IndexError when the book is not in the series."""
    tmp_book = Book_Series_Link.query.filter(Book_Series_Link.series_id==series_id,Book_Series_Link.book_id==book_id).all()
    return tmp_book[0].book_num

def GetBookIdFromBookSubBookLinkByIdAndSubBookNum( book_id, sub_book_num ):
    """Return the sub-book id at sub_book_num under parent book_id; raises IndexError when there is none."""
    tmp_bsbl = Book_Sub_Book_Link.query.filter(Book_Sub_Book_Link.book_id==book_id, Book_Sub_Book_Link.sub_book_num==sub_book_num ).all()
    return tmp_bsbl[0].sub_book_id

# ignore ORM, the sub_book_num comes from the link, but does not have any attached data, so can't tell which book it connects to.
# Just select sub_book data and hand add it to the books object, and use it in jinja2 to indent/order the books/sub books
# This data is used to sort/indent subbooks
def AddSubs(books):
    """Set parent_id and sub_book_num on every book in `books` that is a sub-book.

    Mutates the Book objects in place; returns None.
    """
    # index once instead of scanning the list for every link row; setdefault
    # keeps the first occurrence of an id, as the previous linear scan did
    by_id={}
    for b in books:
        by_id.setdefault( b.id, b )
    subs=db.session.execute( text( "select * from book_sub_book_link" ) )
    for row in subs:
        child=by_id.get( row.sub_book_id )
        if child is None:
            continue
        child.parent_id = row.book_id
        child.sub_book_num = row.sub_book_num

# HACK: Couldn't work out ORM to excluded sub_book self-ref, so using basic python
# loop to remove sub_books from list
def RemSubs(books):
    """Remove every sub-book from `books`, in place; returns None."""
    subs=db.session.execute( text( "select * from book_sub_book_link" ) )
    sub_ids={ row.sub_book_id for row in subs }
    # rebuild through a slice assignment: the caller's list is still the one
    # updated, without removing items from a list while iterating over it
    books[:] = [ b for b in books if b.id not in sub_ids ]

####################################### GLOBALS #######################################
# allow jinja2 to call these python functions directly
app.jinja_env.globals['GetCovertypeById'] = GetCovertypeById
app.jinja_env.globals['GetOwnedById'] = GetOwnedById
app.jinja_env.globals['GetConditionById'] = GetConditionById
app.jinja_env.globals['GetPublisherById'] = GetPublisherById
app.jinja_env.globals['GetRatingById'] = GetRatingById
app.jinja_env.globals['SeriesBookNum'] = SeriesBookNum
app.jinja_env.globals['ClearStatus'] = st.ClearStatus
app.jinja_env.globals['ListOfSeriesWithMissingBooks'] = ListOfSeriesWithMissingBooks
app.jinja_env.globals['current_user'] = current_user

book_schema = BookSchema()
books_schema = BookSchema(many=True)

# Declare a User Loader for Flask-Login.
# Returns the User if it exists in our 'database', otherwise returns None.
@login_manager.user_loader
def load_user(id):
    bdbu=BDBUser.query.filter(BDBUser.dn==id).first()
    return bdbu

# Declare The User Saver for Flask-Ldap3-Login
# This method is called whenever a LDAPLoginForm() successfully validates.
# store the user details / session in the DB if it is not in there already
@ldap_manager.save_user
def save_user(dn, username, data, memberships):
    bdbu=BDBUser.query.filter(BDBUser.dn==dn).first()
    # if we already have a valid user/session, and say the web has restarted, just re-use it, dont make more users
    if bdbu:
        return bdbu
    bdbu=BDBUser(dn=dn)
    db.session.add(bdbu)
    db.session.commit()
    return bdbu

def _IsSafeLocalUrl(target):
    """Return True only for a same-site, path-only redirect target such as '/books'."""
    from urllib.parse import urlparse
    if not target or not target.startswith('/'):
        return False
    # '//host' and '/\host' are read by browsers as another site
    if target.startswith('//') or '\\' in target:
        return False
    parts = urlparse(target)
    return parts.scheme == '' and parts.netloc == ''

# POST is when user submits pwd & uses flask-login to hit ldap, validate pwd
# if valid, then we save user/session into the DB via login_user() -> calls save_user()
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form; on a valid LDAP login, log the user in and redirect."""
    # Instantiate a LDAPLoginForm which has a validator to check if the user
    # exists in LDAP.
    form = LDAPLoginForm()
    form.submit.label.text="Login"

    # the re matches on any special LDAP chars, we dont want someone
    # ldap-injecting our username, so send them back to the login page instead
    username = request.form.get('username', '')
    if request.method == 'POST' and re.search( r'[()\\*&!]', username):
        print( f"WARNING: Detected special LDAP chars in username: {username}")
        return redirect(url_for('login'))

    if form.validate_on_submit():
        # Successfully logged in, We can now access the saved user object via form.user.
        login_user(form.user, remember=True)  # Tell flask-login to log them in.
        # "next" comes straight from the query string: only follow it when it
        # is a local path, otherwise a crafted link could bounce a freshly
        # logged-in user to another site
        next_url = request.args.get("next")
        if _IsSafeLocalUrl(next_url):
            return redirect(next_url)  # Send them back where they came from
        return redirect( url_for('main_page') )
    return render_template("login.html", form=form)

@app.route('/logout')
@login_required
def logout():
    logout_user()
    return redirect('/login')

####################################### ROUTES #######################################
@app.route("/search", methods=["POST"])
@login_required
def search():
    """Render books whose title contains the posted 'term' (case-insensitive)."""
    title_like = Book.title.ilike("%{}%".format(request.form['term']))
    if 'InDBox' in request.form:
        # removes already loaned books from list of books to loan out
        found = Book.query.outerjoin(Book_Loan_Link).filter(title_like, Book_Loan_Link.loan_id.is_(None)).all()
        InDBox=1
    else:
        found = Book.query.filter(title_like).all()
        InDBox=0
    AddSubs(found)
    return render_template("books.html", books=found, InDBox=InDBox)

@app.route("/books", methods=["GET"])
@login_required
def books():
    """Render all books by title, with each parent's sub-books placed directly after it."""
    top_level = Book.query.order_by(Book.title).all()

    # okay remove subs for now (so only real books are in list)
    RemSubs(top_level)

    # ordered books will be each real book, but if it has any subs, add them back in (but now in the right order/spot)
    # bound parameter instead of string-built SQL; order by sub_book_num so
    # the sub-books really do come back in their numbered order
    sub_sql = text( "select * from book_sub_book_link where book_id = :book_id order by sub_book_num" )
    ordered_books=[]
    for b in top_level:
        ordered_books.append(b)
        for row in db.session.execute( sub_sql, { 'book_id': b.id } ):
            s=Book.query.get(row.sub_book_id)
            if s is not None:   # skip a link row that points at a missing book
                ordered_books.append(s)

    # now get the parent linkages back in so it indents in the front-end html
    AddSubs(ordered_books)
    return render_template("books.html", books=ordered_books )

# the rule needs the <id> variable: the view function requires it
@app.route("/books_for_loan/<int:id>", methods=["GET", "POST"])
@login_required
def books_for_loan(id):
    """Render the books attached to loan `id`."""
    on_loan = Book.query.join(Book_Loan_Link).filter(Book_Loan_Link.loan_id==id).order_by(Book.id).all()
    InDBox = 1 if request.method == 'POST' else 0
    return render_template("books_for_loan.html", books=on_loan, loan_id=id, InDBox=InDBox)

def CalcMoveForBookInSeries( id, bid, dir ):
    """Move book `bid` one place "up" (any other `dir` means down) in series `id`.

    Works out which neighbouring book (or parent book with all its sub-books)
    to swap with and shifts both by the right amounts through
    Book.MoveBookInSeries.
    """
    book1 = Book.query.get(bid)
    # if parent book, then up needs first sub book, if down then need last sub book (as parent book does not have a book_num in a series, its sub books do)
    if book1.IsParent():
        if dir == "up":
            tmp_bid = GetBookIdFromBookSubBookLinkByIdAndSubBookNum( book1.id, book1.FirstSubBookNum() )
        else:
            tmp_bid = GetBookIdFromBookSubBookLinkByIdAndSubBookNum( book1.id, book1.LastSubBookNum() )
        moving_series_book_num = GetBookNumBySeriesAndBookId( id, tmp_bid )
    else:
        moving_series_book_num = GetBookNumBySeriesAndBookId( id, book1.id )
    # moving_series_book_num is the book_num of the book in the series we are really moving (adjusted for parent/sub book if needed)

    if dir == "up":
        swapping_with_book_num = moving_series_book_num-1
    else:
        swapping_with_book_num = moving_series_book_num+1
    # swapping_with_book_num is the book_num of the book in the series we are going to swap with (notionally next/prev book in series)

    tmp_bid = GetBookIdFromSeriesByBookNum( id, swapping_with_book_num )
    # okay, if tmp_bid is None, then there is no book we are swapping
    # with (rare case of moving say book 7 of a series of 10, and we
    # don't have details on book 6 or 8...
    if tmp_bid is None:
        if dir == "up":
            book1.MoveBookInSeries( id, -1 )
        else:
            book1.MoveBookInSeries( id, 1 )
        return

    book2 = Book.query.get( tmp_bid )
    # if book2 is a sub book, then we need its parent as book2 instead, as we have to move the whole book as one
    if book2.IsChild():
        book2 = Book.query.get( book2.parent.id )
    # By here: book1 is parent or normal book we are swapping with book2 which is parent or normal book

    b1_numsb = book1.NumSubBooks()
    b2_numsb = book2.NumSubBooks()
    if dir == "up":
        b1_move_by=-b2_numsb
        b2_move_by=b1_numsb
    else:
        b1_move_by=b2_numsb
        b2_move_by=-b1_numsb
    # By here: b{1,2}_move_by is the sign & amount we are moving the relevant book (and its sub-books by). If it is a normal book,
    # then its by +/-1, but if it is a parent book, then its +/- the num of sub_books as they all have to move too

    # MoveBookInSeries -> moves the book and its sub_books
    book1.MoveBookInSeries( id, b1_move_by )
    book2.MoveBookInSeries( id, b2_move_by )

# the rule needs the <id> variable: the view function requires it
@app.route("/books_for_series/<int:id>", methods=["GET", "POST"])
@login_required
def books_for_series(id):
    """Render the books of series `id`; a POST with 'move_button' first moves a book up/down."""
    if request.method == 'POST' and 'move_button' in request.form:
        # split form's pressed move_button to yield: dir ("up" or "down") and bid = book_id of book
        dir, _, bid = request.form['move_button'].partition('-')
        book1 = Book.query.get(bid) if bid.isdigit() else None
        if book1 is not None:
            # snapshot the series ids first: each move commits the session
            series_ids = [ bsl.series_id for bsl in book1.bsl ]
            for series_id in series_ids:
                CalcMoveForBookInSeries( series_id, bid, dir )

    in_series = Book.query.join(Book_Series_Link).filter(Book_Series_Link.series_id==id).all()
    series = Series.query.get(id)
    return render_template("books_for_series.html", books=in_series, series=series)

# the rule needs the <id> variable: the view function requires it
@app.route("/subbooks_for_book/<int:id>", methods=["GET", "POST"])
@login_required
def subbooks_for_book(id):
    """Render the sub-books of parent book `id`; a POST with 'move_button' first swaps two of them."""
    if request.method == 'POST' and 'move_button' in request.form:
        # split form's pressed move_button to yield: dir ("up" or "down") and bid1 = book_id of subbook
        dir, _, bid1 = request.form['move_button'].partition('-')
        moving = None
        if bid1.isdigit():
            moving = Book_Sub_Book_Link.query.filter(Book_Sub_Book_Link.book_id==id, Book_Sub_Book_Link.sub_book_id==bid1 ).first()
        if moving is not None:
            if dir == "up":
                swap_with_num = moving.sub_book_num-1
            else:
                swap_with_num = moving.sub_book_num+1
            # no neighbour (first moved up / last moved down) -> nothing to swap
            other = Book_Sub_Book_Link.query.filter(Book_Sub_Book_Link.book_id==id, Book_Sub_Book_Link.sub_book_num==swap_with_num ).first()
            if other is not None:
                # swap the 2 books (by switching sub_book_nums)
                moving.sub_book_num, other.sub_book_num = other.sub_book_num, moving.sub_book_num
                db.session.commit()

    ####################################
    # force sub books for jinja2 to be able to use (ORM is not giving all the details
    ####################################
    # the parent id is passed as a bound parameter, never formatted into the SQL
    sub_sql = text(
        "select bsb.book_id, bsb.sub_book_id, bsb.sub_book_num, book.title, "
        "r.name as rating, book.year_published, book.notes, "
        "author.surname||', '||author.firstnames as author "
        "from book_sub_book_link bsb, book, book_author_link bal, author, rating r "
        "where bsb.book_id = :parent_id and book.id = bsb.sub_book_id and book.id = bal.book_id and "
        "bal.author_id = author.id and r.id = book.rating "
        "order by bsb.sub_book_num, bal.author_num" )
    with db.engine.connect() as conn:
        rows = conn.execute( sub_sql, { 'parent_id': id } ).fetchall()

    # one entry per sub_book_num, in first-seen order
    by_num = {}
    for row in rows:
        entry = by_num.get( row.sub_book_num )
        if entry is None:
            by_num[row.sub_book_num] = { 'book_id': row.book_id, 'sub_book_id': row.sub_book_id,
                                         'sub_book_num': row.sub_book_num, 'title' : row.title, 'rating': row.rating,
                                         'year_published' : row.year_published, 'notes' : row.notes, 'author' : row.author }
        else:
            # okay, same sub_book_num, so its a second author for this sub_book add name to author
            entry['author'] += ", " + row.author

    return render_template("subbooks_for_book.html", sub_books=list(by_num.values()) )

################################################################################
# /remove_subbook -> POST -> removes this subbook from parent, takes you back to
# /books (or /book/<parent_id> -- if you added a sub-book of parent_id
################################################################################
@app.route("/remove_subbook", methods=["POST"])
@login_required
def remove_sub_book():
    """Delete a sub-book and its link, closing the gap in the parent's sub_book_nums."""
    parent_book_id=request.form['rem_sub_parent_id']
    sub_book_id=request.form['rem_sub_sub_book_id']
    try:
        sub_book=Book.query.get( sub_book_id )
        remember=Book_Sub_Book_Link.query.filter(Book_Sub_Book_Link.book_id==parent_book_id, Book_Sub_Book_Link.sub_book_id==sub_book_id ).one()
        orig_sub_book_num=remember.sub_book_num

        # delete book-subbook link for this subbook
        db.session.delete(remember)

        # need to reorder old sub_book_nums to remove the new gap we created
        bsbls=Book_Sub_Book_Link.query.filter(Book_Sub_Book_Link.book_id==parent_book_id, Book_Sub_Book_Link.sub_book_num>orig_sub_book_num ).all()
        for sb in bsbls:
            sb.sub_book_num=sb.sub_book_num-1

        # now remove subbook itself
        ClearAuthorsForBook(sub_book.id)
        db.session.delete(sub_book)
        db.session.commit()
        return redirect( '/book/{}'.format(parent_book_id) )
    except SQLAlchemyError as e:
        # leave the session usable for the rest of the request
        db.session.rollback()
        st.SetAlert( "danger" )
        # only DBAPI-level errors carry .orig; fall back to the error itself
        st.SetMessage( getattr(e, 'orig', e) )
        return redirect( '/book/{}'.format(sub_book_id) )

################################################################################
# Go through request.form, find author's and remove any that are duplicates as
# that is not allowed / DB integrity issue. This just sets a message that
# will allow the book to be create/saved still, but the duplicate author will
# be removed
################################################################################
def RemoveDuplicateAuthorInForm( request ):
    """Return (list of Book_Author_Link, message) built from the 'author-*' form fields.

    Authors are numbered from 1 in form order; a repeated author is dropped
    and message is set to a warning, otherwise message is "".
    """
    seen=set()
    ret=[]
    message=""
    for el in request.form:
        if 'author-' not in el:
            continue
        author_id=request.form[el]
        if author_id in seen:
            message="Removed duplicate Author!!!"
            continue
        seen.add( author_id )
        ret.append( Book_Author_Link( author_id=author_id, author_num=len(ret)+1 ) )
    return ret, message

################################################################################
# Go through request.form, find series and remove any that are duplicates as
# that is not allowed / DB integrity issue.
# This just sets a message that
# will allow the book to be create/saved still, but the duplicate series will
# be removed
def RemoveDuplicateSeriesInForm( book, request ):
    """Add a Book_Series_Link for each distinct 'bsl-series_id-N' form entry and commit.

    A parent book gets a link without book_num; any other book gets the
    posted 'bsl-book_num-N'. For a sub-book a link without book_num is also
    added for its parent. Returns "" or a warning when a duplicate series was
    dropped.
    """
    seen=set()
    message=""
    cnt=1
    while f'bsl-book_id-{cnt}' in request.form:
        series_id=request.form[f'bsl-series_id-{cnt}']
        if series_id in seen:
            message="Removed duplicate Series!!!"
        else:
            if book.IsParent():
                newbsl=Book_Series_Link( book_id=book.id, series_id=series_id )
            else:
                newbsl=Book_Series_Link( book_id=book.id, series_id=series_id, book_num=request.form[f'bsl-book_num-{cnt}'] )
            # add the contains (null for book_num) bsl for the parent book
            if book.IsChild():
                parent_bsl=Book_Series_Link( book_id=book.parent.id, series_id=series_id )
                db.session.add(parent_bsl)
            db.session.add(newbsl)
            seen.add( series_id )
        cnt += 1
    db.session.commit()
    return message

def _BookFromForm( form_data, genres, bals ):
    """Build an unsaved Book from the posted fields; year_published is set only when numeric."""
    fields = { name: form_data[name] for name in ( 'title', 'owned', 'covertype', 'condition', 'publisher', 'rating', 'notes', 'blurb' ) }
    if form_data['year_published'].isnumeric():
        fields['year_published'] = form_data['year_published']
    return Book( genre=genres, bals=bals, **fields )

################################################################################
# /book -> GET/POST -> creates a new book and when created, takes you back to
# /books (or /book/<parent_id> -- if you added a sub-book of parent_id
################################################################################
@app.route("/book", methods=["GET", "POST"])
@login_required
def new_book():
    """Show the empty book form (GET) or create a book / sub-book from it (POST)."""
    form = BookForm(request.form)
    form.publisher.choices = [(c.id, c.name) for c in Publisher.query.order_by('name')]
    form.owned.choices=[(c.id, c.name) for c in Owned.query.order_by('id')]
    form.covertype.choices=[(c.id, c.name) for c in Covertype.query.order_by('id')]
    form.condition.choices=[(c.id, c.name) for c in Condition.query.order_by('id')]
    form.rating.choices=[(c.id, c.name) for c in Rating.query.order_by('id')]
    page_title='Create new Book'
    author_list = GetAuthors()
    genre_list = GetGenres()

    if request.method != 'POST':
        return render_template("book.html", page_title=page_title, b=None, books=None, book_form=form, author_list=author_list, genre_list=genre_list, alert="success", message="", poss_series_list=ListOfSeriesWithMissingBooks() )

    bals, message = RemoveDuplicateAuthorInForm( request )

    # handle genre info for new book
    book_genres = [ genre for genre in genre_list if "genre-{}".format(genre.id) in request.form ]

    # handle creating a new sub-book of an existing book (add_sub_parent_id) - html / with form data for the new book...
    if 'add_sub' in request.form:
        book=Book.query.get(request.form['add_sub_parent_id'])
        bb=QuickParentBook()
        bb.parent={ 'id': book.id, 'title': book.title }
        # pre-fill the physical details of the sub-book from its parent
        form.publisher.default = book.publisher
        form.owned.default = book.owned
        form.condition.default = book.condition
        form.covertype.default = book.covertype
        form.blurb.default = book.blurb
        form.process()
        return render_template("book.html", page_title='Create new (sub) Book', b=bb, books=None, book_form=form, author_list=author_list, genre_list=genre_list, alert="", message="", poss_series_list=ListOfSeriesWithMissingBooks() )

    if form.validate() and len(book_genres):
        book = _BookFromForm( request.form, book_genres, bals )
        db.session.add(book)
        db.session.commit()
        # book.bals was already set by the constructor; the old extra
        # "books.bals=bals" only set an attribute on the books() view function
        for tmp_bal in bals:
            tmp_bal.book_id=book.id
        db.session.commit()

        # this is a sub-book we have added (after the data has been entered, now we commit it to DB)
        if 'parent_id' in request.form:
            parent=Book.query.get(request.form['parent_id'])
            max_bsbl = Book_Sub_Book_Link.query.filter(Book_Sub_Book_Link.book_id==parent.id).order_by(desc(Book_Sub_Book_Link.sub_book_num)).first()
            max_sbn = 0 if max_bsbl is None else max_bsbl.sub_book_num
            new_bsbl = Book_Sub_Book_Link( book_id=parent.id, sub_book_id=book.id, sub_book_num=max_sbn+1 )
            db.session.add(new_bsbl)
            if len(parent.series) > 0:
                # we have added a sub-book to something in a series already, so add a bsl for the next book_num
                for s in parent.bsl:
                    # decided to do this as I could not see how to use Book_Series_Link.query with func.max, and I wanted to use ORM
                    max_bsl = Book_Series_Link.query.filter(Book_Series_Link.series_id==s.series_id,Book_Series_Link.book_num!=None).order_by(desc(Book_Series_Link.book_num)).first()
                    # no numbered book in this series yet -> start from 0
                    # (max_bsl itself is None then, not its book_num)
                    max_bn = 0 if max_bsl is None else max_bsl.book_num
                    newbsl=Book_Series_Link( book_id=book.id, series_id=s.series_id, book_num=max_bn+1 )
                    db.session.add(newbsl)
            db.session.commit()

        # keep the duplicate-author warning as well as the duplicate-series one
        series_message = RemoveDuplicateSeriesInForm( book, request )
        warnings = [ m for m in ( message, series_message ) if m ]
        if not warnings:
            st.SetMessage( "Created new Book ({})".format(book.title) )
        else:
            st.SetAlert("warning")
            st.SetMessage( f"Created new Book ({book.title})<br>BUT {'<br>'.join(warnings)}" )
        return redirect( '/book/{}'.format(book.id) )

    # validation failed: re-show the form with what was entered and why it failed
    alert="danger"
    message="Failed to create Book."
    for field in form.errors:
        message = f"{message}<br>{field}={form.errors[field]}"
    if len(book_genres) == 0:
        message = f"{message}<br>book has to have a genre selected"
    owned_raw = request.form.get('owned', '')
    on_wishlist = owned_raw.isdigit() and int(owned_raw) == int(ON_WISHLIST)
    if not on_wishlist and not request.form['year_published'].isnumeric():
        message = f"{message}<br>book is not on wish list, so needs a year_published between 1850 & 2100"
    print( "ERROR: Failed to create book: {}".format(message) )
    book = _BookFromForm( request.form, book_genres, bals )

    if 'parent_id' in request.form:
        bb=QuickParentBook()
        bb.parent= { 'id': request.form['parent_id'], 'title': request.form['parent_title'] }
    else:
        bb=None
    return render_template("book.html", page_title=page_title, b=bb, books=book, book_form=form, author_list=author_list, genre_list=genre_list, alert=alert, message=message, poss_series_list=ListOfSeriesWithMissingBooks() )

def ClearAuthorsForBook(id):
    """Delete every Book_Author_Link row of book `id` and commit."""
    Book_Author_Link.query.filter(Book_Author_Link.book_id == id).delete()
    db.session.commit()

# helper function to reduce code size for /book/<id> route - handles deleting book
def DeleteBook(id):
    """Delete book `id` and report the outcome through the status alert/message.

    Returns the id of a book page to go to next: `id` itself when the book was
    not deleted (it is a parent, or the delete failed), the parent's id when a
    sub-book was deleted, or None when a stand-alone book was deleted or no
    such book exists.
    """
    book = Book.query.get(id)
    if book is None:
        st.SetAlert( "danger" )
        st.SetMessage( f"Failed to delete book: no book with id={id}" )
        return None
    if book.IsParent():
        st.SetAlert( "danger" )
        st.SetMessage( "This is a parent book, cannot delete it without deleting sub books first" )
        return id

    title = book.title
    pid = 0
    if book.IsChild():
        pid = book.parent.id
    try:
        ClearAuthorsForBook(book.id)
        db.session.delete(book)
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        st.SetAlert( "danger" )
        # only DBAPI-level errors carry .orig; fall back to the error itself
        st.SetMessage( getattr(e, 'orig', e) )
        return id
    except Exception as e:
        db.session.rollback()
        st.SetAlert( "danger" )
        # test the error text; a bare string literal is always true
        if "Dependency rule tried to blank-out primary key" in str(e):
            st.SetMessage( f"Failed to delete book: The book has a link to an another table (ddp messed up DB integrity) -- { str(e) }" )
        else:
            st.SetMessage( f"Failed to delete book:nbsp; {str(e)} ")
        return id

    st.SetAlert("success")
    st.SetMessage("Deleted {}".format(title) )
    if pid > 0:
        return pid
    return None

# handle book view / update / delete route @app.route("/book/", methods=["GET", "POST"]) @login_required def book(id): book_form = BookForm(request.form) book_form.publisher.choices = [(c.id, c.name) for c in Publisher.query.order_by('name')] book_form.owned.choices=[(c.id, c.name) for c in Owned.query.order_by('id')] book_form.covertype.choices=[(c.id, c.name) for c in Covertype.query.order_by('id')] book_form.condition.choices=[(c.id, c.name) for c in Condition.query.order_by('id')] book_form.rating.choices=[(c.id, c.name) for c in Rating.query.order_by('id')] page_title='Edit Book' CheckSeriesChange=None message="" if request.method == 'POST': if 'delete' in request.form: redirect_to=DeleteBook(id) if redirect_to == None: # happens in error conditions only return redirect( '/' ) else: # could return to parent book, or current book depending on what was deleted return redirect( f"/book/{redirect_to}" ) # save/update of book elif book_form.validate(): book = Book.query.get(id) book.title = request.form['title'] book.owned = request.form['owned'] book.covertype = request.form['covertype'] book.condition = request.form['condition'] book.publisher = request.form['publisher'] if request.form['year_published'].isnumeric(): book.year_published = request.form['year_published'] book.rating = request.form['rating'] book.notes = request.form['notes'] book.blurb = request.form['blurb'] # set book genre (empty list, add any that are checked on - this allows us to remove unticked ones) book.genre = [] genre_list = 
GetGenres() for genre in genre_list: if "genre-{}".format(genre.id) in request.form: book.genre.append( genre ) # set book author (empty list) - cant use ORM, not sure why ClearAuthorsForBook( book.id ) book.bals, message = RemoveDuplicateAuthorInForm( request ) # go through form, if we have removed a series, then copy data out of form to be passed into html for a pop-up removing_series=[] for field in request.form: if 'removed-book_num' in field: cnt=int(re.findall( r'\d+', field )[0]) removing_series.append( { 'series_id' : request.form['removed-series_id-{}'.format(cnt)] } ) if book.IsParent(): # go through children, and force sync any changes of physical parts of parent book tmp_book=book_schema.dump(book) for ch_ref in tmp_book['child_ref']: ch_bid=GetBookIdFromBookSubBookLinkByIdAndSubBookNum( book.id, ch_ref['sub_book_num'] ) child_book=Book.query.get(ch_bid) child_book.publisher=book.publisher child_book.owned=book.owned child_book.covertype=book.covertype child_book.condition=book.condition child_book.blurb=book.blurb # okay, saving a parent book and we ARE removing a series, then # need to pop-up for user, to ask what they want to do with sub-books # and the series (likely remove them all too, but maybe just the parent?) 
if book.IsParent() and len(removing_series) > 0: CheckSeriesChange={'type':'parent', 'pid': book.id, 'bid': book.id, 'removing_series': removing_series } # saving a child / sub_book, consider series if book.IsChild() and len(removing_series) > 0: CheckSeriesChange={'type':'child', 'pid': book.parent.id, 'bid': book.id, 'removing_series': removing_series } else: # either we are a normal book (no parent/child), OR not removing a series, might be adding though, so easiest is to # delete all bsls and then add them back based on the request.form Book_Series_Link.query.filter(Book_Series_Link.book_id == book.id ).delete() if book.IsChild(): Book_Series_Link.query.filter(Book_Series_Link.book_id == book.parent.id ).delete() message=RemoveDuplicateSeriesInForm( book, request ) db.session.commit() # reset rating on this/these series as the book has changed (and maybe the rating has changed) for field in request.form: if 'bsl-book_id-' in field and field != 'bsl-book_id-NUM': cnt=int(re.findall( r'\d+', field )[0]) s=Series.query.get(request.form['bsl-series_id-{}'.format(cnt)]) s.calcd_rating = CalcAvgRating(s.id) cnt=cnt+1 db.session.commit() if message == "": st.SetMessage( f"Successfully Updated Book (id={id})" ) else: st.SetAlert("warning") st.SetMessage( f"Successfully Updated Book (id={id})
BUT {message}" ) else: st.SetAlert("danger") message=f"Failed to update Book (id={id}). " if not book_form.year_published.data: message += f" year_published cannot be empty"; if int(book_form.condition.data) == CONDITION_NOT_APPLICABLE: message += f" condition cannot be N/A"; if int(book_form.covertype.data) == COVERTYPE_NOT_APPLICABLE: message += f" covertype cannot be N/A"; book = Book.query.get(id) st.SetMessage(message) else: print( f"getting book id: {id}" ) book = Book.query.get(id) print( book ) if book == None: st.SetAlert("danger") st.SetMessage("Cannot find Book (id={})".format(id)) return render_template("base.html", alert=st.GetAlert(), message=st.GetMessage()) book_form=BookForm(obj=book) book_form.publisher.choices = [(c.id, c.name) for c in Publisher.query.order_by('name')] book_form.owned.choices=[(c.id, c.name) for c in Owned.query.order_by('id')] book_form.covertype.choices=[(c.id, c.name) for c in Covertype.query.order_by('id')] book_form.condition.choices=[(c.id, c.name) for c in Condition.query.order_by('id')] book_form.rating.choices=[(c.id, c.name) for c in Rating.query.order_by('id')] author_list = GetAuthors() genre_list = GetGenres() book_s = book_schema.dump(book) return render_template("book.html", b=book, books=book_s, book_form=book_form, author_list=author_list, genre_list=genre_list, page_title=page_title, alert=st.GetAlert(), message=st.GetMessage(), poss_series_list=ListOfSeriesWithMissingBooks(), CheckSeriesChange=CheckSeriesChange) def GetCount( what, where ): st="select count(id) as count from book where " # with db.engine.connect() as conn: # res = conn.exec_driver_sql( st+where ) res = db.session.execute( text( st+where ) ) rtn={} for row in res: rtn['stat']=what rtn['value']=row.count return rtn @app.route("/stats", methods=["GET"]) @login_required def stats(): stats=[] stats.append( GetCount( "Num physical Books in DB", "id not in ( select sub_book_id from book_sub_book_link )" ) ) stats.append( GetCount( "Num physical Books 
owned (aka books on shelf)", "owned=(select id from owned where name = 'Currently Owned') and id not in ( select sub_book_id from book_sub_book_link )" ) ) stats.append( GetCount( "Num physical Books on Wish List", "owned=(select id from owned where name='On Wish List') and id not in ( select sub_book_id from book_sub_book_link )" ) ) stats.append( GetCount( "Num physical Books sold", "owned=(select id from owned where name='Sold') and id not in ( select sub_book_id from book_sub_book_link )" ) ) stats.append( GetCount( "Num all Books in DB", "id>0" ) ) stats.append( GetCount( "Num all Books owned", "owned in (select id from owned where name='Currently Owned')" ) ) stats.append( GetCount( "Num all Books on wish list", "owned in (select id from owned where name='On Wish List')" ) ) stats.append( GetCount( "Num all Books sold", "owned=(select id from owned where name='Sold')" ) ) stats.append( GetCount( "Num all owned Books unrated", "rating in (select id from rating where name in ('N/A', 'Undefined')) and owned = (select id from owned where name='Currently Owned')" ) ) stats.append( GetCount( "Num all Books unrated", "rating in (select id from rating where name in ('N/A', 'Undefined'))" ) ) return render_template("stats.html", stats=stats ) @app.route("/rem_books_from_loan/", methods=["POST"]) @login_required def rem_books_from_loan(id): for field in request.form: rem_id=int(re.findall( r'\d+', field )[0]) bll = Book_Loan_Link.query.filter(Book_Loan_Link.loan_id==id, Book_Loan_Link.book_id == rem_id ).one() db.session.delete(bll) db.session.commit() return jsonify(success=True) @app.route("/add_books_to_loan/", methods=["POST"]) @login_required def add_books_to_loan(id): for field in request.form: add_id=int(re.findall( r'\d+', field )[0]) bll = Book_Loan_Link( loan_id=id, book_id=add_id ) db.session.add(bll) db.session.commit() return jsonify(success=True) @app.route("/rem_parent_books_from_series/", methods=["POST"]) @login_required def 
rem_parent_books_from_series(pid): parent=Book.query.get(pid) for sb in parent.child_ref: Book_Series_Link.query.filter(Book_Series_Link.book_id==sb.sub_book_id).delete() Book_Series_Link.query.filter(Book_Series_Link.book_id==pid).delete() db.session.commit() return jsonify(success=True) def OrderBooks(books): # because a book can be in multiple series, without any ordering we need to find # any book that is in a series, then go through each series to find the one with # the most books in it. THEN we need to get all those books and put them in the # ordered_books array, and remove them from the books array ordered_books=[] processed=[] currently_owned = Owned.query.filter(Owned.name=='Currently Owned').one() for b in books: if b.series: # find biggest Series max_num_books=0 for s in b.series: if max_num_books < s.num_books: max_num_books = s.num_books max_series_id= s.id #order all books (sub-books too here!) in this series with the most books bsl=Book_Series_Link.query.filter( Book_Series_Link.series_id==max_series_id ).order_by(Book_Series_Link.book_num).all() for tmp in bsl: tmp_b = Book.query.get(tmp.book_id) # skip any books that are not owned sneaking back in if tmp_b.owned != currently_owned.id: continue if tmp_b.IsChild(): # this child book wont be in books, but its parent will -> use it instead # mark parent from books so we dont process it twice if tmp_b.parent.id not in processed: processed.append(tmp_b.id) processed.append(tmp_b.parent.id) ordered_books.append(tmp_b.parent) else: if tmp_b.id not in processed: ordered_books.append(tmp_b) processed.append(tmp_b.id) else: # book not in a series and we removed sub-books, so just add this book to the ordered list ordered_books.append(b) return ordered_books def RemLoans(books): """ Remove any books on loan from list - used for books on shelf view """ from main import Book_Loan_Link loaned_books=Book_Loan_Link.query.all() for b in loaned_books: for i, item in enumerate(books): if item.id == b.book_id: 
books.remove(item) @app.route("/books_on_shelf", methods=["GET"]) @login_required def books_on_shelf(): # start with all books owned sorted by author, then title, but it includes sub-books, so remove them... books = Book.query.join(Owned).join(Book_Author_Link).join(Author).filter(Owned.name=='Currently Owned').filter(Book_Author_Link.author_num==1).order_by(Author.surname,Author.firstnames,Book.title).all() RemLoans(books) RemSubs(books) ordered_books=OrderBooks(books) return render_template("books.html", books=ordered_books, page_title="Books on Shelf", order_by="", show_cols='', hide_cols='' ) @app.route("/unrated_books", methods=["GET"]) @login_required def unrated_books(): books = Book.query.join(Rating).join(Owned).filter(Rating.name=='Undefined',Owned.name=='Currently Owned').all() return render_template("books.html", books=books, page_title="Books with no rating", show_cols='Rating', hide_cols='' ) def FindMissingBooks(): tmp=db.session.execute(text( "select s.*, count(bsl.book_num) from book_series_link bsl, series s where bsl.book_num is not null and s.id = bsl.series_id group by s.id order by s.title") ) books=[] sold=Owned.query.filter(Owned.name=='Sold').all() for t in tmp: if t.num_books != t.count: num_sold=0 bsl=Book_Series_Link.query.filter( Book_Series_Link.series_id==t.id ).order_by(Book_Series_Link.book_num).all() missing=[] for cnt in range(1,t.num_books+1): missing.append( cnt ) # check to see if the only books in this series are SOLD, if so, there # are no missing books here, I clearly dont want more of this series for b in bsl: if b.book_num == None: continue missing.remove( b.book_num ) tmp_book=Book.query.get(b.book_id) if tmp_book.owned == sold[0].id: num_sold = num_sold + 1 if num_sold != t.count: # turn missing from array into string, and strip 0 and last char (the square brackets) books.append( { 'id': t.id, 'title': t.title, 'num_books': t.num_books, 'missing': str(missing)[1:-1] } ) return books @app.route("/lost", methods=["GET"]) 
@login_required
def lost():
    """Render the books whose ownership status is 'Lost'."""
    books = Book.query.join(Owned).filter(Owned.name == 'Lost').all()
    return render_template("books.html", books=books, page_title="Lost Books", show_cols='', hide_cols='Publisher')


@app.route("/missing_books", methods=["GET"])
@login_required
def missing_books():
    """Render "missing.html" with the result of FindMissingBooks()."""
    books = FindMissingBooks()
    return render_template("missing.html", books=books)


@app.route("/wishlist", methods=["GET"])
@login_required
def wishlist():
    """Render the books whose ownership status is 'On Wish List'."""
    books = Book.query.join(Owned).filter(Owned.name == 'On Wish List').all()
    return render_template("books.html", books=books, page_title="Books On Wish List", show_cols='', hide_cols='Publisher,Condition,Covertype')


@app.route("/books_to_buy", methods=["GET"])
@login_required
def books_to_buy():
    """Render "to_buy.html" with the wish-list books and the result of FindMissingBooks()."""
    books = Book.query.join(Owned).filter(Owned.name == 'On Wish List').all()
    missing = FindMissingBooks()
    return render_template("to_buy.html", books=books, missing=missing, page_title="Books To Buy")


@app.route("/needs_replacing", methods=["GET"])
@login_required
def needs_replacing():
    """Render the currently owned books whose condition is 'Needs Replacing'."""
    books = Book.query.join(Condition).join(Owned).filter(Condition.name == 'Needs Replacing', Owned.name == 'Currently Owned').all()
    return render_template("books.html", books=books, page_title="Books that Need Replacing", show_cols='', hide_cols='')


@app.route("/sold", methods=["GET"])
@login_required
def sold_books():
    """Render the books whose ownership status is 'Sold'."""
    books = Book.query.join(Owned).filter(Owned.name == 'Sold').all()
    return render_template("books.html", books=books, page_title="Books that were Sold", show_cols='', hide_cols='')


@app.route("/poor_rating", methods=["GET"])
@login_required
def poor_rating_books():
    """Render the currently owned books with a rating id above 6, excluding 'Undefined'."""
    # NOTE(review): presumably rating ids above 6 correspond to scores under 5/10 - confirm against the rating table
    books = Book.query.join(Rating).join(Owned).filter(Rating.id > 6, Rating.name != 'Undefined', Owned.name == 'Currently Owned').all()
    return render_template("books.html", books=books, page_title="Books that have a Poor Rating (<5 out of 10)", show_cols='Rating', hide_cols='')


# default page, just the navbar
@app.route("/", methods=["GET"])
@login_required
def main_page():
    """Render the bare "base.html" page with any pending alert / message."""
    # Redirect users who are not logged in.
    if not current_user or current_user.is_anonymous:
        return redirect(url_for('login'))
    return render_template("base.html", alert=st.GetAlert(), message=st.GetMessage())


###############################################################################
# This func creates a new jinja2 filter that hands back the username from the
# ldap dn
###############################################################################
@app.template_filter('Username')
def _jinja2_filter_parentpath(dn):
    """Return the value of the first component of a dn.

    "uid=xxx,dn=yyy" gives "xxx".  A dn without a ',' gives everything after
    the first '='; a dn without any '=' is returned unchanged (as a string).
    """
    # pull apart a dn (uid=xxx,dn=yyy,etc), and return the xxx
    dn_text = str(dn)
    _, eq, rest = dn_text.partition('=')
    if not eq:
        return dn_text
    return rest.partition(',')[0]


# keep this last: app.run() blocks, so anything defined after it would never
# be registered when the file is run as a script
if __name__ == "__main__":
    if os.environ['FLASK_ENV'] == "production":
        app.run(ssl_context=('/etc/letsencrypt/live/book.depaoli.id.au/cert.pem', '/etc/letsencrypt/live/book.depaoli.id.au/privkey.pem'), host="0.0.0.0", debug=False)
    else:
        app.run(host="0.0.0.0", debug=True)