在 golang 中是如何对 epoll 进行封装的?

2022年3月20日 306点热度 0人点赞 0条评论

大家好,我是飞哥!

在协程没有流行以前,传统的网络编程中,同步阻塞是性能低下的代名词,一次切换就得是 3 us 左右的 CPU 开销。各种基于 epoll 的异步非阻塞的模型虽然提高了性能,但是基于回调函数的编程方式却非常不符合人的的直线思维模式。开发出来的代码的也不那么容易被人理解。

Golang 的出现,可以说是将协程编程模式推向了一个高潮。这种新的编程方式既兼顾了同步编程方式的简单易用,也在底层通过协程和 epoll 的配合避免了线程切换的性能高损耗。换句话说就是既简单易用,性能又还不挺错。

飞哥当年也是相中的 golang 的这个特点,开始带领团队转型 golang 开发的。那么今天我们来深刻地和大家分享一下 golang 官方提供的 net 包,来看看它是如何达成上面所说的这样的效果的。

一、Golang net的使用方式

考虑到不少读者没有使用过 golang,那么开头我先把一个基于官方 net 包的 golang 服务的简单使用代码给大家列出来。为了方便大家理解,我只保留骨干代码。

func main() {
 //构造一个listener
 listener, _ := net.Listen("tcp""127.0.0.1:9008")
 for {
  //接收请求
  conn, err := listener.Accept()
  //启动一个协程来处理
  go process(conn)
 }
}

func process(conn net.Conn) {
 //结束时关闭连接
 defer conn.Close()
 //读取连接上的数据
 var buf [1024]byte
 len, err := conn.Read(buf[:])
 //发送数据
 _, err = conn.Write([]byte("I am server!"))
 ...
}

在这个示例服务程序中,先是使用 net.Listen 来监听了本地的 9008 这个端口。然后调用 Accept 进行接收连接处理。如果接收到了连接请求,通过go process 来启动一个协程进行处理。在连接的处理中我展示了读写操作(Read 和 Write)。

整个服务程序看起来,妥妥的就是一个同步模型,包括 Accept、Read 和 Write 都会将当前协程给“阻塞”掉。比如 Read 函数这里,如果服务器调用时客户端数据还没有到达,那么 Read 是不带返回的,会将当前的协程 park 住。直到有了数据 Read 才会返回,处理协程继续执行。

你如果在其它语言,例如 C 和 Java 中写出这样类似的服务器代码,估计会被打死的。因为每一次同步的 Accept、Read、Write 都会导致你当前的线程被阻塞掉,会浪费大量的 CPU 进行线程上下文的切换。

但是在 golang 中这样的代码运行性能却是非常的不错,为啥呢?我们继续看本文接下来的内容。

二、Listen 底层过程

在传统的 C、Java 等传统语言中,listen 所做的事情就是直接调用内核的 listen 系统调用。参见《为什么服务端程序都需要先 listen 一下?》。但是如果你也这么同等地理解 golang net 包里的 Listen, 那可就大错特错了。

和其它语言不同,在 golang net 的 listen 中,会完成如下几件事:

  • 创建 socket 并设置非阻塞,
  • bind 绑定并监听本地的一个端口
  • 调用 listen 开始监听
  • epoll_create 创建一个 epoll 对象
  • epoll_etl 将 listen 的 socket 添加到 epoll 中等待连接到来

一次 Golang 的 Listen 调用,相当于在 C 语言中的 socket、bind、listen、epoll_create、epoll_etl 等多次函数调用的效果。封装度非常的高,更大程度地对程序员屏蔽了底层的实现细节。

插一句题外话:现在的各种开发工具的封装程度越来越高,真不知道对码农来说是好事还是坏事。好处是开发效率更高了,缺点是将来的程序员想了解底层也越来越难了,越来越像传统企业里流水线上的工人。

口说无凭,我们挖开 Golang 的内部源码瞅一瞅,这样更真实。

图片

Listen 的入口在 golang 源码的 net/dial.go 文件中,让我们展开来看更细节的逻辑。

