Files
books/main.py

1075 lines
52 KiB
Python

from flask import Flask, render_template, request, redirect, jsonify, url_for
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import SQLAlchemyError
from flask_marshmallow import Marshmallow
from flask_bootstrap import Bootstrap
from wtforms import SubmitField, StringField, HiddenField, SelectField, IntegerField, TextAreaField, validators, ValidationError
from flask_wtf import FlaskForm
#from flask_compress import Compress
from status import st, Status
# for ldap auth
from flask_ldap3_login import LDAP3LoginManager
from flask_login import LoginManager, login_user, login_required, UserMixin, current_user, logout_user
from flask_ldap3_login.forms import LDAPLoginForm
import re
import os
import contextlib
####################################### Flask App globals #######################################
app = Flask(__name__)
# Choose the DB connection string from the deployment environment.
# os.environ.get() is used (rather than indexing) so that an unset FLASK_ENV
# falls through to the local development database instead of raising
# KeyError while the module is being imported.
_flask_env = os.environ.get('FLASK_ENV')
if _flask_env == "production":
    DB_URL = 'postgresql+psycopg2://ddp:blahdeblah@bookdb:5432/library'
elif _flask_env == "container":
    DB_URL = 'postgresql+psycopg2://ddp:blahdeblah@bookdb_dev:5432/library'
else:
    # local DB conn string
    DB_URL = 'postgresql+psycopg2://ddp:blahdeblah@127.0.0.1:55432/library'
app.config['SQLALCHEMY_DATABASE_URI'] = DB_URL
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# NOTE(review): the DB credentials above and this SECRET_KEY are hard-coded in
# source; consider loading them from the environment instead.
app.config.from_mapping( SECRET_KEY=b'\xd6\x04\xbdj\xfe\xed$c\x1e@\xad\x0f\x13,@G')
# LDAP settings consumed by flask_ldap3_login. They are applied in one call,
# before the extensions below are constructed from `app`.
# (the group object filter is required, or python ldap freaks out)
app.config.update(
    LDAP_HOST='mara.ddp.net',
    LDAP_BASE_DN='dc=depaoli,dc=id,dc=au',
    LDAP_USER_DN='ou=users',
    LDAP_GROUP_DN='ou=groups',
    LDAP_USER_RDN_ATTR='cn',
    LDAP_USER_LOGIN_ATTR='uid',
    LDAP_BIND_USER_DN=None,
    LDAP_BIND_USER_PASSWORD=None,
    LDAP_GROUP_OBJECT_FILTER='(objectclass=posixGroup)',
)

# ORM, serialisation and UI extensions, all bound to the app
db = SQLAlchemy(app)
ma = Marshmallow(app)
Bootstrap(app)

# setup ldap for auth
login_manager = LoginManager(app)        # Flask-Login manager
ldap_manager = LDAP3LoginManager(app)    # LDAP3 login manager
# default login route; url_for failed here, so the endpoint name is hard-coded
login_manager.login_view = "login"

# enable compression for http / speed
#Compress(app)
################################# Now, import non-book classes ###################################
from author import Author, AuthorForm, AuthorSchema, GetAuthors
from publisher import Publisher, PublisherForm, PublisherSchema, GetPublisherById
from genre import Genre, GenreForm, GenreSchema, GetGenres
from condition import Condition, ConditionForm, ConditionSchema, GetConditionById
from covertype import Covertype, CovertypeForm, CovertypeSchema, GetCovertypeById
from owned import Owned, OwnedForm, OwnedSchema, GetOwnedById
from rating import Rating, RatingForm, RatingSchema, GetRatingById
from loan import Loan, LoanForm, LoanSchema
from series import Series, SeriesForm, SeriesSchema, ListOfSeriesWithMissingBooks, CalcAvgRating
from user import BDBUser
# hacky constants (FIX THIS)
# These integers are compared against int(<select field>.data) in
# BookForm.validate() to decide whether a book is on the wish list and whether
# covertype/condition were left as "not applicable".
# NOTE(review): presumably they mirror primary-key ids of rows in the owned /
# covertype / condition tables -- confirm against the DB before changing.
ON_WISHLIST=2
COVERTYPE_NOT_APPLICABLE=4
CONDITION_NOT_APPLICABLE=4
####################################### CLASSES / DB model #######################################
class QuickParentBook:
    """Lightweight stand-in handed to book.html when creating a sub-book.

    Callers create an instance and fill ``parent`` with dicts describing the
    parent book; the other attributes are optional and default to None so
    that ``repr()`` works on a partially populated instance.
    """

    # class-level defaults: __repr__ reads all of these, but callers in this
    # file only ever assign ``parent``
    publisher = None
    owned = None
    covertype = None
    condition = None
    blurb = None

    def __init__(self):
        # per-instance list (a class-level ``parent=[]`` would be shared by
        # every instance that appended to it without re-assigning first)
        self.parent = []

    def __repr__(self):
        return "<parent: {}, publisher: {}, owned: {}, covertype: {}, condition: {}, blurb: {}>".format(self.parent, self.publisher, self.owned, self.covertype, self.condition, self.blurb )
# Plain association table (no mapped class) linking book.id to genre.id;
# it is used as the `secondary` table of the Book.genre relationship.
book_genre_link = db.Table('book_genre_link', db.Model.metadata,
    db.Column('book_id', db.Integer, db.ForeignKey('book.id')),
    db.Column('genre_id', db.Integer, db.ForeignKey('genre.id'))
    )
class Book_Author_Link(db.Model):
    """Association row between a book and one of its authors.

    ``author_num`` is part of the primary key and is what Book.bals orders by.
    """
    __tablename__ = "book_author_link"

    book_id = db.Column(
        db.Integer, db.ForeignKey('book.id'), primary_key=True)
    author_id = db.Column(
        db.Integer, db.ForeignKey('author.id'), primary_key=True)
    author_num = db.Column(db.Integer, primary_key=True)

    # the Author row this link points at
    author = db.relationship('Author')

    def __repr__(self):
        return f"<book_author_link: id: {self.book_id}, author_id: {self.author_id}, author_num: {self.author_num}>"
class Book_Loan_Link(db.Model):
    """Association row recording that a book belongs to a loan."""
    __tablename__ = "book_loan_link"

    book_id = db.Column(
        db.Integer, db.ForeignKey('book.id'),
        unique=True, nullable=False, primary_key=True)
    loan_id = db.Column(
        db.Integer, db.ForeignKey('loan.id'),
        unique=True, nullable=False, primary_key=True)

    def __repr__(self):
        template = "<book_id: {}, loan_id: {}>"
        return template.format(self.book_id, self.loan_id)
class Book_Series_Link(db.Model):
    """Association row placing a book at position ``book_num`` in a series."""
    # NOTE(review): both key columns are flagged unique=True; if the table were
    # ever created from this model that would allow one book per series --
    # confirm against the real schema.
    __tablename__ = "book_series_link"

    book_id = db.Column(
        db.Integer, db.ForeignKey('book.id'),
        unique=True, nullable=False, primary_key=True)
    series_id = db.Column(
        db.Integer, db.ForeignKey('series.id'),
        unique=True, nullable=False, primary_key=True)
    book_num = db.Column(db.Integer)

    def __repr__(self):
        template = "<book_id: {}, series_id: {}, book_num: {}>"
        return template.format(self.book_id, self.series_id, self.book_num)
def SeriesBookNum(series_id, book_id):
    """Return the book's number within the series, or 0 if no link row exists.

    Exposed to jinja2 templates as a global. A missing link is reported with
    a printed warning rather than an exception.
    """
    link = Book_Series_Link.query.filter(
        Book_Series_Link.book_id == book_id,
        Book_Series_Link.series_id == series_id,
    ).first()
    if not link:
        print( f"WARNING: tried to find book number in this series for: book_id={book_id}, series_id={series_id} but no db data for it" )
        return 0
    return link.book_num
