# Origin: git patch 4d89e062033304dd5a2b26c4e98d73cacb7b275c
# Author: Damien De Paoli, Fri, 15 Nov 2024 23:09:18 +1100
# Subject: [PATCH] added comment to remove hardcoded username/password
# The patch adds this module as a new file, ebook.py.

## cam test cases
#  - non-english?
#  - different formats?
#  - no ISBN?
#  - nothing found?
#  -- remove hardcoded password

import os
import smtplib
from email import encoders, utils
from email.message import Message
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from urllib.parse import quote_plus, unquote, urlsplit

import requests
from flask import render_template, request
from flask_login import login_required
from flask_wtf import FlaskForm
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from wtforms import StringField, SubmitField, validators

from main import app

# Seconds to wait for the download server to connect / send data.
DOWNLOAD_TIMEOUT = 60

# (recipient, subject) pairs the downloaded book is mailed to.
# The second entry was marked as a test recipient in the original code.
EBOOK_RECIPIENTS = (
    ("dshop+amazon_bgh507@kindle.com", "eBook send - cdp test"),
    ("cam@depaoli.id.au", "eBook send - cdp"),
)

# Page titles shown for the non-zero result codes of Scraper.scrape().
SCRAPE_FAILURE_TITLES = {
    1: "No books found",
    2: "Book found but not downloaded - manual intervention needed",
}


