from multiprocessing.pool import ThreadPool
from random import choice
from typing import Callable, List, Optional, Union
import pandas as pd
import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from requests.exceptions import (ConnectionError, HTTPError, ReadTimeout,
RequestException, Timeout, TooManyRedirects,
URLRequired)
def get_table(
    soup: BeautifulSoup,
    table_num: int = 2,
    row_start: int = 1,
    row_end: int = 5,
) -> Union[pd.DataFrame, list]:
    """
    Pulls out a table from a beautifulsoup html.

    Every ``<th>`` in the selected table is used as a column label and the
    ``<td>`` cells of each ``<tr>`` are paired with those labels in order.
    A cell whose text is ``'---'`` is stored as ``'0'``. Only the rows in
    the slice ``[row_start:row_end]`` are kept, so the defaults return
    rows 1-4 inclusive.

    :param soup: BeautifulSoup object.
    :type soup: BeautifulSoup
    :param table_num: Zero-based index of the table among all ``<table>``
        elements in ``soup``.
    :type table_num: int
    :param row_start: Index of the first row to keep (inclusive).
    :type row_start: int
    :param row_end: Index at which to stop keeping rows (exclusive).
    :type row_end: int
    :return: a pandas dataframe of the table, or an empty list when the
        table could not be extracted (e.g. ``table_num`` is out of range).
    :rtype: Union[pd.DataFrame, list]
    """
    try:
        table = soup.find_all('table')[table_num]

        # Column labels: newlines become spaces, then outer whitespace is trimmed.
        t_headers = [
            th.text.replace('\n', ' ').strip() for th in table.find_all("th")
        ]

        table_data = []
        for tr in table.find_all("tr"):
            t_row = {}
            # zip() stops at the shorter sequence, so surplus cells or
            # surplus headers are silently ignored. A row without any
            # <td> yields an empty dict.
            for td, th in zip(tr.find_all("td"), t_headers):
                val = td.text.replace('\n', '').strip()
                # '---' is the page's placeholder for a missing value.
                t_row[th] = '0' if val == '---' else val
            table_data.append(t_row)

        return pd.DataFrame(table_data[row_start:row_end])
    except Exception:
        # Best-effort extraction: any ordinary failure yields an empty list,
        # which existing callers rely on. Catching Exception rather than
        # using a bare ``except`` lets KeyboardInterrupt and SystemExit
        # propagate instead of being swallowed.
        return []
def find_html_class(soup: BeautifulSoup, class_name: str) -> List[BeautifulSoup]:
    """
    Collects every element of ``soup`` that carries the given class.

    :param soup: BeautifulSoup object to search.
    :type soup: BeautifulSoup
    :param class_name: The class name to look for.
    :type class_name: str
    :return: Whatever ``soup.find_all(class_=class_name)`` yields, i.e. all
        elements with the given class name.
    :rtype: List[BeautifulSoup]
    """
    matches = soup.find_all(class_=class_name)
    return matches
def find_in_html(soup: BeautifulSoup, element: Union[str, list]) -> List[BeautifulSoup]:
    """
    Finds all occurrences of an element in a BeautifulSoup object.

    This is a thin wrapper around ``soup.find_all``; it returns every
    match rather than a single element, and never ``None``.

    :param soup: BeautifulSoup object.
    :type soup: BeautifulSoup
    :param element: The element (or list of elements) to find; passed
        unchanged to ``soup.find_all``.
    :type element: Union[str, list]
    :return: All matching elements, exactly as returned by ``find_all``.
    :rtype: List[BeautifulSoup]
    """
    return soup.find_all(element)
def find_id_in_html(soup: BeautifulSoup, id: str) -> List[BeautifulSoup]:
    """
    Finds all ``div`` elements with a given id in a BeautifulSoup object.

    Only ``div`` tags are searched; elements of other types that carry
    the same id are not returned. The result is every match from
    ``soup.find_all``, not a single element, and never ``None``.

    :param soup: BeautifulSoup object.
    :type soup: BeautifulSoup
    :param id: The id to find.
    :type id: str
    :return: All ``div`` elements whose ``id`` attribute matches.
    :rtype: List[BeautifulSoup]
    """
    # NOTE: ``id`` shadows the builtin, but the parameter name is part of
    # the public signature (callers may pass it by keyword), so it stays.
    return soup.find_all('div', {'id': id})
