Skip to content

Commit 5097797

Browse files
committed
feat: add batch-submitter/txmgr for tx gas bumping
This commit adds a SimpleTxManager for tracking and bumping fees on txs the batch submitter needs to publish. The bulk of the logic is adapated from the existing tx manager (YNATM) used in the typescript version to minimize any new classes of bugs that are not already considered. The manager is configured via a min and max gas price, as well as an additive gas price step that is applied after each resubmission interval elapses, before signing and broadcasting a new transaction. This corresponds to the LINEAR fee policy available in YNATM. Txs generated from the same call to Send are treated as equivalent, thus the method blocks until the first tx confirms. Care is taken to throughly unit test the interactions and edge cases, as subtle bugs in tx publication can lead to big headaches in prod. To this end, we achieve 100% test coverage in the txmgr package: ``` coverage: 100.0% of statements ok github.com/ethereum-optimism/go/batch-submitter/txmgr 10.311s ```
1 parent f7380e1 commit 5097797

2 files changed

Lines changed: 682 additions & 0 deletions

File tree

go/batch-submitter/txmgr/txmgr.go

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
package txmgr
2+
3+
import (
4+
"context"
5+
"errors"
6+
"math/big"
7+
"sync"
8+
"time"
9+
10+
"github.com/ethereum/go-ethereum/common"
11+
"github.com/ethereum/go-ethereum/core/types"
12+
"github.com/ethereum/go-ethereum/log"
13+
)
14+
15+
// ErrPublishTimeout signals that the tx manager did not receive a confirmation
16+
// for a given tx after publishing with the maximum gas price and waiting out a
17+
// resubmission timeout.
18+
var ErrPublishTimeout = errors.New("failed to publish tx with max gas price")
19+
20+
// SendTxFunc defines a function signature for publishing a desired tx with a
21+
// specific gas price. Implementations of this signature should also return
22+
// promptly when the context is canceled.
23+
type SendTxFunc = func(
24+
ctx context.Context, gasPrice *big.Int) (*types.Transaction, error)
25+
26+
// Config houses parameters for altering the behavior of a SimpleTxManager.
27+
type Config struct {
28+
// MinGasPrice is the minimum gas price (in gwei). This is used as the
29+
// initial publication attempt.
30+
MinGasPrice *big.Int
31+
32+
// MaxGasPrice is the maximum gas price (in gwei). This is used to clamp
33+
// the upper end of the range that the TxManager will ever publish when
34+
// attempting to confirm a transaction.
35+
MaxGasPrice *big.Int
36+
37+
// GasRetryIncrement is the additive gas price (in gwei) that will be
38+
// used to bump each successive tx after a ResubmissionTimeout has
39+
// elapsed.
40+
GasRetryIncrement *big.Int
41+
42+
// ResubmissionTimeout is the interval at which, if no previously
43+
// published transaction has been mined, the new tx with a bumped gas
44+
// price will be published. Only one publication at MaxGasPrice will be
45+
// attempted.
46+
ResubmissionTimeout time.Duration
47+
48+
// RequireQueryInterval is the interval at which the tx manager will
49+
// query the backend to check for confirmations after a tx at a
50+
// specific gas price has been published.
51+
ReceiptQueryInterval time.Duration
52+
}
53+
54+
// TxManager is an interface that allows callers to reliably publish txs,
55+
// bumping the gas price if needed, and obtain the receipt of the resulting tx.
56+
type TxManager interface {
57+
// Send is used to publish a transaction with incrementally higher gas
58+
// prices until the transaction eventually confirms. This method blocks
59+
// until an invocation of sendTx returns (called with differing gas
60+
// prices). The method may be canceled using the passed context.
61+
//
62+
// NOTE: Send should be called by AT MOST one caller at a time.
63+
Send(ctx context.Context, sendTx SendTxFunc) (*types.Receipt, error)
64+
}
65+
66+
// ReceiptSource is a minimal function signature used to detect the confirmation
67+
// of published txs.
68+
//
69+
// NOTE: This is a subset of bind.DeployBackend.
70+
type ReceiptSource interface {
71+
// TransactionReceipt queries the backend for a receipt associated with
72+
// txHash. If lookup does not fail, but the transaction is not found,
73+
// nil should be returned for both values.
74+
TransactionReceipt(
75+
ctx context.Context, txHash common.Hash) (*types.Receipt, error)
76+
}
77+
78+
// SimpleTxManager is a implementation of TxManager that performs linear fee
79+
// bumping of a tx until it confirms.
80+
type SimpleTxManager struct {
81+
cfg Config
82+
backend ReceiptSource
83+
}
84+
85+
// NewSimpleTxManager initializes a new SimpleTxManager with the passed Config.
86+
func NewSimpleTxManager(cfg Config, backend ReceiptSource) *SimpleTxManager {
87+
return &SimpleTxManager{
88+
cfg: cfg,
89+
backend: backend,
90+
}
91+
}
92+
93+
// Send is used to publish a transaction with incrementally higher gas prices
94+
// until the transaction eventually confirms. This method blocks until an
95+
// invocation of sendTx returns (called with differing gas prices). The method
96+
// may be canceled using the passed context.
97+
//
98+
// NOTE: Send should be called by AT MOST one caller at a time.
99+
func (m *SimpleTxManager) Send(
100+
ctx context.Context, sendTx SendTxFunc) (*types.Receipt, error) {
101+
102+
// Initialize a wait group to track any spawned goroutines, and ensure
103+
// we properly clean up any dangling resources this method generates.
104+
// We assert that this is the case thoroughly in our unit tests.
105+
var wg sync.WaitGroup
106+
defer wg.Wait()
107+
108+
// Initialize a subcontext for the goroutines spawned in this process.
109+
// The defer to cancel is done here (in reverse order of Wait) so that
110+
// the goroutines can exit before blocking on the wait group.
111+
ctxc, cancel := context.WithCancel(ctx)
112+
defer cancel()
113+
114+
// Create a closure that will block on passed sendTx function in the
115+
// background, returning the first successfully mined receipt back to
116+
// the main event loop via receiptChan.
117+
receiptChan := make(chan *types.Receipt, 1)
118+
sendTxAsync := func(gasPrice *big.Int) {
119+
defer wg.Done()
120+
121+
// Sign and publish transaction with current gas price.
122+
tx, err := sendTx(ctxc, gasPrice)
123+
if err != nil {
124+
log.Error("Unable to publish transaction",
125+
"gas_price", gasPrice, "err", err)
126+
// TODO(conner): add retry?
127+
return
128+
}
129+
130+
txHash := tx.Hash()
131+
log.Info("Transaction published successfully", "hash", txHash,
132+
"gas_price", gasPrice)
133+
134+
// Wait for the transaction to be mined, reporting the receipt
135+
// back to the main event loop if found.
136+
receipt, err := WaitMined(
137+
ctxc, m.backend, tx, m.cfg.ReceiptQueryInterval,
138+
)
139+
if err != nil {
140+
log.Trace("Send tx failed", "hash", txHash,
141+
"gas_price", gasPrice, "err", err)
142+
}
143+
if receipt != nil {
144+
// Use non-blocking select to ensure function can exit
145+
// if more than one receipt is discovered.
146+
select {
147+
case receiptChan <- receipt:
148+
log.Trace("Send tx succeeded", "hash", txHash,
149+
"gas_price", gasPrice)
150+
default:
151+
}
152+
}
153+
}
154+
155+
// Initialize our initial gas price to the configured minimum.
156+
curGasPrice := new(big.Int).Set(m.cfg.MinGasPrice)
157+
158+
// Submit and wait for the receipt at our first gas price in the
159+
// background, before entering the event loop and waiting out the
160+
// resubmission timeout.
161+
wg.Add(1)
162+
go sendTxAsync(curGasPrice)
163+
164+
for {
165+
select {
166+
167+
// Whenever a resubmission timeout has elapsed, bump the gas
168+
// price and publish a new transaction.
169+
case <-time.After(m.cfg.ResubmissionTimeout):
170+
// If our last attempt published at the max gas price,
171+
// return an error as we are unlikely to succeed in
172+
// publishing. This also indicates that the max gas
173+
// price should likely be adjusted higher for the
174+
// daemon.
175+
if curGasPrice.Cmp(m.cfg.MaxGasPrice) >= 0 {
176+
return nil, ErrPublishTimeout
177+
}
178+
179+
// Bump the gas price using linear gas price increments.
180+
curGasPrice = NextGasPrice(
181+
curGasPrice, m.cfg.GasRetryIncrement,
182+
m.cfg.MaxGasPrice,
183+
)
184+
185+
// Submit and wait for the bumped traction to confirm.
186+
wg.Add(1)
187+
go sendTxAsync(curGasPrice)
188+
189+
// The passed context has been canceled, i.e. in the event of a
190+
// shutdown.
191+
case <-ctxc.Done():
192+
return nil, ctxc.Err()
193+
194+
// The transaction has confirmed.
195+
case receipt := <-receiptChan:
196+
return receipt, nil
197+
}
198+
}
199+
}
200+
201+
// WaitMined blocks until the backend indicates confirmation of tx and returns
202+
// the tx receipt. Queries are made every queryInterval, regardless of whether
203+
// the backend returns an error. This method can be canceled using the passed
204+
// context.
205+
func WaitMined(
206+
ctx context.Context,
207+
backend ReceiptSource,
208+
tx *types.Transaction,
209+
queryInterval time.Duration,
210+
) (*types.Receipt, error) {
211+
212+
queryTicker := time.NewTicker(queryInterval)
213+
defer queryTicker.Stop()
214+
215+
txHash := tx.Hash()
216+
217+
for {
218+
receipt, err := backend.TransactionReceipt(ctx, txHash)
219+
if receipt != nil {
220+
return receipt, nil
221+
}
222+
223+
if err != nil {
224+
log.Trace("Receipt retrievel failed", "hash", txHash,
225+
"err", err)
226+
} else {
227+
log.Trace("Transaction not yet mined", "hash", txHash)
228+
}
229+
230+
select {
231+
case <-ctx.Done():
232+
return nil, ctx.Err()
233+
case <-queryTicker.C:
234+
}
235+
}
236+
}
237+
238+
// NextGasPrice bumps the current gas price using an additive gasRetryIncrement,
239+
// clamping the resulting value to maxGasPrice.
240+
//
241+
// NOTE: This method does not mutate curGasPrice, but instead returns a copy.
242+
// This removes the possiblity of races occuring from goroutines sharing access
243+
// to the same underlying big.Int.
244+
func NextGasPrice(curGasPrice, gasRetryIncrement, maxGasPrice *big.Int) *big.Int {
245+
nextGasPrice := new(big.Int).Set(curGasPrice)
246+
nextGasPrice.Add(nextGasPrice, gasRetryIncrement)
247+
if nextGasPrice.Cmp(maxGasPrice) == 1 {
248+
nextGasPrice.Set(maxGasPrice)
249+
}
250+
return nextGasPrice
251+
}

0 commit comments

Comments
 (0)