-
Notifications
You must be signed in to change notification settings - Fork 129
Concurrent schema parsing and unmarshalling causes nil pointer dereference #481
Description
Hi there.
It seems there is a concurrency bug that can cause nil pointer dereferences.
This GitHub issue first presents the stack trace, then a test case that can (sometimes) reproduce it, and finally our best guess at the root cause. As a bonus, a stack trace against an older version of the library is also presented.
The stack trace
This is the stack trace I get running against v2.27.0.
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x2 addr=0x40 pc=0x102fbf314]
goroutine 11352 [running]:
github.com/hamba/avro/v2.(*Field).String(...)
external/com_github_hamba_avro_v2/schema.go:845
github.com/hamba/avro/v2.(*RecordSchema).String(0x14021ec9c70)
external/com_github_hamba_avro_v2/schema.go:642 +0x84
github.com/hamba/avro/v2.(*fingerprinter).Fingerprint(0x14021ec9cc0, {0x1033b87a0?, 0x14021ec9c70?})
external/com_github_hamba_avro_v2/schema.go:291 +0xc4
github.com/hamba/avro/v2.(*RecordSchema).Fingerprint(0x2a5a0?)
external/com_github_hamba_avro_v2/schema.go:691 +0x3c
github.com/hamba/avro/v2.(*cacheFingerprinter).CacheFingerprint(0x14021ec9cf0, {0x1033be080?, 0x14021ec9c70?}, 0x102fa49d4?)
external/com_github_hamba_avro_v2/schema.go:336 +0x2a8
github.com/hamba/avro/v2.(*RecordSchema).CacheFingerprint(0x140055e58b8?)
external/com_github_hamba_avro_v2/schema.go:701 +0x54
github.com/hamba/avro/v2.decoderOfType(0x140055e5df8, {0x1033be180, 0x14030677910}, {0x1033c26e0, 0x140000f6e10})
external/com_github_hamba_avro_v2/codec.go:142 +0x22c
github.com/hamba/avro/v2.decoderOfNullableUnion(0x140055e5df8, {0x1033be140?, 0x14034956840}, {0x1033c25a0, 0x140000f6db0})
external/com_github_hamba_avro_v2/codec_union.go:183 +0x138
github.com/hamba/avro/v2.createDecoderOfUnion(0x140055e5df8, 0x14034956840, {0x1033c25a0, 0x140000f6db0})
external/com_github_hamba_avro_v2/codec_union.go:30 +0x1f0
github.com/hamba/avro/v2.decoderOfType(0x140055e5df8, {0x1033be140, 0x14034956840}, {0x1033c25a0, 0x140000f6db0})
external/com_github_hamba_avro_v2/codec.go:154 +0x3f0
github.com/hamba/avro/v2.decoderOfStruct(0x140055e5df8, {0x1033be080?, 0x1400866bba0}, {0x1033c26e0, 0x140000f6b70})
external/com_github_hamba_avro_v2/codec_record.go:107 +0x534
github.com/hamba/avro/v2.createDecoderOfRecord(0x140055e5df8, {0x1033be080, 0x1400866bba0}, {0x1033c26e0, 0x140000f6b70})
external/com_github_hamba_avro_v2/codec_record.go:16 +0x118
github.com/hamba/avro/v2.decoderOfType(0x140055e5df8, {0x1033be080, 0x1400866bba0}, {0x1033c26e0, 0x140000f6b70})
external/com_github_hamba_avro_v2/codec.go:139 +0x528
github.com/hamba/avro/v2.(*frozenConfig).DecoderOf(0x140001ba090, {0x1033be080, 0x1400866bba0}, {0x1033c25a0, 0x1400103cf30})
external/com_github_hamba_avro_v2/codec.go:75 +0xcc
github.com/hamba/avro/v2.(*Reader).ReadVal(0x1401cc5e540, {0x1033be080, 0x1400866bba0}, {0x1032ed6a0, 0x1400bcc2a80})
external/com_github_hamba_avro_v2/codec.go:45 +0xd4
github.com/hamba/avro/v2.(*frozenConfig).Unmarshal(0x140001ba090, {0x1033be080, 0x1400866bba0}, {0x14000492380?, 0x0?, 0x0?}, {0x1032ed6a0, 0x1400bcc2a80})
external/com_github_hamba_avro_v2/config.go:163 +0x98
github.com/hamba/avro/v2.Unmarshal(...)
external/com_github_hamba_avro_v2/decoder.go:48
<redacted>.Test_Concurrent.func1()
<redacted>/avrohandler_test.go:81 +0xe8
<redacted>.Test_Concurrent in goroutine 7
<redacted>/avrohandler_test.go:74 +0x6e8
The test case
I have created a test case that can (sometimes) cause failures. Since the bug is a race, I need to run the test multiple times to trigger it. Our build system is Bazel, so I run it with `--runs_per_test=20`.
package debezium
import (
"math/rand"
"strconv"
"strings"
"sync"
"testing"
"github.com/hamba/avro/v2"
"github.com/stretchr/testify/require"
)
// Test_Concurrent reproduces a nil pointer dereference triggered by
// concurrently parsing schemas and unmarshalling: each goroutine parses
// a schema with a randomized namespace while decoding, exercising the
// library's schema cache under contention.
//
// Fixes to the originally posted test:
//   - `var kvsEnvelope Envelope[KVPayload]` referenced types not defined
//     anywhere in this file; the defined types are DbzEnvelope/Payload.
//   - require.NoError inside a goroutine calls t.FailNow, which must not
//     be called from a non-test goroutine (see the testing package docs);
//     the goroutine body now uses the goroutine-safe t.Error instead.
func Test_Concurrent(t *testing.T) {
	kvs := DbzEnvelope[Payload]{
		Before: &Payload{
			Tenant:       "foo",
			Store:        "bar",
			PartitionKey: "baz",
			SortKey:      "qaz",
			Version:      "faz",
			UpdatedAt:    "2024-01-01T00:00:00Z",
			Tags:         &[]*string{toPtr("tag1"), toPtr("tag2")},
			Data:         toPtr("data"),
			Encoding:     toPtr("encoding"),
			Schema:       toPtr(int64(1)),
			LookupKeys:   &[]*string{toPtr("key1"), toPtr("key2")},
		},
		After: &Payload{
			Tenant:       "foo",
			Store:        "bar",
			PartitionKey: "baz",
			SortKey:      "qaz",
			Version:      "faz",
			UpdatedAt:    "2024-01-01T00:00:00Z",
			Tags:         &[]*string{toPtr("tag1"), toPtr("tag2")},
			Data:         toPtr("data"),
			Encoding:     toPtr("encoding"),
			Schema:       toPtr(int64(1)),
			LookupKeys:   &[]*string{toPtr("key1"), toPtr("key2")},
		},
		Source: DbzSource{
			Version:   "test",
			Connector: "ba",
			Name:      "f",
			TsMs:      43,
			TsUs:      nil,
			TsNs:      nil,
			Snapshot:  nil,
			Db:        "asdf",
			Sequence:  nil,
			Schema:    "f",
			Table:     "f",
			TxID:      nil,
			Lsn:       nil,
			Xmin:      nil,
		},
		Op:          "read",
		TsMs:        nil,
		TsUs:        nil,
		TsNs:        nil,
		Transaction: nil,
	}

	// Marshal once on the main goroutine; require is safe here.
	schema, err := avro.Parse(genSchema())
	require.NoError(t, err)
	msg, err := avro.Marshal(schema, kvs)
	require.NoError(t, err)

	var wg sync.WaitGroup
	for i := 0; i < 20000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each goroutine parses its own randomized schema so the
			// library's schema cache is populated concurrently.
			schema, err := avro.Parse(genSchema())
			if err != nil {
				t.Error(err) // t.Error is goroutine-safe; require.NoError (FailNow) is not.
				return
			}
			var envelope DbzEnvelope[Payload]
			if err := avro.Unmarshal(schema, msg, &envelope); err != nil {
				t.Error(err)
			}
		}()
	}
	wg.Wait()
}
// toPtr returns a pointer to a freshly allocated copy of v.
func toPtr[T any](v T) *T {
	p := new(T)
	*p = v
	return p
}
// genSchema renders the schema template, substituting every %s
// placeholder with the same random integer in [0, 1000) so each call
// yields a schema with a (likely) distinct namespace.
func genSchema() string {
	id := strconv.Itoa(rand.Intn(1000))
	return strings.ReplaceAll(_schema, "%s", id)
}
// _schema is a Debezium-style Avro envelope schema template. genSchema
// substitutes every %s placeholder with a random integer, giving each
// parsed schema a distinct namespace — this is what drives concurrent
// population of the library's schema cache in Test_Concurrent.
const _schema = `
{
"type": "record",
"name": "Envelope",
"namespace": "changefeed.pg.testtenant_teststore_%s_kv.public.kvs",
"fields": [
{
"name": "before",
"type": [
"null",
{
"type": "record",
"name": "Value",
"fields": [
{
"name": "tenant",
"type": "string"
},
{
"name": "store",
"type": "string"
},
{
"name": "partition_key",
"type": "string"
},
{
"name": "sort_key",
"type": "string"
},
{
"name": "version",
"type": "string"
},
{
"name": "updated_at",
"type": {
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
},
{
"name": "tags",
"type": [
"null",
{
"type": "array",
"items": [
"null",
"string"
]
}
],
"default": null
},
{
"name": "data",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "encoding",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "schema",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "lookup_keys",
"type": [
"null",
{
"type": "array",
"items": [
"null",
"string"
]
}
],
"default": null
}
],
"connect.name": "changefeed.pg.testtenant_teststore_%s_kv.public.kvs.Value"
}
],
"default": null
},
{
"name": "after",
"type": [
"null",
"Value"
],
"default": null
},
{
"name": "source",
"type": {
"type": "record",
"name": "Source",
"namespace": "io.debezium.connector.postgresql",
"fields": [
{
"name": "version",
"type": "string"
},
{
"name": "connector",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "ts_ms",
"type": "long"
},
{
"name": "snapshot",
"type": [
{
"type": "string",
"connect.version": 1,
"connect.parameters": {
"allowed": "true,last,false,incremental"
},
"connect.default": "false",
"connect.name": "io.debezium.data.Enum"
},
"null"
],
"default": "false"
},
{
"name": "db",
"type": "string"
},
{
"name": "sequence",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ts_us",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "ts_ns",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "schema",
"type": "string"
},
{
"name": "table",
"type": "string"
},
{
"name": "txId",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "lsn",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "xmin",
"type": [
"null",
"long"
],
"default": null
}
],
"connect.name": "io.debezium.connector.postgresql.Source"
}
},
{
"name": "transaction",
"type": [
"null",
{
"type": "record",
"name": "block",
"namespace": "event",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "total_order",
"type": "long"
},
{
"name": "data_collection_order",
"type": "long"
}
],
"connect.version": 1,
"connect.name": "event.block"
}
],
"default": null
},
{
"name": "op",
"type": "string"
},
{
"name": "ts_ms",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "ts_us",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "ts_ns",
"type": [
"null",
"long"
],
"default": null
}
],
"connect.version": 2,
"connect.name": "changefeed.pg.testtenant_teststore_%s_kv.public.kvs.Envelope"
}`
// Payload mirrors the schema's "Value" record. The avro struct tags map
// fields to the schema's snake_case names; pointer fields correspond to
// the schema's nullable ["null", ...] union types.
type Payload struct {
Tenant string `avro:"tenant"`
Store string `avro:"store"`
PartitionKey string `avro:"partition_key"`
SortKey string `avro:"sort_key"`
Version string `avro:"version"`
UpdatedAt string `avro:"updated_at"`
Tags *[]*string `avro:"tags"`
Data *string `avro:"data"`
Encoding *string `avro:"encoding"`
Schema *int64 `avro:"schema"`
LookupKeys *[]*string `avro:"lookup_keys"`
}
// DbzOpType names a Debezium change operation (the test uses "read").
type DbzOpType string

