koluku's blog

コードは毎日の欠片

不完全なjsonを断続的に[]byteで受け取る場合のデコード #go

TL;DR

  • JSONが分割されて送られてくるのが予想される場合は io.Pipe() で受付を待ちながら json.NewDecoder でDecodeしよう

本題

GoでHTTPリクエストした結果がJSONの場合はBodyをそのまま json.Unmarshal() に投げてデコードすることが多いが、途中で打ち切られる場合はこの方法だとパースに失敗してしまうため最悪プログラムが停止してしまう。

例えばHTTPリクエストの結果として以下のJSONL*1を受け取るとする。

{ "Name": "Ed", "Text": "Knock knock." }
{ "Name": "Sam", "Text": "Who's there?" }
{ "Name": "Ed", "Text": "Go fmt." }
{ "Name": "Sam", "Text": "Go fmt who?" }
{ "Name": "Ed", "Text": "Go fmt yourself!" }

理想としてはフルBody(もしくは改行ごとに)で受け取りたいが、以下のように途中でデータが分割された状態でデータが送られてくることがある。

  1.  {"Name":"Ed","Text":"Knock knock."}
     {"Name"
    
  2.  :"Sam","Text":"Wh
    
  3.  o's there?"}
     {"Name":"Ed","Text":"Go fmt."}
    
  4.  {"Name":"Sam","Text":"Go fmt who?"}
    
  5.  {"Name":"Ed","Text":"Go fmt yourself!"}
    

この場合、データを受信するたびにjson.Unmarshal()をしていると2 Object目でパースに失敗してしまう。

var data []byte
for _, v := range jsonStream {
    var rawMessage json.RawMessage
    if err := json.Unmarshal(data, &rawMessage); err != nil {
        log.Fatal(err) // {"Name" を受け取ってしまうと即死
    }
    fmt.Println(string(rawMessage))
}

そのため、データ受付が終わりきるまで[]byteを結合すればいいかと言えばそうでもなく、JSONをパースするまで待機中の使用メモリが大きくなり無駄が大きい。

// メモリ効率が悪い
var data []byte
for _, v := range jsonStream {
    data = append(data, v)
}

var rawMessage json.RawMessage
if err := json.Unmarshal(data, &rawMessage); err != nil {
    log.Fatal(err)
}

こういったメモリ問題を解決する方法として io.Pipe() で受けることが挙げられる。io.Pipe() で作ったReaderとWriterはバッファなしChannelで繋がっており、Readerが読むまでWriterは書き込むことが出来ず、Writerが書き込むまでReaderが読み込む事ができない。Writerはgoroutineで書き込みし続けて、入力が終わったらCloseしてあげる必要がある。

https://pkg.go.dev/io#Pipe

io.Pipe()で作られたReaderはio.Readerであるため json.Decoder() で受け付けることができる。

https://pkg.go.dev/encoding/json#NewDecoder

上記の2つを組み合わせるとメモリ効率良くJSONのパースをすることができる。

pr, pw := io.Pipe()

go func() {
    defer pw.Close()
    for _, v := range jsonStream {
        if _, err := pw.Write(v); err != nil {
            log.Fatal(err)
        }
    }
}()

var lines [][]byte
decoder := json.NewDecoder(pr)
for decoder.More() {
    var v json.RawMessage
    if err := decoder.Decode(&v); err != nil {
        log.Fatal(err)
    }
    lines = append(lines, v)
}

参考

medium.com

rennnosukesann.hatenablog.com


*1:JSON Lines: 雑に言えば1行ごとにJSONを並べた形式