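Introduction | These days, when networking comes up, most people think of epoll first, yet in real-world engineering the vast majority of projects build their features on top of an existing open-source networking framework. Each language has its own: Java has the well-known Netty; C++ has libevent, boost::asio, muduo and others; in Go, the most influential open-source networking frameworks today are gnet, evio and netpoll. About a year after I finished studying gnet, I recently found time to dig into netpoll's source code. This article summarizes that source analysis for future reference.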








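  • mainReactor is mainly responsible for accepting client connection requests and establishing new connections. After accepting a connection, the mainReactor hands it to one of the subReactors according to a load-balancing strategy.

  • subReactor manages the new client connection and handles all subsequent requests from that client.

  • Typically, Reactor threads handle only IO (reading and writing data), while business logic is executed by dedicated worker threads.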



II. netpoll overall architecture

(1) Interaction between the netpoll client and server




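  • Serve(): starts the server, which listens for and waits on client requests.

  • OnPrepare(): performs initialization and preparation work; it is called back before the connection is set up.

  • OnConnect(): called back after the connection has been created.

  • OnRequest(): the business-logic callback, which lets business logic be processed asynchronously.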

(2) Internal structure of the netpoll server





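Poll: an abstract interface that hides the differences between the underlying operating-system APIs; on Linux it is implemented with epoll, and on BSD platforms with kqueue.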



III. netpoll server source code analysis

(1) Server usage example


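```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/cloudwego/netpoll"
)

func main() {
	network, address := "tcp", ":8080" // example values; any TCP address works
	// 1. Create the listener
	listener, _ := netpoll.CreateListener(network, address)
	// 2. Initialize the EventLoop with the OnRequest callback
	eventLoop, _ := netpoll.NewEventLoop(func(ctx context.Context, connection netpoll.Connection) error {
		time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
		if l := connection.Reader().Len(); l > 0 {
			data, err := connection.Reader().Next(l)
			if err != nil {
				return err
			}
			fmt.Printf("data:%+v\n", string(data))
		}
		return nil
	})
	// 3. Block and wait for client requests
	eventLoop.Serve(listener)
}
```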


  • Create a Listener

  • Create an EventLoop

  • Call Serve()




```go
// A EventLoop is a network server.
type EventLoop interface {
	// Serve registers a listener and runs blockingly to provide services, including listening to ports,
	// accepting connections and processing trans data. When an exception occurs or Shutdown is invoked,
	// Serve will return an error which describes the specific reason.
	Serve(ln net.Listener) error

	// Shutdown is used to graceful exit.
	// It will close all idle connections on the server, but will not change the underlying pollers.
	//
	// Argument: ctx set the waiting deadline, after which an error will be returned,
	// but will not force the closing of connections in progress.
	Shutdown(ctx context.Context) error
}

// OnRequest defines the function for handling connection. When data is sent from the connection peer,
// netpoll actively reads the data in LT mode and places it in the connection's input buffer.
// Generally, OnRequest starts handling the data in the following way:
//
//	func OnRequest(ctx context, connection Connection) error {
//		input := connection.Reader().Next(n)
//		handling input data...
//		send, _ := connection.Writer().Malloc(l)
//		copy(send, output)
//		connection.Flush()
//		return nil
//	}
//
// OnRequest will run in a separate goroutine and
// it is guaranteed that there is one and only one OnRequest running at the same time.
// The underlying logic is similar to:
//
//	go func() {
//		for !connection.Reader().IsEmpty() {
//			OnRequest(ctx, connection)
//		}
//	}()
//
// PLEASE NOTE:
// OnRequest must either eventually read all the input data or actively Close the connection,
// otherwise the goroutine will fall into a dead loop.
//
// Return: error is unused which will be ignored directly.
type OnRequest func(ctx context.Context, connection Connection) error
```


```go
// NewEventLoop .
func NewEventLoop(onRequest OnRequest, ops ...Option) (EventLoop, error) {
	opts := &options{
		onRequest: onRequest,
	}
	for _, do := range ops {
		do.f(opts)
	}
	return &eventLoop{
		opts: opts,
		stop: make(chan error, 1),
	}, nil
}

type eventLoop struct {
	sync.Mutex
	svr  *server
	opts *options
	stop chan error
}

// Serve implements EventLoop.
func (evl *eventLoop) Serve(ln net.Listener) error {
	npln, err := ConvertListener(ln)
	if err != nil {
		return err
	}
	evl.Lock()
	evl.svr = newServer(npln, evl.opts, evl.quit)
	// Register the listener with a poller; the pollers' epoll_wait loops
	// already block in their own goroutines
	evl.svr.Run()
	evl.Unlock()

	// Block here until the server quits
	err = evl.waitQuit()
	// ensure evl will not be finalized until Serve returns
	runtime.SetFinalizer(evl, nil)
	return err
}
```


  • Convert the Listener

  • Create a server object

  • Call the server's Run() method


```go
// newServer wrap listener into server, quit will be invoked when server exit.
func newServer(ln Listener, opts *options, onQuit func(err error)) *server {
	return &server{
		ln:     ln,
		opts:   opts,
		onQuit: onQuit,
	}
}

type server struct {
	operator    FDOperator
	ln          Listener
	opts        *options
	onQuit      func(err error)
	connections sync.Map // key=fd, value=connection
}

// Run this server.
func (s *server) Run() (err error) {
	s.operator = FDOperator{
		FD:     s.ln.Fd(),
		OnRead: s.OnRead,
		OnHup:  s.OnHup,
	}
	// Pick a poller (epoll) from pollmanager to manage the server fd;
	// this poller acts as the mainReactor
	s.operator.poll = pollmanager.Pick()
	// Watch the server fd for readable events
	err = s.operator.Control(PollReadable)
	if err != nil {
		s.onQuit(err)
	}
	return err
}

// OnRead implements FDOperator.
// Called when the server fd becomes readable: accept the incoming client connection
func (s *server) OnRead(p Poll) error {
	// accept socket
	conn, err := s.ln.Accept()
	if err != nil {
		// shut down
		if strings.Contains(err.Error(), "closed") {
			s.operator.Control(PollDetach)
			s.onQuit(err)
			return err
		}
		log.Println("accept conn failed:", err.Error())
		return err
	}
	if conn == nil {
		return nil
	}
	// store & register connection
	// Build a new connection
	var connection = &connection{}
	// init registers the connection with epoll and passes in opts,
	// which carry the OnRequest business-logic callback
	connection.init(conn.(Conn), s.opts)
	if !connection.IsActive() {
		return nil
	}
	var fd = conn.(Conn).Fd()
	// Store the new connection and remove it again when it closes
	connection.AddCloseCallback(func(connection Connection) error {
		s.connections.Delete(fd)
		return nil
	})
	s.connections.Store(fd, connection)

	// trigger onConnect asynchronously
	connection.onConnect()
	return nil
}
```



```go
s.operator.poll = pollmanager.Pick()
// Watch the server fd for readable events
err = s.operator.Control(PollReadable)
```





```go
connection.init(conn.(Conn), s.opts)
```





```go
// init initialize the connection with options
func (c *connection) init(conn Conn, opts *options) (err error) {
	// init buffer, barrier, finalizer
	c.readTrigger = make(chan struct{}, 1)
	c.writeTrigger = make(chan error, 1)
	c.bookSize, c.maxSize = block1k/2, pagesize
	c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
	c.inputBarrier, c.outputBarrier = barrierPool.Get().(*barrier), barrierPool.Get().(*barrier)

	// Initialize the fd
	c.initNetFD(conn) // conn must be *netFD{}
	// Initialize the FDOperator
	c.initFDOperator()
	c.initFinalizer()

	syscall.SetNonblock(c.fd, true)
	// enable TCP_NODELAY by default
	switch c.network {
	case "tcp", "tcp4", "tcp6":
		setTCPNoDelay(c.fd, true)
	}
	// check zero-copy
	if setZeroCopy(c.fd) == nil && setBlockZeroCopySend(c.fd, defaultZeroCopyTimeoutSec, 0) == nil {
		c.supportZeroCopy = true
	}

	// connection initialized and prepare options
	// Run the connection's initialization and preparation step
	return c.onPrepare(opts)
}

func (c *connection) initNetFD(conn Conn) {
	if nfd, ok := conn.(*netFD); ok {
		c.netFD = *nfd
		return
	}
	c.netFD = netFD{
		fd:         conn.Fd(),
		localAddr:  conn.LocalAddr(),
		remoteAddr: conn.RemoteAddr(),
	}
}

func (c *connection) initFDOperator() {
	// Allocate an FDOperator
	op := allocop()
	op.FD = c.fd
	op.OnRead, op.OnWrite, op.OnHup = nil, nil, c.onHup
	// Set the input handler and the input acknowledgement callback
	op.Inputs, op.InputAck = c.inputs, c.inputAck
	// Set the output handler and ack callback (with no pending output, only readable events are watched)
	op.Outputs, op.OutputAck = c.outputs, c.outputAck

	// if connection has been registered, must reuse poll here.
	if c.pd != nil && c.pd.operator != nil {
		op.poll = c.pd.operator.poll
	}
	c.operator = op
}

// OnPrepare supports close connection, but not read/write data.
// connection will be registered by this call after preparing.
func (c *connection) onPrepare(opts *options) (err error) {
	if opts != nil {
		c.SetOnConnect(opts.onConnect)
		c.SetOnRequest(opts.onRequest)
		c.SetReadTimeout(opts.readTimeout)
		c.SetIdleTimeout(opts.idleTimeout)

		// calling prepare first and then register.
		if opts.onPrepare != nil {
			// Invoke the OnPrepare callback
			c.ctx = opts.onPrepare(c)
		}
	}

	if c.ctx == nil {
		c.ctx = context.Background()
	}
	// prepare may close the connection.
	if c.IsActive() {
		// Register the client connection with a Poll (epoll)
		return c.register()
	}
	return nil
}

// register only use for connection register into poll.
// Registers the client connection with epoll.
func (c *connection) register() (err error) {
	// Hand the connection to a poller; this poller plays the subReactor role
	if c.operator.poll != nil {
		err = c.operator.Control(PollModReadable)
	} else {
		c.operator.poll = pollmanager.Pick()
		err = c.operator.Control(PollReadable)
	}
	if err != nil {
		log.Println("connection register failed:", err.Error())
		c.Close()
		return Exception(ErrConnClosed, err.Error())
	}
	return nil
}
```




```go
// Set the number of loops (pollers)
func setNumLoops(numLoops int) error {
	return pollmanager.SetNumLoops(numLoops)
}

func setLoadBalance(lb LoadBalance) error {
	return pollmanager.SetLoadBalance(lb)
}

// manage all pollers
var pollmanager *manager

// Initialized at program startup
func init() {
	pollmanager = &manager{}
	// Set the load-balancing strategy
	pollmanager.SetLoadBalance(RoundRobin)
	// Set the number of loops, i.e. the number of epoll instances
	pollmanager.SetNumLoops(defaultNumLoops())
}

func defaultNumLoops() int {
	procs := runtime.GOMAXPROCS(0)
	loops := 1
	// Loops produce events that handlers consume,
	// so the producer should be faster than consumer otherwise it will have a bottleneck.
	// But there is no universal option that could be appropriate for any use cases,
	// plz use `SetNumLoops` if you do know what you want.
	if procs > 4 {
		loops = procs
	}
	return loops
}

// LoadBalance is used to do load balancing among multiple pollers.
// a single poller may not be optimal if the number of cores is large (40C+).
type manager struct {
	NumLoops int
	balance  loadbalance // load balancing method
	// Holds all Poll objects
	polls []Poll // all the polls
}

// SetNumLoops will return error when set numLoops < 1
func (m *manager) SetNumLoops(numLoops int) error {
	if numLoops < 1 {
		return fmt.Errorf("set invaild numLoops[%d]", numLoops)
	}
	// if less than, reset all; else new the delta.
	if numLoops < m.NumLoops {
		m.NumLoops = numLoops
		return m.Reset()
	}
	m.NumLoops = numLoops
	return m.Run()
}

// SetLoadBalance set load balance.
func (m *manager) SetLoadBalance(lb LoadBalance) error {
	if m.balance != nil && m.balance.LoadBalance() == lb {
		return nil
	}
	m.balance = newLoadbalance(lb, m.polls)
	return nil
}

// Run all pollers.
func (m *manager) Run() error {
	// new poll to fill delta.
	for idx := len(m.polls); idx < m.NumLoops; idx++ {
		// Create an epoll instance
		var poll = openPoll()
		m.polls = append(m.polls, poll)
		// Each poller blocks in Wait in its own goroutine
		go poll.Wait()
	}
	// LoadBalance must be set before calling Run, otherwise it will panic.
	m.balance.Rebalance(m.polls)
	return nil
}

// Pick will select the poller for use each time based on the LoadBalance.
func (m *manager) Pick() Poll {
	return m.balance.Pick()
}
```
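The sizing rule in defaultNumLoops and the idea behind Pick() can be checked in isolation. The sketch below restates the rule as a pure function of GOMAXPROCS and pairs it with a hypothetical round-robin picker. netpoll's own loadbalance implementation is not shown in this article, so `rrPicker` is an assumption, not its code; the atomic counter is there so Pick stays safe when called from many goroutines.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// numLoopsFor restates defaultNumLoops as a pure function of GOMAXPROCS,
// so the rule can be checked without changing runtime settings.
func numLoopsFor(procs int) int {
	loops := 1
	if procs > 4 {
		loops = procs
	}
	return loops
}

// rrPicker is a hypothetical round-robin picker in the spirit of the RoundRobin
// strategy installed by init(); it is not netpoll's loadbalance code.
type rrPicker struct {
	polls []string // stand-ins for Poll objects
	next  uint64
}

// Pick returns the pollers in turn; the atomic add makes it safe for concurrent callers.
func (r *rrPicker) Pick() string {
	idx := atomic.AddUint64(&r.next, 1) - 1
	return r.polls[idx%uint64(len(r.polls))]
}

func main() {
	for _, procs := range []int{1, 4, 5, 16} {
		fmt.Printf("GOMAXPROCS=%d -> %d loops\n", procs, numLoopsFor(procs))
	}
	lb := &rrPicker{polls: []string{"poll-0", "poll-1", "poll-2"}}
	for i := 0; i < 4; i++ {
		fmt.Println(lb.Pick())
	}
}
```

Up to four Ps a single poller is used; above that netpoll opens one poller per P, and picks cycle through poll-0, poll-1, poll-2 and back to poll-0.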


  • openPoll(): as the name suggests, it opens a Poll; presumably it calls epoll_create() internally.

  • poll.Wait(): invoked asynchronously in a goroutine; presumably it calls epoll_wait() internally.




```go
// Poll monitors fd(file descriptor), calls the FDOperator to perform specific actions,
// and shields underlying differences. On linux systems, poll uses epoll by default,
// and kevent by default on bsd systems.
type Poll interface {
	// Wait will poll all registered fds, and schedule processing based on the triggered event.
	// The call will block, so the usage can be like:
	//
	//	go wait()
	//
	Wait() error

	// Close the poll and shutdown Wait().
	Close() error

	// Trigger can be used to actively refresh the loop where Wait is located when no event is triggered.
	// On linux systems, eventfd is used by default, and kevent by default on bsd systems.
	Trigger() error

	// Control the event of file descriptor and the operations is defined by PollEvent.
	Control(operator *FDOperator, event PollEvent) error
}

// PollEvent defines the operation of poll.Control.
type PollEvent int

const (
	// PollReadable is used to monitor whether the FDOperator registered by
	// listener and connection is readable or closed.
	PollReadable PollEvent = 0x1

	// PollWritable is used to monitor whether the FDOperator created by the dialer is writable or closed.
	// ET mode must be used (still need to poll hup after being writable)
	PollWritable PollEvent = 0x2

	// PollDetach is used to remove the FDOperator from poll.
	PollDetach PollEvent = 0x3

	// PollModReadable is used to re-register the readable monitor for the FDOperator created by the dialer.
	// It is only used when calling the dialer's conn init.
	PollModReadable PollEvent = 0x4

	// PollR2RW is used to monitor writable for FDOperator,
	// which is only called when the socket write buffer is full.
	PollR2RW PollEvent = 0x5

	// PollRW2R is used to remove the writable monitor of FDOperator, generally used with PollR2RW.
	PollRW2R PollEvent = 0x6
)
```


```go
const EPOLLET = -syscall.EPOLLET

type epollevent struct {
	events uint32
	data   [8]byte // unaligned uintptr
}

// EpollCtl implements epoll_ctl.
func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) {
	_, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0)
	if err == syscall.Errno(0) {
		err = nil
	}
	return err
}

// EpollWait implements epoll_wait.
func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) {
	var r0 uintptr
	var _p0 = unsafe.Pointer(&events[0])
	if msec == 0 {
		r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0)
	} else {
		r0, _, err = syscall.Syscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0)
	}
	if err == syscall.Errno(0) {
		err = nil
	}
	return int(r0), err
}
```

```go
// Includes defaultPoll/multiPoll/uringPoll...
func openPoll() Poll {
	return openDefaultPoll()
}

func openDefaultPoll() *defaultPoll {
	var poll = defaultPoll{}
	poll.buf = make([]byte, 8)
	// Create the epoll instance
	var p, err = syscall.EpollCreate1(0)
	if err != nil {
		panic(err)
	}
	poll.fd = p
	// Create the eventfd used to wake up epoll_wait
	var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0)
	if e0 != 0 {
		syscall.Close(p)
		panic(e0) // err is nil here, so report the eventfd2 errno instead
	}

	poll.Reset = poll.reset
	poll.Handler = poll.handler

	poll.wop = &FDOperator{FD: int(r0)}
	poll.Control(poll.wop, PollReadable)
	return &poll
}

type defaultPoll struct {
	pollArgs
	fd      int         // epoll fd
	wop     *FDOperator // eventfd, wake epoll_wait
	buf     []byte      // read wfd trigger msg
	trigger uint32      // trigger flag
	// fns for handle events
	Reset   func(size, caps int)
	Handler func(events []epollevent) (closed bool)
}

type pollArgs struct {
	size     int
	caps     int
	events   []epollevent
	barriers []barrier
}

func (a *pollArgs) reset(size, caps int) {
	a.size, a.caps = size, caps
	a.events, a.barriers = make([]epollevent, size), make([]barrier, size)
	for i := range a.barriers {
		a.barriers[i].bs = make([][]byte, a.caps)
		a.barriers[i].ivs = make([]syscall.Iovec, a.caps)
	}
}

// Wait implements Poll.
func (p *defaultPoll) Wait() (err error) {
	// init
	var caps, msec, n = barriercap, -1, 0
	p.Reset(128, caps)
	// wait
	for {
		if n == p.size && p.size < 128*1024 {
			p.Reset(p.size<<1, caps)
		}
		// Call epoll_wait()
		n, err = EpollWait(p.fd, p.events, msec)
		if err != nil && err != syscall.EINTR {
			return err
		}
		if n <= 0 {
			msec = -1
			runtime.Gosched()
			continue
		}
		msec = 0
		if p.Handler(p.events[:n]) {
			return nil
		}
	}
}

func (p *defaultPoll) handler(events []epollevent) (closed bool) {
	var hups []*FDOperator // TODO: maybe can use sync.Pool
	for i := range events {
		var operator = *(**FDOperator)(unsafe.Pointer(&events[i].data))
		// trigger or exit gracefully
		if operator.FD == p.wop.FD {
			// must clean trigger first
			syscall.Read(p.wop.FD, p.buf)
			atomic.StoreUint32(&p.trigger, 0)
			// if closed & exit
			if p.buf[0] > 0 {
				syscall.Close(p.wop.FD)
				syscall.Close(p.fd)
				return true
			}
			continue
		}
		if !operator.do() {
			continue
		}

		evt := events[i].events
		switch {
		// check hup first
		case evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0:
			hups = append(hups, operator)
		case evt&syscall.EPOLLERR != 0:
			// Under block-zerocopy, the kernel may give an error callback, which is not a real error, just an EAGAIN.
			// So here we need to check this error, if it is EAGAIN then do nothing, otherwise still mark as hup.
			if _, _, _, _, err := syscall.Recvmsg(operator.FD, nil, nil, syscall.MSG_ERRQUEUE); err != syscall.EAGAIN {
				hups = append(hups, operator)
			}
		default:
			// Readable event
			if evt&syscall.EPOLLIN != 0 {
				if operator.OnRead != nil {
					// for non-connection
					operator.OnRead(p)
				} else {
					// for connection
					var bs = operator.Inputs(p.barriers[i].bs)
					if len(bs) > 0 {
						var n, err = readv(operator.FD, bs, p.barriers[i].ivs)
						// InputAck may start the business-logic (OnRequest) processing
						operator.InputAck(n)
						if err != nil && err != syscall.EAGAIN && err != syscall.EINTR {
							log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error())
							hups = append(hups, operator)
							break
						}
					}
				}
			}
			// Writable event
			if evt&syscall.EPOLLOUT != 0 {
				if operator.OnWrite != nil {
					// for non-connection
					operator.OnWrite(p)
				} else {
					// for connection
					var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs)
					if len(bs) > 0 {
						// TODO: Let the upper layer pass in whether to use ZeroCopy.
						var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy)
						operator.OutputAck(n)
						if err != nil && err != syscall.EAGAIN {
							log.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error())
							hups = append(hups, operator)
							break
						}
					}
				}
			}
		}
		operator.done()
	}
	// hup conns together to avoid blocking the poll.
	if len(hups) > 0 {
		p.detaches(hups)
	}
	return false
}

// Close will write 10000000
func (p *defaultPoll) Close() error {
	_, err := syscall.Write(p.wop.FD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
	return err
}

// Trigger implements Poll.
func (p *defaultPoll) Trigger() error {
	if atomic.AddUint32(&p.trigger, 1) > 1 {
		return nil
	}
	// MAX(eventfd) = 0xfffffffffffffffe
	_, err := syscall.Write(p.wop.FD, []byte{0, 0, 0, 0, 0, 0, 0, 1})
	return err
}

// Control implements Poll.
func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
	var op int
	var evt epollevent
	*(**FDOperator)(unsafe.Pointer(&evt.data)) = operator
	switch event {
	case PollReadable:
		operator.inuse()
		op, evt.events = syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
	case PollModReadable:
		operator.inuse()
		op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
	case PollDetach:
		defer operator.unused()
		op, evt.events = syscall.EPOLL_CTL_DEL, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
	case PollWritable:
		operator.inuse()
		op, evt.events = syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
	case PollR2RW:
		op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
	case PollRW2R:
		op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
	}
	// epoll_ctl
	return EpollCtl(p.fd, op, operator.FD, &evt)
}

func (p *defaultPoll) detaches(hups []*FDOperator) error {
	var onhups = make([]func(p Poll) error, len(hups))
	for i := range hups {
		onhups[i] = hups[i].OnHup
		p.Control(hups[i], PollDetach)
	}
	go func(onhups []func(p Poll) error) {
		for i := range onhups {
			if onhups[i] != nil {
				onhups[i](p)
			}
		}
	}(onhups)
	return nil
}
```




```go
// inputAck implements FDOperator.
func (c *connection) inputAck(n int) (err error) {
	if n < 0 {
		n = 0
	}
	// Auto size bookSize.
	if n == c.bookSize && c.bookSize < mallocMax {
		c.bookSize <<= 1
	}

	length, _ := c.inputBuffer.bookAck(n)
	if c.maxSize < length {
		c.maxSize = length
	}
	if c.maxSize > mallocMax {
		c.maxSize = mallocMax
	}

	var needTrigger = true
	if length == n { // first start onRequest
		// Data has just been read into an empty buffer: start the request-processing logic
		needTrigger = c.onRequest()
	}
	if needTrigger && length >= int(atomic.LoadInt32(&c.waitReadSize)) {
		c.triggerRead()
	}
	return nil
}
```
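The "Auto size bookSize" step in inputAck is a doubling rule: if the last read filled the whole reserved size, the next read reserves twice as much, up to a cap. The sketch restates it as a pure function. `maxBookSize` is a hypothetical 8 KiB cap standing in for netpoll's `mallocMax`, whose value is not shown here, and the starting value of 512 assumes `block1k` is 1 KiB.

```go
package main

import "fmt"

// maxBookSize is a hypothetical cap for this sketch; netpoll uses mallocMax.
const maxBookSize = 8 * 1024

// nextBookSize mirrors the "Auto size bookSize" step: double the reservation
// when the last read (n bytes) filled it completely, unless already at the cap.
func nextBookSize(n, bookSize int) int {
	if n == bookSize && bookSize < maxBookSize {
		bookSize <<= 1
	}
	return bookSize
}

func main() {
	size := 512 // block1k/2, assuming block1k is 1 KiB
	for i := 0; i < 6; i++ {
		size = nextBookSize(size, size) // every read fills the reservation
		fmt.Println(size)
	}
	fmt.Println(nextBookSize(100, size)) // a short read leaves the size unchanged
}
```

When every read fills the buffer, the size goes 1024, 2048, 4096, 8192 and then stays at the cap; a short read leaves it unchanged, so the reservation only grows for connections that actually receive large bursts.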
```go
// onRequest is responsible for executing the closeCallbacks after the connection has been closed.
func (c *connection) onRequest() (needTrigger bool) {
	var onRequest, ok = c.onRequestCallback.Load().(OnRequest)
	if !ok {
		return true
	}
	processed := c.onProcess(
		// only process when conn active and have unread data
		func(c *connection) bool {
			return c.Reader().Len() > 0 && c.IsActive()
		},
		func(c *connection) {
			// Run the business-logic callback
			_ = onRequest(c.ctx, c)
		},
	)
	// if not processed, should trigger read
	return !processed
}

// onProcess is responsible for executing the process function serially,
// and make sure the connection has been closed correctly if user call c.Close() in process function.
func (c *connection) onProcess(isProcessable func(c *connection) bool, process func(c *connection)) (processed bool) {
	if process == nil {
		return false
	}
	// task already exists
	if !c.lock(processing) {
		return false
	}
	// add new task
	var task = func() {
	START:
		// Single request processing, blocking allowed.
		for isProcessable(c) {
			process(c)
		}
		// Handling callback if connection has been closed.
		if !c.IsActive() {
			c.closeCallback(false)
			return
		}
		c.unlock(processing)
		// Double check when exiting.
		if isProcessable(c) && c.lock(processing) {
			goto START
		}
		// task exits
		return
	}

	runTask(c.ctx, task)
	return true
}

var runTask = gopool.CtxGo
```



```go
// Flush will send all malloc data to the peer,
// so must confirm that the allocated bytes have been correctly assigned.
//
// Flush first checks whether the out buffer is empty.
// If empty, it will call syscall.Write to send data directly,
// otherwise the buffer will be sent asynchronously by the epoll trigger.
func (c *connection) Flush() error {
	if !c.lock(flushing) {
		return Exception(ErrConnClosed, "when flush")
	}
	defer c.unlock(flushing)
	c.outputBuffer.Flush()
	return c.flush()
}

// Write will Flush soon.
func (c *connection) Write(p []byte) (n int, err error) {
	if !c.lock(flushing) {
		return 0, Exception(ErrConnClosed, "when write")
	}
	defer c.unlock(flushing)

	dst, _ := c.outputBuffer.Malloc(len(p))
	n = copy(dst, p)
	c.outputBuffer.Flush()
	err = c.flush()
	return n, err
}

// flush write data directly.
func (c *connection) flush() error {
	if c.outputBuffer.IsEmpty() {
		return nil
	}
	// TODO: Let the upper layer pass in whether to use ZeroCopy.
	var bs = c.outputBuffer.GetBytes(c.outputBarrier.bs)
	var n, err = sendmsg(c.fd, bs, c.outputBarrier.ivs, false && c.supportZeroCopy)
	if err != nil && err != syscall.EAGAIN {
		return Exception(err, "when flush")
	}
	if n > 0 {
		err = c.outputBuffer.Skip(n)
		c.outputBuffer.Release()
		if err != nil {
			return Exception(err, "when flush")
		}
	}
	// return if write all buffer.
	if c.outputBuffer.IsEmpty() {
		return nil
	}
	// Switch the fd to watch both readable and writable events
	err = c.operator.Control(PollR2RW)
	if err != nil {
		return Exception(err, "when flush")
	}

	err = <-c.writeTrigger
	return err
}
```
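The control flow of flush() can be simulated without sockets. In the sketch, `limitedSocket` is a hypothetical socket whose send buffer accepts only a few bytes per call; `flushOnce` mirrors the direct-write attempt, and `onWritable` stands in for the poller's EPOLLOUT branch that drains the remainder after PollR2RW. In netpoll the caller then blocks on `writeTrigger` until that happens; the sketch simply runs the rounds inline.

```go
package main

import "fmt"

// limitedSocket is a hypothetical socket whose send buffer takes at most
// room bytes per write call, so a large flush only partially succeeds.
type limitedSocket struct {
	room int
	sent []byte
}

func (s *limitedSocket) write(p []byte) int {
	n := len(p)
	if n > s.room {
		n = s.room
	}
	s.sent = append(s.sent, p[:n]...)
	return n
}

// flushOnce follows the shape of connection.flush: write directly first; if
// bytes remain, report that the fd must also be watched for writability.
func flushOnce(s *limitedSocket, buf []byte) (rest []byte, needWritable bool) {
	if len(buf) == 0 {
		return nil, false
	}
	n := s.write(buf)
	rest = buf[n:]
	return rest, len(rest) > 0
}

// onWritable stands in for the poller's EPOLLOUT branch: each simulated
// writable event sends as much of the remainder as the socket accepts.
// It returns how many writable events were needed.
func onWritable(s *limitedSocket, rest []byte) int {
	rounds := 0
	for len(rest) > 0 {
		n := s.write(rest)
		if n == 0 {
			break // nothing accepted; a real poller would wait for the next EPOLLOUT
		}
		rest = rest[n:]
		rounds++
	}
	return rounds
}

func main() {
	s := &limitedSocket{room: 4}
	rest, need := flushOnce(s, []byte("hello netpoll"))
	fmt.Printf("direct write sent %q, rest %q, needWritable=%v\n", s.sent, rest, need)
	rounds := onWritable(s, rest)
	fmt.Println("writable rounds:", rounds, "total sent:", string(s.sent))
}
```

With a 4-byte window, the direct write sends "hell", the remaining 9 bytes take three writable rounds, and the peer ends up with the full "hello netpoll".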


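This section focused on the netpoll server-side implementation. Starting from the server's Serve entry point, it covered how the server accepts client connections, how client requests are processed, the pollmanager module, and the Poll wrapper. The actual framework contains far more code; following the order in which one naturally reads it, this article trims it down to the core parts. Interested readers can open the project afterwards and read the source directly.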

IV. netpoll client source code analysis



```go
// Dialer extends net.Dialer's API, just for interface compatibility.
// DialConnection is recommended, but of course all functions are practically the same.
// The returned net.Conn can be directly asserted as Connection if error is nil.
type Dialer interface {
	DialConnection(network, address string, timeout time.Duration) (connection Connection, err error)

	DialTimeout(network, address string, timeout time.Duration) (conn net.Conn, err error)
}

// DialConnection is a default implementation of Dialer.
func DialConnection(network, address string, timeout time.Duration) (connection Connection, err error) {
	return defaultDialer.DialConnection(network, address, timeout)
}

// NewDialer only support TCP and unix socket now.
func NewDialer() Dialer {
	return &dialer{}
}

var defaultDialer = NewDialer()

type dialer struct{}

// DialTimeout implements Dialer.
func (d *dialer) DialTimeout(network, address string, timeout time.Duration) (net.Conn, error) {
	conn, err := d.DialConnection(network, address, timeout)
	return conn, err
}

// DialConnection implements Dialer.
func (d *dialer) DialConnection(network, address string, timeout time.Duration) (connection Connection, err error) {
	ctx := context.Background()
	if timeout > 0 {
		subCtx, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()
		ctx = subCtx
	}

	switch network {
	case "tcp", "tcp4", "tcp6":
		var raddr *TCPAddr
		raddr, err = ResolveTCPAddr(network, address)
		if err != nil {
			return nil, err
		}
		connection, err = DialTCP(ctx, network, nil, raddr)
	// case "udp", "udp4", "udp6":  // TODO: unsupport now
	case "unix", "unixgram", "unixpacket":
		var raddr *UnixAddr
		raddr, err = ResolveUnixAddr(network, address)
		if err != nil {
			return nil, err
		}
		connection, err = DialUnix(network, nil, raddr)
	default:
		return nil, net.UnknownNetworkError(network)
	}
	return connection, err
}

// sysDialer contains a Dial's parameters and configuration.
type sysDialer struct {
	net.Dialer
	network, address string
}
```


```go
// TCPConnection implements Connection.
type TCPConnection struct {
	connection
}

// newTCPConnection wraps *TCPConnection.
func newTCPConnection(conn Conn) (connection *TCPConnection, err error) {
	connection = &TCPConnection{}
	err = connection.init(conn, nil)
	if err != nil {
		return nil, err
	}
	return connection, nil
}

// DialTCP acts like Dial for TCP networks.
//
// The network must be a TCP network name; see func Dial for details.
//
// If laddr is nil, a local address is automatically chosen.
// If the IP field of raddr is nil or an unspecified IP address, the
// local system is assumed.
func DialTCP(ctx context.Context, network string, laddr, raddr *TCPAddr) (*TCPConnection, error) {
	switch network {
	case "tcp", "tcp4", "tcp6":
	default:
		return nil, &net.OpError{Op: "dial", Net: network, Source: laddr.opAddr(), Addr: raddr.opAddr(), Err: net.UnknownNetworkError(network)}
	}
	if raddr == nil {
		return nil, &net.OpError{Op: "dial", Net: network, Source: laddr.opAddr(), Addr: nil, Err: errMissingAddress}
	}
	if ctx == nil {
		ctx = context.Background()
	}
	sd := &sysDialer{network: network, address: raddr.String()}
	c, err := sd.dialTCP(ctx, laddr, raddr)
	if err != nil {
		return nil, &net.OpError{Op: "dial", Net: network, Source: laddr.opAddr(), Addr: raddr.opAddr(), Err: err}
	}
	return c, nil
}

func (sd *sysDialer) dialTCP(ctx context.Context, laddr, raddr *TCPAddr) (*TCPConnection, error) {
	// Internally creates the socket and establishes the connection
	conn, err := internetSocket(ctx, sd.network, laddr, raddr, syscall.SOCK_STREAM, 0, "dial")

	// TCP has a rarely used mechanism called a 'simultaneous connection' in
	// which Dial("tcp", addr1, addr2) run on the machine at addr1 can
	// connect to a simultaneous Dial("tcp", addr2, addr1) run on the machine
	// at addr2, without either machine executing Listen. If laddr == nil,
	// it means we want the kernel to pick an appropriate originating local
	// address. Some Linux kernels cycle blindly through a fixed range of
	// local ports, regardless of destination port. If a kernel happens to
	// pick local port 50001 as the source for a Dial("tcp", "", "localhost:50001"),
	// then the Dial will succeed, having simultaneously connected to itself.
	// This can only happen when we are letting the kernel pick a port (laddr == nil)
	// and when there is no listener for the destination address.
	// It's hard to argue this is anything other than a kernel bug. If we
	// see this happen, rather than expose the buggy effect to users, we
	// close the conn and try again. If it happens twice more, we relent and
	// use the result. See also:
	//	https://golang.org/issue/2690
	//	https://stackoverflow.com/questions/4949858/
	//
	// The opposite can also happen: if we ask the kernel to pick an appropriate
	// originating local address, sometimes it picks one that is already in use.
	// So if the error is EADDRNOTAVAIL, we have to try again too, just for
	// a different reason.
	//
	// The kernel socket code is no doubt enjoying watching us squirm.
	for i := 0; i < 2 && (laddr == nil || laddr.Port == 0) && (selfConnect(conn, err) || spuriousENOTAVAIL(err)); i++ {
		if err == nil {
			conn.Close()
		}
		conn, err = internetSocket(ctx, sd.network, laddr, raddr, syscall.SOCK_STREAM, 0, "dial")
	}

	if err != nil {
		return nil, err
	}
	return newTCPConnection(conn)
}
```


```go
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string) (conn *netFD, err error) {
	if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd" || runtime.GOOS == "nacl") && raddr.isWildcard() {
		raddr = raddr.toLocal(net)
	}
	family, ipv6only := favoriteAddrFamily(net, laddr, raddr)
	// Create the socket
	return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr)
}

// socket returns a network file descriptor that is ready for
// asynchronous I/O using the network poller.
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (netfd *netFD, err error) {
	// syscall.Socket & set socket options
	var fd int
	// Issue the socket system call
	fd, err = sysSocket(family, sotype, proto)
	if err != nil {
		return nil, err
	}
	err = setDefaultSockopts(fd, family, sotype, ipv6only)
	if err != nil {
		syscall.Close(fd)
		return nil, err
	}

	netfd = newNetFD(fd, family, sotype, net)
	// Perform the handshake; dial calls connect internally
	err = netfd.dial(ctx, laddr, raddr)
	if err != nil {
		netfd.Close()
		return nil, err
	}
	return netfd, nil
}

// Wrapper around the socket system call that marks the returned file
// descriptor as nonblocking and close-on-exec.
func sysSocket(family, sotype, proto int) (int, error) {
	// See ../syscall/exec_unix.go for description of ForkLock.
	syscall.ForkLock.RLock()
	// Issue the socket system call
	s, err := syscall.Socket(family, sotype, proto)
	if err == nil {
		syscall.CloseOnExec(s)
	}
	syscall.ForkLock.RUnlock()
	if err != nil {
		return -1, os.NewSyscallError("socket", err)
	}
	// Put the socket into non-blocking mode
	if err = syscall.SetNonblock(s, true); err != nil {
		syscall.Close(s)
		return -1, os.NewSyscallError("setnonblock", err)
	}
	return s, nil
}

func newNetFD(fd, family, sotype int, net string) *netFD {
	var ret = &netFD{}
	ret.fd = fd
	ret.network = net
	ret.family = family
	ret.sotype = sotype
	ret.isStream = sotype == syscall.SOCK_STREAM
	ret.zeroReadIsEOF = sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW
	return ret
}

// if dial connection error, you need exec netFD.Close actively
func (c *netFD) dial(ctx context.Context, laddr, raddr sockaddr) (err error) {
	var lsa syscall.Sockaddr
	if laddr != nil {
		if lsa, err = laddr.sockaddr(c.family); err != nil {
			return err
		} else if lsa != nil {
			// bind local address (the bind system call)
			if err = syscall.Bind(c.fd, lsa); err != nil {
				return os.NewSyscallError("bind", err)
			}
		}
	}
	var rsa syscall.Sockaddr  // remote address from the user
	var crsa syscall.Sockaddr // remote address we actually connected to
	if raddr != nil {
		if rsa, err = raddr.sockaddr(c.family); err != nil {
			return err
		}
	}
	// Call connect() to establish the connection;
	// crsa is the remote address we actually connected to
	if crsa, err = c.connect(ctx, lsa, rsa); err != nil {
		return err
	}
	c.isConnected = true

	// Record the local and remote addresses from the actual socket.
	// Get the local address by calling Getsockname.
	// For the remote address, use
	// 1) the one returned by the connect method, if any; or
	// 2) the one from Getpeername, if it succeeds; or
	// 3) the one passed to us as the raddr parameter.
	lsa, _ = syscall.Getsockname(c.fd)
	c.localAddr = sockaddrToAddr(lsa)
	if crsa != nil {
		c.remoteAddr = sockaddrToAddr(crsa)
	} else if crsa, _ = syscall.Getpeername(c.fd); crsa != nil {
		c.remoteAddr = sockaddrToAddr(crsa)
	} else {
		c.remoteAddr = sockaddrToAddr(rsa)
	}
	return nil
}

func (c *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, ret error) {
	// Do not need to call c.writing here,
	// because c is not yet accessible to user,
	// so no concurrent operations are possible.
	// Issue the connect system call
	switch err := syscall.Connect(c.fd, ra); err {
	case syscall.EINPROGRESS, syscall.EALREADY, syscall.EINTR:
	case nil, syscall.EISCONN:
		select {
		case <-ctx.Done():
			return nil, mapErr(ctx.Err())
		default:
		}
		return nil, nil
	case syscall.EINVAL:
		// On Solaris we can see EINVAL if the socket has
		// already been accepted and closed by the server.
		// Treat this as a successful connection--writes to
		// the socket will see EOF.  For details and a test
		// case in C see https://golang.org/issue/6828.
		if runtime.GOOS == "solaris" {
			return nil, nil
		}
		fallthrough
	default:
		return nil, os.NewSyscallError("connect", err)
	}

	// TODO: can't support interrupter now.
	// Start the "interrupter" goroutine, if this context might be canceled.
	// (The background context cannot)
	//
	// The interrupter goroutine waits for the context to be done and
	// interrupts the dial (by altering the c's write deadline, which
	// wakes up waitWrite).
	if ctx != context.Background() {
		// Wait for the interrupter goroutine to exit before returning
		// from connect.
		done := make(chan struct{})
		interruptRes := make(chan error)
		defer func() {
			close(done)
			if ctxErr := <-interruptRes; ctxErr != nil && ret == nil {
				// The interrupter goroutine called SetWriteDeadline,
				// but the connect code below had returned from
				// waitWrite already and did a successful connect (ret
				// == nil). Because we've now poisoned the connection
				// by making it unwritable, don't return a successful
				// dial. This was issue 16523.
				ret = mapErr(ctxErr)
				c.Close() // prevent a leak
			}
		}()
		go func() {
			select {
			case <-ctx.Done():
				// Force the runtime's poller to immediately give up
				// waiting for writability, unblocking waitWrite
				// below.
				c.SetWriteDeadline(aLongTimeAgo)
				interruptRes <- ctx.Err()
			case <-done:
				interruptRes <- nil
			}
		}()
	}

	c.pd = newPollDesc(c.fd)
	for {
		// Performing multiple connect system calls on a
		// non-blocking socket under Unix variants does not
		// necessarily result in earlier errors being
		// returned. Instead, once runtime-integrated network
		// poller tells us that the socket is ready, get the
		// SO_ERROR socket option to see if the connection
		// succeeded or failed. See issue 7474 for further
		// details.
		if err := c.pd.WaitWrite(ctx); err != nil {
			return nil, err
		}
		nerr, err := syscall.GetsockoptInt(c.fd, syscall.SOL_SOCKET, syscall.SO_ERROR)
		if err != nil {
			return nil, os.NewSyscallError("getsockopt", err)
		}
		switch err := syscall.Errno(nerr); err {
		case syscall.EINPROGRESS, syscall.EALREADY, syscall.EINTR:
		case syscall.EISCONN:
			return nil, nil
		case syscall.Errno(0):
			// The runtime poller can wake us up spuriously;
			// see issues 14548 and 19289. Check that we are
			// really connected; if not, wait again.
			if rsa, err := syscall.Getpeername(c.fd); err == nil {
				return rsa, nil
			}
		default:
			return nil, os.NewSyscallError("connect", err)
		}
	}
}
```


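  • Call sysSocket(), which creates a socket descriptor via the syscall.Socket() system call.

  • Call netFD's dial() method, which mainly calls syscall.Connect() to establish the connection.

  • Once the netFD is obtained, newTCPConnection() initializes the connection; this follows the same init() path the server uses after accepting a connection.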











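Backend development engineer on the Cloud9 project team, with 3 years of backend development experience and familiarity with recommendation-system backends; interested in networking, storage, and distributed consensus algorithms (raft). Currently on the PCG Cloud9 project team, mainly responsible for developing core backend modules.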










