Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Counting items of a Connectable Observable #385

Open
matejpavlovic opened this issue Nov 7, 2022 · 0 comments
Open

Counting items of a Connectable Observable #385

matejpavlovic opened this issue Nov 7, 2022 · 0 comments
Labels
question Question regarding how RxGo is working etc.

Comments

@matejpavlovic
Copy link

matejpavlovic commented Nov 7, 2022

Hello, I am trying to count events produced by a Connectable Observable in 2 ways:

  • The total number of events and
  • The number of events that pass a filter.

Here is some example code:

package main

import (
	"context"
	"fmt"

	"github.com/reactivex/rxgo/v2"
)

func main() {
	events := rxgo.Create([]rxgo.Producer{func(_ context.Context, next chan<- rxgo.Item) {
		next <- rxgo.Of(expensiveReadFromDisk(0))
		next <- rxgo.Of(expensiveReadFromDisk(1))
		next <- rxgo.Of(expensiveReadFromDisk(2))
	}}, rxgo.WithPublishStrategy())

	total := events.Count()
	filtered := events.Filter(func(i interface{}) bool {
		return i.(int) > 0
	}).Count()

	events.Connect(context.Background())

	t, _ := total.Get()
	fmt.Printf("   Total: %d\n", t.V)
	f, _ := filtered.Get()
	fmt.Printf("Filtered: %d\n", f.V)
}

func expensiveReadFromDisk(e int) int {
	fmt.Printf("Reading event: %d\n", e)
	return e
}

I expected the code to output

Reading event: 0
Reading event: 1
Reading event: 2
   Total: 3
Filtered: 2

Instead, however, the code outputs only this:

Reading event: 0
Reading event: 1
Reading event: 2
   Total: 3

Then it blocks on the following line and gets stuck forever.

	f, _ := filtered.Get()

Is this the intended behavior? If yes, what would be the correct way of achieving the intended result?
Thank you very much!

@matejpavlovic matejpavlovic added the question Question regarding how RxGo is working etc. label Nov 7, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Question regarding how RxGo is working etc.
Projects
None yet
Development

No branches or pull requests

1 participant