2323import java .util .concurrent .CancellationException ;
2424import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
2525import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
26+ import java .util .function .Consumer ;
27+ import java .util .function .Function ;
2628import java .util .stream .Stream ;
27- import org .reactivestreams .Subscription ;
29+ import org .reactivestreams .Suwipbscription ;
2830import reactor .core .CoreSubscriber ;
2931import reactor .core .Disposable ;
3032import reactor .core .Exceptions ;
@@ -97,7 +99,7 @@ public UnboundedProcessor() {
9799 this (() -> {});
98100 }
99101
100- public UnboundedProcessor (Runnable onFinalizedHook ) {
102+ public UnboundedProcessor (Runnable onFinalizedHook , Consumer < ByteBuf > onValueDelivered ) {
101103 this .onFinalizedHook = onFinalizedHook ;
102104 this .queue = new MpscUnboundedArrayQueue <>(Queues .SMALL_BUFFER_SIZE );
103105 this .priorityQueue = new MpscUnboundedArrayQueue <>(Queues .SMALL_BUFFER_SIZE );
@@ -121,6 +123,9 @@ public Object scanUnsafe(Attr key) {
121123 return null ;
122124 }
123125
126+ public boolean tryEmitNext
127+
128+ @ Deprecated
124129 public void onNextPrioritized (ByteBuf t ) {
125130 if (this .done || this .cancelled ) {
126131 release (t );
@@ -157,6 +162,7 @@ public void onNextPrioritized(ByteBuf t) {
157162 }
158163
159164 @ Override
165+ @ Deprecated
160166 public void onNext (ByteBuf t ) {
161167 if (this .done || this .cancelled ) {
162168 release (t );
@@ -193,6 +199,7 @@ public void onNext(ByteBuf t) {
193199 }
194200
195201 @ Override
202+ @ Deprecated
196203 public void onError (Throwable t ) {
197204 if (this .done || this .cancelled ) {
198205 Operators .onErrorDropped (t , currentContext ());
@@ -235,6 +242,7 @@ public void onError(Throwable t) {
235242 }
236243
237244 @ Override
245+ @ Deprecated
238246 public void onComplete () {
239247 if (this .done || this .cancelled ) {
240248 return ;
0 commit comments