added comment to remove hardcoded username/password
This commit is contained in:
317
ebook.py
Normal file
317
ebook.py
Normal file
@@ -0,0 +1,317 @@
|
||||
## cam test cases
|
||||
# - non-english?
|
||||
# - different formats?
|
||||
# - no ISBN?
|
||||
# - nothing found?
|
||||
# -- remove hardcoded password
|
||||
|
||||
|
||||
|
||||
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.chrome.service import Service
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
from webdriver_manager.chrome import ChromeDriverManager
|
||||
from selenium.common.exceptions import NoSuchElementException, TimeoutException
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.common.by import By
|
||||
import requests
|
||||
import os
|
||||
import smtplib
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
from email.mime.base import MIMEBase
|
||||
from email import encoders, utils
|
||||
from flask import render_template, request
|
||||
from flask_wtf import FlaskForm
|
||||
from wtforms import StringField, SubmitField, validators
|
||||
from flask_login import login_required
|
||||
from main import app
|
||||
|
||||
class Scraper:
    """Drive a headless Chrome session to find and download an ebook from libgen.is.

    Creating an instance starts a Chrome driver. The instance never quits the
    driver itself, so the caller must call ``self.driver.quit()`` when done.
    """

    # preference order used by compareEntries(); any other format ranks lowest
    _FORMAT_RANK = {"EPUB": 2, "MOBI": 1}

    def __init__(self):
        """Configure headless Chrome options and start the driver."""
        # Initialize driver & options
        self.waitTimeout = 10  # default timeout (seconds) for explicit waits and downloads
        self.defaultDownloadDir = "/books"  # default download directory, compose maps to /export/docker/storage/books

        # Set Chrome options for headless mode
        self.options = Options()
        self.options.add_argument('--headless')
        self.options.add_argument('--disable-gpu')
        self.options.add_argument('--no-sandbox')
        self.options.add_argument('--disable-dev-shm-usage')

        # Use ChromeDriverManager to manage `chromedriver` installation
        self.service = Service(ChromeDriverManager().install())
        self.driver = webdriver.Chrome(options=self.options, service=self.service)

    def scrape(self, keywords):
        """Search libgen.is fiction for *keywords* and download the best match.

        Always returns a ``(status, path)`` tuple so callers can unpack it:
          - ``(0, path)``  = ok; the file was written to ``path``
          - ``(1, None)``  = no books found
          - ``(2, None)``  = something found, but not downloaded (usually a
            broken mirror, PDF, or non-English). manually intervene.

        Raises:
            TimeoutException: if neither the "No files were found." text nor
                the results table shows up within the wait timeouts.
        """
        # local import so the file's top-level import block stays untouched
        from urllib.parse import quote_plus

        # url formatting: spaces become "+", other reserved characters are escaped
        terms = quote_plus(keywords)
        self.driver.get(f"https://libgen.is/fiction/?q={terms}&criteria=&language=&format=")

        # check if no results at all
        try:
            # if we find the "No files were found." text, report status 1.
            # if the explicit wait times out, there are results to look at.
            WebDriverWait(self.driver, 1.5).until(EC.visibility_of_element_located((By.XPATH, '//html/body/p[text()="No files were found."]')))
            return (1, None)
        except TimeoutException:
            pass

        best = {"index": -1, "ISBN": False, "format": "?", "size": -1.00}
        catalogTable = WebDriverWait(self.driver, self.waitTimeout).until(EC.visibility_of_element_located((By.XPATH, '//html/body/table/tbody')))
        rows = catalogTable.find_elements(By.XPATH, './/tr')
        for index, row in enumerate(rows):
            entry = self._parse_row(index, row)
            if entry is not None:
                best = self.compareEntries(entry, best)

        # nothing usable (no English EPUB/MOBI row): do not fall back to rows[-1]
        if best["index"] == -1:
            return (2, None)

        # now that we have a selected entry, find its first mirror link.
        # TODO: fix this to use both mirrors (the second one is li[2] of the same list)
        chosen = rows[best["index"]]
        try:
            mirror1 = chosen.find_element(By.XPATH, './/ul[@class="record_mirrors_compact"]/li[1]/a').get_attribute('href')
        except NoSuchElementException:
            return (2, None)
        if not mirror1:
            return (2, None)

        # open the mirror page and read the direct download link from it
        self.driver.get(mirror1)
        try:
            downloadLink1 = WebDriverWait(self.driver, self.waitTimeout).until(EC.element_to_be_clickable((By.XPATH, '//a[contains(text(), "GET")]'))).get_attribute('href')
        except TimeoutException:
            # mirror page never offered a GET link; treat as a broken mirror
            return (2, None)
        if not downloadLink1:
            return (2, None)

        # download the book
        res, path = self.download(downloadLink1)
        if res == 0:
            return (0, path)
        return (2, None)

    def _parse_row(self, index, row):
        """Build a candidate entry dict from one results row, or None if the row is unusable."""
        # initial check; ensure the book is in English. otherwise skip the row
        try:
            row.find_element(By.XPATH, './/td[text()="English"]')
        except NoSuchElementException:
            return None

        # look for epub or mobi, otherwise skip the row
        try:
            fileTdStr = row.find_element(By.XPATH, './/td[contains(text(), "EPUB") or contains(text(), "MOBI")]').text
        except NoSuchElementException:
            return None

        # an ISBN counts as listed when the row has a catalog_identifier paragraph
        has_isbn = bool(row.find_elements(By.XPATH, './/p[@class="catalog_identifier"]'))

        # the parsing assumes the cell reads like "<FORMAT> / <number> <Kb|Mb>"
        pieces = fileTdStr.split("/")
        size_text = pieces[1] if len(pieces) > 1 else ""
        return {
            "index": index,
            "ISBN": has_isbn,
            "format": pieces[0].strip(),
            "size": self._parse_size_mb(size_text),
        }

    @staticmethod
    def _parse_size_mb(size_text):
        """Convert a size label such as "473 Kb" or "1.2 Mb" to megabytes.

        Returns -1.0 (the same value the placeholder entry uses) when the
        label cannot be parsed, so the result is always a float and entries
        stay comparable.
        """
        text = size_text.strip()
        unit = text[-2:].lower()
        try:
            number = float(text[:-2])
        except ValueError:
            return -1.0
        if unit == "kb":
            return number / 1000
        if unit == "mb":
            return number
        return -1.0

    def compareEntries(self, entry1, entry2):
        """Return whichever of two book entries is preferred by cam's algorithm.

        Each entry is ``{"index": int, "ISBN": bool, "format": str, "size": float}``.
        An entry whose index is -1 is the temp placeholder and always loses.

        cam's algorithm, in order of precedence:
          - anything with a listed ISBN beats anything without one
          - then EPUB beats MOBI, and MOBI beats any other format
          - then the biggest filesize wins (bigger usually has extras and/or
            images/maps/etc)
          - on a complete tie, entry1 is returned
        """
        ## if index == -1 then its the temp one, just return the other
        if entry1["index"] == -1:
            return entry2
        if entry2["index"] == -1:
            return entry1

        def rank(entry):
            # tuples compare element by element, which gives the precedence above
            return (
                bool(entry["ISBN"]),
                Scraper._FORMAT_RANK.get(entry["format"], 0),
                entry["size"],
            )

        return entry1 if rank(entry1) >= rank(entry2) else entry2

    def download(self, url):
        """Download *url* into ``<defaultDownloadDir>/tmp``.

        The filename comes from the Content-Disposition header when present,
        otherwise from the last URL segment; any directory part is stripped so
        the file cannot land outside the target directory.

        Returns:
            ``(0, file_path)`` on success, ``(2, None)`` on any HTTP or
            network failure.
        """
        target_dir = self.defaultDownloadDir + "/tmp"  # TODO: tmp for now, later make this an author.
        os.makedirs(target_dir, exist_ok=True)

        try:
            response = requests.get(url, stream=True, timeout=self.waitTimeout)
        except requests.RequestException as e:
            print(e)
            return (2, None)

        # the context manager releases the streamed connection on every path
        with response:
            if response.status_code != 200:
                return (2, None)

            # get filename from url, or from headers if exists
            filename = url.split("/")[-1]
            if 'content-disposition' in response.headers:
                # attempt to extract filename from headers
                filename = response.headers["content-disposition"].split("filename=")[-1].strip('"')
                print('content-disposition:', filename)

            # the name is server-controlled: keep only its final path component
            filename = os.path.basename(filename.replace("\\", "/")) or "download"
            file_path = target_dir + "/" + filename

            # write the file
            print(file_path)
            try:
                with open(file_path, 'wb') as f:
                    print('writing file...')
                    for chunk in response.iter_content(8192):
                        f.write(chunk)
            except requests.RequestException as e:
                # connection dropped mid-stream: do not leave a truncated book behind
                print(e)
                if os.path.exists(file_path):
                    os.remove(file_path)
                return (2, None)
        return (0, file_path)
