|
| 1 | +package txmgr |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "errors" |
| 6 | + "math/big" |
| 7 | + "sync" |
| 8 | + "time" |
| 9 | + |
| 10 | + "github.com/ethereum/go-ethereum/common" |
| 11 | + "github.com/ethereum/go-ethereum/core/types" |
| 12 | + "github.com/ethereum/go-ethereum/log" |
| 13 | +) |
| 14 | + |
| 15 | +// ErrPublishTimeout signals that the tx manager did not receive a confirmation |
| 16 | +// for a given tx after publishing with the maximum gas price and waiting out a |
| 17 | +// resubmission timeout. |
| 18 | +var ErrPublishTimeout = errors.New("failed to publish tx with max gas price") |
| 19 | + |
| 20 | +// SendTxFunc defines a function signature for publishing a desired tx with a |
| 21 | +// specific gas price. Implementations of this signature should also return |
| 22 | +// promptly when the context is canceled. |
| 23 | +type SendTxFunc = func( |
| 24 | + ctx context.Context, gasPrice *big.Int) (*types.Transaction, error) |
| 25 | + |
| 26 | +// Config houses parameters for altering the behavior of a SimpleTxManager. |
| 27 | +type Config struct { |
| 28 | + // MinGasPrice is the minimum gas price (in gwei). This is used as the |
| 29 | + // initial publication attempt. |
| 30 | + MinGasPrice *big.Int |
| 31 | + |
| 32 | + // MaxGasPrice is the maximum gas price (in gwei). This is used to clamp |
| 33 | + // the upper end of the range that the TxManager will ever publish when |
| 34 | + // attempting to confirm a transaction. |
| 35 | + MaxGasPrice *big.Int |
| 36 | + |
| 37 | + // GasRetryIncrement is the additive gas price (in gwei) that will be |
| 38 | + // used to bump each successive tx after a ResubmissionTimeout has |
| 39 | + // elapsed. |
| 40 | + GasRetryIncrement *big.Int |
| 41 | + |
| 42 | + // ResubmissionTimeout is the interval at which, if no previously |
| 43 | + // published transaction has been mined, the new tx with a bumped gas |
| 44 | + // price will be published. Only one publication at MaxGasPrice will be |
| 45 | + // attempted. |
| 46 | + ResubmissionTimeout time.Duration |
| 47 | + |
| 48 | + // RequireQueryInterval is the interval at which the tx manager will |
| 49 | + // query the backend to check for confirmations after a tx at a |
| 50 | + // specific gas price has been published. |
| 51 | + ReceiptQueryInterval time.Duration |
| 52 | +} |
| 53 | + |
| 54 | +// TxManager is an interface that allows callers to reliably publish txs, |
| 55 | +// bumping the gas price if needed, and obtain the receipt of the resulting tx. |
| 56 | +type TxManager interface { |
| 57 | + // Send is used to publish a transaction with incrementally higher gas |
| 58 | + // prices until the transaction eventually confirms. This method blocks |
| 59 | + // until an invocation of sendTx returns (called with differing gas |
| 60 | + // prices). The method may be canceled using the passed context. |
| 61 | + // |
| 62 | + // NOTE: Send should be called by AT MOST one caller at a time. |
| 63 | + Send(ctx context.Context, sendTx SendTxFunc) (*types.Receipt, error) |
| 64 | +} |
| 65 | + |
| 66 | +// ReceiptSource is a minimal function signature used to detect the confirmation |
| 67 | +// of published txs. |
| 68 | +// |
| 69 | +// NOTE: This is a subset of bind.DeployBackend. |
| 70 | +type ReceiptSource interface { |
| 71 | + // TransactionReceipt queries the backend for a receipt associated with |
| 72 | + // txHash. If lookup does not fail, but the transaction is not found, |
| 73 | + // nil should be returned for both values. |
| 74 | + TransactionReceipt( |
| 75 | + ctx context.Context, txHash common.Hash) (*types.Receipt, error) |
| 76 | +} |
| 77 | + |
| 78 | +// SimpleTxManager is a implementation of TxManager that performs linear fee |
| 79 | +// bumping of a tx until it confirms. |
| 80 | +type SimpleTxManager struct { |
| 81 | + cfg Config |
| 82 | + backend ReceiptSource |
| 83 | +} |
| 84 | + |
| 85 | +// NewSimpleTxManager initializes a new SimpleTxManager with the passed Config. |
| 86 | +func NewSimpleTxManager(cfg Config, backend ReceiptSource) *SimpleTxManager { |
| 87 | + return &SimpleTxManager{ |
| 88 | + cfg: cfg, |
| 89 | + backend: backend, |
| 90 | + } |
| 91 | +} |
| 92 | + |
| 93 | +// Send is used to publish a transaction with incrementally higher gas prices |
| 94 | +// until the transaction eventually confirms. This method blocks until an |
| 95 | +// invocation of sendTx returns (called with differing gas prices). The method |
| 96 | +// may be canceled using the passed context. |
| 97 | +// |
| 98 | +// NOTE: Send should be called by AT MOST one caller at a time. |
| 99 | +func (m *SimpleTxManager) Send( |
| 100 | + ctx context.Context, sendTx SendTxFunc) (*types.Receipt, error) { |
| 101 | + |
| 102 | + // Initialize a wait group to track any spawned goroutines, and ensure |
| 103 | + // we properly clean up any dangling resources this method generates. |
| 104 | + // We assert that this is the case thoroughly in our unit tests. |
| 105 | + var wg sync.WaitGroup |
| 106 | + defer wg.Wait() |
| 107 | + |
| 108 | + // Initialize a subcontext for the goroutines spawned in this process. |
| 109 | + // The defer to cancel is done here (in reverse order of Wait) so that |
| 110 | + // the goroutines can exit before blocking on the wait group. |
| 111 | + ctxc, cancel := context.WithCancel(ctx) |
| 112 | + defer cancel() |
| 113 | + |
| 114 | + // Create a closure that will block on passed sendTx function in the |
| 115 | + // background, returning the first successfully mined receipt back to |
| 116 | + // the main event loop via receiptChan. |
| 117 | + receiptChan := make(chan *types.Receipt, 1) |
| 118 | + sendTxAsync := func(gasPrice *big.Int) { |
| 119 | + defer wg.Done() |
| 120 | + |
| 121 | + // Sign and publish transaction with current gas price. |
| 122 | + tx, err := sendTx(ctxc, gasPrice) |
| 123 | + if err != nil { |
| 124 | + log.Error("Unable to publish transaction", |
| 125 | + "gas_price", gasPrice, "err", err) |
| 126 | + // TODO(conner): add retry? |
| 127 | + return |
| 128 | + } |
| 129 | + |
| 130 | + txHash := tx.Hash() |
| 131 | + log.Info("Transaction published successfully", "hash", txHash, |
| 132 | + "gas_price", gasPrice) |
| 133 | + |
| 134 | + // Wait for the transaction to be mined, reporting the receipt |
| 135 | + // back to the main event loop if found. |
| 136 | + receipt, err := WaitMined( |
| 137 | + ctxc, m.backend, tx, m.cfg.ReceiptQueryInterval, |
| 138 | + ) |
| 139 | + if err != nil { |
| 140 | + log.Trace("Send tx failed", "hash", txHash, |
| 141 | + "gas_price", gasPrice, "err", err) |
| 142 | + } |
| 143 | + if receipt != nil { |
| 144 | + // Use non-blocking select to ensure function can exit |
| 145 | + // if more than one receipt is discovered. |
| 146 | + select { |
| 147 | + case receiptChan <- receipt: |
| 148 | + log.Trace("Send tx succeeded", "hash", txHash, |
| 149 | + "gas_price", gasPrice) |
| 150 | + default: |
| 151 | + } |
| 152 | + } |
| 153 | + } |
| 154 | + |
| 155 | + // Initialize our initial gas price to the configured minimum. |
| 156 | + curGasPrice := new(big.Int).Set(m.cfg.MinGasPrice) |
| 157 | + |
| 158 | + // Submit and wait for the receipt at our first gas price in the |
| 159 | + // background, before entering the event loop and waiting out the |
| 160 | + // resubmission timeout. |
| 161 | + wg.Add(1) |
| 162 | + go sendTxAsync(curGasPrice) |
| 163 | + |
| 164 | + for { |
| 165 | + select { |
| 166 | + |
| 167 | + // Whenever a resubmission timeout has elapsed, bump the gas |
| 168 | + // price and publish a new transaction. |
| 169 | + case <-time.After(m.cfg.ResubmissionTimeout): |
| 170 | + // If our last attempt published at the max gas price, |
| 171 | + // return an error as we are unlikely to succeed in |
| 172 | + // publishing. This also indicates that the max gas |
| 173 | + // price should likely be adjusted higher for the |
| 174 | + // daemon. |
| 175 | + if curGasPrice.Cmp(m.cfg.MaxGasPrice) >= 0 { |
| 176 | + return nil, ErrPublishTimeout |
| 177 | + } |
| 178 | + |
| 179 | + // Bump the gas price using linear gas price increments. |
| 180 | + curGasPrice = NextGasPrice( |
| 181 | + curGasPrice, m.cfg.GasRetryIncrement, |
| 182 | + m.cfg.MaxGasPrice, |
| 183 | + ) |
| 184 | + |
| 185 | + // Submit and wait for the bumped traction to confirm. |
| 186 | + wg.Add(1) |
| 187 | + go sendTxAsync(curGasPrice) |
| 188 | + |
| 189 | + // The passed context has been canceled, i.e. in the event of a |
| 190 | + // shutdown. |
| 191 | + case <-ctxc.Done(): |
| 192 | + return nil, ctxc.Err() |
| 193 | + |
| 194 | + // The transaction has confirmed. |
| 195 | + case receipt := <-receiptChan: |
| 196 | + return receipt, nil |
| 197 | + } |
| 198 | + } |
| 199 | +} |
| 200 | + |
| 201 | +// WaitMined blocks until the backend indicates confirmation of tx and returns |
| 202 | +// the tx receipt. Queries are made every queryInterval, regardless of whether |
| 203 | +// the backend returns an error. This method can be canceled using the passed |
| 204 | +// context. |
| 205 | +func WaitMined( |
| 206 | + ctx context.Context, |
| 207 | + backend ReceiptSource, |
| 208 | + tx *types.Transaction, |
| 209 | + queryInterval time.Duration, |
| 210 | +) (*types.Receipt, error) { |
| 211 | + |
| 212 | + queryTicker := time.NewTicker(queryInterval) |
| 213 | + defer queryTicker.Stop() |
| 214 | + |
| 215 | + txHash := tx.Hash() |
| 216 | + |
| 217 | + for { |
| 218 | + receipt, err := backend.TransactionReceipt(ctx, txHash) |
| 219 | + if receipt != nil { |
| 220 | + return receipt, nil |
| 221 | + } |
| 222 | + |
| 223 | + if err != nil { |
| 224 | + log.Trace("Receipt retrievel failed", "hash", txHash, |
| 225 | + "err", err) |
| 226 | + } else { |
| 227 | + log.Trace("Transaction not yet mined", "hash", txHash) |
| 228 | + } |
| 229 | + |
| 230 | + select { |
| 231 | + case <-ctx.Done(): |
| 232 | + return nil, ctx.Err() |
| 233 | + case <-queryTicker.C: |
| 234 | + } |
| 235 | + } |
| 236 | +} |
| 237 | + |
| 238 | +// NextGasPrice bumps the current gas price using an additive gasRetryIncrement, |
| 239 | +// clamping the resulting value to maxGasPrice. |
| 240 | +// |
| 241 | +// NOTE: This method does not mutate curGasPrice, but instead returns a copy. |
| 242 | +// This removes the possiblity of races occuring from goroutines sharing access |
| 243 | +// to the same underlying big.Int. |
| 244 | +func NextGasPrice(curGasPrice, gasRetryIncrement, maxGasPrice *big.Int) *big.Int { |
| 245 | + nextGasPrice := new(big.Int).Set(curGasPrice) |
| 246 | + nextGasPrice.Add(nextGasPrice, gasRetryIncrement) |
| 247 | + if nextGasPrice.Cmp(maxGasPrice) == 1 { |
| 248 | + nextGasPrice.Set(maxGasPrice) |
| 249 | + } |
| 250 | + return nextGasPrice |
| 251 | +} |
0 commit comments