Spark 2.X benefits over Spark 1.X

1. Improved Performance

a. Whole Stage Code Generation – Tungsten Engine

Apache Spark 2.0 ships with the second-generation Tungsten engine, which improves Spark execution by optimizing jobs for CPU and memory efficiency through a technique called "whole-stage code generation". With this technique, optimized bytecode is emitted at runtime that collapses the entire query into a single function, eliminating virtual function calls and using CPU registers for intermediate data.
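To make "collapsing the entire query into a single function" concrete, here is a language-agnostic sketch in Python (illustrative only, not Spark's actual generated code): the operator-at-a-time version materializes an intermediate collection per operator, while the fused version runs the whole pipeline in one loop with the running sum kept in a local variable.

```python
# Operator-at-a-time evaluation: each operator materializes its output.
data = range(1000)
projected = [x * 2 for x in data]            # project: id * 2
filtered = [x for x in projected if x > 10]  # filter: > 10
total = sum(filtered)                        # aggregate: sum

# Whole-stage style: the same query collapsed into a single loop;
# the running sum lives in a local variable (think: CPU register),
# with no intermediate collections or per-element dispatch.
total_fused = 0
for x in range(1000):
    y = x * 2
    if y > 10:
        total_fused += y

assert total == total_fused
```

The fused loop computes exactly the same answer while touching each element once, which is the intuition behind the benchmark numbers that follow.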

The examples below demonstrate the improved performance of Spark 2.0 over Spark 1.6. In both cases we run Spark 2.0; for the "Spark 1.6" measurements we turn off whole-stage code generation, which results in a code path similar to Spark 1.6.

i. Benchmark Setup

We define the following benchmark function to measure the time a function takes to execute.

def benchmark(name: String)(f: => Unit): Unit = {
  val startTime = System.nanoTime
  f
  val endTime = System.nanoTime
  println(s"Time taken in $name: " +
    (endTime - startTime).toDouble / 1000000000 + " seconds")
}

ii. Sum up 1 billion numbers using Spark 1.6

// This config turns off whole stage code generation, effectively changing the execution path to be similar to Spark 1.6.

spark.conf.set("spark.sql.codegen.wholeStage", false)

benchmark("Spark 1.6") {
  spark.range(1000L * 1000 * 1000).selectExpr("sum(id)").show()
}

Time taken in Spark 1.6: 15.565444737 seconds

iii. Sum up 1 billion numbers using Spark 2.0

spark.conf.set("spark.sql.codegen.wholeStage", true)

benchmark("Spark 2.0") {
  spark.range(1000L * 1000 * 1000).selectExpr("sum(id)").show()
}

Time taken in Spark 2.0: 0.675835036 seconds

iv. Join 1 billion records using Spark 1.6

spark.conf.set("spark.sql.codegen.wholeStage", false)

benchmark("Spark 1.6") {
  spark.range(1000L * 1000 * 1000).join(spark.range(1000L).toDF(), "id").count()
}

Time taken in Spark 1.6: 58.126652348 seconds

v. Join 1 billion records using Spark 2.0

spark.conf.set("spark.sql.codegen.wholeStage", true)

benchmark("Spark 2.0") {
  spark.range(1000L * 1000 * 1000).join(spark.range(1000L).toDF(), "id").count()
}

Time taken in Spark 2.0: 0.987448172 seconds

The table below provides a comparative view of the above results:

Primitive    Spark 1.6        Spark 2.0
Sum          15.6 seconds     0.68 seconds
Join         58.12 seconds    0.98 seconds

b. Faster Group-by aggregation

Group-by aggregates are sped up by roughly 3-5x by using an in-memory hash map that acts as a cache for extremely fast key-value lookups while evaluating aggregates.
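The aggregation strategy can be sketched in plain Python (a conceptual sketch, not Spark's actual implementation): a hash map keyed by the grouping column accumulates each group's partial sum with one lookup per row.

```python
def hash_group_sum(rows):
    # One hash-map lookup per row; each bucket holds the running sum.
    sums = {}
    for k, v in rows:
        sums[k] = sums.get(k, 0) + v
    return sums

# Mirrors the benchmark's "(id & 65535) as k" trick, on a tiny scale.
rows = [(i & 3, i) for i in range(100)]
totals = hash_group_sum(rows)
```

Each incoming row costs one hash lookup and one addition, which is why the hash-map path is so much faster than sort-based aggregation for many distinct keys.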

Using the same benchmark function as in the examples above:

Using Spark 1.6
benchmark("Spark 1.6 Groupby") {
  sqlContext.range(1000L * 1000 * 100).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
}

Time taken in Spark 1.6 Groupby: 9.419302176 seconds

Using Spark 2.0

benchmark("Spark 2.0 Groupby") {
  spark.range(1000L * 1000 * 100).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
}

Time taken in Spark 2.0 Groupby: 2.265611999 seconds

c. Catalog based partition handling

In previous versions of Spark, the first read of a table can be slow if the number of partitions is very large, since Spark must first discover which partitions exist. The initial query is blocked until Spark loads all of the table's partition metadata; for a table with many partitions, recursively scanning the filesystem to discover file metadata can take many minutes.

As of Spark 2.1, table partition metadata is stored in the system catalog, and partitions not required by a query are pruned there. This avoids having to locate files for partitions the query never touches.
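The difference can be sketched with a toy example (illustrative only, with made-up partition names): with a catalog, a query consults partition metadata and touches only the partitions it needs, instead of listing every directory up front.

```python
# Hypothetical table layout: one entry per date partition, as a catalog
# would store it, mapping partition spec to its data files.
catalog = {
    "date=2016-01-01": ["part-00000.parquet", "part-00001.parquet"],
    "date=2016-01-02": ["part-00000.parquet"],
    "date=2016-01-03": ["part-00000.parquet", "part-00001.parquet"],
}

def prune(catalog, predicate):
    # Catalog-based pruning: filter on metadata, no filesystem scan needed.
    return {p: files for p, files in catalog.items() if predicate(p)}

# A query filtering on one date only ever sees that partition's files.
pruned = prune(catalog, lambda p: p == "date=2016-01-02")
```

Only the files under the surviving partitions are then listed and read.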

2. SQL support

a. Native DDL command implementation.

In Spark 1.6, most DDL commands are delegated directly to Hive, which can cause missing functionality, failures with unhelpful error messages, or inconsistent behaviour. In Spark 2.0, DDL commands are implemented natively, with no dependency on Hive.

b. Support all TPC-DS queries

Spark 1.6 supports only 55 of the 99 TPC-DS queries; as of Spark 2.0, all 99 TPC-DS queries can be run. Below are some of the key changes that came out of this effort.

• Support for CUBE/ROLLUP/GROUPING functions

Example:

val sales = Seq(
("Warsaw", 2016, 100),
("Warsaw", 2017, 200),
("Boston", 2015, 50),
("Boston", 2016, 150),
("Toronto", 2017, 50)
).toDF("city", "year", "amount")

sales.registerTempTable("sales")

Spark 1.6

sqlContext.sql("select city, SUM(amount) as Total from sales group by rollup(city)").show
org.apache.spark.sql.AnalysisException: undefined function rollup; line 1 pos 61

Spark 2.0

spark.sql("select city, SUM(amount) as Total from sales group by rollup(city)").show
+-------+-----+
| city|Total|
+-------+-----+
| null| 550|
|Toronto| 50|
| Boston| 200|
| Warsaw| 300|
+-------+-----+

• Subquery support

In earlier versions of Spark, subqueries were only supported in the FROM clause of Spark SQL.

So a statement such as the one below would fail:

"select customerid from sparkbug where customerid in (select customerid from sparkbug where customerid in (2,3))"

As of Spark 2.0, subqueries are supported in SELECT/WHERE/HAVING clauses, and predicate subqueries (EXISTS, IN) are supported in the WHERE clause.
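The nested-IN statement above can be tried against any SQL engine; here is a quick check using Python's built-in sqlite3 standing in for Spark SQL, with a made-up sparkbug table:

```python
import sqlite3

# In-memory table with a few customer ids.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sparkbug (customerid INTEGER)")
conn.executemany("INSERT INTO sparkbug VALUES (?)", [(1,), (2,), (3,)])

# The nested IN subquery that Spark 1.6 rejected.
rows = conn.execute(
    "select customerid from sparkbug where customerid in "
    "(select customerid from sparkbug where customerid in (2,3))"
).fetchall()
```

The inner subquery returns 2 and 3, and the outer IN filters the table down to those two rows.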

• Support for window functions in SQLContext

As of Spark 2.0, three types of window functions are supported: ranking functions, analytic functions and aggregate functions. The following ranking and analytic functions are available:

Ranking functions (SQL):  rank, dense_rank, percent_rank, ntile, row_number
Analytic functions (SQL): cume_dist, first_value, last_value, lag, lead
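To make the difference between the ranking functions concrete, here is a small pure-Python sketch of row_number, rank and dense_rank over a pre-sorted column (illustrative, not Spark's implementation):

```python
def ranking(values):
    # values must already be sorted by the ORDER BY column.
    # Returns (value, row_number, rank, dense_rank) for each row.
    out, rank, dense = [], 0, 0
    prev = object()  # sentinel that never equals a real value
    for i, v in enumerate(values, start=1):
        if v != prev:
            rank, dense, prev = i, dense + 1, v
        out.append((v, i, rank, dense))
    return out

ranks = ranking([10, 10, 20, 30, 30, 30])
# Ties share a rank; rank leaves gaps after ties, dense_rank does not,
# and row_number is always 1..n.
```

For example the two 10s both get rank 1, the 20 gets rank 3 (a gap) but dense_rank 2.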

3. New Features

a. To remove the confusion over whether to use SQLContext or HiveContext, SparkSession is introduced as the single entry point for the DataFrame and Dataset APIs.

i. SQLContext and HiveContext are deprecated.

ii. In the REPL (spark-shell), a SparkSession is available as spark.

b. To combine RDD-style operations and type-safety with DataFrame-style query optimization, the DataFrame and Dataset APIs are unified into Dataset for Java and Scala.

c. The CSV data source is now built in, so there is no need to include the Apache Spark CSV module JAR when submitting Spark applications. Instead, use csv as the data source format when reading or writing CSV data:

val df = spark.read.option("header", true).csv(path)

PySpark – aggregateByKey

I have found Spark's aggregateByKey function somewhat difficult to understand at first go. Here are some tips and tricks I employed to understand it better.

Let’s define an rdd first.

>> data = sc.parallelize([("a",1),("b",2),("a",2),("b",3)])

Let’s check the number of partitions for “data” rdd.

>> data.getNumPartitions()
4

So here, each tuple in the "data" rdd is in a separate partition.

>> data.glom().collect()
[[('a', 1)], [('b', 2)], [('a', 2)], [('b', 3)]]

Our problem statement is to find the sum of values for each key, but note that the "zeroValue" for the accumulator is 1 (not 0).

>> data.aggregateByKey(1, lambda acc,val: acc+val, lambda acc1,acc2: acc1+acc2).collect()
[('a', 5), ('b', 7)]

So what’s happening here?

This transformation has 3 parameters here:

  1. First one is the “zeroValue” which is the initial value of the accumulator, acc.
  2. Second parameter is a  sequencing function. Let’s term it as function-1.
  3. Third parameter is a combiner function. Let’s term it as function-2.

The "zeroValue" of the aggregateByKey transformation is used to initialize the accumulator for each key on each partition.

When tuple (“a”,1) is passed to the aggregateByKey transformation, function-1 is applied as below:

lambda acc,val: acc + val

lambda 1,1: 1 + 1 => 2

So the value of accumulator for partition-1 is now 2.

Now when the second tuple with key "a", which is ("a",2), is passed to the transformation, the value 2 is not added directly to the previously calculated accumulator of 2.

Since tuple (“a”,2) is in a separate partition, the accumulator is set to the initial “zeroValue” which is 1.

Here function-1 plays as follows:

lambda acc,val: acc + val

lambda 1,2: 1 + 2 => 3

Now these per-partition values are passed to the second function, the combiner, which adds them to give the final result (2+3)=5 for key "a".

The figure below may help in better understanding:

[Figure: aggregateByKey across four partitions]

Now, let's take another scenario where we have an rdd with only a single partition instead of 4.

>> data = sc.parallelize([("a",1),("b",2),("a",2),("b",3)], 1)

Let’s get the  number of partitions for rdd “data”.

>> data.getNumPartitions()
1

Let’s have a look at the rdd partitioning.

>> data.glom().collect()
[[('a', 1), ('b', 2), ('a', 2), ('b', 3)]]

Here we can see that all the 4 tuples are in a single partition.

Let's run the same aggregateByKey transformation again.

>> data.aggregateByKey(1, lambda acc,val: acc+val, lambda acc1,acc2: acc1+acc2).collect()
[('a', 4), ('b', 6)]

The result is different now. By changing only the number of partitions, we are getting a different result.

In this case, since there is a single partition, the accumulator is not re-initialized to the "zeroValue" when the 2nd tuple of key "a" is encountered.

For the first tuple of key “a” i.e. (“a”,1), function-1 is applied as :

lambda acc,val: acc + val

lambda 1,1: 1 + 1 => 2

For the 2nd tuple of key “a” i.e.(“a”,2), function-1 is applied as:

lambda acc,val: acc + val

lambda 2,2: 2 + 2 => 4

Since there is only one partition, there is only one accumulator per key, so the combiner function (function-2) has nothing to merge across partitions.

The illustration below may help in better understanding:

[Figure: aggregateByKey with a single partition]
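Both results can be reproduced with a small pure-Python model of aggregateByKey's semantics (a sketch of the contract, not PySpark's implementation): the zeroValue seeds one accumulator per key per partition, the sequence function folds values within a partition, and the combiner merges accumulators across partitions.

```python
def aggregate_by_key(partitions, zero, seq_func, comb_func):
    # Phase 1: within each partition, fold values into per-key
    # accumulators seeded with the zeroValue.
    per_partition = []
    for part in partitions:
        accs = {}
        for k, v in part:
            accs[k] = seq_func(accs.get(k, zero), v)
        per_partition.append(accs)
    # Phase 2: merge the per-partition accumulators with the combiner.
    merged = {}
    for accs in per_partition:
        for k, acc in accs.items():
            merged[k] = comb_func(merged[k], acc) if k in merged else acc
    return sorted(merged.items())

four_parts = [[("a", 1)], [("b", 2)], [("a", 2)], [("b", 3)]]
one_part = [[("a", 1), ("b", 2), ("a", 2), ("b", 3)]]

four_result = aggregate_by_key(four_parts, 1, lambda acc, v: acc + v, lambda a, b: a + b)
one_result = aggregate_by_key(one_part, 1, lambda acc, v: acc + v, lambda a, b: a + b)
# four_result: [('a', 5), ('b', 7)]; one_result: [('a', 4), ('b', 6)]
```

With four partitions the zeroValue of 1 is added once per partition per key (hence 5 and 7); with one partition it is added only once per key (hence 4 and 6).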

 

Web Scraping – 2

Continuing our web scraping adventure, here are a few more tricks I explored over the last weekend.
Since scraping spiders can degrade a website's performance considerably (if they hit the site frequently), some websites implement techniques to ward off spiders crawling them.

Websites have a 'robots.txt' file, which can be found at the site root, such as http://www.example.com/robots.txt. Even WordPress blogs have this file; for this blog, 'robots.txt' is at https://devopsrecipe.wordpress.com/robots.txt. This file defines which user-agents are blocked from crawling which pages of the website. Visit http://www.robotstxt.org/robotstxt.html for more information.

'robots.txt' only instructs web scrapers what to scrape and what not to; it does not actually block scrapers. So it is down to your good judgement to exclude sites, or pages of sites, from your scraper.
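Python's standard library can parse robots.txt for you, so a scraper can honour it programmatically. A minimal example with urllib.robotparser (the rules below are made up for illustration):

```python
from urllib import robotparser

rp = robotparser.RobotFileParser()
# Normally you would do: rp.set_url("http://www.example.com/robots.txt"); rp.read()
# Here we parse an inline, made-up rule set instead.
rp.parse([
    "User-agent: *",
    "Disallow: /private/",
])

allowed = rp.can_fetch("mybot", "http://www.example.com/events/?page=1")
blocked = rp.can_fetch("mybot", "http://www.example.com/private/page")
```

Checking can_fetch() before each request keeps the spider on the polite side of the rules.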

Secondly, if a website identifies a scraper hitting it, it can redirect the scraper to a different link. To avoid this, you can disable redirection by adding the line below to settings.py in your scrapy project:


REDIRECT_ENABLED=False

Web Scraping using Scrapy

So recently I have been playing with web scraping using the Python Scrapy module. It is fun to see data from multiple pages of a website land in a CSV file with a single keystroke.

This tutorial assumes that you have installed scrapy.

Now let's get back straight to the topic. Scrapy provides various built-in commands, one of which creates the required files and folder structure. That scrapy command is 'startproject', used as follows:

scrapy startproject <project_name>

It creates the files and folder structure below:

<project_name>/
    scrapy.cfg
    <project_name>/
        __init__.py
        items.py
        pipelines.py
        settings.py
        spiders/
            __init__.py

Now the first file of importance is items.py, where we declare items. Items are the attributes of the website data we want to capture.

For example, in the code below, from an event-ticketing website we want to pick up the date of the event, the name of the event, the event's organizer and the venue.


import scrapy
from scrapy.loader.processors import Join, MapCompose
from w3lib.html import replace_escape_chars

class ExampleItem(scrapy.Item):
    # define the fields for your item here
    Date = scrapy.Field(
        input_processor=MapCompose(lambda v: v.split(), replace_escape_chars, unicode.strip),
        output_processor=Join(),
    )
    Name = scrapy.Field(
        input_processor=MapCompose(lambda v: v.split(), replace_escape_chars, unicode.strip),
        output_processor=Join(),
    )
    Organizer = scrapy.Field(
        input_processor=MapCompose(lambda v: v.split(), replace_escape_chars, unicode.strip),
        output_processor=Join(),
    )
    Location = scrapy.Field(
        input_processor=MapCompose(lambda v: v.split(), replace_escape_chars, unicode.strip),
        output_processor=Join(),
    )

 

Here we are using input and output processors to manipulate the strings we scrape from the website; more information about processors is in the Scrapy documentation. An input processor takes the raw strings returned by the scraper and transforms them with the functions inside MapCompose.
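The behaviour of these two processors can be sketched in plain Python (a conceptual model based on their documented behaviour, not scrapy's actual code): MapCompose applies each function in turn to every value, flattening list results and dropping None, and Join concatenates the final values into one string.

```python
def map_compose(*funcs):
    # Apply each function in turn to every value; flatten lists, drop None.
    def process(values):
        for f in funcs:
            nxt = []
            for v in values:
                r = f(v)
                if r is None:
                    continue
                nxt.extend(r if isinstance(r, (list, tuple)) else [r])
            values = nxt
        return values
    return process

def join(values, sep=" "):
    # Output processor: collapse the value list into one string.
    return sep.join(values)

# Made-up raw strings standing in for scraped text with whitespace noise.
proc = map_compose(lambda v: v.split(), str.strip)
result = join(proc(["  Jazz  Night \n", " 2016 "]))
# result == "Jazz Night 2016"
```

This is why the item fields above come out as clean, single-spaced strings.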

Now comes the main scraper, which we define inside the spiders directory. Let's create a file exampleCrawl.py:

touch spiders/exampleCrawl.py

The code in exampleCrawl.py is :

import scrapy
from aus_events.items import ExampleItem
from scrapy.loader import ItemLoader
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import Rule, CrawlSpider

class ExampleSpider(CrawlSpider):
    name = "sample"
    allowed_domains = ["example.com"]
    start_urls = [
        "https://www.example.com/events/?page=1"
    ]

    rules = (
        Rule(LinkExtractor(allow=("www\.example\.com\/events\/\?page=[0-9]",)),
             callback='parse_item', follow=True),
    )

    def parse_item(self, response):
        parent_url = response.xpath('//div[@class="l-block-2"]')

        for card in parent_url:
            l = ItemLoader(item=ExampleItem(), selector=card)
            l.add_xpath('Name', 'a/div[2]/h4/text()')
            l.add_xpath('Date', 'a/div[2]/time/text()')
            l.add_xpath('Organizer', 'a/div[2]/div[1]/text()')
            l.add_xpath('Location', 'a/div[2]/div[2]/text()')
            yield l.load_item()

On this particular website, we need to crawl through every page to gather the data; therefore we import CrawlSpider from scrapy.spiders.

The URL for the different pages of this website looks like:

http://www.example.com/events/?page=1

Here we need to extract the links for the different pages, so we use rules with a regular expression for the URLs we want to extract. For every page URL extracted, the parse_item callback is invoked, which extracts the name, date, organizer and location of each event on that page.
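The allow= pattern in the Rule is an ordinary regular expression, so it can be sanity-checked with Python's re module before running the crawl:

```python
import re

# Same pattern as in the Rule above.
pattern = re.compile(r"www\.example\.com\/events\/\?page=[0-9]")

page_url = "https://www.example.com/events/?page=2"
other_url = "https://www.example.com/about"

matches_page = bool(pattern.search(page_url))
matches_other = bool(pattern.search(other_url))
```

Only URLs matching the pattern are followed, so pagination links are crawled while unrelated pages are ignored.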

There are two ways to export the data in CSV format: from the command line while running the crawler, or in settings.py.

From the command line:

scrapy crawl sample -o data.csv

Or mention the lines below in settings.py:


FEED_FORMAT = 'csv'
FEED_URI = "data.csv"

PowerShell- Retrieve today’s Files

The function below recurses through the path provided as a parameter and lists the files whose LastWriteTime has today's date.

function Get-TodaysFiles {
    param(
        [string]$Path
    )
    $date = Get-Date
    Get-ChildItem -Path $Path -Recurse |
        Where-Object { !$_.PSIsContainer -and $_.LastWriteTime.Date -eq $date.Date } |
        Select-Object Name, LastWriteTime, Directory
}
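For comparison, here is an equivalent sketch in Python using pathlib; same idea: recurse through the path and keep regular files whose modification date is today.

```python
from datetime import date
from pathlib import Path

def todays_files(root):
    # Recurse through root; keep regular files whose mtime is today.
    return [
        p for p in Path(root).rglob("*")
        if p.is_file() and date.fromtimestamp(p.stat().st_mtime) == date.today()
    ]
```

Calling todays_files("/some/path") returns Path objects, from which the name, modification time and parent directory are all available.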

Puppet – Cisco Switch Management

Cisco network devices running IOS can be managed through Puppet Enterprise, but some features, such as 'switchport nonegotiate' and 'switchport voice vlan', are not included in the attributes of Puppet's 'interface' resource type. Moreover, our requirement is to fetch the current running configuration of the Cisco switch, compare it with the base configuration, and display the result on the Puppet Enterprise Console.

Our whole approach depends on below 3 events:

  • Fetching the base configuration of switch daily.
  • Fetching current configuration with every run of puppet agent.
  • Comparing the above 2 configurations.

We are using a Cisco Catalyst 3650 switch in this endeavour, with the puppet master (Puppet Enterprise 3.8) installed on CentOS 7. The steps below are explained for a single interface of the switch, but can easily be scaled to multiple interfaces.

Fetching Switch Configuration Daily

The script below is written in Python using the PExpect module. It fetches the current running configuration of the switch for a Gigabit Ethernet interface and converts it to a yaml file. This script can be run as a daily cron job.

from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals

import sys
import pexpect
import string


switch_ip="<IPADDR>"
switch_un="user"
switch_pw="s3cr3t"

child=pexpect.spawnu('ssh %s@%s' % (switch_un, switch_ip))
child.logfile=sys.stdout
child.timeout=4
child.expect('Password:')
child.sendline(switch_pw)
child.expect('>')
child.sendline('en')
child.expect('Password:')
child.sendline('enable_pwd')
child.expect('#')

child.sendline('show running-config interface GigabitEthernet 1/0/1')

child.expect('#')
data=child.before
child.sendline('exit')
child.expect(pexpect.EOF)

list1=data.splitlines()

desc=None
avlan=None
mode1=None
nego=None
vvlan=None
duplex=None
bps=None
action=None
sptree=None

for member in list1:
	if "description" in member:
		desc=member.split('"')
		desc=desc[1]
		desc=desc.lstrip()

	if "switchport access" in member:
		avlan=member.split("vlan")
		avlan=avlan[1]
		avlan=avlan.lstrip()
	
	if "switchport mode" in member:
		mode1=member.split("mode")
		mode1=mode1[1]
		mode1=mode1.lstrip()
		
	if "switchport nonegotiate" in member:
		nego=member.split("switchport")
		nego=nego[1]
		nego=nego.lstrip()
	
	if "switchport voice" in member:
		vvlan=member.split("vlan")
		vvlan=vvlan[1]
		vvlan=vvlan.lstrip()

	if "duplex" in member:
		duplex=member.split("duplex")
		duplex=duplex[1]
		duplex=duplex.lstrip()
	
	if "storm-control" in member:
		if "broadcast level bps" in member:
			bps=member.split("bps")
			bps=bps[1]
			bps=bps.lstrip()
		elif "action" in member:
			action=member.split("action")
			action=action[1]
			action=action.lstrip()
			
	
	if "spanning-tree" in member:
		sptree=member.split("spanning-tree")
		sptree=sptree[1]
		sptree=sptree.lstrip()


with open('/etc/puppetlabs/facter/facts.d/bckupconfig121.yaml','w') as f:
	f.write('---\n')
	f.write('IfGB101Desc: %s\n' % desc)
	f.write('IfGB101AVlan: %s\n' % avlan)
	f.write('IfGB101Mode: %s\n' % mode1)
	f.write('IfGB101Nego: %s\n' % nego)
	f.write('IfGB101VVlan: %s\n' % vvlan)
	f.write('IfGB101Duplex: %s\n' % duplex)
	f.write('IfGB101Strmbps: %s \n' % bps)
	f.write('IfGB101Strmact: %s\n' % action)
	f.write('IfGB101SPtree: %s\n' % sptree)
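The split-and-lstrip pattern used above can be sanity-checked against a small, made-up running-config snippet:

```python
# Hypothetical 'show running-config interface' output for testing the parsing.
sample = '''interface GigabitEthernet1/0/1
 description "Uplink to core"
 switchport access vlan 10
 switchport mode access'''

desc = avlan = mode1 = None
for member in sample.splitlines():
    if "description" in member:
        # Take the quoted text of the description line.
        desc = member.split('"')[1].lstrip()
    if "switchport access" in member:
        # Everything after the 'vlan' keyword is the vlan id.
        avlan = member.split("vlan")[1].lstrip()
    if "switchport mode" in member:
        # Everything after the 'mode' keyword is the port mode.
        mode1 = member.split("mode")[1].lstrip()
# desc == 'Uplink to core', avlan == '10', mode1 == 'access'
```

Note that the parsing keys on substrings, so it relies on IOS emitting one setting per line in a stable format.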

 

This yaml file is stored in the Hiera repository so that the configuration can be read in Puppet manifests.

Below is a sample yaml file stored in the Hiera repository.

[Screenshot: sample yaml file in the Hiera repository]

Below is a snapshot of the hiera.yaml configuration file.

[Screenshot: hiera.yaml configuration file]

Yaml files are named in the below format:

dateyyyy-mm-dd.yaml

Fetching Current Configuration with Every Run of Puppet Agent

Once the daily configuration script has run and the data is stored in a yaml file, the next step is to fetch the switch running configuration with every run of the Puppet agent. The script used is the same as before; the only differences are the name of the yaml file and the location where it is stored. Also, this script is called from the Puppet manifest using the 'exec' resource.

I have named this file 'bckupconfig121.yaml' and stored it in '/etc/puppetlabs/facter/facts.d/', the location where external facts are stored. Thus, the data in 'bckupconfig121.yaml' can be queried from the command line as external facts.

Below is a snapshot of ‘bckupconfig121.yaml’:

[Screenshot: bckupconfig121.yaml]

Data stored in the above yaml file can be accessed using the facter tool.

[Screenshot: facter output]

Comparison of the yaml files

To compare the yaml files generated in the above 2 steps, the following code is used in the Puppet manifest:


class cisconw {

  # Fetch configuration from Cisco switch with every run of Puppet Agent
  exec { "/usr/bin/python /home/pratyush/Cisco/ShowInterface-Agent.py": }

  # Compare the values
  if hiera("IfGB101Desc") != $::ifgb101desc {
    fail("Description of Gigabit Ethernet Interface has changed to $::ifgb101desc")
  }
}

If the values do not match, the Puppet agent run fails on the server and the node shows up under 'Failed' nodes in the Puppet Enterprise Console.

[Screenshot: failed node in the Puppet Enterprise Console]

This may not be the best or the only way, so let me know your thoughts on this approach in the comments section below.