diff --git a/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java b/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java index 3e4a8e7aa824..09fa254b2642 100644 --- a/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java +++ b/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java @@ -462,6 +462,8 @@ private record ReactorExchangeResponseFunction( @Nullable ReactiveAdapter returnTypeAdapter, boolean blockForOptional, @Nullable Duration blockTimeout) implements ResponseFunction { + private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow"; + @Override public @Nullable Object execute(HttpRequestValues requestValues) { @@ -491,14 +493,16 @@ public static ResponseFunction create(ReactorHttpExchangeAdapter client, Method MethodParameter returnParam = new MethodParameter(method, -1); Class returnType = returnParam.getParameterType(); boolean isSuspending = KotlinDetector.isSuspendingFunction(method); + boolean hasFlowReturnType = COROUTINES_FLOW_CLASS_NAME.equals(returnType.getName()); + boolean isUnwrapped = isSuspending && !hasFlowReturnType; if (isSuspending) { - returnType = Mono.class; + returnType = (hasFlowReturnType ? Flux.class : Mono.class); } ReactiveAdapter reactiveAdapter = client.getReactiveAdapterRegistry().getAdapter(returnType); MethodParameter actualParam = (reactiveAdapter != null ? returnParam.nested() : returnParam.nestedIfOptional()); - Class actualType = isSuspending ? actualParam.getParameterType() : actualParam.getNestedParameterType(); + Class actualType = isUnwrapped ? actualParam.getParameterType() : actualParam.getNestedParameterType(); Function> responseFunction; if (ClassUtils.isVoidType(actualType)) { @@ -511,18 +515,18 @@ else if (actualType.equals(HttpHeaders.class)) { responseFunction = client::exchangeForHeadersMono; } else if (actualType.equals(ResponseEntity.class)) { - MethodParameter bodyParam = isSuspending ? actualParam : actualParam.nested(); + MethodParameter bodyParam = isUnwrapped ? actualParam : actualParam.nested(); Class bodyType = bodyParam.getNestedParameterType(); if (bodyType.equals(Void.class)) { responseFunction = client::exchangeForBodilessEntityMono; } else { ReactiveAdapter bodyAdapter = client.getReactiveAdapterRegistry().getAdapter(bodyType); - responseFunction = initResponseEntityFunction(client, bodyParam, bodyAdapter, isSuspending); + responseFunction = initResponseEntityFunction(client, bodyParam, bodyAdapter, isUnwrapped); } } else { - responseFunction = initBodyFunction(client, actualParam, reactiveAdapter, isSuspending); + responseFunction = initBodyFunction(client, actualParam, reactiveAdapter, isUnwrapped); } return new ReactorExchangeResponseFunction( @@ -532,7 +536,7 @@ else if (actualType.equals(ResponseEntity.class)) { @SuppressWarnings("ConstantConditions") private static Function> initResponseEntityFunction( ReactorHttpExchangeAdapter client, MethodParameter methodParam, - @Nullable ReactiveAdapter reactiveAdapter, boolean isSuspending) { + @Nullable ReactiveAdapter reactiveAdapter, boolean isUnwrapped) { if (reactiveAdapter == null) { return request -> client.exchangeForEntityMono( @@ -543,7 +547,7 @@ private static Function> initResponseEntityFunct "ResponseEntity body must be a concrete value or a multi-value Publisher"); ParameterizedTypeReference bodyType = - ParameterizedTypeReference.forType(isSuspending ? methodParam.nested().getGenericParameterType() : + ParameterizedTypeReference.forType(isUnwrapped ? methodParam.nested().getGenericParameterType() : methodParam.nested().getNestedGenericParameterType()); // Shortcut for Flux diff --git a/spring-web/src/test/kotlin/org/springframework/web/service/invoker/HttpServiceMethodKotlinTests.kt b/spring-web/src/test/kotlin/org/springframework/web/service/invoker/HttpServiceMethodKotlinTests.kt index ae84239f97dd..c5a62bd593a9 100644 --- a/spring-web/src/test/kotlin/org/springframework/web/service/invoker/HttpServiceMethodKotlinTests.kt +++ b/spring-web/src/test/kotlin/org/springframework/web/service/invoker/HttpServiceMethodKotlinTests.kt @@ -56,6 +56,10 @@ class KotlinHttpServiceMethodTests { assertThat(flowBody.toList()).containsExactly("exchange", "For", "Body", "Flux") verifyClientInvocation("exchangeForBodyFlux", object : ParameterizedTypeReference() {}) + val suspendingFlowBody = service.suspendingFlowBody() + assertThat(suspendingFlowBody.toList()).containsExactly("exchange", "For", "Body", "Flux") + verifyClientInvocation("exchangeForBodyFlux", object : ParameterizedTypeReference() {}) + val stringEntity = service.stringEntity() assertThat(stringEntity).isEqualTo(ResponseEntity.ok("exchangeForEntityMono")) verifyClientInvocation("exchangeForEntityMono", object : ParameterizedTypeReference() {}) @@ -127,6 +131,9 @@ class KotlinHttpServiceMethodTests { @GetExchange suspend fun listBody(): MutableList + @GetExchange + suspend fun suspendingFlowBody(): Flow + @GetExchange suspend fun stringEntity(): ResponseEntity