Commit a8f8fb15 authored by zshan2

Merge branch 'assignment-2.1' into 'master'

<assignment-2.1>

See merge request !1
parents 2bf444c7 0b872741
import requests
import os
import RegularExpressionParser.Parser as Parser
from dotenv import load_dotenv
def safe_url(query):
load_dotenv()
host = os.getenv('SERVER_HOST')
return host + Parser.url_safe(query)
def get(query):
"""
function used to send a GET request to find one or several documents
:param query: user input query
:return: json file of the given author or book
"""
print("GET: " + query)
url = safe_url(query)
req = requests.get(url)
print("\nresponse code: " + str(req.status_code) + "\n")
return req.json()
def put(query, json_file):
"""
function used to send a PUT request to create or update a document
:param query: user input query
:param json_file: JSON data to upload
:return: response text from the server
"""
print("PUT: " + query)
url = safe_url(query)
req = requests.put(url, json=json_file)
print("\nresponse code: " + str(req.status_code) + "\n")
return req.text
def post(query, json_file):
"""
function used to send a POST request to add one or several documents
:param query: user input query
:param json_file: JSON data to upload
:return: response text from the server
"""
print("POST: " + query)
url = safe_url(query)
req = requests.post(url, json=json_file)
print("\nresponse code: " + str(req.status_code) + "\n")
return req.text
def delete(query):
"""
function used to send a DELETE request to remove documents
:param query: user input query
:return: response text from the server
"""
print("DELETE: " + query)
url = safe_url(query)
req = requests.delete(url)
print("\nresponse code: " + str(req.status_code) + "\n")
return req.text
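# Usage sketch (illustrative, not part of the original module): assuming the .env
# file defines SERVER_HOST, e.g. SERVER_HOST=http://127.0.0.1:5000/api/, the
# wrappers above can be called like this (the book id is the one used in the tests):
#
#     result = get("book?id=38746485")                             # fetch one book document
#     result = put("book?id=38746485", {"rating_count": 1000000})  # update that document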
import typer
import RegularExpressionParser.Parser as Parser
import Crawler.Scrape as Scrape
import API.Functions as API
import os
import json
import re
from dotenv import load_dotenv
app = typer.Typer()
@app.command()
def main_page():
"""
main page of the client; run this .py directly to enter the CLI and see usage
:return: none
"""
print("==================================================================\n\n"
"Welcome to use goodReads scraper!\n\n"
"This is main page\n\n"
"You can input back to return to this page at anytime\n\n"
"Usage:\n\n"
"get: find a document for a book or an author from database\n\n"
"put: create or update a document in database\n\n"
"post: add one or multiple documents of book or author in database \n\n"
"delete: delete certain document from database\n\n"
"export: export a certain collection from database\n\n"
"exit: end the program\n\n"
"Please enter your command:\n")
order = input()
if order == "get":
get()
elif order == "put":
put()
elif order == "post":
post()
elif order == "delete":
delete()
elif order == "export":
export()
if order == "exit":
exit()
else:
print("Unknown command")
main_page()
def check_and_back(command):
""" check if used in put back and return to main page if so """
if command == "back":
main_page()
def back_to_main_page():
""" wait for user to press enter and then return to the main page """
print("press enter to return to main page")
no_use = input()
main_page()
def scrape(query):
"""
Function used to send a scrape request to the server
"""
attributes = query.split("?")[1].split("&")
print(attributes)
url = attributes[0].split("=", 1)[1]
book_num = attributes[1].split("=")[1]
if not book_num.isdigit():
print("max book num must be an integer")
back_to_main_page()
book_num = int(book_num)
author_num = attributes[2].split("=")[1]
if not author_num.isdigit():
print("max author num must be an integer")
back_to_main_page()
author_num = int(author_num)
print('Request received, start scraping')
if not Parser.check_if_address_valid(url):
print("Error: invalid URL")
back_to_main_page()
elif book_num >= 2000 or author_num >= 500 or book_num <= 0 or author_num <= 0:
print("Error, the number of book and author should be positive and lower than 2000/500")
back_to_main_page()
else:
if book_num > 200 or author_num > 50:
print("Warning, maximum of the number of books or authors is large")
result = API.post(query, {})
return result
def get():
""" GET API """
print("==================================================================\n")
print("Please enter your query:\n")
query = input()
check_and_back(query)
result = API.get(query)
typer.echo(result)
print()
back_to_main_page()
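# Example queries the client accepts (illustrative; formats taken from the server
# routes and the test suite):
#   book?id=38746485
#   author?id=<author id>
#   search?q=book.rating_count<1000000&AND&book.rating>4.4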
def put():
""" PUT API """
print("==================================================================\n")
print("Please enter your query:\n")
query = input()
check_and_back(query)
print("Please select one way to load update info: input or read:\n")
load = input()
check_and_back(load)
if load == "input":
print("Please enter json data:\n")
string = input()
check_and_back(string)
try:
data = json.loads(string)
except ValueError as err:
print(err)
back_to_main_page()
elif load == "read":
load_dotenv()
file_root = os.getenv('FILE_ROOT')
print("Please enter the name of file which is in the file root directory:\n")
name = input()
check_and_back(name)
file_path = r"" + file_root + name
if os.path.isfile(file_path):
with open(file_path, "r") as file:
try:
data = json.load(file)
except ValueError as err:
print(err)
back_to_main_page()
else:
print("Error: file not exist")
back_to_main_page()
else:
print("Unknown command")
back_to_main_page()
if re.search("books", query):
if len(data) <= 1:
print("For posting multiple books, please input json file has more than 1 books\n")
main_page()
else:
if len(data) > 1:
print("Cannot post more than one book with 'post book/author', please use 'post books/authors'")
main_page()
result = API.put(query, data)
typer.echo(result)
print()
back_to_main_page()
def post():
""" POST API """
print("==================================================================\n")
print("Please enter your query:\n")
query = input()
check_and_back(query)
if re.search("scrape", query):
result = scrape(query)
typer.echo(result)
print()
back_to_main_page()
else:
load_dotenv()
file_root = os.getenv('FILE_ROOT')
print("Please enter the name of file which is in the file root directory:\n")
name = input()
check_and_back(name)
file_path = r"" + file_root + name
if os.path.isfile(file_path):
with open(file_path, "r") as file:
try:
data = json.load(file)
except ValueError as err:
print(err)
back_to_main_page()
else:
print("Error: file not exist")
back_to_main_page()
result = API.post(query, data)
typer.echo(result)
print()
back_to_main_page()
def delete():
""" DELETE API """
print("==================================================================\n")
print("Please enter your query:\n")
query = input()
check_and_back(query)
result = API.delete(query)
typer.echo(result)
print()
back_to_main_page()
def export():
""" export the book or author collection """
print("==================================================================\n")
print("Which collection do you want to export: books or authors?\n")
collection = input()
check_and_back(collection)
if collection == "books":
json_file = API.get("book")
load_dotenv()
file_root = os.getenv('FILE_ROOT')
with open(file_root + "books" + ".json", "w") as output:
output.write(json_file)
print("books.json is stored at " + file_root)
back_to_main_page()
elif collection == "authors":
json_file = API.get("author")
load_dotenv()
file_root = os.getenv('FILE_ROOT')
with open(file_root + "authors" + ".json", "w") as output:
output.write(json_file)
print("authors.json is stored at " + file_root)
back_to_main_page()
elif collection == "back":
back_to_main_page()
else:
print("Unknown collection")
back_to_main_page()
if __name__ == "__main__":
app()
class AuthorPackage:
def __init__(self, name, url, id, rating, rating_count,
review_count, image_url, related_authors, author_books):
self.name = name
self.url = url
self.id = id
self.rating = rating
self.rating_count = rating_count
self.review_count = review_count
self.image_url = image_url
self.related_authors = related_authors
self.author_books = author_books
class BookPackage:
def __init__(self, url, title, id, ISBN13, author_url, author, rating,
rating_count, review_count, image_url, similar_books):
self.url = url
self.title = title
self.id = id
self.ISBN13 = ISBN13
self.author_url = author_url
self.author = author
self.rating = rating
self.rating_count = rating_count
self.review_count = review_count
self.image_url = image_url
self.similar_books = similar_books
import Crawler.Spider as Spider
import time
from DataBase import mongoDB as db
def scrape_api(url_api, book_num_api, author_num_api):
print('Request received, start scraping')
book_dict_api = {}
author_dict_api = {}
similar_book_dict_api = {}
count_api = 1
print("Crawling book #" + str(count_api))
start_page_api = Spider.Spider(url_api)
start_page_api.crawl(book_dict_api, author_dict_api, similar_book_dict_api)
while len(book_dict_api) < int(book_num_api) or len(author_dict_api) < int(author_num_api):
all_related_books_api = list(similar_book_dict_api.items())
for book_url_pair_api in all_related_books_api:
if len(book_dict_api) >= int(book_num_api) and len(author_dict_api) >= int(author_num_api):
break
book_name_api = book_url_pair_api[0]
if book_name_api in book_dict_api:
pass
else:
count_api += 1
print("Crawling book #" + str(count_api))
book_url_api = book_url_pair_api[1]
book_spider_api = Spider.Spider(book_url_api)
book_spider_api.crawl(book_dict_api, author_dict_api, similar_book_dict_api)
# since the network delay from Asia to goodreads.com is already quite high,
# only a short sleep is needed between requests here.
time.sleep(0.2)
db.insert_dicts(book_dict_api, 0)
db.insert_dicts(author_dict_api, 1)
print("Scraping completed")
import requests
from bs4 import BeautifulSoup
import Crawler.BookPack as BookPack
import Crawler.AuthorPack as AuthorPack
class Spider:
def __init__(self, url):
self.url = url
self.soup = None
def crawl(self, book_dict, author_dict, similar_book_dict):
"""
function used to scrape data of a certain book
:param book_dict:
:param author_dict:
:param similar_book_dict:
:return: none
"""
header = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36"}
url = self.url
res = requests.get(url, headers=header)
self.soup = BeautifulSoup(res.text, "lxml")
# parsing data of current book
titleAndAuthor = self.soup.title.string.split(" by ")
title = titleAndAuthor[0]
bookID = ((self.url.split("show/")[1]).split("-")[0]).split(".")[0]
rating_raw = self.soup.find("span", {"itemprop": "ratingValue"}).text
rating = float(str(rating_raw).replace(" ", "").replace("\n", ""))
rating_count = int(self.soup.find("meta", itemprop="ratingCount")["content"])
review_count = int(self.soup.find("meta", itemprop="reviewCount")["content"])
ISBN13_raw = self.soup.find("meta", property="books:isbn")
if ISBN13_raw["content"] == "null":
print("Warning: book '" + title + "' has no ISBN code")
ISBN13 = 0
else:
ISBN13 = int(ISBN13_raw["content"])
imageURL = self.soup.find("img", id="coverImage")["src"]
author = titleAndAuthor[1]
author_page_link = self.soup.find("a", {"class": "authorName"})["href"]
authorID = (author_page_link.split("show/")[1]).split(".")[0]
see_more_link = self.soup.find("a", {"class": "actionLink right seeMoreLink"})["href"]
res_similar = requests.get(see_more_link, headers=header)
# parsing similar books of current book
similar_soup = BeautifulSoup(res_similar.text, 'lxml')
similar_books_and_authors = similar_soup.find_all("span", itemprop="name")
similar_books = []
for i in range(len(similar_books_and_authors)):
if i % 2 == 0:
# a book
similar_book = similar_books_and_authors[i]
similar_book_name = similar_book.text
book_link = "https://www.goodreads.com" + similar_book.find_parent("a")["href"]
similar_books.append(similar_book_name)
similar_book_dict[similar_book_name] = book_link
bookPKT = BookPack.BookPackage(url, title, bookID, ISBN13, author_page_link, author, rating, rating_count,
review_count, imageURL, similar_books)
book_dict[title] = bookPKT
# parse data of the author of current book
res_author = requests.get(author_page_link, headers=header)
author_soup = BeautifulSoup(res_author.text, 'lxml')
author_rating = float(author_soup.find("span", {"class": "average"}).text)
author_rating_count = float(author_soup.find("span", {"class": "value-title"})["title"])
author_review_count = float((author_soup.find_all("span", {"class": "value-title"})[1])["title"])
author_photo_url = author_soup.find("img", {"itemprop": "image"})["src"]
related_links = author_soup.find("div", {"class": "hreview-aggregate"}).findChildren("a", recursive=False)
related_authors_link = "https://www.goodreads.com" + related_links[1]["href"]
res_related_authors = requests.get(related_authors_link, headers=header)
related_author_soup = BeautifulSoup(res_related_authors.text, 'lxml')
related_authors = []
related_authors_raw = related_author_soup.find_all("span", {"itemprop": "name"})
for the_author in related_authors_raw:
related_author_name = the_author.text
if related_author_name == author:
pass
else:
related_authors.append(related_author_name)
author_books_link = "https://www.goodreads.com" + related_links[0]["href"]
res_author_books = requests.get(author_books_link, headers=header)
author_books_soup = BeautifulSoup(res_author_books.text, "lxml")
author_books_raw = author_books_soup.find_all("a", {"class": "bookTitle"})
author_books = []
for book in author_books_raw:
book_name = book.findChildren("span", recursive=False)
if not book_name:
pass
else:
book_name = book_name[0].text
author_books.append(book_name)
author_pkt = AuthorPack.AuthorPackage(author, author_page_link, authorID, author_rating, author_rating_count,
author_review_count, author_photo_url, related_authors, author_books)
author_dict[author] = author_pkt
def parse_book_dict_to_json(dictionary):
item_list = list(dictionary.items())
return_list = []
for items in item_list:
book_package = items[1]
book_json = parse_book_to_json(book_package)
return_list.append(book_json)
return return_list
def parse_author_dict_to_json(dictionary):
item_list = list(dictionary.items())
return_list = []
for items in item_list:
author_package = items[1]
author_json = parse_author_to_json(author_package)
return_list.append(author_json)
return return_list
def parse_book_to_json(bookPKT):
book_json = {
"type": "book",
"book_url": bookPKT.url,
"title": bookPKT.title,
"id": bookPKT.id,
"ISBN": bookPKT.ISBN13,
"author_url": bookPKT.author_url,
"author": bookPKT.author,
"rating": bookPKT.rating,
"rating_count": bookPKT.rating_count,
"review_count": bookPKT.review_count,
"image_url": bookPKT.image_url,
"similar_books": bookPKT.similar_books
}
return book_json
def parse_author_to_json(authorPKT):
author_json = {
"type": "author",
"name": authorPKT.name,
"author_url": authorPKT.url,
"id": authorPKT.id,
"rating": authorPKT.rating,
"rating_count": authorPKT.rating_count,
"review_count": authorPKT.review_count,
"image_url": authorPKT.image_url,
"related_authors": authorPKT.related_authors,
"author_books": authorPKT.author_books
}
return author_json
import json
import DataBase.JsonParser as parser
import os
from dotenv import load_dotenv
from pymongo import MongoClient
def get_db():
""" return the database of goodReads crawler """
load_dotenv()
url = os.getenv('MONGODB_URL')
client = MongoClient(url)
return client.get_database("crawler_db")
def insert_document(docu, opt):
db = get_db()
if opt == 0:
records = db.test_books
elif opt == 1:
records = db.test_authors
else:
print("failed to get json file: wrong opt for selecting collection")
return
records.insert_one(docu)
def insert_dicts(dictionary, opt):
"""
Insert books or authors collection in database
:param dictionary: the dictionary to insert into the collection
:param opt: =0 means books collection; =1 means authors collection
:return: no return value
"""
db = get_db()
if opt == 0:
records = db.test_books
elif opt == 1:
records = db.test_authors
else:
print("failed to get json file: wrong opt for selecting collection")
return
json_list = []
if opt == 0:
json_list = parser.parse_book_dict_to_json(dictionary)
elif opt == 1:
json_list = parser.parse_author_dict_to_json(dictionary)
records.insert_many(json_list)
def update_dicts(opt, identifier, content):
"""
Update documents in a given collection
:param opt: =0 means books collection; =1 means authors collection
:param identifier: the identifier of the documents we want to update
:param content: the content to update
:return: no return value
"""
db = get_db()
if opt == 0:
records = db.test_books
elif opt == 1:
records = db.test_authors
else:
print("failed to get json file: wrong opt for selecting collection")
return
result = records.update_one(
identifier,
{"$set": content},
upsert=True
)
print("matched documentation: " + str(result.matched_count))
print("modified documentation: " + str(result.modified_count))
def get_documents_json(opt, identifier):
"""
find documents specified by the identifier and output them as JSON
:param opt: =0 means books collection; =1 means authors collection
:param identifier: identifier of the documents we want; {} selects the whole collection
:return: JSON string of the selected documents
"""
db = get_db()
if opt == 0:
records = db.test_books
elif opt == 1:
records = db.test_authors
else:
print("failed to get json file: wrong opt for selecting collection")
return json.dumps({})
data = records.find(identifier)
file = {}
if opt == 0:
typeName = "books"
else:
typeName = "authors"
file[typeName] = []
for item in data:
item.pop("_id")
file[typeName].append(item)
return json.dumps(file)
def download_collection(opt, identifier, name):
"""
download books collection or authors collection
:param opt: =0 means books collection; =1 means authors collection
:param identifier: identifier of the documents we want to download;
empty ({}) selects all documents in the given collection
:param name: file name of the downloaded json
:return: no return value (writes <name>.json under the FILE_ROOT directory)
"""
json_file = get_documents_json(opt, identifier)
load_dotenv()
file_root = os.getenv('FILE_ROOT')
with open(file_root + name + ".json", "w") as output:
output.write(json_file)
def clean(opt, identifier):
"""
delete specific documents in given collection
:param opt: =0 means books collection; =1 means authors collection
:param identifier: identifier of the documents we want to delete;
empty ({}) selects all documents in the given collection
:return: no return value
"""
db = get_db()
if opt == 0:
records = db.test_books
elif opt == 1:
records = db.test_authors
else:
print("failed to get json file: wrong opt for selecting collection")
return
records.delete_many(identifier)
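# Usage sketch (illustrative; mirrors the calls exercised in the unit tests and
# assumes MONGODB_URL and FILE_ROOT are set in the .env file; book_json is a
# hypothetical book document):
#   insert_document(book_json, 0)                                    # add one book document
#   get_documents_json(0, {"title": "Becoming"})                     # query books, returns a JSON string
#   update_dicts(0, {"title": "Becoming"}, {"rating_count": 1000000})
#   download_collection(0, {}, "books")                              # write books.json under FILE_ROOT
#   clean(0, {})                                                     # delete all book documents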
## SP21-CS242 Assignment2: GoodReads Scraper
This project is for the SP21 CS242 Assignment 2.
The current version is 2.1.
### Functions
1. Scrape data of books and authors from GoodReads
2. Store scraped data in the cloud or in local files
3. Query the cloud database for certain book or author documents
### Composition
#### Client: Command-line interface (interactive)
#### Server: Simple server based on Flask
#### Database: MongoDB
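Once the Flask server is running (it listens on `http://127.0.0.1:5000` by default), the REST API can also be queried directly, for example with `requests`. A minimal sketch, assuming a book with id 38746485 has already been stored (this mirrors the integration tests):
```python
import requests

# Fetch one book document by its GoodReads id.
res = requests.get("http://127.0.0.1:5000/api/book?id=38746485")
print(res.status_code, res.json())

# Run a parsed search query; %3A is the URL-safe encoding of ':'.
res = requests.get("http://127.0.0.1:5000/api/search?q=book.id%3A38746485")
print(res.json())
```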
import re
def check_if_address_valid(address):
"""
Check whether the given address is a valid GoodReads book URL
:param address: URL string to validate
:return: True if the address is valid, False otherwise
"""
return bool(re.search("^https://www.goodreads.com/book/show/[0-9]+", address))
def parse_query_to_url(query):
elements = re.findall("[0-9A-Za-z_\"><.]+", query)
count = len(elements)
if count == 2:
# can only be A.B:C or wrong
if re.search("^[0-9a-zA-Z_.]+:[0-9a-zA-Z_\".]", query):
return url_safe(elements[0] + "%3A" + elements[1])
else:
print("Invalid query1.")
return ""
elif count == 3:
# a pair and one of [NOT, >, <].
if re.search("^[0-9a-zA-Z_.]+:\s*NOT\s*[0-9a-zA-Z_\".]", query):
return url_safe(elements[0] + "%3A" + "NOT" + elements[2])
elif re.search("^[0-9a-zA-Z_.]+:\s*>\s*[0-9a-zA-Z_\".]", query):
return url_safe(elements[0] + "%3A" + ">" + elements[2])
elif re.search("^[0-9a-zA-Z_.]+:\s*<\s*[0-9a-zA-Z_\".]", query):
return url_safe(elements[0] + "%3A" + "<" + elements[2])
else:
print("Invalid query2.")
return ""
elif 5 <= count <= 7:
# AND or OR operator
if re.search(".*\sAND\s.*", query):
parts = query.split(" AND ")
return parse_query_to_url(parts[0]) + "%26AND%26" + parse_query_to_url(parts[1])
elif re.search(".*\sOR\s.*", query):
parts = query.split(" OR ")
return parse_query_to_url(parts[0]) + "%26OR%26" + parse_query_to_url(parts[1])
else:
print("Invalid query3.")
return ""
def parse_query_to_json(pair):
elements = re.findall("[0-9A-Za-z\"_.]+", pair)
count = len(elements)
if count != 2 and count != 3:
print("Failed to parse query: invalid args number")
return {"wrong": "True"} # will never be founded in database
elif count == 3:
# A.B: NOT C
if re.search("^[0-9.]*$", elements[2]) and not re.search("id", elements[0]):
return {elements[0].split(".")[1]: {"$ne": float(elements[2])}}
else:
return {elements[0].split(".")[1]: {"$not": {"$regex": elements[2], "$options": "i"}}}
else:
# can be A.B: C or A.B: "C"
if re.search(":", pair):
if re.search("^[0-9.]*$", elements[1]) and not re.search("id", elements[0]):
return {elements[0].split(".")[1]: float(elements[1])}
else:
if re.search("\".*\"", elements[1]):
return {elements[0].split(".")[1]: elements[1]}
else:
return {elements[0].split(".")[1]: {"$regex": elements[1], "$options": "i"}}
else:
if re.search(">", pair):
return {elements[0].split(".")[1]: {"$gt": float(elements[1])}}
elif re.search("<", pair):
return {elements[0].split(".")[1]: {"$lt": float(elements[1])}}
else:
print("Failed to parse query: unknown operator")
return {"wrong": "True"}
def url_safe(element):
return element.replace("\"", "%22").replace(">", "%3E").replace("<", "%3C").replace("&", "%26").replace(":", "%3A")
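# Illustrative input/output pairs (taken from the unit tests below):
#   parse_query_to_url('book.id:12345678 AND book.rating: > 4.80')
#       -> 'book.id%3A12345678%26AND%26book.rating%3A%3E4.80'
#   parse_query_to_json('book.id: NOT 12345678')
#       -> {'id': {'$not': {'$regex': '12345678', '$options': 'i'}}}
#   url_safe('search?q=book.rating_count<1000000&AND&book.rating>4.4')
#       -> 'search?q=book.rating_count%3C1000000%26AND%26book.rating%3E4.4'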
import DataBase.mongoDB as DataBase
import RegularExpressionParser.Parser as Parser
import re
import Crawler.Scrape as Scrape
from flask import Flask
from flask import request
from flask import abort
from flask import jsonify
from urllib.parse import urlparse, parse_qs
app = Flask(__name__)
# app.config["DEBUG"] = True
@app.route("/", methods=['GET'])
def home():
""" homepage of server """
return "200: successfully connected to home page\n"
@app.route("/api/<collection>/", methods=["GET", "PUT", "POST", "DELETE"])
def data_base(collection):
""" data base page of server """
print("\n===============================\n")
print(collection)
print("\n===============================\n")
if request.method == "GET":
if collection == "book":
url_parsed = urlparse(request.url)
qs_parsed = parse_qs(url_parsed.query)
if qs_parsed == {}:
return jsonify(DataBase.get_documents_json(0, {}))
return jsonify(search_document(["book.id:" + qs_parsed["id"][0]]))
elif collection == "author":
url_parsed = urlparse(request.url)
qs_parsed = parse_qs(url_parsed.query)
if qs_parsed == {}:
return jsonify(DataBase.get_documents_json(1, {}))
return jsonify(search_document(["author.id:" + qs_parsed["id"][0]]))
elif collection == "search":
url_parsed = urlparse(request.url)
qs_parsed = parse_qs(url_parsed.query)
return jsonify(search_document(qs_parsed["q"][0].split("&")))
else:
abort(404)
elif request.method == "PUT":
if request.headers["Content-Type"] != "application/json":
abort(415)
json_update_info = request.json
if collection == "book":
opt = 0
elif collection == "author":
opt = 1
else:
abort(404)
DataBase.update_dicts(opt, request.args.to_dict(), json_update_info)
return "200: PUT succeeded"
elif request.method == "POST":
if request.headers["Content-Type"] != "application/json":
abort(415, "content should be JSON file")
json_file = request.json
if collection == "books":
DataBase.insert_dicts(json_file, 0)
elif collection == "authors":
DataBase.insert_dicts(json_file, 1)
elif collection == "book":
DataBase.insert_document(json_file, 0)
elif collection == "author":
DataBase.insert_document(json_file, 1)
elif collection == "scrape":
param = request.args.to_dict()
url = param["url"]
max_book = param["max_book"]
max_author = param["max_author"]
Scrape.scrape_api(url, max_book, max_author)
return "200: new data has been added to database"
else:
abort(404)
return "200: POST succeeded"
elif request.method == "DELETE":
identifier = request.args.to_dict()
print(identifier)
if collection == "book":
opt = 0
elif collection == "author":
opt = 1
else:
abort(404, "Unknown Collection to DELETE")
DataBase.clean(opt, identifier)
return "200: DELETE succeeded"
def search_document(identifiers):
""" function used to find one or several document in database """
if len(identifiers) == 1:
json_idt = Parser.parse_query_to_json(identifiers[0])
print(json_idt)
if re.search("^book.*", identifiers[0]):
return DataBase.get_documents_json(0, json_idt)
else:
return DataBase.get_documents_json(1, json_idt)
elif len(identifiers) == 3:
if re.search("^book.*", identifiers[0]):
if re.search("^author.*", identifiers[2]):
print("Failed to find documentation: two statements are not pointing to the same collection")
return {}
else:
opt = 0
else:
if re.search("^book.*", identifiers[2]):
print("Failed to find documentation: two statements are not pointing to the same collection")
return {}
else:
opt = 1
json_idt1 = Parser.parse_query_to_json(identifiers[0])
json_idt2 = Parser.parse_query_to_json(identifiers[2])
if identifiers[1] == "AND":
exp = {"$and": [json_idt1, json_idt2]}
elif identifiers[1] == "OR":
exp = {"$or": [json_idt1, json_idt2]}
else:
print("Failed to parse query: unknown operator for identifiers[1]")
return {}
print("exp:")
print(exp)
return DataBase.get_documents_json(opt, exp)
else:
return "Error, unknown identifiers"
if __name__ == "__main__":
app.run()
import unittest
import os
import DataBase.mongoDB as db
import json
from pymongo import MongoClient
from dotenv import load_dotenv
class DataBaseTests(unittest.TestCase):
def setUp(self):
file_path = os.path.join(os.path.dirname(__file__), "testData.json")
with open(file_path) as file:
self.test_data = json.load(file)
self.test_data1 = self.test_data["test_data1"]
def tearDown(self):
data_base = db.get_db()
record = data_base.test_books
record.delete_many({})
def test_upload(self):
data_base = db.get_db()
record = data_base.test_books
db.insert_document(self.test_data1, 0)
self.assertEqual(record.count_documents({}), 1)
def test_download(self):
db.insert_document(self.test_data1, 0)
json_file = json.loads(db.get_documents_json(0, {"title": "Becoming"}))
no_id_test_data1 = self.test_data1
no_id_test_data1.pop("_id")
self.assertEqual(json_file["books"][0], no_id_test_data1)
def test_update(self):
data_base = db.get_db()
record = data_base.test_books
db.insert_document(self.test_data1, 0)
db.update_dicts(0, {"title": "Becoming"}, {"rating_count": 1000000})
self.assertEqual(record.count_documents({"rating_count": self.test_data1["rating_count"]}), 0)
self.assertEqual(record.count_documents({"rating_count": 1000000}), 1)
def test_delete(self):
data_base = db.get_db()
record = data_base.test_books
db.insert_document(self.test_data1, 0)
db.clean(0, {})
self.assertEqual(record.count_documents({}), 0)
if __name__ == '__main__':
unittest.main()
# db.clean(0, {})
import RegularExpressionParser.Parser as Parser
import unittest
class ParserTests(unittest.TestCase):
def test_safe_url(self):
str_query = "search?q=book.rating_count<1000000&AND&book.rating>4.4"
expected_url = "http://127.0.0.1:5000/api/search?q=book.rating_count%3C1000000%26AND%26book.rating%3E4.4"
output_url = "http://127.0.0.1:5000/api/" + Parser.url_safe(str_query)
self.assertEqual(output_url, expected_url)
def test_valid_goodreads_url(self):
url = "https://www.goodreads.com/book/show/35133922-educated"
self.assertTrue(Parser.check_if_address_valid(url))
def test_invalid_goodreads_url(self):
url1 = "https://www.google.com/book/show/35133922-educated"
self.assertFalse(Parser.check_if_address_valid(url1))
url2 = "https://www.goodreads.com/book/show/over-35133922-educated"
self.assertFalse(Parser.check_if_address_valid(url2))
url3 = "https://www.goodreads.com/book/show/educated"
self.assertFalse(Parser.check_if_address_valid(url3))
def test_parse_query_to_url(self):
query = "book.id:12345678 AND book.rating: > 4.80"
result_url = Parser.parse_query_to_url(query)
expect_url = "book.id%3A12345678%26AND%26book.rating%3A%3E4.80"
self.assertEqual(result_url, expect_url)
def test_parse_query_to_json(self):
query = "book.id: NOT 12345678"
expect_json = {"id": {"$not": {"$regex": "12345678", "$options": "i"}}}
output_json = Parser.parse_query_to_json(query)
self.assertEqual(output_json, expect_json)
import unittest
import os
import json
import DataBase.mongoDB as db
import requests
import Server.SimpleServer
class ServerTests(unittest.TestCase):
def setUp(self):
file_path = os.path.join(os.path.dirname(__file__), "testData.json")
with open(file_path) as file:
self.test_data = json.load(file)
self.test_data1 = self.test_data["test_data1"]
def tearDown(self):
data_base = db.get_db()
record = data_base.test_books
record.delete_many({})
def test_valid_get(self):
db.insert_document(self.test_data1, 0)
url = "http://127.0.0.1:5000/api/book?id=38746485"
res = requests.get(url)
self.assertEqual(res.status_code, 200)
def test_search(self):
db.insert_document(self.test_data1, 0)
url1 = "http://127.0.0.1:5000/api/search?q=book.id%3A38746485"
res1 = requests.get(url1)
self.assertEqual(res1.status_code, 200)
url2 = "http://127.0.0.1:5000/api/search?q=book.id%3A38746485%26book.rating%3E4.90"
res2 = requests.get(url2)
self.assertEqual(res2.json(), {})
def test_invalid_get_not_exist(self):
db.insert_document(self.test_data1, 0)
url = "http://127.0.0.1:5000/api/book?id=12345678"
res = requests.get(url)
self.assertEqual(res.status_code, 200)
self.assertEqual(res.json(), {'books': []})
def test_invalid_get_wrong_collection_name(self):
db.insert_document(self.test_data1, 0)
url = "http://127.0.0.1:5000/api/bookssss?id=38746485"
res = requests.get(url)
self.assertEqual(res.status_code, 200)
def test_valid_put(self):
db.insert_document(self.test_data1, 0)
url = "http://127.0.0.1:5000/api/book?id=38746485"
update_info = {"rating_count": 1000000}
res = requests.put(url, json=update_info)
self.assertEqual(res.status_code, 200)
def test_insert_put(self):
url = "http://127.0.0.1:5000/api/book?id=38746485"
update_info = {"rating_count": 1000000}
res = requests.put(url, json=update_info)
self.assertEqual(res.status_code, 200)
if __name__ == '__main__':
unittest.main()