class Scraper:
    def __init__(self):
        # Initialize driver & options
        self.waitTimeout = 10  # default timeout (seconds) for explicit waits
        # default download directory, compose maps to /export/docker/storage/books
        self.defaultDownloadDir = "/books"

        # Set Chrome options for headless mode
        self.options = Options()
        self.options.add_argument('--headless')
        self.options.add_argument('--disable-gpu')
        self.options.add_argument('--no-sandbox')
        self.options.add_argument('--disable-dev-shm-usage')

        # Use ChromeDriverManager to manage `chromedriver` installation
        self.service = Service(ChromeDriverManager().install())
        self.driver = webdriver.Chrome(options=self.options, service=self.service)

    ###
    ## Scraper.scrape(str keywords)
    #
    ## Main func for scraping the page & downloading the ebook.
    #
    ## Always returns a (code, path) tuple so callers can unpack it:
    # - (0, path) = ok; file was written to `path` under the download dir
    # - (1, None) = no books found
    # - (2, None) = something found, but not going to download (usually a
    #               broken mirror, PDF, or non-English). manually intervene.
    ###
    def scrape(self, keywords):
        # quote_plus turns spaces into "+" and escapes characters such as
        # "&" or "#" that would otherwise corrupt the query string.
        terms = quote_plus(keywords)
        self.driver.get(f"https://libgen.is/fiction/?q={terms}&criteria=&language=&format=")

        # check if no results at all
        try:
            # if we find the "No files were found." text, report code 1.
            # a timeout on this short explicit wait means there are results.
            WebDriverWait(self.driver, 1.5).until(
                EC.visibility_of_element_located(
                    (By.XPATH, '//html/body/p[text()="No files were found."]')
                )
            )
            return (1, None)
        except TimeoutException:
            pass

        try:
            catalogTable = WebDriverWait(self.driver, self.waitTimeout).until(
                EC.visibility_of_element_located((By.XPATH, '//html/body/table/tbody'))
            )
        except TimeoutException:
            # neither the "no files" notice nor the results table showed up
            return (2, None)

        rows = catalogTable.find_elements(By.XPATH, './/tr')
        best = {"index": -1, "ISBN": False, "format": "?", "size": -1.00}
        for index, row in enumerate(rows):
            entry = self._parseRow(index, row)
            if entry is not None:
                best = self.compareEntries(entry, best)

        # index -1 means no row qualified; indexing rows[-1] here would
        # silently select the last row of the table instead.
        if best["index"] == -1:
            return (2, None)

        # now that we have a selected entry, find its first mirror link.
        chosen = rows[best["index"]]
        try:
            mirror1 = chosen.find_element(
                By.XPATH, './/ul[@class="record_mirrors_compact"]/li[1]/a'
            ).get_attribute('href')
        except NoSuchElementException:
            return (2, None)
        ## TODO: also read li[2] and fall back to the second mirror when the
        ## first one fails to download.

        # open the mirror page and read the actual download link from it
        self.driver.get(mirror1)
        try:
            downloadLink1 = WebDriverWait(self.driver, self.waitTimeout).until(
                EC.element_to_be_clickable((By.XPATH, '//a[contains(text(), "GET")]'))
            ).get_attribute('href')
        except TimeoutException:
            return (2, None)

        res, path = self.download(downloadLink1)
        if res == 0:
            return (0, path)
        return (2, None)

    ###
    ## Scraper._parseRow(self, int index, WebElement row)
    #
    ## Utility function for scrape(): turn one catalog table row into an
    ## entry dict, or None when the row is not English or has no EPUB/MOBI.
    ###
    def _parseRow(self, index, row):
        # initial check; ensure the book is in English
        if not row.find_elements(By.XPATH, './/td[text()="English"]'):
            return None

        # look for epub or mobi, otherwise skip this book
        fileCells = row.find_elements(
            By.XPATH, './/td[contains(text(), "EPUB") or contains(text(), "MOBI")]'
        )
        if not fileCells:
            return None

        fileFormat, size = self._parseFileCell(fileCells[0].text)
        hasIsbn = bool(row.find_elements(By.XPATH, './/p[@class="catalog_identifier"]'))
        return {"index": index, "ISBN": hasIsbn, "format": fileFormat, "size": size}

    ###
    ## Scraper._parseFileCell(str text)
    #
    ## Split a file cell such as "EPUB / 1.23 Mb" into ("EPUB", 1.23).
    ## Size is expressed in Mb (Kb values are divided by 1000). A size that
    ## cannot be parsed becomes -1.0 so it always loses a size comparison
    ## instead of raising when compared against a float.
    ###
    @staticmethod
    def _parseFileCell(text):
        fileFormat, _, sizeText = text.partition("/")
        fileFormat = fileFormat.strip()

        parts = sizeText.split()
        if len(parts) != 2:
            return (fileFormat, -1.0)
        try:
            value = float(parts[0])
        except ValueError:
            return (fileFormat, -1.0)

        unit = parts[1].lower()
        if unit == "kb":
            return (fileFormat, value / 1000)
        if unit == "mb":
            return (fileFormat, value)
        if unit == "gb":
            return (fileFormat, value * 1000)
        return (fileFormat, -1.0)

    ###
    ## Scraper.compareEntries(self, dict entry1, dict entry2)
    # - entry = {"index": int, "ISBN": bool, "format": str(EPUB/MOBI), "size": float}
    #
    ## Utility function for scrape() to compare book entries against cam's
    ## algorithm (below). Returns whichever of the two entries is preferred.
    #
    ## cam's algorithm here is to check for:
    # - anything with a listed ISBN; then check if EPUB; then check for the
    #   biggest (filesize) EPUB (bc it usually has extras and/or images/maps/etc)
    # - if no listed ISBN, same process. ISBN listings with lower filesizes
    #   take precedence.
    # - otherwise grab the biggest MOBI.
    ###
    def compareEntries(self, entry1, entry2):
        ## if index == -1 then it's the placeholder, just return the other
        if entry1["index"] == -1:
            return entry2
        if entry2["index"] == -1:
            return entry1

        ## compare ISBNs: an entry with an ISBN beats one without
        if entry1["ISBN"] != entry2["ISBN"]:
            return entry1 if entry1["ISBN"] else entry2

        ## compare formats: EPUB beats anything that is not EPUB
        isEpub1 = entry1["format"] == "EPUB"
        isEpub2 = entry2["format"] == "EPUB"
        if isEpub1 != isEpub2:
            return entry1 if isEpub1 else entry2

        ## same format class: the bigger file wins, ties go to entry1.
        ## this is also the fallback, so the function never returns None.
        if entry2["size"] > entry1["size"]:
            return entry2
        return entry1

    ###
    ## Scraper._pickFilename(str url, mapping headers)
    #
    ## Choose a local filename for a download: the Content-Disposition
    ## filename when the server sends one, else the last URL path segment.
    ## Directory components are dropped so a hostile header cannot write
    ## outside the download directory.
    ###
    @staticmethod
    def _pickFilename(url, headers):
        filename = None
        disposition = headers.get("content-disposition")
        if disposition:
            # let the email package handle quoting, extra parameters and
            # RFC 2231 (filename*=) encoded values
            parser = Message()
            parser["content-disposition"] = disposition
            filename = parser.get_filename()
        if not filename:
            filename = unquote(urlsplit(url).path.rsplit("/", 1)[-1])
        filename = os.path.basename(filename.replace("\\", "/"))
        return filename or "download"

    ###
    ## Scraper.download(self, str url)
    #
    ## Utility function for scrape() to download a file.
    ## Does some other things, namely checking dirs and filestream/headers.
    #
    ## Always returns a (code, path) tuple:
    # - (0, path) = ok; file written to `path`
    # - (2, None) = request failed or the server did not answer 200
    ###
    def download(self, url):
        # TODO: tmp for now, later make this an author.
        target_dir = os.path.join(self.defaultDownloadDir, "tmp")
        os.makedirs(target_dir, exist_ok=True)

        try:
            # the context manager releases the streamed connection
            with requests.get(url, stream=True, timeout=DOWNLOAD_TIMEOUT) as response:
                if response.status_code != 200:
                    return (2, None)

                filename = self._pickFilename(url, response.headers)
                file_path = os.path.join(target_dir, filename)

                # write the file
                print(file_path)
                with open(file_path, 'wb') as f:
                    print('writing file...')
                    for chunk in response.iter_content(8192):
                        f.write(chunk)
        except requests.RequestException as e:
            print(e)
            return (2, None)
        return (0, file_path)


