Add Subject to rxgo #392

Open
TongChia opened this issue Dec 30, 2022 · 0 comments
Labels
enhancement Enhancement.

Comments

@TongChia

I miss the Subject from RxJS.

It is a bidirectional stream: new items can be emitted into it via the next() method.
https://rxjs.dev/guide/subject
With that, I could share the subject object with other functions, which could then either emit items through it or observe it.

I tried to simulate this behavior,
but it inevitably requires creating one more channel:

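import (
	"context"

	"github.com/reactivex/rxgo/v2"
)

// Subject is an Observable that also accepts new items via Next.
type Subject interface {
	rxgo.Observable
	Next(interface{})
}

type SubjectImpl struct {
	rxgo.Observable
	channel chan rxgo.Item
}

// Next emits i into the stream. The channel is unbuffered, so this blocks
// until the producer goroutine receives the item.
func (subject *SubjectImpl) Next(i interface{}) {
	rxgo.Of(i).SendBlocking(subject.channel)
}

func NewSubject(opts ...rxgo.Option) Subject {
	ch := make(chan rxgo.Item)
	obs := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
		// ch is deliberately not closed here: closing it from the receiving
		// side would make a later Next panic with "send on closed channel".
		for {
			select {
			case item := <-ch:
				// Forward each item from the extra channel to the observable.
				item.SendBlocking(next)
			case <-ctx.Done():
				return
			}
		}
	}}, opts...)

	return &SubjectImpl{
		channel:    ch,
		Observable: obs,
	}
}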
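
For comparison, here is a rough sketch of the Subject idea using only the standard library, with no rxgo types involved. It is not a proposed rxgo API: the names `Subject`, `Subscribe`, `Next`, and `Complete` are hypothetical and only mirror the RxJS vocabulary. Each subscriber gets its own channel, and `Next` fans a value out to all of them, so there is no intermediate forwarding channel.

```go
package main

import (
	"fmt"
	"sync"
)

// Subject is a hypothetical, stdlib-only multicast stream: every value passed
// to Next is delivered to each channel previously returned by Subscribe.
type Subject[T any] struct {
	mu     sync.Mutex
	subs   []chan T
	closed bool
}

// Subscribe registers an observer and returns its receive-only channel.
// buffer is the channel capacity; Next blocks once a subscriber's buffer is full.
func (s *Subject[T]) Subscribe(buffer int) <-chan T {
	s.mu.Lock()
	defer s.mu.Unlock()
	ch := make(chan T, buffer)
	if s.closed {
		close(ch) // late subscribers see an already completed stream
		return ch
	}
	s.subs = append(s.subs, ch)
	return ch
}

// Next emits v to all current subscribers. It is a no-op after Complete.
func (s *Subject[T]) Next(v T) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return
	}
	for _, ch := range s.subs {
		ch <- v
	}
}

// Complete closes every subscriber channel. Only the sending side closes,
// so Next can never hit a closed channel.
func (s *Subject[T]) Complete() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return
	}
	s.closed = true
	for _, ch := range s.subs {
		close(ch)
	}
}

func main() {
	var s Subject[int] // the zero value is ready to use
	a := s.Subscribe(4)
	b := s.Subscribe(4)

	s.Next(1)
	s.Next(2)
	s.Complete()

	for v := range a {
		fmt.Println("a got", v)
	}
	for v := range b {
		fmt.Println("b got", v)
	}
}
```

The sketch holds the mutex while sending, so a slow subscriber with a full buffer stalls every other caller; a real implementation would need a policy for that (drop, buffer, or per-subscriber goroutines), and would also need error and unsubscribe handling.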
@TongChia TongChia added the enhancement Enhancement. label Dec 30, 2022