Go 로 만든 두 프로그램이 통신을 할 일이 생겼다. 네트워크로 통신을 해야만 했기 때문에 Web에 익숙한 나에게 처음 드는 생각은 당연히 HTTP 였지만, 속도가 더욱 중요한 이 프로그램에서 HTTP 는 오버헤드가 크다고 생각해서 TCP 를 통신 프로토콜로 하면 좋을 것 같다고 생각하고 개발을 진행하였다.

당연하게도 처음엔 너무 너무 쉬웠다! 대략적인 코드는 아래와 같았다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// TCP Server
package main

import (
    "io"
    "log"
    "net"
)

func main() {
    l, err := net.Listen("tcp", ":5032")
    if nil != err {
        log.Fatalf("fail to bind address to 5032; err: %v", err)
    }
    defer l.Close()

    for {
        conn, err := l.Accept()
        if nil != err {
            log.Printf("fail to accept; err: %v", err)
            continue
        }
        go ConnHandler(conn)
    }
}

func ConnHandler(conn net.Conn) {
    recvBuf := make([]byte, 4096) // receive buffer: 4kB
    for {
        n, err := conn.Read(recvBuf)
        if nil != err {
            if io.EOF == err {
                log.Printf("connection is closed from client; %v", conn.RemoteAddr().String())
                return
            }
            log.Printf("fail to receive data; err: %v", err)
            return
        }
        if 0 < n {
            data := recvBuf[:n]
            log.Println(string(data))
        }
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// TCP Client
package main

import (
    "log"
    "net"
    "time"
)

func main() {
    conn, err := net.Dial("tcp", "server0:5032")
    if nil != err {
        log.Fatalf("failed to connect to server")
    }

    // some event happens

    for {
        // heartbeat
        conn.Write([]byte("ping"))
        time.Sleep(time.Duration(3) * time.Second)
    }
}

군더더기 없이, 깔끔하다.

여기에 어차피 Go 들간의 통신이니 encoding/gob 패키지를 이용해서 바이트 스트림을 인코딩 하면 좋겠다고 생각했다. 그래서 클라이언트와 서버 간의 통신 메시지를 대략적으로 다음과 같이 정의하였다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
type MyMsg struct {
    Header MyMsgHeader
    Body   interface{}
}
type MyMsgHeader struct {
    MsgType string
    Date    string
}
type MyMsgBodyPing struct {
    Content string
}

그래서 업데이트 된 코드는 다음과 같다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// TCP Server
package main

import (
    "bytes"
    "encoding/gob"
    "io"
    "log"
    "net"
)

func init() {
    gob.Register(MyMsgBodyPing{})
}

func main() {
    l, err := net.Listen("tcp", ":5032")
    if nil != err {
        log.Fatalf("fail to bind address to 5032; err: %v", err)
    }
    defer l.Close()

    for {
        conn, err := l.Accept()
        if nil != err {
            log.Printf("fail to accept; err: %v", err)
            continue
        }
        go ConnHandler(conn)
    }
}

type MyMsg struct {
    Header MyMsgHeader
    Body   interface{}
}
type MyMsgHeader struct {
    MsgType string
    Date    string
}
type MyMsgBodyPing struct {
    Content string
}

func ConnHandler(conn net.Conn) {
    var (
        codecBuffer bytes.Buffer
        dec         *gob.Decoder = gob.NewDecoder(&codecBuffer)
    )

    recvBuf := make([]byte, 4096) // receive buffer: 4kB
    for {
        n, err := conn.Read(recvBuf)
        if nil != err {
            if io.EOF == err {
                log.Printf("connection is closed from client; %v", conn.RemoteAddr().String())
                return
            }
            log.Printf("fail to receive data; err: %v", err)
            return
        }
        if 0 < n {
            data := recvBuf[:n]
            codecBuffer.Write(data)

            msg := MyMsg{}
            if err = dec.Decode(&msg); nil != err {
                log.Printf("failed to decode message; err: %v", err)
                continue
            }

            log.Println("msg: ", msg)
        }
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// TCP Client
package main

import (
    "bytes"
    "encoding/gob"
    "log"
    "net"
    "time"
)

func init() {
    gob.Register(MyMsgBodyPing{})
}

type MyMsg struct {
    Header MyMsgHeader
    Body   interface{}
}
type MyMsgHeader struct {
    MsgType string
    Date    string
}
type MyMsgBodyPing struct {
    Content string
}

func main() {
    conn, err := net.Dial("tcp", "server0:5032")
    if nil != err {
        log.Fatalf("failed to connect to server")
    }

    var (
        codecBuffer bytes.Buffer
        enc         *gob.Encoder = gob.NewEncoder(&codecBuffer)
    )

    for {
        enc.Encode(MyMsg{
            Header: MyMsgHeader{
                MsgType: "ping",
                Date:    time.Now().UTC().Format(time.RFC3339),
            },
            Body: MyMsgBodyPing{
                Content: "Hello! I'm alive!",
            },
        })

        conn.Write(codecBuffer.Bytes())
        codecBuffer.Reset()
        time.Sleep(time.Duration(3) * time.Second)
    }
}

Body Type만 메시지 프로토콜 종류에 따라 늘려주면, 이 구조는 잘 동작하는 것처럼 보인다.

사실 거의 문제가 없는 것처럼 너무 잘 동작해서… 하마터면 알아치리지 못할 뻔 했다.

위의 코드는 무엇이 문제일까?


네트워크 프로그래밍을 많이 해보신 분이라면 아시겠지만, 네트워크 프로그래밍에 다양한 경험이 없던 나로써는 문제점을 당연히 모르고 있었다.

문제점은 바로, recvBuf 의 크기를 넘는 데이터가 수신되는 경우 gob 메시지 디코딩에 실패하게 된다. 현재 설정한 크기는 4096 바이트이니, 4kB 를 초과하는 데이터를 수신할 경우 conn.Read 에서 딱 4 kB까지만 읽는다.

예를 들어서 6000 바이트의 데이터를 전송받은 경우, OS 에서는 TCP 버퍼에 전송받은 6000 바이트의 데이터를 쌓아놓는다. go 프로그램에서 conn.Read 가 호출되면 시스템콜을 통해 OS의 TCP 버퍼로부터 4096 바이트를 읽어오고, 그 다음의 conn.Read 호출에서 남은 1094 바이트를 읽어오는 것이다.

다행히도 OS 에서 TCP 세그먼트 처리를 잘해주기 때문에 세그먼트의 순서는 보장되며(즉 conn.Write 에서 쓰여지는 데이터의 순서가 conn.Read 에서 읽는 데이터의 순서와 일치하다는 것이 보장된다!), 어플리케이션에서 한번 읽을때 다 읽지 않아도 TCP 버퍼를 통해 여러 번 나누어서 읽을 수 있다는 아주 감사한 특징이 있긴 하지만 OS가 해주는 부분은 거기까지이며, 어플리케이션에서는 어디까지의 바이트를 읽어야 하는가?, 즉 이 바이트 스트림 중에 메시지의 끝이 어디인가? 에 대한 부분을 처리해야 한다.


맨날 Web 의 세계에만 있던 나에게는 어려운 문제였다. 한번 요청하면 응답이 오는게 당연한 거라고 생각했는데… 심지어 WebSocket 을 이용해도 통신의 기본 단위가 ‘바이트 스트림’이 아니라 ‘메시지’ 이기 때문에 이러한 문제는 직면하지 못했었다. 이전에 TCP 를 잠깐 맛보았을 때에는 데이터의 크기가 작아서 알아채지 못했다.

일단 문제는 알았는데 어떻게 해결할 지 몰라서, 일단 곰곰히 생각했다. 제일 처음 드는 생각은 전송하는 MyMsg 자체를 버퍼의 크기 (4096) 만큼 나눠서, 해당 데이터에서 상태를 체크하는 것이었다. 가령 다음과 같이 말이다.

1
2
3
4
5
6
// 이 코드를 따라하지 마시오!!
// 잘못된 예시임
type TCPMsg struct {
    Status uint8 // 0: 한번에 전송 성공, 1: 송신 데이터 남아있음, 2: 송신 완료
    Data   MyMsg // 전송하려는 데이터
}

즉 전송하고자 하는 데이터인 MyMsg 를 TCPMsg 로 감싸고, 수신측에서 해당 TCPMsg 를 파싱해서 체크하는 것이었다.

말이 안되는 것은 아니지만, 다음과 같은 문제점이 있었다.

  1. TCPMsg 가 버퍼의 크기만큼임을(4096 바이트) 송신 측에서 확실하게 해야 함. 그에 따른 처리가 필요
  2. MyMsg -> gob 인코딩 데이터 -> TCPMsg -> 추가로 인코딩… 두 번의 인코딩…?!
  3. 심지어 TCPMsg 에 들어가는 Data 는 gob 으로 인코딩 된 데이터인데.. 크기를 알기가… 어렵다…

아무리 생각해도 비효율적이라 생각해서 실제로 구현하진 않았다.

두번째로 들었던 생각은 그냥 recvBuf 의 크기를 늘리는 것이었다. 임시 방편이지만, 확실하다고 생각했다… 하지만 여기서도 간과한 것이 있었으니, 전송하는 데이터의 크기가 OS의 TCP 버퍼보다 커지면 어플리케이션의 버퍼는 의미가 없었다. OS의 TCP 버퍼가 가득차서 더이상 메시지를 받지 못하는 상태가 되어버리면, 어플리케이션에서 그 이상 읽지 못하기 때문이다. 대개 32 비트 운영체제에서의 TCP 버퍼는 32 kB, 64 비트 운영체제에서의 TCP 버퍼는 64 kB 로 설정되어 있기 때문에… 그보다 큰 recvBuf 는 의미가 없었다.

뭔가 방법이 없을까, 다음과 같은 오아시스 글을 찾았다.

https://stackoverflow.com/a/2390150

결론적으로 다음 두 가지 방법이 있었다.

  • 고정된 크기의 바이트를 먼저 보낸다. 해당 바이트에는 앞으로 보낼 메시지의 크기를 보낸다. 그 이후에 실제 메시지를 전송한다.
  • 메시지가 끝나는 부분을 데이터에 실어 보낸다. 예를 들어 HTML의 마크업은 시작과 끝이 명확하다 (<a>메시지이이이</a>). 라인 캐리지(\n) 같은 바이트를 메시지의 끝에 전송한다.

첫 번째 방법을 사용하게 되면 일일히 conn.Write 부분에 메시지의 크기를 먼저 보내고 이후에 메시지를 보내야 하니, 별도의 SendMessage 함수를 만들어서 사용하면 될 것이다. 그리고 conn.Read 시에도 메시지의 크기를 먼저 읽고 이후에 메시지를 읽으면 되니, 별도의 RecvMessage 함수를 만들어서 사용하면 될 것이다.

두 번째 방법을 사용하게 되면 그냥 메시지가 끝나는 부분으로 정의한 바이트까지 읽어서 사용하면 되지만, 전송하고자 하는 데이터가 gob 으로 인코딩 되기 때문에 적합하지 않다고 생각했다.

그래서 첫 번째 방법을 이용하여 문제를 해결하였고, 그에 대한 코드는 다음과 같다.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
// TCP Server
package main

import (
    "bytes"
    "encoding/binary"
    "encoding/gob"
    "io"
    "log"
    "net"
)

func init() {
    gob.Register(MyMsgBodyPing{})
}

func main() {
    l, err := net.Listen("tcp", ":5032")
    if nil != err {
        log.Fatalf("fail to bind address to 5032; err: %v", err)
    }
    defer l.Close()

    for {
        conn, err := l.Accept()
        if nil != err {
            log.Printf("fail to accept; err: %v", err)
            continue
        }
        go ConnHandler(conn)
    }
}

type MyMsg struct {
    Header MyMsgHeader
    Body   interface{}
}
type MyMsgHeader struct {
    MsgType string
    Date    string
}
type MyMsgBodyPing struct {
    Content string
}

type MyConnection struct {
    conn        net.Conn
    dec         *gob.Decoder
    codecBuffer bytes.Buffer
    recvBuf     []byte
}

func (mc *MyConnection) RecvMessage() (MyMsg, error) {
    lengthBuf := make([]byte, 4)
    _, err := mc.conn.Read(lengthBuf)
    if nil != err {
        return MyMsg{}, err
    }
    msgLength := binary.LittleEndian.Uint32(lengthBuf)

    mc.codecBuffer.Reset()

    for 0 < msgLength {
        n, err := mc.conn.Read(mc.recvBuf)
        if nil != err {
            return MyMsg{}, err
        }
        if 0 < n {
            data := mc.recvBuf[:n]
            mc.codecBuffer.Write(data)
            msgLength -= uint32(n)
        }
    }

    msg := MyMsg{}
    if err = mc.dec.Decode(&msg); nil != err {
        log.Printf("failed to decode message; err: %v", err)
        return msg, err
    }
    return msg, nil
}

func ConnHandler(conn net.Conn) {
    mc := MyConnection{
        conn:       conn,
        recvBuf:    make([]byte, 4096),
    }
    mc.dec = gob.NewDecoder(&mc.codecBuffer)

    for {
        msg, err := mc.RecvMessage()
        if nil != err {
            if io.EOF == err {
                log.Printf("connection is closed from client; %v", mc.conn.RemoteAddr().String())
                return
            }
            log.Printf("failed to recv message! err: %v", err)
            continue
        }

        log.Println("msg: ", msg)
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// TCP Client
package main

import (
    "bytes"
    "encoding/binary"
    "encoding/gob"
    "log"
    "net"
    "time"
)

func init() {
    gob.Register(MyMsgBodyPing{})
}

type MyMsg struct {
    Header MyMsgHeader
    Body   interface{}
}
type MyMsgHeader struct {
    MsgType string
    Date    string
}
type MyMsgBodyPing struct {
    Content string
}

type MyConnection struct {
    conn        net.Conn
    enc         *gob.Encoder
    codecBuffer bytes.Buffer
}

func (mc *MyConnection) SendMessage(msg MyMsg) error {
    mc.codecBuffer.Reset()

    mc.enc.Encode(msg)

    lengthBuf := make([]byte, 4)
    binary.LittleEndian.PutUint32(lengthBuf, uint32(mc.codecBuffer.Len()))

    if _, err := mc.conn.Write(lengthBuf); nil != err {
        log.Printf("failed to send msg length; err: %v", err)
        return err
    }

    if _, err := mc.conn.Write(mc.codecBuffer.Bytes()); nil != err {
        log.Printf("failed to send msg; err: %v", err)
        return err
    }

    return nil
}

func main() {
    conn, err := net.Dial("tcp", "server0:5032")
    if nil != err {
        log.Fatalf("failed to connect to server")
    }

    mc := MyConnection{
        conn: conn,
    }
    mc.enc = gob.NewEncoder(&mc.codecBuffer)

    for {
        mc.SendMessage(MyMsg{
            Header: MyMsgHeader{
                MsgType: "ping",
                Date:    time.Now().UTC().Format(time.RFC3339),
            },
            Body: MyMsgBodyPing{
                Content: "Hello! I'm alive!",
            },
        })

        time.Sleep(time.Duration(3) * time.Second)
    }
}

이제 어떠한 크기의 데이터라도 TCP 를 통해서 전송할 수 있게 되었다!

하지만 대량으로 병렬의 연산이 일어날 경우에는 문제가 발생하게 된다. 가령, 메시지를 읽고 있는 도중에 Write 이 발생할 경우에는 codecBuffer 가 레이스 컨디션으로 인해 제대로 처리되지 않는 일이 발생하고 마는 것. 그래서 몇 가지 리팩토링이 더 필요하다.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// TCP Server
package main

import (
    "bytes"
    "encoding/binary"
    "encoding/gob"
    "io"
    "log"
    "net"
    "sync"
)

func init() {
    gob.Register(MyMsgBodyPing{})
}

func main() {
    l, err := net.Listen("tcp", ":5032")
    if nil != err {
        log.Fatalf("fail to bind address to 5032; err: %v", err)
    }
    defer l.Close()

    for {
        conn, err := l.Accept()
        if nil != err {
            log.Printf("fail to accept; err: %v", err)
            continue
        }
        go ConnHandler(conn)
    }
}

type MyMsg struct {
    Header MyMsgHeader
    Body   interface{}
}
type MyMsgHeader struct {
    MsgType string
    Date    string
}
type MyMsgBodyPing struct {
    Content string
}

type MyConnection struct {
    recv chan MyMsg

    conn            net.Conn
    dec             *gob.Decoder
    codecBuffer     bytes.Buffer
    codecBufferLock sync.Mutex
    recvBuf         []byte
}

func (mc *MyConnection) RecvMessage() (MyMsg, error) {
    mc.codecBufferLock.Lock()
    defer mc.codecBufferLock.Unlock()

    lengthBuf := make([]byte, 4)
    _, err := mc.conn.Read(lengthBuf)
    if nil != err {
        return MyMsg{}, err
    }
    msgLength := binary.LittleEndian.Uint32(lengthBuf)

    mc.codecBuffer.Reset()

    for 0 < msgLength {
        n, err := mc.conn.Read(mc.recvBuf)
        if nil != err {
            return MyMsg{}, err
        }
        if 0 < n {
            data := mc.recvBuf[:n]
            mc.codecBuffer.Write(data)
            msgLength -= uint32(n)
        }
    }

    msg := MyMsg{}
    if err = mc.dec.Decode(&msg); nil != err {
        log.Printf("failed to decode message; err: %v", err)
        return msg, err
    }
    return msg, nil
}

func (mc *MyConnection) MyMsgReceiver() {
    // receiver
    go func() {
        for {
            msg, err := mc.RecvMessage()
            if nil != err {
                if io.EOF == err {
                    log.Printf("connection is closed from client; %v", mc.conn.RemoteAddr().String())
                    return
                }
                log.Printf("failed to recv message! err: %v", err)
                continue
            }

            mc.recv <- msg
        }
    }()

    // message handler
    for {
        select {
        case msg := <-mc.recv:
            log.Println(msg)
        }
    }
}

func ConnHandler(conn net.Conn) {
    mc := MyConnection{
        conn:    conn,
        recvBuf: make([]byte, 4096),
        recv:    make(chan MyMsg),
    }
    mc.dec = gob.NewDecoder(&mc.codecBuffer)

    mc.MyMsgReceiver()
}
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
// TCP Client
package main

import (
    "bytes"
    "encoding/binary"
    "encoding/gob"
    "log"
    "net"
    "sync"
    "time"
)

func init() {
    gob.Register(MyMsgBodyPing{})
}

type MyMsg struct {
    Header MyMsgHeader
    Body   interface{}
}
type MyMsgHeader struct {
    MsgType string
    Date    string
}
type MyMsgBodyPing struct {
    Content string
}

type MyConnection struct {
    send chan MyMsg

    conn            net.Conn
    enc             *gob.Encoder
    codecBuffer     bytes.Buffer
    codecBufferLock sync.Mutex
}

func (mc *MyConnection) SendMessage(msg MyMsg) error {
    mc.codecBufferLock.Lock()
    defer mc.codecBufferLock.Unlock()

    mc.codecBuffer.Reset()

    mc.enc.Encode(msg)

    lengthBuf := make([]byte, 4)
    binary.LittleEndian.PutUint32(lengthBuf, uint32(mc.codecBuffer.Len()))

    if _, err := mc.conn.Write(lengthBuf); nil != err {
        log.Printf("failed to send msg length; err: %v", err)
        return err
    }

    if _, err := mc.conn.Write(mc.codecBuffer.Bytes()); nil != err {
        log.Printf("failed to send msg; err: %v", err)
        return err
    }

    return nil
}

func (mc *MyConnection) MyMsgSender() {
    for {
        select {
        case msg := <-mc.send:
            if err := mc.SendMessage(msg); nil != err {
                log.Printf("failed to send message; err: %v", err)
            }
        }
    }
}

func main() {
    conn, err := net.Dial("tcp", "server0:5032")
    if nil != err {
        log.Fatalf("failed to connect to server")
    }

    mc := MyConnection{
        conn: conn,
        send: make(chan MyMsg),
    }
    mc.enc = gob.NewEncoder(&mc.codecBuffer)

    go mc.MyMsgSender()

    for {
        mc.send <- MyMsg{
            Header: MyMsgHeader{
                MsgType: "ping",
                Date:    time.Now().UTC().Format(time.RFC3339),
            },
            Body: MyMsgBodyPing{
                Content: "Hello! I'm alive!",
            },
        }

        time.Sleep(time.Duration(3) * time.Second)
    }
}

codecBufferLock 을 두어 레이스 컨디션을 해결하고, send, recv 채널을 통해 고루틴의 활용 및 코드의 구조화 및 깔끔화를 처리하였다. Everything is now happy!