Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Subjects #407

Open
chfischerx opened this issue Jan 3, 2024 · 0 comments
Open

Adding Subjects #407

chfischerx opened this issue Jan 3, 2024 · 0 comments
Labels
enhancement Enhancement.

Comments

@chfischerx
Copy link

Is your feature request related to a problem? Please describe.
The ReactiveX "standard" defines various types of subjects to allow multiple subscribers observe an observable (see https://reactivex.io/documentation/subject.html). While the same can be achieved with a connectable Observable, there are some missing features. Most important, dynamically adding and removing subscribers. A RxGo connectable Observable allows adding multiple subscribers, and each will receive a copy of all items. But once added, subscribers cannot be removed (or unsubscribe).

Subjects are useful when trying to emulate an in-memory message broker. A typical example is creating a fan-out of a single data source to multiple clients within a web server using e.g. WebSockets. Every new WS connection will be added as a Subject subscriber. Once the client disconnects the subscription must be removed.

Describe the solution you'd like
I suggest three Subject types:

  • Subject ... simple Fanout with unsubscribe
  • BehaviorSubject ... Subject which replays the last received items to new subscribers
  • ReplaySubject ... Subject which replays the last n received items to new subscribers

Suggested Subject API:

type ISubject interface {
	Subscribe() (Subscription, rxgo.Observable)
	Next(value any)
	Error(err error)
	Complete()
}

And the Subscription API:

type Subscription interface {
	Unsubscribe()
}

Additional context
I already created all of the above in a private repo based on RxGo Observables. I would be glad to contribute (and of course maintain) my code into this project if there is interest.

@chfischerx chfischerx added the enhancement Enhancement. label Jan 3, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement Enhancement.
Projects
None yet
Development

No branches or pull requests

1 participant