def proxy_generator() -> dict:
    """
    This function scrapes a list of free proxies from:
    https://sslproxies.org/
    It then returns a random proxy from the list.

    :return: A single-entry mapping ``{'https': 'http://<ip>:<port>'}``
        for one randomly chosen proxy.
    :rtype: dict
    :raises IndexError: If the proxy page could not be downloaded or no
        proxy entries could be extracted from it.
    """
    # Where we get the proxies
    soup = scrape_page("https://sslproxies.org/")

    # scrape_page returns None when the download fails; treat that the
    # same as a page without any proxy rows.
    cells = soup.find_all('td') if soup is not None else []

    # Every 8th cell starting at index 0 is paired with every 8th cell
    # starting at index 1 -- presumably the IP and port columns of an
    # 8-column table; verify against the live page layout.
    candidates = [
        'http://' + ip.text + ':' + port.text
        for ip, port in zip(cells[::8], cells[1::8])
    ]

    if not candidates:
        # random.choice would raise a bare IndexError on an empty list;
        # raise the same type with a message that names the real cause.
        raise IndexError("No proxies could be scraped from https://sslproxies.org/")

    return {'https': choice(candidates)}
def scrape_page(url: str, spoof: bool = False) -> Optional[BeautifulSoup]:
    """
    Downloads a page and returns it parsed as a BeautifulSoup object.

    With ``spoof=True`` the request is sent with a random User-Agent
    header through a random proxy from ``proxy_generator`` and a
    1.5 second timeout. Otherwise a plain ``requests.get`` is issued.
    Any failure is reported with ``print`` and results in ``None``; no
    exception is raised to the caller.

    :param url: The url to scrape.
    :type url: str
    :param spoof: Whether to spoof the header and use a proxy.
    :type spoof: bool
    :return: The soup of the page, or None if the download failed.
    :rtype: Optional[BeautifulSoup]
    """
    try:
        if spoof:
            proxy = proxy_generator()
            user_agent = UserAgent()
            headers = {'User-Agent': user_agent.random}
            page = requests.get(url, headers=headers, proxies=proxy, timeout=1.5)
        else:
            # NOTE(review): no timeout is set here, so this call can block
            # indefinitely on an unresponsive server -- consider adding one.
            page = requests.get(url)
        page.raise_for_status()

        if page.status_code == 200:
            return BeautifulSoup(page.content, 'html.parser')
        # Reached for successful responses other than 200.
        print(f"There was an error downloading the page {url}.")
    # Handlers run from most specific to most general. RequestException is
    # the base class of every requests exception named here, so it must come
    # after them or their handlers can never run. ConnectionError stays
    # first so that connection-level failures keep their existing message.
    except ConnectionError:
        print(f"Could not establish a connection: {url}.")
    except TooManyRedirects:
        print(f"Too many redirects: {url}.")
    except URLRequired:
        print(f"Please enter a valid URL. {url} is not valid.")
    except ReadTimeout:
        print(f"The server did not return any data within the allotted time: {url}")
    except Timeout:
        print(f"The request timed out: {url}")
    except HTTPError as err:
        print(f"An HTTP error occurred: {err}")
    except RequestException as e:
        print(f"An error occurred while making the request: {e}.")
    except Exception as e:
        # Top-level boundary: also covers failures inside proxy_generator.
        print(f"An unexpected error occurred: {e}")
    return None
def multi_thread_func(func: Callable, values: List, threads: int = 126) -> List:
    """
    Applies a function to every value using a pool of worker threads.

    The calls are distributed over a ``ThreadPool`` and the results are
    returned in the same order as ``values``.

    :param func: The function to run on each value.
    :type func: Callable
    :param values: The values to feed to ``func``, one per call.
    :type values: List
    :param threads: The number of worker threads in the pool.
    :type threads: int
    :return: The results of ``func``, one per input value.
    :rtype: List
    """
    with ThreadPool(threads) as workers:
        # pool.map blocks until every call has finished and keeps input order.
        outcomes = list(workers.map(func, values))
    return outcomes