## cam test cases
# - non-english?
# - different formats?
# - no ISBN?
# - nothing found?
# -- remove hardcoded password

import os
import smtplib
from email import encoders, utils
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from urllib.parse import quote_plus

import requests
from flask import render_template, request
from flask_login import login_required
from flask_wtf import FlaskForm
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from wtforms import StringField, SubmitField, validators

from main import app


class Scraper:
    """Headless-Chrome scraper that searches libgen.is fiction and downloads one ebook."""

    def __init__(self):
        # Initialize driver & options
        self.waitTimeout = 10  # default timeout (seconds) for explicit waits
        self.defaultDownloadDir = "/books"  # default download directory, compose maps to /export/docker/storage/books
        # (connect, read) timeouts in seconds for the file download, so a dead
        # mirror cannot hang the request forever
        self.downloadTimeout = (self.waitTimeout, 60)

        # Set Chrome options for headless mode
        self.options = Options()
        self.options.add_argument('--headless')
        self.options.add_argument('--disable-gpu')
        self.options.add_argument('--no-sandbox')
        self.options.add_argument('--disable-dev-shm-usage')

        # Use ChromeDriverManager to manage `chromedriver` installation
        self.service = Service(ChromeDriverManager().install())
        self.driver = webdriver.Chrome(options=self.options, service=self.service)

    ###
    ## Scraper.scrape(str keywords)
    #
    ## Main func for scraping the page & downloading the ebook
    #
    # Always returns a (code, path) tuple so callers can unpack the result:
    # - (0, path) = ok; the file was written to `path` (under self.defaultDownloadDir)
    # - (1, None) = no books found
    # - (2, None) = something found, but not going to download (usually will be a
    #               broken mirror, PDF, or non-English). manually intervene.
    #
    # A timeout while waiting for the results table itself is not handled here
    # and propagates as selenium's TimeoutException.
    ###
    def scrape(self, keywords):
        # url formatting: spaces become "+", and characters such as "&" or "#"
        # are percent-encoded so they cannot break the query string
        terms = quote_plus(keywords)
        self.driver.get(f"https://libgen.is/fiction/?q={terms}&criteria=&language=&format=")

        # check if no results at all
        try:
            # if we find the "No files were found." text, report code 1.
            # if we except on an explicit wait timeout, carry on.
            WebDriverWait(self.driver, 1.5).until(
                EC.visibility_of_element_located((By.XPATH, '//html/body/p[text()="No files were found."]'))
            )
            return (1, None)
        except TimeoutException:
            pass

        best = {"index": -1, "ISBN": False, "format": "?", "size": -1.00}
        catalogTable = WebDriverWait(self.driver, self.waitTimeout).until(
            EC.visibility_of_element_located((By.XPATH, '//html/body/table/tbody'))
        )
        rows = catalogTable.find_elements(By.XPATH, './/tr')

        for index, row in enumerate(rows):
            entry = self._parseRow(index, row)
            if entry is not None:
                best = self.compareEntries(entry, best)

        # nothing usable (no English EPUB/MOBI row). Without this guard the
        # placeholder index -1 would silently select the LAST table row.
        if best["index"] == -1:
            return (2, None)

        # now that we have a selected entry, download it.
        try:
            mirror1 = rows[best["index"]].find_element(
                By.XPATH, './/ul[@class="record_mirrors_compact"]/li[1]/a'
            ).get_attribute('href')
        except NoSuchElementException:
            return (2, None)
        if not mirror1:
            return (2, None)

        # open the mirror page and pull the direct download link out of it
        try:
            self.driver.get(mirror1)
            downloadLink1 = WebDriverWait(self.driver, self.waitTimeout).until(
                EC.element_to_be_clickable((By.XPATH, '//a[contains(text(), "GET")]'))
            ).get_attribute('href')
        except TimeoutException:
            # broken mirror: the "GET" link never showed up
            return (2, None)
        if not downloadLink1:
            return (2, None)

        res, path = self.download(downloadLink1)
        if res == 0:
            return (0, path)

        ## TODO: fix this to use both mirrors (second link is li[2] of record_mirrors_compact)
        return (2, None)

    # Build the comparison dict for one results row, or None when the row must be
    # skipped (not English, not EPUB/MOBI, or an unparseable format/size cell).
    def _parseRow(self, index, row):
        entry = {"index": index, "ISBN": False, "format": "?", "size": -1.00}

        # initial check; ensure the book is in English. otherwise skip the row
        try:
            row.find_element(By.XPATH, './/td[text()="English"]')
        except NoSuchElementException:
            return None

        # set ISBN if it exists
        try:
            row.find_element(By.XPATH, './/p[@class="catalog_identifier"]')
            entry["ISBN"] = True
        except NoSuchElementException:
            pass

        # look for epub or mobi, otherwise skip the row
        try:
            fileTdStr = row.find_element(
                By.XPATH, './/td[contains(text(), "EPUB") or contains(text(), "MOBI")]'
            ).text
        except NoSuchElementException:
            return None

        # cell text is expected to look like "EPUB / 1.23 Mb"
        fmt, sep, sizeStr = fileTdStr.partition("/")
        size = self._parseSize(sizeStr)
        if not sep or size is None:
            return None

        entry["format"] = fmt.strip()
        entry["size"] = size
        return entry

    # Convert a size string such as "317 Kb" or "1.2 Mb" to megabytes (float).
    # Returns None when the text is not "<number> <unit>" with a known unit, so
    # that a str is never compared against a float later on.
    @staticmethod
    def _parseSize(sizeStr):
        parts = sizeStr.split()
        if len(parts) != 2:
            return None
        try:
            value = float(parts[0])
        except ValueError:
            return None

        unit = parts[1].lower()
        if unit == "kb":
            return value / 1000
        if unit == "mb":
            return value
        if unit == "gb":
            return value * 1000
        return None

    ###
    ## Scraper.compareEntries(self, dict entry1, dict entry2)
    # - entry = {"index": int, "ISBN": bool, "format": str(EPUB/MOBI), "size": float}
    #
    ## Utility function for scrape() to compare book entries against cam's algorithm (below)
    #
    ## cam's algorithm here is to check for:
    # - anything with a listed ISBN; then check if EPUB; then check for biggest (filesize) EPUB
    #   (bc usually has extras and/or images/maps/etc)
    # - if no listed ISBN, same process. ISBN listings with lower filesizes take precedence.
    # - otherwise grab the biggest MOBI.
    #
    # Returns whichever of the two entries wins; never returns None.
    ###
    def compareEntries(self, entry1, entry2):
        ## if index == -1 then its the temp one, just return the other
        if entry1["index"] == -1:
            return entry2
        if entry2["index"] == -1:
            return entry1

        ## compare ISBNs: a listed ISBN beats no ISBN, regardless of format/size
        isbn1, isbn2 = bool(entry1["ISBN"]), bool(entry2["ISBN"])
        if isbn1 != isbn2:
            return entry1 if isbn1 else entry2

        ## compare formats: EPUB beats anything that is not EPUB
        epub1 = entry1["format"] == "EPUB"
        epub2 = entry2["format"] == "EPUB"
        if epub1 != epub2:
            return entry1 if epub1 else entry2

        ## same format class (both EPUB, or both not EPUB), so check filesize.
        ## if the sizes are the same, just pick entry1.
        return entry2 if entry2["size"] > entry1["size"] else entry1

    ###
    # Scraper.download(self, str url)
    #
    ## Utility function for scrape() to download a file.
    # Does some other things, namely checking dirs and filestream/headers.
    #
    # Always returns a (code, path) tuple:
    # - (0, file_path) = ok; file written to file_path
    # - (2, None)      = non-200 response, request error, or unusable filename
    ###
    def download(self, url):
        target_dir = self.defaultDownloadDir + "/tmp"  # TODO: tmp for now, later make this an author.
        os.makedirs(target_dir, exist_ok=True)

        try:
            # `with` closes the streamed connection even if writing fails
            with requests.get(url, stream=True, timeout=self.downloadTimeout) as response:
                if response.status_code != 200:
                    return (2, None)

                # get filename from url, or from headers if exists
                filename = url.split("/")[-1]
                if 'content-disposition' in response.headers:
                    # attempt to extract filename from headers
                    filename = response.headers["content-disposition"].split("filename=")[-1].strip('"')
                    print('content-disposition:', filename)

                # the name comes from the remote server: keep only the last path
                # component so it cannot escape target_dir
                filename = os.path.basename(filename.replace("\\", "/"))
                if filename in ("", ".", ".."):
                    return (2, None)

                file_path = target_dir + "/" + filename

                # write the file
                print(file_path)
                with open(file_path, 'wb') as f:
                    print('writing file...')
                    for chunk in response.iter_content(8192):
                        f.write(chunk)
                return (0, file_path)
        except requests.RequestException as e:
            print(e)
            return (2, None)


