Source code for scrape_cost_of_living

import os

import numpy as np
import pandas as pd
import pycountry

import scrape_urls

# Directory holding the CSV data files, resolved relative to this module's location.
data_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'data'))

# Per-category units table. Functions in this module read its 'Category',
# 'Lower Units pw' and 'Upper Units pw' columns.
data_file_path = os.path.join(data_dir, 'Cost of Living Items.csv')
cost_of_living_units = pd.read_csv(data_file_path)
# Climate table, loaded at import time (data_file_path is reused for its path).
data_file_path = os.path.join(data_dir, 'Climate by Country.csv')
climate_data = pd.read_csv(data_file_path)


# Cache of cleaned Numbeo tables keyed by place name (filled by get_cost_of_living_table).
cost_of_living_tables_dict = {}
# Cache of simulated cost arrays keyed by place name (filled by get_cost_of_living).
simulated_cost_of_living_dict = {}

def clean_numbeo_table(numbeo_df: pd.DataFrame) -> pd.DataFrame:
    """
    Cleans the default Numbeo cost of living table.

    The first column becomes the index (renamed ``category``), rows with no
    value in the following column are dropped, and blank or ``?`` cells are
    turned into NaN. Three numeric columns are then derived:

    * ``lower`` / ``upper``: the two halves of the ``Range`` column, split on ``-``
      (both NaN when the column is missing or cannot be split in two).
    * ``mode``: the ``Edit`` column with its last five characters removed.

    All remaining columns are returned as strings because the frame is cast
    with ``astype(str)`` before the numeric columns are parsed.

    :param numbeo_df: pandas dataframe
    :type numbeo_df: pd.DataFrame
    :return: pandas dataframe that has been cleaned up.
    :rtype: pd.DataFrame
    """
    # Promote first column to index
    numbeo_df = numbeo_df.set_index(numbeo_df.columns[0])
    # Remove rows where there are nulls in the next column.
    numbeo_df = numbeo_df[numbeo_df[numbeo_df.columns[0]].notnull()]
    # Replace empty strings with NaN (np.nan: the np.NaN alias was removed in NumPy 2.0)
    numbeo_df = numbeo_df.replace(r'^\s*$', np.nan, regex=True)
    # Replace '?' with NaN
    numbeo_df = numbeo_df.replace(r'^\?$', np.nan, regex=True)

    # Split range
    try:
        numbeo_df[['lower', 'upper']] = numbeo_df['Range'].str.split('-', expand=True)
    except (KeyError, AttributeError, ValueError):
        # KeyError: no 'Range' column; AttributeError: column holds no strings;
        # ValueError: the split did not produce exactly two columns.
        numbeo_df['lower'] = np.nan
        numbeo_df['upper'] = np.nan

    numbeo_df = numbeo_df.astype(str)

    # Remove the trailing currency marker (e.g. 'NZ$'): the last five characters are dropped.
    numbeo_df['mode'] = numbeo_df['Edit'].str[:-5]
    numbeo_df.index.rename('category', inplace=True)

    # Strip thousands separators and parse; anything unparsable becomes NaN.
    for column in ('lower', 'mode', 'upper'):
        numbeo_df[column] = pd.to_numeric(
            numbeo_df[column].str.replace(',', '', regex=False), errors='coerce')

    return numbeo_df
def get_cost_of_living(place_name: str, simulations: int = 10000, percentile: int = 90) -> float:
    """
    Simulate the weekly cost of living for a place and return one percentile of it.

    For all cost categories, get the cost and multiply it by the number of
    units. The number of units is drawn uniformly between the category's
    'Lower Units pw' and 'Upper Units pw'. The cost is simulated using a
    triangular distribution if there is a positive lower bound, mode and
    upper bound; otherwise the mode alone is used. Categories that are not in
    the place's table, have no mode, or have bounds the triangular
    distribution rejects are skipped.

    The simulated totals are cached in ``simulated_cost_of_living_dict``, so
    later calls for the same place reuse them and ignore ``simulations``.

    :param place_name: key of the cleaned table in ``cost_of_living_tables_dict``.
    :type place_name: str
    :param simulations: number of simulations to run.
    :type simulations: int
    :param percentile: percentile to use.
    :type percentile: int
    :return: the requested percentile of the simulated cost of living.
    :rtype: float
    :raises KeyError: if no table has been cached for ``place_name``.
    """
    if place_name in simulated_cost_of_living_dict:
        return np.percentile(simulated_cost_of_living_dict[place_name], percentile)

    numbeo_table = cost_of_living_tables_dict[place_name]
    vals = np.zeros(simulations)

    # NOTE(review): the last row of the units table is left out, exactly as in
    # the original loop bound; confirm this is intentional (e.g. a summary row).
    for i in range(len(cost_of_living_units) - 1):
        try:
            item = cost_of_living_units.iloc[i]
            prices = numbeo_table.loc[item['Category']]
            lower, mode, upper = prices['lower'], prices['mode'], prices['upper']

            # A missing mode would turn every simulated total into NaN.
            if pd.isna(mode):
                continue

            units = np.random.uniform(item['Lower Units pw'], item['Upper Units pw'], simulations)
            if lower > 0 and upper > 0 and mode > 0:
                price = np.random.triangular(lower, mode, upper, simulations)
            else:
                price = mode

            # Keep vals one-dimensional: one running total per simulation.
            vals = vals + units * price
        except (KeyError, ValueError, TypeError):
            # KeyError: category not in the table. ValueError: bounds rejected by
            # np.random.triangular, or an ambiguous (duplicated) category row.
            continue

    simulated_cost_of_living_dict[place_name] = vals
    return np.percentile(vals, percentile)
