Vdb 1154 transform on demand#32
1f9a9d6 to
d895b3f
Compare
…cted error so that we can shortcut the withRetryLoop once the header is rechecked once for transformOnDemand command
d895b3f to
c11b94a
Compare
libraries/shared/logs/extractor.go
Outdated
| func earlierStartingBlockNumber(transformerBlock, watcherBlock int64) bool { | ||
| return transformerBlock < watcherBlock | ||
| func (extractor *LogExtractor) OverrideStartingAndEndingBlocks(startingBlock, endingBlock *int64) { |
There was a problem hiding this comment.
Instead of having these Override... methods, was thinking of passing these arguments into the Initializer. But we're not currently using the initializer in the tests, and if we were to rework the tests to do that, we'd potentially need to pass in ~4 more things as arguments that we're currently mocking in the tests. Maybe an ExtractorConfig struct to pass in would solve this?
Thought I'd get the PR up as is and get some feedback/think on it a bit.
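A minimal sketch of the ExtractorConfig idea floated above, assuming a constructor that takes the struct and fills in defaults. The field names, the NewConfiguredExtractor name, and the stand-in LogExtractor type are all hypothetical and not taken from this PR; the default cap of 5 is the value mentioned elsewhere in this thread.

```go
package main

import "fmt"

// defaultRecheckHeaderCap mirrors the cap of 5 mentioned in this thread.
const defaultRecheckHeaderCap int64 = 5

// ExtractorConfig is a hypothetical options struct. Nil block pointers mean
// "derive the range from the transformers"; a zero cap means "use the default".
type ExtractorConfig struct {
	StartingBlock    *int64
	EndingBlock      *int64
	RecheckHeaderCap int64
}

// LogExtractor is a stand-in reduced to the fields this sketch needs.
type LogExtractor struct {
	StartingBlock    *int64
	EndingBlock      *int64
	RecheckHeaderCap int64
}

// NewConfiguredExtractor (hypothetical name) builds an extractor from the
// config, so callers and tests no longer need separate Override... methods.
func NewConfiguredExtractor(cfg ExtractorConfig) *LogExtractor {
	recheckCap := cfg.RecheckHeaderCap
	if recheckCap == 0 {
		recheckCap = defaultRecheckHeaderCap
	}
	return &LogExtractor{
		StartingBlock:    cfg.StartingBlock,
		EndingBlock:      cfg.EndingBlock,
		RecheckHeaderCap: recheckCap,
	}
}

func main() {
	block := int64(9254881)
	e := NewConfiguredExtractor(ExtractorConfig{StartingBlock: &block, EndingBlock: &block})
	fmt.Println(*e.StartingBlock, *e.EndingBlock, e.RecheckHeaderCap)
}
```

Tests could then pass only the fields they care about and leave the rest at their zero values, which might avoid the "~4 more arguments" problem.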
c11b94a to
07473a1
Compare
07473a1 to
79e4c83
Compare
cmd/transformEventsOnDemand.go
Outdated
| // Overriding the normal behavior to just transform one block | ||
| extractor.OverrideStartingAndEndingBlocks(&blockNumber, &blockNumber) | ||
| extractor.OverrideRecheckHeaderCap(currentCheckCount) | ||
| ew.UnsetExpectedExtractorError() |
There was a problem hiding this comment.
These 3 lines are the crux of the changes that make this command work for just one block.
- extractor.OverrideStartingAndEndingBlocks allows us to set the starting and ending blocks for the extractor, regardless of the ones that are configured for the transformers, so that we can narrow down the scope of what this command does: it just transforms one block's worth of logs.
- extractor.OverrideRecheckHeaderCap allows us to check a header more times than the constant recheckHeaderCap of 5 permits. It can be set to anything, but currently I'm fetching the check_count on the current header, and then checking it one more time.
- ew.UnsetExpectedExtractorError() removes the expected NoUncheckedHeaders error so that once the header is checked that one more time (or however we choose to configure the recheckHeaderCap) the watcher will return an error and the command exits. This was done to make sure that the command doesn't loop forever without doing anything productive.
There was a problem hiding this comment.
I dig this approach, but am also wondering whether we might want to do some deeper refactoring to facilitate graceful rechecking + exiting. Seems like UnsetExpectedExtractorError could create some unnecessary noise if it means that there are several retries and logged errors. Personally I'd be fine with introducing some duplication if writing new but very similar functions enables us to recheck and then exit with a success status, but also don't mind if we want to postpone while getting a first version live.
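One way the "recheck and then exit with a success status" idea could look, as a rough sketch only. It assumes a sentinel error for "no unchecked headers" and a dedicated on-demand loop that treats that sentinel as a clean finish rather than something to retry or log. ErrNoUncheckedHeaders, extractFunc, and runOnDemand are hypothetical names, not the watcher's actual API.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNoUncheckedHeaders is a stand-in sentinel for "nothing left to extract".
var ErrNoUncheckedHeaders = errors.New("no unchecked headers available")

// extractFunc is a hypothetical single extraction pass.
type extractFunc func() error

// runOnDemand repeats extraction until the sentinel appears, then reports
// success. Any other error is returned immediately, and maxPasses guards
// against looping forever. It returns the number of passes made.
func runOnDemand(extract extractFunc, maxPasses int) (int, error) {
	for pass := 1; pass <= maxPasses; pass++ {
		err := extract()
		switch {
		case err == nil:
			continue // work was done; there may be more
		case errors.Is(err, ErrNoUncheckedHeaders):
			return pass, nil // nothing left to do: a successful exit
		default:
			return pass, fmt.Errorf("extracting logs: %w", err)
		}
	}
	return maxPasses, fmt.Errorf("gave up after %d passes", maxPasses)
}

func main() {
	remaining := 1 // one header left to recheck
	extract := func() error {
		if remaining == 0 {
			return ErrNoUncheckedHeaders
		}
		remaining--
		return nil
	}
	passes, err := runOnDemand(extract, 10)
	fmt.Println(passes, err)
}
```

This duplicates a bit of the retry logic, but it keeps the long-running watcher's behavior untouched and avoids logging an error for what is really the expected end state.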
cmd/transformEventsOnDemand.go
Outdated
| @@ -0,0 +1,134 @@ | |||
| // Copyright © 2020 NAME HERE <EMAIL ADDRESS> | |||
| // | |||
| // Licensed under the Apache License, Version 2.0 (the "License"); | |||
There was a problem hiding this comment.
Probably don't want to include an Apache license
There was a problem hiding this comment.
good catch, not sure why my editor added the apache license. 🤔 I'll switch it to GNU GPL.
cmd/transformEventsOnDemand.go
Outdated
| Long: ``, | ||
| Run: func(cmd *cobra.Command, args []string) { | ||
| SubCommand = cmd.CalledAs() | ||
| LogWithCommand = *logrus.WithField("SubCommand", SubCommand) |
There was a problem hiding this comment.
Interesting - do we need to initialize this here for the logging to work? I may need to update the extractDiffs command... 😅
There was a problem hiding this comment.
I think we do - I was getting an error (below) when trying to run the command without initializing the LogWithCommand.
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x2c pc=0x442dd8c]
goroutine 1 [running]:
github.com/sirupsen/logrus.(*Logger).level(...)
cmd/transformEventsOnDemand.go
Outdated
| currentCheckCount, err := getCurrentCheckCount(blockNumber, &db) | ||
| if err != nil { | ||
| panic("Failed to get current check_count from the headers table") |
There was a problem hiding this comment.
Probably don't want to panic here.
As an aside, I'm wondering if we might want to have executeOnDemand (and similar functions in the cmd directory) return an error so that we can use LogWithCommand.Fatal only at the highest level. Based on this, it seems like that might best facilitate clean shutdown by allowing in progress tasks to wrap up (and defers to be called, etc)
There was a problem hiding this comment.
Yep, great call. That article is really helpful. I'll probably add a story to the top of the backlog for this sort of cleanup in other commands. But in the meantime, I've switched to using Cobra.RunE instead of Cobra.Run for resetHeaderCheckCount, which eventually does return Fatal.log:
b6362a1
There was a problem hiding this comment.
separate story to handle other commands: https://makerdao.atlassian.net/browse/VDB-1185
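To illustrate the "return an error, exit only at the top level" pattern discussed in this thread, here is a small sketch without Cobra. The execute function and its event log are hypothetical; the point is only that a returned error lets deferred cleanup run, whereas a panic or a fatal log call deep in the stack may not.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// execute is a hypothetical command body. It records what it does in events
// so the order of work and cleanup is visible to the caller.
func execute(blockNumber int64, events *[]string) error {
	*events = append(*events, "open db")
	// The deferred cleanup runs on both the success and the error path.
	defer func() { *events = append(*events, "close db") }()

	if blockNumber < 0 {
		return errors.New("block number must not be negative")
	}
	*events = append(*events, "reset check_count")
	return nil
}

func main() {
	var events []string
	// Only the top level decides to terminate the process.
	if err := execute(9254881, &events); err != nil {
		fmt.Fprintln(os.Stderr, "fatal:", err)
		os.Exit(1)
	}
	fmt.Println(events)
}
```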
cmd/transformEventsOnDemand.go
Outdated
| } | ||
| wg := sync.WaitGroup{} | ||
| wg.Add(1) | ||
| go transformEthEvents(&ew, &wg) |
There was a problem hiding this comment.
Wondering if we might be able to execute this synchronously and remove the wait group stuff
cmd/transformEventsOnDemand.go
Outdated
| func getCurrentCheckCount(blockNumber int64, db *postgres.DB) (int64, error) { | ||
| var count int64 | ||
| err := db.Get(&count, `SELECT check_count from headers where block_number = $1`, blockNumber) |
There was a problem hiding this comment.
would be cool to have this live in a repository to avoid including sql in the cmd directory, but I'm not sure if that concern merits initializing a repo 🤔
cmd/transformEventsOnDemand.go
Outdated
| // Setup bc and db objects | ||
| blockChain := getBlockChain() | ||
| db := utils.LoadPostgres(databaseConfig, blockChain.Node()) |
There was a problem hiding this comment.
Noticing that a lot of this code is similar to executeTransformers and wondering if we want to either extract shared code to a helper function or perhaps even configure this behavior as a CLI option for that command (rather than introducing a new command)
I know we've got medium/long term plans to enable extracting logs to run in a separate command from transforming logs - maybe it'd be worthwhile to add a note to that story/create a new linked story to address some of this stuff then?
libraries/shared/logs/extractor.go
Outdated
| return true | ||
| } | ||
| return *extractorBlock != int64(-1) && currentTransformerBlock > *extractorBlock |
There was a problem hiding this comment.
Looking through this code to reset numbers got me thinking... if we're introducing a new command to recheck a given header on demand, maybe that command could just zero out the check_count for the header in question? I think that would enable the standard execute command to automatically recheck that block, without needing to actually load/run the transformers here
There was a problem hiding this comment.
Oh interesting, so you're thinking that if we have a command to reset check_count to zero, then execute will automatically eventually pick up that header to fetch logs for, transform, etc? Great idea! I'll dig into that a bit further.
There was a problem hiding this comment.
Just pushed up some changes to remove the transformEventsOnDemand command in favor of a resetHeaderCheckCount command. Left some of the refactoring that was done previously, like creating NewLogDelegator and NewLogExtractor methods and passing those both into NewEventWatcher, just in case we want to make some config changes to those in the future. But would be happy to remove those changes as well.
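A toy model of why zeroing check_count should be enough, under the assumption (not confirmed in this thread) that execute picks up headers whose check_count is below the recheck cap. The header type and both functions are hypothetical and work on an in-memory slice rather than the headers table.

```go
package main

import "fmt"

// header is a stand-in for a row in the headers table.
type header struct {
	BlockNumber int64
	CheckCount  int64
}

// uncheckedHeaders returns the blocks execute would look at, assuming the
// selection rule is check_count < recheckCap.
func uncheckedHeaders(headers []header, recheckCap int64) []int64 {
	var blocks []int64
	for _, h := range headers {
		if h.CheckCount < recheckCap {
			blocks = append(blocks, h.BlockNumber)
		}
	}
	return blocks
}

// resetCheckCount zeroes check_count for one block and reports whether the
// block was found.
func resetCheckCount(headers []header, blockNumber int64) bool {
	for i := range headers {
		if headers[i].BlockNumber == blockNumber {
			headers[i].CheckCount = 0
			return true
		}
	}
	return false
}

func main() {
	headers := []header{{BlockNumber: 100, CheckCount: 5}, {BlockNumber: 101, CheckCount: 5}}
	fmt.Println(len(uncheckedHeaders(headers, 5))) // both headers are at the cap
	resetCheckCount(headers, 101)
	fmt.Println(uncheckedHeaders(headers, 5)) // only block 101 is eligible again
}
```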
cmd/resetHeaderCheckCount.go
Outdated
| SubCommand = cmd.CalledAs() | ||
| LogWithCommand = *logrus.WithField("SubCommand", SubCommand) | ||
| LogWithCommand.Infof("Updating check_count for header %v set to 0.", blockNumber) | ||
| return resetHeaderCount(int64(blockNumber)) |
There was a problem hiding this comment.
One of the downsides to using RunE over Run is that I haven't figured out how to include a SubCommand. I am thinking of just wrapping this error with SubCommand information for now.
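Wrapping the returned error with the SubCommand could look roughly like this. withSubCommand and errHeaderNotFound are hypothetical names; only fmt.Errorf with %w and errors.Is are standard library behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// errHeaderNotFound is a stand-in for whatever the reset might return.
var errHeaderNotFound = errors.New("header not found")

// withSubCommand prefixes an error with the subcommand name while keeping
// the original error reachable via errors.Is / errors.As.
func withSubCommand(subCommand string, err error) error {
	if err == nil {
		return nil
	}
	return fmt.Errorf("SubCommand=%s: %w", subCommand, err)
}

func main() {
	err := withSubCommand("resetHeaderCheckCount", errHeaderNotFound)
	fmt.Println(err)
	fmt.Println(errors.Is(err, errHeaderNotFound))
}
```

Because the error is wrapped rather than replaced, callers further up can still match on the underlying error.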
rmulhol
left a comment
There was a problem hiding this comment.
LGTM!
Can probably do this in a separate story, but I wonder if we want to create a Dockerfile for this command
cmd/resetHeaderCheckCount.go
Outdated
| var resetHeaderCheckCountCmd = &cobra.Command{ | ||
| Use: "resetHeaderCheckCount", | ||
| Short: "Resets header check_count for the given block number", | ||
| Long: `Resets check_count to zero for the given header so that the execute command may rechecked that header's logs |
libraries/shared/logs/extractor.go
Outdated
| return true | ||
| } | ||
| return *extractorBlock != int64(-1) && currentTransformerBlock > *extractorBlock |
There was a problem hiding this comment.
this is just a minor aesthetic thing (and maybe I've been looking at too much dss code 😂), but I'd be tempted to give these predicates names and combine them - something like:
isCurrentEndingBlockNil := extractorBlock == nil
isTransformerEndingBlockNil := currentTransformerBlock == int64(-1)
isCurrentEndingBlockAssigned := *extractorBlock != int64(-1)
isTransformerEndingBlockGreater := currentTransformerBlock > *extractorBlock
return isCurrentEndingBlockNil || isTransformerEndingBlockNil || (isCurrentEndingBlockAssigned && isTransformerEndingBlockGreater)
There was a problem hiding this comment.
Sounds good! The one thing is that we have to return from the method before setting isCurrentEndingBlockAssigned and isTransformerEndingBlockGreater if either of the blocks it's checking is nil. Otherwise we end up with a nil pointer exception.
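A sketch of the named predicates with the early return described above. The function name shouldUpdateEndingBlock is hypothetical; the predicate names follow the suggestion in this thread. The early return matters because each := is evaluated eagerly, so dereferencing extractorBlock before the nil check would panic even though || short-circuits.

```go
package main

import "fmt"

// shouldUpdateEndingBlock reports whether the extractor's ending block should
// be replaced by the transformer's. -1 is treated as "no ending block".
func shouldUpdateEndingBlock(currentTransformerBlock int64, extractorBlock *int64) bool {
	isCurrentEndingBlockNil := extractorBlock == nil
	isTransformerEndingBlockNil := currentTransformerBlock == int64(-1)
	// Return before any dereference so a nil extractorBlock cannot panic.
	if isCurrentEndingBlockNil || isTransformerEndingBlockNil {
		return true
	}

	isCurrentEndingBlockAssigned := *extractorBlock != int64(-1)
	isTransformerEndingBlockGreater := currentTransformerBlock > *extractorBlock
	return isCurrentEndingBlockAssigned && isTransformerEndingBlockGreater
}

func main() {
	five := int64(5)
	fmt.Println(shouldUpdateEndingBlock(10, nil))
	fmt.Println(shouldUpdateEndingBlock(10, &five))
	fmt.Println(shouldUpdateEndingBlock(3, &five))
}
```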
libraries/shared/logs/extractor.go
Outdated
| return nil | ||
| } | ||
| func resetStartingBlockNumber(currentTransformerBlock int64, extractorBlock *int64) bool { |
There was a problem hiding this comment.
similarly very minor but I wonder if we should name these in a way that indicates they're checking a condition rather than actually resetting anything?
pkg/datastore/repository.go
Outdated
| @@ -28,6 +28,7 @@ type AddressRepository interface { | |||
| type CheckedHeadersRepository interface { | |||
| MarkHeaderChecked(headerID int64) error | |||
| MarkHeadersUnchecked(startingBlockNumber int64) error | |||
There was a problem hiding this comment.
the new function has me thinking maybe we should name this MarkHeadersUncheckedSince to communicate how much it does - wouldn't want to call this one by accident!
There was a problem hiding this comment.
Yep definitely! That's partially why I named this new method MarkSingleHeaderUnchecked. But I'll update this method too, just to be explicit.
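To make the difference in blast radius concrete, here is an in-memory sketch of the two methods. It assumes "Since" means every header at or after the starting block, which is an assumption about the real query; the headerRepo type is hypothetical and, unlike the real interface, these methods do not return errors.

```go
package main

import "fmt"

// headerRepo is a stand-in repository: block number -> check_count.
type headerRepo struct {
	checkCounts map[int64]int64
}

// MarkHeadersUncheckedSince zeroes check_count for every header whose block
// number is >= startingBlockNumber (assumed semantics).
func (r *headerRepo) MarkHeadersUncheckedSince(startingBlockNumber int64) {
	for block := range r.checkCounts {
		if block >= startingBlockNumber {
			r.checkCounts[block] = 0
		}
	}
}

// MarkSingleHeaderUnchecked zeroes check_count for exactly one header and
// leaves unknown block numbers alone.
func (r *headerRepo) MarkSingleHeaderUnchecked(blockNumber int64) {
	if _, ok := r.checkCounts[blockNumber]; ok {
		r.checkCounts[blockNumber] = 0
	}
}

// newRepo returns three headers, each already checked three times.
func newRepo() *headerRepo {
	return &headerRepo{checkCounts: map[int64]int64{1: 3, 2: 3, 3: 3}}
}

func main() {
	single := newRepo()
	single.MarkSingleHeaderUnchecked(2)
	fmt.Println(single.checkCounts[1], single.checkCounts[2], single.checkCounts[3])

	since := newRepo()
	since.MarkHeadersUncheckedSince(2)
	fmt.Println(since.checkCounts[1], since.checkCounts[2], since.checkCounts[3])
}
```

Using three headers also covers the case of a later header that must stay untouched by the single-header method.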
| }) | ||
| Describe("MarkHeadersUnchecked", func() { | ||
| It("removes rows for headers <= starting block number", func() { |
There was a problem hiding this comment.
maybe more accurate to say "marks header with matching block number unchecked" or something?
| markHeaderOneCheckedErr := repo.MarkHeaderChecked(headerIdOne) | ||
| Expect(markHeaderOneCheckedErr).NotTo(HaveOccurred()) | ||
| markHeaderTwoCheckedErr := repo.MarkHeaderChecked(headerIdTwo) | ||
| Expect(markHeaderTwoCheckedErr).NotTo(HaveOccurred()) |
There was a problem hiding this comment.
Reluctant to suggest this bc of the amount of setup already required, but I wonder if we want a third block added to verify it doesn't mark subsequent headers unchecked
| Fetcher: fetcher.NewLogFetcher(bc), | ||
| LogRepository: repositories.NewEventLogRepository(db), | ||
| Syncer: transactions.NewTransactionsSyncer(db, bc), | ||
| RecheckHeaderCap: constants.RecheckHeaderCap, |
There was a problem hiding this comment.
👍 to moving this to assignment in the initializer - would be cool to be able to configure it via the CLI
There was a problem hiding this comment.
Yeah definitely. I think that being able to set this, and the starting and ending blocks, via the CLI will help make it more flexible in the future. Especially for development. 🚀
- Name starting and ending block predicates in Extractor
- Rename methods to determine if an extractor's block should be updated
- Fix a typo in resetHeaderCheckCount command
- Fix uncheck header repo tests
- Rename MarkHeadersUnchecked -> MarkHeadersUncheckedSince
to run
./vulcanizedb transformEventsOnDemand --block-to-transform 9254881 --config=../vdb-mcd-transformers/environments/mcdTransformers.toml --log-level trace