class Book_Sub_Book_Link(db.Model):
    """Association row making ``sub_book_id`` the ``sub_book_num``-th sub-book of ``book_id``.

    Both foreign keys point at the book table.
    """
    __tablename__ = "book_sub_book_link"

    book_id = db.Column(
        db.Integer, db.ForeignKey('book.id'),
        unique=True, nullable=False, primary_key=True)
    sub_book_id = db.Column(
        db.Integer, db.ForeignKey('book.id'),
        unique=True, nullable=False, primary_key=True)
    sub_book_num = db.Column(db.Integer)

    def __repr__(self):
        template = "<book_id: {}, sub_book_id: {}, sub_book_num: {}>"
        return template.format(self.book_id, self.sub_book_id, self.sub_book_num)
class Book(db.Model):
    """A book (which may be a parent of sub-books, or a sub-book itself)."""
    id = db.Column(db.Integer, db.Sequence('book_id_seq'), primary_key=True )
    title = db.Column(db.String(100), unique=True, nullable=False)
    publisher = db.Column(db.Integer, db.ForeignKey('publisher.id'))
    genre = db.relationship('Genre', secondary=book_genre_link )
    loan = db.relationship('Loan', secondary=Book_Loan_Link.__table__)
    series = db.relationship('Series', secondary=Book_Series_Link.__table__)
    # direct access to the link rows (they carry book_num), overlapping "series"
    bsl = db.relationship('Book_Series_Link', overlaps="series" )
    year_published = db.Column(db.Integer)
    condition = db.Column(db.Integer, db.ForeignKey('condition.id'))
    covertype = db.Column(db.Integer, db.ForeignKey('covertype.id'))
    owned = db.Column(db.Integer, db.ForeignKey('owned.id'))
    rating = db.Column(db.Integer, db.ForeignKey('rating.id'))
    notes = db.Column(db.Text)
    blurb = db.Column(db.Text)
    created = db.Column(db.DateTime)
    modified = db.Column(db.DateTime)

    # take actual parent book as there is no real associated sub_book_num data and can just use it
    parent = db.relationship('Book', secondary=Book_Sub_Book_Link.__table__, primaryjoin="Book.id==Book_Sub_Book_Link.sub_book_id", secondaryjoin="Book.id==Book_Sub_Book_Link.book_id" )
    # but use child_ref as sub_book_num is per book, and I can't connect an empty array of sub_book_nums to a child book array in "child"
    child_ref = db.relationship('Book_Sub_Book_Link', secondary=Book_Sub_Book_Link.__table__, primaryjoin="Book.id==Book_Sub_Book_Link.book_id", secondaryjoin="Book.id==Book_Sub_Book_Link.sub_book_id", order_by="Book_Sub_Book_Link.sub_book_num", overlaps="parent" )
    # use this to manage author now that author_num is used to order them
    bals = db.relationship('Book_Author_Link', order_by="Book_Author_Link.author_num" )

    def IsParent(self):
        """Return True when this book has at least one sub-book link."""
        return bool(self.child_ref)

    def IsChild(self):
        """Return True when this book is a sub-book of some parent."""
        return bool(self.parent)

    def FirstSubBookNum(self):
        """Return the sub_book_num of the first sub-book, or 1 for a non-parent."""
        if self.IsParent():
            # child_ref is ordered by sub_book_num
            return self.child_ref[0].sub_book_num
        return 1

    def LastSubBookNum(self):
        """Return the sub_book_num of the last sub-book, or 1 for a non-parent."""
        if self.IsParent():
            # -1 subscript returns the last one
            return self.child_ref[-1].sub_book_num
        return 1

    def NumSubBooks(self):
        """Return how many sub-books this book has, or 1 for a non-parent."""
        if self.IsParent():
            return len(self.child_ref)
        return 1

    def MoveBookInSeries( self, series_id, amt ):
        """Shift this book's book_num in ``series_id`` by ``amt`` and commit.

        For a parent book every sub-book is shifted instead. Returns None on
        success; on a SQLAlchemy error the session is rolled back, the status
        alert/message are set and a redirect to the series page is returned.
        """
        try:
            if self.IsParent():
                # if parent book, move all sub books instead
                tmp_book=book_schema.dump(self)
                for book in tmp_book['child_ref']:
                    tmp_bid=GetBookIdFromBookSubBookLinkByIdAndSubBookNum( self.id, book['sub_book_num'] )
                    bsl=Book_Series_Link.query.filter( Book_Series_Link.book_id==tmp_bid, Book_Series_Link.series_id==series_id ).all()
                    bsl[0].book_num=bsl[0].book_num+amt
            else:
                bsl=Book_Series_Link.query.filter( Book_Series_Link.book_id==self.id, Book_Series_Link.series_id==series_id ).all()
                bsl[0].book_num=bsl[0].book_num+amt
            db.session.commit()
            return
        except SQLAlchemyError as e:
            # leave the session usable for whatever the request does next
            db.session.rollback()
            st.SetAlert( "danger" )
            # only DBAPI-wrapping errors carry .orig; fall back to the error itself
            st.SetMessage( getattr(e, "orig", e) )
            return redirect( '/series/{}'.format(series_id) )

    def __repr__(self):
        return "<id: {}, title: {}, year_published: {}, rating: {}, condition: {}, owned: {}, covertype: {}, notes: {}, blurb: {}, created: {}, modified: {}, publisher: {}, genre: {}, parent: {}>".format(self.id, self.title, self.year_published, self.rating, self.condition, self.owned, self.covertype, self.notes, self.blurb, self.created, self.modified, self.publisher, self.genre, self.parent )
class Book_Sub_Book_LinkSchema(ma.SQLAlchemyAutoSchema):
    """Auto-generated marshmallow schema for Book_Sub_Book_Link rows."""

    class Meta:
        model = Book_Sub_Book_Link
class Book_Series_LinkSchema(ma.SQLAlchemyAutoSchema):
    """Auto-generated marshmallow schema for Book_Series_Link rows."""

    class Meta:
        model = Book_Series_Link
class Book_Author_LinkSchema(ma.SQLAlchemyAutoSchema):
    """Auto-generated schema for Book_Author_Link rows, with the author nested."""

    class Meta:
        model = Book_Author_Link

    # serialise the linked Author inline rather than just its id
    author = ma.Nested(AuthorSchema)
# Note: the schema below is deliberately not ordered, as I can't work out how
# to make jinja2 iterate over an OrderedDict - it seems it doesn't support it?
# So I just hacked a list of keys into book.html
class BookSchema(ma.SQLAlchemyAutoSchema):
    """Marshmallow schema for Book, with its related collections nested."""

    class Meta:
        model = Book

    # every related collection is dumped as a list of nested objects
    author = ma.Nested(AuthorSchema, many=True)
    bals = ma.Nested(Book_Author_LinkSchema, many=True)
    genre = ma.Nested(GenreSchema, many=True)
    loan = ma.Nested(LoanSchema, many=True)
    series = ma.Nested(SeriesSchema, many=True)
    child_ref = ma.Nested(Book_Sub_Book_LinkSchema, many=True)
