Skip to content

Commit 3fe9998

Browse files
authored
[DEV-1038] fix: add missing no stream case to expected revision error handling (#363)
* fix: missing noStream case to expected revision error handling Handle EXPECTED_NO_STREAM case in AppendToStream error response parsing. Previously this case fell through to the else branch, incorrectly returning a streamRevision state instead of noStream.
1 parent fc09a6a commit 3fe9998

2 files changed

Lines changed: 72 additions & 13 deletions

File tree

src/main/java/io/kurrent/dbclient/AppendToStream.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.grpc.ManagedChannel;
88
import io.grpc.stub.StreamObserver;
99

10+
1011
import java.util.ArrayList;
1112
import java.util.Iterator;
1213
import java.util.List;
@@ -69,6 +70,8 @@ private CompletableFuture<WriteResult> append(ManagedChannel channel, List<Event
6970
expectedRevision = StreamState.any();
7071
} else if (wev.getExpectedRevisionOptionCase() == StreamsOuterClass.AppendResp.WrongExpectedVersion.ExpectedRevisionOptionCase.EXPECTED_STREAM_EXISTS) {
7172
expectedRevision = StreamState.streamExists();
73+
} else if (wev.getExpectedRevisionOptionCase() == StreamsOuterClass.AppendResp.WrongExpectedVersion.ExpectedRevisionOptionCase.EXPECTED_NO_STREAM) {
74+
expectedRevision = StreamState.noStream();
7275
} else {
7376
expectedRevision = StreamState.streamRevision(wev.getExpectedRevision());
7477
}

src/test/java/io/kurrent/dbclient/streams/AppendTests.java

Lines changed: 69 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,41 +12,97 @@ public interface AppendTests extends ConnectionAware {
1212
default void testAppendSingleEventNoStream() throws Throwable {
1313
KurrentDBClient client = getDatabase().defaultClient();
1414
String streamName = generateName();
15-
String eventType = "TestEvent";
1615
UUID eventId = UUID.randomUUID();
1716
Foo foo = new Foo();
1817
byte[] fooBytes = mapper.writeValueAsBytes(foo);
1918

20-
EventData event = EventData.builderAsJson(eventType, fooBytes)
19+
EventData event = EventData.builderAsJson("TestEvent", fooBytes)
2120
.metadataAsBytes(fooBytes)
2221
.eventId(eventId)
2322
.build();
2423

2524
WriteResult appendResult = client.appendToStream(
26-
streamName,
27-
AppendToStreamOptions.get().streamState(StreamState.noStream()),
28-
event
29-
).get();
25+
streamName, AppendToStreamOptions.get().streamState(StreamState.noStream()), event).get();
3026

3127
Assertions.assertEquals(StreamState.streamRevision(0), appendResult.getNextExpectedRevision());
3228

33-
ReadResult result = client.readStream(
34-
streamName,
35-
ReadStreamOptions.get().fromEnd().backwards().maxCount(1)
36-
).get();
37-
38-
Assertions.assertEquals(1, result.getEvents().size());
29+
ReadResult result = client.readStream(streamName, ReadStreamOptions.get().fromEnd().backwards().maxCount(1)).get();
3930
RecordedEvent first = result.getEvents().get(0).getEvent();
4031
ObjectNode userMetadata = mapper.readValue(first.getUserMetadata(), ObjectNode.class);
4132

4233
Assertions.assertAll(
4334
() -> Assertions.assertEquals(streamName, first.getStreamId()),
44-
() -> Assertions.assertEquals(eventType, first.getEventType()),
35+
() -> Assertions.assertEquals("TestEvent", first.getEventType()),
4536
() -> Assertions.assertEquals(eventId.toString(), first.getEventId().toString()),
4637
() -> Assertions.assertEquals(foo, mapper.readValue(first.getEventData(), Foo.class)),
4738
() -> Assertions.assertEquals(foo, mapper.readValue(first.getUserMetadata(), Foo.class)),
4839
() -> Assertions.assertFalse(userMetadata.has(ClientTelemetryConstants.Metadata.TRACE_ID)),
4940
() -> Assertions.assertFalse(userMetadata.has(ClientTelemetryConstants.Metadata.SPAN_ID))
5041
);
5142
}
43+
44+
@Test
45+
default void testAppendMultipleEventsAtOnce() throws Throwable {
46+
KurrentDBClient client = getDatabase().defaultClient();
47+
String streamName = generateName();
48+
int eventCount = 5;
49+
50+
WriteResult result = client.appendToStream(streamName,
51+
AppendToStreamOptions.get().streamState(StreamState.noStream()),
52+
generateEvents(eventCount, "TestEvent").iterator()).get();
53+
54+
Assertions.assertEquals(StreamState.streamRevision(eventCount - 1), result.getNextExpectedRevision());
55+
Assertions.assertEquals(eventCount, client.readStream(streamName, ReadStreamOptions.get()).get().getEvents().size());
56+
}
57+
58+
@Test
59+
default void testStreamStateOptimisticConcurrency() throws Throwable {
60+
KurrentDBClient client = getDatabase().defaultClient();
61+
62+
String anyStream = generateName();
63+
appendEvent(client, anyStream, StreamState.any());
64+
appendEvent(client, anyStream, StreamState.any());
65+
Assertions.assertEquals(2, client.readStream(anyStream, ReadStreamOptions.get()).get().getEvents().size());
66+
67+
String existsStream = generateName();
68+
assertWrongExpectedVersion(client, existsStream, StreamState.streamExists(), StreamState.streamExists(), StreamState.noStream());
69+
appendEvent(client, existsStream, StreamState.noStream());
70+
appendEvent(client, existsStream, StreamState.streamExists());
71+
72+
String noStream = generateName();
73+
appendEvent(client, noStream, StreamState.noStream());
74+
assertWrongExpectedVersion(client, noStream, StreamState.noStream(), StreamState.noStream(), StreamState.streamRevision(0));
75+
76+
String revStream = generateName();
77+
appendEvent(client, revStream, StreamState.noStream());
78+
appendEvent(client, revStream, StreamState.streamRevision(0));
79+
assertWrongExpectedVersion(client, revStream, StreamState.streamRevision(99), StreamState.streamRevision(99), StreamState.streamRevision(1));
80+
}
81+
82+
default EventData createTestEvent() throws Exception {
83+
return EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo()))
84+
.eventId(UUID.randomUUID())
85+
.build();
86+
}
87+
88+
default void appendEvent(KurrentDBClient client, String streamName, StreamState state) throws Exception {
89+
client.appendToStream(streamName, AppendToStreamOptions.get().streamState(state), createTestEvent()).get();
90+
}
91+
92+
default void assertWrongExpectedVersion(KurrentDBClient client, String streamName, StreamState state, StreamState expectedState, StreamState actualState) {
93+
WrongExpectedVersionException ex = Assertions.assertThrows(WrongExpectedVersionException.class, () -> {
94+
try {
95+
appendEvent(client, streamName, state);
96+
} catch (java.util.concurrent.ExecutionException e) {
97+
if (e.getCause() != null) {
98+
throw e.getCause();
99+
}
100+
throw e;
101+
}
102+
});
103+
Assertions.assertEquals(streamName, ex.getStreamName());
104+
Assertions.assertEquals(expectedState, ex.getExpectedState());
105+
Assertions.assertEquals(actualState, ex.getActualState());
106+
}
107+
52108
}

0 commit comments

Comments
 (0)