Skip to content

lxzan/event_emitter

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

27 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

EventEmitter

Simple, Fast and Thread-Safe Event Pub/Sub Library for Golang

Build Status codecov

Introduction

EventEmitter is a simple, fast and thread-safe event Pub/Sub library for Golang.

EventEmitter only supports in-memory storage and does not support persistence, so it is not suitable for scenarios that require persistence. It can only be used within the application itself and does not support distribution; if you need distribution, please use RabbitMQ, Kafka, etc.

EventEmitter is designed for scenarios that require high concurrency and high performance.

Install

go get -v github.com/lxzan/event_emitter@latest

Quick Start

It is very easy to use: just create an event emitter, then subscribe and publish messages. It can be used anywhere in your application.

The following is a simple example.

package main

import (
	"fmt"
	"github.com/lxzan/event_emitter"
	"sync/atomic"
)

// main is a minimal pub/sub walkthrough: it creates an emitter, registers two
// subscribers across the topics "greet" and "greet1", and publishes a single
// message to "greet1", the topic both subscribers are subscribed to.
func main() {
	// create an event emitter
	var em = event_emitter.New[int64, event_emitter.Subscriber[int64]](&event_emitter.Config{
		BucketNum:  16,
		BucketSize: 128,
	})

	// increaser hands out the subscriber IDs (1, 2, ...)
	var increaser = new(atomic.Int64)

	// onMessage is shared by every subscription below: it prints the ID of the
	// subscriber that received the message together with the payload.
	var onMessage = func(subscriber event_emitter.Subscriber[int64], msg any) {
		fmt.Printf("recv: sub_id=%d, msg=%v\n", subscriber.GetSubscriberID(), msg)
	}

	// create a subscriber
	var suber1 = em.NewSubscriber(increaser.Add(1))

	// subscribe topic "greet"
	em.Subscribe(suber1, "greet", onMessage)
	// subscribe topic "greet1"
	em.Subscribe(suber1, "greet1", onMessage)

	// create another subscriber
	var suber2 = em.NewSubscriber(increaser.Add(1))

	// subscribe topic "greet1"
	em.Subscribe(suber2, "greet1", onMessage)

	// publish message to topic "greet1"
	em.Publish("greet1", "hello!")
}

Wildcard

package main

import (
	"github.com/lxzan/event_emitter"
	"sync/atomic"
)

// main subscribes three freshly created subscribers, one per concrete topic,
// and then publishes a nil message to the pattern "coin.*.usdt.*". Each handler
// prints the topic it was registered for.
func main() {
	em := event_emitter.New[int64, event_emitter.Subscriber[int64]](nil)
	increaser := new(atomic.Int64)

	topics := []string{
		"coin.btc.usdt.1m",
		"coin.btc.usdt.1h",
		"coin.eth.usdt.1m",
	}
	for _, topic := range topics {
		// Per-iteration copy so that each handler prints its own topic
		// regardless of the Go version's loop-variable semantics.
		topic := topic
		em.Subscribe(em.NewSubscriber(increaser.Add(1)), topic, func(subscriber event_emitter.Subscriber[int64], msg any) {
			println(topic)
		})
	}

	em.Publish("coin.*.usdt.*", nil)
}

More Examples

GWS Broadcast

Use the event_emitter package to implement the publish-subscribe model. Wrap gws.Conn in a structure and implement the GetSubscriberID method to get the subscription ID, which must be unique. The subscription ID identifies the subscriber, who only receives messages on the topics they have subscribed to.

This example is useful for building chat rooms or push messages using gws. This means that a user can subscribe to one or more topics via websocket, and when a message is posted to that topic, all subscribers will receive the message.

package main

import (
	"github.com/lxzan/event_emitter"
	"github.com/lxzan/gws"
)

// Socket is a defined type whose underlying type is gws.Conn, which allows
// methods such as GetSubscriberID to be declared on it.
type Socket gws.Conn

// NewSocket returns conn viewed as a *Socket. It is a plain pointer
// conversion: the result points at the same connection value as conn.
func NewSocket(conn *gws.Conn) *Socket {
	return (*Socket)(conn)
}

// GetSubscriberID returns the value stored under the "userId" key of the
// socket's metadata. The type assertion is unchecked, so the call panics if
// the loaded value is not an int64 (including a nil value).
func (c *Socket) GetSubscriberID() int64 {
	userID, _ := c.GetMetadata().Load("userId")
	return userID.(int64)
}

// GetMetadata returns the session storage of the underlying gws connection
// as the subscriber's metadata.
func (c *Socket) GetMetadata() event_emitter.Metadata {
	conn := c.Conn()
	return conn.Session()
}

// Conn converts the Socket back to a *gws.Conn pointing at the same
// connection value.
func (c *Socket) Conn() *gws.Conn {
	return (*gws.Conn)(c)
}

// Sub subscribes socket to topic on em. Every message published to the topic
// is expected to be a *gws.Broadcaster, which is then used to write to the
// subscriber's connection.
func Sub(em *event_emitter.EventEmitter[int64, *Socket], socket *Socket, topic string) {
	em.Subscribe(socket, topic, func(subscriber *Socket, msg any) {
		// Publish accepts any payload, so use a checked assertion and skip
		// anything that is not a broadcaster instead of panicking in the callback.
		broadcaster, ok := msg.(*gws.Broadcaster)
		if !ok {
			return
		}
		// Broadcast's result is discarded; delivery to each connection is best effort.
		_ = broadcaster.Broadcast(subscriber.Conn())
	})
}

// Pub builds a gws broadcaster from op and msg, publishes it to topic on em,
// and closes the broadcaster when Pub returns.
func Pub(em *event_emitter.EventEmitter[int64, *Socket], topic string, op gws.Opcode, msg []byte) {
	b := gws.NewBroadcaster(op, msg)
	defer b.Close()

	em.Publish(topic, b)
}