318 lines
12 KiB
Python
318 lines
12 KiB
Python
## cam test cases
|
|
# - non-english?
|
|
# - different formats?
|
|
# - no ISBN?
|
|
# - nothing found?
|
|
# -- remove hardcoded password
|
|
|
|
|
|
|
|
|
|
import os
import smtplib
from email import encoders, utils
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from urllib.parse import quote_plus, unquote, urlsplit

import requests
from flask import render_template, request
from flask_login import login_required
from flask_wtf import FlaskForm
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from wtforms import StringField, SubmitField, validators

from main import app
|
|
|
|
class Scraper:
    """Search libgen's fiction catalogue with headless Chrome and download one ebook.

    scrape() and download() both return a ``(code, path)`` tuple:

    - ``0`` = ok; ``path`` is the downloaded file
    - ``1`` = no books found; ``path`` is None
    - ``2`` = something found, but not going to download (usually a broken
      mirror, PDF, or non-English). manually intervene; ``path`` is None
    """

    RESULT_OK = 0
    RESULT_NOT_FOUND = 1
    RESULT_MANUAL = 2

    # seconds; passed to requests as the timeout for the download request
    downloadTimeout = 60

    # preference order used by compareEntries(); anything not listed ranks lowest
    _FORMAT_RANK = {"EPUB": 2, "MOBI": 1}

    # divisor that converts a size in the given unit to Mb (Kb / 1000, as before)
    _SIZE_DIVISORS = {"b": 1000.0 * 1000.0, "kb": 1000.0, "mb": 1.0, "gb": 0.001}

    def __init__(self):
        """Start a headless Chrome driver managed by ChromeDriverManager."""
        self.waitTimeout = 10  # default timeout for explicit waits
        # default download directory, compose maps to /export/docker/storage/books
        self.defaultDownloadDir = "/books"

        # Set Chrome options for headless mode
        self.options = Options()
        self.options.add_argument('--headless')
        self.options.add_argument('--disable-gpu')
        self.options.add_argument('--no-sandbox')
        self.options.add_argument('--disable-dev-shm-usage')

        # Use ChromeDriverManager to manage `chromedriver` installation
        self.service = Service(ChromeDriverManager().install())
        self.driver = webdriver.Chrome(options=self.options, service=self.service)

    def scrape(self, keywords):
        """Search for *keywords*, pick the best catalogue row and download it.

        Returns a ``(code, path)`` tuple (see the class docstring for codes).
        Every exit path returns a tuple so callers can always unpack it.
        """
        # quote_plus turns spaces into "+" and escapes characters such as
        # "&" or "#" that would otherwise corrupt the query string
        terms = quote_plus(keywords)
        self.driver.get(f"https://libgen.is/fiction/?q={terms}&criteria=&language=&format=")

        # check if no results at all: if the "No files were found." text shows
        # up we are done; a timeout on this short wait means results exist
        try:
            WebDriverWait(self.driver, 1.5).until(
                EC.visibility_of_element_located(
                    (By.XPATH, '//html/body/p[text()="No files were found."]')
                )
            )
        except TimeoutException:
            pass
        else:
            return (self.RESULT_NOT_FOUND, None)

        try:
            catalogTable = WebDriverWait(self.driver, self.waitTimeout).until(
                EC.visibility_of_element_located((By.XPATH, '//html/body/table/tbody'))
            )
        except TimeoutException:
            # neither the "not found" text nor a results table appeared
            return (self.RESULT_MANUAL, None)

        rows = catalogTable.find_elements(By.XPATH, './/tr')
        best = {"index": -1, "ISBN": False, "format": "?", "size": -1.00}
        for index, row in enumerate(rows):
            entry = self._readRow(index, row)
            if entry is not None:
                best = self.compareEntries(entry, best)

        # index -1 means no row qualified; indexing rows[-1] would silently
        # pick the last row of the table instead
        if best["index"] == -1:
            return (self.RESULT_MANUAL, None)

        # now that we have a selected entry, download it.
        chosen = rows[best["index"]]
        try:
            mirror = chosen.find_element(
                By.XPATH, './/ul[@class="record_mirrors_compact"]/li[1]/a'
            ).get_attribute('href')
        except NoSuchElementException:
            return (self.RESULT_MANUAL, None)

        # TODO: fall back to the second mirror (li[2]) when the first fails
        self.driver.get(mirror)
        try:
            downloadLink = WebDriverWait(self.driver, self.waitTimeout).until(
                EC.element_to_be_clickable((By.XPATH, '//a[contains(text(), "GET")]'))
            ).get_attribute('href')
        except TimeoutException:
            return (self.RESULT_MANUAL, None)

        res, path = self.download(downloadLink)
        if res == self.RESULT_OK:
            return (self.RESULT_OK, path)
        return (self.RESULT_MANUAL, None)

    def _readRow(self, index, row):
        """Build an entry dict for one catalogue row, or None if the row is unusable.

        A row is usable when it has an "English" cell and a file cell whose
        format is EPUB or MOBI.
        entry = {"index": int, "ISBN": bool, "format": str(EPUB/MOBI), "size": float}
        """
        try:
            # ensure the book is in English, and look for epub or mobi
            row.find_element(By.XPATH, './/td[text()="English"]')
            fileTdStr = row.find_element(
                By.XPATH, './/td[contains(text(), "EPUB") or contains(text(), "MOBI")]'
            ).text
        except NoSuchElementException:
            return None

        fileFormat, size = self._parseFileInfo(fileTdStr)
        if fileFormat not in self._FORMAT_RANK:
            return None

        # find_elements returns an empty list instead of raising when absent
        hasIsbn = bool(row.find_elements(By.XPATH, './/p[@class="catalog_identifier"]'))
        return {"index": index, "ISBN": hasIsbn, "format": fileFormat, "size": size}

    @staticmethod
    def _parseFileInfo(text):
        """Split a file cell such as "EPUB / 1.2 Mb" into (format, size in Mb).

        The size is always a float so entries stay comparable; it is 0.0 when
        the size part is missing, malformed, or uses an unknown unit.
        """
        formatText, _, sizeText = text.partition("/")
        fileFormat = formatText.strip()

        parts = sizeText.split()
        if len(parts) != 2:
            return (fileFormat, 0.0)
        divisor = Scraper._SIZE_DIVISORS.get(parts[1].lower())
        if divisor is None:
            return (fileFormat, 0.0)
        try:
            return (fileFormat, float(parts[0]) / divisor)
        except ValueError:
            return (fileFormat, 0.0)

    def compareEntries(self, entry1, entry2):
        """Return whichever of two entries cam's algorithm prefers.

        - entry = {"index": int, "ISBN": bool, "format": str(EPUB/MOBI), "size": float}

        cam's algorithm, in priority order:
        - an entry with index -1 is the temp placeholder; return the other one
        - a listed ISBN wins, even over a larger file without one
        - then EPUB over MOBI over anything else
        - then the biggest filesize (usually has extras and/or images/maps/etc)
        - on a complete tie, entry1 is returned

        Always returns one of the two entries, never None.
        """
        if entry1["index"] == -1:
            return entry2
        if entry2["index"] == -1:
            return entry1

        def rank(entry):
            return (
                bool(entry["ISBN"]),
                self._FORMAT_RANK.get(entry["format"], 0),
                entry["size"],
            )

        return entry2 if rank(entry2) > rank(entry1) else entry1

    @staticmethod
    def _filenameFor(url, headers):
        """Choose a safe local filename for a download.

        Prefers the ``filename=`` parameter of the content-disposition header
        and falls back to the last path segment of *url* (query string
        excluded). Directory components are stripped so a server-supplied name
        cannot escape the download directory.
        """
        name = ""
        disposition = headers.get("content-disposition", "")
        _, found, raw = disposition.partition("filename=")
        if found:
            raw = raw.strip()
            if raw.startswith('"'):
                # quoted value: everything up to the closing quote
                name = raw[1:].partition('"')[0]
            else:
                # bare token: ends at the next parameter separator
                name = raw.partition(";")[0].strip()
        if not name:
            name = unquote(urlsplit(url).path.rstrip("/").split("/")[-1])

        name = os.path.basename(name.replace("\\", "/"))
        if name in ("", ".", ".."):
            name = "download"
        return name

    def download(self, url):
        """Download *url* into ``<defaultDownloadDir>/tmp``.

        Utility function for scrape(). Creates the target directory if needed
        and takes the filename from the response headers when present.

        Returns ``(0, file_path)`` on success, and ``(2, None)`` on a non-200
        status or a network/file error.
        """
        target_dir = self.defaultDownloadDir + "/tmp"  # TODO: tmp for now, later make this an author.
        os.makedirs(target_dir, exist_ok=True)

        try:
            # the context manager releases the streamed connection on every path
            with requests.get(url, stream=True, timeout=self.downloadTimeout) as response:
                if response.status_code != 200:
                    return (self.RESULT_MANUAL, None)

                filename = self._filenameFor(url, response.headers)
                if 'content-disposition' in response.headers:
                    print('content-disposition:', filename)

                file_path = target_dir + "/" + filename

                # write the file
                print(file_path)
                with open(file_path, 'wb') as f:
                    print('writing file...')
                    for chunk in response.iter_content(8192):
                        f.write(chunk)
        except (requests.RequestException, OSError) as e:
            print(e)
            return (self.RESULT_MANUAL, None)

        return (self.RESULT_OK, file_path)
