Go语言实现流量计费

2024-06-10 ⏳4.5分钟(1.8千字)

最近相关部门发出通知📢要关闭中国大陆的 Docker Hub 镜像加速服务。一时间哀嚎遍野。 Docker 支持 Secure HTTP Proxy 协议,而我也一直使用该协议。于是就想着能不能共享自己的代理服务。其中最大的障碍就是流量计费。现在互联网肯定是不能用爱发电,免费就一定会被滥用。所以需要实现一套能用的流量计费功能。在 ChatGTD1 的帮助下,花了半天的时间实现了计费功能,今天分享出来供大家参考。

我的 Docker Hub 加速服务也已上线,欢迎使用🤗 具体可以参考我的这篇文章

在说计费之前,我们需要先了解 HTTP Proxy 的实现原理2。下面是一段实现代码:

func ServeHTTP(w http.ResponseWriter, req *http.Request) {
    // 建立上游 TCP 连接
    target := req.RequestURI
    upConn, err := net.DialTimeout("tcp", target, 5*time.Second)
    defer upConn.Close()

    // 获得下游 TCP 连接
    downConn, _, err = w.(http.Hijacker).Hijack()
    downConn.Write([]byte("HTTP/1.1 200 OK\r\n\r\n"))
    defer downConn.Close()

    // 代理上下游通信流量
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        io.Copy(u, downConn)
    }()
    go func() {
        defer wg.Done()
        io.Copy(d, upConn)
    }()
    wg.Wait()
}

HTTP 的核心是客户端会发起 CONNECT 请求。假如客户端想连接 example.com 主机的 443 端口,它需要向代理服务器发送如下 HTTP 请求:

CONNECT example.com:443 HTTP/1.1
Host: proxy.host

服务端收到请求并完成鉴权后,就根据 CONNECT 指定的地址,也就是 example.com:443 尝试建立 TCP 连接。如果建立成功,服务端需要给客户端返回如下响应信息:

HTTP/1.1 200 OK

接下来双方就使用该 HTTP 请求的 TCP 连接收发数据,服务器为双方做中转。

Go 语言的 HTTP 标准库提供了 Hijack 函数,可以提取当前 HTTP 请求底层的 TCP 连接。一旦调用 Hijack 函数,Go 语言的 HTTP 框架就不再管理该 TCP 连接的状态,需要由调用方自行管理。

这样服务器就拿到了上游和下游两个 TCP 连接。最后服务端启动两个协程,分别调用标准库里的 io.Copy 函数实现数据转发。

以客户端接收数据为例,服务端先从上游的 upConn 对象读取数据,然后再写入到下游的 downConn 对象。

io.Copy(downConn, upConn)

io.Copy 的第一个参数为写入对象,类型为 io.Writer。所有数据都会调用该对象的 Write 函数。显然 downConn 实现了 io.Writer 接口。

要想实现计费,最简的办法是采用修饰器模式。我们设计一个新的 io.Writer 对象,在其 Write 函数里调用原来 downConn 的 Write 函数来发送数据,但同时记录每次送的数据量。

最简单的记数器可以设计成如下样子:

type bytesCounter struct {
    w io.Writer
    c int
}
func (bc *bytesCounter) Write(p []byte) (n int, err error) {
    bc.c += len(p)
    return bc.w.Write(p)
}

我还还需要修改前面调用 io.Copy 的地方:

// io.Copy(downConn, upConn)
dst := &bytesCounter{w: downConn}
io.Copy(dst, upConn)

这样每次给下游发送数据,流量都会记录到 bytesCounter 的 c 字段。

光记录了没有用,还需要将流量数据写到数据库,实现计费功能。显然不可能每次调用 Write 函数都写一次 DB,性能开销太大。我们需要另起一个协程,定时保存。

这样一来,就会有两个协程同时读写 bytesCounter 的 c 字段,需要加锁🔒实现同步。除了加锁,还可以使用原子变量,操作起来更方便。Go语言标准库提供了 sync/atomic 原子操作库,可以直接使用。上例需要做如下改动:

type bytesCounter struct {
    w io.Writer
    c atomic.Int64
}
func (bc *bytesCounter) Write(p []byte) (n int, err error) {
    // bc.c += len(p)
    bc.c.Add(int64(len(p)))
    return bc.w.Write(p)
}

c 的类型从 int 改成了 atomic.Int64。因为是原子变量,自然也就不能用普通的算术操作符来修改内容,需要调用它自己专用的 Add 函数来累加流量计数。

