Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to fun out #340

Open
imalkov82 opened this issue Jan 5, 2022 · 2 comments
Open

How to fun out #340

imalkov82 opened this issue Jan 5, 2022 · 2 comments
Assignees
Labels
question Question regarding how RxGo is working etc.

Comments

@imalkov82
Copy link

imalkov82 commented Jan 5, 2022

I'm trying to fun out the final step of my reactive flow to achieve parallel execution of the final step using DoOnNext.

Running the code bellow I expect that thirdCounter = 2 and every "first DoOnNext", "second DoOnNext" and "third DoOnNext" will be printer twice (total 6 times)

The printing are as expected and also the Map concat the strings correctly. However, the thirdCounter = 7 hence the steps are over invoked.

What I'm missing here?

My code:

var thirdCounter int32
func localRun(names ...string) {
	observable := rxgo.Just(names)().
		Map(func(_ context.Context, i interface{}) (interface{}, error) {
			s := i.(string)
			s = fmt.Sprintf("%s,%s", s, "one")
			return s, nil
		}).
		Map(func(_ context.Context, i interface{}) (interface{}, error) {
			s := i.(string)
			s = fmt.Sprintf("%s,%s", s, "two")
			return s, nil
		}).
		Map(func(_ context.Context, i interface{}) (interface{}, error) {
			atomic.AddInt32(&thirdCounter, 1)
			s := i.(string)
			s = fmt.Sprintf("%s,%s", s, "three")
			return s, nil
		})

	observable.DoOnNext(func(i interface{}) {
		fmt.Println("first DoOnNext", i)
	})

	observable.DoOnNext(func(i interface{}) {
		fmt.Println("second DoOnNext", i)
	})

	observable.DoOnNext(func(i interface{}) {
		fmt.Println("third DoOnNext", i)
	})

	for item := range observable.Last().Observe() {
		fmt.Println(item.V)
	}
	fmt.Printf("Third Counter = %d\n", thirdCounter)
}
func TestMocktFlow(t *testing.T) {
	cs := make([]string, 0)
	cs = append(cs, "Hello")
	cs = append(cs, "Hi")
	localRun(cs...)
}
@imalkov82 imalkov82 added the question Question regarding how RxGo is working etc. label Jan 5, 2022
@imalkov82 imalkov82 changed the title How to fun out results How to fun out Jan 6, 2022
@imalkov82
Copy link
Author

@teivah , kind reminder

@cp949
Copy link

cp949 commented Jun 19, 2022

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Question regarding how RxGo is working etc.
Projects
None yet
Development

No branches or pull requests

3 participants