Skip to content

SaaS Crawler Module#5095

Merged
kkondaka merged 24 commits intoopensearch-project:mainfrom
san81:saas-sources-module
Oct 28, 2024
Merged

SaaS Crawler Module#5095
kkondaka merged 24 commits intoopensearch-project:mainfrom
san81:saas-sources-module

Conversation

@san81
Copy link
Copy Markdown
Collaborator

@san81 san81 commented Oct 21, 2024

Description

Introducing SaaS Source Plugins module and a base Jira Source plugin class

Issues Resolved

Resolves #4754

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

san81 added 2 commits October 21, 2024 12:04
…odule for all of the gradle sources

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
san81 and others added 8 commits October 21, 2024 12:32
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
full test coverage for base folder, spotless fixes
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
@san81 san81 changed the title Saas sources module Saas Crawler Module Oct 22, 2024
@san81 san81 changed the title Saas Crawler Module SaaS Crawler Module Oct 22, 2024
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.mashape.unirest:unirest-java:1.4.9'
implementation 'com.google.code.gson:gson:2.8.9'
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this dependency? Please remove if possible.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed them

implementation 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30'

testImplementation platform('org.junit:junit-bom:5.10.0')
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need either of these two lines.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed them

enabled = false
}

repositories {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this block. You don't need it.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.


@Named
public class SaasPluginExecutorServiceProvider {
Logger log = LoggerFactory.getLogger(SaasPluginExecutorServiceProvider.class);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be private static final

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed it

… the review input

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Iterator<ItemInfo> itemInfoIterator = client.listItems();
log.info("Starting to crawl the source");
long updatedPollTime = 0;
log.info("Creating Partitions");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a log of logs throughout the PR that are info logs that look like they should be debug

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjusted the log level for some and removed a few log statements.

updatedPollTime = Math.max(updatedPollTime, niUpdated);
log.info("updated poll time {}", updatedPollTime);
}
createPartition(itemInfoList, coordinator);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we passing a full list to this method? Can we just create each partition inline or do they all need to be stored first?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are passing maxItemsPerPage number of items to this method. i.e. the page size in our paginated crawling. All the items in this page will go into one partition (or a work item). Like a partition per page.

} else {
// Unable to acquire other partitions.
// Probably we will introduce Global state in the future but for now, we don't expect to reach here.
throw new RuntimeException("Unable to acquire other partitions. " +
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe print out the partitionType here if we get to this point.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added to the exception message.

if(leaderPartition != null) {
// Extend the timeout
// will always be a leader until shutdown
coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should catch exceptions that can come from this call so your thread doesn't shut down.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrapped this statement around try catch now. I don't see that this method is throwing any exception though!

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It won't most of the time but it can. This was a bug in Dynamo at one time (#4850)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dynamo store hit a 5xx

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for clarifying 👍

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
@san81 san81 requested a review from sb2k16 as a code owner October 22, 2024 22:40
san81 added 2 commits October 22, 2024 16:21
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
@san81 san81 requested a review from dlvenable October 23, 2024 00:18
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
graytaylor0
graytaylor0 previously approved these changes Oct 23, 2024
this.buffer = buffer;

boolean isPartitionCreated = coordinator.createPartition(new LeaderPartition());
log.info("Leader partition creation status: {}", isPartitionCreated);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will result in one of these logs whenever a new data prepper instance starts

Leader partition creation status: false

and isn't really helpful. Can be debug

processPartition(partition.get(), buffer, sourceConfig);

} else {
log.info("No partition available. Going to Sleep for a while ");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may also be a little noisy. Maybe a metric tracking this would be better?

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
san81 added 2 commits October 23, 2024 11:00
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Iterator<ItemInfo> listItems();


void setLastPollTime(long lastPollTime);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add javadoc comments for all API

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added 👍

@@ -0,0 +1,81 @@
package org.opensearch.dataprepper.plugins.source.saas_crawler.base;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you not see a need for common interface for all crawlers? I was thinking there would be an interface with some default implementation and so on.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Crawler relay on a source plugin specific iterator implementation and dispatch the work to source plugin specific client implementation. Crawler itself has generic logic for pagination.

}
itemInfoList.add(nextItem);
Map<String, String> metadata = nextItem.getMetadata();
long niCreated = Long.parseLong(metadata.get(CREATED)!=null? metadata.get(CREATED):"0");
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the fallback value of 0 or current time?

createPartition(itemInfoList, coordinator);
}while (itemInfoIterator.hasNext());
log.debug("Crawling completed in {} ms", System.currentTimeMillis() - startTime);
return updatedPollTime != 0 ? updatedPollTime : startTime;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, looks like it is falling back to current time here.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

private final Crawler crawler;


@DataPrepperPluginConstructor
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think you use @DataPrepperPluginConstructor for abstract source classes. see ./data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/BaseHttpSource.java

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. No use of this annotation here. Removed it.


@Override
public boolean areAcknowledgementsEnabled() {
return Source.super.areAcknowledgementsEnabled();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a good idea. This should be left to the derived classes.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Removed this method. Each source plugin will implement their own version.

* contents itself which can be used to apply regex filtering, change data capture etc. general
* assumption here is that fetching metadata should be faster than fetching entire Item
*/
Map<String, String> metadata;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably better to make it Map<String, Object>, it is probably unrealistic to expect all of metadata values to be Strings.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering each source may have different needs, agree to make it more generic. Converted the map as suggested.

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
* JiraConnector connector entry point.
*/

public abstract class SaasSourcePlugin implements Source<Record<Event>>, UsesEnhancedSourceCoordination {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also rename this to SourceCrawler.

Copy link
Copy Markdown
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @san81 for the improvements. This is very good. I think just a few more changes and I'll be good to go.

executorService.submit(workerScheduler);

//Wait for more than a minute as the default while loop wait time in leader scheduler is 1 minute
Thread.sleep(11000);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need to lower this sleep.

executorService.submit(workerScheduler);

//Wait for more than a minute as the default while loop wait time in leader scheduler is 1 minute
Thread.sleep(11000);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need to lower this sleep.

@san81 san81 requested a review from dlvenable October 25, 2024 21:26
Copy link
Copy Markdown
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with getting the other changes in a follow-on PR.

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

converting last_poll_time to java Instant type

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

we are now capturing Crawling times

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

ItemInfo long timestamp is now using Instant type

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

addressing review comments

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

Instant conversion

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

addressing review comments

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

code formatting

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

removed long polling by enabling setter on the leader scheduler timer

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

reducing wait times

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
@san81 san81 force-pushed the saas-sources-module branch from 0087491 to 39d98e7 Compare October 25, 2024 23:46
@kkondaka kkondaka merged commit 675864d into opensearch-project:main Oct 28, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Jira Connector - to seamlessly sync all the ticket details to OpenSearch

5 participants