2424import io .serverlessworkflow .api .types .Input ;
2525import io .serverlessworkflow .api .types .Output ;
2626import io .serverlessworkflow .api .types .TaskBase ;
27+ import io .serverlessworkflow .api .types .TaskTimeout ;
28+ import io .serverlessworkflow .api .types .Timeout ;
2729import io .serverlessworkflow .api .types .Workflow ;
2830import io .serverlessworkflow .impl .TaskContext ;
2931import io .serverlessworkflow .impl .WorkflowApplication ;
3032import io .serverlessworkflow .impl .WorkflowContext ;
3133import io .serverlessworkflow .impl .WorkflowDefinition ;
34+ import io .serverlessworkflow .impl .WorkflowError ;
35+ import io .serverlessworkflow .impl .WorkflowException ;
3236import io .serverlessworkflow .impl .WorkflowFilter ;
3337import io .serverlessworkflow .impl .WorkflowModel ;
3438import io .serverlessworkflow .impl .WorkflowMutablePosition ;
3539import io .serverlessworkflow .impl .WorkflowPosition ;
3640import io .serverlessworkflow .impl .WorkflowPredicate ;
3741import io .serverlessworkflow .impl .WorkflowStatus ;
42+ import io .serverlessworkflow .impl .WorkflowUtils ;
43+ import io .serverlessworkflow .impl .WorkflowValueResolver ;
3844import io .serverlessworkflow .impl .lifecycle .TaskCancelledEvent ;
3945import io .serverlessworkflow .impl .lifecycle .TaskCompletedEvent ;
4046import io .serverlessworkflow .impl .lifecycle .TaskFailedEvent ;
4147import io .serverlessworkflow .impl .lifecycle .TaskRetriedEvent ;
4248import io .serverlessworkflow .impl .lifecycle .TaskStartedEvent ;
4349import io .serverlessworkflow .impl .resources .ResourceLoader ;
4450import io .serverlessworkflow .impl .schema .SchemaValidator ;
51+ import java .time .Duration ;
4552import java .time .Instant ;
4653import java .util .Iterator ;
4754import java .util .Map ;
55+ import java .util .Objects ;
4856import java .util .Optional ;
4957import java .util .concurrent .CancellationException ;
5058import java .util .concurrent .CompletableFuture ;
5159import java .util .concurrent .CompletionException ;
60+ import java .util .concurrent .TimeUnit ;
5261
5362public abstract class AbstractTaskExecutor <T extends TaskBase > implements TaskExecutor <T > {
5463
@@ -62,6 +71,7 @@ public abstract class AbstractTaskExecutor<T extends TaskBase> implements TaskEx
6271 private final Optional <SchemaValidator > outputSchemaValidator ;
6372 private final Optional <SchemaValidator > contextSchemaValidator ;
6473 private final Optional <WorkflowPredicate > ifFilter ;
74+ private final Optional <WorkflowValueResolver <Duration >> timeout ;
6575
6676 public abstract static class AbstractTaskExecutorBuilder <
6777 T extends TaskBase , V extends AbstractTaskExecutor <T >>
@@ -80,6 +90,7 @@ public abstract static class AbstractTaskExecutorBuilder<
8090 protected final Workflow workflow ;
8191 protected final ResourceLoader resourceLoader ;
8292 private final WorkflowDefinition definition ;
93+ private final Optional <WorkflowValueResolver <Duration >> timeout ;
8394
8495 private V instance ;
8596
@@ -113,6 +124,28 @@ protected AbstractTaskExecutorBuilder(
113124 getSchemaValidator (application .validatorFactory (), resourceLoader , export .getSchema ());
114125 }
115126 this .ifFilter = application .expressionFactory ().buildIfFilter (task );
127+ this .timeout = getTaskTimeout ();
128+ }
129+
130+ private Optional <WorkflowValueResolver <Duration >> getTaskTimeout () {
131+ TaskTimeout timeout = task .getTimeout ();
132+ if (timeout == null ) {
133+ return Optional .empty ();
134+ }
135+ Timeout timeoutDef = timeout .getTaskTimeoutDefinition ();
136+ if (timeoutDef == null && timeout .getTaskTimeoutReference () != null ) {
137+ timeoutDef =
138+ Objects .requireNonNull (
139+ Objects .requireNonNull (
140+ workflow .getUse ().getTimeouts (),
141+ "Timeout reference "
142+ + timeout .getTaskTimeoutReference ()
143+ + " specified, but use timeout was not defined" )
144+ .getAdditionalProperties ()
145+ .get (timeout .getTaskTimeoutReference ()),
146+ "Timeout reference " + timeout .getTaskTimeoutReference () + "cannot be found" );
147+ }
148+ return Optional .of (WorkflowUtils .fromTimeoutAfter (application , timeoutDef .getAfter ()));
116149 }
117150
118151 protected final TransitionInfoBuilder next (
@@ -171,6 +204,7 @@ protected AbstractTaskExecutor(AbstractTaskExecutorBuilder<T, ?> builder) {
171204 this .outputSchemaValidator = builder .outputSchemaValidator ;
172205 this .contextSchemaValidator = builder .contextSchemaValidator ;
173206 this .ifFilter = builder .ifFilter ;
207+ this .timeout = builder .timeout ;
174208 }
175209
176210 protected final CompletableFuture <TaskContext > executeNext (
@@ -200,7 +234,7 @@ public CompletableFuture<TaskContext> apply(
200234 } else if (taskContext .isCompleted ()) {
201235 return executeNext (completable , workflowContext );
202236 } else if (ifFilter .map (f -> f .test (workflowContext , taskContext , input )).orElse (true )) {
203- return executeNext (
237+ completable =
204238 completable
205239 .thenCompose (workflowContext .instance ()::suspendedCheck )
206240 .thenApply (
@@ -247,8 +281,26 @@ public CompletableFuture<TaskContext> apply(
247281 l .onTaskCompleted (
248282 new TaskCompletedEvent (workflowContext , taskContext )));
249283 return t ;
250- }),
251- workflowContext );
284+ });
285+ if (timeout .isPresent ()) {
286+ completable =
287+ completable
288+ .orTimeout (
289+ timeout
290+ .map (t -> t .apply (workflowContext , taskContext , input ))
291+ .orElseThrow ()
292+ .toMillis (),
293+ TimeUnit .MILLISECONDS )
294+ .exceptionallyCompose (
295+ e ->
296+ CompletableFuture .failedFuture (
297+ new WorkflowException (
298+ WorkflowError .timeout ()
299+ .instance (taskContext .position ().jsonPointer ())
300+ .build (),
301+ e )));
302+ }
303+ return executeNext (completable , workflowContext );
252304 } else {
253305 taskContext .transition (getSkipTransition ());
254306 return executeNext (completable , workflowContext );
0 commit comments