Skip to content

Implement the Distributed Spatial Join Algorithm#1289

Merged
rfecher merged 1 commit intomasterfrom
spatial-join
Mar 22, 2018
Merged

Implement the Distributed Spatial Join Algorithm#1289
rfecher merged 1 commit intomasterfrom
spatial-join

Conversation

@JWileczek
Copy link
Copy Markdown
Contributor

Commits have not been squashed because I expect I may need to make a few changes before everything gets pulled in. Half expecting tests to fail because local testing is broken (see below).

A few pain points in this pull request that need to be addressed:

  1. After updating with master and cleaning my unit tests further I went to do a final run locally of them locally and ran into the Jackson dependency version issue. I couldn't remember if this was addressed in the recent changes on master or if it will be fixed after this PR.
  2. Currently implementing SpatialJoinRunner to abstract join selection logic and work into query runner and cli interface, but this is really work related to exposing the join to those various api's that will come in later user stories. Should this class be removed before the release since it isn't effectively usable/tested? I can stash it and add it in later requests when it is more fleshed out.
  3. Semi-related to above, There is a second join implementation using the Dataset api in spark that is moved into a experimental package within the project. This join works, but is less efficient, however the Dataset api is supposed to replace the RDD api in Spark, so there could be some value in pursuing this implementation further. I feel there is less pain keeping this implementation around in code for release because it does produce correct results just isn't the recommended one at the time. However it too can be removed and added at a later point.
  4. Not happy with the BucketOperation/BufferOperation interface for dealing with spatial operations that require buffering indices before the join (aka distance). I think it could potentially be done a cleaner way that I don't currently see. Could use a short discussion on the issue to see if a cleaner api can be made quickly.

Copy link
Copy Markdown
Contributor

@rfecher rfecher left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see line comments

}

@Override
public Boolean call(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is unnecessary


}
//Remove duplicates between tiers
//JavaPairRDD<GeoWaveInputKey, ByteArrayId> swappedResults = this.combinedResults.mapToPair(t -> t.swap()).reduceByKey((id1, id2) -> id1);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no commented out code

//JavaPairRDD<GeoWaveInputKey, ByteArrayId> swappedResults = this.combinedResults.mapToPair(t -> t.swap()).reduceByKey((id1, id2) -> id1);
this.combinedResults = this.combinedResults.reduceByKey((id1,id2) -> id1);

//swappedResults.cache();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no commented out code

}
}

// List<ByteArrayId> insertIds =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no commented out code

//Cogroup groups on same tier ByteArrayId and pairs them into Iterable sets.
JavaPairRDD<ByteArrayId, Tuple2<Iterable<Tuple2<GeoWaveInputKey, Geometry>>, Iterable<Tuple2<GeoWaveInputKey, Geometry>>>> joinedTiers = leftTier.cogroup(rightTier, highestPartitionCount);
//Filter only the pairs that have data on both sides, bucket strategy should have been accounted for by this point.
//joinedTiers.cache();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no commented out code


GeomFunction predicate = geomPredicate.value();

//HashSet<Tuple2<GeoWaveInputKey, Geometry>> resultSet = Sets.newHashSet();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no commented out code

throw new ParameterException(
"HDFS Base path must start with forward slash /");
}
// if (!basePath.startsWith("/")) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no commented out code

throw new ParameterException(
"HDFS Base path must start with forward slash /");
}
// if (!basePath.startsWith("/")) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no commented out code

LOGGER.error("Async error in join");
e.printStackTrace();
}
// typedJoin.join(session, hailRDD, tornadoRDD, distancePredicate,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no commented out code

FeatureSerializer simpleSerializer = new FeatureSerializer();
PersistableSerializer persistSerializer = new PersistableSerializer();

kryo.register(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be easy to register all persistables using PersistableFactory.getInstance().getClassIdMapping().entrySet().forEach(e -> kryo.register(e.getKey, simpleSerializer, e.getValue())); in place of specific concrete class registration.


return new CustomIdIndex(
XZHierarchicalIndexFactory.createFullIncrementalTieredStrategy(
TieredSFCIndexFactory.createFullIncrementalTieredStrategy(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this change our indexing approach for everything

Copy link
Copy Markdown
Contributor

@rfecher rfecher left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check out the comment

@rfecher rfecher merged commit 199660d into master Mar 22, 2018
@rfecher rfecher deleted the spatial-join branch March 22, 2018 14:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants