After calling any? or first() on an instance of MessagePackEventStream which contains multiple events, iterating over it using each returns only the first event and the stream size is 0.
Here is a minimal reproduction:
require 'fluent/event'
time = Fluent::EventTime.now
array_stream = Fluent::ArrayEventStream.new([[time, {'a' => 1}], [time, {'a' => 2}]])
mp_stream = Fluent::MessagePackEventStream.new(array_stream.to_msgpack_stream)
puts mp_stream.any?
puts mp_stream.size # Prints 0
mp_stream.each do |time|
puts time # Executes only once
end
Commenting the call to any? makes it work as it should. Am I missing something here?
I noticed it trying to debug a specific filter directly following a forward-input.
I'm using fluentd 1.2.4.
After calling
any?orfirst()on an instance of MessagePackEventStream which contains multiple events, iterating over it usingeachreturns only the first event and the streamsizeis 0.Here is a minimal reproduction:
Commenting the call to
any?makes it work as it should. Am I missing something here?I noticed it trying to debug a specific filter directly following a forward-input.
I'm using fluentd 1.2.4.