2.1 Listen 入口执行流程

源码不用细看,看懂大概流程就可以。

//file:go1.14.4/src/net/dial.go
func Listen(network, address string) (Listener, error) {
 var lc ListenConfig
 return lc.Listen(context.Background(), network, address)
}

可见,这个 Listen 只是一个入口。接下来会进入到 ListenConfig 下的 Listen 方法中。在 ListenConfig 的 Listen 中判断这是一个 TCP 类型的话,会进入到 sysListener 下的 listenTCP 方法里(src/net/tcpsock_posix.go)。然后再经过两三次的函数调用跳转,会进入到 net/sock_posix.go 文件下的 socket 函数中。我们直接看它。

//file:go1.14.4/src/net/sock_posix.go
func socket(ctx context.Context, net string, family, ...) (fd *netFD, err error) {
 //创建 socket,见 2.2 小节 
 s, err := sysSocket(family, sotype, proto)

 ...

 //TCP 绑定和监听,见 2.3 小节
 //epoll对象的创建以及文件描述符的添加 见 2.4 小节
 if laddr != nil && raddr == nil {
  switch sotype {
  case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
   fd.listenStream(laddr, listenerBacklog(), ctrlFn);
  ......
 }
}

接下来我们分别在 2.2 和 2.3 小节来介绍 sysSocket 和 listenStream 这两个函数。

2.2 创建 socket

sysSocket 这个函数和其它语言中的 socket 函数有很大的不同。在这个一个函数内就完成了三件事,创建 socket、bind 和 listen 监听。我们来看 sysSocket的具体代码。

//file:net/sys_cloexec.go
func sysSocket(family, sotype, proto int) (int, error) {
 //创建 socket
 s, err := socketFunc(family, sotype, proto)

 //设置为非阻塞模式
 syscall.SetNonblock(s, true)
}

在 sysSocket 中,调用的 socketFunc 其实就是 socket 系统调用。见如下代码。

//file:net/hook_unix.go
var (
 // Placeholders for socket system calls.
 socketFunc        func(intintint) (int, error)  = syscall.Socket
 connectFunc       func(int, syscall.Sockaddr) error = syscall.Connect
 listenFunc        func(intint) error              = syscall.Listen
 getsockoptIntFunc func(intintint) (int, error)  = syscall.GetsockoptInt
)

创建完 socket 之后,再调用 syscall.SetNonblock 将其设置为非阻塞模式。

//file:syscall/exec_unix.go
func SetNonblock(fd int, nonblocking bool) (err error) {
 ...
 if nonblocking {
  flag |= O_NONBLOCK
 } 
 fcntl(fd, F_SETFL, flag)
}

2.3 绑定和监听

我们接着再来看 listenStream。这个函数一进来就调用了系统调用 bind 和 listen 来完成了绑定和监听。

//file:net/sock_posix.go
func (fd *netFD) listenStream(laddr sockaddr,...) error 
{
 ...

 //等同于 c 语言中的:bind(listenfd, ...)
 syscall.Bind(fd.pfd.Sysfd, lsa);

 //等同于 c 语言中的:listen(listenfd, ...)
 listenFunc(fd.pfd.Sysfd, backlog);

 //这里非常关键:初始化socket与异步IO相关的内容
 if err = fd.init(); err != nil {
  return err
 }
}

其中 listenFunc 是一个宏,指向的就是 syscall.Listen 系统调用

//file:go1.14.4/src/net/hook_unix.go
import "syscall"
var (
 // Placeholders for socket system calls.
 socketFunc        func(intintint) (int, error)  = syscall.Socket
 connectFunc       func(int, syscall.Sockaddr) error = syscall.Connect
 listenFunc        func(intint) error              = syscall.Listen
 getsockoptIntFunc func(intintint) (int, error)  = syscall.GetsockoptInt
)

2.4 epoll创建和初始化

接下来在 fd.init 这一行,经过多次的函数调用展开以后会执行到 epoll 对象的创建,并还把在 listen 状态的 socket 句柄添加到了 epoll 对象中来管理其网络事件。

我们来看它是如何完成的。

//file:go1.14.4/src/internal/poll/fd_poll_runtime.go
func (pd *pollDesc) init(fd *FD) error {

 serverInit.Do(runtime_pollServerInit)
 ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
 ...
 return nil
}

serverInit.Do 这个是用来保证参数内的函数只执行一次的。不过多展开介绍。其参数 runtime_pollServerInit 是对 runtime 包的函数 poll_runtime_pollServerInit 的调用,其源码位于 runtime/netpoll.go 下。

//file:runtime/netpoll.go
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
 netpollGenericInit()
}

