@@ -17,6 +17,7 @@ use fnv::FnvHashMap;
1717use futures:: task:: Poll ;
1818use futures:: { Stream , StreamExt } ;
1919use lighthouse_network:: { MessageId , PeerId } ;
20+ use logging:: TimeLatch ;
2021use slog:: { crit, debug, error, warn, Logger } ;
2122use slot_clock:: SlotClock ;
2223use std:: collections:: { HashMap , HashSet } ;
@@ -133,6 +134,8 @@ struct ReprocessQueue<T: BeaconChainTypes> {
133134 /* Aux */
134135 /// Next attestation id, used for both aggregated and unaggregated attestations
135136 next_attestation : usize ,
137+ early_block_debounce : TimeLatch ,
138+ attestation_delay_debounce : TimeLatch ,
136139}
137140
138141#[ derive( Debug , Clone , Copy , PartialEq , Eq ) ]
@@ -223,6 +226,8 @@ pub fn spawn_reprocess_scheduler<T: BeaconChainTypes>(
223226 queued_unaggregates : FnvHashMap :: default ( ) ,
224227 awaiting_attestations_per_root : HashMap :: new ( ) ,
225228 next_attestation : 0 ,
229+ early_block_debounce : TimeLatch :: default ( ) ,
230+ attestation_delay_debounce : TimeLatch :: default ( ) ,
226231 } ;
227232
228233 executor. spawn (
@@ -261,12 +266,14 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
261266 if let Some ( duration_till_slot) = slot_clock. duration_to_slot ( block_slot) {
262267 // Check to ensure this won't over-fill the queue.
263268 if self . queued_block_roots . len ( ) >= MAXIMUM_QUEUED_BLOCKS {
264- warn ! (
265- log,
266- "Early blocks queue is full" ;
267- "queue_size" => MAXIMUM_QUEUED_BLOCKS ,
268- "msg" => "check system clock"
269- ) ;
269+ if self . early_block_debounce . elapsed ( ) {
270+ warn ! (
271+ log,
272+ "Early blocks queue is full" ;
273+ "queue_size" => MAXIMUM_QUEUED_BLOCKS ,
274+ "msg" => "check system clock"
275+ ) ;
276+ }
270277 // Drop the block.
271278 return ;
272279 }
@@ -306,12 +313,14 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
306313 }
307314 InboundEvent :: Msg ( UnknownBlockAggregate ( queued_aggregate) ) => {
308315 if self . attestations_delay_queue . len ( ) >= MAXIMUM_QUEUED_ATTESTATIONS {
309- error ! (
310- log,
311- "Aggregate attestation delay queue is full" ;
312- "queue_size" => MAXIMUM_QUEUED_ATTESTATIONS ,
313- "msg" => "check system clock"
314- ) ;
316+ if self . attestation_delay_debounce . elapsed ( ) {
317+ error ! (
318+ log,
319+ "Aggregate attestation delay queue is full" ;
320+ "queue_size" => MAXIMUM_QUEUED_ATTESTATIONS ,
321+ "msg" => "check system clock"
322+ ) ;
323+ }
315324 // Drop the attestation.
316325 return ;
317326 }
@@ -337,12 +346,14 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
337346 }
338347 InboundEvent :: Msg ( UnknownBlockUnaggregate ( queued_unaggregate) ) => {
339348 if self . attestations_delay_queue . len ( ) >= MAXIMUM_QUEUED_ATTESTATIONS {
340- error ! (
341- log,
342- "Attestation delay queue is full" ;
343- "queue_size" => MAXIMUM_QUEUED_ATTESTATIONS ,
344- "msg" => "check system clock"
345- ) ;
349+ if self . attestation_delay_debounce . elapsed ( ) {
350+ error ! (
351+ log,
352+ "Attestation delay queue is full" ;
353+ "queue_size" => MAXIMUM_QUEUED_ATTESTATIONS ,
354+ "msg" => "check system clock"
355+ ) ;
356+ }
346357 // Drop the attestation.
347358 return ;
348359 }
0 commit comments