Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Question]: How to shut down the started tcp server and turn off port listening? #602

Closed
3 tasks done
shiyuecamus opened this issue May 14, 2024 · 2 comments
Closed
3 tasks done
Labels
help wanted Extra attention is needed inactivity needs investigation needs more info question Further information is requested waiting for response waiting for the response from commenter

Comments

@shiyuecamus
Copy link

Actions I've taken before I'm here

  • I've thoroughly read the documentation about this problem but still have no answer.
  • I've searched the Github Issues/Discussions but didn't find any similar problems that have been solved.
  • I've searched the internet for this problem but didn't find anything helpful.

Questions with details

调用eng.Stop(context.Background())后端口依然被占用。请问如何解决?以下是我的完整版代码:

Code snippets (optional)

package initializer

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/panjf2000/gnet/v2"
	"github.com/sirupsen/logrus"
	"time"
)

// Discoverer groups the TCP and UDP discovery servers with the optional
// callback that processes a decoded init request.
type Discoverer struct {
	// ts is the TCP server; NewDiscoverer wires its onMessage to onTcpMessage.
	ts        tcpServer
	// us is the UDP server; NewDiscoverer wires its onMessage to onUdpMessage.
	us        udpServer
	// onInitReq, when non-nil, is called by onTcpMessage with the decoded
	// request; a returned error is reported back to the TCP client. It is not
	// assigned anywhere in the code shown here.
	onInitReq func(params initParams) error
}

// NewDiscoverer returns a Discoverer whose TCP and UDP servers dispatch
// incoming payloads to the Discoverer's own onTcpMessage and onUdpMessage
// handlers. All other fields are left at their zero values.
func NewDiscoverer() *Discoverer {
	disc := new(Discoverer)
	disc.ts.onMessage = disc.onTcpMessage
	disc.us.onMessage = disc.onUdpMessage
	return disc
}

// comeToLight starts the TCP ("tcp://:12345") and UDP ("udp://:54321")
// discovery servers, each on its own goroutine, and waits for them to come up.
//
// gnet.Run is called on a separate goroutine per server and only reports back
// through its returned error, so there is no positive "started" signal here:
// the servers are considered up when neither has reported an error within a
// 2-second grace period. The result is the first startup error, nil once the
// grace period passes cleanly, or a timeout error if the 5-second deadline
// expires first.
func (d *Discoverer) comeToLight() (err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// One buffered slot per server goroutine: an error reported after this
	// function has already returned must not block its goroutine forever.
	errCh := make(chan error, 2)

	go func() {
		if e := gnet.Run(d.ts, "tcp://:12345",
			gnet.WithMulticore(true),
			gnet.WithReuseAddr(true),
			gnet.WithReusePort(true)); e != nil {
			errCh <- fmt.Errorf("discover tcp server run failed with error: %w", e)
		}
	}()

	go func() {
		if e := gnet.Run(d.us, "udp://:54321",
			gnet.WithMulticore(true),
			gnet.WithReuseAddr(true),
			gnet.WithReusePort(true)); e != nil {
			errCh <- fmt.Errorf("discover udp server run failed with error: %w", e)
		}
	}()

	grace := time.NewTimer(2 * time.Second)
	defer grace.Stop()

	select {
	case err = <-errCh:
		return err
	case <-grace.C:
		// select picks randomly among ready cases, so give a failure that
		// arrived together with the timer priority over reporting success.
		select {
		case err = <-errCh:
			return err
		default:
			return nil
		}
	case <-ctx.Done():
		return fmt.Errorf("server run operation timed out: %w", ctx.Err())
	}
}

// hide logs that the discoverer is going away and then calls close on the TCP
// server followed by the UDP server.
func (d *Discoverer) hide() {
	logrus.Info("Gateway Discoverer to hide.")
	d.ts.close()
	d.us.close()
}

// initParams is the JSON body of a TCP init request. onTcpMessage decodes the
// raw payload into it and passes it to Discoverer.onInitReq.
//
// NOTE(review): the meaning of the individual fields is not established by
// this file; the names suggest connection and provisioning settings for a
// device — confirm against the sender's protocol.
type initParams struct {
	DeviceName            string `json:"deviceName"`
	Host                  string `json:"host"`
	Port                  int    `json:"port"`
	ProvisionType         string `json:"provisionType"`
	ProvisionDeviceKey    string `json:"provisionDeviceKey"`
	ProvisionDeviceSecret string `json:"provisionDeviceSecret"`
}