// DbzSource mirrors the schema's io.debezium.connector.postgresql.Source
// record; pointer fields correspond to nullable union types.
type DbzSource struct {
Version string `avro:"version"`
Connector string `avro:"connector"`
Name string `avro:"name"`
TsMs int64 `avro:"ts_ms"`
TsUs *int64 `avro:"ts_us"`
TsNs *int64 `avro:"ts_ns"`
Snapshot *string `avro:"snapshot"`
Db string `avro:"db"`
Sequence *string `avro:"sequence"`
Schema string `avro:"schema"`
Table string `avro:"table"`
TxID *int64 `avro:"txId"`
Lsn *int64 `avro:"lsn"`
Xmin *int64 `avro:"xmin"`
}
// DbzBlock mirrors the schema's "event.block" transaction record.
type DbzBlock struct {
ID string `avro:"id"`
TotalOrder int64 `avro:"total_order"`
DataCollectionOrder int64 `avro:"data_collection_order"`
}

// DbzEnvelope mirrors the top-level "Envelope" record of the schema.
// T is the row payload type used for the before/after images.
type DbzEnvelope[T any] struct {
Before *T `avro:"before"`
After *T `avro:"after"`
Source DbzSource `avro:"source"`
Op DbzOpType `avro:"op"`
TsMs *int64 `avro:"ts_ms"`
TsUs *int64 `avro:"ts_us"`
TsNs *int64 `avro:"ts_ns"`
Transaction *DbzBlock `avro:"transaction"`
}
The root cause
I think it is caused by adding a RecordSchema to the schema cache before it is fully parsed.
Let me try to elaborate.
-
First a slice of fields containing all nils is allocated at
Line 247 in 45d131b
fields := make([]*Field, len(r.Fields)) -
A record is constructed at
Lines 254 to 262 in 45d131b
case Record: rec, err = NewRecordSchema(r.Name, r.Namespace, fields, WithAliases(r.Aliases), WithDoc(r.Doc), WithProps(r.Props), ) case Error: rec, err = NewErrorRecordSchema(r.Name, r.Namespace, fields, WithAliases(r.Aliases), WithDoc(r.Doc), WithProps(r.Props), ) } -
The record is added to the cache at
Line 272 in 45d131b
cache.Add(rec.FullName(), ref) -
but the fields are not populated before the code reaches
Lines 277 to 283 in 45d131b
for i, f := range r.Fields { field, err := parseField(rec.namespace, f, seen, cache) if err != nil { return nil, err } fields[i] = field }
This can cause others to use the cache with nil fields, causing the stack trace above.
I think the solution would be to parse the fields before adding the record to the cache.
Bonus stack trace against v2.8.1
The bug was found while on an older version (v2.8.1). The stack trace below is when tested against that version:
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x2 addr=0x8 pc=0x1043a81cc]
goroutine 11148 [running]:
github.com/hamba/avro/v2.(*Field).Name(...)
external/com_github_hamba_avro_v2/schema.go:600
github.com/hamba/avro/v2.decoderOfStruct(0x1400019a580, {0x1047a8348?, 0x14007c3f550}, {0x1047ad2a0, 0x140003bfef0})
external/com_github_hamba_avro_v2/codec_record.go:62 +0x11c
github.com/hamba/avro/v2.createDecoderOfRecord(0x1400019a580, {0x1047a8348, 0x14007c3f550}, {0x1047ad2a0, 0x140003bfef0})
external/com_github_hamba_avro_v2/codec_record.go:16 +0x118
github.com/hamba/avro/v2.decoderOfType(0x1400019a580, {0x1047a8348, 0x14007c3f550}, {0x1047ad2a0, 0x140003bfef0})
external/com_github_hamba_avro_v2/codec.go:96 +0x2e8
github.com/hamba/avro/v2.decoderOfType(0x1400019a580, {0x1047a8310, 0x14034796f60}, {0x1047ad2a0, 0x140003bfef0})
external/com_github_hamba_avro_v2/codec.go:99 +0x4dc
github.com/hamba/avro/v2.decoderOfPtrUnion(0x1400019a580, {0x1047a8230?, 0x14015a44960}, {0x1047ad160, 0x140003bfe90})
external/com_github_hamba_avro_v2/codec_union.go:161 +0xbc
github.com/hamba/avro/v2.createDecoderOfUnion(0x1400019a580, {0x1047a8230, 0x14015a44960}, {0x1047ad160, 0x140003bfe90})
external/com_github_hamba_avro_v2/codec_union.go:26 +0xb8
github.com/hamba/avro/v2.decoderOfType(0x1400019a580, {0x1047a8230, 0x14015a44960}, {0x1047ad160, 0x140003bfe90})
external/com_github_hamba_avro_v2/codec.go:111 +0x278
github.com/hamba/avro/v2.decoderOfStruct(0x1400019a580, {0x1047a8348?, 0x14002143550}, {0x1047ad2a0, 0x140003bfc80})
external/com_github_hamba_avro_v2/codec_record.go:80 +0x1cc
github.com/hamba/avro/v2.createDecoderOfRecord(0x1400019a580, {0x1047a8348, 0x14002143550}, {0x1047ad2a0, 0x140003bfc80})
external/com_github_hamba_avro_v2/codec_record.go:16 +0x118
github.com/hamba/avro/v2.decoderOfType(0x1400019a580, {0x1047a8348, 0x14002143550}, {0x1047ad2a0, 0x140003bfc80})
external/com_github_hamba_avro_v2/codec.go:96 +0x2e8
github.com/hamba/avro/v2.(*frozenConfig).DecoderOf(0x1400019a580, {0x1047a8348, 0x14002143550}, {0x1047ad160, 0x1400092fc20})
external/com_github_hamba_avro_v2/codec.go:74 +0xb8
github.com/hamba/avro/v2.(*Reader).ReadVal(0x1400169a8c0, {0x1047a8348, 0x14002143550}, {0x1046d9380, 0x1400236b7a0})
external/com_github_hamba_avro_v2/codec.go:44 +0xd4
github.com/hamba/avro/v2.(*frozenConfig).Unmarshal(0x1400019a580, {0x1047a8348, 0x14002143550}, {0x140003fe1c0?, 0x0?, 0x0?}, {0x1046d9380, 0x1400236b7a0})
external/com_github_hamba_avro_v2/config.go:143 +0x58
github.com/hamba/avro/v2.Unmarshal(...)
external/com_github_hamba_avro_v2/decoder.go:49
<redacted>.Test_Concurrent.func1()
<redacted>/avrohandler_test.go:81 +0xf8
created by <redacted>.Test_Concurrent in goroutine 9
<redacted>/avrohandler_test.go:74 +0x6f8