class BookForm(FlaskForm):
    """Create/edit form for a book.

    Author and genre inputs are built by hand in the template; the select
    fields get their choices assigned by the routes before use.
    """
    id = HiddenField()
    title = StringField('Title:', [validators.DataRequired()])
    # author built by hand
    # genre built by hand
    publisher = SelectField( 'publisher' )
    owned = SelectField( 'owned' )
    covertype = SelectField( 'covertype' )
    condition = SelectField( 'condition' )
    year_published = IntegerField('Year Published:', validators=[validators.Optional(), validators.NumberRange(min=1850, max=2100)] )
    rating = SelectField( 'rating' )
    notes = TextAreaField('Notes:')
    blurb = TextAreaField('Blurb:')
    submit = SubmitField('Save' )
    delete = SubmitField('Delete' )
    add_sub = SubmitField('Add Sub-Book' )
    rem_sub = SubmitField('Remove Sub-Book from Parent' )

    def validate(self, extra_validators=None):
        """Apply the ownership-dependent rules for a book.

        This replaces (and does not call) the base-class validation, so the
        per-field validators declared above are not run here.
        ``extra_validators`` is accepted only for signature compatibility:
        newer Flask-WTF releases pass it from ``validate_on_submit()``.

        Returns True when the book is on the wish list, or when it has a real
        covertype and condition and a year_published strictly between 1850
        and 2100; False otherwise.
        """
        # if on wish list, just accept year_published
        if int(self.owned.data) == ON_WISHLIST:
            return True
        # we own/sold this, so covertype and condition cant be N/A
        if int(self.covertype.data) == COVERTYPE_NOT_APPLICABLE:
            return False
        if int(self.condition.data) == CONDITION_NOT_APPLICABLE:
            return False
        # otherwise lets check that there is data and its in the right range
        # NOTE(review): bounds are exclusive here but inclusive in the
        # NumberRange validator on the field -- confirm which is intended.
        year = self.year_published.data
        return bool(year) and 1850 < year < 2100
################################# helper functions ###################################
def GetBookIdFromSeriesByBookNum( series_id, book_num ):
    """Return the id of the book at ``book_num`` in the series, or None if there is none."""
    links = Book_Series_Link.query.filter(
        Book_Series_Link.series_id == series_id,
        Book_Series_Link.book_num == book_num,
    ).all()
    if not links:
        return None
    return links[0].book_id
def GetBookNumBySeriesAndBookId( series_id, book_id ):
    """Return the book's number in the series.

    Raises IndexError when the book has no link row for that series.
    """
    links = Book_Series_Link.query.filter(
        Book_Series_Link.series_id == series_id,
        Book_Series_Link.book_id == book_id,
    ).all()
    first_link = links[0]
    return first_link.book_num
def GetBookIdFromBookSubBookLinkByIdAndSubBookNum( book_id, sub_book_num ):
    """Return the id of the ``sub_book_num``-th sub-book of ``book_id``.

    Raises IndexError when the parent has no sub-book with that number.
    """
    links = Book_Sub_Book_Link.query.filter(
        Book_Sub_Book_Link.book_id == book_id,
        Book_Sub_Book_Link.sub_book_num == sub_book_num,
    ).all()
    first_link = links[0]
    return first_link.sub_book_id
# ignore ORM, the sub_book_num comes from the link, but does not have any attached data, so can't tell which book it connects to.
# Just select sub_book data and hand add it to the books object, and use it in jinja2 to indent/order the books/sub books
# This data is used to sort/indent subbooks
def AddSubs(books):
    """Annotate sub-books in ``books`` with ``parent_id`` and ``sub_book_num``.

    Every row of book_sub_book_link whose sub_book_id matches a book in the
    list gets copied onto that book object (in place); books that are not
    sub-books are left untouched. Returns None.
    """
    # index the list once instead of scanning it for every link row;
    # setdefault keeps the first book per id, like the original first-match scan
    book_by_id = {}
    for item in books:
        book_by_id.setdefault(item.id, item)

    # read all link rows while the connection is still open
    with db.engine.connect() as conn:
        rows = conn.exec_driver_sql( "select * from book_sub_book_link" ).fetchall()

    for row in rows:
        sub_book = book_by_id.get(row.sub_book_id)
        if sub_book is None:
            continue
        sub_book.parent_id = row.book_id
        sub_book.sub_book_num = row.sub_book_num
# HACK: Couldn't work out ORM to excluded sub_book self-ref, so using basic python
# loop to remove sub_books from list
def RemSubs(books):
    """Remove, in place, every book in ``books`` that is a sub-book.

    A book counts as a sub-book when its id appears as sub_book_id in
    book_sub_book_link. The list object itself is mutated so callers keep
    their reference. Returns None.
    """
    with db.engine.connect() as conn:
        sub_book_ids = {
            row.sub_book_id
            for row in conn.exec_driver_sql( "select * from book_sub_book_link" )
        }
    # rebuild via slice assignment rather than calling remove() while
    # iterating, which skips the element that follows each removal
    books[:] = [item for item in books if item.id not in sub_book_ids]
####################################### GLOBALS #######################################
# allow jinja2 to call these python functions directly
app.jinja_env.globals.update(
    GetCovertypeById=GetCovertypeById,
    GetOwnedById=GetOwnedById,
    GetConditionById=GetConditionById,
    GetPublisherById=GetPublisherById,
    GetRatingById=GetRatingById,
    SeriesBookNum=SeriesBookNum,
    ClearStatus=st.ClearStatus,
    ListOfSeriesWithMissingBooks=ListOfSeriesWithMissingBooks,
    current_user=current_user,
)

# shared schema instances: one for a single book, one for lists of books
book_schema = BookSchema()
books_schema = BookSchema(many=True)
# Declare a User Loader for Flask-Login.
# Returns the User if it exists in our 'database', otherwise returns None.
@login_manager.user_loader
def load_user(id):
    """Look up the stored user whose dn equals ``id``; None when there is no such row."""
    return BDBUser.query.filter(BDBUser.dn == id).first()
# Declare The User Saver for Flask-Ldap3-Login
# This method is called whenever a LDAPLoginForm() successfully validates.
# store the user details / session in the DB if it is not in there already
@ldap_manager.save_user
def save_user(dn, username, data, memberships):
    """Return the BDBUser for ``dn``, creating and committing it on first login.

    Only ``dn`` is used; the other arguments are part of the callback
    signature and are ignored here.
    """
    user = BDBUser.query.filter(BDBUser.dn == dn).first()
    if not user:
        # first time we have seen this dn: persist a new row for it
        user = BDBUser(dn=dn)
        db.session.add(user)
        db.session.commit()
    # otherwise re-use the existing row (e.g. after a web restart), dont make more users
    return user
# POST is when user submits pwd & uses flask-login to hit ldap, validate pwd
# if valid, then we save user/session into the DB via login_user() -> calls save_user()
def _is_safe_redirect_target(target):
    """Return True only for a same-site, absolute-path redirect target.

    Rejects empty values, anything with a scheme or host, protocol-relative
    ("//host") forms, and values containing backslashes, whitespace or
    control characters, which some browsers normalise into an external URL.
    """
    from urllib.parse import urlsplit  # local import keeps this block self-contained

    if not target or "\\" in target:
        return False
    if any(ch.isspace() or ord(ch) < 32 for ch in target):
        return False
    parts = urlsplit(target)
    if parts.scheme or parts.netloc:
        return False
    return target.startswith("/") and not target.startswith("//")


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the LDAP login form and, on a valid POST, log the user in.

    After a successful login the browser is sent to the ``next`` query
    parameter when that is a safe same-site path, otherwise to the main page.
    """
    # Instantiate a LDAPLoginForm which has a validator to check if the user
    # exists in LDAP.
    form = LDAPLoginForm()
    form.submit.label.text="Login"

    # the re matches on any special LDAP chars, we dont want someone
    # ldap-injecting our username, so send them back to the login page instead
    username = request.form.get('username', '')
    if request.method == 'POST' and re.search( r'[()\\*&!]', username):
        print( f"WARNING: Detected special LDAP chars in username: {username}")
        return redirect(url_for('login'))

    if form.validate_on_submit():
        # Successfully logged in, We can now access the saved user object via form.user.
        login_user(form.user, remember=True) # Tell flask-login to log them in.
        next_target = request.args.get("next")
        # ``next`` comes from the query string, so only follow it when it
        # cannot take the user off-site (prevents an open redirect)
        if _is_safe_redirect_target(next_target):
            return redirect(next_target) # Send them back where they came from
        return redirect( url_for('main_page') )

    return render_template("login.html", form=form)
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and send the browser to the login page."""
    logout_user()
    login_page = '/login'
    return redirect(login_page)
