Skip to content

Commit 4886bdf

Browse files
authored
[CdapIO] Complete examples for CDAP Zendesk plugins (#24589)
* Add examples for Cdap Zendesk plugins * Move common classes to Examples Cdap module * Fix readme
1 parent 7d793c9 commit 4886bdf

12 files changed

Lines changed: 587 additions & 1 deletion

File tree

examples/java/cdap/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,4 @@ Supported CDAP plugins:
2424
- [ServiceNow](https://github.com/data-integrations/servicenow-plugins). More info in the ServiceNow example [README](servicenow/src/main/java/org/apache/beam/examples/complete/cdap/servicenow/README.md).
2525
- [Salesforce](https://github.com/data-integrations/salesforce)
2626
- [Hubspot](https://github.com/data-integrations/hubspot)
27-
- [Zendesk](https://github.com/data-integrations/zendesk)
27+
- [Zendesk](https://github.com/data-integrations/zendesk). More info in the Zendesk example [README](zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/README.md).
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* License); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an AS IS BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
import groovy.json.JsonOutput
20+
21+
plugins {
22+
id 'java'
23+
id 'org.apache.beam.module'
24+
id 'com.github.johnrengelman.shadow'
25+
}
26+
27+
applyJavaNature(
28+
exportJavadoc: false,
29+
automaticModuleName: 'org.apache.beam.examples.complete.cdap.zendesk',
30+
)
31+
32+
description = "Apache Beam :: Examples :: Java :: CDAP :: Zendesk"
33+
ext.summary = """Apache Beam SDK provides a simple, Java-based
34+
interface for processing virtually any size data. This
35+
artifact includes CDAP Zendesk Apache Beam Java SDK examples."""
36+
37+
/** Define the list of runners which execute a precommit test.
38+
* Some runners are run from separate projects, see the preCommit task below
39+
* for details.
40+
*/
41+
def preCommitRunners = ["directRunner", "flinkRunner"]
// Register one "<runner>PreCommit" configuration per runner; the dependencies
// block further down in this script adds the artifacts to each of them.
preCommitRunners.each { String runner ->
  configurations.create(runner + "PreCommit")
}
45+
46+
dependencies {
47+
implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
48+
implementation project(path: ":sdks:java:core", configuration: "shadow")
49+
implementation project(":examples:java:cdap")
50+
implementation project(":sdks:java:io:cdap")
51+
implementation project(":sdks:java:io:hadoop-common")
52+
implementation library.java.cdap_api
53+
implementation library.java.cdap_api_commons
54+
implementation library.java.cdap_etl_api
55+
permitUnusedDeclared library.java.cdap_etl_api
56+
implementation library.java.cdap_hydrator_common
57+
implementation library.java.cdap_plugin_zendesk
58+
implementation library.java.hadoop_common
59+
implementation library.java.slf4j_api
60+
implementation library.java.vendored_guava_26_0_jre
61+
runtimeOnly project(path: ":runners:direct-java", configuration: "shadow")
62+
63+
// Add dependencies for the PreCommit configurations
64+
// For each runner a project level dependency on the examples project.
65+
for (String runner : preCommitRunners) {
66+
delegate.add(runner + "PreCommit", project(":examples:java:cdap:zendesk"))
67+
delegate.add(runner + "PreCommit", project(path: ":examples:java:cdap:zendesk", configuration: "testRuntimeMigration"))
68+
}
69+
directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow")
70+
flinkRunnerPreCommit project(":runners:flink:${project.ext.latestFlinkVersion}")
71+
}
72+
73+
/*
74+
* Create a ${runner}PreCommit task for each runner which runs a set
75+
* of integration tests for the CDAP Zendesk examples.
76+
*/
77+
// Fully-qualified runner class passed to the tests via --runner, keyed by runner name.
def preCommitRunnerClass = [
  directRunner: "org.apache.beam.runners.direct.DirectRunner",
  flinkRunner: "org.apache.beam.runners.flink.TestFlinkRunner"
]

preCommitRunners.each { String runner ->
  // The task and the configuration supplying its classpath share the same name.
  String preCommitName = runner + "PreCommit"
  tasks.create(name: preCommitName, type: Test) {
    // Pipeline options are handed to the tests as a JSON-encoded system property.
    def pipelineOptions = ["--runner=" + preCommitRunnerClass[runner]]
    classpath = configurations.getByName(preCommitName)
    forkEvery 1
    maxParallelForks 4
    systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions)
  }
}
93+
94+
/* Define a common precommit task which depends on all the individual precommits. */
95+
task preCommit() {
  // Aggregate task: depends on every "<runner>PreCommit" task.
  dependsOn preCommitRunners.collect { String runner -> runner + "PreCommit" }
}
100+
101+
task executeCdap(type: JavaExec) {
  // Pass the system properties of the current JVM on to the launched process.
  systemProperties(System.getProperties())
  classpath = sourceSets.main.runtimeClasspath
  // Entry point is chosen on the command line with -DmainClass=<fully.qualified.Class>.
  mainClass = System.getProperty("mainClass")
  // Pipeline arguments come from -Dexec.args; an empty string is used when it is not set.
  String[] pipelineArgs = System.getProperty("exec.args", "").split()
  args(pipelineArgs)
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.examples.complete.cdap.zendesk;
19+
20+
import static org.apache.beam.examples.complete.cdap.zendesk.transforms.FormatInputTransform.readFromCdapZendesk;
21+
22+
import io.cdap.cdap.api.data.format.StructuredRecord;
23+
import java.util.Map;
24+
import org.apache.beam.examples.complete.cdap.utils.StructuredRecordUtils;
25+
import org.apache.beam.examples.complete.cdap.zendesk.options.CdapZendeskOptions;
26+
import org.apache.beam.examples.complete.cdap.zendesk.utils.PluginConfigOptionsConverter;
27+
import org.apache.beam.sdk.Pipeline;
28+
import org.apache.beam.sdk.PipelineResult;
29+
import org.apache.beam.sdk.coders.KvCoder;
30+
import org.apache.beam.sdk.coders.NullableCoder;
31+
import org.apache.beam.sdk.coders.SerializableCoder;
32+
import org.apache.beam.sdk.coders.StringUtf8Coder;
33+
import org.apache.beam.sdk.io.TextIO;
34+
import org.apache.beam.sdk.io.hadoop.WritableCoder;
35+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
36+
import org.apache.beam.sdk.transforms.MapValues;
37+
import org.apache.beam.sdk.transforms.Values;
38+
import org.apache.beam.sdk.values.TypeDescriptors;
39+
import org.apache.hadoop.io.NullWritable;
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
43+
/**
44+
* The {@link CdapZendeskToTxt} pipeline is a batch pipeline which ingests data in JSON format from
45+
* CDAP Zendesk, and outputs the resulting records to .txt file. Zendesk parameters and output txt
46+
* file path are specified by the user as template parameters. <br>
47+
*
48+
* <p><b>Example Usage</b>
49+
*
50+
* <pre>
51+
* # Gradle preparation
52+
*
53+
* To run this example your {@code build.gradle} file should contain the following task
54+
* to execute the pipeline:
55+
* {@code
56+
* task executeCdap (type:JavaExec) {
57+
* mainClass = System.getProperty("mainClass")
58+
* classpath = sourceSets.main.runtimeClasspath
59+
* systemProperties System.getProperties()
60+
* args System.getProperty("exec.args", "").split()
61+
* }
62+
* }
63+
*
64+
* This task lets you run the pipeline with the following command:
65+
* {@code
66+
* gradle clean executeCdap -DmainClass=org.apache.beam.examples.complete.cdap.zendesk.CdapZendeskToTxt \
67+
* -Dexec.args="--<argument>=<value> --<argument>=<value>"
68+
* }
69+
*
70+
* # Running the pipeline
71+
* To execute this pipeline, specify the parameters in the following format:
72+
* {@code
73+
* --zendeskBaseUrl=your-url \
74+
* --adminEmail=your-email \
75+
* --apiToken=your-token \
76+
* --objectsToPull=Groups \
77+
* --referenceName=your-reference-name \
78+
* --outputTxtFilePathPrefix=your-path-to-output-folder-with-filename-prefix
79+
* }
80+
*
81+
* By default this will run the pipeline locally with the DirectRunner. To change the runner, specify:
82+
* {@code
83+
* --runner=YOUR_SELECTED_RUNNER
84+
* }
85+
* </pre>
86+
*/
87+
public class CdapZendeskToTxt {
88+
89+
/* Logger for class.*/
90+
private static final Logger LOG = LoggerFactory.getLogger(CdapZendeskToTxt.class);
91+
92+
/**
93+
* Main entry point for pipeline execution.
94+
*
95+
* @param args Command line arguments to the pipeline.
96+
*/
97+
public static void main(String[] args) {
98+
CdapZendeskOptions options =
99+
PipelineOptionsFactory.fromArgs(args).withValidation().as(CdapZendeskOptions.class);
100+
101+
// Create the pipeline
102+
Pipeline pipeline = Pipeline.create(options);
103+
run(pipeline, options);
104+
}
105+
106+
/**
107+
* Runs a pipeline which reads message from CDAP and writes it to .txt file.
108+
*
109+
* @param options arguments to the pipeline
110+
*/
111+
public static PipelineResult run(Pipeline pipeline, CdapZendeskOptions options) {
112+
Map<String, Object> pluginConfigParams =
113+
PluginConfigOptionsConverter.zendeskOptionsToParamsMap(options);
114+
LOG.info("Starting Cdap-Zendesk-To-Txt pipeline with parameters: {}", pluginConfigParams);
115+
116+
/*
117+
* Steps:
118+
* 1) Read messages in from Cdap Zendesk
119+
* 2) Extract values only
120+
* 3) Write successful records to .txt file
121+
*/
122+
123+
pipeline
124+
.apply("readFromCdapZendesk", readFromCdapZendesk(pluginConfigParams))
125+
.setCoder(
126+
KvCoder.of(
127+
NullableCoder.of(WritableCoder.of(NullWritable.class)),
128+
SerializableCoder.of(StructuredRecord.class)))
129+
.apply(
130+
MapValues.into(TypeDescriptors.strings())
131+
.via(StructuredRecordUtils::structuredRecordToString))
132+
.setCoder(
133+
KvCoder.of(
134+
NullableCoder.of(WritableCoder.of(NullWritable.class)), StringUtf8Coder.of()))
135+
.apply(Values.create())
136+
.apply("writeToTxt", TextIO.write().to(options.getOutputTxtFilePathPrefix()));
137+
138+
return pipeline.run();
139+
}
140+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
<!--
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
Unless required by applicable law or agreed to in writing,
11+
software distributed under the License is distributed on an
12+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13+
KIND, either express or implied. See the License for the
14+
specific language governing permissions and limitations
15+
under the License.
16+
-->
17+
18+
## Gradle preparation
19+
20+
To run this example your `build.gradle` file should contain the following task to execute the pipeline:
21+
22+
```
23+
task executeCdap (type:JavaExec) {
24+
mainClass = System.getProperty("mainClass")
25+
classpath = sourceSets.main.runtimeClasspath
26+
systemProperties System.getProperties()
27+
args System.getProperty("exec.args", "").split()
28+
}
29+
```
30+
31+
## Running the CdapZendeskToTxt pipeline example
32+
33+
The Gradle `executeCdap` task lets you run the pipeline with the following command:
34+
35+
```bash
36+
gradle clean executeCdap -DmainClass=org.apache.beam.examples.complete.cdap.zendesk.CdapZendeskToTxt \
37+
-Dexec.args="--<argument>=<value> --<argument>=<value>"
38+
```
39+
40+
To execute this pipeline, specify the parameters in the following format:
41+
42+
```bash
43+
--zendeskBaseUrl=zendesk-url-key-followed-by-/%s/%s (example: https://support.zendesk.com/%s/%s) \
44+
--adminEmail=your-admin-email \
45+
--apiToken=your-api-token \
46+
--subdomains=your-subdomains (example: api/v2) \
47+
--maxRetryCount=your-max-retry-count \
48+
--maxRetryWait=your-max-retry-wait \
49+
--maxRetryJitterWait=your-max-retry-jitter-wait \
50+
--connectTimeout=your-connection-timeout \
51+
--readTimeout=your-read-timeout \
52+
--objectsToPull=your-objects-to-pull (example: Groups) \
53+
--outputTxtFilePathPrefix=your-path-to-output-folder-with-filename-prefix
54+
```
55+
Please see CDAP [Zendesk Batch Source](https://github.com/data-integrations/zendesk/blob/develop/docs/Zendesk-batchsource.md) for more information.

0 commit comments

Comments
 (0)