SaaS Crawler Module#5095
Conversation
…odule for all of the gradle sources Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
full test coverage for base folder, spotless fixes
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
| implementation 'com.fasterxml.jackson.core:jackson-core' | ||
| implementation 'com.fasterxml.jackson.core:jackson-databind' | ||
| implementation 'com.mashape.unirest:unirest-java:1.4.9' | ||
| implementation 'com.google.code.gson:gson:2.8.9' |
There was a problem hiding this comment.
Do we need this dependency? Please remove if possible.
data-prepper-plugins/saas-source-plugins/jira-source/build.gradle
Outdated
Show resolved
Hide resolved
| implementation 'org.projectlombok:lombok:1.18.30' | ||
| annotationProcessor 'org.projectlombok:lombok:1.18.30' | ||
|
|
||
| testImplementation platform('org.junit:junit-bom:5.10.0') |
There was a problem hiding this comment.
You don't need either of these two lines.
| enabled = false | ||
| } | ||
|
|
||
| repositories { |
There was a problem hiding this comment.
Please remove this block. You don't need it.
.../opensearch/dataprepper/plugins/source/saas/crawler/SaasCrawlerApplicationContextMarker.java
Outdated
Show resolved
Hide resolved
|
|
||
| @Named | ||
| public class SaasPluginExecutorServiceProvider { | ||
| Logger log = LoggerFactory.getLogger(SaasPluginExecutorServiceProvider.class); |
There was a problem hiding this comment.
This should be private static final
...ensearch/dataprepper/plugins/source/saas/crawler/base/SaasPluginExecutorServiceProvider.java
Outdated
Show resolved
Hide resolved
.../main/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/SaasSourceConfig.java
Show resolved
Hide resolved
… the review input Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
| Iterator<ItemInfo> itemInfoIterator = client.listItems(); | ||
| log.info("Starting to crawl the source"); | ||
| long updatedPollTime = 0; | ||
| log.info("Creating Partitions"); |
There was a problem hiding this comment.
There are a log of logs throughout the PR that are info logs that look like they should be debug
There was a problem hiding this comment.
Adjusted the log level for some and removed a few log statements.
| updatedPollTime = Math.max(updatedPollTime, niUpdated); | ||
| log.info("updated poll time {}", updatedPollTime); | ||
| } | ||
| createPartition(itemInfoList, coordinator); |
There was a problem hiding this comment.
Why are we passing a full list to this method? Can we just create each partition inline or do they all need to be stored first?
There was a problem hiding this comment.
We are passing maxItemsPerPage number of items to this method. i.e. the page size in our paginated crawling. All the items in this page will go into one partition (or a work item). Like a partition per page.
| } else { | ||
| // Unable to acquire other partitions. | ||
| // Probably we will introduce Global state in the future but for now, we don't expect to reach here. | ||
| throw new RuntimeException("Unable to acquire other partitions. " + |
There was a problem hiding this comment.
Maybe print out the partitionType here if we get to this point.
There was a problem hiding this comment.
Added to the exception message.
| if(leaderPartition != null) { | ||
| // Extend the timeout | ||
| // will always be a leader until shutdown | ||
| coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES)); |
There was a problem hiding this comment.
Should catch exceptions that can come from this call so your thread doesn't shut down.
There was a problem hiding this comment.
Wrapped this statement around try catch now. I don't see that this method is throwing any exception though!
There was a problem hiding this comment.
It won't most of the time but it can. This was a bug in Dynamo at one time (#4850)
There was a problem hiding this comment.
Thank you for clarifying 👍
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
| this.buffer = buffer; | ||
|
|
||
| boolean isPartitionCreated = coordinator.createPartition(new LeaderPartition()); | ||
| log.info("Leader partition creation status: {}", isPartitionCreated); |
There was a problem hiding this comment.
This will result in one of these logs whenever a new data prepper instance starts
Leader partition creation status: false
and isn't really helpful. Can be debug
| processPartition(partition.get(), buffer, sourceConfig); | ||
|
|
||
| } else { | ||
| log.info("No partition available. Going to Sleep for a while "); |
There was a problem hiding this comment.
This may also be a little noisy. Maybe a metric tracking this would be better?
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
| Iterator<ItemInfo> listItems(); | ||
|
|
||
|
|
||
| void setLastPollTime(long lastPollTime); |
There was a problem hiding this comment.
Please add javadoc comments for all API
| @@ -0,0 +1,81 @@ | |||
| package org.opensearch.dataprepper.plugins.source.saas_crawler.base; | |||
There was a problem hiding this comment.
Do you not see a need for common interface for all crawlers? I was thinking there would be an interface with some default implementation and so on.
There was a problem hiding this comment.
Crawler relay on a source plugin specific iterator implementation and dispatch the work to source plugin specific client implementation. Crawler itself has generic logic for pagination.
| } | ||
| itemInfoList.add(nextItem); | ||
| Map<String, String> metadata = nextItem.getMetadata(); | ||
| long niCreated = Long.parseLong(metadata.get(CREATED)!=null? metadata.get(CREATED):"0"); |
There was a problem hiding this comment.
Should the fallback value of 0 or current time?
| createPartition(itemInfoList, coordinator); | ||
| }while (itemInfoIterator.hasNext()); | ||
| log.debug("Crawling completed in {} ms", System.currentTimeMillis() - startTime); | ||
| return updatedPollTime != 0 ? updatedPollTime : startTime; |
There was a problem hiding this comment.
Oh, looks like it is falling back to current time here.
| private final Crawler crawler; | ||
|
|
||
|
|
||
| @DataPrepperPluginConstructor |
There was a problem hiding this comment.
I do not think you use @DataPrepperPluginConstructor for abstract source classes. see ./data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/BaseHttpSource.java
There was a problem hiding this comment.
Agree. No use of this annotation here. Removed it.
|
|
||
| @Override | ||
| public boolean areAcknowledgementsEnabled() { | ||
| return Source.super.areAcknowledgementsEnabled(); |
There was a problem hiding this comment.
Not a good idea. This should be left to the derived classes.
There was a problem hiding this comment.
Agree. Removed this method. Each source plugin will implement their own version.
| * contents itself which can be used to apply regex filtering, change data capture etc. general | ||
| * assumption here is that fetching metadata should be faster than fetching entire Item | ||
| */ | ||
| Map<String, String> metadata; |
There was a problem hiding this comment.
Probably better to make it Map<String, Object>, it is probably unrealistic to expect all of metadata values to be Strings.
There was a problem hiding this comment.
Considering each source may have different needs, agree to make it more generic. Converted the map as suggested.
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
...ensearch/dataprepper/plugins/source/saas_crawler/coordination/state/LeaderProgressState.java
Outdated
Show resolved
Hide resolved
...awler/src/main/java/org/opensearch/dataprepper/plugins/source/saas_crawler/base/Crawler.java
Outdated
Show resolved
Hide resolved
...ler/src/main/java/org/opensearch/dataprepper/plugins/source/saas_crawler/model/ItemInfo.java
Outdated
Show resolved
Hide resolved
...ler/src/main/java/org/opensearch/dataprepper/plugins/source/saas_crawler/model/ItemInfo.java
Outdated
Show resolved
Hide resolved
...ensearch/dataprepper/plugins/source/saas_crawler/base/SaasPluginExecutorServiceProvider.java
Outdated
Show resolved
Hide resolved
data-prepper-plugins/saas-source-plugins/saas-crawler/build.gradle
Outdated
Show resolved
Hide resolved
data-prepper-plugins/saas-source-plugins/saas-crawler/build.gradle
Outdated
Show resolved
Hide resolved
...ira-source/src/main/java/org/opensearch/dataprepper/plugins/source/saas/jira/JiraSource.java
Show resolved
Hide resolved
data-prepper-plugins/saas-source-plugins/jira-source/build.gradle
Outdated
Show resolved
Hide resolved
| * JiraConnector connector entry point. | ||
| */ | ||
|
|
||
| public abstract class SaasSourcePlugin implements Source<Record<Event>>, UsesEnhancedSourceCoordination { |
There was a problem hiding this comment.
Let's also rename this to SourceCrawler.
...ensearch/dataprepper/plugins/source/saas_crawler/coordination/state/LeaderProgressState.java
Outdated
Show resolved
Hide resolved
| executorService.submit(workerScheduler); | ||
|
|
||
| //Wait for more than a minute as the default while loop wait time in leader scheduler is 1 minute | ||
| Thread.sleep(11000); |
There was a problem hiding this comment.
We still need to lower this sleep.
| executorService.submit(workerScheduler); | ||
|
|
||
| //Wait for more than a minute as the default while loop wait time in leader scheduler is 1 minute | ||
| Thread.sleep(11000); |
There was a problem hiding this comment.
We still need to lower this sleep.
dlvenable
left a comment
There was a problem hiding this comment.
I'm ok with getting the other changes in a follow-on PR.
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> converting last_poll_time to java Instant type Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> we are now capturing Crawling times Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> ItemInfo long timestamp is now using Instant type Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> addressing review comments Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> Instant conversion Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> addressing review comments Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> code formatting Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> removed long polling by enabling setter on the leader scheduler timer Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> reducing wait times Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
0087491 to
39d98e7
Compare
Description
Introducing SaaS Source Plugins module and a base Jira Source plugin class
Issues Resolved
Resolves #4754
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.