####################################### ROUTES #######################################
@app.route("/search", methods=["POST"])
@login_required
def search():
    """Render books whose title contains the posted ``term`` (case-insensitive).

    When the form carries ``InDBox``, books that already have a loan link are
    excluded and the template is told it is running inside the dialog box.
    """
    in_dialog = 'InDBox' in request.form
    title_matches = Book.title.ilike("%{}%".format(request.form['term']))

    if in_dialog:
        # removes already loaned books from list of books to loan out
        query = Book.query.outerjoin(Book_Loan_Link).filter(
            title_matches, Book_Loan_Link.loan_id.is_(None))
        InDBox = 1
    else:
        query = Book.query.filter(title_matches)
        InDBox = 0

    found = query.all()
    AddSubs(found)
    return render_template("books.html", books=found, InDBox=InDBox)
@app.route("/books", methods=["GET"])
@login_required
def books():
    """Render every book, with sub-books annotated by AddSubs."""
    all_books = Book.query.all()
    AddSubs(all_books)
    return render_template("books.html", books=all_books )
@app.route("/books_for_loan/<id>", methods=["GET", "POST"])
@login_required
def books_for_loan(id):
    """Render the books attached to loan ``id``, ordered by book id.

    ``InDBox`` is 1 for a POST request and 0 otherwise.
    """
    loaned = (
        Book.query
        .join(Book_Loan_Link)
        .filter(Book_Loan_Link.loan_id == id)
        .order_by(Book.id)
        .all()
    )
    InDBox = 1 if request.method == 'POST' else 0
    return render_template("books_for_loan.html", books=loaned, loan_id=id, InDBox=InDBox)
def CalcMoveForBookInSeries( id, bid, dir ):
    """Move book ``bid`` one place "up" or down within series ``id``.

    The book swaps places with its neighbour in that direction. Parent books
    move together with all their sub-books, so each side is shifted by the
    number of (sub-)books on the other side. When there is no neighbour at
    the adjacent book_num the book is simply shifted by one.
    """
    moving_up = dir == "up"
    step = -1 if moving_up else 1

    book1 = Book.query.get(bid)
    # a parent book has no book_num of its own in the series, its sub books do:
    # going up we measure from its first sub book, going down from its last
    if book1.IsParent():
        edge_num = book1.FirstSubBookNum() if moving_up else book1.LastSubBookNum()
        anchor_bid = GetBookIdFromBookSubBookLinkByIdAndSubBookNum( book1.id, edge_num )
    else:
        anchor_bid = book1.id
    moving_series_book_num = GetBookNumBySeriesAndBookId( id, anchor_bid )

    # book_num of the (notionally next/prev) book we would swap with
    neighbour_bid = GetBookIdFromSeriesByBookNum( id, moving_series_book_num + step )

    # no neighbour (rare case of moving say book 7 of a series of 10, and we
    # don't have details on book 6 or 8): just shift this book by one
    if neighbour_bid is None:
        book1.MoveBookInSeries( id, step )
        return

    book2 = Book.query.get( neighbour_bid )
    # if book2 is a sub book, swap with its parent instead, as the whole book has to move as one
    if book2.IsChild():
        book2 = Book.query.get( book2.parent[0].id )

    # By here: book1 and book2 are each a parent or a normal book.
    b1_numsb = book1.NumSubBooks()
    b2_numsb = book2.NumSubBooks()
    # book1 travels past all of book2's (sub-)books and vice versa, in
    # opposite directions; MoveBookInSeries moves a book and its sub_books
    book1.MoveBookInSeries( id, step * b2_numsb )
    book2.MoveBookInSeries( id, -step * b1_numsb )
@app.route("/books_for_series/<id>", methods=["GET", "POST"])
@login_required
def books_for_series(id):
    """Render the books of series ``id``, first applying a posted move if any.

    A posted ``move_button`` value has the form "<dir>-<book_id>"; the book is
    moved in every series it has a link row for.
    """
    if request.method == 'POST' and 'move_button' in request.form:
        # split form's pressed move_button to yield: direction ("up" or "down") and book_id
        direction, moved_bid = request.form['move_button'].split('-')
        moved_book = Book.query.get(moved_bid)
        for link in moved_book.bsl:
            CalcMoveForBookInSeries( link.series_id, moved_bid, direction )

    series_books = (
        Book.query
        .join(Book_Series_Link)
        .filter(Book_Series_Link.series_id == id)
        .all()
    )
    series = Series.query.get(id)
    return render_template("books_for_series.html", books=series_books, series=series)
@app.route("/subbooks_for_book/<id>", methods=["GET", "POST"])
@login_required
def subbooks_for_book(id):
    """Render the sub-books of book ``id``, first applying a posted move if any.

    A posted ``move_button`` value has the form "<dir>-<sub_book_id>"; the
    sub-book swaps sub_book_num with its neighbour in that direction. A move
    with no neighbour (first one up, last one down) is ignored.
    """
    from sqlalchemy import text  # local import keeps this block self-contained

    if request.method == 'POST' and 'move_button' in request.form:
        # split form's pressed move_button to yield: direction ("up" or "down") and sub-book id
        direction, moved_bid = request.form['move_button'].split('-')
        moved = Book_Sub_Book_Link.query.filter(
            Book_Sub_Book_Link.book_id == id,
            Book_Sub_Book_Link.sub_book_id == moved_bid,
        ).first()
        if moved is not None:
            step = -1 if direction == "up" else 1
            neighbour = Book_Sub_Book_Link.query.filter(
                Book_Sub_Book_Link.book_id == id,
                Book_Sub_Book_Link.sub_book_num == moved.sub_book_num + step,
            ).first()
            if neighbour is not None:
                # swap the 2 books (by switching sub_book_nums)
                moved.sub_book_num, neighbour.sub_book_num = neighbour.sub_book_num, moved.sub_book_num
                db.session.commit()

    ####################################
    # force sub books for jinja2 to be able to use (ORM is not giving all the details)
    ####################################
    # ``id`` comes straight from the URL, so it is passed as a bound
    # parameter rather than being formatted into the SQL text
    sub_book_sql = text(
        "select bsb.book_id, bsb.sub_book_id, bsb.sub_book_num, book.title, "
        "r.name as rating, book.year_published, book.notes, "
        "author.surname||', '||author.firstnames as author "
        "from book_sub_book_link bsb, book, book_author_link bal, author, rating r "
        "where bsb.book_id = :book_id and book.id = bsb.sub_book_id and book.id = bal.book_id and "
        "bal.author_id = author.id and r.id = book.rating "
        "order by bsb.sub_book_num, bal.author_num"
    )
    with db.engine.connect() as conn:
        rows = conn.execute( sub_book_sql, {"book_id": id} ).fetchall()

    # one entry per sub_book_num, in first-seen order; extra rows for the same
    # sub_book_num are additional authors and get appended to the author string
    by_num = {}
    for row in rows:
        entry = by_num.get(row.sub_book_num)
        if entry is None:
            by_num[row.sub_book_num] = {
                'book_id': row.book_id, 'sub_book_id': row.sub_book_id,
                'sub_book_num': row.sub_book_num, 'title' : row.title, 'rating': row.rating,
                'year_published' : row.year_published, 'notes' : row.notes, 'author' : row.author }
        else:
            entry['author'] += ", " + row.author

    return render_template("subbooks_for_book.html", sub_books=list(by_num.values()) )
