Skip to content

Commit fc77edd

Browse files
JWileczek authored and rfecher committed
Implement custom geometry udt(s) for Spark. (#1375)
1 parent 2b0c3b9 commit fc77edd

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

48 files changed

+636
-376
lines changed

analytics/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,12 @@
1515
<module>spark</module>
1616
<module>mapreduce</module>
1717
</modules>
18+
<profiles>
19+
<profile>
20+
<id>pyspark</id>
21+
<modules>
22+
<module>pyspark</module>
23+
</modules>
24+
</profile>
25+
</profiles>
1826
</project>

analytics/pyspark/pom.xml

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>geowave-analytic-parent</artifactId>
7+
<groupId>mil.nga.giat</groupId>
8+
<version>0.9.8-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<name>GeoWave pyspark</name>
13+
<artifactId>geowave-analytic-pyspark</artifactId>
14+
<packaging>pom</packaging>
15+
<build>
16+
<plugins>
17+
<plugin>
18+
<groupId>org.apache.maven.plugins</groupId>
19+
<artifactId>maven-resources-plugin</artifactId>
20+
<executions>
21+
<execution>
22+
<id>copy-resources</id>
23+
<phase>process-resources</phase>
24+
<goals>
25+
<goal>copy-resources</goal>
26+
</goals>
27+
<configuration>
28+
<outputDirectory>${project.build.directory}/python</outputDirectory>
29+
<resources>
30+
<resource>
31+
<directory>src/main/python</directory>
32+
<includes><include>**</include></includes>
33+
<excludes><exclude>**/*.pyc</exclude></excludes>
34+
<filtering>true</filtering>
35+
</resource>
36+
</resources>
37+
</configuration>
38+
</execution>
39+
</executions>
40+
</plugin>
41+
42+
<plugin>
43+
<groupId>org.codehaus.mojo</groupId>
44+
<artifactId>exec-maven-plugin</artifactId>
45+
<configuration>
46+
<executable>python</executable>
47+
</configuration>
48+
<executions>
49+
<execution>
50+
<id>setuptools package</id>
51+
<phase>package</phase>
52+
<goals>
53+
<goal>exec</goal>
54+
</goals>
55+
<configuration>
56+
<workingDirectory>${project.build.directory}/python</workingDirectory>
57+
<arguments>
58+
<argument>setup.py</argument>
59+
<argument>sdist</argument>
60+
<argument>--dist-dir=${project.build.directory}</argument>
61+
</arguments>
62+
</configuration>
63+
</execution>
64+
</executions>
65+
</plugin>
66+
<plugin>
67+
<groupId>org.codehaus.mojo</groupId>
68+
<artifactId>build-helper-maven-plugin</artifactId>
69+
<executions>
70+
<execution>
71+
<id>attach-artifacts</id>
72+
<phase>package</phase>
73+
<goals>
74+
<goal>attach-artifact</goal>
75+
</goals>
76+
<configuration>
77+
<artifacts>
78+
<artifact>
79+
<file>${project.build.directory}/geowave_pyspark-${project.version}.tar.gz</file>
80+
<type>tar.gz</type>
81+
</artifact>
82+
</artifacts>
83+
</configuration>
84+
</execution>
85+
</executions>
86+
</plugin>
87+
</plugins>
88+
</build>
89+
</project>
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
import types
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
from shapely import wkb
2+
from shapely.geometry import LineString, MultiLineString, MultiPoint, MultiPolygon, Point, Polygon
3+
from shapely.geometry.base import BaseGeometry
4+
from pyspark.sql.types import UserDefinedType, StructField, BinaryType, StructType
5+
6+
class AbstractGeometryUDT(UserDefinedType):
7+
@classmethod
8+
def sqlType(cls):
9+
return StructType([StructField("wkb", BinaryType(), True)])
10+
11+
@classmethod
12+
def module(cls):
13+
return 'geowave_pyspark.types'
14+
15+
@classmethod
16+
def scalaUDT(cls):
17+
return 'mil.nga.giat.geowave.analytic.spark.sparksql.udt.' + cls.__name__
18+
19+
def serialize(self, obj):
20+
return _serialize_to_wkb(obj)
21+
22+
def deserialize(self, datum):
23+
return _deserialize_from_wkb(datum[0])
24+
25+
class PointUDT(AbstractGeometryUDT):
26+
pass
27+
28+
29+
class LineStringUDT(AbstractGeometryUDT):
30+
pass
31+
32+
33+
class PolygonUDT(AbstractGeometryUDT):
34+
pass
35+
36+
37+
class MultiPointUDT(AbstractGeometryUDT):
38+
pass
39+
40+
41+
class MultiLineStringUDT(AbstractGeometryUDT):
42+
pass
43+
44+
45+
class MultiPolygonUDT(AbstractGeometryUDT):
46+
pass
47+
48+
49+
class GeometryUDT(AbstractGeometryUDT):
50+
pass
51+
52+
53+
def _serialize_to_wkb(data):
54+
if isinstance(data, BaseGeometry):
55+
return bytearray(data.wkb)
56+
return None
57+
58+
59+
def _deserialize_from_wkb(data):
60+
if data is None:
61+
return None
62+
return wkb.loads(bytes(data))
63+
64+
_deserialize_from_wkb.__safe_for_unpickling__ = True
65+
66+
# Spark expects a private link to the UDT representation of the class
67+
Point.__UDT__ = PointUDT()
68+
MultiPoint.__UDT__ = MultiPointUDT()
69+
LineString.__UDT__ = LineStringUDT()
70+
MultiLineString.__UDT__ = MultiLineStringUDT()
71+
Polygon.__UDT__ = PolygonUDT()
72+
MultiPolygon.__UDT__ = MultiPolygonUDT()
73+
BaseGeometry.__UDT__ = GeometryUDT()
74+
75+
# make Geometry dumps a little cleaner
76+
BaseGeometry.__repr__ = BaseGeometry.__str__
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from setuptools import setup, find_packages
2+
3+
setup(
4+
name='geowave_pyspark',
5+
version='${project.version}',
6+
url='https://locationtech.github.io/geowave/',
7+
packages=find_packages(),
8+
install_requires=['pytz', 'shapely', 'pyspark>=2.1.1,<2.3.1']
9+
)

analytics/spark/src/main/java/mil/nga/giat/geowave/analytic/spark/GeoWaveSparkConf.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.io.Serializable;
44

5+
import mil.nga.giat.geowave.analytic.spark.sparksql.GeoWaveSpatialEncoders;
56
import org.apache.spark.SparkConf;
67
import org.apache.spark.sql.SparkSession;
78
import org.apache.spark.sql.SparkSession.Builder;
@@ -46,17 +47,19 @@ public static SparkConf applyDefaultsToConfig(
4647
// Create a default SparkSession with GeoWave settings applied to config.
4748
public static SparkSession createDefaultSession() {
4849
SparkConf defaultConfig = GeoWaveSparkConf.getDefaultConfig();
49-
return createDefaultSession(defaultConfig);
50+
return GeoWaveSparkConf.internalCreateSession(
51+
defaultConfig,
52+
null);
5053
}
5154

5255
// Create a SparkSession with GeoWave settings and then user configuration
5356
// options added on top of defaults.
5457
public static SparkSession createDefaultSession(
5558
final SparkConf addonOptions ) {
5659
SparkConf defaultConfig = GeoWaveSparkConf.getDefaultConfig();
57-
return SparkSession.builder().config(
58-
defaultConfig).config(
59-
addonOptions).getOrCreate();
60+
return GeoWaveSparkConf.internalCreateSession(
61+
defaultConfig,
62+
addonOptions);
6063
}
6164

6265
// Create a SparkSession from default config with additional options, if
@@ -73,20 +76,16 @@ public static SparkSession createSessionFromParams(
7376
master = "yarn";
7477
}
7578

76-
// Create initial SessionBuilder from default Configuration.
77-
Builder builder = SparkSession.builder().config(
78-
defaultConfig);
79-
8079
// Apply user options if set, correctly handling host for yarn.
8180
if (appName != null) {
82-
builder = builder.appName(appName);
81+
defaultConfig = defaultConfig.setAppName(appName);
8382
}
8483
if (master != null) {
85-
builder = builder.master(master);
84+
defaultConfig = defaultConfig.setMaster(master);
8685
}
8786
if (host != null) {
8887
if (master != null && master != "yarn") {
89-
builder = builder.config(
88+
defaultConfig = defaultConfig.set(
9089
"spark.driver.host",
9190
host);
9291
}
@@ -97,12 +96,33 @@ public static SparkSession createSessionFromParams(
9796
}
9897

9998
if (jars != null) {
100-
builder = builder.config(
99+
defaultConfig = defaultConfig.set(
101100
"spark.jars",
102101
jars);
103102
}
104103

105104
// Finally return the session from builder
105+
return GeoWaveSparkConf.internalCreateSession(
106+
defaultConfig,
107+
null);
108+
}
109+
110+
private static SparkSession internalCreateSession(
111+
SparkConf conf,
112+
SparkConf addonOptions ) {
113+
114+
// Create initial SessionBuilder from default Configuration.
115+
Builder builder = SparkSession.builder().config(
116+
conf);
117+
118+
// Ensure SpatialEncoders and UDTs are registered at each session
119+
// creation.
120+
GeoWaveSpatialEncoders.registerUDTs();
121+
122+
if (addonOptions != null) {
123+
builder = builder.config(addonOptions);
124+
}
125+
106126
return builder.getOrCreate();
107127
}
108128

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package mil.nga.giat.geowave.analytic.spark.sparksql;
2+
3+
import com.vividsolutions.jts.geom.*;
4+
import mil.nga.giat.geowave.analytic.spark.sparksql.udt.*;
5+
import org.apache.spark.sql.types.UDTRegistration;
6+
7+
/**
8+
* Created by jwileczek on 7/24/18.
9+
*/
10+
public class GeoWaveSpatialEncoders
11+
{
12+
13+
public static GeometryUDT geometryUDT = new GeometryUDT();
14+
public static PointUDT pointUDT = new PointUDT();
15+
public static LineStringUDT lineStringUDT = new LineStringUDT();
16+
public static PolygonUDT polygonUDT = new PolygonUDT();
17+
public static MultiPointUDT multiPointUDT = new MultiPointUDT();
18+
public static MultiPolygonUDT multiPolygonUDT = new MultiPolygonUDT();
19+
20+
public static void registerUDTs() {
21+
UDTRegistration.register(
22+
Geometry.class.getCanonicalName(),
23+
GeometryUDT.class.getCanonicalName());
24+
UDTRegistration.register(
25+
Point.class.getCanonicalName(),
26+
PointUDT.class.getCanonicalName());
27+
UDTRegistration.register(
28+
LineString.class.getCanonicalName(),
29+
LineStringUDT.class.getCanonicalName());
30+
UDTRegistration.register(
31+
Polygon.class.getCanonicalName(),
32+
PolygonUDT.class.getCanonicalName());
33+
34+
UDTRegistration.register(
35+
MultiLineString.class.getCanonicalName(),
36+
MultiLineStringUDT.class.getCanonicalName());
37+
UDTRegistration.register(
38+
MultiPoint.class.getCanonicalName(),
39+
MultiPointUDT.class.getCanonicalName());
40+
UDTRegistration.register(
41+
MultiPolygon.class.getCanonicalName(),
42+
MultiPolygonUDT.class.getCanonicalName());
43+
44+
}
45+
}

analytics/spark/src/main/java/mil/nga/giat/geowave/analytic/spark/sparksql/SimpleFeatureDataFrame.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,20 @@
11
package mil.nga.giat.geowave.analytic.spark.sparksql;
22

3-
import org.apache.spark.api.java.JavaPairRDD;
3+
import mil.nga.giat.geowave.analytic.spark.sparksql.udf.GeomFunctionRegistry;
44
import org.apache.spark.api.java.JavaRDD;
55
import org.apache.spark.sql.Dataset;
66
import org.apache.spark.sql.Row;
77
import org.apache.spark.sql.SparkSession;
88
import org.apache.spark.sql.types.StructType;
9-
import org.opengis.feature.simple.SimpleFeature;
109
import org.opengis.feature.simple.SimpleFeatureType;
1110
import org.slf4j.Logger;
1211
import org.slf4j.LoggerFactory;
1312

1413
import mil.nga.giat.geowave.adapter.vector.util.FeatureDataUtils;
1514
import mil.nga.giat.geowave.analytic.spark.GeoWaveRDD;
16-
import mil.nga.giat.geowave.analytic.spark.sparksql.udf.wkt.GeomFunctionRegistry;
1715
import mil.nga.giat.geowave.analytic.spark.sparksql.util.SchemaConverter;
1816
import mil.nga.giat.geowave.core.index.ByteArrayId;
1917
import mil.nga.giat.geowave.core.store.cli.remote.options.DataStorePluginOptions;
20-
import mil.nga.giat.geowave.mapreduce.input.GeoWaveInputKey;
2118

2219
public class SimpleFeatureDataFrame
2320
{

0 commit comments

Comments
 (0)