|
6 | 6 | package org.opensearch.dataprepper.plugins.source.otellogs; |
7 | 7 |
|
8 | 8 | import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction; |
| 9 | +import com.linecorp.armeria.server.HttpService; |
9 | 10 | import com.linecorp.armeria.server.encoding.DecodingService; |
10 | 11 | import org.opensearch.dataprepper.GrpcRequestExceptionHandler; |
11 | 12 | import org.opensearch.dataprepper.plugins.codec.CompressionOption; |
|
47 | 48 | import java.time.Duration; |
48 | 49 | import java.util.Collections; |
49 | 50 | import java.util.List; |
| 51 | +import java.util.Optional; |
50 | 52 | import java.util.concurrent.ExecutionException; |
51 | 53 | import java.util.concurrent.Executors; |
| 54 | +import java.util.function.Function; |
52 | 55 |
|
53 | 56 | @DataPrepperPlugin(name = "otel_logs_source", pluginType = Source.class, pluginConfigurationType = OTelLogsSourceConfig.class) |
54 | 57 | public class OTelLogsSource implements Source<Record<Object>> { |
@@ -144,6 +147,13 @@ public void start(Buffer<Record<Object>> buffer) { |
144 | 147 | } else { |
145 | 148 | sb.service(grpcServiceBuilder.build(), DecodingService.newDecorator()); |
146 | 149 | } |
| 150 | + |
| 151 | + if (oTelLogsSourceConfig.getAuthentication() != null) { |
| 152 | + final Optional<Function<? super HttpService, ? extends HttpService>> optionalHttpAuthenticationService = |
| 153 | + authenticationProvider.getHttpAuthenticationService(); |
| 154 | + optionalHttpAuthenticationService.ifPresent(sb::decorator); |
| 155 | + } |
| 156 | + |
147 | 157 | sb.requestTimeoutMillis(oTelLogsSourceConfig.getRequestTimeoutInMillis()); |
148 | 158 | if(oTelLogsSourceConfig.getMaxRequestLength() != null) { |
149 | 159 | sb.maxRequestLength(oTelLogsSourceConfig.getMaxRequestLength().getBytes()); |
|
0 commit comments