该函数会执行到 netpollGenericInit, epoll 就是在它的内部创建的。

//file:netpoll_epoll.go
func netpollinit() {
 // epoll 对象的创建
 epfd = epollcreate1(_EPOLL_CLOEXEC)
 ...
}

再来看 runtime_pollOpen。它的参数就是前面 listen 好了的 socket 的文件描述符。在这个函数里,它将被放到 epoll 对象中。

//file:runtime/netpoll_epoll.go
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
 ...
 errno = netpollopen(fd, pd)
 return pd, int(errno)
}

//file:runtime/netpoll_epoll.go
func netpollopen(fd uintptr, pd *pollDesc) int32 {
 var ev epollevent
 ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
 *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd

 // listen 状态的 socket 被添加到了 epoll 中。
 return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

三、Accept 过程

服务端在 Listen 完了之后,就是对 Accept 的调用了。该函数主要做了三件事

  • 调用 accept 系统调用接收一个连接
  • 如果没有连接到达,把当前协程阻塞掉
  • 新连接到来的话,将其添加到 epoll 中管理,然后返回

图片

通过 Golang 里的单步调试可以看到它进入到了 TCPListener 下的 Accept 里了。

//file: net/tcpsock.go
func (l *TCPListener) Accept() (Conn, error) {
 c, err := l.accept()
 ...
}
func (ln *TCPListener) accept() (*TCPConn, error) {
 //以 netFD 的形式返回一个新连接
 fd, err := ln.fd.accept()
}

我们上面说的三步都是在 netFD 的 accept 函数里处理的。

//file:net/fd_unix.go
func (fd *netFD) accept() (netfd *netFD, err error) {
 //3.1 接收一个连接
 //3.2 如果连接没有到达阻塞当前协程
 d, rsa, errcall, err := fd.pfd.Accept()

 //3.2 将新到的连接也添加到 epoll 中进行管理
 netfd, err = newFD(d, fd.family, fd.sotype, fd.net);
 netfd.init();

 ...
 return netfd, nil
}

接下来我们详细看每一步的细节。

3.1 接收一个连接

经过单步跟踪后发现 Accept 进入到了 FD 对象的 Accept 方法下。在这里将调用操作系统的 accept 系统调用。

//file:internal/poll/fd_unix.go
// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {

 for {
  //调用 accept 系统调用接收一个连接
  s, rsa, errcall, err := accept(fd.Sysfd)

  //接收到了连接就返回它
  if err == nil {
   return s, rsa, "", err
  }

  switch err {
  case syscall.EAGAIN:
   //如果没有获取到,那就把协程给阻塞起来
   if fd.pd.pollable() {
    if err = fd.pd.waitRead(fd.isFile); err == nil {
     continue
    }
   }
  ... 
 }
 ...
}

其中 accept 方法内部会触发 linux 操作系统的 accept 系统调用,我们就不过度展开了。调用 accept 目的是获取一个来自客户端的连接。如果接收到了,就把他返回回去。

3.2 阻塞当前协程

我们来说说如果没 accept 调用的时候,客户端的连接请求还一个都没有过来怎么办。

这时候,accept 系统调用会返回 syscall.EAGAIN。Golang 在对这个状态的处理中,会把当前协程给阻塞起来。关键代码在这里

//file: internal/poll/fd_poll_runtime.go
func (pd *pollDesc) waitRead(isFile bool) error {
 return pd.wait('r', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
 if pd.runtimeCtx == 0 {
  return errors.New("waiting for unsupported file type")
 }
 res := runtime_pollWait(pd.runtimeCtx, mode)
 return convertErr(res, isFile)
}

runtime_pollWait 的源码在 runtime/netpoll.go 下。gopark(协程的阻塞)就是在这里完成的。

//file:runtime/netpoll.go
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    ...
    for !netpollblock(pd, int32(mode), false) {
    }
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    ...
    if waitio || netpollcheckerr(pd, mode) == 0 {
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
}

gopark 这个函数就是 golang 内部阻塞协程的入口。

3.3 将新连接添加到 epoll 中。

我们再来说说假如客户端连接已经到来了的情况。这时 fd.pfd.Accept 会返回新建的连接。然后会将该新连接也一并加入到 epoll 中进行高效的事件管理。

//file:net/fd_unix.go
func (fd *netFD) accept() (netfd *netFD, err error) {
 //3.1 接收一个连接
 //3.2 如果连接没有到达阻塞当前协程
 d, rsa, errcall, err := fd.pfd.Accept()

 //3.2 将新到的连接也添加到 epoll 中进行管理
 netfd, err = newFD(d, fd.family, fd.sotype, fd.net);
 netfd.init();

 ...
 return netfd, nil
}

我们来看 netfd.init

//file:internal/poll/fd_poll_runtime.go
func (pd *pollDesc) init(fd *FD) error {
 ...
 ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
 ...
}

runtime_pollOpen 这个runtime 函数我们在上面的 2.4 节介绍过了,就是把文件句柄添加到 epoll 对象中。

//file:runtime/netpoll_epoll.go
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
 ...
 errno = netpollopen(fd, pd)
 return pd, int(errno)
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
 var ev epollevent
 ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
 *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd

 //新连接的 socket 也被添加到了 epoll 中。
 return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

四、Read 和 Write 内部过程

当连接接收完成后,剩下的就是在连接上的读写了。

4.1 Read 内部过程

我们先来看 Read 大体过程。

图片

来看详细的代码。

//file:/Users/zhangyanfei/sdk/go1.14.4/src/net/net.go
func (c *conn) Read(b []byte) (int, error) {
 ...
 n, err := c.fd.Read(b)
}

Read 函数会进入到 FD 的 Read 中。在这个函数内部调用 Read 系统调用来读取数据。如果数据还尚未到达则也是把自己阻塞起来。

//file:internal/poll/fd_unix.go
func (fd *FD) Read(p []byte) (int, error) {
 for {
  //调用 Read 系统调用
  n, err := syscall.Read(fd.Sysfd, p)
  if err != nil {
   n = 0

   //将自己添加到 epoll 中等待事件,然后阻塞掉。
   if err == syscall.EAGAIN && fd.pd.pollable() {
    if err = fd.pd.waitRead(fd.isFile); err == nil {
     continue
    }
   }
  ...... 
 }  
}

其中 waitRead 是如何将当前协程阻塞掉的,这个和我们前面 3.2 节介绍的是一样的,就不过多展开叙述了。

4.2 Write 内部过程

Write 的大体过程和 Read 是类似的。先是调用 Write 系统调用发送数据,如果内核发送缓存区不足的时候,就把自己先阻塞起来,然后等可写时间发生的时候再继续发送。其源码入口位于 net/net.go。

//file:net/net.go
func (c *conn) Write(b []byte) (int, error) {
 ...
 n, err := c.fd.Write(b)
}
//file:internal/poll/fd_unix.go
func (fd *FD) Write(p []byte) (int, error) {
 for {
  n, err := syscall.Write(fd.Sysfd, p[nn:max])
  if err == syscall.EAGAIN && fd.pd.pollable() {
   if err = fd.pd.waitWrite(fd.isFile); err == nil {
    continue
   }
  }
 }
}
//file:internal/poll/fd_poll_runtime.go
func (pd *pollDesc) waitWrite(isFile bool) error {
 return pd.wait('w', isFile)
}

pd.wait 之后的事情就又和 3.2 节介绍的过程一样了。调用 runtime_pollWait 来讲当前协程阻塞掉。

func (pd *pollDesc) wait(mode int, isFile bool) error {
 ...
 res := runtime_pollWait(pd.runtimeCtx, mode)
}

五、Golang 唤醒

前面我们讨论的很多步骤里都涉及到协程的阻塞。例如 Accept 时如果新连接还尚未到达。再比如像 Read 数据的时候对方还没有发送,当前协程都不会占着 cpu 不放,而是会阻塞起来。

那么当要等待的事件就绪的时候,被阻塞掉的协程又是如何被重新调度的呢?相信大家一定会好奇这个问题。

Go 语言的运行时会在调度或者系统监控中调用 sysmon,它会调用 netpoll,来不断地调用 epoll_wait 来查看 epoll 对象所管理的文件描述符中哪一个有事件就绪需要被处理了。如果有,就唤醒对应的协程来进行执行。

其实除此之外还有几个地方会唤醒协程,如

  • startTheWorldWithSema
  • findrunnable  在 schedule 中调用
    有top 和 stop 之分。其中 stop 中会导致阻塞。
  • pollWork

不过为了简便起见,我们只选择 sysmon 来作为一个切入口。sysmon 是一个周期性的监控协程,来看源码。

//file:src/runtime/proc.go
func sysmon() {
 ...
 list := netpoll(0
}

它会不断触发对 netpoll 的调用,在 netpoll 会调用 epollwait 看查看是否有网络事件发生。

//file:runtime/netpoll_epoll.go
func netpoll(delay int64) gList {
 ...
retry:
 n := epollwait(epfd, &events[0], int32(len(events)), waitms)
 if n < 0 {
  //没有网络事件
  goto retry
 }

 for i := int32(0); i < n; i++ {

  //查看是读事件还是写事件发生
  var mode int32
  if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
   mode += 'r'
  }
  if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
   mode += 'w'
  }

  if mode != 0 {

   pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
   pd.everr = false
   if ev.events == _EPOLLERR {
    pd.everr = true
   }
   netpollready(&toRun, pd, mode)
  }
 }
}

在 epoll 返回的时候,ev.data 中是就绪的网络 socket 的文件描述符。根据网络就绪 fd 拿到 pollDesc。在 netpollready 中,将对应的协程推入可运行队列等待调度执行。

//file:runtime/netpoll.go
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
 var rg, wg *g
 if mode == 'r' || mode == 'r'+'w' {
  rg = netpollunblock(pd, 'r'true)
 }
 if mode == 'w' || mode == 'r'+'w' {
  wg = netpollunblock(pd, 'w'true)
 }
 if rg != nil {
  toRun.push(rg)
 }
 if wg != nil {
  toRun.push(wg)
 }
}

本文总结

同步编码方式的优点是符合人的直线思维。在这种模式下的代码很容易写,写出来也容易理解,但是缺点就是性能奇差。因为会导致频繁的线程上下文切换。

所以现在 epoll 是 Linux 下网络程序工作的最主要的模式。现在各种语言下的流行的网络框架模型都是基于 epoll 来工作的。区别就是各自对 epoll 的使用方式上存在一些差别。主流各种基于 epoll 的异步非阻塞的模型虽然提高了性能,但是基于回调函数的编程方式却非常不符合人的的直线思维模式。开发出来的代码的也不那么容易被人理解。

Golang开辟了一种新的网络编程模型。这种模型在应用层看来仍然是同步的方式。但是在底层确实通过协程和 epoll 的配合避免了线程切换的性能高损耗,因此并不会阻塞用户线程。代替的是切换开销更小的协程。协程的切换开销大约只有线程切换的三十分之一,参见《协程究竟比线程牛在什么地方?》

我个人一直觉得,Golang 封装的网络编程模型非常之精妙,是世界级的代码。它非常值得你好好学习一下。学完了觉得好的话,转发给你的朋友们一起来了解了解吧!

图片

往期相关文章

5500在 golang 中是如何对 epoll 进行封装的?

root

这个人很懒,什么都没留下

文章评论