class Mailer:
    """Sends an ebook as an email attachment through the STARTTLS SMTP server."""

    def __init__(self):
        # our server uses starttls
        self.smtp_server = "depaoli.id.au"
        self.port = 587
        self.email = "cam@depaoli.id.au"
        # Prefer a password supplied through the EBOOK_SMTP_PASSWORD environment
        # variable. The literal is kept only as a backward-compatible fallback.
        # TODO: drop the fallback once the deployment sets the variable.
        self.password = os.environ.get("EBOOK_SMTP_PASSWORD", "echo $1 | grep 1")

    ###
    # Mailer.sendMail(self, str to, str subject, str body, str attachment_path)
    #
    ## Utility function for sending an email with an attachment.
    #
    # - return 0 = ok
    # - return 1 = error; usually in sending, but also when the attachment
    #              cannot be read.
    ###
    def sendMail(self, to, subject, body, attachment_path):
        print(f"Sending email to {to} with subject {subject} and attachment {attachment_path}.")

        msg = MIMEMultipart()
        msg['From'] = self.email
        msg['To'] = to
        msg["Date"] = utils.formatdate(localtime=True)
        msg['Subject'] = subject
        msg['Message-Id'] = utils.make_msgid(domain='depaoli.id.au')
        msg.attach(MIMEText(body, 'plain'))

        # attach the file
        try:
            with open(attachment_path, 'rb') as f:
                payload = f.read()
        except OSError as e:
            print(e)
            return 1

        attachment = MIMEBase('application', 'epub+zip')
        attachment.set_payload(payload)

        # target headers, for reference:
        #Content-Type: application/epub+zip
        #Content-Transfer-Encoding: base64
        #Content-Disposition: attachment;
        # filename="Summoner: : The Battlemage: Book 3 - Taran Matharu.epub"
        #MIME-Version: 1.0

        # encode data to base64
        encoders.encode_base64(attachment)

        # add headers to indicate content-disposition of attachment
        filename = attachment_path.split("/")[-1]
        attachment.add_header('Content-Disposition', f'attachment;\n filename="{filename}"')
        msg.attach(attachment)

        # send the email; the context manager closes the connection even when
        # starttls/login/sendmail raises
        try:
            with smtplib.SMTP(self.smtp_server, self.port) as server:
                server.starttls()
                server.login(self.email, self.password)
                server.sendmail(self.email, to, msg.as_string())
            return 0
        except (smtplib.SMTPException, OSError) as e:
            print(e)
            return 1