def check_enough_data(numbeo_df: pd.DataFrame) -> float:
    """
    Checks that the number of data points we have is sufficient.

    Only categories of ``cost_of_living_units`` with 'Upper Units pw' > 0 are
    required. The result is the number of required categories that are present
    in the table's index and have a value, divided by the number of required
    categories.

    Missing values are counted on the numeric 'mode' column when the table has
    one, because ``clean_numbeo_table`` casts 'Edit' to strings and a missing
    price is then no longer null there. Tables without a 'mode' column fall
    back to 'Edit'.

    :param numbeo_df: pandas dataframe, or a list when no table was available.
    :type numbeo_df: pd.DataFrame
    :return: proportion of filled cells as a proportion of number of total categories
        (0.0 for a list or when no category is required).
    :rtype: float
    """
    # A list stands for "no table" (see main()).
    if isinstance(numbeo_df, list):
        return 0.0

    required = cost_of_living_units.loc[cost_of_living_units['Upper Units pw'] > 0, 'Category']
    if len(required) == 0:
        return 0.0

    present = numbeo_df.index.intersection(required)
    value_column = 'mode' if 'mode' in numbeo_df.columns else 'Edit'
    num_nulls = numbeo_df.loc[present, value_column].isna().sum()
    return float((len(present) - num_nulls) / len(required))
def get_numbeo_countries() -> dict:
    """
    This function returns the countries that have been scraped from Numbeo.

    The link texts inside the first ``related_links`` element of the Numbeo
    cost-of-living page are standardized using the pycountry library (best
    fuzzy match). Names that pycountry cannot match are left out.

    :return: mapping of the country name as shown on Numbeo to its pycountry name.
    :rtype: dict
    """
    soup = scrape_urls.scrape_page('https://www.numbeo.com/cost-of-living')
    table = soup.find_all(class_='related_links')

    std_countries = {}
    for link in table[0].find_all('a'):
        name = link.text
        try:
            std_countries[name] = pycountry.countries.search_fuzzy(name)[0].name
        except LookupError:
            # pycountry found no match for this name; skip it.
            continue
    return std_countries
def main():
    """
    Scrape the cost of living for every Numbeo country and save it as a CSV.

    For each country returned by ``get_numbeo_countries`` the country page is
    scraped, its table cleaned, and, when ``check_enough_data`` is above 0.9,
    the cost of living is simulated. Countries without enough data get an
    empty string. The result is written to 'Cost of Living by Country.csv' in
    ``data_dir``, next to the input CSVs.
    """
    # Create a url for each country based on the Numbeo country names.
    countries = get_numbeo_countries()
    urls = []
    for country in countries:
        country_str = '+'.join(country.title().split())
        urls.append(f'https://www.numbeo.com/cost-of-living/country_result.jsp?country={country_str}&displayCurrency=NZD')

    # Scrape the pages
    soups = [scrape_urls.scrape_page(url) for url in urls]

    # Extract the data tables
    tables = [scrape_urls.get_table(soup, 1, 0, -1) for soup in soups]

    # Clean and format the tables; a list from get_table means "no table".
    cleaned_tables = [[] if isinstance(table, list) else clean_numbeo_table(table)
                      for table in tables]

    # Get cost of living for every country if there is enough data.
    cost_of_living_dic = {}
    for country, table in zip(countries.values(), cleaned_tables):
        if check_enough_data(table) > 0.9:
            # get_cost_of_living looks the table up by place name, so register it first.
            cost_of_living_tables_dict[country] = table
            cost_of_living_dic[country] = get_cost_of_living(country)
        else:
            cost_of_living_dic[country] = ''

    # Format results table and save
    df = pd.DataFrame.from_dict(cost_of_living_dic, orient='index',
                                columns=['Cost of Living pw'])
    df.index.rename('Country', inplace=True)
    df.to_csv(os.path.join(data_dir, 'Cost of Living by Country.csv'))
