Skip to content

Commit

Permalink
feat: support locking each I/O event-loop goroutine to an OS thread
Browse files Browse the repository at this point in the history
Fixes #133
  • Loading branch information
panjf2000 committed Sep 13, 2020
1 parent f8287a8 commit 6fd6413
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 14 deletions.
12 changes: 10 additions & 2 deletions acceptor_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,17 @@

package gnet

import "time"
import (
"runtime"
"time"
)

func (svr *server) listenerRun(lockOSThread bool) {
if lockOSThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}

func (svr *server) listenerRun() {
var err error
defer func() { svr.signalShutdown(err) }()
var packet [0x10000]byte
Expand Down
2 changes: 2 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ package errors
import "errors"

var (
// ErrTooManyEventLoopThreads occurs when attempting to set up more than 10,000 event-loop goroutines under LockOSThread mode.
ErrTooManyEventLoopThreads = errors.New("too many event-loops under LockOSThread mode")
// ErrUnsupportedProtocol occurs when trying to use protocol that is not supported.
ErrUnsupportedProtocol = errors.New("only unix, tcp/tcp4/tcp6, udp/udp4/udp6 are supported")
// ErrUnsupportedTCPProtocol occurs when trying to use an unsupported TCP protocol.
Expand Down
8 changes: 7 additions & 1 deletion eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package gnet

import (
"os"
"runtime"
"time"

"github.com/panjf2000/gnet/errors"
Expand All @@ -51,7 +52,12 @@ func (el *eventloop) closeAllConns() {
}
}

func (el *eventloop) loopRun() {
func (el *eventloop) loopRun(lockOSThread bool) {
if lockOSThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}

defer func() {
el.closeAllConns()
el.ln.close()
Expand Down
8 changes: 7 additions & 1 deletion eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package gnet

import (
"runtime"
"time"

"github.com/panjf2000/gnet/pool/bytebuffer"
Expand All @@ -39,7 +40,12 @@ type eventloop struct {
calibrateCallback func(*eventloop, int32) // callback func for re-adjusting connCount
}

func (el *eventloop) loopRun() {
func (el *eventloop) loopRun(lockOSThread bool) {
if lockOSThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}

var err error
defer func() {
if el.idx == 0 && el.svr.opts.Ticker {
Expand Down
9 changes: 9 additions & 0 deletions gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sync/atomic"
"time"

"github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/internal/logging"
)

Expand Down Expand Up @@ -238,6 +239,14 @@ func Serve(eventHandler EventHandler, protoAddr string, opts ...Option) (err err
}
defer logging.Cleanup()

// The maximum number of operating system threads that the Go program can use is initially set to 10000,
// which should be the maximum amount of I/O event-loops locked to OS threads users can start up.
if options.LockOSThread && options.NumEventLoop > 10000 {
logging.DefaultLogger.Errorf("too many event-loops under LockOSThread mode, should be less than 10,000 "+
"while you are trying to set up %d\n", options.NumEventLoop)
return errors.ErrTooManyEventLoopThreads
}

network, addr := parseProtoAddr(protoAddr)

var ln *listener
Expand Down
11 changes: 10 additions & 1 deletion gnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func testServe(network, addr string, reuseport, multicore, async bool, nclients
nclients: nclients,
workerPool: goroutine.Default(),
}
must(Serve(ts, network+"://"+addr, WithMulticore(multicore), WithReusePort(reuseport), WithTicker(true),
must(Serve(ts, network+"://"+addr, WithLockOSThread(async), WithMulticore(multicore), WithReusePort(reuseport), WithTicker(true),
WithTCPKeepAlive(time.Minute*1), WithLoadBalancing(lb)))
}

Expand Down Expand Up @@ -1022,3 +1022,12 @@ func testCloseConnection(network, addr string) {
events := &testCloseConnectionServer{network: network, addr: addr}
must(Serve(events, network+"://"+addr, WithTicker(true)))
}

func TestServerOptionsCheck(t *testing.T) {
if err := Serve(&EventServer{}, "tcp://:3500", WithNumEventLoop(10001), WithLockOSThread(true)); err != errors.ErrTooManyEventLoopThreads {
t.Fail()
t.Log("error returned with LockOSThread option")
} else {
t.Log("got expected result")
}
}
13 changes: 13 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ type Options struct {
// assigned to the value of logical CPUs usable by the current process.
Multicore bool

// LockOSThread is used to determine whether each I/O event-loop is associated to an OS thread, it is useful when you
// need some kind of mechanisms like thread local storage, or invoke certain C libraries (such as graphics lib: GLib)
// that require thread-level manipulation via cgo, or want all I/O event-loops to actually run in parallel for a
// potential higher performance.
LockOSThread bool

// LB represents the load-balancing algorithm used when assigning new connections.
LB LoadBalancing

Expand Down Expand Up @@ -83,6 +89,13 @@ func WithMulticore(multicore bool) Option {
}
}

// WithLockOSThread sets up lockOSThread mode for I/O event-loops.
func WithLockOSThread(lockOSThread bool) Option {
return func(opts *Options) {
opts.LockOSThread = lockOSThread
}
}

// WithLoadBalancing sets up the load-balancing algorithm in gnet server.
func WithLoadBalancing(lb LoadBalancing) Option {
return func(opts *Options) {
Expand Down
17 changes: 15 additions & 2 deletions reactor_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,20 @@
package gnet

import (
"runtime"

"github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/internal/netpoll"
)

func (svr *server) activateMainReactor() {
func (svr *server) activateMainReactor(lockOSThread bool) {
if lockOSThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}

defer svr.signalShutdown()

switch err := svr.mainLoop.poller.Polling(func(fd int, filter int16) error { return svr.acceptNewConnection(fd) }); err {
case errors.ErrServerShutdown:
svr.logger.Infof("Main reactor is exiting normally on the signal error: %v", err)
Expand All @@ -38,7 +46,12 @@ func (svr *server) activateMainReactor() {
}
}

func (svr *server) activateSubReactor(el *eventloop) {
func (svr *server) activateSubReactor(el *eventloop, lockOSThread bool) {
if lockOSThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}

defer func() {
el.closeAllConns()
if el.idx == 0 && svr.opts.Ticker {
Expand Down
17 changes: 15 additions & 2 deletions reactor_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@
package gnet

import (
"runtime"

"github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/internal/netpoll"
)

func (svr *server) activateMainReactor() {
func (svr *server) activateMainReactor(lockOSThread bool) {
if lockOSThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}

defer svr.signalShutdown()

switch err := svr.mainLoop.poller.Polling(func(fd int, ev uint32) error { return svr.acceptNewConnection(fd) }); err {
case errors.ErrServerShutdown:
svr.logger.Infof("Main reactor is exiting normally on the signal error: %v", err)
Expand All @@ -35,7 +43,12 @@ func (svr *server) activateMainReactor() {
}
}

func (svr *server) activateSubReactor(el *eventloop) {
func (svr *server) activateSubReactor(el *eventloop, lockOSThread bool) {
if lockOSThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}

defer func() {
el.closeAllConns()
if el.idx == 0 && svr.opts.Ticker {
Expand Down
6 changes: 3 additions & 3 deletions server_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (svr *server) startEventLoops() {
svr.subEventLoopSet.iterate(func(i int, el *eventloop) bool {
svr.wg.Add(1)
go func() {
el.loopRun()
el.loopRun(svr.opts.LockOSThread)
svr.wg.Done()
}()
return true
Expand All @@ -88,7 +88,7 @@ func (svr *server) startSubReactors() {
svr.subEventLoopSet.iterate(func(i int, el *eventloop) bool {
svr.wg.Add(1)
go func() {
svr.activateSubReactor(el)
svr.activateSubReactor(el, svr.opts.LockOSThread)
svr.wg.Done()
}()
return true
Expand Down Expand Up @@ -163,7 +163,7 @@ func (svr *server) activateReactors(numEventLoop int) error {
// Start main reactor in background.
svr.wg.Add(1)
go func() {
svr.activateMainReactor()
svr.activateMainReactor(svr.opts.LockOSThread)
svr.wg.Done()
}()
} else {
Expand Down
4 changes: 2 additions & 2 deletions server_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (svr *server) signalShutdown(err error) {
func (svr *server) startListener() {
svr.listenerWG.Add(1)
go func() {
svr.listenerRun()
svr.listenerRun(svr.opts.LockOSThread)
svr.listenerWG.Done()
}()
}
Expand All @@ -97,7 +97,7 @@ func (svr *server) startEventLoops(numEventLoop int) {

svr.loopWG.Add(svr.subEventLoopSet.len())
svr.subEventLoopSet.iterate(func(i int, el *eventloop) bool {
go el.loopRun()
go el.loopRun(svr.opts.LockOSThread)
return true
})
}
Expand Down

0 comments on commit 6fd6413

Please sign in to comment.