class Mailer:
    def __init__(self):
        # our server uses starttls
        self.smtp_server = "depaoli.id.au"
        self.port = 587
        # Credentials come from the environment when set.
        # TODO: remove the hardcoded fallbacks once EBOOK_SMTP_USER and
        # EBOOK_SMTP_PASSWORD are configured in the deployment.
        self.email = os.environ.get("EBOOK_SMTP_USER", "cam@depaoli.id.au")
        self.password = os.environ.get("EBOOK_SMTP_PASSWORD", "echo $1 | grep 1")

    ###
    ## Mailer.sendMail(self, str to, str subject, str body, str attachment_path)
    #
    ## Utility function for sending an email with an attachment.
    #
    # - return 0 = ok
    # - return 1 = error; attachment unreadable or sending failed.
    ###
    def sendMail(self, to, subject, body, attachment_path):
        print(f"Sending email to {to} with subject {subject} and attachment {attachment_path}.")
        msg = MIMEMultipart()
        msg['From'] = self.email
        msg['To'] = to
        msg["Date"] = utils.formatdate(localtime=True)
        msg['Subject'] = subject
        msg['Message-Id'] = utils.make_msgid(domain='depaoli.id.au')

        msg.attach(MIMEText(body, 'plain'))

        # read the file to attach; a missing/unreadable file is a send error
        try:
            with open(attachment_path, 'rb') as f:
                payload = f.read()
        except OSError as e:
            print(e)
            return 1

        # Target headers for the attachment part:
        #   Content-Type: application/epub+zip
        #   Content-Transfer-Encoding: base64
        #   Content-Disposition: attachment;
        #    filename="Summoner: : The Battlemage: Book 3 - Taran Matharu.epub"
        #   MIME-Version: 1.0
        attachment = MIMEBase('application', 'epub+zip')
        attachment.set_payload(payload)

        # encode data to base64
        encoders.encode_base64(attachment)

        # passing filename as a parameter lets the email package quote and
        # encode it, rather than embedding a raw newline in the header value
        attachment.add_header(
            'Content-Disposition',
            'attachment',
            filename=os.path.basename(attachment_path),
        )
        msg.attach(attachment)

        # send the email; the context manager closes the connection even
        # when starttls/login/sendmail raises
        try:
            with smtplib.SMTP(self.smtp_server, self.port) as server:
                server.starttls()
                server.login(self.email, self.password)
                server.sendmail(self.email, to, msg.as_string())
            return 0
        except (smtplib.SMTPException, OSError) as e:
            print(e)
            return 1


class eBookForm(FlaskForm):
    keywords = StringField('Keywords:', validators=[validators.DataRequired()])
    submit = SubmitField('Submit')


###
## get_ebook(str keywords)
#
## GET  = Frontend page for user to input keywords and see status.
## POST = Scrape for the keywords, download the book and email it.
#
## Every path returns the rendered get_ebook.html template; the outcome of
## a POST is reported through page_title.
###
@app.route('/get_ebook', methods=['GET', 'POST'])
@login_required
def get_ebook(keywords=None):
    form = eBookForm(request.form)
    page_title = "Get an eBook"

    if request.method == 'POST' and form.validate():
        keywords = request.form['keywords']
        scraper = Scraper()
        try:
            res_scrape, path = scraper.scrape(keywords)
        finally:
            # always shut the browser down, even if scraping raised
            scraper.driver.quit()

        if res_scrape != 0:
            page_title = SCRAPE_FAILURE_TITLES.get(res_scrape, "Failed to get the eBook")
        else:
            mailer = Mailer()
            # send to every recipient and check every result
            results = [
                mailer.sendMail(to, subject, "sending book", path)
                for to, subject in EBOOK_RECIPIENTS
            ]
            if any(res != 0 for res in results):
                page_title = "Failed to email the eBook"
            else:
                page_title = "Success!"

    # GET, invalid POST and finished POST all render the same page
    return render_template('get_ebook.html', form=form, page_title=page_title)


#if __name__ == "__main__":
#    scraper = Scraper()
#    res, path = scraper.scrape("the outcast taran matharu")
#    print("FINISHED, RESULT = ", res)
#    scraper.driver.quit()
#
#    #path = "C:/Users/cam/Desktop/code/tmp/(Summoner 4) Matharu, Taran - The Outcast.epub"
#    #print(path)
#    #mailer = Mailer()
#    #res = mailer.sendMail("dshop+amazon_bgh507@kindle.com", "eBook send - cdp test", "sending book", path)
#    #res = mailer.sendMail("cam@depaoli.id.au", "eBook send - cdp test", "sending book", path)