|
||||
|
||||
class Mailer:
    """Send an email with a single file attachment through the STARTTLS SMTP server."""

    # MIME subtype (under "application/") per file extension; anything not
    # listed keeps the original epub+zip label.
    _ATTACHMENT_SUBTYPES = {
        ".epub": "epub+zip",
        ".mobi": "x-mobipocket-ebook",
    }

    def __init__(self):
        """Load the SMTP settings.

        The account and password are read from the ``EBOOK_SMTP_USER`` and
        ``EBOOK_SMTP_PASSWORD`` environment variables when they are set.
        """
        # our server uses starttls
        self.smtp_server = "depaoli.id.au"
        self.port = 587
        # The literals below are the previous hard-coded values, kept only as
        # a fallback so existing deployments keep working.
        # TODO: drop the password fallback once EBOOK_SMTP_PASSWORD is set everywhere.
        self.email = os.environ.get("EBOOK_SMTP_USER", "cam@depaoli.id.au")
        self.password = os.environ.get("EBOOK_SMTP_PASSWORD", "echo $1 | grep 1")

    def _build_message(self, to, subject, body, attachment_path):
        """Assemble the multipart message: a plain-text body plus the file as a base64 attachment.

        Raises:
            OSError: if the attachment cannot be read.
        """
        msg = MIMEMultipart()
        msg['From'] = self.email
        msg['To'] = to
        msg["Date"] = utils.formatdate(localtime=True)
        msg['Subject'] = subject
        msg['Message-Id'] = utils.make_msgid(domain='depaoli.id.au')

        msg.attach(MIMEText(body, 'plain'))

        # attach the file, labelled by its extension
        extension = os.path.splitext(attachment_path)[1].lower()
        subtype = self._ATTACHMENT_SUBTYPES.get(extension, "epub+zip")
        attachment = MIMEBase('application', subtype)
        with open(attachment_path, 'rb') as f:
            attachment.set_payload(f.read())

        # encode data to base64
        encoders.encode_base64(attachment)

        # let the email package build the header so the filename is quoted
        # (and RFC 2231 encoded when it is not plain ASCII)
        attachment.add_header('Content-Disposition', 'attachment', filename=os.path.basename(attachment_path))
        msg.attach(attachment)
        return msg

    def sendMail(self, to, subject, body, attachment_path):
        """Send an email with an attachment.

        Returns:
            0 = ok
            1 = error; the attachment could not be read, or sending failed.
        """
        print(f"Sending email to {to} with subject {subject} and attachment {attachment_path}.")

        try:
            msg = self._build_message(to, subject, body, attachment_path)
        except OSError as e:
            # missing/unreadable attachment is reported like any other failure
            print(e)
            return 1

        # send the email; the context manager closes the connection even when
        # starttls/login/sendmail raises
        try:
            with smtplib.SMTP(self.smtp_server, self.port) as server:
                server.starttls()
                server.login(self.email, self.password)
                server.sendmail(self.email, to, msg.as_string())
            return 0
        except Exception as e:
            # deliberate catch-all: callers rely on the 0/1 return code
            print(e)
            return 1