|
|
|
|
class Mailer:
    """Send an ebook as an email attachment through the SMTP server (STARTTLS)."""

    # seconds; passed to smtplib.SMTP as the socket timeout
    smtpTimeout = 30

    # MIME (maintype, subtype) per lower-cased file extension
    _ATTACHMENT_TYPES = {
        ".epub": ("application", "epub+zip"),
        ".mobi": ("application", "x-mobipocket-ebook"),
    }

    def __init__(self):
        """Set the SMTP connection details.

        The password is read from the EBOOK_MAILER_PASSWORD environment
        variable when it is set.
        """
        # our server uses starttls
        self.smtp_server = "depaoli.id.au"
        self.port = 587
        self.email = "cam@depaoli.id.au"
        # TODO: drop the literal fallback once EBOOK_MAILER_PASSWORD is set
        # everywhere this runs; it is kept only so existing setups keep working.
        self.password = os.environ.get("EBOOK_MAILER_PASSWORD", "echo $1 | grep 1")

    @staticmethod
    def _attachmentType(attachment_path):
        """Return the MIME (maintype, subtype) for *attachment_path* by extension.

        Unknown extensions get the generic ``application/octet-stream``.
        """
        extension = os.path.splitext(attachment_path)[1].lower()
        return Mailer._ATTACHMENT_TYPES.get(extension, ("application", "octet-stream"))

    def sendMail(self, to, subject, body, attachment_path):
        """Send an email with one file attached.

        - return 0 = ok
        - return 1 = error; the attachment could not be read, or sending failed.
        """
        print(f"Sending email to {to} with subject {subject} and attachment {attachment_path}.")

        # read the attachment first so an unreadable file is reported as
        # code 1 rather than escaping as an exception
        try:
            with open(attachment_path, 'rb') as f:
                payload = f.read()
        except OSError as e:
            print(e)
            return 1

        msg = MIMEMultipart()
        msg['From'] = self.email
        msg['To'] = to
        msg["Date"] = utils.formatdate(localtime=True)
        msg['Subject'] = subject
        msg['Message-Id'] = utils.make_msgid(domain='depaoli.id.au')
        msg.attach(MIMEText(body, 'plain'))

        # attach the file, base64-encoded
        maintype, subtype = self._attachmentType(attachment_path)
        attachment = MIMEBase(maintype, subtype)
        attachment.set_payload(payload)
        encoders.encode_base64(attachment)

        # let the email package quote/encode the filename parameter instead of
        # splicing it into a hand-built header value
        attachment.add_header(
            'Content-Disposition',
            'attachment',
            filename=os.path.basename(attachment_path),
        )
        msg.attach(attachment)

        # send the email; the context manager closes the connection even when
        # starttls/login/sendmail raises
        try:
            with smtplib.SMTP(self.smtp_server, self.port, timeout=self.smtpTimeout) as server:
                server.starttls()
                server.login(self.email, self.password)
                server.sendmail(self.email, to, msg.as_string())
            return 0
        except Exception as e:
            # boundary of the 0/1 contract: report any send failure as code 1
            print(e)
            return 1