// onTcpMessage treats payload as a JSON-encoded init request and writes a JSON
// reply to c.
//
// The reply is {"code":1} on success. When the payload cannot be decoded, or
// the onInitReq callback returns an error, the reply carries "code":0 and a
// "msg" describing the failure. A nil onInitReq counts as success. Failures to
// encode or write the reply are logged, since there is no caller to return
// them to.
func (d *Discoverer) onTcpMessage(payload []byte, c gnet.Conn) {
	reply := map[string]any{
		"code": 1,
	}

	var params initParams
	if err := json.Unmarshal(payload, &params); err != nil {
		logrus.Warnf("Gateway discoverer unmarshal json for tcp init request failed with err: %s", err)
		reply["code"] = 0
		reply["msg"] = fmt.Sprintf("Gateway discoverer unmarshal json for init request failed with err: %s", err)
	} else if d.onInitReq != nil {
		if err := d.onInitReq(params); err != nil {
			reply["code"] = 0
			reply["msg"] = fmt.Sprintf("Gateway discoverer handler init failed with err: %s", err)
		}
	}

	out, err := json.Marshal(reply)
	if err != nil {
		logrus.Warnf("Gateway discoverer marshal json for tcp response failed with err: %s", err)
		return
	}
	if _, err = c.Write(out); err != nil {
		logrus.Warnf("Gateway discoverer write tcp response failed with err: %s", err)
	}
}

// onUdpMessage answers any UDP datagram with a fixed JSON document that
// identifies the gateway model. The incoming payload is ignored. Encoding and
// write failures are logged.
func (d *Discoverer) onUdpMessage(_ []byte, c gnet.Conn) {
	bs, err := json.Marshal(map[string]any{
		"model": "yunqi.gateway.tb1",
	})
	if err != nil {
		logrus.Warnf("Gateway discoverer marshal json for udp reponse failed with err: %s", err)
		// Nothing valid to send; do not write an empty reply.
		return
	}
	if _, err = c.Write(bs); err != nil {
		logrus.Warnf("Gateway discoverer write udp reponse failed with err: %s", err)
	}
}

// tcpServer is the gnet event handler for the TCP discovery endpoint. It
// embeds *gnet.BuiltinEventEngine and declares its own OnBoot, OnShutdown,
// OnOpen, OnClose and OnTraffic callbacks.
//
// NOTE(review): every method on tcpServer uses a value receiver, so a field
// written inside a callback (see OnBoot) changes only that call's copy.
type tcpServer struct {
	*gnet.BuiltinEventEngine
	// eng is the engine OnBoot assigns and close calls Stop on.
	eng       gnet.Engine
	// onMessage, when non-nil, is invoked by OnTraffic for each read.
	onMessage func([]byte, gnet.Conn)
}

// udpServer is the gnet event handler for the UDP discovery endpoint. It
// embeds *gnet.BuiltinEventEngine and declares its own OnBoot, OnShutdown,
// OnOpen, OnClose and OnTraffic callbacks.
//
// NOTE(review): every method on udpServer uses a value receiver, so a field
// written inside a callback (see OnBoot) changes only that call's copy.
type udpServer struct {
	*gnet.BuiltinEventEngine
	// eng is the engine OnBoot assigns and close calls Stop on.
	eng       gnet.Engine
	// onMessage, when non-nil, is invoked by OnTraffic for each read.
	onMessage func([]byte, gnet.Conn)
}

// OnBoot is the gnet boot callback for the TCP server: it logs the boot and
// stores the supplied engine in t.eng.
//
// NOTE(review): t is a value receiver, so the engine is stored on a copy that
// is discarded when OnBoot returns. close() on the tcpServer held by
// Discoverer therefore still operates on a zero-value gnet.Engine. Keeping the
// engine requires pointer receivers on tcpServer together with handing a
// *tcpServer to gnet.Run.
func (t tcpServer) OnBoot(eng gnet.Engine) (action gnet.Action) {
	logrus.Infof("Discoverer tcp server is boot")
	t.eng = eng
	return action
}

// OnShutdown is the gnet shutdown callback for the TCP server; it only logs
// that the server has shut down.
func (t tcpServer) OnShutdown(gnet.Engine) {
	logrus.Infof("Discoverer tcp server is shutdown")
}

// OnOpen is the gnet callback for a newly opened TCP connection. It logs the
// peer address and returns no greeting bytes and the zero action.
func (t tcpServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
	peer := c.RemoteAddr()
	logrus.Infof("Discoverer tcp client is open for: %s", peer)
	return out, action
}

// OnClose is the gnet callback for a closed TCP connection. It logs the peer
// address together with the error (if any) that caused the close and returns
// the zero action.
func (t tcpServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
	// Log the peer address, as OnOpen does. The connection must not be closed
	// again from inside its own close callback, and %v keeps a nil err readable
	// ("<nil>" instead of "%!s(<nil>)").
	logrus.Infof("Discoverer tcp client is close for: %s, error: %v", c.RemoteAddr(), err)
	return action
}

