| 注册
请输入搜索内容

热门搜索

Java Linux MySQL PHP JavaScript Hibernate jQuery Nginx
MarylynYvb
7年前发布

性能优化实战:百万级WebSockets和Go语言

   <p>大家好!我的名字叫Sergey Kamardin。我是来自 <strong>Mail.Ru</strong> 的一名工程师。这篇文章将讲述我们是如何用Go语言开发一个高负荷的WebSocket服务。即使你对WebSockets熟悉但对Go语言知之甚少,我还是希望这篇文章里讲到的性能优化的思路和技术对你有所启发。</p>    <h2>1. 介绍</h2>    <p>作为全文的铺垫,我想先讲一下我们为什么要开发这个服务。</p>    <p>Mail.Ru有许多包含状态的系统。用户的电子邮件存储是其中之一。有很多办法来跟踪这些状态的改变。不外乎通过定期的轮询或者系统通知来得到状态的变化。这两种方法都有它们的优缺点。对邮件这个产品来说,让用户尽快收到新的邮件是一个考量指标。邮件的轮询会产生大概每秒5万个HTTP请求,其中60%的请求会返回304状态(表示邮箱没有变化)。因此,为了减少服务器的负荷并加速邮件的接收,我们决定重写一个publisher-subscriber服务(这个服务通常也会称作bus,message broker或者event-channel)。这个服务负责接收状态更新的通知,然后还处理对这些更新的订阅。</p>    <p>重写publisher-subscriber服务之前:</p>    <p><img src="https://simg.open-open.com/show/e97ac54f54c2e90cd425401a17233ca4.png"></p>    <p>现在:</p>    <p><img src="https://simg.open-open.com/show/f4fda6b0d815d995ace439462015622e.png"></p>    <p>上面第一个图为旧的架构。浏览器(Browser)会定期轮询API服务来获得邮件存储服务(Storage)的更新。</p>    <p>第二张图展示的是新的架构。浏览器(Browser)和通知API服务(notificcation API)建立一个WebSocket连接。通知API服务会发送相关的订阅到Bus服务上。当收到新的电子邮件时,存储服务(Storage)向Bus(1)发送一个通知,Bus又将通知发送给相应的订阅者(2)。API服务为收到的通知找到相应的连接,然后把通知推送到用户的浏览器(3)。</p>    <p>我们今天就来讨论一下这个API服务(也可以叫做WebSocket服务)。在开始之前,我想提一下这个在线服务处理将近3百万个连接。</p>    <h2>2. 惯用的做法(The idiomatic way)</h2>    <p>首先,我们看一下不做任何优化会如何用Go来实现这个服务的部分功能。在使用 net/http 实现具体功能前,让我们先讨论下我们将如何发送和接收数据。这些数据是定义在WebSocket协议之上的(例如JSON对象)。我们在下文中会成他们为packet。</p>    <p>我们先来实现 Channel 结构。它包含相应的逻辑来通过WebScoket连接发送和接收packet。</p>    <h3>2.1. Channel结构</h3>    <pre>  <code class="language-go">// Packet represents application level data.  type Packet struct {      ...  }    // Channel wraps user connection.  type Channel struct {      conn net.Conn    // WebSocket connection.      send chan Packet // Outgoing packets queue.  }    func NewChannel(conn net.Conn) *Channel {      c := &Channel{          conn: conn,          send: make(chan Packet, N),      }        go c.reader()      go c.writer()        return c  }</code></pre>    <p>这里我要强调的是读和写这两个goroutines。每个goroutine都需要各自的内存栈。栈的初始大小由操作系统和Go的版本决定,通常在2KB到8KB之间。我们之前提到有3百万个在线连接,如果每个goroutine栈需要4KB的话,所有连接就需要24GB的内存。这还没算上给 Channel 结构,发送packet用的 ch.send 和其它一些内部字段分配的内存空间。</p>    <h3>2.2. I/O goroutines</h3>    <p>接下来看一下“reader”的实现:</p>    <pre>  <code class="language-go">func (c *Channel) reader() {      // We make a buffered read to reduce read syscalls.      buf := bufio.NewReader(c.conn)        for {          pkt, _ := readPacket(buf)          c.handle(pkt)      }  }</code></pre>    <p>这里我们使用了 bufio.Reader 。每次都会在 buf 大小允许的范围内尽量读取多的字节,从而减少 read() 系统调用的次数。在无限循环中,我们期望会接收到新的数据。请记住之前这句话:期望接收到新的数据。我们之后会讨论到这一点。</p>    <p>我们把packet的解析和处理逻辑都忽略掉了,因为它们和我们要讨论的优化不相关。不过 buf 值得我们的关注:它的缺省大小是4KB。这意味着所有连接将消耗掉额外的 <strong>12 GB</strong> 内存。“writer”也是类似的情况:</p>    <pre>  <code class="language-go">func (c *Channel) writer() {      // We make buffered write to reduce write syscalls.      buf := bufio.NewWriter(c.conn)        for pkt := range c.send {          _ := writePacket(buf, pkt)          buf.Flush()      }  }</code></pre>    <p>我们在待发送packet的 c.send channel上循环将packet写到缓存(buffer)里。细心的读者肯定已经发现,这又是额外的4KB内存。3百万个连接会占用 <strong>12GB</strong> 的内存。</p>    <h3>2.3. HTTP</h3>    <p>我们已经有了一个简单的 Channel 实现。现在我们需要一个WebSocket连接。因为还在 <strong>通常做法(Idiomatic Way)</strong> 的标题下,那么就先来看看通常是如何实现的。</p>    <p>注:如果你不知道WebSocket是怎么工作的,那么这里值得一提的是客户端是通过一个叫升级(Upgrade)请求的特殊HTTP机制来建立WebSocket的。在成功处理升级请求以后,服务端和客户端使用TCP连接来交换二进制的WebSocket帧(frames)。 <a href="/misc/goto?guid=4959754525344830257" rel="nofollow,noindex">这里</a> 有关于帧结构的描述。</p>    <pre>  <code class="language-go">import (      "net/http"      "some/websocket"  )    http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {      conn, _ := websocket.Upgrade(r, w)      ch := NewChannel(conn)      //...  })</code></pre>    <p>请注意这里的 http.ResponseWriter 结构包含 bufio.Reader 和 bufio.Writer (各自分别包含4KB的缓存)。它们用于 \*http.Request 初始化和返回结果。</p>    <p>不管是哪个WebSocket,在成功回应一个升级请求之后,服务端在调用 responseWriter.Hijack() 之后会接收到一个I/O缓存和对应的TCP连接。</p>    <p>注:有时候我们可以通过 net/http.putBufio{Reader,Writer} 调用把缓存释放回 net/http 里的 sync.Pool 。</p>    <p>这样,这3百万个连接又需要额外的 <strong>24 GB</strong> 内存。</p>    <p>所以,为了这个什么都不干的程序,我们已经占用了 <strong>72 GB</strong> 的内存!</p>    <h2>3. 优化</h2>    <p>我们来回顾一下前面介绍的用户连接的工作流程。在建立WebSocket之后,客户端会发送请求订阅相关事件(我们这里忽略类似 ping/pong 的请求)。接下来,在整个连接的生命周期里,客户端可能就不会发送任何其它数据了。</p>    <p>连接的生命周期可能会持续几秒钟到几天。</p>    <p>所以在大部分时间里, Channel.reader() 和 Channel.writer() 都在等待接收和发送数据。与它们一起等待的是各自分配的4 KB的I/O缓存。</p>    <p>现在,我们发现有些地方是可以做进一步优化的,对吧?</p>    <h3>3.1. Netpoll</h3>    <p>你还记得 Channel.reader() 的实现使用了 bufio.Reader.Read() 吗? bufio.Reader.Read() 又会调用 conn.Read() 。这个调用会被阻塞以等待接收连接上的新数据。如果连接上有新的数据,Go的运行环境(runtime)就会唤醒相应的goroutine让它去读取下一个packet。之后,goroutine会被再次阻塞来等待新的数据。我们来研究下Go的运行环境是怎么知道goroutine需要被唤醒的。</p>    <p>如果我们看一下 conn.Read() 的实现,就会看到它调用了 net.netFD.Read() :</p>    <pre>  <code class="language-go">// net/fd_unix.go    func (fd *netFD) Read(p []byte) (n int, err error) {      //...      for {          n, err = syscall.Read(fd.sysfd, p)          if err != nil {              n = 0              if err == syscall.EAGAIN {                  if err = fd.pd.waitRead(); err == nil {                      continue                  }              }          }          //...          break      }      //...  }</code></pre>    <p>Go使用了sockets的非阻塞模式。EAGAIN表示socket里没有数据了但不会阻塞在空的socket上,OS会把控制权返回给用户进程。</p>    <p>这里它首先对连接文件描述符进行 read() 系统调用。如果 read() 返回的是 EAGAIN 错误,运行环境就是调用 pollDesc.waitRead() :</p>    <pre>  <code class="language-go">// net/fd_poll_runtime.go    func (pd *pollDesc) waitRead() error {     return pd.wait('r')  }    func (pd *pollDesc) wait(mode int) error {     res := runtime_pollWait(pd.runtimeCtx, mode)     //...  }</code></pre>    <p>如果继续深挖,我们可以看到netpoll的实现在Linux里用的是epoll而在BSD里用的是kqueue。我们的这些连接为什么不采用类似的方式呢?只有在socket上有可读数据时,才分配缓存空间并启用读数据的goroutine。</p>    <p>在github.com/golang/go上,有一个关于开放(exporting)netpoll函数的 <a href="/misc/goto?guid=4959754525431577991" rel="nofollow,noindex">问题</a> 。</p>    <h3>3.2. 干掉goroutines</h3>    <p>假设我们用Go语言实现了netpoll。我们现在可以避免创建 Channel.reader() 的goroutine,取而代之的是从订阅连接里收到新数据的事件。</p>    <pre>  <code class="language-go">ch := NewChannel(conn)    // Make conn to be observed by netpoll instance.  poller.Start(conn, netpoll.EventRead, func() {      // We spawn goroutine here to prevent poller wait loop      // to become locked during receiving packet from ch.      go ch.Receive()  })    // Receive reads a packet from conn and handles it somehow.  func (ch *Channel) Receive() {      buf := bufio.NewReader(ch.conn)      pkt := readPacket(buf)      c.handle(pkt)  }</code></pre>    <p>Channel.writer() 相对容易一点,因为我们只需在发送packet的时候创建goroutine并分配缓存。</p>    <pre>  <code class="language-go">func (ch *Channel) Send(p Packet) {      if c.noWriterYet() {          go ch.writer()      }      ch.send <- p  }</code></pre>    <p>注意,这里我们没有处理 write() 系统调用时返回的 EAGAIN 。我们依赖Go运行环境去处理它。这种情况很少发生。如果需要的话我们还是可以像之前那样来处理。</p>    <p>从 ch.send 读取待发送的packets之后, ch.writer() 会完成它的操作,最后释放goroutine的栈和用于发送的缓存。</p>    <p>很不错!通过避免这两个连续运行的goroutine所占用的I/O缓存和栈内存,我们已经节省了 <strong>48 GB</strong> 。</p>    <h3>3.3. 控制资源</h3>    <p>大量的连接不仅仅会造成大量的内存消耗。在开发服务端的时候,我们还不停地遇到竞争条件(race conditions)和死锁(deadlocks)。随之而来的是所谓的自我分布式阻断攻击(self-DDOS)。在这种情况下,客户端会悍然地尝试重新连接服务端而把情况搞得更加糟糕。</p>    <p>举个例子,如果因为某种原因我们突然无法处理 ping/pong 消息,这些空闲连接就会不断地被关闭(它们会以为这些连接已经无效因此不会收到数据)。然后客户端每N秒就会以为失去了连接并尝试重新建立连接,而不是继续等待服务端发来的消息。</p>    <p>在这种情况下,比较好的办法是让负载过重的服务端停止接受新的连接,这样负载均衡器(例如nginx)就可以把请求转到其它的服务端上去。</p>    <p>撇开服务端的负载不说,如果所有的客户端突然(很可能是因为某个bug)向服务端发送一个packet,我们之前节省的 <strong>48 GB</strong> 内存又将会被消耗掉。因为这时我们又会和开始一样给每个连接创建goroutine并分配缓存。</p>    <p>Goroutine池</p>    <p>可以用一个goroutine池来限制同时处理packets的数目。下面的代码是一个简单的实现:</p>    <pre>  <code class="language-go">package gopool    func New(size int) *Pool {      return &Pool{          work: make(chan func()),          sem:  make(chan struct{}, size),      }  }    func (p *Pool) Schedule(task func()) error {      select {      case p.work <- task:      case p.sem <- struct{}{}:          go p.worker(task)      }  }    func (p *Pool) worker(task func()) {      defer func() { <-p.sem }      for {          task()          task = <-p.work      }  }</code></pre>    <p>我们使用netpoll的代码就变成下面这样:</p>    <pre>  <code class="language-go">pool := gopool.New(128)    poller.Start(conn, netpoll.EventRead, func() {      // We will block poller wait loop when      // all pool workers are busy.      pool.Schedule(func() {          ch.Receive()      })  })</code></pre>    <p>现在我们不仅要等可读的数据出现在socket上才能读packet,还必须等到从池里获取到空闲的goroutine。</p>    <p>同样的,我们修改下 Send() 的代码:</p>    <pre>  <code class="language-go">pool := gopool.New(128)    func (ch *Channel) Send(p Packet) {      if c.noWriterYet() {          pool.Schedule(ch.writer)      }      ch.send <- p  }</code></pre>    <p>这里我们没有调用 go ch.writer() ,而是想重复利用池里goroutine来发送数据。 所以,如果一个池有 N 个goroutines的话,我们可以保证有 N 个请求被同时处理。而 N + 1 个请求不会分配 N + 1 个缓存。goroutine池允许我们限制对新连接的 Accept() 和 Upgrade() ,这样就避免了大部分DDoS的情况。</p>    <h3>3.4. 零拷贝升级(Zero-copy upgrade)</h3>    <p>之前已经提到,客户端通过HTTP升级(Upgrade)请求切换到WebSocket协议。下面显示的是一个升级请求:</p>    <pre>  <code class="language-go">GET /ws HTTP/1.1  Host: mail.ru  Connection: Upgrade  Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==  Sec-Websocket-Version: 13  Upgrade: websocket    HTTP/1.1 101 Switching Protocols  Connection: Upgrade  Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=  Upgrade: websocket</code></pre>    <p>我们接收HTTP请求和它的头部只是为了切换到WebSocket协议,而 http.Request 里保存了所有头部的数据。从这里可以得到启发,如果是为了优化,我们可以放弃使用标准的 net/http 服务并在处理HTTP请求的时候避免无用的内存分配和拷贝。</p>    <p>举个例子, http.Request 包含了一个叫做Header的字段。标准 net/http 服务会将请求里的所有头部数据全部无条件地拷贝到Header字段里。你可以想象这个字段会保存许多冗余的数据,例如一个包含很长cookie的头部。</p>    <p>我们如何来优化呢?</p>    <p>WebSocket实现</p>    <p>不幸的是,在我们优化服务端的时候所有能找到的库只支持对标准 net/http 服务做升级。而且没有一个库允许我们实现上面提到的读和写的优化。为了使这些优化成为可能,我们必须有一套底层的API来操作WebSocket。为了重用缓存,我们需要类似下面这样的协议函数:</p>    <pre>  <code class="language-go">func ReadFrame(io.Reader) (Frame, error)  func WriteFrame(io.Writer, Frame) error</code></pre>    <p>如果我们有一个包含这样API的库,我们就按照下面的方式从连接上读取packets:</p>    <pre>  <code class="language-go">// getReadBuf, putReadBuf are intended to  // reuse *bufio.Reader (with sync.Pool for example).  func getReadBuf(io.Reader) *bufio.Reader  func putReadBuf(*bufio.Reader)    // readPacket must be called when data could be read from conn.  func readPacket(conn io.Reader) error {      buf := getReadBuf()      defer putReadBuf(buf)        buf.Reset(conn)      frame, _ := ReadFrame(buf)      parsePacket(frame.Payload)      //...  }</code></pre>    <p>简而言之,我们需要自己写一个库。</p>    <p>github.com/gobwas/ws</p>    <p>ws 库的主要设计思想是不将协议的操作逻辑暴露给用户。所有读写函数都接受通用的 io.Reader 和 io.Writer 接口。因此它可以随意搭配是否使用缓存以及其它I/O的库。</p>    <p>除了标准库 net/http 里的升级请求, ws 还支持零拷贝升级。它能够处理升级请求并切换到WebSocket模式而不产生任何内存分配或者拷贝。 ws.Upgrade() 接受 io.ReadWriter ( net.Conn 实现了这个接口)。换句话说,我们可以使用标准的 net.Listen() 函数然后把从 ln.Accept() 收到的连接马上交给 ws.Upgrade() 去处理。库也允许拷贝任何请求数据来满足将来应用的需求(举个例子,拷贝 Cookie 来验证一个session)。</p>    <p>下面是处理升级请求的性能测试:标准 net/http 库的实现和使用零拷贝升级的 net.Listen() :</p>    <pre>  <code class="language-go">BenchmarkUpgradeHTTP    5156 ns/op    8576 B/op    9 allocs/op  BenchmarkUpgradeTCP     973 ns/op     0 B/op       0 allocs/op</code></pre>    <p>使用 ws 以及零拷贝升级为我们节省了24 GB的空间。这些空间原本被用做 net/http 里处理请求的I/O缓存。</p>    <h3>3.5. 回顾</h3>    <p>让我们来回顾一下之前提到过的优化:</p>    <ul>     <li>一个包含缓存的读goroutine会占用很多内存。 <strong>方案:</strong> netpoll(epoll, kqueue);重用缓存。</li>     <li>一个包含缓存的写goroutine会占用很多内存。 <strong>方案:</strong> 在需要的时候创建goroutine;重用缓存。</li>     <li>存在大量连接请求的时候,netpoll不能很好的限制连接数。 <strong>方案:</strong> 重用goroutines并且限制它们的数目。</li>     <li>net/http 对升级到WebSocket请求的处理不是最高效的。 <strong>方案:</strong> 在TCP连接上实现零拷贝升级。</li>    </ul>    <p>下面是服务端的大致实现代码:</p>    <pre>  <code class="language-go">import (      "net"      "github.com/gobwas/ws"  )    ln, _ := net.Listen("tcp", ":8080")    for {      // Try to accept incoming connection inside free pool worker.      // If there no free workers for 1ms, do not accept anything and try later.      // This will help us to prevent many self-ddos or out of resource limit cases.      err := pool.ScheduleTimeout(time.Millisecond, func() {          conn := ln.Accept()          _ = ws.Upgrade(conn)            // Wrap WebSocket connection with our Channel struct.          // This will help us to handle/send our app's packets.          ch := NewChannel(conn)            // Wait for incoming bytes from connection.          poller.Start(conn, netpoll.EventRead, func() {              // Do not cross the resource limits.              pool.Schedule(func() {                  // Read and handle incoming packet(s).                  ch.Recevie()              })          })      })      if err != nil {          time.Sleep(time.Millisecond)      }  }</code></pre>    <h2>4. 结论</h2>    <p>在程序设计时,过早优化是万恶之源。Donald Knuth</p>    <p>上面的优化是有意义的,但不是所有情况都适用。举个例子,如果空闲资源(内存,CPU)与在线连接数之间的比例很高的话,优化就没有太多意义。当然,知道什么地方可以优化以及如何优化总是有帮助的。</p>    <p>谢谢你的关注!</p>    <h2>5. 引用</h2>    <ul>     <li><a href="/misc/goto?guid=4959754525521593073">https://github.com/mailru/easygo</a></li>     <li><a href="/misc/goto?guid=4959754525599538212">https://github.com/gobwas/ws</a></li>     <li><a href="/misc/goto?guid=4959754525681113558">https://github.com/gobwas/ws-...</a></li>     <li><a href="/misc/goto?guid=4959754525751651777">https://github.com/gobwas/htt...</a></li>     <li> </li>    </ul>    <p>来自:https://segmentfault.com/a/1190000011162605</p>    <p> </p>    
 本文由用户 MarylynYvb 自行上传分享,仅供网友学习交流。所有权归原作者,若您的权利被侵害,请联系管理员。
 转载本站原创文章,请注明出处,并保留原始链接、图片水印。
 本站是一个以用户分享为主的开源技术平台,欢迎各类分享!
 本文地址:https://www.open-open.com/lib/view/open1505370872096.html
WebSocket Goroutine 性能优化 Google Go/Golang开发