Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,8 @@ private record ReactorExchangeResponseFunction(
@Nullable ReactiveAdapter returnTypeAdapter,
boolean blockForOptional, @Nullable Duration blockTimeout) implements ResponseFunction {

private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow";

@Override
public @Nullable Object execute(HttpRequestValues requestValues) {

Expand Down Expand Up @@ -491,14 +493,16 @@ public static ResponseFunction create(ReactorHttpExchangeAdapter client, Method
MethodParameter returnParam = new MethodParameter(method, -1);
Class<?> returnType = returnParam.getParameterType();
boolean isSuspending = KotlinDetector.isSuspendingFunction(method);
boolean hasFlowReturnType = COROUTINES_FLOW_CLASS_NAME.equals(returnType.getName());
boolean isUnwrapped = isSuspending && !hasFlowReturnType;
if (isSuspending) {
returnType = Mono.class;
returnType = (hasFlowReturnType ? Flux.class : Mono.class);
}

ReactiveAdapter reactiveAdapter = client.getReactiveAdapterRegistry().getAdapter(returnType);

MethodParameter actualParam = (reactiveAdapter != null ? returnParam.nested() : returnParam.nestedIfOptional());
Class<?> actualType = isSuspending ? actualParam.getParameterType() : actualParam.getNestedParameterType();
Class<?> actualType = isUnwrapped ? actualParam.getParameterType() : actualParam.getNestedParameterType();

Function<HttpRequestValues, Publisher<?>> responseFunction;
if (ClassUtils.isVoidType(actualType)) {
Expand All @@ -511,18 +515,18 @@ else if (actualType.equals(HttpHeaders.class)) {
responseFunction = client::exchangeForHeadersMono;
}
else if (actualType.equals(ResponseEntity.class)) {
MethodParameter bodyParam = isSuspending ? actualParam : actualParam.nested();
MethodParameter bodyParam = isUnwrapped ? actualParam : actualParam.nested();
Class<?> bodyType = bodyParam.getNestedParameterType();
if (bodyType.equals(Void.class)) {
responseFunction = client::exchangeForBodilessEntityMono;
}
else {
ReactiveAdapter bodyAdapter = client.getReactiveAdapterRegistry().getAdapter(bodyType);
responseFunction = initResponseEntityFunction(client, bodyParam, bodyAdapter, isSuspending);
responseFunction = initResponseEntityFunction(client, bodyParam, bodyAdapter, isUnwrapped);
}
}
else {
responseFunction = initBodyFunction(client, actualParam, reactiveAdapter, isSuspending);
responseFunction = initBodyFunction(client, actualParam, reactiveAdapter, isUnwrapped);
}

return new ReactorExchangeResponseFunction(
Expand All @@ -532,7 +536,7 @@ else if (actualType.equals(ResponseEntity.class)) {
@SuppressWarnings("ConstantConditions")
private static Function<HttpRequestValues, Publisher<?>> initResponseEntityFunction(
ReactorHttpExchangeAdapter client, MethodParameter methodParam,
@Nullable ReactiveAdapter reactiveAdapter, boolean isSuspending) {
@Nullable ReactiveAdapter reactiveAdapter, boolean isUnwrapped) {

if (reactiveAdapter == null) {
return request -> client.exchangeForEntityMono(
Expand All @@ -543,7 +547,7 @@ private static Function<HttpRequestValues, Publisher<?>> initResponseEntityFunct
"ResponseEntity body must be a concrete value or a multi-value Publisher");

ParameterizedTypeReference<?> bodyType =
ParameterizedTypeReference.forType(isSuspending ? methodParam.nested().getGenericParameterType() :
ParameterizedTypeReference.forType(isUnwrapped ? methodParam.nested().getGenericParameterType() :
methodParam.nested().getNestedGenericParameterType());

// Shortcut for Flux
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ class KotlinHttpServiceMethodTests {
assertThat(flowBody.toList()).containsExactly("exchange", "For", "Body", "Flux")
verifyClientInvocation("exchangeForBodyFlux", object : ParameterizedTypeReference<String>() {})

val suspendingFlowBody = service.suspendingFlowBody()
assertThat(suspendingFlowBody.toList()).containsExactly("exchange", "For", "Body", "Flux")
verifyClientInvocation("exchangeForBodyFlux", object : ParameterizedTypeReference<String>() {})

val stringEntity = service.stringEntity()
assertThat(stringEntity).isEqualTo(ResponseEntity.ok<String>("exchangeForEntityMono"))
verifyClientInvocation("exchangeForEntityMono", object : ParameterizedTypeReference<String>() {})
Expand Down Expand Up @@ -127,6 +131,9 @@ class KotlinHttpServiceMethodTests {
@GetExchange
suspend fun listBody(): MutableList<String>

@GetExchange
suspend fun suspendingFlowBody(): Flow<String>

@GetExchange
suspend fun stringEntity(): ResponseEntity<String>

Expand Down