
Move stream.subscribe() to be part of the constructor #1997

Open
KyleAMathews opened this issue Nov 19, 2024 · 22 comments

Comments

@KyleAMathews
Contributor

e.g. new ShapeStream({ subscribe: () => {} })

The problem with .subscribe() is that people get the sense they can subscribe at any time and still receive the full stream, when in fact delaying the .subscribe call means missing the early messages. To avoid this footgun, and in general to keep construction of streams fixed and declarative, let's move subscriptions to the constructor.
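As a sketch of how that might behave (all names here are illustrative, not the shipped client API), a subscriber fixed at construction can never miss early messages:

```typescript
// Hypothetical sketch: the subscriber is fixed at construction, so it
// observes every message the stream ever emits. Names are illustrative only.
type Message = { offset: number; value: string };

class ShapeStream {
  private readonly subscribers: Array<(msgs: Message[]) => void>;

  constructor(opts: { subscribe?: (msgs: Message[]) => void }) {
    // All subscribers are known before any bytes move, so none can be late.
    this.subscribers = opts.subscribe ? [opts.subscribe] : [];
  }

  // Internal: called by the transport layer as message batches arrive.
  emit(msgs: Message[]): void {
    for (const sub of this.subscribers) sub(msgs);
  }
}
```

Because the subscriber list is sealed before streaming begins, there is no window in which messages can be delivered to nobody.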

@msfstef
Contributor

msfstef commented Nov 19, 2024

I think we want this to be a declarative list of subscribers, so something like:

type ShapeStreamSubscriber = {
  onMessages: (messages: Message[]) => void | Promise<void>,
  onError: (err: unknown) => void
}

subscribers: ShapeStreamSubscriber[] | ShapeStreamSubscriber

@thruflo
Contributor

thruflo commented Nov 19, 2024

Is there not a slight chicken and egg thing of needing to instantiate a Shape to have the subscribers to instantiate the ShapeStream which is needed to instantiate the Shape ...?

@samwillis
Contributor

The alternative is an explicit .start() method on a shape stream. It solves two issues:

  • allowing multiple or lazy subscribing to a stream (passing it to a Shape)
  • catching exceptions during the initial connection

.start() would return a promise that resolves or rejects depending on the initial connection, such as auth failing. This is impossible to do with the sync constructor (the alternative is an async static .create(), but that doesn't solve lazy subscription).

After calling .start() we can lock the subscriptions and throw on any further .subscribe() calls. (Although there is an argument for late subscription for power users and edge cases...)
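A minimal sketch of that shape of API (method names assumed from this comment, not a final design; the connection logic is stubbed out):

```typescript
// Sketch of an explicit .start(): subscriptions are collected up front,
// then locked once the stream begins, so no subscriber arrives mid-stream.
// A real implementation would perform the first fetch inside start() and
// reject the returned promise on e.g. an auth failure.
class ShapeStream {
  private started = false;
  private subscribers: Array<(msg: string) => void> = [];

  subscribe(fn: (msg: string) => void): void {
    if (this.started) {
      throw new Error("cannot subscribe after start()");
    }
    this.subscribers.push(fn);
  }

  async start(): Promise<void> {
    // Stub for the initial connection; a rejection here would surface
    // initial-connection errors (401s etc.) to the caller.
    this.started = true;
  }
}
```

The key property is that start() gives the caller one awaitable point at which the initial connection has either succeeded or failed.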

@msfstef
Contributor

msfstef commented Nov 20, 2024

Is there not a slight chicken and egg thing of needing to instantiate a Shape to have the subscribers to instantiate the ShapeStream which is needed to instantiate the Shape ...?

This is part of the problem in how we currently use ShapeStream - as soon as we instantiate it, it starts streaming, which means that if you "feed it" to a Shape asynchronously, it will have missed messages and have inconsistent state. Passing an ongoing stream to a Shape to materialize it is odd, since the materialization requires the shape to have access to the stream from the start (and that is not guaranteed when being passed an already instantiated stream).

The way to do it with the declarative approach would be that Shape actually accepts Omit<ShapeStreamOptions, 'subscribers'> rather than a ShapeStream instance, and it internally creates a shape stream with itself as a subscriber.
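Sketched out (types simplified and names assumed for illustration), that ownership inversion might look like:

```typescript
// Sketch: Shape receives stream options without subscribers and constructs
// the stream itself, registering as the sole subscriber from the start.
// ShapeStreamOptions is simplified here for the example.
type Msg = { key: string; value: unknown };
type ShapeStreamOptions = { url: string; onMessages: (msgs: Msg[]) => void };

class ShapeStream {
  constructor(private readonly opts: ShapeStreamOptions) {}
  // Internal: the transport delivers batches straight to the options callback.
  emit(msgs: Msg[]): void { this.opts.onMessages(msgs); }
}

class Shape {
  readonly rows = new Map<string, unknown>();
  readonly stream: ShapeStream;

  constructor(opts: Omit<ShapeStreamOptions, "onMessages">) {
    // The shape subscribes itself at construction, so it is guaranteed
    // to see every message from the very first offset.
    this.stream = new ShapeStream({ ...opts, onMessages: (m) => this.apply(m) });
  }

  private apply(msgs: Msg[]): void {
    for (const m of msgs) this.rows.set(m.key, m.value);
  }
}
```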

There is also another way, using a sort of ShapeStreamBuilder, see below:

The alternative is an explicit .start() method on a shape stream.

We had a discussion around this with @KyleAMathews @kevin-dp @balegas - currently we are mixing two patterns, a declarative one and a builder one.

If we opt for the fully declarative one as the core pattern, then we can very easily code up a ShapeStreamBuilder that uses a builder pattern, where you would do something like:

const builder = new ShapeStreamBuilder()
  .setUrl(url)
  .setShapeDefinition(shapeDef)
  .addSubscriber(sub)

// ... stuff happens ...

builder.addSubscriber(sub)

const shapeStream = builder.build() // or .run()
// internally the build call creates a declarative `ShapeStream`

This solves the cases where people might want to lazily add subscribers without (1) unclear semantics about when the stream starts or whether more subscribers can be added afterwards, and (2) externally mutable state on the stream itself, with all the special cases and flags that entails.

catching exceptions during the initial connection

We had discussed that this might not make that much sense, since a stream is a continuous thing and you might get auth errors or invalid relation errors at any point during the runtime of the stream - so separating the "initial connection" is extra overhead that could be handled by one generic declarative global onError handler.

We can have a static .validateShape API, a single async call that ensures the stream can be created and everything is correct, if that is important as a step. Or we can have a .started promise that resolves once the stream has completed at least one request, for anyone who wants to await it. Either way, errors will be caught both in the global onError handler, if provided, and in the individual subscriber error handlers.
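A rough sketch of the .started promise alongside a single global onError handler (both are proposals from this comment, not an existing API; the transport is stubbed):

```typescript
// Sketch: one promise that resolves after the first successful request,
// while every error, initial or mid-stream, funnels to a single handler.
class ShapeStream {
  readonly started: Promise<void>;
  private resolveStarted!: () => void;

  constructor(private readonly onError: (err: unknown) => void) {
    this.started = new Promise((resolve) => { this.resolveStarted = resolve; });
  }

  // Internal: the transport calls this once the first request completes.
  markStarted(): void { this.resolveStarted(); }

  // Internal: any runtime error, at connect time or later, goes here.
  fail(err: unknown): void { this.onError(err); }
}
```

This keeps "initial connection" from being a special case: callers who care can await .started, while errors at any point in the stream's lifetime flow through the one declarative handler.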

Although there is an argument for late subscription for power users and edge cases...

I am of the opinion that if someone is that much of a power user and requires this sort of undefined behaviour, they should be able to fairly easily write a custom client implementation of the protocol. But I also think it's better to optimize the client for the intended use cases, with a minimal API surface and clear semantics, and wait until we get requests for anything else!

@samwillis
Contributor

samwillis commented Nov 20, 2024

Initial reaction to the builder concept was a -1, but it actually translates really nicely to a builder pattern for client-side processing of the streams - so I'm a +1 now:

Includes:

const comments = new ShapeStreamBuilder()
  .setUrl(url)
  .setShapeDefinition(shapeDef)

const issues = new ShapeStreamBuilder()
  .setUrl(url)
  .setShapeDefinition(shapeDef)
  .query() // Start building a query (may not be needed)
  .include({
    // Client-side include of the two shapes
    shape: comments,
    key: 'id', // column on issues
    on: 'issue_id', // column on comments
    as: 'comments' // adds the comments as a 'comments' array prop on all issues
  })
  .subscribe((msg) => doStuff(msg))
  .run() // implies .run() on comments

Joins:

const issues = new ShapeStreamBuilder()
  .setUrl(url)
  .setShapeDefinition(shapeDef)

const comments = new ShapeStreamBuilder()
  .setUrl(url)
  .setShapeDefinition(shapeDef)
  .query() // Start building a query (may not be needed)
  .join({
    // Client-side join of the two shapes
    shape: issues,
    key: 'issue_id', // column on comments
    on: 'id', // column on issues
    as: 'issue' // adds the issue as an 'issue' prop on all comments
  })
  .subscribe((msg) => doStuff(msg))
  .run() // implies .run() on issues

@KyleAMathews
Contributor Author

@samwillis huh that is pretty compelling

@msfstef
Contributor

msfstef commented Nov 20, 2024

@samwillis happy that it works with that pattern as well! (although I'm unclear if the comments builder .run() call would also "build" the issues stream as well - i.e. "deep run")

I do want to stress the point that the declarative pattern is easier to deal with and does not preclude us from having the builder pattern/ lazy sub as well - but I think our "core" API being declarative makes it easier to test/maintain (and it's my personal preference hehe... objective opinions only)

The main issue with our current approach is that we allow people to subscribe to an ongoing stream, which has very limited and particular use cases and is much more likely to cause hard-to-catch bugs (it's always possible to just start a stream at a given offset if someone wants to "resume" mid-stream).

@samwillis
Contributor

@msfstef agreed, in general I prefer a declarative api.

Thinking about it further, we don't need the implied "deep run", but note that the output of the join include won't start until both streams have started.

I wonder if the solution to the ShapeStream-to-Shape problem is that, rather than:

const stream = new ShapeStream({
  url: `http://localhost:3000/v1/shape`,
  table: 'items'
})
const shape = new Shape(stream)

we do:

const shape = new Shape({
  shape: {
    url: `http://localhost:3000/v1/shape`,
    table: 'items'
  }
})

This is what we did with the PGlite sync plugin: https://pglite.dev/docs/sync#using-the-sync-plugin-alpha

@samwillis
Contributor

@KyleAMathews note that we could add multiple subscribe calls at intermediate steps in a shape query:

const comments = new ShapeStreamBuilder()
  .setUrl(url)
  .setShapeDefinition(shapeDef)
  .query()

// we don't need to save this bit, just subscribe
// count all comments
comments.count().subscribe((msg) => ...)

const issues = new ShapeStreamBuilder()
  .setUrl(url)
  .setShapeDefinition(shapeDef)
  .query() // Start building a query (may not be needed)
  .include({
    // Client-side include of the two shapes
    shape: comments,
    key: 'id', // column on issues
    on: 'issue_id', // column on comments
    as: 'comments' // adds the comments as a 'comments' array prop on all issues
  })
  .subscribe((msg) => doStuff(msg))
  .count()
  .subscribe((msg) => ...) // count all issues
  .run() // implies .run() on comments

@msfstef
Contributor

msfstef commented Nov 20, 2024

@samwillis re: Shape instantiation - absolutely agree, see my comment above:

The way to do it with the declarative approach would be that Shape actually accepts Omit<ShapeStreamOptions, 'subscribers'> rather than a ShapeStream instance, and it internally creates a shape stream with itself as a subscriber.

Ultimately a shape owns the stream, as it needs to subscribe to it from the start, at offset -1. Perhaps you could even just specify new Shape(opts: ShapeStreamOptions) and have the shape add itself as a subscriber on top of any subscribers you have provided - but that might just be confusing.

@KyleAMathews
Contributor Author

KyleAMathews commented Nov 20, 2024

Here's another thought - maybe we want neither declarative nor builder. The nature of our system is that we create streams, then process streams. Stream is the only class; everything else just processes those streams.

Shape should just be:

import { ShapeStream } from '@electric-sql/client';
import { reduce } from '@electric-sql/stream';

const issueStream = new ShapeStream({...})

issueStream
  .pipe(
    reduce()
  )
  .subscribe((issues) => console.log(issues)) // Output: [issue1, issue2, issue3]

// A declarative Shape
function shape(shapeStreamOptions) {
  const stream = new ShapeStream(shapeStreamOptions)
  return stream.pipe(reduce())
}

const issues = shape({...})
issues.subscribe((issues) => console.log(issues)) // Output: [issue1, issue2, issue3]

This relates to what we were talking about on Discord about a generic stream lib. We have stream readers (one of which, of course, is ShapeStream) which provide a stream of operations against a fixed schema. And then stream operators which can map, filter, reduce, join, and aggregate in various ways.

On autostart or not - in the stream-processing world, most streams don't start moving bytes until there's a reader - so creating a stream and then not using it until it's piped into a processing pipeline or subscribed to directly makes sense to me.

There's also the idea of replayable streams - we have that already with our offset-based caching scheme - so the result of any stream operator would also be URL-addressable and HTTP-cacheable, as each stream operator just emits processed operations (e.g. reduce keeps only the update operations with the latest value). So if you want to do some pre-processing of tables to join and tweak the data structures a bit, you could do that and then expose it as a standard shape URL.

@msfstef
Contributor

msfstef commented Nov 21, 2024

@KyleAMathews I like the idea of the stream being "pull based" - and if you want multiple subscribers, you can first set up a subscriber to forward messages to e.g. 3 other subscribers "down the pipe" and then subscribe to the stream, achieving the same effect.

Making our stream match a standard stream library behaviour would be really nice

@KyleAMathews
Contributor Author

Making our stream match a standard stream library behaviour would be really nice

I'm not sure we can without a lot of acrobatics - but it's definitely worth giving it a serious look because yeah, it'd save a lot of work

@KyleAMathews
Contributor Author

if you want multiple subscribers, you can first setup a subscriber to forward messages to e.g. 3 other subscribers "down the pipe"

"tee" is a typical term here for splitting the stream — then if you have one fork in the stream that's reading faster, the tee will buffer for the slower reader.

If all our stream infrastructure can be fronted with HTTP and can cache logs to disk or elsewhere, then the buffering and replay happen pretty much automatically.
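For reference, the standard Web Streams API already models this: ReadableStream.tee() splits one stream into two branches, with the runtime buffering for whichever reader is slower. A small runnable illustration (plain Web Streams, not the Electric client):

```typescript
// tee() on a standard ReadableStream: both branches receive every chunk,
// and the runtime buffers internally for the slower consumer.
async function collect(stream: ReadableStream<number>): Promise<number[]> {
  const out: number[] = [];
  const reader = stream.getReader();
  for (;;) {
    const { done, value } = await reader.read();
    if (done) return out;
    out.push(value!);
  }
}

const source = new ReadableStream<number>({
  start(controller) {
    for (const n of [1, 2, 3]) controller.enqueue(n);
    controller.close();
  },
});

// Split the stream; `fast` and `slow` can now be read at different rates.
const [fast, slow] = source.tee();
```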

@thruflo
Contributor

thruflo commented Nov 22, 2024

I like the reduce API.

thruflo added a commit that referenced this issue Nov 22, 2024
This is what I requested in
#2018 (comment)

The client now uses the new onError to handle 401/403 and reconnect with
a new auth token.

One observation: with the onError being defined before the shape is
instantiated, I'm not sure what the best control flow is to handle a
`405` must refetch. Certainly it's hard to handle in the onError
callback. I guess this is what's being discussed in
#1997 etc.
@msfstef
Contributor

msfstef commented Nov 27, 2024

Bumping this issue to move it along - how are we feeling about ShapeStream needing extra stream processing?

If we make the change such that ShapeStream only starts streaming when subscribe is called on it, that has the benefit of not breaking the API completely, but we'd still have to block it from allowing subsequent subscribe calls (as the first one is effectively consuming it).

We could also not block it and allow mid-stream subscribing like we already do, and push the responsibility of correct usage to the developer (and allow "advanced" uses). In that case our API stays exactly the same, and the changes are only internal such that the stream starts on the first subscribe call.

If we want to rename it to pipe, would we still allow multiple pipe calls? or consider it a consumable resource with a single subscriber and throw for subsequent calls?
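The start-on-first-subscribe variant can be sketched in a few lines (illustrative only, with the long-poll loop stubbed out; the setTimeout deferral is one way to let same-tick subscribers register before any messages flow):

```typescript
// Sketch: the fetch loop only begins once the first subscriber appears,
// and the actual start is deferred to a new task so that any other
// subscribe() calls made in the same tick are registered first.
class LazyShapeStream {
  private started = false;
  private subscribers: Array<(msg: string) => void> = [];

  subscribe(fn: (msg: string) => void): void {
    this.subscribers.push(fn);
    if (!this.started) {
      this.started = true;
      // Defer so same-tick subscribers don't miss the first messages.
      setTimeout(() => this.run(), 0);
    }
  }

  private run(): void {
    // Stub for the long-poll loop: emit one message to all subscribers.
    for (const sub of this.subscribers) sub("first-message");
  }
}
```

Note this keeps the existing external API: subscribe() still works mid-stream; only the internal start condition changes.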

@KyleAMathews
Contributor Author

Waiting for the first subscribe call to start fetching seems pretty reasonable - but I think we should still allow mid-stream subscribes too, e.g. that's what we're using for matchStream.

I think pipe could be a separate function perhaps e.g.

import { pipe, map, sum } from "@electric-sql/stream"

const stream = new Shape({...})

const mappedStream = pipe(stream, map((value) => ({ ...value, count: value.count * 2 })))

// Sam's idea to add a built-in `materialize` function to do what Shape does now.
mappedStream.materialize().subscribe((rows) => {
  // do something with materialized mapped rows
})

const sumStream = pipe(stream, sum((value) => value.count))

sumStream.subscribe() // sum of count

@samwillis
Contributor

I think materialise may be a separate import too, we are likely to have multiple implementations: in-memory, SQLite, PGlite, any other persistence.

We would then also likely have a .toStream() on a materialized shape. In the case of a shape materialised to SQLite, it would then provide an API that can be wrapped in an HTTP API to subscribe to the shape. We have to persist the shape in a way that lets a client request all changes from an offset.

We also need to take a look at the underlying D2 lib; it may need some refactoring to enable the pipe-style API, and it currently needs a way to finalise a full graph covering all branches of interconnected queries/operators before being manually stepped through as things are written to its inputs. All possible, but it needs consideration.

@samwillis
Contributor

@msfstef whether we end up with the pipe API or something similar, it will be a separate lib/import that internally calls subscribe. As the underlying fetch is async, if multiple subscribers are added synchronously at the same time (in the same microtask), they won't miss any messages. A setTimeout(start, 0) would be belt and braces, as it would ensure the start happens in a new task after any microtasks. This will be important when constructing a pipeline that uses the same stream to do multiple computations. (See @KyleAMathews' example above, with one doing a map and the other doing the count.)

@msfstef
Contributor

msfstef commented Dec 4, 2024

@samwillis re: microtasks - indeed, this is the only reason our test suite and most use cases mostly work. My issue is that this behaviour will undoubtedly lead to weird, hard-to-discover bugs in the future, both for us and for anyone using it.

It seems to me that as an immediate action, we can change the ShapeStream to start on the first subscribe, potentially adding a setTimeout to ensure it starts in a separate task, and continue to allow subscribing mid-stream. As a first step this should eliminate all issues for streams with a single subscriber, and reduce issues with multiple subscribers, all without changing the API at all - so I think this is a win-win.

I think this issue should stay open for us to decide on more major changes, but it's worth actioning the above asap to potentially save us some annoying issues, at least for the single-subscriber cases.

@KyleAMathews
Contributor Author

👍 to immediately implementing waiting for the first subscriber before starting to fetch.

msfstef added a commit that referenced this issue Dec 19, 2024
Addresses part of #1997

Had to change some tests around - obviously this is not a full move towards the desired behaviour (e.g. we might even want the stream to stop when it has _no_ subscribers), but I think this gives the stream some beneficial properties (e.g. no messages are lost if you subscribe after an async gap).

If we think this just complicates the client, and that we should instead address the issue in one comprehensive go, I'm happy to close this as well.