使用Go语言解析二进制数据踩坑总结

2023-01-14 ⏳5.5分钟(2.2千字) g

最近用 Go 语言重写了推荐系统的打分服务。其中的模型数据体积很大,有 5G 字节左右,而且是文本文件。我想着先统一转成二进制格式,加载速度会提高不少,而且解析逻辑也不会太复杂。于是力排众议决定用二进制格式。结果上线后就出现非常诡异的问题。宏观表现就是打分结果偏大。经过一番调试发现,原来是自认为比较简单的解析部分有逻辑问题。今天就把踩坑的经验分享给大家😂

先简单的介绍一下背景。原始模型数据是纯文本模式。每行一条数据,有两列,用制表符分割。第一列类型为uint64,表示特征值对应的哈希值;第二列类型为float64,表示偏好分数。所谓的的打分就是根据输出的数据计算一系列的特征值哈希,然后从模型查找对应的分数,最后再按一定的算法计算一个总得分。

解析文本其实比较方便,核心代码如下:

f, err := os.Open("path")
s := bufio.NewScanner(f)
for s.Scan() {
  l := s.Text() // 每一行的数据
  ps := strings.Split(l, "\t")

  // 解析哈希值
  h, err := strconv.ParseUint(ps[0], 10, 64)
  // 解析偏好得分
  b, err := strconv.ParseUint(ps[1], 10, 64)
  f := math.Float64frombits(b)
}

但是这种处理方式性能比较差。一方面,程序需要按行读取数据。因为每行数据的长度不确定,所以需要先设置一缓冲区,然后尝试读取入内容。如果读取的内容中包含换行符,则返回给业务代码处理。极端情况是缓冲区已经占满,但还是没有读到换行符,此时需要重新分配一个更大的缓冲区,把之前的数据复制过来,然后继续读取,直到有换行为止。另一方面,读到文本内容之后,还要使用ParseUint函数将字符表示的数字转换成对应的整数数据。这一步需要执行多次乘法和加法运算,性能也有损耗。

由以上分析可知,用文本方式处理数据无论是在内存效率还是处理器效率上都有不同程度的损耗。如果换成二进制格式则可以规避掉上述问题。

所以我提议用二进制格式来保存模型数据。格式也很简单,每组数据十六个字节,分成两组。前八个字节表示一个uint64整数,字节顺序为小端,后八个字节表示float64 浮点数。这样的格式不但解析起来比较简单,而且模型数据体积几乎可以减半。

另外,我在开发模型文本转二进制工具的过程中发现,如果只用一个协程同时执行加载和解析逻辑,整个过程耗时会比较长。如果将加载和解析放到不同的协程,效率会提升不少。所以我在设计加载逻辑的时候也自然分成了两个协程,这也为后面的问题埋下了伏笔。

有了以上背景知识,我们就可以研究最初有 BUG 的代码了。

func (lr *LR) Load(r io.Reader, bufSize, dataSize int, version string) error {
  lr.scores = make(map[uint64]float64, dataSize/16)

  aerr := atomic.Value{}
  ch := make(chan []byte, 1024)

  // 读取数据使用独立协程
  go func() {
    defer func() { ch <- nil }()
    for {
      buf := make([]byte, bufSize)
      n, err := r.Read(buf)
      if err == io.EOF {
        return
      } else if err != nil {
        aerr.Store(err)
        return
      }
      ch <- buf[:n]
    }
  }()

  // 解析逻辑
  for buf := range ch {
    if buf == nil {
      if err, ok := aerr.Load().(error); ok {
        return err
      }
      break
    }
    // 每十六个字节一组
    for i := 0; i+16 <= len(buf); i += 16 {
      f := binary.LittleEndian.Uint64(buf[i : i+8])

      bits := binary.LittleEndian.Uint64(buf[i+8 : i+16])
      s := math.Float64frombits(bits)

      lr.scores[f] = s
    }
  }
  // ...
}

以上代码初看有点不容易理解。我先启动一独立协程不断从io.Reader中读取数据。每读一批就发送到ch这个通道中。当前协程不断从ch读取来自加载协程的数据。每读到一批就解析一批。为了使用两个协程正确同步,我假设加载协程结束后一定会向ch发送nil 值表示结束。解析协程收到nil后需要检测aerr这个原子变量,看有没有报错。这里 ch的类型为chan []byte,为的是每次通信可以发一批数据,避免协程间反复切换。

剩下的工作就是分析加载逻辑和解析逻辑了。但在分析之前,我先描述一下问题的症状。系统上线后发现打分不及预期,于是我输出了部分调试信息,包括:读取协程读取的数据总长度和解析协程解析结果的总数。

结果立马发现第一个问题:加载协程读到的数据总长度比模型文件的长度要小!

这是为什么呢?我们再看一遍读取过程:

for {
  buf := make([]byte, bufSize)
  n, err := r.Read(buf)
  if err == io.EOF {
    return
  } else if err != nil {
    aerr.Store(err)
    return
  }
  ch <- buf[:n]
}

当err的值为io.EOF时直接返回了。这是最可能出问题的地方。仔细阅读函数文档发现:

