-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[go function] support localrun and cluster mode for go function #4174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
sijie
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wolfstudy great move for supporting go function!
overall looks pretty good. left a few suggestions. PTAL.
It would be good to have an integration test for go function since now it supports running in cluster mode.
| @Parameter( | ||
| names = "--go", | ||
| description = "Path to the main Go file for the function (if the function is written in Go)") | ||
| protected String goFile; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I think we are beginning having more and more branches in the pulsar runtime. I guess it might be the time to do a refactor to make the language runtime pluggable in Pulsar Function. So that we don't need to touch the runtime each time we introduce a new language. You don't need to do it right now. but It might be worth creating a task to track that.
| protected String pyFile; | ||
| @Parameter( | ||
| names = "--go", | ||
| description = "Path to the main Go file for the function (if the function is written in Go)") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it is the go file. It should be the built go program.
| if (isNotBlank(functionConfig.getJar()) && isNotBlank(functionConfig.getPy())) { | ||
| throw new ParameterException("Either a Java jar or a Python file needs to" | ||
| if (isNotBlank(functionConfig.getJar()) && isNotBlank(functionConfig.getPy()) && isNotBlank(functionConfig.getGo())) { | ||
| throw new ParameterException("Either a Java jar or a Python file or a Go file needs to" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest using "Go executable binary" rather than "Go file". A "Go file" is a bit confusing, since it typically means the source files suffixed with .go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, in here, we uploaded a source files suffixed with .go.
args.add("go");
args.add("run");
args.add(originalCodeFileName);
We use go run to run this go file, go run can be divided into two steps:
- build
.gofile is an executable binary - run this executable binary
In this way, the user only needs to upload the .go file, and it is not necessary to upload a compiled binary file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually I forgot adding the comment there. I don't think we should use "go run", because not every machine installed go. so I would prefer uploading the executable binary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
awesome, will fix it
pulsar-function-go/conf/conf.go
Outdated
| Tenant string `yaml:"tenant"` | ||
| NameSpace string `yaml:"nameSpace"` | ||
| Name string `yaml:"name"` | ||
| ClassName string `yaml:"className"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think Go needs ClassName.
Go has a completely different way to run functions than Java and Python.
In Java or Python, the instance code is isolated from user code since they support dynamically loading the code. So you need two parameters: --jar or --py to specify whether the function code; and --className to tell the function runtime to the entrypoint (aka className) to invoke the function.
However Go is a static-linking language. The instance is compiled with user function. There is only one entrypoint (which is the main func) for invoking the user function. Hence we only need the code file for go but we don't need a className. So I would suggest not adding a field if it is not used.
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.pulsar.functions.instance.functionforgo; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about just calling this package "org.apache.pulsar.functions.instance.go"? That means it is the package related to go instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excellent, go is a better choice than functionforgo, will fix it.
| return ClearTextSecretsProvider.class.getName(); | ||
| case PYTHON: | ||
| return "secretsprovider.ClearTextSecretsProvider"; | ||
| case GO: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw new UnsupportedOperationException if a feature is not implemented.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw new UnsupportedOperationException();
| case PYTHON: | ||
| return "secretsprovider.EnvironmentBasedSecretsProvider"; | ||
| case GO: | ||
| return ""; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw new UnsupportedOperationException if a feature is not implemented.
| } | ||
|
|
||
| if (functionConfig.getWindowConfig() != null) { | ||
| throw new IllegalArgumentException("There is currently no support windowing in golang"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| throw new IllegalArgumentException("There is currently no support windowing in golang"); | |
| throw new IllegalArgumentException("Windowing is not supported in Go function yet"); |
|
|
||
| private static void doGolangChecks(FunctionConfig functionConfig) { | ||
| if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { | ||
| throw new RuntimeException("Effectively-once processing guarantees not yet supported in Golang"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| throw new RuntimeException("Effectively-once processing guarantees not yet supported in Golang"); | |
| throw new RuntimeException("Effectively-once processing guarantees not yet supported in Go function"); |
| } | ||
|
|
||
| if (functionConfig.getMaxMessageRetries() != null && functionConfig.getMaxMessageRetries() >= 0) { | ||
| throw new IllegalArgumentException("Message retries not yet supported in golang"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| throw new IllegalArgumentException("Message retries not yet supported in golang"); | |
| throw new IllegalArgumentException("Message retries not yet supported in Go function"); |
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
pom.xml
Outdated
| <version>1.48</version> | ||
| </dependency> | ||
|
|
||
| <dependency> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment about using Jackson instead
| import java.io.BufferedReader; | ||
| import java.io.IOException; | ||
| import java.io.InputStreamReader; | ||
| import java.io.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't use wildcards
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cool, good suggestion
| classLoader = loadJar(file); | ||
| } | ||
| } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) { | ||
| userCodeFile = functionConfig.getGo(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also check for python runtime here and throw an exception if its not Java, Go, or python
| String output = yaml.dumpAs(goInstanceConfig, Tag.MAP, null); | ||
| String fileName = String.format("%s_%s_%s", goInstanceConfig.getTenant(), goInstanceConfig.getNameSpace(), | ||
| goInstanceConfig.getName()); | ||
| File ymlFile = File.createTempFile(fileName, ".yml"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file needs to be cleaned somehow.
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
|
@jerrypeng PTAL again thanks |
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
|
@sijie @jerrypeng PTAL again |
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
|
run java8 tests |
| goInstanceConfig.setKillAfterIdleMs(0); | ||
|
|
||
| // Parse the contents of goInstanceConfig into json form string | ||
| ObjectMapper objectMapper = new ObjectMapper(new JsonFactory()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please use ObjectMapperFactory.getThreadLocal() instead of creating a new ObjectMapper
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
sijie
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall looks great! left one mintor comment - UnsupportedOperationException should be thrown when a method is not implemented.
| return ClearTextSecretsProvider.class.getName(); | ||
| case PYTHON: | ||
| return "secretsprovider.ClearTextSecretsProvider"; | ||
| case GO: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw new UnsupportedOperationException();
|
run java8 tests |
| if (StringUtils.isNotEmpty(extraDependenciesDir)) { | ||
| args.add("PYTHONPATH=${PYTHONPATH}:" + extraDependenciesDir); | ||
| } | ||
| } else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this check is necessary
|
LGTM @srkukarni you want to take a look as well? |
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
|
run java8 tests |
1 similar comment
|
run java8 tests |
|
ping @srkukarni |
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Signed-off-by: xiaolong.ran ranxiaolong716@gmail.com
Motivation
Master Issue: #3767
support local-run and cluster mode for go function.
in go function, we can use:
Different from
--jaror--py,--gouploads a complete executable file(including: instance file + user code file)