Revisiting JSON streaming in Go

Michael Francis
10 min read, Sep 4, 2022


Photo by Haroon Niaz on Unsplash

I didn’t plan on writing a fourth part in this series on JSON streaming, but then I stumbled on a pattern in a Go protobuf decoder that prompted me to explore a different way to support streaming interface types. If you haven’t already, I highly recommend reading parts one through three first.

Part 3 introduced a pattern in which we injected a ‘$type’ field into the stream so that we could reconstruct our interface types when decoding. That solution relies on whatever creates the stream including the required field with the correct type. In this article, we set out to automate this step and round-trip Go interface implementations. The setup code is the same as before, although you might notice that we now import the reflect package.
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
	"strings"
)

type Type string

type MyInterface interface {
	Type() Type
	New() MyInterface
}

var lookup = make(map[Type]MyInterface)

func Register(iface MyInterface) {
	lookup[iface.Type()] = iface
}

func init() {
	Register(StructA{})
	Register(StructB{})
}

type StructA struct {
	A float64 `json:"a"`
}

type StructB struct {
	B string `json:"b"`
}

func (StructA) Type() Type {
	return "StructA"
}

func (StructB) Type() Type {
	return "StructB"
}

func (StructA) New() MyInterface {
	return &StructA{}
}

func (StructB) New() MyInterface {
	return &StructB{}
}

// Check that we have implemented the interface
var _ MyInterface = (*StructA)(nil)
var _ MyInterface = (*StructB)(nil)

type MyList []MyInterface

Our goal is to be able to run the following code without it panicking:

func main() {
	l := MyList{
		StructA{A: 1.23},
		StructA{A: 3.45},
		StructB{B: "hello"},
	}
	b, _ := json.Marshal(l)
	var nl MyList
	_ = json.Unmarshal(b, &nl)
	fmt.Println(l)
	fmt.Println(nl)
	if fmt.Sprint(l) != fmt.Sprint(nl) {
		panic("Unable to round-trip")
	}
	fmt.Println(string(b))
}

As a sidebar, you’ll notice that I compare the printed string representations to check that the decoded values match the originals. I do this because Go has no built-in equality operator for slices. In ‘real’ code, I would use a custom comparison implementation.
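One possible alternative to comparing printed strings (a suggestion, not what the code above does) is reflect.DeepEqual, which compares slices element by element and only treats two interface elements as equal when their dynamic types match. The sketch below uses trimmed-down stand-ins for the article’s types, and the helper name sameList is hypothetical.

```go
package main

import (
	"fmt"
	"reflect"
)

// Trimmed-down stand-ins for the article's types (only what this sketch needs).
type Type string

type MyInterface interface{ Type() Type }

type StructA struct {
	A float64 `json:"a"`
}

func (StructA) Type() Type { return "StructA" }

type MyList []MyInterface

// sameList compares two lists element by element. Elements only match when
// both their dynamic types and their contents are equal, so a StructA value
// never matches a *StructA pointer.
func sameList(a, b MyList) bool {
	return reflect.DeepEqual(a, b)
}

func main() {
	l := MyList{StructA{A: 1.23}}
	fmt.Println(sameList(l, MyList{StructA{A: 1.23}}))  // true
	fmt.Println(sameList(l, MyList{&StructA{A: 1.23}})) // false: pointer vs value
}
```

This catches the same value-versus-pointer mismatch that the string comparison catches, without depending on fmt’s output format.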

As mentioned above, the solution relies on reflection; if you are unfamiliar with it, have a look at this blog post or walk through one of the many tutorials on reflection.

First, let us create a routine that, given an instance of our interface type, constructs a JSON stream with an embedded ‘$type’ field. The naive way (well, the way with the most typing) would be to write a custom MarshalJSON method for each implementation. Writing such a method isn’t hard, but it is somewhat error-prone: I need to add a field to an anonymous struct, and that field needs a struct tag to render correctly in the stream.

func (t StructA) MarshalJSON() ([]byte, error) {
	return json.Marshal(struct {
		A    float64
		Type string `json:"$type"`
	}{
		A:    t.A,
		Type: string(t.Type()), // Type() returns our Type, so convert it to string
	})
}

The struct tags in question are the standard encoding/json field tags, documented under json.Marshal.

Encoding the stream

With that out of the way, let’s look at a more generic solution:

func LazyEncode(item MyInterface) ([]byte, error) {
	elements := make(map[string]json.RawMessage)
	val := reflect.Indirect(reflect.ValueOf(item))
	typ := val.Type()
	// iterate through the fields of the struct
	for i := 0; i < typ.NumField(); i++ {
		if typ.Field(i).IsExported() {
			fieldValue := val.Field(i)
			tag := typ.Field(i).Name
			b, err := json.Marshal(fieldValue.Interface())
			if err != nil {
				return nil, err
			}
			elements[tag] = b
		}
	}
	// Encode the type string
	t, err := json.Marshal(item.Type())
	if err != nil {
		return nil, err
	}
	elements["$type"] = t
	// Finally, marshal the entire result
	return json.Marshal(elements)
}

OK, so what is going on here? First, we use a bit of ‘magic’ from the JSON package: json.RawMessage, which we used in earlier parts of this series. This time, though, we make a map from string to json.RawMessage, which holds the already-encoded representation of each exported field of our struct.

elements := make(map[string]json.RawMessage)
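To see why json.RawMessage is useful as a map value, here is a small standalone example (the helper name splitRaw is hypothetical). Only the top level of the object is decoded; each value is kept as its original JSON bytes, ready to be unmarshalled later into the right type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// splitRaw decodes only the top level of a JSON object; each value is kept
// as undecoded bytes so it can be unmarshalled later into the right type.
func splitRaw(b []byte) (map[string]json.RawMessage, error) {
	elements := make(map[string]json.RawMessage)
	if err := json.Unmarshal(b, &elements); err != nil {
		return nil, err
	}
	return elements, nil
}

func main() {
	elements, err := splitRaw([]byte(`{"$type":"StructA","A":1.23}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(elements["$type"])) // "StructA" (quotes included)
	fmt.Println(string(elements["A"]))     // 1.23
}
```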

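Note that one further enhancement to LazyEncode would be to respect the JSON struct tags (it currently keys each field by its Go name, so StructA’s field is written as ‘A’ rather than ‘a’), and potentially to add additional behavior tags.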
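As a rough sketch of that enhancement, assuming we only care about the name part of the tag, the key can be taken from the `json` tag when one is present. The helpers jsonFieldName and lazyEncodeTagged are hypothetical names, not part of the article’s code; options such as omitempty are ignored, and strings.Cut needs Go 1.18 or later.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
	"strings"
)

type Type string

type MyInterface interface{ Type() Type }

type StructA struct {
	A float64 `json:"a"`
}

func (StructA) Type() Type { return "StructA" }

// jsonFieldName returns the key to use for f, based only on the name part of
// its `json` tag; ok is false for unexported fields and fields tagged "-".
func jsonFieldName(f reflect.StructField) (name string, ok bool) {
	if !f.IsExported() {
		return "", false
	}
	tag := f.Tag.Get("json")
	if tag == "-" {
		return "", false
	}
	name, _, _ = strings.Cut(tag, ",")
	if name == "" {
		name = f.Name // no tag name: fall back to the Go field name
	}
	return name, true
}

// lazyEncodeTagged is LazyEncode with tag-derived keys instead of Go field names.
func lazyEncodeTagged(item MyInterface) ([]byte, error) {
	elements := make(map[string]json.RawMessage)
	val := reflect.Indirect(reflect.ValueOf(item))
	typ := val.Type()
	for i := 0; i < typ.NumField(); i++ {
		name, ok := jsonFieldName(typ.Field(i))
		if !ok {
			continue
		}
		b, err := json.Marshal(val.Field(i).Interface())
		if err != nil {
			return nil, err
		}
		elements[name] = b
	}
	t, err := json.Marshal(item.Type())
	if err != nil {
		return nil, err
	}
	elements["$type"] = t
	return json.Marshal(elements)
}

func main() {
	b, err := lazyEncodeTagged(StructA{A: 1.23})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"$type":"StructA","a":1.23}
}
```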

Using reflection, we get the value and its type. Note that we use `reflect.Indirect` here so that we don’t have to worry about whether the passed-in item is a pointer or a value.

val := reflect.Indirect(reflect.ValueOf(item))
typ := val.Type()
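A small standalone illustration of what reflect.Indirect gives us (the helper describe is hypothetical): a value and a pointer both lead to the same struct type, but only the pointer route produces an addressable value. That difference matters later, when LazyDecode calls Addr() on fields of the instance returned by New.

```go
package main

import (
	"fmt"
	"reflect"
)

type StructA struct {
	A float64 `json:"a"`
}

// describe reports the struct type reached through reflect.Indirect and
// whether that value is addressable (true only when we came via a pointer).
func describe(x interface{}) (reflect.Type, bool) {
	val := reflect.Indirect(reflect.ValueOf(x))
	return val.Type(), val.CanAddr()
}

func main() {
	fmt.Println(describe(StructA{A: 1}))  // main.StructA false
	fmt.Println(describe(&StructA{A: 2})) // main.StructA true
}
```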

We then iterate over the exported fields of our struct, turning each into a byte slice with the standard json.Marshal function.

for i := 0; i < typ.NumField(); i++ {
	if typ.Field(i).IsExported() {
		fieldValue := val.Field(i)
		tag := typ.Field(i).Name
		b, err := json.Marshal(fieldValue.Interface())
		if err != nil {
			return nil, err
		}
		elements[tag] = b
	}
}

We now have a map containing the encoded form of each exported field. Finally, we add our custom ‘$type’ field and marshal the whole map as JSON. Applied to each element of our example list, this produces the following stream:

[{"$type":"StructA","A":1.23},{"$type":"StructA","A":3.45},{"$type":"StructB","B":"hello"}]

For production code, we would now write several tests to make sure this function behaves correctly. We can also update the MarshalJSON method of MyList to use it:

func (l MyList) MarshalJSON() ([]byte, error) {
	raw := make([]json.RawMessage, len(l))
	for i, v := range l {
		b, err := LazyEncode(v)
		if err != nil {
			return nil, err
		}
		raw[i] = b
	}
	return json.Marshal(raw)
}

In an ideal world, we would not need this deferred encoding through json.RawMessage, but since we can’t extend the default JSON encoder, we have to do it in our own code. The other way to achieve this would be to customize each of our implementations:

func (t StructA) MarshalJSON() ([]byte, error) {
	return LazyEncode(t)
}

I hope you will agree that this is much less error-prone than before. It results in the desired stream:

{"$type":"StructA","A":1.23}

Decoding the stream

The stream we have just created is directly compatible with the stream read in part 3.

So we could stop here, but hopefully the encoding logic has hinted at the decoding solution. We are going to perform the inverse operation and define a function

func LazyDecode(b []byte) (MyInterface, error) {...}

that uses similar reflection logic to turn our byte stream back into an implementation of our interface.

If you look back at how we defined New, you’ll see that it returns a pointer to a new instance of our type. This touches one of the more confusing areas of Go: an interface value can hold either a value of a type or a pointer to that type. There is no way to enforce that an interface whose implementations use value receivers only ever holds a value. Both of the following assignments are valid:

var foo MyInterface = StructA{A: 1}
var bar MyInterface = &StructA{A: 2}

If I print these using fmt.Println, I get the following:

{1}
&{2}

It makes sense when you think about it, but it can come back to bite you. Here is my LazyDecode function; I’ve commented it, and it is a mirror image of LazyEncode:

func LazyDecode(b []byte) (MyInterface, error) {
	// Create a map from string to RawMessage;
	// when we Unmarshal into this map, the elements
	// are not decoded
	elements := make(map[string]json.RawMessage)
	err := json.Unmarshal(b, &elements)
	if err != nil {
		return nil, err
	}
	// Check that we have our type field in the stream
	if rawType, ok := elements["$type"]; ok {
		// Now we decode the type into a string
		var t string
		err := json.Unmarshal(rawType, &t)
		if err != nil {
			return nil, err
		}
		// Look up the registered prototype for this type name
		myInterfaceFunc, ok := lookup[Type(t)]
		if !ok {
			return nil, fmt.Errorf("unregistered interface type: %s", t)
		}
		// Create a new instance of the type
		item := myInterfaceFunc.New()
		// Due to the way we defined New, we have a pointer to our type,
		// so the fields of val are addressable
		val := reflect.Indirect(reflect.ValueOf(item))
		typ := val.Type()
		// Use reflection to iterate through the fields of the struct
		for i := 0; i < typ.NumField(); i++ {
			if typ.Field(i).IsExported() {
				fieldValue := val.Field(i)
				// Get the field from the stream and decode it
				// into our structure
				tag := typ.Field(i).Name
				elem, ok := elements[tag]
				if !ok {
					// Fall back to lowercase;
					// this really needs to be case insensitive
					elem, ok = elements[strings.ToLower(tag)]
				}
				if ok {
					err := json.Unmarshal(elem, fieldValue.Addr().Interface())
					if err != nil {
						return nil, err
					}
				} else {
					fmt.Printf("Warn: tag %s not found in stream\n", tag)
					// FIXME, we need structure tags to tell if this is empty or not
				}
			}
		}
		return item, nil
	}
	return nil, fmt.Errorf("invalid stream, no $type specified")
}

When we run this code on our JSON message, we get the following:

[0x14000018218 0x14000018268 0x14000010360]

We have a list of pointers to our implementations. This result isn’t exactly incorrect, but it is not a valid round trip: we expected to see the following, and for our code not to panic:

[{1.23} {3.45} {hello}]

Creating a value-based result

We once again turn to reflection; this time, we use the dereferenced value we already hold in val to produce a value-based result. Thankfully, solving our problem is a one-line change! Rather than returning our item (a pointer) directly, we return the value it points to. We need a type assertion to MyInterface since .Interface() returns interface{}; this is safe here because StructA and StructB implement MyInterface with value receivers, so the value type satisfies the interface as well as the pointer type.

return val.Interface().(MyInterface), nil

With this change, we get the result we expected. Running our original main function

func main() {
	l := MyList{
		StructA{A: 1.23},
		StructA{A: 3.45},
		StructB{B: "hello"},
	}
	b, _ := json.Marshal(l)
	var nl MyList
	_ = json.Unmarshal(b, &nl)
	fmt.Println(l)
	fmt.Println(nl)
	if fmt.Sprint(l) != fmt.Sprint(nl) {
		panic("Unable to round-trip")
	}
	fmt.Println(string(b))
}

generates the following output and no longer panics.

[{1.23} {3.45} {hello}]
[{1.23} {3.45} {hello}]
[{"$type":"StructA","A":1.23},{"$type":"StructA","A":3.45},{"$type":"StructB","B":"hello"}]
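The one-line fix, return val.Interface().(MyInterface), nil, depends on value receivers. If an implementation declared its methods on a pointer receiver, the dereferenced value would not implement MyInterface and that single-value assertion would panic. The sketch below uses hypothetical types ValueImpl and PtrImpl and a hypothetical helper toValue that applies the same conversion with the comma-ok form, so the failure shows up as false instead of a panic.

```go
package main

import (
	"fmt"
	"reflect"
)

type Type string

type MyInterface interface{ Type() Type }

// ValueImpl uses a value receiver, like StructA and StructB.
type ValueImpl struct{ N int }

func (ValueImpl) Type() Type { return "ValueImpl" }

// PtrImpl uses a pointer receiver, so only *PtrImpl implements MyInterface.
type PtrImpl struct{ N int }

func (*PtrImpl) Type() Type { return "PtrImpl" }

// toValue dereferences item and asserts the value back to MyInterface,
// using comma-ok so a pointer-receiver type reports false instead of panicking.
func toValue(item MyInterface) (MyInterface, bool) {
	val := reflect.Indirect(reflect.ValueOf(item))
	v, ok := val.Interface().(MyInterface)
	return v, ok
}

func main() {
	v, ok := toValue(&ValueImpl{N: 1})
	fmt.Println(v, ok) // {1} true
	_, ok = toValue(&PtrImpl{N: 2})
	fmt.Println(ok) // false
}
```

A decoder that has to support both kinds of implementation could fall back to returning the pointer when the assertion fails.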

Full code

package main

import (
	"encoding/json"
	"fmt"
	"reflect"
	"strings"
)

type Type string

type MyInterface interface {
	Type() Type
	New() MyInterface
}

var lookup = make(map[Type]MyInterface)

func Register(iface MyInterface) {
	lookup[iface.Type()] = iface
}

func init() {
	Register(StructA{})
	Register(StructB{})
}

type StructA struct {
	A float64 `json:"a"`
}

type StructB struct {
	B string `json:"b"`
}

func (StructA) Type() Type {
	return "StructA"
}

func (StructB) Type() Type {
	return "StructB"
}

func (StructA) New() MyInterface {
	return &StructA{}
}

func (StructB) New() MyInterface {
	return &StructB{}
}

// Check that we have implemented the interface
var _ MyInterface = (*StructA)(nil)
var _ MyInterface = (*StructB)(nil)

type MyList []MyInterface

func LazyEncode(item MyInterface) ([]byte, error) {
	elements := make(map[string]json.RawMessage)
	val := reflect.Indirect(reflect.ValueOf(item))
	typ := val.Type()
	// iterate through the fields of the struct
	for i := 0; i < typ.NumField(); i++ {
		if typ.Field(i).IsExported() {
			fieldValue := val.Field(i)
			// Encode the field, keyed by its Go field name
			tag := typ.Field(i).Name
			b, err := json.Marshal(fieldValue.Interface())
			if err != nil {
				return nil, err
			}
			elements[tag] = b
		}
	}
	// Encode the type string
	t, err := json.Marshal(item.Type())
	if err != nil {
		return nil, err
	}
	elements["$type"] = t
	// Finally, marshal the entire result
	return json.Marshal(elements)
}

func LazyDecode(b []byte) (MyInterface, error) {
	// Create a map from string to RawMessage;
	// when we Unmarshal into this map, the elements
	// are not decoded
	elements := make(map[string]json.RawMessage)
	err := json.Unmarshal(b, &elements)
	if err != nil {
		return nil, err
	}
	// Check that we have our type field in the stream
	if rawType, ok := elements["$type"]; ok {
		// Now we decode the type into a string
		var t string
		err := json.Unmarshal(rawType, &t)
		if err != nil {
			return nil, err
		}
		// Look up the registered prototype for this type name
		myInterfaceFunc, ok := lookup[Type(t)]
		if !ok {
			return nil, fmt.Errorf("unregistered interface type: %s", t)
		}
		// Create a new instance of the type
		item := myInterfaceFunc.New()
		// Due to the way we defined New, we have a pointer to our type,
		// so the fields of val are addressable
		val := reflect.Indirect(reflect.ValueOf(item))
		typ := val.Type()
		// Use reflection to iterate through the fields of the struct
		for i := 0; i < typ.NumField(); i++ {
			if typ.Field(i).IsExported() {
				fieldValue := val.Field(i)
				// Get the field from the stream and decode it into our structure
				tag := typ.Field(i).Name
				elem, ok := elements[tag]
				if !ok {
					// Fall back to lowercase; this really needs to be case insensitive
					elem, ok = elements[strings.ToLower(tag)]
				}
				if ok {
					err := json.Unmarshal(elem, fieldValue.Addr().Interface())
					if err != nil {
						return nil, err
					}
				} else {
					fmt.Printf("Warn: tag %s not found in stream\n", tag)
					// FIXME, we need structure tags to tell if this is empty or not
				}
			}
		}
		// Return the value, not the pointer, so the round trip matches the input
		return val.Interface().(MyInterface), nil
	}
	return nil, fmt.Errorf("invalid stream, no $type specified")
}

func (l *MyList) UnmarshalJSON(b []byte) error {
	var raw []json.RawMessage
	err := json.Unmarshal(b, &raw)
	if err != nil {
		return err
	}
	// Allocate a slice of MyInterface
	*l = make(MyList, len(raw))
	for i, r := range raw {
		item, err := LazyDecode(r)
		if err != nil {
			return err
		}
		(*l)[i] = item
	}
	return nil
}

func (l MyList) MarshalJSON() ([]byte, error) {
	raw := make([]json.RawMessage, len(l))
	for i, v := range l {
		b, err := LazyEncode(v)
		if err != nil {
			return nil, err
		}
		raw[i] = b
	}
	return json.Marshal(raw)
}

func main() {
	l := MyList{
		StructA{A: 1.23},
		StructA{A: 3.45},
		StructB{B: "hello"},
	}
	b, _ := json.Marshal(l)
	var nl MyList
	_ = json.Unmarshal(b, &nl)
	fmt.Println(l)
	fmt.Println(nl)
	if fmt.Sprint(l) != fmt.Sprint(nl) {
		panic("Unable to round-trip")
	}
	fmt.Println(string(b))
}

I hope this run-through was helpful. Some additional features are required before this is production code, not least the tests and handling of the JSON struct tags. If you liked this, please give it a thumbs up and subscribe for more content.
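As one possible starting point for the tag handling and the case-insensitivity FIXME, here is an alternative decoder that differs from the approach above. It reads only ‘$type’ and then lets encoding/json fill in the instance returned by New. json.Unmarshal applies struct tags, matches keys case-insensitively (preferring an exact match), and ignores unknown keys such as ‘$type’. The name lazyDecodeStd is hypothetical, and the registry is trimmed to StructA for brevity.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
)

type Type string

type MyInterface interface {
	Type() Type
	New() MyInterface
}

var lookup = map[Type]MyInterface{"StructA": StructA{}}

type StructA struct {
	A float64 `json:"a"`
}

func (StructA) Type() Type { return "StructA" }

func (StructA) New() MyInterface { return &StructA{} }

// lazyDecodeStd reads only "$type", then lets json.Unmarshal populate the
// pointer returned by New, so struct tags and case-insensitive keys are honoured.
func lazyDecodeStd(b []byte) (MyInterface, error) {
	var header struct {
		Type Type `json:"$type"`
	}
	if err := json.Unmarshal(b, &header); err != nil {
		return nil, err
	}
	if header.Type == "" {
		return nil, errors.New("invalid stream, no $type specified")
	}
	proto, ok := lookup[header.Type]
	if !ok {
		return nil, fmt.Errorf("unregistered interface type: %q", header.Type)
	}
	item := proto.New()
	if err := json.Unmarshal(b, item); err != nil {
		return nil, err
	}
	// Return the value rather than the pointer, as in the final LazyDecode.
	return reflect.Indirect(reflect.ValueOf(item)).Interface().(MyInterface), nil
}

func main() {
	// "A" matches the `json:"a"` tag because key matching is case-insensitive.
	item, err := lazyDecodeStd([]byte(`{"$type":"StructA","A":1.23}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(item) // {1.23}
}
```

The trade-off is that the input is parsed twice, once for the header and once for the fields, in exchange for not reimplementing encoding/json’s field matching.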