################################################################################
# /remove_subbook -> POST -> removes this sub-book from its parent and deletes
# it, then takes you back to /book/<parent_id> (or /book/<sub_book_id> on error)
################################################################################
@app.route("/remove_subbook", methods=["POST"])
@login_required
def remove_sub_book():
    """Unlink a sub-book from its parent, renumber its siblings and delete it.

    Reads ``rem_sub_parent_id`` and ``rem_sub_sub_book_id`` from the form.
    Redirects to the parent's page on success; on a SQLAlchemy error the
    session is rolled back, the status alert/message are set and the browser
    is sent to the sub-book's page.
    """
    parent_book_id=request.form['rem_sub_parent_id']
    sub_book_id=request.form['rem_sub_sub_book_id']
    try:
        sub_book=Book.query.get( sub_book_id )
        # .one() raises if the link does not exist; keep it inside the try so
        # that case is reported like any other failure
        remember=Book_Sub_Book_Link.query.filter(Book_Sub_Book_Link.book_id==parent_book_id, Book_Sub_Book_Link.sub_book_id==sub_book_id ).one()
        orig_sub_book_num=remember.sub_book_num
        # delete book-subbook link for this subbook
        db.session.delete(remember)
        # need to reorder old sub_book_nums to remove the new gap we created
        bsbls=Book_Sub_Book_Link.query.filter(Book_Sub_Book_Link.book_id==parent_book_id, Book_Sub_Book_Link.sub_book_num>=orig_sub_book_num ).all()
        for sb in bsbls:
            sb.sub_book_num=sb.sub_book_num-1
        # now remove subbook itself
        ClearAuthorsForBook(sub_book.id)
        db.session.delete(sub_book)
        db.session.commit()
        return redirect( '/book/{}'.format(parent_book_id) )
    except SQLAlchemyError as e:
        # discard the half-applied changes
        db.session.rollback()
        st.SetAlert( "danger" )
        # only DBAPI-wrapping errors carry .orig; fall back to the error itself
        st.SetMessage( getattr(e, "orig", e) )
        return redirect( '/book/{}'.format(sub_book_id) )
################################################################################
# /book -> GET/POST -> creates a new book (optionally as a sub-book of
# parent_id) and, when created, takes you to the new book's page /book/<id>
################################################################################
def _book_from_form(book_genres, bals):
    """Build an unsaved Book from the submitted form, genres and author links."""
    fields = {
        name: request.form[name]
        for name in ('title', 'owned', 'covertype', 'condition', 'publisher', 'rating', 'notes', 'blurb')
    }
    # year_published is only passed through when it is numeric
    if request.form['year_published'].isnumeric():
        fields['year_published'] = request.form['year_published']
    return Book( genre=book_genres, bals=bals, **fields )


@app.route("/book", methods=["GET", "POST"])
@login_required
def new_book():
    """Create a new book, optionally as a sub-book of an existing one.

    GET renders an empty form. POST with ``add_sub`` renders a form
    pre-filled from the parent book. Any other POST validates and saves the
    book (plus sub-book and series links) and redirects to its page, or
    re-renders the form with an error message.
    """
    from sqlalchemy import text  # local import keeps this block self-contained

    form = BookForm(request.form)
    form.publisher.choices = [(c.id, c.name) for c in Publisher.query.order_by('name')]
    form.owned.choices=[(c.id, c.name) for c in Owned.query.order_by('id')]
    form.covertype.choices=[(c.id, c.name) for c in Covertype.query.order_by('id')]
    form.condition.choices=[(c.id, c.name) for c in Condition.query.order_by('id')]
    form.rating.choices=[(c.id, c.name) for c in Rating.query.order_by('id')]
    page_title='Create new Book'
    author_list = GetAuthors()
    genre_list = GetGenres()

    if request.method != 'POST':
        return render_template("book.html", page_title=page_title, b=None, books=None, book_form=form, author_list=author_list, genre_list=genre_list, alert="success", message="", poss_series_list=ListOfSeriesWithMissingBooks() )

    # handle author info for new book: author_num follows form order, from 1
    author_fields = [el for el in request.form if 'author-' in el]
    bals = [
        Book_Author_Link( author_id=request.form[el], author_num=auth_cnt )
        for auth_cnt, el in enumerate(author_fields, start=1)
    ]
    # handle genre info for new book
    book_genres = [genre for genre in genre_list if "genre-{}".format(genre.id) in request.form]

    # handle creating a new sub-book of an existing book (add_sub_parent_id) - html / with form data for the new book...
    if 'add_sub' in request.form:
        book=Book.query.get(request.form['add_sub_parent_id'])
        bb=QuickParentBook()
        bb.parent=[]
        # the parent is the book just loaded (the old code referenced an unassigned local here)
        bb.parent.append( { 'id': book.id, 'title': book.title } )
        form.publisher.default = book.publisher
        form.owned.default = book.owned
        form.condition.default = book.condition
        form.covertype.default = book.covertype
        form.blurb.default = book.blurb
        form.process()
        return render_template("book.html", page_title='Create new (sub) Book', b=bb, books=None, book_form=form, author_list=author_list, genre_list=genre_list, alert="", message="", poss_series_list=ListOfSeriesWithMissingBooks() )

    if form.validate_on_submit() and len(book_genres):
        book = _book_from_form( book_genres, bals )
        db.session.add(book)
        db.session.commit()
        # book.bals was already set by the constructor; just make sure every
        # link row carries the new book's id
        for tmp_bal in bals:
            tmp_bal.book_id=book.id
        db.session.commit()

        # this is a sub-book we have added (after the data has been entered, now we commit it to DB)
        if 'parent_id' in request.form:
            parent_id = request.form['parent_id']
            # form values are bound as parameters, never formatted into the SQL;
            # engine.begin() commits explicitly instead of relying on autocommit
            with db.engine.begin() as conn:
                conn.execute(
                    text( "insert into book_sub_book_link ( book_id, sub_book_id, sub_book_num ) values ( :parent_id, :sub_book_id, (select COALESCE(MAX(sub_book_num),0)+1 from book_sub_book_link where book_id = :parent_id) )" ),
                    { "parent_id": parent_id, "sub_book_id": book.id } )
            parent=Book.query.get(parent_id)
            if len(parent.series) > 0:
                # we have added a sub-book to something in a series already, so add a bsl for the next book_num
                for s in parent.bsl:
                    with db.engine.begin() as conn:
                        conn.execute(
                            text( "insert into book_series_link ( series_id, book_id, book_num ) values ( :series_id, :book_id, (select COALESCE(MAX(book_num),0)+1 from book_series_link where series_id = :series_id) )" ),
                            { "series_id": s.series_id, "book_id": book.id } )
            db.session.commit()

        st.SetMessage( "Created new Book ({})".format(book.title) )

        # series links entered on the form: fields are named bsl-book_id-<n>,
        # with matching bsl-series_id-<n> / bsl-book_num-<n>
        for field in request.form:
            if 'bsl-book_id-' in field and field != 'bsl-book_id-NUM':
                cnt=int(re.findall( r'\d+', field )[0])
                with db.engine.begin() as conn:
                    conn.execute(
                        text( "insert into book_series_link (book_id, series_id, book_num) values ( :book_id, :series_id, :book_num )" ),
                        { "book_id": book.id,
                          "series_id": request.form['bsl-series_id-{}'.format(cnt)],
                          "book_num": request.form['bsl-book_num-{}'.format(cnt)] } )
        return redirect( '/book/{}'.format(book.id) )

    # validation failed: explain why and re-render the form with what was entered
    alert="danger"
    message="Failed to create Book."
    for field in form.errors:
        message = "{}<br>{}={}".format( message, field, form.errors[field] )
    if len(book_genres) == 0:
        message = "{}<br>book has to have a genre selected".format( message )
    # form values are strings, so compare against the string form of the constant
    if request.form['owned'] != str(ON_WISHLIST) and not request.form['year_published'].isnumeric():
        message = "{}<br>book is not on wish list, so needs a year_published between 1850 & 2100".format( message )
    print( "ERROR: Failed to create book: {}".format(message) )
    book = _book_from_form( book_genres, bals )
    if 'parent_id' in request.form:
        bb=QuickParentBook()
        bb.parent=[]
        bb.parent.append( { 'id': request.form['parent_id'], 'title': request.form['parent_title'] } )
    else:
        bb=None
    return render_template("book.html", page_title=page_title, b=bb, books=book, book_form=form, author_list=author_list, genre_list=genre_list, alert=alert, message=message, poss_series_list=ListOfSeriesWithMissingBooks() )
