Skip to content

Commit 590b761

Browse files
committed
Push experimental support for bulk vote forwarding
1 parent 4cd2ce5 commit 590b761

3 files changed

Lines changed: 44 additions & 27 deletions

File tree

bukkit/src/main/java/com/vexsoftware/votifier/forwarding/BukkitPluginMessagingForwardingSink.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.vexsoftware.votifier.forwarding;
22

33
import com.google.gson.JsonObject;
4+
import com.google.gson.stream.JsonReader;
45
import com.vexsoftware.votifier.NuVotifierBukkit;
56
import com.vexsoftware.votifier.model.Vote;
67
import com.vexsoftware.votifier.util.GsonInst;
@@ -10,6 +11,7 @@
1011
import org.bukkit.plugin.Plugin;
1112
import org.bukkit.plugin.messaging.PluginMessageListener;
1213

14+
import java.io.CharArrayReader;
1315
import java.nio.charset.StandardCharsets;
1416
import java.util.logging.Level;
1517

@@ -37,11 +39,13 @@ public void halt() {
3739

3840
@Override
3941
public void onPluginMessageReceived(String s, Player player, byte[] bytes) {
40-
try {
41-
String message = new String(bytes, StandardCharsets.UTF_8);
42-
JsonObject jsonObject = GsonInst.gson.fromJson(message, JsonObject.class);
43-
Vote v = new Vote(jsonObject);
44-
listener.onForward(v);
42+
String message = new String(bytes, StandardCharsets.UTF_8);
43+
try (JsonReader reader = new JsonReader(new CharArrayReader(message.toCharArray()))){
44+
while (reader.hasNext()) {
45+
JsonObject jsonObject = GsonInst.gson.fromJson(reader, JsonObject.class);
46+
Vote v = new Vote(jsonObject);
47+
listener.onForward(v);
48+
}
4549
} catch (Exception e) {
4650
NuVotifierBukkit.getInstance().getLogger().log(Level.SEVERE, "There was an unknown error when processing a forwarded vote.", e);
4751
}

common/src/main/java/com/vexsoftware/votifier/support/forwarding/AbstractPluginMessagingForwardingSource.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import java.io.IOException;
1010
import java.nio.charset.StandardCharsets;
11+
import java.util.ArrayList;
1112
import java.util.Collection;
1213
import java.util.Iterator;
1314
import java.util.concurrent.TimeUnit;
@@ -44,7 +45,16 @@ public void forward(Vote v) {
4445

4546
protected boolean forwardSpecific(BackendServer connection, Vote vote) {
4647
byte[] rawData = vote.serialize().toString().getBytes(StandardCharsets.UTF_8);
47-
return connection.sendPluginMessage(channel, rawData);
48+
return forwardSpecific(connection, rawData);
49+
}
50+
51+
protected boolean forwardSpecific(BackendServer connection, Collection<Vote> votes) {
52+
StringBuilder data = new StringBuilder();
53+
for (Vote v : votes) {
54+
data.append(v.serialize().toString());
55+
}
56+
57+
return forwardSpecific(connection, data.toString().getBytes(StandardCharsets.UTF_8));
4858
}
4959

5060
private boolean forwardSpecific(BackendServer connection, byte[] data) {
@@ -99,25 +109,24 @@ private void dumpVotesToServer(Collection<Vote> cachedVotes, BackendServer targe
99109
plugin.getScheduler().delayedOnPool(() -> {
100110
int evicted = 0;
101111
Iterator<Vote> vi = cachedVotes.iterator();
102-
112+
Collection<Vote> chunk = new ArrayList<>(dumpRate);
103113
while (vi.hasNext() && evicted < dumpRate) {
104-
Vote v = vi.next();
105-
if (forwardSpecific(target, v)) {
106-
vi.remove();
107-
evicted++;
108-
} else {
109-
// so since our forwarding failed, break like we are done
110-
break;
111-
}
114+
chunk.add(vi.next());
115+
vi.remove();
112116
}
113117

114-
if (evicted >= dumpRate && !cachedVotes.isEmpty()) {
115-
// if we evicted everything we could but still need to evict more
116-
dumpVotesToServer(cachedVotes, target, identifier, evictedAlready + evicted, cb);
117-
return;
118-
}
118+
if (forwardSpecific(target, chunk)) {
119+
evicted += chunk.size();
119120

120-
// we either successfully
121+
if (evicted >= dumpRate && !cachedVotes.isEmpty()) {
122+
// if we evicted everything we could but still need to evict more
123+
dumpVotesToServer(cachedVotes, target, identifier, evictedAlready + evicted, cb);
124+
return;
125+
}
126+
} else {
127+
// so since our forwarding failed, break like we are done
128+
cachedVotes.addAll(chunk);
129+
}
121130

122131
if (plugin.isDebug()) {
123132
plugin.getPluginLogger().info("Successfully evicted " + (evictedAlready + evicted) + " votes to " + identifier + ".");

sponge/src/main/java/com/vexsoftware/votifier/sponge/forwarding/SpongePluginMessagingForwardingSink.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.vexsoftware.votifier.sponge.forwarding;
22

33
import com.google.gson.JsonObject;
4+
import com.google.gson.stream.JsonReader;
45
import com.vexsoftware.votifier.model.Vote;
56
import com.vexsoftware.votifier.sponge.VotifierPlugin;
67
import com.vexsoftware.votifier.util.GsonInst;
@@ -11,6 +12,7 @@
1112
import org.spongepowered.api.network.RawDataListener;
1213
import org.spongepowered.api.network.RemoteConnection;
1314

15+
import java.io.CharArrayReader;
1416
import java.nio.charset.StandardCharsets;
1517

1618
public class SpongePluginMessagingForwardingSink implements ForwardingVoteSink, RawDataListener {
@@ -33,12 +35,14 @@ public void halt() {
3335

3436
@Override
3537
public void handlePayload(ChannelBuf channelBuf, RemoteConnection remoteConnection, Platform.Type type) {
36-
try {
37-
byte[] msgDirBuf = channelBuf.readBytes(channelBuf.available());
38-
String message = new String(msgDirBuf, StandardCharsets.UTF_8);
39-
JsonObject jsonObject = GsonInst.gson.fromJson(message, JsonObject.class);
40-
Vote v = new Vote(jsonObject);
41-
listener.onForward(v);
38+
byte[] msgDirBuf = channelBuf.readBytes(channelBuf.available());
39+
String message = new String(msgDirBuf, StandardCharsets.UTF_8);
40+
try (JsonReader reader = new JsonReader(new CharArrayReader(message.toCharArray()))){
41+
while (reader.hasNext()) {
42+
JsonObject jsonObject = GsonInst.gson.fromJson(reader, JsonObject.class);
43+
Vote v = new Vote(jsonObject);
44+
listener.onForward(v);
45+
}
4246
} catch (Exception e) {
4347
p.getLogger().error("There was an unknown error when processing a forwarded vote.", e);
4448
}

0 commit comments

Comments
 (0)