|
|
|
|
class eBookForm(FlaskForm):
    """Form with a required keywords text field and a submit button."""

    keywords = StringField(
        'Keywords:',
        validators=[validators.DataRequired()],
    )
    submit = SubmitField('Submit')
|
|
|
|
###
|
|
# get_ebook(str keywords)
|
|
#
|
|
## GET = Frontend page for user to input keywords, see status, and path.
|
|
## POST = Wrapper function for Jinja to use.
|
|
#
|
|
###
|
|
@app.route('/get_ebook', methods=['GET', 'POST'])
@login_required
def get_ebook(keywords=None):
    """Render the eBook request page and, on a valid POST, fetch and mail the book.

    GET (or a POST that fails validation) renders the form. A valid POST
    scrapes for the submitted keywords, mails the downloaded file, and
    renders the same template with ``page_title`` describing the outcome.
    Every path returns a rendered template.
    """
    form = eBookForm(request.form)
    page_title = "Get an eBook"

    if request.method == 'POST' and form.validate():
        keywords = request.form['keywords']

        scraper = Scraper()
        try:
            result = scraper.scrape(keywords)
        finally:
            # always release the browser, even when scraping raises
            scraper.driver.quit()

        # accept both a (code, path) tuple and a bare failure code
        res_scrape, path = result if isinstance(result, tuple) else (result, None)

        if res_scrape == 1:
            page_title = "No books found"
        elif res_scrape != 0:
            page_title = "Found something, but could not download it"
        else:
            mailer = Mailer()
            results = [
                mailer.sendMail("dshop+amazon_bgh507@kindle.com", "eBook send - cdp test", "sending book", path),
                mailer.sendMail("cam@depaoli.id.au", "eBook send - cdp", "sending book", path),  # test
            ]
            # both sends are attempted; either one failing counts as a failure
            if all(res_mail == 0 for res_mail in results):
                page_title = "Success!"
            else:
                page_title = "Failed to send the eBook"

    return render_template('get_ebook.html', form=form, page_title=page_title)
|
|
|
|
|
|
#if __name__ == "__main__":
|
|
# scraper = Scraper()
|
|
# res, path = scraper.scrape("the outcast taran matharu")
|
|
# print("FINISHED, RESULT = ", res)
|
|
# scraper.driver.quit()
|
|
#
|
|
# #path = "C:/Users/cam/Desktop/code/tmp/(Summoner 4) Matharu, Taran - The Outcast.epub"
|
|
# #print(path)
|
|
# #mailer = Mailer()
|
|
# #res = mailer.sendMail("dshop+amazon_bgh507@kindle.com", "eBook send - cdp test", "sending book", path)
|
|
# #res = mailer.sendMail("cam@depaoli.id.au", "eBook send - cdp test", "sending book", path)
|