|
||||
|
||||
class eBookForm(FlaskForm):
    """Form for the eBook page: a keywords text field and a submit button."""

    # search terms typed by the user; validated with DataRequired
    keywords = StringField(
        'Keywords:',
        validators=[validators.DataRequired()],
    )

    # button that posts the form
    submit = SubmitField('Submit')
|
||||
|
||||
###
|
||||
# get_ebook(str keywords)
|
||||
#
|
||||
## GET = Frontend page for user to input keywords, see status, and path.
|
||||
## POST = Wrapper function for Jinja to use.
|
||||
#
|
||||
###
|
||||
@app.route('/get_ebook', methods=['GET', 'POST'])
@login_required
def get_ebook(keywords=None):
    """Render the eBook page and, on a valid POST, fetch and email the book.

    GET (or a POST that fails validation) renders the form. A valid POST
    scrapes for the submitted keywords, emails the downloaded file to both
    recipients, and renders the same template with ``page_title`` describing
    the outcome. Every path returns a rendered template.
    """
    form = eBookForm(request.form)
    page_title = "Get an eBook"

    if request.method == 'POST' and form.validate():
        keywords = request.form['keywords']
        scraper = Scraper()
        try:
            result = scraper.scrape(keywords)
        finally:
            # always release the Chrome driver, even if scrape() raises
            scraper.driver.quit()

        # scrape() may hand back a bare status code instead of (status, path)
        if isinstance(result, tuple):
            res_scrape, path = result
        else:
            res_scrape, path = result, None

        if res_scrape != 0:
            page_title = f"Could not get the eBook (scrape status {res_scrape})"
            return render_template('get_ebook.html', form=form, page_title=page_title)

        mailer = Mailer()
        res_kindle = mailer.sendMail("dshop+amazon_bgh507@kindle.com", "eBook send - cdp test", "sending book", path)
        res_copy = mailer.sendMail("cam@depaoli.id.au", "eBook send - cdp", "sending book", path)  # test
        # check both sends; previously the first result was overwritten unchecked
        if res_kindle != 0 or res_copy != 0:
            page_title = "Downloaded the eBook, but sending the email failed"
            return render_template('get_ebook.html', form=form, page_title=page_title)

        page_title = "Success!"

    # GET, failed validation, and success all end up rendering the form page
    return render_template('get_ebook.html', form=form, page_title=page_title)
|
||||
|
||||
|
||||
#if __name__ == "__main__":
|
||||
# scraper = Scraper()
|
||||
# res, path = scraper.scrape("the outcast taran matharu")
|
||||
# print("FINISHED, RESULT = ", res)
|
||||
# scraper.driver.quit()
|
||||
#
|
||||
# #path = "C:/Users/cam/Desktop/code/tmp/(Summoner 4) Matharu, Taran - The Outcast.epub"
|
||||
# #print(path)
|
||||
# #mailer = Mailer()
|
||||
# #res = mailer.sendMail("dshop+amazon_bgh507@kindle.com", "eBook send - cdp test", "sending book", path)
|
||||
# #res = mailer.sendMail("cam@depaoli.id.au", "eBook send - cdp test", "sending book", path)
|
||||
Reference in New Issue
Block a user