
util/log: add buffer sink decorator #70330

Merged
craig[bot] merged 13 commits into cockroachdb:master from rauchenstein:buffer_sink
Nov 4, 2021

Conversation

@rauchenstein
Contributor

Previously, only the file sink had buffering, and in that case it was
built into the sink. It's important to add buffering to network sinks
for various reasons -- reducing network chatter, and making the
networking call itself asynchronous so the log call returns with very
low latency.

This change adds a buffering decorator so that buffering can be added to
any log sink with little or no development effort, and so that buffering
can be configured in a uniform way.

Release note (cli change): Add buffering to log sinks. This can be
configured with the new "buffering" field on any log sink provided via
the "--log" or "--log-config-file" flags.

Release justification: This change is safe because it is a no-op without
a configuration change specifically enabling it.

@cockroach-teamcity
Member

This change is Reviewable

Contributor

@knz knz left a comment


General recommendation: be careful with channel writes, which could block. Also select on ctx.Done during channel writes.

Reviewed 11 of 11 files at r1, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @rauchenstein)


pkg/util/log/buffer_sink.go, line 20 at r1 (raw file):

	"github.com/cockroachdb/cockroach/pkg/cli/exit"
)

It'd be useful to have a top level comment here which outlines the overall architecture.


pkg/util/log/buffer_sink.go, line 21 at r1 (raw file):

)

type bufferSinkMessage struct {

would appreciate some explanatory comments for each struct and the fields therein. Will make the review easier.


pkg/util/log/buffer_sink.go, line 63 at r1 (raw file):

	}

	messageCh := make(chan bufferSinkMessage)      // sends messages to the accumulator

place these things in a struct (perhaps the bufferSink)


pkg/util/log/buffer_sink.go, line 74 at r1 (raw file):

	// Accumulator
	go func() {

please extract these two anonymous functions into separate top-level named functions (or perhaps method on bufferSink).


pkg/util/log/buffer_sink.go, line 112 at r1 (raw file):

					flushCh <- b
					atomic.AddInt32(&nQueuedFlushes, 1)
					reset()

something needs to be done about resetting the timer here.


pkg/util/log/buffer_sink.go, line 131 at r1 (raw file):

			case b := <-flushCh:
				// Calculate size of all-but-first buffer.
				tailSize := 0

make the size computation a method on b.


pkg/util/log/buffer_sink.go, line 197 at r1 (raw file):

func (s *bufferSink) output(b []byte, opts sinkOutputOptions) error {
	// Make a copy to live in the async buffer.
	// We can't take ownership of the slice we're passed --

suggestion: we can swap the byte slice inside the buffer with an empty one, to take the ownership.

Contributor Author

@rauchenstein rauchenstein left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @knz)


pkg/util/log/buffer_sink.go, line 20 at r1 (raw file):

Previously, knz (kena) wrote…

It'd be useful to have a top level comment here which outlines the overall architecture.

Done.


pkg/util/log/buffer_sink.go, line 21 at r1 (raw file):

Previously, knz (kena) wrote…

would appreciate some explanatory comments for each struct and the fields therein. Will make the review easier.

Done.


pkg/util/log/buffer_sink.go, line 63 at r1 (raw file):

Previously, knz (kena) wrote…

place these things in a struct (perhaps the bufferSink)

Done.


pkg/util/log/buffer_sink.go, line 74 at r1 (raw file):

Previously, knz (kena) wrote…

please extract these two anonymous functions into separate top-level named functions (or perhaps method on bufferSink).

Done (methods).


pkg/util/log/buffer_sink.go, line 112 at r1 (raw file):

Previously, knz (kena) wrote…

something needs to be done about resetting the timer here.

reset() resets the timer.


pkg/util/log/buffer_sink.go, line 131 at r1 (raw file):

Previously, knz (kena) wrote…

make the size computation a method on b.

Similar is done. I moved the accumulator's byte count to be maintained within the bundle, so it's available as a data field.


pkg/util/log/buffer_sink.go, line 197 at r1 (raw file):

Previously, knz (kena) wrote…

suggestion: we can swap the byte slice inside the buffer with an empty one, to take the ownership.

I don't think that's possible, since log.buffer is backed by bytes.Buffer (not a pointer), and I don't see a way for that to relinquish ownership of its backing data (perhaps so it can do some small-value optimization).
Even if it was possible, we'd be draining the buffer pool of buffers with useful capacity (so those buffers have to allocate new backing data on first use), and leaving the backing arrays for the slices we take ownership of to be GC'ed, which is part of what the whole buffer-pooling scheme was trying to avoid.

I think we'd get similar downsides in a more understandable form by just allocating new byte slices here and not using log.buffers.

@rauchenstein rauchenstein changed the title [wip] util/log: add buffer sink decorator util/log: add buffer sink decorator Oct 11, 2021
@rauchenstein rauchenstein requested a review from knz October 11, 2021 16:37
Contributor

@knz knz left a comment


Two suggestions:

  • run this under stressrace
  • run this with the new deadlock detector that Erik enabled (ask test eng for the test flags)

Reviewed 6 of 7 files at r2, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @knz, @rauchenstein, and @rimadeodhar)


pkg/util/log/buffer_sink.go, line 20 at r1 (raw file):

Previously, rauchenstein wrote…

Done.

Maybe some more prose would be useful:

  • what is the initial condition
  • what happens (outline) when the first log event comes in, and the buffer is not yet full
  • what happens (outline) when the buffer is full and a log event comes in
  • what happens in the error case

pkg/util/log/buffer_sink.go, line 26 at r3 (raw file):

// by the forceSync option).
type bufferSink struct {
	// child is the wrapped logSink

nit: here and below: periods at the end of sentences


pkg/util/log/buffer_sink.go, line 83 at r3 (raw file):

	var (
		b     bufferSinkBundle
		timer <-chan time.Time // staleness timer

Add some explanation for this that explains when the timer is created, when it is reset etc.


pkg/util/log/buffer_sink.go, line 168 at r3 (raw file):

}

// bufferSinkMessage holds an individual log message sent from output to accumulate

nit: here and below, periods at the end of comments (comments are sentences).


pkg/util/log/buffer_sink.go, line 173 at r3 (raw file):

	// flush is set if the call explicitly requests to trigger an flush
	flush bool
	// errorCh is set iff the message is forceSync

separate sentences with periods and capitals.


pkg/util/log/buffer_sink_test.go, line 39 at r3 (raw file):

}

func TestBufferOneLine(t *testing.T) {

for all tests here, use defer leaktest.AfterTest(t) to verify there are no stray goroutines after the test ends.


pkg/util/log/buffer_sink_test.go, line 47 at r3 (raw file):

	require.NoError(t, sink.output(message, sinkOutputOptions{extraFlush: true}))
	<-time.After(time.Second)

this is not needed if cleanup works.

ditto below.


pkg/util/log/flags.go, line 403 at r3 (raw file):

func attachBufferWrapper(ctx context.Context, s *sinkInfo, c logconfig.CommonSinkConfig) {
	b := c.Buffering

From a UX perspective, it's not so easy / clear how to disable buffering entirely with the current mechanism. Should I write:

http-servers:
   foo:
     buffering: nil

?

or something else entirely?

This looks cumbersome. Can I instead write:

http-servers:
   foo:
     buffered: false

(NB: network sinks should be buffered by default unless buffering is disabled by auditable: true.)

(we also probably don't want buffering: {enabled: false} which is a tad too verbose)


pkg/util/log/logconfig/config.go, line 187 at r3 (raw file):

	// Buffering configures buffering for this log sink.
	Buffering *BufferSinkConfig `yaml:",omitempty"`

ok but please add tests that check that defaults are propagated

(will need to update the recursive default propagation logic to handle pointers to structs. Beware of the nil case.)

Contributor

@knz knz left a comment


Also extend the commit message / PR description to explain that this change is not replacing the file buffering just yet; we want to exercise this code with the new sink types before we use it for files as well. However, mention that we do aim to reuse the same buffering code for files too, so the next cleanup/improvement should be planned ASAP.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @knz, @rauchenstein, and @rimadeodhar)

Collaborator

@rimadeodhar rimadeodhar left a comment


Reviewed all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @knz, @rauchenstein, and @rimadeodhar)


pkg/util/log/buffer_sink.go, line 134 at r3 (raw file):

		case b := <-bs.flushCh:
			// Append all the messages in the first buffer.
			buf := b.messages[0].b

Would it be better to have a safety check here to confirm that len(b.messages) > 0?


pkg/util/log/buffer_sink.go, line 152 at r3 (raw file):

			if forceSync {
				b.errorCh <- err
			} else if err != nil && bs.errCallback != nil {

So the err will not be forwarded to the errCallback when forceSync is true, i.e. when b.errorCh is non-nil? Why do we need these two mechanisms for forwarding the error?


pkg/util/log/buffer_sink.go, line 156 at r3 (raw file):

				// Temporarily disable this sink so it's skipped by
				// any logging in the callback.
				bs.inErrorState = true

I suspect we'll need a mutex around inErrorState?


pkg/util/log/exit_override.go, line 101 at r3 (raw file):

		if logpb.Severity_ERROR >= s.threshold.get(entry.ch) && sink.active() {
			buf := s.formatter.formatEntry(entry)
			_ = sink.output(buf.Bytes(), sinkOutputOptions{ignoreErrors: true})

Nit, if we don't need the return value, we can just skip the assignment i.e. remove the _ = ?

sink.output(buf.Bytes(), sinkOutputOptions{ignoreErrors: true})

pkg/util/log/sinks.go, line 24 at r3 (raw file):

	ignoreErrors bool
	// forceSync forces synchronous operation of this output operation.
	// This is, it will block until the output has been handled.

Nit: Typo - should be "That is", I'm guessing.

Contributor Author

@rauchenstein rauchenstein left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @knz and @rimadeodhar)


pkg/util/log/buffer_sink.go, line 20 at r1 (raw file):

Previously, knz (kena) wrote…

Maybe some more prose would be useful:

  • what is the initial condition
  • what happens (outline) when the first log event comes in, and the buffer is not yet full
  • what happens (outline) when the buffer is full and a log event comes in
  • what happens in the error case

I elaborated.


pkg/util/log/buffer_sink.go, line 83 at r3 (raw file):

Previously, knz (kena) wrote…

Add some explanation for this that explains when the timer is created, when it is reset etc.

Done.


pkg/util/log/buffer_sink.go, line 134 at r3 (raw file):

Previously, rimadeodhar (Rima Deodhar) wrote…

Would it be better to have a safety check here to confirm that len(b.messages) > 0?

Indeed it would be, and in fact that's a case that could happen in the latest refactor.


pkg/util/log/buffer_sink.go, line 152 at r3 (raw file):

Previously, rimadeodhar (Rima Deodhar) wrote…

So the err will not be forwarded to the errCallback when forceSync is true i.e when b.errorCh is non nil? Why do we need these two mechanisms for forwarding the error?

As relevant an error as possible should be returned by output. In the async case, an error in the child sink cannot be returned because output simply queued up the message to be flushed by the child sink later, so we use a callback mechanism. In the synchronous case, we have to wait for the child sink anyways, so we should return its error.


pkg/util/log/buffer_sink.go, line 156 at r3 (raw file):

Previously, rimadeodhar (Rima Deodhar) wrote…

I suspect we'll need a mutex around inErrorState?

No, this is the only thread that modifies that value.


pkg/util/log/buffer_sink.go, line 173 at r3 (raw file):

Previously, knz (kena) wrote…

separate sentences with periods and capitals.

Done.


pkg/util/log/buffer_sink_test.go, line 39 at r3 (raw file):

Previously, knz (kena) wrote…

for all tests here, use defer leaktest.AfterTest(t) to verify there are no stray goroutines after the test ends.

Done.


pkg/util/log/buffer_sink_test.go, line 47 at r3 (raw file):

Previously, knz (kena) wrote…

this is not needed if cleanup works.

ditto below.

It actually was needed: after getting the signal from ctx.Done() and triggering a final flush then shutdown from bufferSink, the calls from that final flush may not have occurred before the call to ctrl.Finish() in the main thread (which is where gomock checks the received calls against its expectations). (This was the case even when the calls in cleanup() were swapped to be ordered properly.)

I found a better way to do this using waitgroups (or just a channel for synchronization) to make the synchronization explicit instead of just "waiting long enough."


pkg/util/log/exit_override.go, line 101 at r3 (raw file):

Previously, rimadeodhar (Rima Deodhar) wrote…

Nit, if we don't need the return value, we can just skip the assignment i.e. remove the _ = ?

sink.output(buf.Bytes(), sinkOutputOptions{ignoreErrors: true})

There are linter errors for ignoring the returned error. This lets the linter know that we're discarding the error on purpose.


pkg/util/log/flags.go, line 403 at r3 (raw file):

Previously, knz (kena) wrote…

From a UX perspective, it's not so easy / clear how to disable buffering entirely with the current mechanism. Should I write:

http-servers:
   foo:
     buffering: nil

?

or something else entirely?

This looks cumbersome. Can I instead write:

http-servers:
   foo:
     buffered: false

(NB: network sinks should be buffered by default unless buffering is disabled by auditable: true.)

(we also probably don't want buffering: {enabled: false} which is a tad too verbose)

Discussed with Ryan Kuo. Created a new type for "buffering" that supports the existing structure or "NONE" (case-insensitive). (Actually took a second pass at it too after I realized it could be simplified.)
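The resulting configuration shape would look roughly like this (the "NONE" sentinel is from the comment above; the nested field names inside `buffering` are illustrative, not confirmed against the final config schema):

```yaml
fluent-servers:
  logs:
    buffering:
      max-staleness: 5s        # illustrative field names
      flush-trigger-size: 1MiB
http-servers:
  foo:
    buffering: NONE            # case-insensitive sentinel; disables buffering
```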


pkg/util/log/sinks.go, line 24 at r3 (raw file):

Previously, rimadeodhar (Rima Deodhar) wrote…

Nit: Typo - Should beThat is, I'm guessing.

Good eye! Fixed.


pkg/util/log/logconfig/config.go, line 187 at r3 (raw file):

Previously, knz (kena) wrote…

ok but please add tests that check that defaults are propagated

(will need to update the recursive default propagation logic to handle pointers to structs. Beware of the nil case.)

Didn't end up needing to update propagation; it ended up being cleaner to specify the struct by value. Still need to add the tests -- had to finalize the disabling logic first, since that interacts interestingly with propagation.

@rauchenstein rauchenstein force-pushed the buffer_sink branch 2 times, most recently from 9815a62 to a9e65c4 Compare October 19, 2021 18:19
@rauchenstein rauchenstein requested a review from a team as a code owner October 19, 2021 18:19
@rauchenstein rauchenstein force-pushed the buffer_sink branch 2 times, most recently from 3191fca to e7ff8cb Compare October 21, 2021 23:45
@rauchenstein rauchenstein requested a review from a team as a code owner October 26, 2021 21:59
Previously, only the file sink had buffering, and in that case it was
built into the sink. It's important to add buffering to network sinks
for various reasons -- reducing network chatter, and making the
networking call itself asynchronous so the log call returns with very
low latency.

This change adds a buffering decorator so that buffering can be added to
any log sink with little or no development effort, and so that buffering
can be configured in a uniform way.

This functionality is not yet enabled by default and does not yet
replace the existing functionality for file sinks, though both these
things are intended in the future.

Release note (cli change): Add buffering to log sinks. This can be
configured with the new "buffering" field on any log sink provided via
the "--log" or "--log-config-file" flags.

Release justification: This change is safe because it is a no-op without
a configuration change specifically enabling it.
@knz
Contributor

knz commented Nov 3, 2021

Some findings while trying this on my local computer:

  1. For further reference: with fluent output, we get the following bench difference:
name              old time/op    new time/op    delta
LogFileOutput-32    13.3µs ± 6%     9.6µs ± 5%   -27.67%  (p=0.008 n=5+5)

name              old alloc/op   new alloc/op   delta
LogFileOutput-32      233B ± 0%      660B ± 0%  +183.43%  (p=0.008 n=5+5)

name              old allocs/op  new allocs/op  delta
LogFileOutput-32      8.00 ± 0%      9.00 ± 0%   +12.50%  (p=0.008 n=5+5)

(no further action required on this)

  2. The configuration fails with a file error when trying to add buffering to file sinks. I will try to add a guardrail about this.

@knz
Contributor

knz commented Nov 3, 2021

Ok so I have dug down into this a bit more. I have added several comments inline (as an additional commit on the branch) which I will want to investigate further a bit later.

There are three high-level questions that popped up during this review. No action needs to be taken on them at this time; I want to mull over them a bit before we make a step forward.

  • is there a way to reduce the current bundle in the flusher goroutine when a write to the target sink fails partially, instead of retrying the entire bundle?

    I am especially wary of situations where either the network config or log collector refuse to process "large writes" in one go, and always chunk them up in smaller units, say 512 bytes at a time. In that case, we're bound to always observe a partial write and the flush would never make progress.

  • how is this code meant to interact with the log flush goroutine that already exists in the logging package?

  • what would be a good way to reconcile the previous log config option buffered-writes and the new buffering option?

rauchenstein and others added 2 commits November 4, 2021 21:29
There remain issues to be investigated surrounding the interaction of the new
buffered logging decorator and the file sink concerning synchronization at
shutdown.  Forbidding it from being enabled in the logconfig for now.

Release note: None
Release note: None
@knz
Contributor

knz commented Nov 4, 2021

Filed the following follow-up issues, and added links where appropriate:

Now that these follow-up issues are filed and linked, we can merge the PR (assuming CI is satisfied)

@rauchenstein
Contributor Author

bors r=knz

@craig
Contributor

craig bot commented Nov 4, 2021

Build succeeded:

@craig craig bot merged commit ed35bbe into cockroachdb:master Nov 4, 2021
@knz
Contributor

knz commented Nov 8, 2021

@rauchenstein next time please don't forget to link the related github issue (with "Fixes ...")