// OnTraffic is the gnet callback for readable data on a TCP connection. It
// takes everything currently buffered on c, passes a private copy to
// onMessage (when set), logs the peer and payload length, and returns the zero
// action.
//
// NOTE(review): each read is dispatched as one message; there is no framing,
// so a request split across TCP segments would reach onMessage in pieces —
// confirm that clients send small single-write requests.
func (t tcpServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
	buf, err := c.Next(-1)
	if err != nil {
		logrus.Warnf("Discoverer tcp server read failed for: %s, error: %v", c.RemoteAddr(), err)
		return action
	}

	// Dispatch the copy rather than buf: gnet documents that the slice
	// returned by Next is reused by the event loop once OnTraffic returns, so
	// a handler that retains its argument must not see the shared buffer.
	pkg := make([]byte, len(buf))
	copy(pkg, buf)
	if t.onMessage != nil {
		t.onMessage(pkg, c)
	}
	logrus.Debugf("Discoverer tcp server message: %s, length: %d", c.RemoteAddr().String(), len(pkg))
	return action
}

// close asks the engine stored in t.eng to stop and logs any error Stop
// returns. It uses a background context, so no deadline is applied.
//
// NOTE(review): OnBoot has a value receiver and never persists the engine, so
// t.eng is still the zero value here. The error logged below is then the only
// sign that nothing was stopped and the port is still bound.
func (t tcpServer) close() {
	if err := t.eng.Stop(context.Background()); err != nil {
		logrus.Warnf("Discoverer tcp server stop failed with error: %v", err)
	}
}

// OnBoot is the gnet boot callback for the UDP server: it logs the boot and
// stores the supplied engine in u.eng.
//
// NOTE(review): u is a value receiver, so the engine is stored on a copy that
// is discarded when OnBoot returns. close() on the udpServer held by
// Discoverer therefore still operates on a zero-value gnet.Engine. Keeping the
// engine requires pointer receivers on udpServer together with handing a
// *udpServer to gnet.Run.
func (u udpServer) OnBoot(eng gnet.Engine) (action gnet.Action) {
	logrus.Infof("Discoverer udp server is boot")
	u.eng = eng
	return action
}

// OnShutdown is the gnet shutdown callback for the UDP server; it only logs
// that the server has shut down.
func (u udpServer) OnShutdown(gnet.Engine) {
	logrus.Infof("Discoverer udp server is shutdown")
}

// OnOpen is the gnet open callback for the UDP server. It logs the peer
// address and returns no bytes and the zero action.
func (u udpServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
	peer := c.RemoteAddr()
	logrus.Infof("Discoverer udp client is open for: %s", peer)
	return out, action
}

// OnClose is the gnet close callback for the UDP server. It logs the peer
// address together with the error (if any) passed in and returns the zero
// action.
func (u udpServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
	// Log the peer address, as OnOpen does. The connection must not be closed
	// again from inside its own close callback, and %v keeps a nil err readable
	// ("<nil>" instead of "%!s(<nil>)").
	logrus.Infof("Discoverer udp client is close for: %s, error: %v", c.RemoteAddr(), err)
	return action
}

// OnTraffic is the gnet callback for incoming UDP data. It takes everything
// currently buffered on c, logs the peer and payload length, passes a private
// copy of the payload to onMessage (when set), and returns the zero action.
func (u udpServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
	buf, err := c.Next(-1)
	if err != nil {
		logrus.Warnf("Discoverer udp server read failed for: %s, error: %v", c.RemoteAddr(), err)
		return action
	}

	// Dispatch the copy rather than buf: gnet documents that the slice
	// returned by Next is reused by the event loop once OnTraffic returns, so
	// a handler that retains its argument must not see the shared buffer.
	pkg := make([]byte, len(buf))
	copy(pkg, buf)
	logrus.Debugf("Discoverer udp server message: %s, length: %d", c.RemoteAddr().String(), len(pkg))
	if u.onMessage != nil {
		u.onMessage(pkg, c)
	}
	return action
}

// close asks the engine stored in u.eng to stop and logs any error Stop
// returns. It uses a background context, so no deadline is applied.
//
// NOTE(review): OnBoot has a value receiver and never persists the engine, so
// u.eng is still the zero value here. The error logged below is then the only
// sign that nothing was stopped and the port is still bound.
func (u udpServer) close() {
	if err := u.eng.Stop(context.Background()); err != nil {
		logrus.Warnf("Discoverer udp server stop failed with error: %v", err)
	}
}
@shiyuecamus shiyuecamus added help wanted Extra attention is needed question Further information is requested labels May 14, 2024
@gh-translator gh-translator changed the title [Question]: 已启动的tcp server怎么关闭并且关闭端口监听? [Question]: How to shut down the started tcp server and turn off port listening? May 14, 2024
@panjf2000
Copy link
Owner

关闭之后有等待 2*MSL 时间之后再重新启动绑定地址吗?否则的话处于 TIME_WAIT 状态的连接无法重新绑定,除非使用 SO_REUSEADDR.

@gh-translator
Copy link
Collaborator

🤖 Non-English text detected, translating...


After shutting down, did you wait for 2*MSL before restarting and rebinding the address? Otherwise, connections still in the TIME_WAIT state will prevent the address from being rebound, unless SO_REUSEADDR is used.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
help wanted Extra attention is needed inactivity needs investigation needs more info question Further information is requested waiting for response waiting for the response from commenter
Projects
None yet
Development

No branches or pull requests

3 participants