TL;DR
- JSONが分割されて送られてくるのが予想される場合は
io.Pipe()
で受付を待ちながらjson.NewDecoder
でDecodeしよう
本題
GoでHTTPリクエストした結果がJSONの場合はBodyをそのまま json.Unmarshal()
に投げてデコードすることが多いが、途中で打ち切られる場合はこの方法だとパースに失敗してしまうため最悪プログラムが停止してしまう。
例えばHTTPリクエストの結果として以下のJSONL*1を受け取るとする。
{ "Name": "Ed", "Text": "Knock knock." } { "Name": "Sam", "Text": "Who's there?" } { "Name": "Ed", "Text": "Go fmt." } { "Name": "Sam", "Text": "Go fmt who?" } { "Name": "Ed", "Text": "Go fmt yourself!" }
理想としてはフルBody(もしくは改行ごとに)で受け取りたいが、以下のように途中でデータが分割された状態でデータが送られてくることがある。
{"Name":"Ed","Text":"Knock knock."} {"Name"
:"Sam","Text":"Wh
o's there?"} {"Name":"Ed","Text":"Go fmt."}
{"Name":"Sam","Text":"Go fmt who?"}
{"Name":"Ed","Text":"Go fmt yourself!"}
この場合、データを受信するたびにjson.Unmarshal()
をしていると2 Object目でパースに失敗してしまう。
var data []byte for _, v := range jsonStream { var rawMessage json.RawMessage if err := json.Unmarshal(data, &rawMessage); err != nil { log.Fatal(err) // {"Name" を受け取ってしまうと即死 } fmt.Println(string(rawMessage)) }
そのため、データ受付が終わりきるまで[]byteを結合すればいいかと言えばそうでもなく、JSONをパースするまで待機中の使用メモリが大きくなり無駄が大きい。
// メモリ効率が悪い var data []byte for _, v := range jsonStream { data = append(data, v) } var rawMessage json.RawMessage if err := json.Unmarshal(data, &rawMessage); err != nil { log.Fatal(err) }
こういったメモリ問題を解決する方法として io.Pipe()
で受けることが挙げられる。io.Pipe()
で作ったReaderとWriterはバッファなしChannelで繋がっており、Readerが読むまでWriterは書き込むことが出来ず、Writerが書き込むまでReaderが読み込む事ができない。Writerはgoroutineで書き込みし続けて、入力が終わったらCloseしてあげる必要がある。
io.Pipe()
で作られたReaderはio.Reader
であるため json.Decoder()
で受け付けることができる。
https://pkg.go.dev/encoding/json#NewDecoder
上記の2つを組み合わせるとメモリ効率良くJSONのパースをすることができる。
pr, pw := io.Pipe() go func() { defer pw.Close() for _, v := range jsonStream { if _, err := pw.Write(v); err != nil { log.Fatal(err) } } }() var lines [][]byte decoder := json.NewDecoder(pr) for decoder.More() { var v json.RawMessage if err := decoder.Decode(&v); err != nil { log.Fatal(err) } lines = append(lines, v) }
参考
*1:JSON Lines: 雑に言えば1行ごとにJSONを並べた形式