def get_cost_of_living_table(place_name: str, country=True):
    """
    Get the cost of living table for a place.

    Tables are cached in ``cost_of_living_tables_dict`` under ``place_name``,
    so each place is only scraped once. When the scraper returns a list
    instead of a table (treated as "no table", as in ``main``), an empty list
    is returned and nothing is cached; ``check_enough_data`` reports 0 for it.

    :param place_name: name of the place.
    :type place_name: str
    :param country: whether the place is a country or city.
    :type country: bool
    :return: cleaned cost of living table, or an empty list if none was found.
    :rtype: pd.DataFrame
    """
    if place_name in cost_of_living_tables_dict:
        return cost_of_living_tables_dict[place_name]

    formatted_place_name = '+'.join(place_name.title().split())
    if country:
        url = f'https://www.numbeo.com/cost-of-living/country_result.jsp?country={formatted_place_name}&displayCurrency=NZD'
    else:
        url = f'https://www.numbeo.com/cost-of-living/in/{formatted_place_name}?displayCurrency=NZD'

    soup = scrape_urls.scrape_page(url)
    table = scrape_urls.get_table(soup, 1, 0, -1)
    if isinstance(table, list):
        return []

    cleaned_table = clean_numbeo_table(table)
    # Cache the table
    cost_of_living_tables_dict[place_name] = cleaned_table
    return cleaned_table
def get_city_cost_of_living(city: str, percentile: int = 90) -> float:
    """
    Get the cost of living for a city.

    Prints the result and returns it, like ``get_country_cost_of_living``.

    :param city: city to get the cost of living for.
    :type city: str
    :param percentile: percentile to use.
    :type percentile: int
    :return: cost of living for the city rounded to 2 decimals, or None when
        there is not enough data.
    :rtype: float
    """
    cleaned_table = get_cost_of_living_table(city.title(), country=False)
    if check_enough_data(cleaned_table) > 0.9:
        cost_of_living = round(get_cost_of_living(city.title(), percentile=percentile), 2)
        print(f'{percentile}th percentile weekly cost of living in {city.title()}: {cost_of_living}')
    else:
        print('Not enough data to estimate cost of living.')
        cost_of_living = None
    return cost_of_living
def get_country_cost_of_living(country: str, percentile: int = 90) -> float:
    """
    Get the cost of living for a country.

    The country name is title-cased before the table is fetched. The result
    is printed as well as returned.

    :param country: country to get the cost of living for.
    :type country: str
    :param percentile: percentile to use.
    :type percentile: int
    :return: cost of living for the country rounded to 2 decimals, or None
        when there is not enough data.
    :rtype: float
    """
    place = country.title()
    table = get_cost_of_living_table(place, country=True)

    has_enough_data = check_enough_data(table) > 0.9
    if not has_enough_data:
        print('Not enough data.')
        return None

    estimate = get_cost_of_living(place, percentile=percentile)
    cost_of_living = round(estimate, 2)
    print(f'{percentile}th percentile weekly cost of living in {country.title()}: {cost_of_living}')
    return cost_of_living
if __name__ == "__main__":
    # Print the 99th- and 50th-percentile weekly cost of living for Colombia.
    for pct in (99, 50):
        get_country_cost_of_living('colombia', pct)

    # Other places can be queried the same way, e.g.
    #   get_country_cost_of_living('New zealand', 50)
    #   get_city_cost_of_living('Queenstown', 10)
    # Countries tried before: New zealand, georgia, Uruguay, Indonesia, Thailand,
    # india, Australia, Peru, Argentina, Ecuador, Mexico, Spain, Portugal,
    # Slovakia, Pakistan, Costa Rica, Puerto Rico, Iceland.
    # Cities tried before: Queenstown, Tauranga, Houston, Austin, Bogota,
    # Medellin, Wellington, Christchurch, Auckland, Hamilton, Dunedin, Bali,
    # Yogyakarta.