Skip to content

Conversation

@bartelink
Copy link
Collaborator

@bartelink bartelink commented Apr 13, 2023

Introduces a Propulsion.Sinks namespace to replace Propulsion.Streams.Default. Within that, the structure remains the same.

  • key change being that there's now an Event type alias, with StreamSpan being removed in favour of usage of Event[]
  • SpanResult becomes Sinks.StreamResult
  • Adds Sinks.Events helpers: index, nextIndex, which compute the base and post versions given the events supplied to the handler.
  • SpanResult.OverrideWritePosition becomes StreamResult.OverrideNextIndex
  • renames Config to Factory (various internal helpers have similar renames)
  • removes several statsInterval arguments that can be inferred based on the stats argument
  • replaces Choice<'res,exn> with Result

Also tweaks some function signatures and type aliases based on upgrading a production system that uses Propulsion V2 and Equinox V2.

  • Expose EquinoxSystemTextJsonParser.tryEnumStreamEvents to facilitate custom parsing
  • Update straggler tupled/async APIs to adhere to Func/Task

@bartelink
Copy link
Collaborator Author

bartelink commented Apr 13, 2023

In porting some prod code, I found client side was:

  • referring to FsCodec.ITimelineEvent<Propulsion.Streams.Default.EventBody> (singular of StreamSpan) a lot (the code in question actually adheres to the good practice of not referring to the Propulsion package at all)
  • not referring to StreamSpan very much

The solution on the Propulsion side was to define a Propulsion.Streams.Default.Event type alias for FsCodec.ITimelineEvent<Propulsion.Streams.Default.EventBody>, and major on that

This PR actually removes the StreamSpan term entirely, which was just a type alias (in V2 it was a record containing the base index, along with the actual data as events, but that can be obtained from event 0) - it's replaced with Event[]


On the client side, it's frequently useful to define a local set of aliases near ones one's Domain/Event parsing logic a la:

module Streams

type EventBody = System.ReadOnlyMemory<byte> // or Propulsion.Streams.Default.EventBody
type Event = FsCodec.ITimelineEvent<EventBody> // or Propulsion.Streams.Default.Event

module Event =
    let bodyAsString (b : EventBody) = System.Text.Encoding.UTF8.GetString(b.Span)
    let metaAsString (e : Event) = bodyAsString e.Meta

    open FsCodec.NewtonsoftJson
    let private settings = Newtonsoft.Json.JsonSerializerSettings()
    let private serdes = Serdes(settings)
    type UserContext = Domain.Types.UserContext
    let userContext event = serdes.Deserialize<UserContext>(metaAsString event)

This lets one refer to Stream.Event[] in type signatures without having to
a) reference the Propulsion nuget
b) having to have noisy Propulsion.Streams.Default namespacing leak into Domain code

NOTE In the above, there are also helpers associated with the encoding used with the system in question. Having a top level module that manages this allows for terse and consistent application code.

I'm still searching for a better way to name/structure things such that client code does not end up having to be littered with Propulsion.Streams.Default strings (though it is important that people appreciate that the ReadOnlyMemory<byte> bodies is only a default layered on at the very end - the engine is generic in terms of the Data/Meta format)


Alongside this module Streams pattern, a module Store pattern can make sense:

module Store =

    type EventBody = Equinox.CosmosStore.Core.EventBody
    type Event = FsCodec.ITimelineEvent<EventBody>

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type Store =
    | Memory of Equinox.MemoryStore.VolatileStore<Store.EventBody>
    | Cosmos of Equinox.CosmosStore.CosmosStoreContext * Equinox.Core.ICache

Again, this allows one to refer to Store.EventBody, Store.Event in application/wiring code, which:
a) allows one to make the code clearer in terms of semantics, conveying whether an event is one on the way to/from the Store, or one being Streamed (in this case, Equinox.CosmosStore V4 is in use, which means the body is a JsonElement, whereas Stream.EventBody is System.ReadOnlyMemory<byte>)
b) provides for common naming - you can attach a module Store set of helpers to go with the type aliases (and DU)
c) allows one to manage migration (e.g. in Equinox.Cosmos V2, the event body was byte[], which became System.Text.Json.JsonElement)
d) allows one to define a canonical form when using the MemoryStore (this system is based on Cosmos, so JsonElement makes sense; for a DynamoStore or MessageDb one, System.ReadOnlyMemory<byte> probably makes more sense)

@bartelink
Copy link
Collaborator Author

bartelink commented Apr 14, 2023

@nordfjord related to jet/equinox#375 I am strongly considering renaming one of two Start overloads that vary only in Task vs Async sigs per 1e24930 as the overloading is triggering the need for some very ugly type annotations in jet/dotnet-templates@0b68153#diff-963825806f12f8bbae0a020b390cc47576214c1deb525818ae1e9bef361b60f0R87

... but I don't like Ex, and Async vs Task is not better either...

@bartelink bartelink changed the title Replace StreamSpan with Event[] Add Sinks Builder with Batched mode Apr 15, 2023
@bartelink bartelink changed the title Add Sinks Builder with Batched mode Add Sinks Factory, Batched mode Apr 17, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants