We will continue our open data analytics workflow starting with the AWS Console then moving to using the notebook. Using AWS Glue we can automate creating a metadata catalog based on flat files stored on Amazon S3. Glue is a fully managed extract, transform, and load (ETL) service that makes it easy for customers to prepare and load their data for analytics. You can create and run an ETL job with a few clicks in the AWS Management Console. You simply point AWS Glue to your data stored on AWS, and AWS Glue discovers your data and stores the associated metadata (e.g. table definition and schema) in the AWS Glue Data Catalog. Once cataloged, your data is immediately searchable, queryable, and available for ETL.
We have sourced the open dataset from the Registry of Open Data on AWS. We also stored the data on S3. Now we are ready to extract, transform, and load the data for analytics. We will use AWS Glue service to do this. First step is to create a logical database entry in the data catalog. Note that we are not creating a physical database which requires resources. This is just a metadata placeholder for the flat file we copied into S3.
When creating the data catalog name try choosing a name without hyphens and few characters long. This will make SQL queries more readable and also avoid certain errors when running these queries.
We can also setup the notebook for accessing AWS Glue service using the Boto3 Python SDK. The pandas and IPython dependencies are imported for output formatting purposes only. We also import numpy a popular statistical analysis library. Charts and visualizations will be supported by seaborn and matplotlib libraries. To access the Glue service API we create a Glue client.
import boto3
import pandas as pd
import numpy as np
from IPython.display import display, Markdown
import seaborn as sns
%matplotlib inline
import matplotlib.pyplot as pltglue = boto3.client('glue')
s3 = boto3.client('s3')We will recreate the AWS Console GUI experience using SDK calls by creating the list_glue_databases function. We simply get the data catalogs in one statement and iterate over the results in the next one.
def list_glue_databases():
glue_database = glue.get_databases()
for db in glue_database['DatabaseList']:
print(db['Name'])list_glue_databases()default
odoc
sampledb
taxicatalog
Next, we create a logical table using Glue crawler. This is again just table metadata definition while the actual data is still stored only in the flat file on S3. For this notebook we will define and run the default Glue Crawler to extract and load the metadata schema from our flat file. This requires selection of a data store which is S3 in this case, defining an IAM role for access from Glue to S3, selecting a schedule for the crawler to run repeatedly if required, and output destination of the crawler results.
Please ensure that the flat file is stored on S3 within its own folder and you point at the folder when picking the data source during crawler definition. If you point directly to a flat file when running the crawler, it may return zero results when querying using Amazon Athena.
Glue will pick up folder name for the logical table name. Keeping our data source files in a folder has the added advantage of incremntally updating the folder with updates to the data with more files or updating the original file. Glue will pick up these changes based on crawler run schedule.
This results in extraction of table metadata stored within our data catalog. The schema with data types is extracted and stored in Glue Data Catalog. Note that the default Glue Crawler understands well-formed CSV files with first row as comma-separated list of column names, and next set of rows representing ordered data records. The Glue Crawler automatically guesses data types based on the contents of the flat file.
Transforming big data in notebook environment is not viable. Instead we can use Amazon Athena for large data transforms and bring the results back into our notebook.
We will use following query to create a well formed table transformed from our original table. Note that we specify the output location so that Athena defined WorkGroup location is not used by default. We also specify the format as TEXTFILE otherwise default PARQUET format is used which may generate errors when sampling this data.
CREATE TABLE
IF NOT EXISTS "taxicatalog"."many_trips_well_formed"
WITH (
external_location = 's3://open-data-analytics-taxi-trips/many-trips-well-formed/',
format = 'TEXTFILE',
field_delimiter = ','
)
AS SELECT vendorid AS vendor,
passenger_count AS passengers,
trip_distance AS distance,
ratecodeid AS rate,
pulocationid AS pick_location,
dolocationid AS drop_location,
payment_type AS payment_type,
fare_amount AS fare,
extra AS extra_fare,
mta_tax AS tax,
tip_amount AS tip,
tolls_amount AS toll,
improvement_surcharge AS surcharge,
total_amount AS total_fare,
tpep_pickup_datetime AS pick_when,
tpep_dropoff_datetime AS drop_when
FROM "taxicatalog"."many_trips";In the spirit of AWS Open Data Analytics API we will recreate the AWS Console feature which lists the tables and displays the metadata within one single reusable function. We get the list of table metadata stored within our data catalog by passing the database parameter. Next we iterate over each table object and display the name, source data file, number of records (estimate), average record size, data size in MB, and the name of the crawler used to extract the table metadata. We also display the list of column names and data types extracted as schema from the flat file stored on S3.
def list_glue_tables(database, verbose=True):
glue_tables = glue.get_tables(DatabaseName=database)
for table in glue_tables['TableList']:
display(Markdown('**Table: ' + table['Name'] + '**'))
display(Markdown('Location: ' + table['StorageDescriptor']['Location']))
created = table['CreatedBy'].split('/')
display(Markdown('Created by: ' + created[-1]))
if verbose and created[-1] == 'AWS Crawler':
display(Markdown(f'Records: {int(table["Parameters"]["recordCount"]):,}'))
display(Markdown(f'Average Record Size: {table["Parameters"]["averageRecordSize"]} Bytes'))
display(Markdown(f'Dataset Size: {float(table["Parameters"]["sizeKey"])/1024/1024:3.0f} MB'))
display(Markdown(f'Crawler: {table["Parameters"]["UPDATED_BY_CRAWLER"]}'))
if verbose:
df_columns = pd.DataFrame.from_dict(table["StorageDescriptor"]["Columns"])
display(df_columns[['Name', 'Type']])
display(Markdown('---'))list_glue_tables('taxicatalog', verbose=False)Table: many_trips
Location: s3://open-data-analytics-taxi-trips/many-trips/
Created by: AWS-Crawler
Table: many_trips_well_formed
Location: s3://open-data-analytics-taxi-trips/many-trips-well-formed
Created by: manav
athena = boto3.client('athena')Our next action is to bring the data created within Athena into the notebook environment using a pandas DataFrame. This can be done using the athena_query function which calls the Amazon Athena API to execute a query and store the output within a bucket and folder. This output is then read by a DataFrame which is returned by the function.
def athena_query(query, bucket, folder):
output = 's3://' + bucket + '/' + folder + '/'
response = athena.start_query_execution(QueryString=query,
ResultConfiguration={'OutputLocation': output})
qid = response['QueryExecutionId']
response = athena.get_query_execution(QueryExecutionId=qid)
state = response['QueryExecution']['Status']['State']
while state == 'RUNNING':
response = athena.get_query_execution(QueryExecutionId=qid)
state = response['QueryExecution']['Status']['State']
key = folder + '/' + qid + '.csv'
data_source = {'Bucket': bucket, 'Key': key}
url = s3.generate_presigned_url(ClientMethod = 'get_object', Params = data_source)
data = pd.read_csv(url)
return dataTo explore the data within Athena we will query returning thousand random samples.
bucket = 'open-data-analytics-taxi-trips'
folder = 'queries'
query = 'SELECT * FROM "taxicatalog"."many_trips_well_formed" TABLESAMPLE BERNOULLI(100) LIMIT 1000;'
df = athena_query(query, bucket, folder)
df.head()| vendor | passengers | distance | rate | pick_location | drop_location | payment_type | fare | extra_fare | tax | tip | toll | surcharge | total_fare | pick_when | drop_when | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2 | 1 | 1.25 | 1 | 237 | 236 | 1 | 9.0 | 0.0 | 0.5 | 0.00 | 0.0 | 0.3 | 9.80 | 2018-06-06 10:43:34 | 2018-06-06 10:54:58 |
| 1 | 1 | 1 | 1.20 | 1 | 158 | 90 | 2 | 7.5 | 0.0 | 0.5 | 0.00 | 0.0 | 0.3 | 8.30 | 2018-06-06 10:06:22 | 2018-06-06 10:15:21 |
| 2 | 1 | 1 | 3.30 | 1 | 234 | 236 | 1 | 17.0 | 0.0 | 0.5 | 3.55 | 0.0 | 0.3 | 21.35 | 2018-06-06 10:17:20 | 2018-06-06 10:43:07 |
| 3 | 1 | 1 | 0.90 | 1 | 236 | 140 | 1 | 7.0 | 0.0 | 0.5 | 1.55 | 0.0 | 0.3 | 9.35 | 2018-06-06 10:48:28 | 2018-06-06 10:57:08 |
| 4 | 1 | 1 | 1.00 | 1 | 141 | 162 | 1 | 7.0 | 0.0 | 0.5 | 1.95 | 0.0 | 0.3 | 9.75 | 2018-06-06 10:59:28 | 2018-06-06 11:08:05 |
Next we will determine statistical correlation between various features (columns) within the given set of samples (records).
corr = df.corr(method ='spearman')
corr| vendor | passengers | distance | rate | pick_location | drop_location | payment_type | fare | extra_fare | tax | tip | toll | surcharge | total_fare | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| vendor | 1.000000 | 0.283619 | 0.015401 | 0.055897 | -0.024097 | -0.005115 | 0.013634 | -0.014083 | NaN | -0.009308 | -0.024436 | 0.025840 | NaN | -0.015078 |
| passengers | 0.283619 | 1.000000 | 0.033053 | 0.051624 | -0.021166 | 0.003783 | 0.020200 | 0.035106 | NaN | -0.022736 | 0.008936 | 0.003765 | NaN | 0.033289 |
| distance | 0.015401 | 0.033053 | 1.000000 | 0.119010 | -0.119491 | -0.148011 | -0.068732 | 0.917127 | NaN | -0.080828 | 0.389773 | 0.401863 | NaN | 0.903529 |
| rate | 0.055897 | 0.051624 | 0.119010 | 1.000000 | -0.042557 | -0.053956 | -0.007774 | 0.185992 | NaN | -0.501256 | 0.083778 | 0.246460 | NaN | 0.184445 |
| pick_location | -0.024097 | -0.021166 | -0.119491 | -0.042557 | 1.000000 | 0.150656 | -0.009998 | -0.129692 | NaN | 0.010869 | -0.028087 | -0.153488 | NaN | -0.127936 |
| drop_location | -0.005115 | 0.003783 | -0.148011 | -0.053956 | 0.150656 | 1.000000 | 0.003079 | -0.162211 | NaN | 0.090225 | -0.042135 | -0.087721 | NaN | -0.154017 |
| payment_type | 0.013634 | 0.020200 | -0.068732 | -0.007774 | -0.009998 | 0.003079 | 1.000000 | -0.073051 | NaN | -0.015087 | -0.776507 | -0.068458 | NaN | -0.212893 |
| fare | -0.014083 | 0.035106 | 0.917127 | 0.185992 | -0.129692 | -0.162211 | -0.073051 | 1.000000 | NaN | -0.091508 | 0.425216 | 0.395950 | NaN | 0.983444 |
| extra_fare | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| tax | -0.009308 | -0.022736 | -0.080828 | -0.501256 | 0.010869 | 0.090225 | -0.015087 | -0.091508 | NaN | 1.000000 | 0.012988 | -0.148891 | NaN | -0.089695 |
| tip | -0.024436 | 0.008936 | 0.389773 | 0.083778 | -0.028087 | -0.042135 | -0.776507 | 0.425216 | NaN | 0.012988 | 1.000000 | 0.267483 | NaN | 0.555170 |
| toll | 0.025840 | 0.003765 | 0.401863 | 0.246460 | -0.153488 | -0.087721 | -0.068458 | 0.395950 | NaN | -0.148891 | 0.267483 | 1.000000 | NaN | 0.403146 |
| surcharge | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| total_fare | -0.015078 | 0.033289 | 0.903529 | 0.184445 | -0.127936 | -0.154017 | -0.212893 | 0.983444 | NaN | -0.089695 | 0.555170 | 0.403146 | NaN | 1.000000 |
We can drop features which show NaN correlation.
df = df.drop(columns=['surcharge'])corr = df.corr(method ='spearman')Completing the data science workflow from sourcing big data, wrangling it using Amazon Athena to well formed schema, bringing adequate sample data from Athena to notebook environment, conducting exploratory data analysis, and finally visualizing the results.
def heatmap(corr):
sns.set(style="white")
# Generate a mask for the upper triangle
mask = np.zeros_like(corr, dtype=np.bool)
mask[np.triu_indices_from(mask)] = True
# Set up the matplotlib figure
f, ax = plt.subplots(figsize=(11, 9))
# Generate a custom diverging colormap
cmap = sns.diverging_palette(220, 10, as_cmap=True)
# Draw the heatmap with the mask and correct aspect ratio
sns.heatmap(corr, mask=mask, cmap=cmap, vmax=.3, center=0, annot=True, fmt="3.2f",
square=True, linewidths=.5, cbar_kws={"shrink": .5})heatmap(corr)



