Wire workflows fetching and add cache persistence#3508
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #3508 +/- ##
==========================================
+ Coverage 79.91% 80.00% +0.08%
==========================================
Files 369 370 +1
Lines 14934 15040 +106
Branches 2058 2080 +22
==========================================
+ Hits 11934 12032 +98
Misses 2163 2163
- Partials 837 845 +8 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Add three methods for caching workflows list response: - getWorkflowsListResponseCache(): retrieves cached response - cacheWorkflowsListResponse(): stores response - clearWorkflowsListResponseCache(): removes cached response Follows the same pattern as offerings response caching. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ager Adds DeviceCache and DateProvider dependencies to WorkflowManager, implements getWorkflowsList() with in-memory staleness check and prefetch triggering, and exposes workflowIdForOfferingId() for offering-to-workflow resolution. WorkflowPreWarmer typealias is retained pending Task 5 call-site cleanup. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… in OfferingsManager Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Ensure workflows list fetch fires after cacheOfferings() to match spec ordering — "after offerings are fetched and cached". Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Mirrors the offerings behaviour: if getWorkflows() fails, restore the offeringId → workflowId map from DeviceCache so the UI can still resolve the correct workflow to display. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ld config flag Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
149be93 to
179b7d0
Compare
- Add USE_WORKFLOWS_ENDPOINT guard inside getWorkflowsList itself - Add isFetchingWorkflowsList flag to prevent concurrent requests - Cache in-memory object after disk-cache restore to prevent re-fetch loop - Extract buildOfferingIdMap helper with warnLog on duplicate offeringId - Add tests for concurrent calls, disk-cache fallback staleness, duplicate offeringId Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Call clearWorkflowsListResponseCache in OfferingsCache.clearCache so the workflows disk cache is wiped on logout/user switch alongside offerings - Add stub for clearWorkflowsListResponseCache in OfferingsCacheTest - Add test: onAppForeground triggers getWorkflowsList when offerings are stale - Add test: getOfferings succeeds with workflowManager = null (flag disabled) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…Success
- Add onComplete: () -> Unit = {} to getWorkflowsList
- Concurrent callers while in-flight queue their onComplete and wait
for the single in-flight request rather than firing a second
- onComplete fires after list fetch AND all prefetch CDN fetches complete
(AtomicInteger counter drains when last prefetch workflow resolves)
- OfferingsManager delays onSuccess dispatch until getWorkflowsList
completes; falls through immediately when workflowManager is null
- Add tests: onComplete timing, prefetch wait, concurrent dedup,
getOfferings blocks until onComplete, null workflowManager fast path
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ot import Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Relaxed mock silently swallowed the onComplete callback, so dispatchSuccess
was never called and receivedOfferings was never delivered in any test that
goes through createAndCacheOfferings. Also fix four verify calls that omitted
onComplete, causing eq(lambda {}) to fail by reference against the actual
dispatchSuccess lambda.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ility - Drop CancellationException rethrow in getWorkflow: rethrowing skipped both onSuccess/onError, leaving the prefetch AtomicInteger counter un-decremented and drainCompletionCallbacks() unreachable whenever the scope was cancelled (e.g. during Purchases.configure() reconfiguration). The broad Exception catch now covers CancellationException too. - Add @volatile to InMemoryCachedObject.lastUpdatedAt so the pre-lock fast-path staleness check in getWorkflowsList always reads the latest written value across threads, preventing spurious redundant fetches. - Add WorkflowsListResponse encode→decode round-trip tests to guard against silent breakage if JsonTools.json config or serializers drift. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add WorkflowsCache as the single owner of in-memory workflow state: the resolved per-workflow WorkflowDataResults and the workflows list with its offeringId->workflowId map. getWorkflow now serves a fresh cached result without a backend round-trip, so a prefetched workflow is reused when a paywall opens instead of being re-fetched. The cache is cleared at the same identity transitions as the offerings cache (login, alias, switch/logout), so switching users drops the previous user's workflows and forces a re-fetch. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Each prefetched workflow kicks off CDN/asset downloads on Dispatchers.IO, so a long workflows list could fan out into an unbounded number of concurrent downloads. Gate the prefetch loop with a Semaphore so at most 4 workflows are prefetched at a time. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Workflow detail fetches are issued many-at-a-time when prefetching a workflows list, but Backend's default dispatcher is single-threaded, so they serialized. Add a concurrentDispatcher to Backend (defaults to the main dispatcher so other callers are unchanged) and route getWorkflow through it. PurchasesFactory backs it with a fixed pool of 4, matching the WorkflowManager prefetch concurrency cap, so prefetching a list fans out instead of running one detail call at a time. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…llelism Document that the prefetch cap is a Semaphore rather than a limitedParallelism dispatcher (as TopicFetcher uses) because a prefetch is suspended almost the whole time, so a dispatcher-level cap would bound nothing. Note the detail HTTP calls are separately capped by the Backend concurrent dispatcher pool. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Move the concurrent dispatcher out of Backend (where it was an all-or-nothing field defaulting to the serial dispatcher) and into WorkflowManager, which owns the prefetch use case. Backend.getWorkflow now takes an optional per-call callbackDispatcher, defaulting to the standard dispatcher; only the prefetch path passes the concurrent pool, so on-demand getWorkflow stays serial as before. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Filter out workflow summaries with a null offeringId up front in getWorkflowsList, so the cache (memory and disk) only holds offering-tied workflows. They can't be reached via workflowIdForOfferingId anyway, so caching or prefetching them is wasted work; this is a defensive guard on top of the backend's prefetch flag. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Workflows only had a time-based TTL, so a forced or locale-driven offerings refetch could leave workflowIdForOfferingId on a stale map until the workflows TTL expired. Force the workflows list stale before getWorkflowsList whenever offerings are freshly fetched from the network (skipped on the disk-cache fallback, where offerings did not change), keeping the workflow map aligned with the offerings the caller just received. The current map is retained until the refetch lands, so lookups still resolve in the interim. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The in-memory offerings cache-hit path delivered onSuccess without calling getWorkflowsList, so workflowIdForOfferingId could be null right after a successful getOfferings even though offerings were valid — unlike the network path, which gates success on workflows. Route the cache-hit success through getWorkflowsList too. It no-ops when the workflows list is already fresh (the common case, since offerings and workflows are fetched together and share a TTL), so it only fetches when the map is missing or stale. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
PurchasesOrchestrator.getWorkflow now routes every outcome through dispatch so the callback always lands on the main thread, matching the SDK's other callback APIs (getOfferings, getCustomerInfo). WorkflowManager.getWorkflow intentionally has no fixed delivery thread — a cache hit calls back synchronously on the caller's thread, a miss resolves on its IO scope, and the prefetch path routes detail callbacks onto a dedicated dispatcher. Normalizing at the orchestrator (the consumer boundary) gives callers, including awaitGetWorkflow, a stable thread without disturbing the prefetch plumbing. Adds the first orchestrator-level coverage for getWorkflow. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…atcher Backend.getWorkflow's callbackDispatcher is now non-nullable and defaults to the standard single-threaded dispatcher instead of null with an elvis fallback. WorkflowManager forwards its own dispatcher only when the prefetch path supplies one, otherwise letting the backend default apply. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 28d5104. Configure here.
…lism scope Move workflow prefetch concurrency limits to the dispatchers where the blocking work runs, instead of a pipeline-spanning Semaphore in WorkflowManager: - The detail fetch was already capped at 4 by prefetchDispatcher's thread pool, so the semaphore was redundant for it. - The CDN download now runs on a dedicated FileRepository instance whose ioScope uses Dispatchers.IO.limitedParallelism(4), capping workflow CDN downloads without affecting the FileRepository instances used elsewhere (images, video). - Remove the redundant withContext(Dispatchers.IO) in DefaultFileRepository.downloadFile so the injected dispatcher governs the whole download (no-op for existing callers, already on Dispatchers.IO). - Drop the WorkflowManager prefetchSemaphore and its now-stale comment. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
| fileRepository = DefaultFileRepository( | ||
| fileCacheManager = DefaultFileCache(contextForStorage, "rc_compiled_workflows"), | ||
| ioScope = CoroutineScope( | ||
| Dispatchers.IO.limitedParallelism(MAX_CONCURRENT_WORKFLOW_CDN_FETCHES) + |
There was a problem hiding this comment.
I don't know if this is too much and we should just not limit the CDN requests. I added paralelization to the getWorfklow backend calls but also here. What do you think @tonidero
There was a problem hiding this comment.
Hmm my main concern is that downloading too many things simultaneously can increase memory usage and actually slow down everything else... Might be early optimizing but I think having some limit is good 🙏 .
I guess we weren't limiting parallel downloads of paywall assets though... but I guess there could potentially be more offerings/workflows than assets in a paywall? I guess this could actually be either way 😅. So yeah, I don't feel strongly, but now that we have it, we might as well limit it a bit?
tonidero
left a comment
There was a problem hiding this comment.
This looks great to me. Thank you so much for all the iterations!! 🙇
| fileRepository = DefaultFileRepository( | ||
| fileCacheManager = DefaultFileCache(contextForStorage, "rc_compiled_workflows"), | ||
| ioScope = CoroutineScope( | ||
| Dispatchers.IO.limitedParallelism(MAX_CONCURRENT_WORKFLOW_CDN_FETCHES) + |
There was a problem hiding this comment.
Hmm my main concern is that downloading too many things simultaneously can increase memory usage and actually slow down everything else... Might be early optimizing but I think having some limit is good 🙏 .
I guess we weren't limiting parallel downloads of paywall assets though... but I guess there could potentially be more offerings/workflows than assets in a paywall? I guess this could actually be either way 😅. So yeah, I don't feel strongly, but now that we have it, we might as well limit it a bit?
| offeringFontPreDownloader = offeringFontPreDownloader, | ||
| ), | ||
| workflowsCache = it, | ||
| prefetchDispatcher = Dispatcher( |
There was a problem hiding this comment.
[Not related to this line]
I was thinking that, if it's still downloading the workflows, and the customer opens a specific workflow that hasn't been downloaded yet, we will hook to the existing callback and wait for that to finish right? I guess this means that it could potentially slow down loading if it's on a slow connection and needs to finish downloading other workflows...
Not sure if there would be a good way to prioritize the "on demand" workflows, but it's probably going to complicate things for an edge case, so I'm ok leaving as is for now.
## Summary - Adds `WorkflowSummary` and `WorkflowsListResponse` models (`id`, `display_name`, `offering_id`, `prefetch`) with kotlinx.serialization and `ignoreUnknownKeys` for forward compatibility - Adds `WorkflowJsonParser.parseWorkflowsListResponse()` - Adds `Endpoint.GetWorkflows` and `Backend.getWorkflows()` with background-aware callback deduplication (same pattern as the existing `getWorkflow()` implementation) Part 1 of 2 — Part 2: RevenueCat#3508 <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Low Risk** > Additive networking and parsing alongside existing workflow detail APIs, with broad test coverage and no changes to purchase or auth flows. > > **Overview** > Adds client support for **listing subscriber workflows** via `GET /v1/subscribers/{userId}/workflows`, including an optional `type` query filter. > > New kotlinx-serialization models `WorkflowSummary` and `WorkflowsListResponse` (with `parseWorkflowsListResponse`) describe list entries (`id`, `display_name`, `offering_id`, `prefetch`). `Endpoint.GetWorkflows` is wired for signature verification like the single-workflow endpoint, and **`Backend.getWorkflows`** performs the request with the same background-aware callback deduplication pattern as `getWorkflow`. Tests cover path encoding, deserialization edge cases, HTTP errors, and concurrent call coalescing. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit b27d7cd. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY --> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>

Summary
Wires paywall workflows fetching end-to-end and adds in-memory + disk caching for both the workflows list and individual workflow details.
Workflows list (
WorkflowManager.getWorkflowsList)type=paywall), persists the response toDeviceCache, and builds an in-memoryofferingId → workflowIdmap exposed viaworkflowIdForOfferingId().prefetch == true.onCompletefires only after the list fetch and all prefetch CDN fetches finish, soworkflowIdForOfferingId()is safe to call as soon asgetOfferingssucceeds.offeringId → workflowIdmap is restored from the disk cache.Workflow details + caching (
WorkflowsCache)WorkflowsCacheis the single owner of in-memory workflow state: resolved per-workflowWorkflowDataResults and the workflows list with itsofferingId → workflowIdmap.getWorkflowserves a fresh cached result without a backend round-trip, so a prefetched workflow is reused when its paywall opens instead of being re-fetched. Uses the same 5 min / 25 hr foreground/background TTL as offerings.DeviceCachegainscacheWorkflowsListResponse/getWorkflowsListResponseCache/clearWorkflowsListResponseCachefor list persistence, following the offerings-response pattern.Lifecycle (mirrors offerings)
getWorkflowsList()runs after offerings are cached, and offeringsonSuccessnow waits for it to complete when the endpoint is enabled.WorkflowsCacheand the disk list cache (viaOfferingsCache.clearCache), so a user switch never serves the previous user's workflows.Gating
USE_WORKFLOWS_ENDPOINTbuild config flag (driven by therevenuecat.useWorkflowsEndpointproperty, off on CI). When off,WorkflowManageris passed asnulltoOfferingsManager, so offerings timing is unchanged.WorkflowPreWarmer.See this generated image for helpful context
Note
Medium Risk
Changes when getOfferings completes and adds concurrent prefetch/network paths; identity cache clearing is aligned with offerings but wrong timing could briefly serve stale workflow maps after user switch (documented same race as offerings).
Overview
Replaces the per-offering
WorkflowPreWarmerwith a full workflows pipeline gated byUSE_WORKFLOWS_ENDPOINT: when enabled,WorkflowManager+ newWorkflowsCacheown list/detail caching (memory + disk list inDeviceCache), prefetch ofprefetch=trueworkflows, deduped in-flight list fetches, and anofferingId → workflowIdmap.OfferingsManagernow waits ongetWorkflowsListbefore delivering offerings success (cache hits and network paths), and forces the workflows list stale when offerings are freshly fetched from the network (not disk fallback).IdentityManagerclearsWorkflowsCacheon login, alias, switch, and logout like offerings.getWorkflowgains detail caching/TTL, broader resolve error handling, and optionalcallbackDispatcherfor bounded prefetch (4-thread pool + dedicatedDefaultFileRepositorywithlimitedParallelism(4)for CDN).PurchasesOrchestrator.getWorkflowis nullable when workflows are off, always **dispatch**es callbacks to the main thread, and reports a configuration error if disabled.DefaultFileRepositoryaccepts an injectableioScope; downloads no longer wrapwithContext(IO)so concurrency limits apply.InMemoryCachedObject.lastUpdatedAtis@Volatile.Reviewed by Cursor Bugbot for commit d5ae7df. Bugbot is set up for automated code reviews on this repo. Configure here.