Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RxGo Rest api example #405

Open
DennisMuchiri opened this issue Dec 7, 2023 · 1 comment
Open

RxGo Rest api example #405

DennisMuchiri opened this issue Dec 7, 2023 · 1 comment
Labels
question Question regarding how RxGo is working etc.

Comments

@DennisMuchiri
Copy link

I'm requesting an example of how to use this library to emit items from a REST API GET endpoint.

@DennisMuchiri DennisMuchiri added the question Question regarding how RxGo is working etc. label Dec 7, 2023
@Alfex4936
Copy link

I think the simplest API would be something like this:

image

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"

	"github.com/reactivex/rxgo/v2"
)

// WebFramework is a minimal HTTP router that associates URL paths with
// rxgo observables.
type WebFramework struct {
	// router maps a request path to the observable registered for it.
	router map[string]rxgo.Observable
}

// NewWebFramework returns a WebFramework whose routing table is empty and
// ready to accept registrations.
func NewWebFramework() *WebFramework {
	wf := new(WebFramework)
	wf.router = map[string]rxgo.Observable{}
	return wf
}

// Handle registers handler as the observable associated with path.
// Registering the same path again replaces the previously stored observable.
//
// The routing table is created on first use so that a zero-value
// WebFramework (one not built with NewWebFramework) does not panic on a
// nil-map write.
func (wf *WebFramework) Handle(path string, handler rxgo.Observable) {
	if wf.router == nil {
		wf.router = make(map[string]rxgo.Observable)
	}
	wf.router[path] = handler
}

// Start serves the registered observables over HTTP on the given port. It
// blocks for as long as the server runs and returns the error reported by
// http.ListenAndServe.
//
// For each request the observable registered under the exact request path is
// observed and every emitted value is written to the response on its own
// line. Unregistered paths get a 404. If the observable emits an error, the
// response is a 500 carrying the error text and none of the values.
func (wf *WebFramework) Start(port int) error {
	log.Printf("💕 Starting server at %d", port)

	// A private mux keeps the handler off http.DefaultServeMux, so calling
	// Start more than once (or alongside other code that registers "/")
	// does not panic on a duplicate pattern.
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		observable, ok := wf.router[r.URL.Path]
		if !ok {
			http.NotFound(w, r)
			return
		}

		// Collect values before writing anything: once the body has been
		// started the status code is already sent, so a late error could no
		// longer be reported as a 500.
		var values []interface{}

		// Observing with the request context is intended to let the
		// observable stop producing when the client goes away or the
		// handler returns early.
		for item := range observable.Observe(rxgo.WithContext(r.Context())) {
			if item.Error() {
				http.Error(w, item.E.Error(), http.StatusInternalServerError)
				return
			}
			values = append(values, item.V)
		}

		for _, v := range values {
			fmt.Fprintln(w, v)
		}
	})

	return http.ListenAndServe(fmt.Sprintf(":%d", port), mux)
}

// main wires a sample observable to the "/data" path and serves it on port
// 8080. If the server cannot start, the error is logged and the process
// exits with a non-zero status.
func main() {
	wf := NewWebFramework()

	// An observable that emits integers, squares them, renders the squares
	// as strings, and keeps only those whose first character is '1'.
	observable := rxgo.Just(1, 2, 3, 4, 5)().
		Map(func(_ context.Context, item interface{}) (interface{}, error) {
			// Report an unexpected item type through the stream instead of
			// panicking on a failed type assertion.
			num, ok := item.(int)
			if !ok {
				return nil, fmt.Errorf("expected int, got %T", item)
			}
			return fmt.Sprintf("%d", num*num), nil
		}).
		Filter(func(item interface{}) bool {
			// Simple filter to demonstrate: non-strings and empty strings
			// are dropped rather than causing a panic.
			str, ok := item.(string)
			return ok && len(str) > 0 && str[0] == '1'
		})

	// Register an HTTP path with the observable.
	wf.Handle("/data", observable)

	// Start the server on port 8080. log.Fatalf writes to stderr and exits
	// with status 1 so a failed start is visible to the caller.
	if err := wf.Start(8080); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Question regarding how RxGo is working etc.
Projects
None yet
Development

No branches or pull requests

2 participants