Callers should always process the n > 0 bytes returned before considering the error err. Doing so correctly handles I/O errors that happen after reading some bytes and also both of the allowed EOF behaviors.

所以程序必须先处理n > 0的情况,然后再处理err != nil的情况!这跟我之前的印象不相符。因为一般来说,当err非空时,前面的返回数据一般都是空值。但io.Reader 并不是这样。

那为什么io.Reader如此与众不同呢?考虑一种极端情况,缓冲区的长度大于文件的长度。比如缓冲区长度为1023,但文件的长度为128。那几乎可以肯定,一次读取操作就可以将全部文件内容加载到内存。此时n肯定大于零,而且err肯定是err.EOF。

所以说上述代码需要改为:

for {
  buf := make([]byte, bufSize)
  n, err := r.Read(buf)
  if n > 0 {
    ch <- buf[:n]
  }
  if err == io.EOF {
    return
  } else if err != nil {
    aerr.Store(err)
    return
  }
}

到此,文件读取内容长度不匹配的问题就解决了。但是,线上问题并没有就此消失。而且更加诡异了。因为,虽然我们加载了完整的模型数据,但是解析出的模型数据条数比实际的数据要少,而且这个数量有点随机变化的意思😂这又是什么情况呢?

我们继续研究解析代码:

for buf := range ch {
  // ...
  for i := 0; i+16 <= len(buf); i += 16 {
    f := binary.LittleEndian.Uint64(buf[i : i+8])

    bits := binary.LittleEndian.Uint64(buf[i+8 : i+16])
    s := math.Float64frombits(bits)

    lr.scores[f] = s
  }
  // ...
}

这里的循环是每次处理十六个字节。如果数据长度不能被十六整除的话,最后的尾巴就会被丢弃。我最早考虑模型数据如果没问题,那么长度一定是十六的倍数。如果不能整除,丢弃掉也问题不大。

结果是从加载协程里传来的数据是通过Read函数返回的。该函数返回的内容长度受各方面的影响,跟我们指定的缓冲区长度没有任何关系。也就是说,解析协程收到的内容长度可能是各式各样的,有的能整除,有的不能整除。我的程序不但把最后的尾巴丢弃了,因此还导致数据顺序错位,一但丢弃,后续所有的数据解析都会出错。这就是线上报错的根本原因。

为此,我们声明了一个特殊的变量把尾巴保存下来,待下次循环合并处理。

var tail []byte
for buf := range ch {
  if buf == nil { /* ... */ }

  if len(tail) > 0 {
    buf = append(tail, buf...)
  }

  var i int
  for i = 0; i+16 <= len(buf); i += 16 {
    f := binary.LittleEndian.Uint64(buf[i : i+8])

    bits := binary.LittleEndian.Uint64(buf[i+8 : i+16])
    s := math.Float64frombits(bits)

    lr.scores[f] = s
  }

  if i+16 > len(buf) {
    tail = buf[i:]
  } else {
    tail = nil
  }
}

我们在循环中是十六个字节一跳。如果数据长度能被十六整除,则最后i的值一定为len(buf)。否则,i一定满足i < len(buf) < i+16。基于此,我们用tail变量保存尾巴数据。在下次循环中将两者合并后再处理。这样就得到了正确的结果🎉

回顾整个过程,所以出现问题,完全是因为我对io.Reader的行为不熟悉导致的。这里面有太多的想当然。但另外一个问题是,我的单元测试也没能发现这个问题。为什么呢?因为没有构建对应的测试用例!

我在测试用例中是这样准备数据的:

var buf []byte
var i uint64
for i = 1; i <= 64; i++ {
  s := math.Float64bits(float64(i))
  buf = binary.LittleEndian.AppendUint64(buf, i)
  buf = binary.LittleEndian.AppendUint64(buf, s)
}

buf = binary.LittleEndian.AppendUint64(buf, i)
s := math.Float32bits(float32(1.0))
buf = binary.LittleEndian.AppendUint32(buf, s)

r := bytes.NewReader(buf)

这完全不可能模拟Read函数返回数量不能被十六整除的情况。为此,我修改成如下方式:

var buf []byte
var i uint64
// 一共 64 组数据,1024 字节
for i = 1; i <= 64; i++ {
  s := math.Float64bits(float64(i))
  buf = binary.LittleEndian.AppendUint64(buf, i)
  buf = binary.LittleEndian.AppendUint64(buf, s)
}

rs := []io.Reader{}
// 模拟每次读取的内容长度不为 16 的倍数的情况。
// 1024 字节分解成 35 批,前面的每次 30 字节, 最后一次 4 字节。
// 读取缓冲区设为 32 字节,这样每次只能读到 30 字节,只包含
// 一组数据。剩余数据需要暂存起来,跟下一批数据合并后再处理。
for i = 0; i < 34; i++ {
  rs = append(rs, bytes.NewReader(buf[i*30:i*30+30]))
}
rs = append(rs, bytes.NewReader(buf[1020:]))

在测试的时候,我把读取缓冲区设为32字节,但每次最多读到30字节。所以程序必须处理有尾巴的情况。改完后测试用例一次性通过,我知道线上问题解决了。发版上线,果然!