Skip to content

Commit 09ad4ff

Browse files
committed
Reimplemented the accumulation for the head token using a StringBuilder instead of a JRuby array, added more tests to cover some 'buffer full' conditions
1 parent 528260f commit 09ad4ff

3 files changed

Lines changed: 70 additions & 22 deletions

File tree

logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,7 @@
2020

2121
package org.logstash.common;
2222

23-
import org.jruby.Ruby;
24-
import org.jruby.RubyArray;
25-
import org.jruby.RubyClass;
26-
import org.jruby.RubyObject;
27-
import org.jruby.RubyString;
23+
import org.jruby.*;
2824
import org.jruby.anno.JRubyClass;
2925
import org.jruby.anno.JRubyMethod;
3026
import org.jruby.runtime.ThreadContext;
@@ -40,10 +36,12 @@ public class BufferedTokenizerExt extends RubyObject {
4036
freeze(RubyUtil.RUBY.getCurrentContext());
4137

4238
private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray();
39+
private StringBuilder headToken = new StringBuilder();
4340
private RubyString delimiter = NEW_LINE;
4441
private int sizeLimit;
4542
private boolean hasSizeLimit;
4643
private int inputSize;
44+
private boolean bufferFullErrorNotified = false;
4745

4846
public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) {
4947
super(runtime, metaClass);
@@ -66,7 +64,6 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
6664
* Extract takes an arbitrary string of input data and returns an array of
6765
* tokenized entities, provided there were any available to extract. This
6866
* makes for easy processing of datagrams using a pattern like:
69-
*
7067
* {@code tokenizer.extract(data).map { |entity| Decode(entity) }.each do}
7168
*
7269
* @param context ThreadContext
@@ -77,22 +74,53 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
7774
@SuppressWarnings("rawtypes")
7875
public RubyArray extract(final ThreadContext context, IRubyObject data) {
7976
final RubyArray entities = data.convertToString().split(delimiter, -1);
77+
if (!bufferFullErrorNotified) {
78+
input.clear();
79+
input.addAll(entities);
80+
} else {
81+
// after a full buffer signal
82+
if (input.isEmpty()) {
83+
// after a buffer full error, the remaining part of the line, till next delimiter,
84+
// has to be consumed, unless the input buffer doesn't still contain fragments of
85+
// subsequent tokens.
86+
entities.shift(context);
87+
input.addAll(entities);
88+
} else {
89+
// merge last of the input with first of incoming data segment
90+
if (!entities.isEmpty()) {
91+
RubyString last = ((RubyString) input.pop(context));
92+
RubyString nextFirst = ((RubyString) entities.shift(context));
93+
entities.unshift(last.concat(nextFirst));
94+
input.addAll(entities);
95+
}
96+
}
97+
}
98+
8099
if (hasSizeLimit) {
81-
final int entitiesSize = ((RubyString) entities.first()).size();
100+
if (bufferFullErrorNotified) {
101+
bufferFullErrorNotified = false;
102+
if (input.isEmpty()) {
103+
return RubyUtil.RUBY.newArray();
104+
}
105+
}
106+
final int entitiesSize = ((RubyString) input.first()).size();
82107
if (inputSize + entitiesSize > sizeLimit) {
108+
bufferFullErrorNotified = true;
109+
headToken = new StringBuilder();
110+
inputSize = 0;
111+
input.shift(context); // consume the token fragment that generates the buffer full
83112
throw new IllegalStateException("input buffer full");
84113
}
85114
this.inputSize = inputSize + entitiesSize;
86115
}
87-
input.append(entities.shift(context));
88-
if (entities.isEmpty()) {
116+
headToken.append(input.shift(context)); // remove head
117+
if (input.isEmpty()) {
89118
return RubyUtil.RUBY.newArray();
90119
}
91-
entities.unshift(input.join(context));
92-
input.clear();
93-
input.append(entities.pop(context));
94-
inputSize = ((RubyString) input.first()).size();
95-
return entities;
120+
input.unshift(RubyUtil.toRubyObject(headToken.toString())); // insert new head
121+
headToken = new StringBuilder(input.pop(context).toJava(String.class)); // remove the tail of the data segment
122+
inputSize = headToken.length();
123+
return input;
96124
}
97125

98126
/**
@@ -104,14 +132,14 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) {
104132
*/
105133
@JRubyMethod
106134
public IRubyObject flush(final ThreadContext context) {
107-
final IRubyObject buffer = input.join(context);
108-
input.clear();
135+
final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString());
136+
headToken = new StringBuilder();
109137
return buffer;
110138
}
111139

112140
@JRubyMethod(name = "empty?")
113141
public IRubyObject isEmpty(final ThreadContext context) {
114-
return input.empty_p();
142+
return RubyBoolean.newBoolean(context.runtime, headToken.toString().isEmpty());
115143
}
116144

117145
}

logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public void shouldIgnoreEmptyPayload() {
8383
@Test
8484
public void shouldTokenizeEmptyPayloadWithNewline() {
8585
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\n"));
86-
assertEquals(List.of(""), tokens);;
86+
assertEquals(List.of(""), tokens);
8787

8888
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\n\n\n"));
8989
assertEquals(List.of("", "", ""), tokens);

logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@
3131

3232
import static org.hamcrest.MatcherAssert.assertThat;
3333
import static org.hamcrest.Matchers.containsString;
34-
import static org.junit.Assert.assertEquals;
35-
import static org.junit.Assert.assertThrows;
34+
import static org.junit.Assert.*;
3635
import static org.logstash.RubyUtil.RUBY;
3736

3837
@SuppressWarnings("unchecked")
@@ -72,7 +71,7 @@ public void givenExtractedThrownLimitErrorWhenFeedFreshDataThenReturnTokenStarti
7271
assertThat(thrownException.getMessage(), containsString("input buffer full"));
7372

7473
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\nanother"));
75-
assertEquals("After buffer full error should resume form the end of line", List.of("kaboom", "another"), tokens);
74+
assertEquals("After buffer full error should resume from the end of line", List.of("kaboom"), tokens);
7675
}
7776

7877
@Test
@@ -84,7 +83,28 @@ public void givenExtractInvokedWithDifferentFramingAfterBufferFullErrorTWhenFeed
8483
});
8584
assertThat(thrownException.getMessage(), containsString("input buffer full"));
8685

87-
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbb"));
86+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbb\nccc"));
8887
assertEquals(List.of("bbbb"), tokens);
8988
}
89+
90+
@Test
91+
public void giveMultipleSegmentsThatGeneratesMultipleBufferFullErrorsThenIsAbleToRecoverTokenization() {
92+
sut.extract(context, RubyUtil.RUBY.newString("aaaa"));
93+
94+
//first buffer full on 13 "a" letters
95+
Exception thrownException = assertThrows(IllegalStateException.class, () -> {
96+
sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa"));
97+
});
98+
assertThat(thrownException.getMessage(), containsString("input buffer full"));
99+
100+
// second buffer full on 11 "b" letters
101+
Exception secondThrownException = assertThrows(IllegalStateException.class, () -> {
102+
sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbbbbbbbbb\ncc"));
103+
});
104+
assertThat(secondThrownException.getMessage(), containsString("input buffer full"));
105+
106+
// now should resemble processing on c and d
107+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("ccc\nddd\n"));
108+
assertEquals(List.of("ccccc", "ddd"), tokens);
109+
}
90110
}

0 commit comments

Comments
 (0)