class eBookForm(FlaskForm):
    """Single-field form: the search keywords plus a submit button."""

    keywords = StringField('Keywords:', validators=[validators.DataRequired()])
    submit = SubmitField('Submit')


###
# get_ebook(str keywords)
#
## GET = Frontend page for user to input keywords, see status, and path.
## POST = Wrapper function for Jinja to use.
#
###
@app.route('/get_ebook', methods=['GET', 'POST'])
@login_required
def get_ebook(keywords=None):
    """Render the ebook request page and, on a valid POST, fetch and mail the book.

    Every path returns the rendered ``get_ebook.html`` template; the outcome is
    reported through ``page_title`` because that and ``form`` are the only
    values passed to the template.
    """
    form = eBookForm(request.form)
    page_title = "Get an eBook"

    # A GET, or a POST that fails validation, just re-renders the form.
    if request.method == 'POST' and form.validate():
        keywords = request.form['keywords']

        scraper = Scraper()
        try:
            result = scraper.scrape(keywords)
        finally:
            # always release the browser, even if scrape() raised
            scraper.driver.quit()

        # scrape() hands back (code, path) on success; accept a bare status
        # code as well so a failure result can never break the unpacking.
        if isinstance(result, tuple):
            res_scrape, path = result
        else:
            res_scrape, path = result, None

        if res_scrape != 0:
            page_title = f"Could not get eBook (scrape result {res_scrape}): no file"
        else:
            mailer = Mailer()
            mail_results = [
                mailer.sendMail("dshop+amazon_bgh507@kindle.com", "eBook send - cdp test", "sending book", path),
                mailer.sendMail("cam@depaoli.id.au", "eBook send - cdp", "sending book", path),  # test
            ]
            # check BOTH sends; previously the first result was overwritten
            if any(res_mail != 0 for res_mail in mail_results):
                page_title = "Downloaded eBook, but sending the email failed"
            else:
                page_title = "Success!"

    return render_template('get_ebook.html', form=form, page_title=page_title)


#if __name__ == "__main__":
#    scraper = Scraper()
#    res, path = scraper.scrape("the outcast taran matharu")
#    print("FINISHED, RESULT = ", res)
#    scraper.driver.quit()
#
#    #path = "C:/Users/cam/Desktop/code/tmp/(Summoner 4) Matharu, Taran - The Outcast.epub"
#    #print(path)
#    #mailer = Mailer()
#    #res = mailer.sendMail("dshop+amazon_bgh507@kindle.com", "eBook send - cdp test", "sending book", path)
#    #res = mailer.sendMail("cam@depaoli.id.au", "eBook send - cdp test", "sending book", path)