Skip to content

Commit e9a551e

Browse files
committed
Add sequence numbers for detecting dropped messages
Previously if a message was dropped, there was no way to know. This gives every message from the bus to the VM a sequence number; if any is ever skipped, that tells the VM that some messages were not received. This should rarely happen unless there is a flood of events, or a few very large events, but if it does happen it's good to be able to know. So far, no userspace library actually handles these sequence numbers, but as far as I can tell none of the existing libraries are broken by them either.
1 parent e5e5da6 commit e9a551e

File tree

3 files changed

+16
-9
lines changed

3 files changed

+16
-9
lines changed

src/main/java/li/cil/oc2/api/README.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,11 @@ the built-in `RedstoneInterfaceBlockEntity` device.
9191
- If subscriptions are used on or before OC2R version 2.2.12, they can cause a server crash if too many messages are
9292
sent at once. To be safe, do not send any event in the same tick as another message, or depend on a later minimum
9393
version of OC2R.
94-
- Right now, if a lot of messages are sent and the VM does not listen for them, some may be dropped, and there is no
95-
indication to the device or VM when this happens. You cannot depend on the events being reliably delivered, and must
96-
use some other communication channel if reliability is a requirement.
94+
- Right now, if a lot of messages are sent and the VM does not listen for them, some may be dropped; in theory the VM
95+
can detect this, but none of the existing libraries do (see next point). You cannot depend on the events being
96+
reliably delivered, and must use some other communication channel if reliability is a requirement. A method call
97+
should not have this issue unless the results and all the (subscribed to) events in that tick put together are more
98+
than 4 KiB, though be aware that if the method call causes a state change, that might itself cause some events.
9799
- The on-the-"wire" event and subscription format is probably stable, but the VM's userspace python and lua libraries do
98100
not easily support them yet, and often silently discard events when expecting a different sort of message. Better
99101
support is actively being worked on.

src/main/java/li/cil/oc2/common/bus/RPCDeviceBusAdapter.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public final class RPCDeviceBusAdapter implements Steppable, IEventSink {
5858
@Serialized private final ByteBuffer transmitBuffer; // for data written to device by VM
5959
@Serialized private ByteBuffer receiveBuffer; // for data written by device to VM
6060
@Serialized private MethodInvocation synchronizedInvocation; // pending main thread invocation
61+
@Serialized private volatile long sequenceNumber = 0;
6162

6263
///////////////////////////////////////////////////////////////////
6364

@@ -113,6 +114,7 @@ public void reset() {
113114
transmitBuffer.clear();
114115
receiveBuffer.clear();
115116
synchronizedInvocation = null;
117+
sequenceNumber = 0;
116118
}
117119

118120
public void pause() {
@@ -476,10 +478,11 @@ private void writeError(final String message) {
476478
}
477479

478480
private void writeMessage(final String type, @Nullable final Object data) {
479-
final String json = gson.toJson(new Message(type, data));
480-
final byte[] bytes = json.getBytes();
481-
final int messageLength = bytes.length + MESSAGE_DELIMITER.length * 2;
481+
// Start synchronization here to also include sequenceNumber increment
482482
synchronized (receiveLock) {
483+
final String json = gson.toJson(new Message(type, data, sequenceNumber++));
484+
final byte[] bytes = json.getBytes();
485+
final int messageLength = bytes.length + MESSAGE_DELIMITER.length * 2;
483486
if (receiveBuffer.remaining() < messageLength) {
484487
// Decide whether to resize or not
485488
// The current heuristic is to resize for a large message (because
@@ -493,7 +496,8 @@ private void writeMessage(final String type, @Nullable final Object data) {
493496
reallocate ? "reallocating" : "ignoring");
494497

495498
if (!reallocate) {
496-
// Note: There is nothing that indicates to either the VM or the peripheral that a message was eaten
499+
// Return without writing anything. We already incremented sequenceNumber, so the VM can know
500+
// something was missed when it reads the next message actually present.
497501
return;
498502
}
499503

@@ -533,7 +537,7 @@ public record RPCDeviceWithIdentifier(UUID identifier, RPCDevice device) { }
533537

534538
public record EmptyMethodGroup(String name) { }
535539

536-
public record Message(String type, @Nullable Object data) {
540+
public record Message(String type, @Nullable Object data, long seq) {
537541
// Device -> VM
538542
public static final String MESSAGE_TYPE_LIST = "list";
539543
public static final String MESSAGE_TYPE_METHODS = "methods";

src/main/java/li/cil/oc2/common/serialization/gson/MessageJsonDeserializer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ public RPCDeviceBusAdapter.Message deserialize(final JsonElement json, final Typ
2121
case RPCDeviceBusAdapter.Message.MESSAGE_TYPE_INVOKE_METHOD -> context.deserialize(jsonObject.getAsJsonObject("data"), RPCDeviceBusAdapter.MethodInvocation.class);
2222
default -> throw new JsonParseException(RPCDeviceBusAdapter.ERROR_UNKNOWN_MESSAGE_TYPE + messageType);
2323
};
24+
final long messageSequenceNumber = jsonObject.has("seq") ? jsonObject.getAsJsonPrimitive("seq").getAsLong() : -1;
2425

25-
return new RPCDeviceBusAdapter.Message(messageType, messageData);
26+
return new RPCDeviceBusAdapter.Message(messageType, messageData, messageSequenceNumber);
2627
}
2728
}

0 commit comments

Comments
 (0)