def ClearAuthorsForBook(id):
    """Delete every book_author_link row for book ``id``, then commit the session.

    The delete runs in its own explicitly committed transaction on a separate
    connection (not through the ORM session), with the id passed as a bound
    parameter.
    """
    from sqlalchemy import text  # local import keeps this block self-contained

    # engine.begin() commits on exit, so the delete does not depend on
    # implicit autocommit behaviour of the connection
    with db.engine.begin() as conn:
        conn.execute( text( "delete from book_author_link where book_id = :book_id" ), {"book_id": id} )
    db.session.commit()
# helper function to reduce code size for /book/<id>/ route - handles deleting book
def DeleteBook(id):
    """Delete book ``id`` and report where the caller should go next.

    Returns:
        ``id`` when nothing was deleted (the book is a parent, or the delete
        failed); the parent's id when a sub-book was deleted; None when a
        stand-alone book was deleted.

    The status alert/message are set to describe the outcome.
    """
    book = Book.query.get(id)
    if book.IsParent():
        st.SetAlert( "danger" )
        st.SetMessage( "This is a parent book, cannot delete it without deleting sub books first" )
        return id

    # optimistic status; overwritten below if the delete fails
    st.SetAlert("success")
    st.SetMessage("Deleted {}".format(book.title) )
    pid = 0
    if book.IsChild():
        pid = book.parent[0].id
    try:
        ClearAuthorsForBook(book.id)
        db.session.delete(book)
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        st.SetAlert( "danger" )
        # only DBAPI-wrapping errors carry .orig; fall back to the error itself
        st.SetMessage( getattr(e, "orig", e) )
        return id
    except Exception as e:
        db.session.rollback()
        st.SetAlert( "danger" )
        # test the error text (a bare string literal here was always true,
        # which made the generic branch unreachable)
        if "Dependency rule tried to blank-out primary key" in str(e):
            st.SetMessage( f"<b>Failed to delete book:</b> The book has a link to an another table (ddp messed up DB integrity) -- { str(e) }" )
        else:
            st.SetMessage( f"<b>Failed to delete book:</b>&nbsp; {str(e)} ")
        return id
    if pid > 0:
        return pid
    return None
# fill the select-field choices of a BookForm from the lookup tables
def _populate_book_form_choices(book_form):
    book_form.publisher.choices = [(c.id, c.name) for c in Publisher.query.order_by('name')]
    book_form.owned.choices = [(c.id, c.name) for c in Owned.query.order_by('id')]
    book_form.covertype.choices = [(c.id, c.name) for c in Covertype.query.order_by('id')]
    book_form.condition.choices = [(c.id, c.name) for c in Condition.query.order_by('id')]
    book_form.rating.choices = [(c.id, c.name) for c in Rating.query.order_by('id')]


# return the first run of digits in a form field name as an int (e.g. 'bsl-book_id-3' -> 3)
def _book_form_field_index(field):
    return int(re.findall( r'\d+', field )[0])


# handle book view / update / delete route
@app.route("/book/<id>", methods=["GET", "POST"])
@login_required
def book(id):
    """View (GET), update (POST) or delete (POST carrying a 'delete' field) a book.

    :param id: book id taken from the URL
    :returns: a redirect after a delete request, the base page with an error
              when the book does not exist, otherwise the rendered book page
    """
    page_title='Edit Book'
    CheckSeriesChange=None

    if request.method == 'POST' and 'delete' in request.form:
        redirect_to=DeleteBook(id)
        if redirect_to is None:
            # DeleteBook() hands back None when there is no book page to return to
            return redirect( '/' )
        # parent of a deleted sub-book, or the book itself if it was not deleted
        return redirect( f"/book/{redirect_to}" )

    book = Book.query.get(id)
    if book is None:
        # applies to GET and POST alike: without a row there is nothing to show or update
        st.SetAlert("danger")
        st.SetMessage("Cannot find Book (id={})".format(id))
        return render_template("base.html", alert=st.GetAlert(), message=st.GetMessage())

    if request.method == 'POST':
        book_form = BookForm(request.form)
    else:
        book_form = BookForm(obj=book)
    # choices have to be in place before validate() is called
    _populate_book_form_choices(book_form)

    if request.method == 'POST':
        # save/update of book
        if book_form.validate():
            book.title = request.form['title']
            book.owned = request.form['owned']
            book.covertype = request.form['covertype']
            book.condition = request.form['condition']
            book.publisher = request.form['publisher']
            if request.form['year_published'].isnumeric():
                book.year_published = request.form['year_published']
            book.rating = request.form['rating']
            book.notes = request.form['notes']
            book.blurb = request.form['blurb']

            # set book genre to exactly the ticked ones - this allows us to remove unticked ones
            book.genre = [ genre for genre in GetGenres() if "genre-{}".format(genre.id) in request.form ]

            # set book author (empty list) - cant use ORM, not sure why
            ClearAuthorsForBook( book.id )
            # then use form data -> author-0, author-1, ... author-n) & append them back to now empty list
            author_fields = [ el for el in request.form if 'author-' in el ]
            for author_num, el in enumerate(author_fields, start=1):
                book.bals.append( Book_Author_Link( author_id=request.form[el], book_id=id, author_num=author_num ) )

            # go through form, if we have removed a series, then copy data out of form to be passed into html for a pop-up
            removing_series = [
                { 'series_id' : request.form['removed-series_id-{}'.format( _book_form_field_index(field) )] }
                for field in request.form if 'removed-book_num' in field
            ]

            if book.IsParent():
                # go through children, and force sync any changes of physical parts of parent book
                tmp_book=book_schema.dump(book)
                for ch_ref in tmp_book['child_ref']:
                    ch_bid=GetBookIdFromBookSubBookLinkByIdAndSubBookNum( book.id, ch_ref['sub_book_num'] )
                    child_book=Book.query.get(ch_bid)
                    child_book.publisher=book.publisher
                    child_book.owned=book.owned
                    child_book.covertype=book.covertype
                    child_book.condition=book.condition
                    child_book.blurb=book.blurb

            # okay, saving a parent book and we ARE removing a series, then
            # need to pop-up for user, to ask what they want to do with sub-books
            # and the series (likely remove them all too, but maybe just the parent?)
            if book.IsParent() and len(removing_series) > 0:
                CheckSeriesChange={'type':'parent', 'pid': book.id, 'bid': book.id, 'removing_series': removing_series }

            # saving a child / sub_book, consider series
            if book.IsChild() and len(removing_series) > 0:
                CheckSeriesChange={'type':'child', 'pid': book.parent[0].id, 'bid': book.id, 'removing_series': removing_series }
            else:
                # either we are a normal book (no parent/child), OR not removing a series, might be adding though, so easiest is to
                # delete all bsls and then add them back based on the request.form
                Book_Series_Link.query.filter(Book_Series_Link.book_id == book.id ).delete()
                for field in request.form:
                    if 'bsl-book_id-' in field and field != 'bsl-book_id-NUM':
                        cnt = _book_form_field_index(field)
                        if book.IsParent():
                            # a parent only "contains" the series: no book_num on its link
                            newbsl=Book_Series_Link( book_id=request.form[f'bsl-book_id-{cnt}'],
                                                     series_id=request.form[f'bsl-series_id-{cnt}'] )
                        else:
                            newbsl=Book_Series_Link( book_id=request.form[f'bsl-book_id-{cnt}'],
                                                     series_id=request.form[f'bsl-series_id-{cnt}'],
                                                     book_num=request.form[f'bsl-book_num-{cnt}'] )
                            # add the contains (null for book_num) bsl for the parent book
                            if book.IsChild():
                                parent_bsl=Book_Series_Link( book_id=book.parent[0].id, series_id=request.form[f'bsl-series_id-{cnt}'] )
                                db.session.add(parent_bsl)
                        db.session.add(newbsl)

            # persist the book before the series ratings are recalculated from it
            db.session.commit()

            # reset rating on this/these series as the book has changed (and maybe the rating has changed)
            for field in request.form:
                if 'bsl-book_id-' in field and field != 'bsl-book_id-NUM':
                    cnt = _book_form_field_index(field)
                    s=Series.query.get(request.form['bsl-series_id-{}'.format(cnt)])
                    s.calcd_rating = CalcAvgRating(s.id)
            db.session.commit()
            st.SetMessage( "Successfully Updated Book (id={})".format(id) )
        else:
            # validation failed: explain why and fall through to re-render the stored book
            st.SetAlert("danger")
            message=f"Failed to update Book (id={id}). "
            if not book_form.year_published.data:
                message += " year_published cannot be empty"
            if int(book_form.condition.data) == CONDITION_NOT_APPLICABLE:
                message += " condition cannot be N/A"
            if int(book_form.covertype.data) == COVERTYPE_NOT_APPLICABLE:
                message += " covertype cannot be N/A"
            st.SetMessage(message)

    author_list = GetAuthors()
    genre_list = GetGenres()
    book_s = book_schema.dump(book)
    return render_template("book.html", b=book, books=book_s, book_form=book_form, author_list=author_list, genre_list=genre_list, page_title=page_title, alert=st.GetAlert(), message=st.GetMessage(), poss_series_list=ListOfSeriesWithMissingBooks(), CheckSeriesChange=CheckSeriesChange)
