@@ -73,9 +73,14 @@ public void ensuresThatSetupPayloadCanBeRetained() {
7373 @ Test
7474 public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions () {
7575 Payload setupPayload = ByteBufPayload .create ("TestData" , "TestMetadata" );
76-
7776 Assertions .assertThat (setupPayload .refCnt ()).isOne ();
7877
78+ // Keep the data and metadata around so we can try changing them independently
79+ ByteBuf dataBuf = setupPayload .data ();
80+ ByteBuf metadataBuf = setupPayload .metadata ();
81+ dataBuf .retain ();
82+ metadataBuf .retain ();
83+
7984 TestClientTransport testClientTransport = new TestClientTransport ();
8085 Mono <RSocket > connectionMono =
8186 RSocketConnector .create ().setupPayload (setupPayload ).connect (testClientTransport );
@@ -92,6 +97,15 @@ public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions
9297 .expectComplete ()
9398 .verify (Duration .ofMillis (100 ));
9499
100+ // Changing the original data and metadata should not impact the SetupPayload
101+ dataBuf .writerIndex (dataBuf .readerIndex ());
102+ dataBuf .writeChar ('d' );
103+ dataBuf .release ();
104+
105+ metadataBuf .writerIndex (metadataBuf .readerIndex ());
106+ metadataBuf .writeChar ('m' );
107+ metadataBuf .release ();
108+
95109 Assertions .assertThat (testClientTransport .testConnection ().getSent ())
96110 .hasSize (2 )
97111 .allMatch (
@@ -100,7 +114,11 @@ public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions
100114 return payload .getDataUtf8 ().equals ("TestData" )
101115 && payload .getMetadataUtf8 ().equals ("TestMetadata" );
102116 })
103- .allMatch (ReferenceCounted ::release );
117+ .allMatch (
118+ byteBuf -> {
119+ System .out .println ("calling release " + byteBuf .refCnt ());
120+ return byteBuf .release ();
121+ });
104122 Assertions .assertThat (setupPayload .refCnt ()).isZero ();
105123 }
106124
0 commit comments