feat: Streamed List Objects support ( consumer callback ) #252
Feat/streamed list objects consumer
Important: Review skipped. Auto incremental reviews are disabled on this repository.
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.
Walkthrough
This pull request adds streaming support to the OpenFGA Java SDK for the list objects API endpoint. New model classes, API methods, and client-level wrappers enable non-blocking, line-by-line streaming of results via CompletableFuture. It includes comprehensive unit and integration tests, and a working example project demonstrating the feature.
Sequence Diagram(s)
sequenceDiagram
actor User
participant Client as OpenFgaClient
participant API as StreamedListObjectsApi
participant HTTP as HttpClient
participant Parser as NDJSON Parser
User->>Client: streamedListObjects(request, consumer)
Client->>Client: validate & build ListObjectsRequest
Client->>API: new StreamedListObjectsApi()
Client->>API: streamedListObjects(storeId, body, consumer, errorConsumer)
API->>HTTP: sendAsync(POST /stores/{id}/streamed-list-objects)
rect rgb(200, 220, 240)
Note over HTTP,Parser: Streaming Response
HTTP-->>API: HttpResponse (NDJSON body stream)
loop for each line
API->>Parser: processLine(line)
alt contains result
Parser->>Consumer: accept(object)
else contains error
Parser->>ErrorConsumer: accept(exception)
end
end
end
API-->>Client: CompletableFuture<Void> (completes when stream ends)
Client-->>User: CompletableFuture<Void>
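The per-line routing in the diagram can be sketched with the JDK alone. This is a minimal illustration, not the SDK's code: the class name `NdjsonLineRouter` is hypothetical, the wire shapes `{"result":{"object":"..."}}` and `{"error":{"message":"..."}}` are assumed from the model names in this PR, and regular expressions stand in for the Jackson `ObjectMapper` parsing that the PR reportedly uses (so escaped quotes inside values are not handled).

```java
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

/** Hypothetical stand-in for the PR's line processor; not the SDK's actual code. */
final class NdjsonLineRouter {
    // Assumed wire shapes: {"result":{"object":"document:1"}} or {"error":{"message":"..."}}.
    private static final Pattern OBJECT =
            Pattern.compile("\"result\"\\s*:\\s*\\{\\s*\"object\"\\s*:\\s*\"([^\"]*)\"");
    private static final Pattern ERROR_MESSAGE =
            Pattern.compile("\"message\"\\s*:\\s*\"([^\"]*)\"");

    /** Routes one NDJSON line to the result consumer or to the error consumer. */
    static void processLine(String line, Consumer<String> consumer, Consumer<Throwable> errorConsumer) {
        if (line == null || line.isBlank()) {
            return; // blank lines carry no payload
        }
        Matcher result = OBJECT.matcher(line);
        if (result.find()) {
            consumer.accept(result.group(1));
            return;
        }
        if (line.contains("\"error\"")) {
            // Fall back to a generic message when the error carries none.
            Matcher message = ERROR_MESSAGE.matcher(line);
            String text = message.find() ? message.group(1) : "Unknown streaming error";
            errorConsumer.accept(new RuntimeException(text));
            return;
        }
        errorConsumer.accept(new IllegalArgumentException("Unrecognized line: " + line));
    }

    /** Processes every line in order; a bad line is reported and the loop continues. */
    static void processAll(Stream<String> lines, Consumer<String> consumer, Consumer<Throwable> errorConsumer) {
        lines.forEach(line -> processLine(line, consumer, errorConsumer));
    }
}
```

In this sketch an error line does not stop the loop, which mirrors the behaviour the diagram's `alt` branch suggests: later results are still delivered after an error has been reported.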
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~20 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Codecov Report
❌ Patch coverage is …
❌ Your project status has failed because the head coverage (36.89%) is below the target coverage (80.00%). You can increase the head coverage or adjust the target coverage.
Additional details and impacted files
@@ Coverage Diff @@
## main #252 +/- ##
============================================
+ Coverage 36.47% 36.89% +0.42%
- Complexity 1144 1187 +43
============================================
Files 188 192 +4
Lines 7192 7410 +218
Branches 824 854 +30
============================================
+ Hits 2623 2734 +111
- Misses 4462 4553 +91
- Partials 107 123 +16
Pull Request Overview
This PR implements streaming support for the ListObjects API in the OpenFGA Java SDK, allowing objects to be processed asynchronously as they arrive from the server.
Key Changes
- Adds `StreamedListObjectsApi` for handling NDJSON streaming responses
- Implements `streamedListObjects` methods in `OpenFgaClient` with consumer-based callbacks
- Includes comprehensive unit and integration tests for streaming functionality
Reviewed Changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| StreamedListObjectsApi.java | New API layer handling asynchronous streaming with HttpClient and consumer callbacks |
| OpenFgaClient.java | Added client-level methods for streaming with various overloads for options and error handling |
| ClientStreamedListObjectsOptions.java | New options class for configuring streaming requests |
| StreamedListObjectsResponse.java | Model class for individual streamed object responses |
| StreamResultOfStreamedListObjectsResponse.java | Wrapper model for stream results containing either data or errors |
| StreamedListObjectsTest.java | Comprehensive unit tests covering success, errors, and edge cases |
| OpenFgaClientIntegrationTest.java | Integration tests validating streaming with real data |
| StreamedListObjectsExample.java | Example demonstrating streaming usage |
Comments suppressed due to low confidence (2)
src/main/java/dev/openfga/sdk/api/OpenFgaApi.java:947
- Redundant call to 'toString' on a String object.
.replace("{store_id}", StringUtil.urlEncode(storeId.toString()));
src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java:138
- Redundant call to 'toString' on a String object.
.replace("{store_id}", StringUtil.urlEncode(storeId.toString()));
Actionable comments posted: 1
🧹 Nitpick comments (4)
examples/streamed-list-objects/README.md (1)
61-74: Add language specification to the code fence.
The code fence showing example output should specify a language identifier for better rendering and to satisfy linting rules.
Apply this diff:

    -```
    +```text
     Created temporary store (01HXXX...)
     Created temporary authorization model (01GXXX...)
     Writing 100 mock tuples to store.

examples/streamed-list-objects/Makefile (1)
1-2: Declare `all` as a phony target.
`all` is a synthetic aggregator; if a file named `all` ever shows up, make will skip rebuilding. Put it in the phony list to prevent that and quiet the checkmake warning.
Apply this diff:

    -.PHONY: build run run-openfga
    +.PHONY: all build run run-openfga

src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java (1)
1155-1203: Fail fast when callbacks are missing.
The streaming contract relies on `consumer`, yet we accept null and only discover it later via a deferred `NullPointerException` when the first object arrives. Please guard this parameter up front (and optionally `errorConsumer`) so the API fails deterministically at the call site.

    @@
    -    public CompletableFuture<Void> streamedListObjects(
    +    public CompletableFuture<Void> streamedListObjects(
                 ClientListObjectsRequest request,
                 ClientStreamedListObjectsOptions options,
                 Consumer<String> consumer,
                 Consumer<Throwable> errorConsumer)
                 throws FgaInvalidParameterException {
    +        Objects.requireNonNull(consumer, "consumer must not be null");
             configuration.assertValid();
    @@
    -        StreamedListObjectsApi streamingApi = new StreamedListObjectsApi(configuration, apiClient);
    +        StreamedListObjectsApi streamingApi = new StreamedListObjectsApi(configuration, apiClient);

(Don’t forget to add `import java.util.Objects;` at the top.)

src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java (1)
158-167: Consider asynchronous stream processing to avoid blocking.
The current implementation uses `forEach()` on the stream within `thenCompose()`, which blocks the HttpClient's executor thread while processing all lines. For large response streams, this could tie up the thread pool.
Consider using `thenComposeAsync()` or processing the stream asynchronously:

    -                .thenCompose(response -> {
    +                .thenComposeAsync(response -> {
                         // Check response status
                         int statusCode = response.statusCode();
                         if (statusCode < 200 || statusCode >= 300) {

Alternatively, if you need to maintain sequential processing on the calling thread, the current implementation is acceptable but may impact throughput under load.
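To make the threading point concrete, here is a small JDK-only sketch of moving the line-processing stage onto a different thread. It is illustrative only: `AsyncStageSketch` and the `stream-worker` thread name are hypothetical, a `supplyAsync` future stands in for `HttpClient.sendAsync(...)`, and the sketch passes an explicit executor, whereas the no-argument `thenComposeAsync()` in the suggestion would use the default asynchronous executor instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

/** Hypothetical demo class; shows where a thenComposeAsync stage runs. */
final class AsyncStageSketch {
    /** Returns the name of the thread that ran the line-processing stage. */
    static String processOnDedicatedExecutor() {
        // Dedicated single-thread executor with a recognisable thread name.
        ExecutorService worker = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "stream-worker");
            thread.setDaemon(true);
            return thread;
        });
        try {
            // Stand-in for HttpClient.sendAsync(...): a future that yields a stream of lines.
            CompletableFuture<Stream<String>> response =
                    CompletableFuture.supplyAsync(() -> Stream.of("a", "b", "c"));
            return response
                    .thenComposeAsync(lines -> {
                        // The (possibly slow) per-line work runs on the worker thread.
                        lines.forEach(line -> { /* consumer.accept(line) would go here */ });
                        return CompletableFuture.completedFuture(Thread.currentThread().getName());
                    }, worker)
                    .join();
        } finally {
            worker.shutdown();
        }
    }
}
```

Calling `AsyncStageSketch.processOnDedicatedExecutor()` returns `stream-worker`, because a stage registered with `thenComposeAsync(fn, executor)` is always run by the supplied executor rather than by the thread that completed the previous stage.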
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (22)
- .openapi-generator/FILES (2 hunks)
- README.md (2 hunks)
- docs/OpenFgaApi.md (2 hunks)
- docs/StreamResultOfStreamedListObjectsResponse.md (1 hunks)
- docs/StreamedListObjectsResponse.md (1 hunks)
- examples/streamed-list-objects/Makefile (1 hunks)
- examples/streamed-list-objects/README.md (1 hunks)
- examples/streamed-list-objects/build.gradle (1 hunks)
- examples/streamed-list-objects/gradle.properties (1 hunks)
- examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties (1 hunks)
- examples/streamed-list-objects/gradlew (1 hunks)
- examples/streamed-list-objects/gradlew.bat (1 hunks)
- examples/streamed-list-objects/settings.gradle (1 hunks)
- examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/StreamedListObjectsExample.java (1 hunks)
- src/main/java/dev/openfga/sdk/api/OpenFgaApi.java (2 hunks)
- src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java (1 hunks)
- src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java (1 hunks)
- src/main/java/dev/openfga/sdk/api/client/model/ClientStreamedListObjectsOptions.java (1 hunks)
- src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java (1 hunks)
- src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java (1 hunks)
- src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java (2 hunks)
- src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-19T19:43:18.975Z
Learnt from: jimmyjames
Repo: openfga/java-sdk PR: 211
File: examples/opentelemetry/src/main/java/dev/openfga/sdk/example/opentelemetry/OpenTelemetryExample.java:8-10
Timestamp: 2025-08-19T19:43:18.975Z
Learning: The examples/opentelemetry/src/main/java/dev/openfga/sdk/example/opentelemetry/OpenTelemetryExample.java file will be generated from mustache templates, so the auto-generation header is correct and should be retained.
Applied to files:
examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/StreamedListObjectsExample.java
examples/streamed-list-objects/build.gradle
🧬 Code graph analysis (7)
src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java (1)
src/main/java/dev/openfga/sdk/api/client/model/ClientTupleKey.java (1)
ClientTupleKey(9-69)
examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/StreamedListObjectsExample.java (7)
src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java (1)
OpenFgaClient(23-1412)
src/main/java/dev/openfga/sdk/api/client/model/ClientListObjectsRequest.java (1)
ClientListObjectsRequest(5-64)
src/main/java/dev/openfga/sdk/api/client/model/ClientTupleKey.java (1)
ClientTupleKey(9-69)
src/main/java/dev/openfga/sdk/api/client/model/ClientWriteRequest.java (1)
ClientWriteRequest(5-34)
src/main/java/dev/openfga/sdk/api/configuration/ClientConfiguration.java (1)
ClientConfiguration(8-137)
src/main/java/dev/openfga/sdk/api/configuration/ClientCredentials.java (1)
ClientCredentials(7-67)
src/main/java/dev/openfga/sdk/api/configuration/Credentials.java (1)
Credentials(5-57)
src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java (6)
src/main/java/dev/openfga/sdk/util/StringUtil.java (1)
StringUtil(12-51)
src/main/java/dev/openfga/sdk/util/Validation.java (1)
Validation(7-15)
src/main/java/dev/openfga/sdk/api/client/ApiClient.java (1)
ApiClient(36-327)
src/main/java/dev/openfga/sdk/api/configuration/Configuration.java (1)
Configuration(21-326)
src/main/java/dev/openfga/sdk/errors/ApiException.java (1)
ApiException(6-94)
src/main/java/dev/openfga/sdk/errors/FgaInvalidParameterException.java (1)
FgaInvalidParameterException(3-19)
src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java (6)
src/main/java/dev/openfga/sdk/api/client/model/ClientListObjectsRequest.java (1)
ClientListObjectsRequest(5-64)
src/main/java/dev/openfga/sdk/api/client/model/ClientStreamedListObjectsOptions.java (1)
ClientStreamedListObjectsOptions(24-56)
src/main/java/dev/openfga/sdk/api/configuration/ClientConfiguration.java (1)
ClientConfiguration(8-137)
src/main/java/dev/openfga/sdk/api/configuration/Credentials.java (1)
Credentials(5-57)
src/main/java/dev/openfga/sdk/constants/FgaConstants.java (1)
FgaConstants(19-104)
src/main/java/dev/openfga/sdk/errors/FgaInvalidParameterException.java (1)
FgaInvalidParameterException(3-19)
src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java (1)
src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java (1)
JsonPropertyOrder(24-168)
src/main/java/dev/openfga/sdk/api/OpenFgaApi.java (2)
src/main/java/dev/openfga/sdk/util/StringUtil.java (1)
StringUtil(12-51)
src/main/java/dev/openfga/sdk/telemetry/Attributes.java (1)
Attributes(19-204)
src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java (1)
src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java (1)
JsonPropertyOrder(26-139)
🪛 checkmake (0.2.2)
examples/streamed-list-objects/Makefile
[warning] 1-1: Missing required phony target "all"
(minphony)
[warning] 1-1: Missing required phony target "clean"
(minphony)
[warning] 1-1: Missing required phony target "test"
(minphony)
[warning] 2-2: Target "all" should be declared PHONY.
(phonydeclared)
🪛 Gitleaks (8.28.0)
src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java
[high] 31-31: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
(generic-api-key)
🪛 markdownlint-cli2 (0.18.1)
examples/streamed-list-objects/README.md
61-61: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
docs/OpenFgaApi.md
2370-2370: Tables should be surrounded by blank lines
(MD058, blanks-around-tables)
2455-2455: Tables should be surrounded by blank lines
(MD058, blanks-around-tables)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: CodeQL analysis (java)
- GitHub Check: Test and Build OpenFGA (17)
- GitHub Check: Test and Build OpenFGA (11)
- GitHub Check: Test and Build OpenFGA (21)
- GitHub Check: Analyze (java)
🔇 Additional comments (23)
examples/streamed-list-objects/settings.gradle (1)
1-1: LGTM! Standard Gradle settings file correctly configuring the root project name.
examples/streamed-list-objects/gradle.properties (1)
1-1: LGTM! Standard Gradle property configuration.
.openapi-generator/FILES (1)
70-71: LGTM! The manifest correctly tracks the new streaming response models and their documentation.
Also applies to: 160-161
examples/streamed-list-objects/gradle/wrapper/gradle-wrapper.properties (1)
1-7: LGTM! Standard Gradle wrapper configuration with appropriate security settings (`validateDistributionUrl=true`) and timeout.
examples/streamed-list-objects/gradlew.bat (1)
1-89: LGTM! Standard Gradle wrapper script for Windows with proper Java detection and error handling.
examples/streamed-list-objects/README.md (2)
1-58: LGTM! The documentation is well-structured, providing clear prerequisites, configuration options, and usage instructions.
76-82: LGTM! The notes section clearly articulates the benefits of the streaming API approach.
docs/StreamResultOfStreamedListObjectsResponse.md (1)
1-14: LGTM! Clear documentation for the streaming response wrapper model. The optional `result` and `error` fields appropriately model the either/or nature of streaming responses.
docs/StreamedListObjectsResponse.md (1)
1-14: LGTM! Clear and concise documentation for the individual streaming response object. The `_object` property naming appropriately avoids Java keyword conflicts.
src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java (4)
33-37: LGTM! The constructor properly initializes all required fields and extracts the ObjectMapper from the ApiClient for JSON processing.
51-121: LGTM! The public API overloads provide a clean, fluent interface with sensible defaults. The progressive parameter addition pattern (consumer → consumer + configOverride → consumer + errorConsumer → all parameters) makes the API intuitive and flexible.
201-228: LGTM! The line processing logic correctly:
- Parses NDJSON lines into structured objects
- Handles stream-level errors with informative fallback messages for null/missing error data
- Delivers results to the consumer
- Gracefully catches and routes exceptions to the error consumer without failing the entire stream
The null-safety for error messages (lines 211-213) is particularly well done.
230-246: LGTM! The request builder correctly:
- Serializes the request body using ObjectMapper
- Delegates to ApiClient's static builder for proper configuration
- Applies request interceptors for consistent behavior with other API calls
- Handles exceptions by wrapping them in ApiException
src/test/java/dev/openfga/sdk/api/client/StreamedListObjectsTest.java (9)
41-61: LGTM! The test setup properly mocks all dependencies (HttpClient, ApiClient, OpenFgaClient) with appropriate stub behavior for the streaming API tests.
63-93: LGTM! This test validates the core streaming functionality: multiple NDJSON lines are streamed, consumed, and collected correctly. The assertions confirm all objects are received in order with a single HTTP call.
170-200: LGTM! Excellent test coverage for mixed success/error scenarios. The test correctly validates that:
- Results continue to be processed after encountering errors
- Errors are routed to the error consumer
- The stream doesn't fail on individual line errors
This is the right behavior for resilient streaming.
202-243: LGTM! Great edge case testing! This validates that null error messages and missing error data are handled gracefully without NPEs. The assertions confirm appropriate fallback messages are generated.
278-306: LGTM! This test validates that the consumer is invoked exactly once per streamed item, which is critical for ensuring no duplicates or dropped items. Testing with 100 items provides good confidence in the implementation.
308-337: LGTM! Excellent test demonstrating CompletableFuture composition! This validates that the streaming API properly integrates with Java's async primitives, allowing users to chain additional operations.
339-375: LGTM! This test ensures that additional headers from ClientStreamedListObjectsOptions are properly propagated through the HTTP client, which is essential for custom authentication, tracing, and other header-based concerns.
377-411: LGTM! Critical test ensuring that ApiException type information is preserved through the async error handling pipeline, not wrapped in CompletionException. This allows consumers to properly handle API-specific errors with status codes.
30-31: Static analysis false positive - test constant, not a secret.
The Gitleaks tool flagged `DEFAULT_AUTH_MODEL_ID` as a potential API key. However, this is clearly a test constant (indicated by the `DEFAULT_` prefix and usage in test setup), not an actual secret. Test fixtures commonly use fixed IDs that resemble real data formats.
src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java (1)
1-168: LGTM! Auto-generated model class follows standard patterns.
This auto-generated OpenAPI model class correctly implements:
- Jackson JSON serialization with proper annotations
- Fluent builder pattern with getters/setters
- Standard equals/hashCode/toString implementations
- URL query string encoding helpers
As indicated in the file header (line 8), this class should not be manually edited. The generated code follows best practices for model objects.
jimmyjames
left a comment
Looking good! Left a couple comments, only potential issue I see straight away is if we are guarding against a null consumer param. Otherwise adding some example code to the javadocs would be great but not as large a concern.
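A minimal sketch of the null guard raised here, with a short Javadoc usage example of the kind the comment asks for. Everything in it is hypothetical: `NullGuardSketch.streamObjects` uses a simplified signature (the real method also takes a request and options), and the two hard-coded object names stand in for a server stream.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical demo class; the signature is simplified relative to the SDK method. */
final class NullGuardSketch {
    /**
     * Streams objects to {@code consumer} as they arrive.
     *
     * <pre>{@code
     * NullGuardSketch.streamObjects(System.out::println, Throwable::printStackTrace).join();
     * }</pre>
     *
     * @param consumer receives each object; must not be null
     * @param errorConsumer receives failures raised while delivering objects; may be null
     * @throws NullPointerException immediately, if {@code consumer} is null
     */
    static CompletableFuture<Void> streamObjects(Consumer<String> consumer, Consumer<Throwable> errorConsumer) {
        // Fail at the call site, before any asynchronous work is scheduled.
        Objects.requireNonNull(consumer, "consumer must not be null");
        return CompletableFuture.runAsync(() -> {
            try {
                // Stand-in for objects arriving from the server.
                for (String object : new String[] {"document:1", "document:2"}) {
                    consumer.accept(object);
                }
            } catch (RuntimeException e) {
                if (errorConsumer == null) {
                    throw e; // no error callback supplied: surface through the future
                }
                errorConsumer.accept(e);
            }
        });
    }
}
```

With the guard in place, passing a null `consumer` throws a `NullPointerException` synchronously instead of failing later inside the future when the first object arrives.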
Description
API model is generated using openfga/sdk-generator#656
What problem is being solved?
The Java SDK currently lacks support for the `streamedListObjects` API endpoint, which prevents developers from efficiently processing large lists of objects in an asynchronous, streaming manner. This limitation can cause memory issues and performance bottlenecks when dealing with large result sets.
How is it being solved?
This PR implements support for the `streamedListObjects` API endpoint using a consumer callback pattern. The implementation allows clients to process objects as they are streamed from the server, rather than waiting for the entire response to be loaded into memory.
What changes are made to solve it?
`streamedListObjects` API method with consumer callback support for asynchronous streaming
References
Review Checklist
Summary by CodeRabbit
Release Notes
New Features
Documentation
Tests