Skip to content

dynamic_modules: scheduler ABI for generic non-blocking async operations #39765

Merged
mathetake merged 7 commits intoenvoyproxy:mainfrom
mathetake:cookingdispathers
Jun 17, 2025
Merged

dynamic_modules: scheduler ABI for generic non-blocking async operations #39765
mathetake merged 7 commits intoenvoyproxy:mainfrom
mathetake:cookingdispathers

Conversation

@mathetake
Copy link
Copy Markdown
Member

@mathetake mathetake commented Jun 6, 2025

Commit Message: dynamic_modules: scheduler ABI for generic non-blocking async operations
Additional Description:

This commits adds a generic "scheduler" ABI that allows modules to schedule a generic event trigged by the module. More specifically, this adds the capability to create a "scheduler" object per HTTP filter object (== per request) that can be safely sent across threads. It has the method to schedule a generic event invoked against the HTTP filter object on the correct worker thread, and that method can be called from any threads including the ones managed by a module. At Rust SDK level, the scheduler trait implements the Send trait hence can be sent to another thread to offload the processing while freeing the worker thread.

The following functions are added to the ABI:

  • envoy_dynamic_module_on_http_filter_scheduled
  • envoy_dynamic_module_callback_http_filter_scheduler_new
  • envoy_dynamic_module_callback_http_filter_scheduler_commit
  • envoy_dynamic_module_callback_http_filter_scheduler_delete
  • envoy_dynamic_module_callback_http_filter_continue_decoding
  • envoy_dynamic_module_callback_http_filter_continue_encoding

This enables the whole bunch of new use cases. Most notably, now a module can implement their own External Processor, External Authorization, or Global Rate Limit filter style extensions. These require the blocking network calls in their own style so the scheduler allows it to avoid blocking the worker thread while doing the necessary work. One another example is an external-caching filter which talks to a database like DynamoDB, Redis, Memcached, S3, etc.

Here's the example Rust SDK usage which implements an external-caching filter (from the integration test case):

struct FakeExternalCachingFilter {
  rx: Option<std::sync::mpsc::Receiver<String>>,
}

impl<EHF: EnvoyHttpFilter> HttpFilter<EHF> for FakeExternalCachingFilter {
  fn on_request_headers(
    &mut self,
    envoy_filter: &mut EHF,
    _end_of_stream: bool,
  ) -> envoy_dynamic_module_type_on_http_filter_request_headers_status {
    // Get the cache key from the request header, which is owned by the Envoy filter.
    let cache_key_header_value = envoy_filter.get_request_header_value("cacahe-key").unwrap();
    // Construct the cache key String from the Envoy buffer so that it can be sent to the different
    // thread.
    let mut cache_key = String::new();
    cache_key.push_str(std::str::from_utf8(cache_key_header_value.as_slice()).unwrap());
    // We need to send the found cached response body back to the worker thread,
    // so we use a channel to communicate between the filter and the worker thread safely.
    //
    // Alternatively, you can use Arc<Mutex<>> or similar constructs.
    let (cx, rx) = std::sync::mpsc::channel();
    self.rx = Some(rx);
    // In real world scenarios, rather than spawning a thread per request,
    // you would typically use a thread pool or an async runtime to handle
    // the asynchronous I/O or computation.
    let scheduler = envoy_filter.new_scheduler();
    _ = std::thread::spawn(move || {
      // Simulate some processing to check if the cache key exists.
      let cache_hit = if cache_key == "existing" {
        // Do some processing to get the cached response body in real world.
        let cached_body = "cached_response_body".to_string();
        // If the cache key exists, we send it back to the Envoy filter.
        cx.send(cached_body).unwrap();
        1
      } else {
        0
      };
      // We use the event_id pased to the commit method to indicate if the cache key was found.
      scheduler.commit(cache_hit);
    });
    // Return StopIteration to indicate that we will continue the processing
    // once the scheduled event is completed.
    envoy_dynamic_module_type_on_http_filter_request_headers_status::StopIteration
  }

  fn on_scheduled(&mut self, envoy_filter: &mut EHF, event_id: u64) {
    // We use event_id to determine if the cache key was found.
    if event_id == 0 {
      // This means the cache key was not found, so we continue decoding.
      envoy_filter.continue_decoding();
    } else {
      let result = self.rx.take().unwrap().recv().unwrap();
      envoy_filter.send_response(200, vec![("cached", b"yes")], Some(result.as_bytes()));
    }
  }
}

Risk Level: low (completely new optional code path)
Testing: done
Docs Changes: done
Release Notes: n/a
Platform Specific Features: n/a

Loading
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants