Observe(rxgo.WithContext(ctx)) does not cancel observable? #342

Open
torepaulsson opened this issue Jan 31, 2022 · 3 comments

@torepaulsson

Hi,
I am not sure whether I'm using RxGo correctly or whether this use case is simply not supported. I have one producer and multiple consumers that subscribe to and unsubscribe from messages. Consumers can come and go, so my idea was to use rxgo.WithContext to "cancel" a consumer's subscription, but the subscriptions don't get cancelled. Is this the intended behavior, or am I missing something? I've tried several different approaches without success.

The test I am using is this:

func TestCtxCancel(t *testing.T) {
	ch := make(chan rxgo.Item)
	ob := rxgo.FromEventSource(ch, rxgo.WithBackPressureStrategy(rxgo.Drop))

	producerCtx, cancelProducer := context.WithCancel(context.Background())
	cancel1 := observe(1, ob)
	cancel2 := observe(2, ob)

	// Producer
	go func() {
		defer func() {
			log.Print("producer finished")
			close(ch)
		}()
		ctr := 0
		t := time.NewTicker(time.Second)
		for {
			select {
			case <-producerCtx.Done():
				return
			case <-t.C:
				ctr++
				select {
				case ch <- rxgo.Of(ctr):
				default:
					log.Printf("Send channel blocked, msg %d dropped", ctr) // This happened when I tried some other methods
				}
			}
		}
	}()

	log.Printf("Sleep 3")
	time.Sleep(time.Second * 3)
	log.Printf("Cancel Observer 1")
	cancel1()

	log.Printf("Sleep 3")
	time.Sleep(time.Second * 3)
	log.Printf("Cancel Observer 2")
	cancel2()

	log.Printf("Sleep 3")
	time.Sleep(time.Second * 3)
	log.Printf("Cancel Producer")
	cancelProducer()

	log.Printf("Sleep 1")
	time.Sleep(time.Second * 1)
}

func observe(index int, ob rxgo.Observable) context.CancelFunc {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		for i := range ob.Observe(rxgo.WithContext(ctx)) {
			log.Printf("Observer[%d]: %v", index, i.V)
		}
		log.Printf("Observer[%d] stopped", index)
	}()
	return cancel
}

Produces the following output:

2022/01/31 08:43:59 Sleep 3
2022/01/31 08:44:00 Observer[2]: 1
2022/01/31 08:44:00 Observer[1]: 1
2022/01/31 08:44:01 Observer[2]: 2
2022/01/31 08:44:01 Observer[1]: 2
2022/01/31 08:44:02 Observer[2]: 3
2022/01/31 08:44:02 Observer[1]: 3
2022/01/31 08:44:02 Cancel Observer 1
2022/01/31 08:44:02 Sleep 3
2022/01/31 08:44:03 Observer[2]: 4
2022/01/31 08:44:03 Observer[1]: 4
2022/01/31 08:44:04 Observer[2]: 5
2022/01/31 08:44:04 Observer[1]: 5
2022/01/31 08:44:05 Observer[2]: 6
2022/01/31 08:44:05 Observer[1]: 6
2022/01/31 08:44:05 Cancel Observer 2
2022/01/31 08:44:05 Sleep 3
2022/01/31 08:44:06 Observer[2]: 7
2022/01/31 08:44:06 Observer[1]: 7
2022/01/31 08:44:07 Observer[2]: 8
2022/01/31 08:44:07 Observer[1]: 8
2022/01/31 08:44:08 Observer[2]: 9
2022/01/31 08:44:08 Observer[1]: 9
2022/01/31 08:44:08 Cancel Producer
2022/01/31 08:44:08 Sleep 1
2022/01/31 08:44:08 producer finished
2022/01/31 08:44:08 Observer[2] stopped
2022/01/31 08:44:08 Observer[1] stopped

Have I made a mistake somewhere that prevents the observable from being cancelled? Any help is much appreciated, and thanks for a great package.

@torepaulsson added the question label (Question regarding how RxGo is working etc.) Jan 31, 2022
@nlm

nlm commented May 19, 2022

According to the source, the current code for observing the eventSourceIterable is the following:

func (i *eventSourceIterable) Observe(opts ...Option) <-chan Item {
	option := parseOptions(append(i.opts, opts...)...)
	next := option.buildChannel()

	i.Lock()
	if i.disposed {
		close(next)
	} else {
		i.observers = append(i.observers, next)
	}
	i.Unlock()
	return next
}

Indeed, it seems that it doesn't handle a context cancellation passed in through the options.
I see two options here.

Either you handle it from your code:

func observe(index int, ob rxgo.Observable) context.CancelFunc {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		itemCh := ob.Observe(rxgo.WithContext(ctx))
	OuterLoop:
		for {
			select {
			case <-ctx.Done():
				// itemCh is receive-only (<-chan rxgo.Item), so it cannot be closed from here.
				break OuterLoop
			case i, ok := <-itemCh:
				if !ok {
					break OuterLoop
				}
				log.Printf("Observer[%d]: %v", index, i.V)
			}
		}
		log.Printf("Observer[%d] stopped", index)
	}()
	return cancel
}

Or maybe patching the RxGo code with something like this might do the trick (untested):

func (i *eventSourceIterable) Observe(opts ...Option) <-chan Item {
	option := parseOptions(append(i.opts, opts...)...)
	next := option.buildChannel()

	i.Lock()
	if i.disposed {
		close(next)
	} else {
		i.observers = append(i.observers, next)
		ctx := option.buildContext(context.Background())
		if ctx.Done() != nil {
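			go func() {
				<-ctx.Done()
				// Caveat: next is still registered in i.observers at this point,
				// so a later send to it (or a second close on dispose) would
				// panic; it likely needs to be removed from the slice first.
				close(next)
			}()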
		}
	}
	i.Unlock()
	return next
}

Edit: fixed observe example to properly handle closed itemCh

@dayaftereh

I think I had the same issue: #318.

@harningt

I've noticed that in many places the context is not chained through (either taken from the options list or inherited from the parent observable). I've worked around most of my issues with cancellations not taking effect by carefully selecting which operators and sources I use, and by passing the context option through explicitly wherever I can and know that the parent cancellation won't be honored.
