2020
2121package org .logstash .common ;
2222
23- import org .jruby .Ruby ;
24- import org .jruby .RubyArray ;
25- import org .jruby .RubyClass ;
26- import org .jruby .RubyObject ;
27- import org .jruby .RubyString ;
23+ import org .jruby .*;
2824import org .jruby .anno .JRubyClass ;
2925import org .jruby .anno .JRubyMethod ;
3026import org .jruby .runtime .ThreadContext ;
@@ -40,10 +36,12 @@ public class BufferedTokenizerExt extends RubyObject {
4036 freeze (RubyUtil .RUBY .getCurrentContext ());
4137
4238 private @ SuppressWarnings ("rawtypes" ) RubyArray input = RubyUtil .RUBY .newArray ();
39+ private StringBuilder headToken = new StringBuilder ();
4340 private RubyString delimiter = NEW_LINE ;
4441 private int sizeLimit ;
4542 private boolean hasSizeLimit ;
4643 private int inputSize ;
44+ private boolean bufferFullErrorNotified = false ;
4745
4846 public BufferedTokenizerExt (final Ruby runtime , final RubyClass metaClass ) {
4947 super (runtime , metaClass );
@@ -66,7 +64,6 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
6664 * Extract takes an arbitrary string of input data and returns an array of
6765 * tokenized entities, provided there were any available to extract. This
6866 * makes for easy processing of datagrams using a pattern like:
69- *
7067 * {@code tokenizer.extract(data).map { |entity| Decode(entity) }.each do}
7168 *
7269 * @param context ThreadContext
@@ -77,22 +74,53 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
7774 @ SuppressWarnings ("rawtypes" )
7875 public RubyArray extract (final ThreadContext context , IRubyObject data ) {
7976 final RubyArray entities = data .convertToString ().split (delimiter , -1 );
77+ if (!bufferFullErrorNotified ) {
78+ input .clear ();
79+ input .addAll (entities );
80+ } else {
81+ // after a full buffer signal
82+ if (input .isEmpty ()) {
83+ // after a buffer full error, the remaining part of the line, till next delimiter,
84+ // has to be consumed, unless the input buffer doesn't still contain fragments of
85+ // subsequent tokens.
86+ entities .shift (context );
87+ input .addAll (entities );
88+ } else {
89+ // merge last of the input with first of incoming data segment
90+ if (!entities .isEmpty ()) {
91+ RubyString last = ((RubyString ) input .pop (context ));
92+ RubyString nextFirst = ((RubyString ) entities .shift (context ));
93+ entities .unshift (last .concat (nextFirst ));
94+ input .addAll (entities );
95+ }
96+ }
97+ }
98+
8099 if (hasSizeLimit ) {
81- final int entitiesSize = ((RubyString ) entities .first ()).size ();
100+ if (bufferFullErrorNotified ) {
101+ bufferFullErrorNotified = false ;
102+ if (input .isEmpty ()) {
103+ return RubyUtil .RUBY .newArray ();
104+ }
105+ }
106+ final int entitiesSize = ((RubyString ) input .first ()).size ();
82107 if (inputSize + entitiesSize > sizeLimit ) {
108+ bufferFullErrorNotified = true ;
109+ headToken = new StringBuilder ();
110+ inputSize = 0 ;
111+ input .shift (context ); // consume the token fragment that generates the buffer full
83112 throw new IllegalStateException ("input buffer full" );
84113 }
85114 this .inputSize = inputSize + entitiesSize ;
86115 }
87- input .append (entities .shift (context ));
88- if (entities .isEmpty ()) {
116+ headToken .append (input .shift (context )); // remove head
117+ if (input .isEmpty ()) {
89118 return RubyUtil .RUBY .newArray ();
90119 }
91- entities .unshift (input .join (context ));
92- input .clear ();
93- input .append (entities .pop (context ));
94- inputSize = ((RubyString ) input .first ()).size ();
95- return entities ;
120+ input .unshift (RubyUtil .toRubyObject (headToken .toString ())); // insert new head
121+ headToken = new StringBuilder (input .pop (context ).toJava (String .class )); // remove the tail of the data segment
122+ inputSize = headToken .length ();
123+ return input ;
96124 }
97125
98126 /**
@@ -104,14 +132,14 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) {
104132 */
105133 @ JRubyMethod
106134 public IRubyObject flush (final ThreadContext context ) {
107- final IRubyObject buffer = input . join ( context );
108- input . clear ();
135+ final IRubyObject buffer = RubyUtil . toRubyObject ( headToken . toString () );
136+ headToken = new StringBuilder ();
109137 return buffer ;
110138 }
111139
112140 @ JRubyMethod (name = "empty?" )
113141 public IRubyObject isEmpty (final ThreadContext context ) {
114- return input . empty_p ( );
142+ return RubyBoolean . newBoolean ( context . runtime , headToken . toString (). isEmpty () );
115143 }
116144
117145}
0 commit comments