现在我们实现定时保存功能。显然,我们需要一个定时器。定时间隔最好能够用调用方指定。Go语言标准库提供 time.Ticker 定时器对象,我们可以直接使用。

type bytesCounter struct {
    w io.Writer
    d time.Duration
    f func(n int)

    c atomic.Int64
    t *time.Ticker
}

func (bc *bytesCounter) Start() {
    bc.t = time.NewTicker(bc.d)
    for range bc.t.C {
        if n := bc.c.Swap(0); n > 0 {
            bc.f(int(n))
        }
    }
}

我在这里引入两个参数:d 用来指定保存的时间间隔,f 是一个回调函数,具体如何保存由调用方自行处理,这里只发送当前的流量数据。然后就是 Start 函数。

time.Ticker 创建之后对外提供一个 Channel 变量 C。到了规定的时间间隔,time.Ticker 就会向 C 写入当前时间。定时逻辑可以使用 for 循环不断地从 C 中读出内容,这样就能实现定时保存。

因为用了原子变量,我们需要通过 Swap 来读取并重置当前的流量计数器。假设当前计数器值为 1024,那么调用 Swap(0) 之后,计数器的值会被重置为 0 而且函数返回 1024。整个过程是线程安全的,这样我样就可以阶断性地保存当前流量值了。

到这里,我们需要再次修改原来的代理逻辑:

dst := &bytesCounter{
    w: downConn,
    f: func (n int) { /* ... */ },
    d: 1 * time.Duration,
}
go dst.Start()
io.Copy(dst, upConn)

我们在这里新起了一个协程来执行 Start 函数,也就是定时回调 f 指定的保存函数。

最后我们还需要处理连接断开的情形。通信结束后,上下游连接都会断开,这时我们需要停掉所有相关的协程,并把最后的流量信息保存到数据库。为此我们还需要一个结束函数:

func (bc *bytesCounter) Done() {
    bc.t.Stop()
    if n := bc.c.Swap(0); n > 0 {
        bc.f(int(n))
    }
}

我最开始想的就是这么简单。关闭定时器,然后没保存的数据再提交一次。我以为定时器关闭后它对应的 C 也会自动关闭,如果是这样,执行 Start 函数的协程也会自动退出。但实际上定时器不会关闭它自己的 C 通道。我们需要额外给 Start 函数协程发送控制信号。

为此,我们还得额外引入一个 chan 对象:

type bytesCounter struct {
    w io.Writer
    d time.Duration
    f func(n int)

    c atomic.Int64
    t *time.Ticker
    s chan int
}

然后我们需要在 Start 函数里同时检查 s 和 t.C 两个通道的状态。这是 select 最典型的应用场景:

func (bc *bytesCounter) Start() {
    bc.s = make(chan int, 1)
    bc.t = time.NewTicker(bc.d)

    for {
        select {
        case <-bc.t.C:
            // 定时间隔到
            if n := bc.c.Swap(0); n > 0 {
                bc.f(int(n))
            }
        case <-bc.s:
            // 退出
            if n := bc.c.Swap(0); n > 0 {
                bc.f(int(n))
            }
            return
        }
    }
}

func (bc *bytesCounter) Done() {
    bc.t.Stop() // 停止定时器
    close(bc.s) // 通知 Start 协程退出
}

到这里我们就得到了一个完整的流量计数器。相应的代理逻辑需要做如下改动:

var wg sync.WaitGroup
wg.Add(2)

// 初始化计数器
cost := func(n in) { /* ... */ }
u := &bytesCounter{w: upConn, d: 1 * time.Second, f: cost}
d := &bytesCounter{w: downConn, d: 1 * time.Second, f: cost}
// 启动计数协程
go u.Start()
go d.Start()
// 转发上下游流量
go func() {
    defer wg.Done()
    io.Copy(u, downConn)
}()
go func() {
    defer wg.Done()
    io.Copy(d, upConn)
}()
// 待待通信结束
wg.Wait()
// 停止计数协程并提交最后数据
u.Done()
d.Done()

以上就是本次分享的主要内容了。我们通过修饰器模式配合定时器和原子变量实现了套简单的流量计费逻辑。虽然功能简单,却也用到了比较复杂的协程同步技术。从这个意义上也验证我对 Go语言协程的判断,Go协程只是降低了并发编程的门槛,但丝毫没有改变并发编程的难度。大家在使用协程的时候一定要小心谨慎😐


  1. ChatGTD 是我自己开发的 ChatGPT 代理,欢迎使用 https://chatgtd.net↩︎

  2. 如果你不了解协议原理,可以看我的这篇文章 ../https-tunnel.html↩︎