def GetCount( what, where ):
    """Count the rows of table `book` that match a SQL condition.

    :param what: human readable label for the statistic
    :param where: SQL text placed after "where"; it is concatenated as-is, so
                  it must only ever come from trusted, hard-coded callers
    :returns: dict with 'stat' (the label) and 'value' (the count); empty if
              the query produced no row
    """
    # named `sql` so the module-level status object `st` is not shadowed
    sql = "select count(id) as count from book where " + where
    rtn={}
    with db.engine.connect() as conn:
        # read the rows while the connection is still open
        for row in conn.exec_driver_sql( sql ):
            rtn['stat']=what
            # the column is literally called "count", which collides with the
            # tuple-style Row.count method; look it up by key instead
            rtn['value']=row._mapping['count']
    return rtn
@app.route("/stats", methods=["GET"])
@login_required
def stats():
    """Render the statistics page: one GetCount() result per line, in a fixed order."""
    # ( label shown to the user, SQL condition handed to GetCount )
    counters = (
        ( "Num physical Books in DB", "id not in ( select sub_book_id from book_sub_book_link )" ),
        ( "Num physical Books owned (aka books on shelf)", "owned=(select id from owned where name = 'Currently Owned') and id not in ( select sub_book_id from book_sub_book_link )" ),
        ( "Num physical Books on Wish List", "owned=(select id from owned where name='On Wish List') and id not in ( select sub_book_id from book_sub_book_link )" ),
        ( "Num physical Books sold", "owned=(select id from owned where name='Sold') and id not in ( select sub_book_id from book_sub_book_link )" ),
        ( "Num all Books in DB", "id>0" ),
        ( "Num all Books owned", "owned in (select id from owned where name='Currently Owned')" ),
        ( "Num all Books on wish list", "owned in (select id from owned where name='On Wish List')" ),
        ( "Num all Books sold", "owned=(select id from owned where name='Sold')" ),
        ( "Num all owned Books unrated", "rating in (select id from rating where name in ('N/A', 'Undefined')) and owned = (select id from owned where name='Currently Owned')" ),
        ( "Num all Books unrated", "rating in (select id from rating where name in ('N/A', 'Undefined'))" ),
    )
    rows = [ GetCount( label, condition ) for label, condition in counters ]
    return render_template("stats.html", stats=rows )
@app.route("/rem_books_from_loan/<id>", methods=["POST"])
@login_required
def rem_books_from_loan(id):
    """Remove books from loan `id`.

    Each posted form field name carries the id of one book to remove (the
    first run of digits in the name is used).

    :param id: loan id taken from the URL
    :returns: JSON {"success": true}
    """
    for field in request.form:
        digits = re.search( r'\d+', field )
        if digits is None:
            # field without a book id in its name: nothing to remove
            continue
        rem_id = int(digits.group())
        bll = Book_Loan_Link.query.filter(Book_Loan_Link.loan_id==id, Book_Loan_Link.book_id == rem_id ).one()
        db.session.delete(bll)
    # one commit so that the removals are applied together
    db.session.commit()
    return jsonify(success=True)
@app.route("/add_books_to_loan/<id>", methods=["POST"])
@login_required
def add_books_to_loan(id):
    """Add books to loan `id`.

    Each posted form field name carries the id of one book to add (the first
    run of digits in the name is used).

    :param id: loan id taken from the URL
    :returns: JSON {"success": true}
    """
    for field in request.form:
        digits = re.search( r'\d+', field )
        if digits is None:
            # field without a book id in its name: nothing to add
            continue
        bll = Book_Loan_Link( loan_id=id, book_id=int(digits.group()) )
        db.session.add(bll)
    # one commit so that the additions are applied together
    db.session.commit()
    return jsonify(success=True)
@app.route("/rem_parent_books_from_series/<pid>", methods=["POST"])
@login_required
def rem_parent_books_from_series(pid):
    """Remove parent book `pid` and all of its sub-books from every series.

    A database failure is reported through the shared status object `st`; the
    JSON answer is {"success": true} either way.

    :param pid: id of the parent book, taken from the URL
    """
    # local import: the file only imports sqlalchemy.exc at module level
    from sqlalchemy import text

    try:
        # engine.begin() commits both deletes together when the block exits
        # (and rolls them back on error)
        with db.engine.begin() as conn:
            # pid comes straight from the URL, so it is passed as a bound
            # parameter and never formatted into the SQL text
            conn.execute( text("delete from book_series_link where book_id in ( select sub_book_id from book_sub_book_link where book_id = :pid )"), {"pid": pid} )
            conn.execute( text("delete from book_series_link where book_id = :pid"), {"pid": pid} )
        db.session.commit()
    except SQLAlchemyError as e:
        st.SetAlert("danger")
        # only DBAPI-level errors carry .orig, fall back to the error itself
        st.SetMessage("Failed to delete parent & sub books from ALL series! -- {}".format( getattr(e, "orig", None) or e ))
    return jsonify(success=True)
@app.route("/books_on_shelf", methods=["GET"])
@login_required
def books_on_shelf():
    """List the currently owned books in the order they sit on the shelf.

    Books are ordered by first author and title; as soon as a book that
    belongs to a series is reached, the owned books of its biggest series are
    emitted together, in series order.
    """
    # start with all books owned, ordered by first author then title; this
    # includes sub-books, so RemSubs() is called to take them out again
    books = Book.query.join(Owned).join(Book_Author_Link).join(Author).filter(Owned.name=='Currently Owned').filter(Book_Author_Link.author_num==1).order_by(Author.surname,Author.firstnames,Book.title).all()
    RemSubs(books)

    # because a book can be in multiple series, without any ordering we need to find
    # any book that is in a series, then pick the series with the most books in it,
    # get all those books and put them in ordered_books
    ordered_books=[]
    processed=set()     # ids already emitted through a series
    currently_owned = Owned.query.filter(Owned.name=='Currently Owned').one()
    for b in books:
        if not b.series:
            # book not in a series or a sub-book, so just add this book to the ordered list
            ordered_books.append(b)
            continue

        # find biggest Series; max() always yields one (the first on a tie),
        # even when every num_books is 0 or unset
        biggest = max( b.series, key=lambda s: s.num_books or 0 )

        # order all books (sub-books too here!) in this series with the most books
        bsl=Book_Series_Link.query.filter( Book_Series_Link.series_id==biggest.id ).order_by(Book_Series_Link.book_num).all()
        for tmp in bsl:
            tmp_b = Book.query.get(tmp.book_id)
            # skip dangling links and any books that are not owned sneaking back in
            if tmp_b is None or tmp_b.owned != currently_owned.id:
                continue
            if tmp_b.IsChild():
                # this child book wont be in books, but its parent will -> use it instead
                # mark parent from books so we dont process it twice
                parent = tmp_b.parent[0]
                if parent.id not in processed:
                    processed.add(tmp_b.id)
                    processed.add(parent.id)
                    ordered_books.append(parent)
            elif tmp_b.id not in processed:
                ordered_books.append(tmp_b)
                processed.add(tmp_b.id)
    return render_template("books.html", books=ordered_books, page_title="Books on Shelf", order_by="", show_cols='', hide_cols='' )
@app.route("/unrated_books", methods=["GET"])
@login_required
def unrated_books():
    """List currently owned books whose rating is 'Undefined'."""
    # one join() per target: passing several targets to a single Query.join()
    # is deprecated in SQLAlchemy 1.4 and no longer supported in 2.0
    books = Book.query.join(Rating).join(Owned).filter(Rating.name=='Undefined',Owned.name=='Currently Owned').all()
    return render_template("books.html", books=books, page_title="Books with no rating", show_cols='Rating', hide_cols='' )
def FindMissingBooks():
    """Find the series for which some book numbers are not in the DB.

    A series whose numbered books have all been sold is left out (no more
    books of that series are wanted).

    :returns: list of dicts with keys 'id', 'title', 'num_books' and 'missing'
              (the missing book numbers as a comma separated string)
    """
    with db.engine.connect() as conn:
        # fetch everything while the connection is still open
        series_rows = conn.exec_driver_sql( "select s.*, count(bsl.book_num) from book_series_link bsl, series s where bsl.book_num is not null and s.id = bsl.series_id group by s.id order by s.title").fetchall()

    sold = Owned.query.filter(Owned.name=='Sold').all()
    sold_id = sold[0].id if sold else None

    books=[]
    for t in series_rows:
        # the aggregate column is called "count", which collides with the
        # tuple-style Row.count method; look it up by key instead
        have = t._mapping['count']
        if t.num_books == have:
            continue

        bsl=Book_Series_Link.query.filter( Book_Series_Link.series_id==t.id ).order_by(Book_Series_Link.book_num).all()
        missing = list(range(1, t.num_books+1))
        # check to see if the only books in this series are SOLD, if so, there
        # are no missing books here, I clearly dont want more of this series
        num_sold=0
        for b in bsl:
            if b.book_num is None:
                continue
            # a number can show up twice or lie outside 1..num_books
            if b.book_num in missing:
                missing.remove( b.book_num )
            tmp_book=Book.query.get(b.book_id)
            if tmp_book is not None and sold_id is not None and tmp_book.owned == sold_id:
                num_sold += 1
        if num_sold != have:
            # turn missing from array into string, and strip first and last char (the square brackets)
            books.append( { 'id': t.id, 'title': t.title, 'num_books': t.num_books, 'missing': str(missing)[1:-1] } )
    return books
@app.route("/missing_books", methods=["GET"])
@login_required
def missing_books():
    """Render the page listing the result of FindMissingBooks()."""
    return render_template("missing.html", books=FindMissingBooks() )
@app.route("/wishlist", methods=["GET"])
@login_required
def wishlist():
    """Render the list of books whose owned status is 'On Wish List'."""
    on_wish_list = Owned.name=='On Wish List'
    wanted = Book.query.join(Owned).filter(on_wish_list).all()
    return render_template(
        "books.html",
        books=wanted,
        page_title="Books On Wish List",
        show_cols='',
        hide_cols='Publisher,Condition,Covertype',
    )
@app.route("/books_to_buy", methods=["GET"])
@login_required
def books_to_buy():
    """Render the shopping page: wish-list books plus the FindMissingBooks() result."""
    on_wish_list = Owned.name=='On Wish List'
    wanted = Book.query.join(Owned).filter(on_wish_list).all()
    gaps = FindMissingBooks()
    return render_template("to_buy.html", books=wanted, missing=gaps, page_title="Books To Buy")
@app.route("/needs_replacing", methods=["GET"])
@login_required
def needs_replacing():
    """List currently owned books whose condition is 'Needs Replacing'."""
    # one join() per target: passing several targets to a single Query.join()
    # is deprecated in SQLAlchemy 1.4 and no longer supported in 2.0
    books = Book.query.join(Condition).join(Owned).filter(Condition.name=='Needs Replacing',Owned.name=='Currently Owned').all()
    return render_template("books.html", books=books, page_title="Books that Need Replacing", show_cols='', hide_cols='' )
@app.route("/sold", methods=["GET"])
@login_required
def sold_books():
    """Render the list of books whose owned status is 'Sold'."""
    was_sold = Owned.name=='Sold'
    gone = Book.query.join(Owned).filter(was_sold).all()
    return render_template(
        "books.html",
        books=gone,
        page_title="Books that were Sold",
        show_cols='',
        hide_cols='',
    )
@app.route("/poor_rating", methods=["GET"])
@login_required
def poor_rating_books():
    """List currently owned books with a rating id above 6 (other than 'Undefined')."""
    # one join() per target: passing several targets to a single Query.join()
    # is deprecated in SQLAlchemy 1.4 and no longer supported in 2.0
    # NOTE(review): the page title says "<5 out of 10" while the filter is on
    # Rating.id>6 - presumably rating ids run from best to worst; confirm
    books = Book.query.join(Rating).join(Owned).filter(Rating.id>6,Rating.name!='Undefined',Owned.name=='Currently Owned').all()
    return render_template("books.html", books=books, page_title="Books that have a Poor Rating (<5 out of 10)", show_cols='Rating', hide_cols='' )
@app.route("/fix_an")
@login_required
def fix_an():
    """Renumber author_num on every book's author links as 1, 2, 3, ... in bals order."""
    print("A")
    every_book = Book.query.all()
    print("B")
    for entry in every_book:
        print( f"process {entry.title}")
        # positions are 1-based, following the order of the bals collection
        for position, link in enumerate(entry.bals, start=1):
            print( f"process bal {position} for {entry.title}")
            link.author_num = position
            db.session.add(link)
    db.session.commit()
    return render_template("base.html", alert="success", message="Fixed author numbering" )
# default page, just the navbar
@app.route("/", methods=["GET"])
@login_required
def main_page():
    """Render the base page with the current status alert/message."""
    signed_in = current_user and not current_user.is_anonymous
    if signed_in:
        return render_template("base.html", alert=st.GetAlert(), message=st.GetMessage())
    # Users who are not logged in are sent to the login page.
    return redirect(url_for('login'))
###############################################################################
# This func creates a new filter in jinja2 to hand back the username
# from the ldap dn
###############################################################################
@app.template_filter('Username')
def _jinja2_filter_parentpath(dn):
    """Return the value of the first component of an LDAP dn.

    'uid=xxx,dn=yyy,etc' -> 'xxx'

    :param dn: the dn; anything that str() can turn into text
    :returns: the text between the first '=' and the next ','; everything
              after the '=' if there is no ','; the whole text if there is
              no '=' at all
    """
    username=str(dn)
    _, has_equals, rest = username.partition('=')
    if not has_equals:
        # not a "key=value,..." dn, hand it back unchanged rather than raise
        return username
    return rest.partition(',')[0]


# Keep this guard as the last statement of the file: app.run() blocks, so
# anything defined below it (such as the filter above, originally) is not
# registered while the server is running.
if __name__ == "__main__":
    if os.environ['FLASK_ENV'] == "production":
        app.run(ssl_context=('/etc/letsencrypt/live/book.depaoli.id.au/cert.pem', '/etc/letsencrypt/live/book.depaoli.id.au/privkey.pem'), host="0.0.0.0", debug=False)
    else:
        # NOTE(review): debug=True on 0.0.0.0 exposes the interactive debugger
        # to the network - only acceptable on a trusted development network
        app.run(host="0.0.0.0", debug=True)