From ae0f29ff53599a184197e5676ce347ba5f53f262 Mon Sep 17 00:00:00 2001 From: Dmitrii Tikhomirov Date: Mon, 27 Oct 2025 18:21:31 -0700 Subject: [PATCH 1/4] Add initial RunContainer Task support Signed-off-by: Dmitrii Tikhomirov Signed-off-by: Dmitrii Tikhomirov --- impl/container/pom.xml | 33 ++++ .../executors/CommandPropertySetter.java | 33 ++++ .../ContainerEnvironmentPropertySetter.java | 49 +++++ .../executors/ContainerPropertySetter.java | 31 ++++ .../container/executors/ContainerRunner.java | 174 ++++++++++++++++++ .../executors/LifetimePropertySetter.java | 43 +++++ .../executors/NamePropertySetter.java | 34 ++++ .../executors/PortsPropertySetter.java | 51 +++++ .../executors/RunContainerExecutor.java | 51 +++++ .../executors/StringExpressionResolver.java | 47 +++++ .../executors/VolumesPropertySetter.java | 51 +++++ ...erlessworkflow.impl.executors.RunnableTask | 1 + impl/pom.xml | 17 ++ impl/test/pom.xml | 4 + .../impl/test/ContainerTest.java | 42 +++++ .../container/container.yaml | 30 +++ 16 files changed, 691 insertions(+) create mode 100644 impl/container/pom.xml create mode 100644 impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java create mode 100644 impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java create mode 100644 impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java create mode 100644 impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java create mode 100644 impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java create mode 100644 impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java create mode 100644 impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java create mode 100644 impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java create mode 100644 impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/StringExpressionResolver.java create mode 100644 impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java create mode 100644 impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask create mode 100644 impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java create mode 100644 impl/test/src/test/resources/workflows-samples/container/container.yaml diff --git a/impl/container/pom.xml b/impl/container/pom.xml new file mode 100644 index 00000000..30b7bb53 --- /dev/null +++ b/impl/container/pom.xml @@ -0,0 +1,33 @@ + + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-impl + 8.0.0-SNAPSHOT + + serverlessworkflow-impl-container + Serverless Workflow :: Impl :: OpenAPI + + + + io.serverlessworkflow + serverlessworkflow-impl-core + + + io.serverlessworkflow + serverlessworkflow-types + + + com.github.docker-java + docker-java-core + + + com.github.docker-java + docker-java-transport-httpclient5 + + + + \ No newline at end of file diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java new file mode 100644 index 00000000..44eb5cd4 --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java @@ -0,0 +1,33 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import com.github.dockerjava.api.command.CreateContainerCmd; +import io.serverlessworkflow.api.types.Container; + +class CommandPropertySetter extends ContainerPropertySetter { + + CommandPropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { + super(createContainerCmd, configuration); + } + + @Override + public void accept(StringExpressionResolver resolver) { + if (configuration.getCommand() != null && !configuration.getCommand().isEmpty()) { + createContainerCmd.withCmd("sh", "-c", configuration.getCommand()); + } + } +} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java new file mode 100644 index 00000000..a4df76f8 --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java @@ -0,0 +1,49 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import com.github.dockerjava.api.command.CreateContainerCmd; +import io.serverlessworkflow.api.types.Container; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +class ContainerEnvironmentPropertySetter extends ContainerPropertySetter { + + ContainerEnvironmentPropertySetter( + CreateContainerCmd createContainerCmd, Container configuration) { + super(createContainerCmd, configuration); + } + + @Override + public void accept(StringExpressionResolver resolver) { + if (!(configuration.getEnvironment() == null + || configuration.getEnvironment().getAdditionalProperties() == null)) { + List envs = new ArrayList<>(); + for (Map.Entry entry : + configuration.getEnvironment().getAdditionalProperties().entrySet()) { + String key = entry.getKey(); + if (entry.getValue() instanceof String value) { + String resolvedValue = resolver.resolve(value); + envs.add(key + "=" + resolvedValue); + } else { + throw new IllegalArgumentException("Environment variable values must be strings"); + } + } + createContainerCmd.withEnv(envs.toArray(new String[0])); + } + } +} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java new file mode 100644 index 00000000..6a5d7055 --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import com.github.dockerjava.api.command.CreateContainerCmd; +import io.serverlessworkflow.api.types.Container; +import java.util.function.Consumer; + +abstract class ContainerPropertySetter implements Consumer { + + protected final CreateContainerCmd createContainerCmd; + protected final Container configuration; + + ContainerPropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { + this.createContainerCmd = createContainerCmd; + this.configuration = configuration; + } +} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java new file mode 100644 index 00000000..cf785964 --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java @@ -0,0 +1,174 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import static io.serverlessworkflow.api.types.ContainerLifetime.ContainerCleanupPolicy.*; + +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.command.CreateContainerCmd; +import com.github.dockerjava.api.command.CreateContainerResponse; +import com.github.dockerjava.api.command.WaitContainerResultCallback; +import com.github.dockerjava.api.exception.DockerClientException; +import com.github.dockerjava.core.DefaultDockerClientConfig; +import com.github.dockerjava.core.DockerClientImpl; +import com.github.dockerjava.httpclient5.ApacheDockerHttpClient; +import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +class ContainerRunner { + + private static final DefaultDockerClientConfig DEFAULT_CONFIG = + DefaultDockerClientConfig.createDefaultConfigBuilder().build(); + + private static final DockerClient dockerClient = + DockerClientImpl.getInstance( + DEFAULT_CONFIG, + new ApacheDockerHttpClient.Builder().dockerHost(DEFAULT_CONFIG.getDockerHost()).build()); + + private final CreateContainerCmd createContainerCmd; + private final Container container; + private final List propertySetters; + private final WorkflowDefinition definition; + + private ContainerRunner( + CreateContainerCmd createContainerCmd, WorkflowDefinition definition, Container container) { + this.createContainerCmd = createContainerCmd; + this.definition = definition; + this.container = container; + this.propertySetters = new ArrayList<>(); + } + + /** + * Blocking container execution according to the lifetime policy. Returns an already completed + * CompletableFuture: - completedFuture(input) if exitCode == 0 - exceptionally completed if the + * exit code is non-zero or an error occurs. The method blocks the calling thread until the + * container finishes or the timeout expires. + */ + CompletableFuture startSync( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + + StringExpressionResolver resolver = + new StringExpressionResolver(workflowContext, taskContext, input); + + propertySetters.forEach(setter -> setter.accept(resolver)); + + CreateContainerResponse createContainerResponse = createContainerCmd.exec(); + String containerId = createContainerResponse.getId(); + + if (containerId == null || containerId.isEmpty()) { + return failed("Container creation failed: empty container ID"); + } + + dockerClient.startContainerCmd(containerId).exec(); + + int exitCode; + try (WaitContainerResultCallback resultCallback = + dockerClient.waitContainerCmd(containerId).exec(new WaitContainerResultCallback())) { + if (container.getLifetime() != null + && container.getLifetime().getCleanup() != null + && container.getLifetime().getCleanup().equals(EVENTUALLY)) { + try { + WorkflowValueResolver durationResolver = + WorkflowUtils.fromTimeoutAfter( + definition.application(), container.getLifetime().getAfter()); + Duration timeout = durationResolver.apply(workflowContext, taskContext, input); + exitCode = resultCallback.awaitStatusCode(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (DockerClientException e) { + return failed( + String.format("Error while waiting for container to finish: %s ", e.getMessage())); + } finally { + dockerClient.removeContainerCmd(containerId).withForce(true).exec(); + } + } else { + exitCode = resultCallback.awaitStatusCode(); + } + } catch (IOException e) { + return failed( + String.format("Error while waiting for container to finish: %s ", e.getMessage())); + } + + return switch (exitCode) { + case 0 -> CompletableFuture.completedFuture(input); + case 1 -> failed("General error (exit code 1)"); + case 2 -> failed("Shell syntax error (exit code 2)"); + case 126 -> failed("Command found but not executable (exit code 126)"); + case 127 -> failed("Command not found (exit code 127)"); + case 130 -> failed("Interrupted by SIGINT (exit code 130)"); + case 137 -> failed("Killed by SIGKILL (exit code 137)"); + case 139 -> failed("Segmentation fault (exit code 139)"); + case 143 -> failed("Terminated by SIGTERM (exit code 143)"); + default -> failed("Process exited with code " + exitCode); + }; + } + + private static CompletableFuture failed(String message) { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(new RuntimeException(message)); + return f; + } + + static ContainerRunnerBuilder builder() { + return new ContainerRunnerBuilder(); + } + + public static class ContainerRunnerBuilder { + private Container container = null; + private WorkflowDefinition workflowDefinition; + + private ContainerRunnerBuilder() {} + + ContainerRunnerBuilder withContainer(Container container) { + this.container = container; + return this; + } + + public ContainerRunnerBuilder withWorkflowDefinition(WorkflowDefinition definition) { + this.workflowDefinition = definition; + return this; + } + + ContainerRunner build() { + if (container.getImage() == null || container.getImage().isEmpty()) { + throw new IllegalArgumentException("Container image must be provided"); + } + + CreateContainerCmd createContainerCmd = dockerClient.createContainerCmd(container.getImage()); + + ContainerRunner runner = + new ContainerRunner(createContainerCmd, workflowDefinition, container); + + runner.propertySetters.add(new CommandPropertySetter(createContainerCmd, container)); + runner.propertySetters.add( + new ContainerEnvironmentPropertySetter(createContainerCmd, container)); + runner.propertySetters.add(new NamePropertySetter(createContainerCmd, container)); + runner.propertySetters.add(new PortsPropertySetter(createContainerCmd, container)); + runner.propertySetters.add(new VolumesPropertySetter(createContainerCmd, container)); + runner.propertySetters.add(new LifetimePropertySetter(createContainerCmd, container)); + return runner; + } + } +} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java new file mode 100644 index 00000000..b254d9d2 --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java @@ -0,0 +1,43 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import static io.serverlessworkflow.api.types.ContainerLifetime.*; + +import com.github.dockerjava.api.command.CreateContainerCmd; +import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.api.types.ContainerLifetime; + +class LifetimePropertySetter extends ContainerPropertySetter { + + LifetimePropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { + super(createContainerCmd, configuration); + } + + @Override + public void accept(StringExpressionResolver resolver) { + // case of cleanup=eventually processed at ContainerRunner + if (configuration.getLifetime() != null) { + ContainerLifetime lifetime = configuration.getLifetime(); + ContainerCleanupPolicy cleanupPolicy = lifetime.getCleanup(); + if (cleanupPolicy.equals(ContainerCleanupPolicy.ALWAYS)) { + createContainerCmd.getHostConfig().withAutoRemove(true); + } else if (cleanupPolicy.equals(ContainerCleanupPolicy.NEVER)) { + createContainerCmd.getHostConfig().withAutoRemove(false); + } + } + } +} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java new file mode 100644 index 00000000..52878196 --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import com.github.dockerjava.api.command.CreateContainerCmd; +import io.serverlessworkflow.api.types.Container; + +class NamePropertySetter extends ContainerPropertySetter { + + NamePropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { + super(createContainerCmd, configuration); + } + + @Override + public void accept(StringExpressionResolver resolver) { + if (configuration.getName() != null && !configuration.getName().isEmpty()) { + String resolvedName = resolver.resolve(configuration.getName()); + createContainerCmd.withName(resolvedName); + } + } +} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java new file mode 100644 index 00000000..b13da8e0 --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import com.github.dockerjava.api.command.CreateContainerCmd; +import com.github.dockerjava.api.model.ExposedPort; +import com.github.dockerjava.api.model.Ports; +import io.serverlessworkflow.api.types.Container; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +class PortsPropertySetter extends ContainerPropertySetter { + + PortsPropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { + super(createContainerCmd, configuration); + } + + @Override + public void accept(StringExpressionResolver resolver) { + if (configuration.getPorts() != null + && configuration.getPorts().getAdditionalProperties() != null) { + Ports portBindings = new Ports(); + List exposed = new ArrayList<>(); + + for (Map.Entry entry : + configuration.getPorts().getAdditionalProperties().entrySet()) { + int hostPort = Integer.parseInt(entry.getKey()); + int containerPort = Integer.parseInt(entry.getValue().toString()); + ExposedPort exposedPort = ExposedPort.tcp(containerPort); + portBindings.bind(exposedPort, Ports.Binding.bindPort(hostPort)); + exposed.add(exposedPort); + } + createContainerCmd.withExposedPorts(exposed.toArray(new ExposedPort[0])); + createContainerCmd.getHostConfig().withPortBindings(portBindings); + } + } +} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java new file mode 100644 index 00000000..0e40b891 --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.api.types.RunContainer; +import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.executors.RunnableTask; +import java.util.concurrent.CompletableFuture; + +public class RunContainerExecutor implements RunnableTask { + + private ContainerRunner containerRunner; + + public void init(RunContainer taskConfiguration, WorkflowDefinition definition) { + Container container = taskConfiguration.getContainer(); + containerRunner = + ContainerRunner.builder() + .withContainer(container) + .withWorkflowDefinition(definition) + .build(); + } + + @Override + public CompletableFuture apply( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + return containerRunner.startSync(workflowContext, taskContext, input); + } + + @Override + public boolean accept(Class clazz) { + return RunContainer.class.equals(clazz); + } +} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/StringExpressionResolver.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/StringExpressionResolver.java new file mode 100644 index 00000000..ce2ee2df --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/StringExpressionResolver.java @@ -0,0 +1,47 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.expressions.ExpressionUtils; + +class StringExpressionResolver { + + private final WorkflowContext workflowContext; + private final TaskContext taskContext; + private final WorkflowModel input; + + StringExpressionResolver( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + this.workflowContext = workflowContext; + this.taskContext = taskContext; + this.input = input; + } + + String resolve(String expression) { + if (ExpressionUtils.isExpr(expression)) { + WorkflowUtils.buildStringResolver( + workflowContext.definition().application(), + expression, + taskContext.input().asJavaObject()) + .apply(workflowContext, taskContext, input); + } + return expression; + } +} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java new file mode 100644 index 00000000..5effe641 --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import com.github.dockerjava.api.command.CreateContainerCmd; +import com.github.dockerjava.api.model.Bind; +import com.github.dockerjava.api.model.Volume; +import io.serverlessworkflow.api.types.Container; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +class VolumesPropertySetter extends ContainerPropertySetter { + + VolumesPropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { + super(createContainerCmd, configuration); + } + + @Override + public void accept(StringExpressionResolver resolver) { + if (configuration.getVolumes() != null + && configuration.getVolumes().getAdditionalProperties() != null) { + List binds = new ArrayList<>(); + for (Map.Entry entry : + configuration.getVolumes().getAdditionalProperties().entrySet()) { + String hostPath = entry.getKey(); + if (entry.getValue() instanceof String containerPath) { + String resolvedHostPath = resolver.resolve(hostPath); + String resolvedContainerPath = resolver.resolve(containerPath); + binds.add(new Bind(resolvedHostPath, new Volume(resolvedContainerPath))); + } else { + throw new IllegalArgumentException("Volume container paths must be strings"); + } + } + createContainerCmd.getHostConfig().withBinds(binds); + } + } +} diff --git a/impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask b/impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask new file mode 100644 index 00000000..c1450d21 --- /dev/null +++ b/impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask @@ -0,0 +1 @@ +io.serverlessworkflow.impl.container.executors.RunContainerExecutor \ No newline at end of file diff --git a/impl/pom.xml b/impl/pom.xml index 00d900f9..890a0edb 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -14,6 +14,7 @@ 4.0.0 1.6.0 3.1.11 + 3.6.0 @@ -92,6 +93,11 @@ serverlessworkflow-impl-openapi ${project.version} + + io.serverlessworkflow + serverlessworkflow-impl-container + ${project.version} + net.thisptr jackson-jq @@ -124,6 +130,16 @@ h2-mvstore ${version.com.h2database} + + com.github.docker-java + docker-java-core + ${version.docker.java} + + + com.github.docker-java + docker-java-transport-httpclient5 + ${version.docker.java} + @@ -139,6 +155,7 @@ model lifecycleevent validation + container test diff --git a/impl/test/pom.xml b/impl/test/pom.xml index 7672bbef..f4e846cb 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -41,6 +41,10 @@ io.serverlessworkflow serverlessworkflow-impl-openapi + + io.serverlessworkflow + serverlessworkflow-impl-container + org.glassfish.jersey.core jersey-client diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java new file mode 100644 index 00000000..45bf8c28 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java @@ -0,0 +1,42 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import java.io.IOException; +import java.util.Map; +import org.junit.jupiter.api.Test; + +public class ContainerTest { + + @Test + public void testContainer() throws IOException { + Workflow workflow = readWorkflowFromClasspath("workflows-samples/container/container.yaml"); + Map result; + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + result = + app.workflowDefinition(workflow).instance(Map.of()).start().get().asMap().orElseThrow(); + } catch (Exception e) { + throw new RuntimeException("Workflow execution failed", e); + } + + assertNotNull(result); + } +} diff --git a/impl/test/src/test/resources/workflows-samples/container/container.yaml b/impl/test/src/test/resources/workflows-samples/container/container.yaml new file mode 100644 index 00000000..07a87fe1 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/container/container.yaml @@ -0,0 +1,30 @@ +document: + dsl: '1.0.2' + namespace: test + name: run-example + version: '0.1.0' +do: + - runContainer: + run: + container: + image: alpine:latest + #command: echo Hello World + #command: printenv + #command: sleep 30 + command: "ls -la /treblereel" + name: hello-world + ports: + 8880: 8880 + 8881: 8881 + 8882: 8882 + lifetime: + #cleanup: never + cleanup: never + #cleanup: eventually + #after: + # seconds: 100 + environment: + FOO: BAR + BAR: FOO + volumes: + "/Users/treblereel/temp": "/treblereel" From 0f73a9e6582fb49d9e75b696a2c47658920261e0 Mon Sep 17 00:00:00 2001 From: Dmitrii Tikhomirov Date: Mon, 3 Nov 2025 17:36:42 -0800 Subject: [PATCH 2/4] image pull before run Signed-off-by: Dmitrii Tikhomirov --- .../container/executors/ContainerRunner.java | 146 +++++++++++++----- 1 file changed, 110 insertions(+), 36 deletions(-) diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java index cf785964..8530c9cc 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java @@ -15,15 +15,16 @@ */ package io.serverlessworkflow.impl.container.executors; -import static io.serverlessworkflow.api.types.ContainerLifetime.ContainerCleanupPolicy.*; +import static io.serverlessworkflow.api.types.ContainerLifetime.*; import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.command.CreateContainerCmd; import com.github.dockerjava.api.command.CreateContainerResponse; +import com.github.dockerjava.api.command.PullImageResultCallback; import com.github.dockerjava.api.command.WaitContainerResultCallback; -import com.github.dockerjava.api.exception.DockerClientException; import com.github.dockerjava.core.DefaultDockerClientConfig; import com.github.dockerjava.core.DockerClientImpl; +import com.github.dockerjava.core.NameParser; import com.github.dockerjava.httpclient5.ApacheDockerHttpClient; import io.serverlessworkflow.api.types.Container; import io.serverlessworkflow.impl.TaskContext; @@ -32,7 +33,6 @@ import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; -import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -71,48 +71,122 @@ private ContainerRunner( CompletableFuture startSync( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - StringExpressionResolver resolver = - new StringExpressionResolver(workflowContext, taskContext, input); + try { + var resolver = new StringExpressionResolver(workflowContext, taskContext, input); + applyPropertySetters(resolver); + pullImageIfNeeded(container.getImage()); - propertySetters.forEach(setter -> setter.accept(resolver)); + String id = createAndStartContainer(); + int exit = waitAccordingToLifetime(id, workflowContext, taskContext, input); - CreateContainerResponse createContainerResponse = createContainerCmd.exec(); - String containerId = createContainerResponse.getId(); + return mapExitCode(exit, input); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return failed("Interrupted while waiting for container"); + } catch (Exception e) { + return failed("Container run failed: " + e.getMessage()); + } + } + + private void applyPropertySetters(StringExpressionResolver resolver) { + for (var setter : propertySetters) setter.accept(resolver); + } - if (containerId == null || containerId.isEmpty()) { - return failed("Container creation failed: empty container ID"); + private void pullImageIfNeeded(String imageRef) throws InterruptedException { + NameParser.ReposTag rt = NameParser.parseRepositoryTag(imageRef); + NameParser.HostnameReposName hr = NameParser.resolveRepositoryName(imageRef); + + String repository = hr.reposName; + String tag = rt.tag != null && rt.tag.isEmpty() ? rt.tag : "latest"; + dockerClient + .pullImageCmd(repository) + .withTag(tag) + .exec(new PullImageResultCallback()) + .awaitCompletion(); + } + + private String createAndStartContainer() { + CreateContainerResponse resp = createContainerCmd.exec(); + String id = resp.getId(); + if (id == null || id.isEmpty()) { + throw new IllegalStateException("Container creation failed: empty ID"); } + dockerClient.startContainerCmd(id).exec(); + return id; + } - dockerClient.startContainerCmd(containerId).exec(); - - int exitCode; - try (WaitContainerResultCallback resultCallback = - dockerClient.waitContainerCmd(containerId).exec(new WaitContainerResultCallback())) { - if (container.getLifetime() != null - && container.getLifetime().getCleanup() != null - && container.getLifetime().getCleanup().equals(EVENTUALLY)) { - try { - WorkflowValueResolver durationResolver = - WorkflowUtils.fromTimeoutAfter( - definition.application(), container.getLifetime().getAfter()); - Duration timeout = durationResolver.apply(workflowContext, taskContext, input); - exitCode = resultCallback.awaitStatusCode(timeout.toMillis(), TimeUnit.MILLISECONDS); - } catch (DockerClientException e) { - return failed( - String.format("Error while waiting for container to finish: %s ", e.getMessage())); - } finally { - dockerClient.removeContainerCmd(containerId).withForce(true).exec(); + private int waitAccordingToLifetime( + String id, WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) + throws Exception { + + var lifetime = container.getLifetime(); + var policy = lifetime != null ? lifetime.getCleanup() : null; + + try (var cb = dockerClient.waitContainerCmd(id).exec(new WaitContainerResultCallback())) { + + if (policy == ContainerCleanupPolicy.EVENTUALLY) { + Duration timeout = resolveAfter(lifetime, workflowContext, taskContext, input); + int exit = cb.awaitStatusCode(timeout.toMillis(), TimeUnit.MILLISECONDS); + + if (isRunning(id)) { + safeStop(id, Duration.ofSeconds(10)); } + safeRemove(id); + return exit; + } else { - exitCode = resultCallback.awaitStatusCode(); + int exit = cb.awaitStatusCode(); + if (policy == ContainerCleanupPolicy.ALWAYS) { + safeRemove(id); + } + return exit; } - } catch (IOException e) { - return failed( - String.format("Error while waiting for container to finish: %s ", e.getMessage())); } + } + + private Duration resolveAfter( + io.serverlessworkflow.api.types.ContainerLifetime lifetime, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel input) { + + if (lifetime == null || lifetime.getAfter() == null) { + return Duration.ZERO; + } + WorkflowValueResolver r = + WorkflowUtils.fromTimeoutAfter(definition.application(), lifetime.getAfter()); + return r.apply(workflowContext, taskContext, input); + } + + private boolean isRunning(String id) { + try { + var st = dockerClient.inspectContainerCmd(id).exec().getState(); + return st != null && Boolean.TRUE.equals(st.getRunning()); + } catch (Exception e) { + return false; // must be already removed + } + } + + private void safeStop(String id, Duration timeout) { + try { + dockerClient.stopContainerCmd(id).withTimeout((int) Math.max(1, timeout.toSeconds())).exec(); + } catch (Exception ignore) { + // we can ignore this + } + } + + // must be removed because of withAutoRemove(true), but just in case + private void safeRemove(String id) { + try { + dockerClient.removeContainerCmd(id).withForce(true).exec(); + } catch (Exception ignore) { + // we can ignore this + } + } - return switch (exitCode) { - case 0 -> CompletableFuture.completedFuture(input); + private static CompletableFuture mapExitCode(int exit, T ok) { + return switch (exit) { + case 0 -> CompletableFuture.completedFuture(ok); case 1 -> failed("General error (exit code 1)"); case 2 -> failed("Shell syntax error (exit code 2)"); case 126 -> failed("Command found but not executable (exit code 126)"); @@ -121,7 +195,7 @@ CompletableFuture startSync( case 137 -> failed("Killed by SIGKILL (exit code 137)"); case 139 -> failed("Segmentation fault (exit code 139)"); case 143 -> failed("Terminated by SIGTERM (exit code 143)"); - default -> failed("Process exited with code " + exitCode); + default -> failed("Process exited with code " + exit); }; } From 40bb765414020d7678871df8e2ee4bb8f7a1ed8d Mon Sep 17 00:00:00 2001 From: Dmitrii Tikhomirov Date: Tue, 4 Nov 2025 16:37:57 -0800 Subject: [PATCH 3/4] refactoring + tests Signed-off-by: Dmitrii Tikhomirov --- .../executors/CommandPropertySetter.java | 3 +- .../ContainerEnvironmentPropertySetter.java | 5 +- .../executors/ContainerPropertySetter.java | 3 +- .../container/executors/ContainerRunner.java | 86 ++++---- .../executors/LifetimePropertySetter.java | 3 +- .../executors/NamePropertySetter.java | 5 +- .../executors/PortsPropertySetter.java | 3 +- .../executors/RunContainerExecutor.java | 2 +- .../executors/StringExpressionResolver.java | 7 +- .../executors/VolumesPropertySetter.java | 7 +- .../impl/test/ContainerTest.java | 204 +++++++++++++++++- .../container/container-cleanup-default.yaml | 12 ++ .../container/container-cleanup.yaml | 14 ++ .../container/container-env.yaml | 17 ++ .../container/container-ports.yaml | 18 ++ .../container/container-test-command.yaml | 14 ++ .../container/container-timeout.yaml | 16 ++ .../container/container.yaml | 30 --- 18 files changed, 363 insertions(+), 86 deletions(-) create mode 100644 impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml create mode 100644 impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml create mode 100644 impl/test/src/test/resources/workflows-samples/container/container-env.yaml create mode 100644 impl/test/src/test/resources/workflows-samples/container/container-ports.yaml create mode 100644 impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml create mode 100644 impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml delete mode 100644 impl/test/src/test/resources/workflows-samples/container/container.yaml diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java index 44eb5cd4..7c125c72 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java @@ -17,6 +17,7 @@ import com.github.dockerjava.api.command.CreateContainerCmd; import io.serverlessworkflow.api.types.Container; +import java.util.function.Function; class CommandPropertySetter extends ContainerPropertySetter { @@ -25,7 +26,7 @@ class CommandPropertySetter extends ContainerPropertySetter { } @Override - public void accept(StringExpressionResolver resolver) { + public void accept(Function resolver) { if (configuration.getCommand() != null && !configuration.getCommand().isEmpty()) { createContainerCmd.withCmd("sh", "-c", configuration.getCommand()); } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java index a4df76f8..18a4dc85 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.function.Function; class ContainerEnvironmentPropertySetter extends ContainerPropertySetter { @@ -29,7 +30,7 @@ class ContainerEnvironmentPropertySetter extends ContainerPropertySetter { } @Override - public void accept(StringExpressionResolver resolver) { + public void accept(Function resolver) { if (!(configuration.getEnvironment() == null || configuration.getEnvironment().getAdditionalProperties() == null)) { List envs = new ArrayList<>(); @@ -37,7 +38,7 @@ public void accept(StringExpressionResolver resolver) { configuration.getEnvironment().getAdditionalProperties().entrySet()) { String key = entry.getKey(); if (entry.getValue() instanceof String value) { - String resolvedValue = resolver.resolve(value); + String resolvedValue = resolver.apply(value); envs.add(key + "=" + resolvedValue); } else { throw new IllegalArgumentException("Environment variable values must be strings"); diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java index 6a5d7055..44b62ebe 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java @@ -18,8 +18,9 @@ import com.github.dockerjava.api.command.CreateContainerCmd; import io.serverlessworkflow.api.types.Container; import java.util.function.Consumer; +import java.util.function.Function; -abstract class ContainerPropertySetter implements Consumer { +abstract class ContainerPropertySetter implements Consumer> { protected final CreateContainerCmd createContainerCmd; protected final Container configuration; diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java index 8530c9cc..8fb8d65f 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java @@ -20,8 +20,9 @@ import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.command.CreateContainerCmd; import com.github.dockerjava.api.command.CreateContainerResponse; -import com.github.dockerjava.api.command.PullImageResultCallback; import com.github.dockerjava.api.command.WaitContainerResultCallback; +import com.github.dockerjava.api.exception.DockerClientException; +import com.github.dockerjava.api.exception.NotFoundException; import com.github.dockerjava.core.DefaultDockerClientConfig; import com.github.dockerjava.core.DockerClientImpl; import com.github.dockerjava.core.NameParser; @@ -38,6 +39,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; class ContainerRunner { @@ -62,15 +64,15 @@ private ContainerRunner( this.propertySetters = new ArrayList<>(); } - /** - * Blocking container execution according to the lifetime policy. Returns an already completed - * CompletableFuture: - completedFuture(input) if exitCode == 0 - exceptionally completed if the - * exit code is non-zero or an error occurs. The method blocks the calling thread until the - * container finishes or the timeout expires. - */ - CompletableFuture startSync( + CompletableFuture start( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + return CompletableFuture.supplyAsync( + () -> startSync(workflowContext, taskContext, input), + definition.application().executorService()); + } + private WorkflowModel startSync( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { try { var resolver = new StringExpressionResolver(workflowContext, taskContext, input); applyPropertySetters(resolver); @@ -78,18 +80,20 @@ CompletableFuture startSync( String id = createAndStartContainer(); int exit = waitAccordingToLifetime(id, workflowContext, taskContext, input); - - return mapExitCode(exit, input); + if (exit == 0) { + return input; + } + throw mapExitCode(exit); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); - return failed("Interrupted while waiting for container"); + throw failed("Container execution failed with exit code " + ie.getMessage()); } catch (Exception e) { - return failed("Container run failed: " + e.getMessage()); + throw failed("Container execution failed with exit code " + e.getMessage()); } } - private void applyPropertySetters(StringExpressionResolver resolver) { - for (var setter : propertySetters) setter.accept(resolver); + private void applyPropertySetters(Function resolver) { + propertySetters.forEach(setter -> setter.accept(resolver)); } private void pullImageIfNeeded(String imageRef) throws InterruptedException { @@ -97,12 +101,8 @@ private void pullImageIfNeeded(String imageRef) throws InterruptedException { NameParser.HostnameReposName hr = NameParser.resolveRepositoryName(imageRef); String repository = hr.reposName; - String tag = rt.tag != null && rt.tag.isEmpty() ? rt.tag : "latest"; - dockerClient - .pullImageCmd(repository) - .withTag(tag) - .exec(new PullImageResultCallback()) - .awaitCompletion(); + String tag = (rt.tag == null || rt.tag.isBlank()) ? "latest" : rt.tag; + dockerClient.pullImageCmd(repository).withTag(tag).start().awaitCompletion(); } private String createAndStartContainer() { @@ -123,25 +123,22 @@ private int waitAccordingToLifetime( var policy = lifetime != null ? lifetime.getCleanup() : null; try (var cb = dockerClient.waitContainerCmd(id).exec(new WaitContainerResultCallback())) { - if (policy == ContainerCleanupPolicy.EVENTUALLY) { Duration timeout = resolveAfter(lifetime, workflowContext, taskContext, input); - int exit = cb.awaitStatusCode(timeout.toMillis(), TimeUnit.MILLISECONDS); - - if (isRunning(id)) { - safeStop(id, Duration.ofSeconds(10)); + try { + Integer exit = cb.awaitStatusCode(timeout.toMillis(), TimeUnit.MILLISECONDS); + safeStop(id); + return exit != null ? exit : 0; + } catch (DockerClientException timeoutOrOther) { + safeStop(id); } - safeRemove(id); - return exit; - } else { - int exit = cb.awaitStatusCode(); - if (policy == ContainerCleanupPolicy.ALWAYS) { - safeRemove(id); - } - return exit; + return cb.awaitStatusCode(); } + } catch (NotFoundException e) { + // container already removed } + return 0; } private Duration resolveAfter( @@ -167,6 +164,20 @@ private boolean isRunning(String id) { } } + private void safeStop(String id) { + if (isRunning(id)) { + safeStop(id, Duration.ofSeconds(10)); + try (var cb2 = dockerClient.waitContainerCmd(id).exec(new WaitContainerResultCallback())) { + cb2.awaitStatusCode(); + safeRemove(id); + } catch (Exception ignore) { + // we can ignore this + } + } else { + safeRemove(id); + } + } + private void safeStop(String id, Duration timeout) { try { dockerClient.stopContainerCmd(id).withTimeout((int) Math.max(1, timeout.toSeconds())).exec(); @@ -184,9 +195,8 @@ private void safeRemove(String id) { } } - private static CompletableFuture mapExitCode(int exit, T ok) { + private static Exception mapExitCode(int exit) { return switch (exit) { - case 0 -> CompletableFuture.completedFuture(ok); case 1 -> failed("General error (exit code 1)"); case 2 -> failed("Shell syntax error (exit code 2)"); case 126 -> failed("Command found but not executable (exit code 126)"); @@ -199,10 +209,8 @@ private static CompletableFuture mapExitCode(int exit, T ok) { }; } - private static CompletableFuture failed(String message) { - CompletableFuture f = new CompletableFuture<>(); - f.completeExceptionally(new RuntimeException(message)); - return f; + private static RuntimeException failed(String message) { + return new RuntimeException(message); } static ContainerRunnerBuilder builder() { diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java index b254d9d2..52454778 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java @@ -20,6 +20,7 @@ import com.github.dockerjava.api.command.CreateContainerCmd; import io.serverlessworkflow.api.types.Container; import io.serverlessworkflow.api.types.ContainerLifetime; +import java.util.function.Function; class LifetimePropertySetter extends ContainerPropertySetter { @@ -28,7 +29,7 @@ class LifetimePropertySetter extends ContainerPropertySetter { } @Override - public void accept(StringExpressionResolver resolver) { + public void accept(Function resolver) { // case of cleanup=eventually processed at ContainerRunner if (configuration.getLifetime() != null) { ContainerLifetime lifetime = configuration.getLifetime(); diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java index 52878196..d8c820c5 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java @@ -17,6 +17,7 @@ import com.github.dockerjava.api.command.CreateContainerCmd; import io.serverlessworkflow.api.types.Container; +import java.util.function.Function; class NamePropertySetter extends ContainerPropertySetter { @@ -25,9 +26,9 @@ class NamePropertySetter extends ContainerPropertySetter { } @Override - public void accept(StringExpressionResolver resolver) { + public void accept(Function resolver) { if (configuration.getName() != null && !configuration.getName().isEmpty()) { - String resolvedName = resolver.resolve(configuration.getName()); + String resolvedName = resolver.apply(configuration.getName()); createContainerCmd.withName(resolvedName); } } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java index b13da8e0..882abfa6 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.function.Function; class PortsPropertySetter extends ContainerPropertySetter { @@ -30,7 +31,7 @@ class PortsPropertySetter extends ContainerPropertySetter { } @Override - public void accept(StringExpressionResolver resolver) { + public void accept(Function resolver) { if (configuration.getPorts() != null && configuration.getPorts().getAdditionalProperties() != null) { Ports portBindings = new Ports(); diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java index 0e40b891..adf7482b 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java @@ -41,7 +41,7 @@ public void init(RunContainer taskConfiguration, WorkflowDefinition definition) @Override public CompletableFuture apply( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - return containerRunner.startSync(workflowContext, taskContext, input); + return containerRunner.start(workflowContext, taskContext, input); } @Override diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/StringExpressionResolver.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/StringExpressionResolver.java index ce2ee2df..e5848cee 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/StringExpressionResolver.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/StringExpressionResolver.java @@ -20,8 +20,9 @@ import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.expressions.ExpressionUtils; +import java.util.function.Function; -class StringExpressionResolver { +class StringExpressionResolver implements Function { private final WorkflowContext workflowContext; private final TaskContext taskContext; @@ -34,9 +35,9 @@ class StringExpressionResolver { this.input = input; } - String resolve(String expression) { + public String apply(String expression) { if (ExpressionUtils.isExpr(expression)) { - WorkflowUtils.buildStringResolver( + return WorkflowUtils.buildStringResolver( workflowContext.definition().application(), expression, taskContext.input().asJavaObject()) diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java index 5effe641..bb7215ca 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.function.Function; class VolumesPropertySetter extends ContainerPropertySetter { @@ -30,7 +31,7 @@ class VolumesPropertySetter extends ContainerPropertySetter { } @Override - public void accept(StringExpressionResolver resolver) { + public void accept(Function resolver) { if (configuration.getVolumes() != null && configuration.getVolumes().getAdditionalProperties() != null) { List binds = new ArrayList<>(); @@ -38,8 +39,8 @@ public void accept(StringExpressionResolver resolver) { configuration.getVolumes().getAdditionalProperties().entrySet()) { String hostPath = entry.getKey(); if (entry.getValue() instanceof String containerPath) { - String resolvedHostPath = resolver.resolve(hostPath); - String resolvedContainerPath = resolver.resolve(containerPath); + String resolvedHostPath = resolver.apply(hostPath); + String resolvedContainerPath = resolver.apply(containerPath); binds.add(new Bind(resolvedHostPath, new Volume(resolvedContainerPath))); } else { throw new IllegalArgumentException("Volume container paths must be strings"); diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java index 45bf8c28..a30c3fab 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java @@ -16,19 +16,155 @@ package io.serverlessworkflow.impl.test; import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.command.InspectContainerResponse; +import com.github.dockerjava.api.model.Container; +import com.github.dockerjava.api.model.ExposedPort; +import com.github.dockerjava.api.model.Frame; +import com.github.dockerjava.api.model.Ports; +import com.github.dockerjava.core.DefaultDockerClientConfig; +import com.github.dockerjava.core.DockerClientImpl; +import com.github.dockerjava.core.command.LogContainerResultCallback; +import com.github.dockerjava.httpclient5.ApacheDockerHttpClient; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.WorkflowApplication; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; import java.util.Map; import org.junit.jupiter.api.Test; public class ContainerTest { + private static final DefaultDockerClientConfig DEFAULT_CONFIG = + DefaultDockerClientConfig.createDefaultConfigBuilder().build(); + + private static final DockerClient dockerClient = + DockerClientImpl.getInstance( + DEFAULT_CONFIG, + new ApacheDockerHttpClient.Builder().dockerHost(DEFAULT_CONFIG.getDockerHost()).build()); + + @Test + public void testContainer() throws IOException, InterruptedException { + Workflow workflow = + readWorkflowFromClasspath("workflows-samples/container/container-test-command.yaml"); + Map result; + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + result = + app.workflowDefinition(workflow).instance(Map.of()).start().get().asMap().orElseThrow(); + } catch (Exception e) { + throw new RuntimeException("Workflow execution failed", e); + } + + String containerName = "hello-world"; + String containerId = findContainerIdByName(containerName); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + + dockerClient + .logContainerCmd(containerId) + .withStdOut(true) + .withStdErr(true) + .withTimestamps(true) + .exec( + new LogContainerResultCallback() { + @Override + public void onNext(Frame frame) { + output.writeBytes(frame.getPayload()); + } + }) + .awaitCompletion(); + + assertTrue(output.toString().contains("Hello World")); + assertNotNull(result); + dockerClient.removeContainerCmd(containerId).withForce(true).exec(); + } + + @Test + public void testContainerEnv() throws IOException, InterruptedException { + Workflow workflow = readWorkflowFromClasspath("workflows-samples/container/container-env.yaml"); + + Map input = Map.of("someValue", "Tested"); + + Map result; + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + result = app.workflowDefinition(workflow).instance(input).start().get().asMap().orElseThrow(); + } catch (Exception e) { + throw new RuntimeException("Workflow execution failed", e); + } + + String containerName = "hello-world-envs"; + ByteArrayOutputStream output = new ByteArrayOutputStream(); + + dockerClient + .logContainerCmd(findContainerIdByName(containerName)) + .withStdOut(true) + .withStdErr(true) + .withTimestamps(true) + .exec( + new LogContainerResultCallback() { + @Override + public void onNext(Frame frame) { + output.writeBytes(frame.getPayload()); + } + }) + .awaitCompletion(); + assertTrue(output.toString().contains("BAR=FOO")); + assertTrue(output.toString().contains("FOO=Tested")); + assertNotNull(result); + String containerId = findContainerIdByName(containerName); + dockerClient.removeContainerCmd(containerId).withForce(true).exec(); + } + + @Test + public void testContainerTimeout() throws IOException { + Workflow workflow = + readWorkflowFromClasspath("workflows-samples/container/container-timeout.yaml"); + + Map result; + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + result = + app.workflowDefinition(workflow).instance(Map.of()).start().get().asMap().orElseThrow(); + } catch (Exception e) { + throw new RuntimeException("Workflow execution failed", e); + } + + String containerName = "hello-world-timeout"; + String containerId = findContainerIdByName(containerName); + + assertTrue(isContainerGone(containerId)); + assertNotNull(result); + } + + @Test + public void testContainerCleanup() throws IOException { + Workflow workflow = + readWorkflowFromClasspath("workflows-samples/container/container-cleanup.yaml"); + + Map result; + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + result = + app.workflowDefinition(workflow).instance(Map.of()).start().get().asMap().orElseThrow(); + } catch (Exception e) { + throw new RuntimeException("Workflow execution failed", e); + } + + String containerName = "hello-world-cleanup"; + String containerId = findContainerIdByName(containerName); + assertTrue(isContainerGone(containerId)); + assertNotNull(result); + } + @Test - public void testContainer() throws IOException { - Workflow workflow = readWorkflowFromClasspath("workflows-samples/container/container.yaml"); + public void testContainerCleanupDefault() throws IOException { + Workflow workflow = + readWorkflowFromClasspath("workflows-samples/container/container-cleanup-default.yaml"); + Map result; try (WorkflowApplication app = WorkflowApplication.builder().build()) { result = @@ -37,6 +173,70 @@ public void testContainer() throws IOException { throw new RuntimeException("Workflow execution failed", e); } + String containerName = "hello-world-cleanup-default"; + String containerId = findContainerIdByName(containerName); + assertFalse(isContainerGone(containerId)); assertNotNull(result); + + dockerClient.removeContainerCmd(containerId).withForce(true).exec(); + } + + @Test + void testPortBindings() throws Exception { + Workflow workflow = + readWorkflowFromClasspath("workflows-samples/container/container-ports.yaml"); + + new Thread( + () -> { + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + app.workflowDefinition(workflow) + .instance(Map.of()) + .start() + .get() + .asMap() + .orElseThrow(); + } catch (Exception e) { + // we can ignore exceptions here, as the workflow will end when the container is + // removed + } + }) + .start(); + + String containerName = "hello-world-ports"; + await() + .pollInterval(Duration.ofSeconds(1)) + .atMost(Duration.ofSeconds(10)) + .until(() -> findContainerIdByName(containerName) != null); + + String containerId = findContainerIdByName(containerName); + InspectContainerResponse inspect = dockerClient.inspectContainerCmd(containerId).exec(); + Map ports = inspect.getNetworkSettings().getPorts().getBindings(); + + assertTrue(ports.containsKey(ExposedPort.tcp(8880))); + assertTrue(ports.containsKey(ExposedPort.tcp(8881))); + assertTrue(ports.containsKey(ExposedPort.tcp(8882))); + + dockerClient.removeContainerCmd(containerId).withForce(true).exec(); + } + + private static String findContainerIdByName(String containerName) { + var containers = dockerClient.listContainersCmd().withShowAll(true).exec(); + + return containers.stream() + .filter( + c -> + c.getNames() != null + && Arrays.stream(c.getNames()).anyMatch(n -> n.equals("/" + containerName))) + .map(Container::getId) + .findFirst() + .orElse(null); + } + + private static boolean isContainerGone(String id) { + if (id == null) { + return true; + } + var containers = dockerClient.listContainersCmd().withShowAll(true).exec(); + return containers.stream().noneMatch(c -> c.getId().startsWith(id)); } } diff --git a/impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml b/impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml new file mode 100644 index 00000000..208b1faa --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml @@ -0,0 +1,12 @@ +document: + dsl: '1.0.2' + namespace: test + name: run-example + version: '0.1.0' +do: + - runContainer: + run: + container: + image: alpine:latest + command: echo Hello World + name: hello-world-cleanup-default diff --git a/impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml b/impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml new file mode 100644 index 00000000..b7501a75 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml @@ -0,0 +1,14 @@ +document: + dsl: '1.0.2' + namespace: test + name: run-example + version: '0.1.0' +do: + - runContainer: + run: + container: + image: alpine:latest + command: echo Hello World + name: hello-world-cleanup + lifetime: + cleanup: always \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/container/container-env.yaml b/impl/test/src/test/resources/workflows-samples/container/container-env.yaml new file mode 100644 index 00000000..9876480b --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/container/container-env.yaml @@ -0,0 +1,17 @@ +document: + dsl: '1.0.2' + namespace: test + name: run-example + version: '0.1.0' +do: + - runContainer: + run: + container: + image: alpine:latest + command: printenv + name: hello-world-envs + lifetime: + cleanup: never + environment: + FOO: ${ .someValue } + BAR: FOO diff --git a/impl/test/src/test/resources/workflows-samples/container/container-ports.yaml b/impl/test/src/test/resources/workflows-samples/container/container-ports.yaml new file mode 100644 index 00000000..bc330610 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/container/container-ports.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.2' + namespace: test + name: run-example + version: '0.1.0' +do: + - runContainer: + run: + container: + image: alpine:latest + command: sleep 300 + name: hello-world-ports + ports: + 8880: 8880 + 8881: 8881 + 8882: 8882 + lifetime: + cleanup: never \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml b/impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml new file mode 100644 index 00000000..068137fc --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml @@ -0,0 +1,14 @@ +document: + dsl: '1.0.2' + namespace: test + name: run-example + version: '0.1.0' +do: + - runContainer: + run: + container: + image: alpine:3.20 + command: echo Hello World + name: hello-world + lifetime: + cleanup: never diff --git a/impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml b/impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml new file mode 100644 index 00000000..b06c0bb8 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml @@ -0,0 +1,16 @@ +document: + dsl: '1.0.2' + namespace: test + name: run-example + version: '0.1.0' +do: + - runContainer: + run: + container: + image: alpine:latest + command: sleep 300 + name: hello-world-timeout + lifetime: + cleanup: eventually + after: + seconds: 1 \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/container/container.yaml b/impl/test/src/test/resources/workflows-samples/container/container.yaml deleted file mode 100644 index 07a87fe1..00000000 --- a/impl/test/src/test/resources/workflows-samples/container/container.yaml +++ /dev/null @@ -1,30 +0,0 @@ -document: - dsl: '1.0.2' - namespace: test - name: run-example - version: '0.1.0' -do: - - runContainer: - run: - container: - image: alpine:latest - #command: echo Hello World - #command: printenv - #command: sleep 30 - command: "ls -la /treblereel" - name: hello-world - ports: - 8880: 8880 - 8881: 8881 - 8882: 8882 - lifetime: - #cleanup: never - cleanup: never - #cleanup: eventually - #after: - # seconds: 100 - environment: - FOO: BAR - BAR: FOO - volumes: - "/Users/treblereel/temp": "/treblereel" From 5146bf0ce467e26675b3132736a44b9b3a9b0205 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Wed, 5 Nov 2025 14:08:34 +0100 Subject: [PATCH 4/4] Review comments Signed-off-by: fjtirado --- .../executors/CommandPropertySetter.java | 34 +++- .../ContainerEnvironmentPropertySetter.java | 55 +++--- .../executors/ContainerPropertySetter.java | 21 +- .../container/executors/ContainerRunner.java | 184 +++++++++--------- .../executors/LifetimePropertySetter.java | 33 ++-- .../executors/NamePropertySetter.java | 35 +++- .../executors/PortsPropertySetter.java | 49 +++-- .../executors/StringExpressionResolver.java | 48 ----- .../executors/VolumesPropertySetter.java | 58 ++++-- .../impl/WorkflowUtils.java | 4 + .../impl/test/ContainerTest.java | 88 ++++----- .../container/container-cleanup-default.yaml | 2 +- .../container/container-cleanup.yaml | 2 +- .../container/container-env.yaml | 2 +- .../container/container-ports.yaml | 2 +- .../container/container-test-command.yaml | 2 +- .../container/container-timeout.yaml | 2 +- 17 files changed, 325 insertions(+), 296 deletions(-) delete mode 100644 impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/StringExpressionResolver.java diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java index 7c125c72..b0af3191 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java @@ -15,20 +15,38 @@ */ package io.serverlessworkflow.impl.container.executors; +import static io.serverlessworkflow.impl.WorkflowUtils.isValid; + import com.github.dockerjava.api.command.CreateContainerCmd; import io.serverlessworkflow.api.types.Container; -import java.util.function.Function; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import java.util.Optional; + +class CommandPropertySetter implements ContainerPropertySetter { -class CommandPropertySetter extends ContainerPropertySetter { + private Optional> command; - CommandPropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { - super(createContainerCmd, configuration); + CommandPropertySetter(WorkflowDefinition definition, Container configuration) { + String commandName = configuration.getCommand(); + command = + isValid(commandName) + ? Optional.of(WorkflowUtils.buildStringFilter(definition.application(), commandName)) + : Optional.empty(); } @Override - public void accept(Function resolver) { - if (configuration.getCommand() != null && !configuration.getCommand().isEmpty()) { - createContainerCmd.withCmd("sh", "-c", configuration.getCommand()); - } + public void accept( + CreateContainerCmd containerCmd, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + command + .map(c -> c.apply(workflowContext, taskContext, model)) + .ifPresent(c -> containerCmd.withCmd("sh", "-c", c)); } } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java index 18a4dc85..c4f143a3 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java @@ -17,34 +17,43 @@ import com.github.dockerjava.api.command.CreateContainerCmd; import io.serverlessworkflow.api.types.Container; -import java.util.ArrayList; -import java.util.List; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; import java.util.Map; -import java.util.function.Function; +import java.util.Optional; -class ContainerEnvironmentPropertySetter extends ContainerPropertySetter { +class ContainerEnvironmentPropertySetter implements ContainerPropertySetter { - ContainerEnvironmentPropertySetter( - CreateContainerCmd createContainerCmd, Container configuration) { - super(createContainerCmd, configuration); + private final Optional>> envResolver; + + ContainerEnvironmentPropertySetter(WorkflowDefinition definition, Container configuration) { + + this.envResolver = + configuration.getEnvironment() != null + && configuration.getEnvironment().getAdditionalProperties() != null + ? Optional.of( + WorkflowUtils.buildMapResolver( + definition.application(), + null, + configuration.getEnvironment().getAdditionalProperties())) + : Optional.empty(); } @Override - public void accept(Function resolver) { - if (!(configuration.getEnvironment() == null - || configuration.getEnvironment().getAdditionalProperties() == null)) { - List envs = new ArrayList<>(); - for (Map.Entry entry : - configuration.getEnvironment().getAdditionalProperties().entrySet()) { - String key = entry.getKey(); - if (entry.getValue() instanceof String value) { - String resolvedValue = resolver.apply(value); - envs.add(key + "=" + resolvedValue); - } else { - throw new IllegalArgumentException("Environment variable values must be strings"); - } - } - createContainerCmd.withEnv(envs.toArray(new String[0])); - } + public void accept( + CreateContainerCmd command, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + envResolver + .map(env -> env.apply(workflowContext, taskContext, model)) + .ifPresent( + envs -> + command.withEnv( + envs.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).toList())); } } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java index 44b62ebe..9524a92a 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java @@ -16,17 +16,14 @@ package io.serverlessworkflow.impl.container.executors; import com.github.dockerjava.api.command.CreateContainerCmd; -import io.serverlessworkflow.api.types.Container; -import java.util.function.Consumer; -import java.util.function.Function; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; -abstract class ContainerPropertySetter implements Consumer> { - - protected final CreateContainerCmd createContainerCmd; - protected final Container configuration; - - ContainerPropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { - this.createContainerCmd = createContainerCmd; - this.configuration = configuration; - } +interface ContainerPropertySetter { + abstract void accept( + CreateContainerCmd command, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model); } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java index 8fb8d65f..3385f26d 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java @@ -21,115 +21,122 @@ import com.github.dockerjava.api.command.CreateContainerCmd; import com.github.dockerjava.api.command.CreateContainerResponse; import com.github.dockerjava.api.command.WaitContainerResultCallback; -import com.github.dockerjava.api.exception.DockerClientException; import com.github.dockerjava.api.exception.NotFoundException; import com.github.dockerjava.core.DefaultDockerClientConfig; import com.github.dockerjava.core.DockerClientImpl; import com.github.dockerjava.core.NameParser; import com.github.dockerjava.httpclient5.ApacheDockerHttpClient; import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.api.types.ContainerLifetime; +import io.serverlessworkflow.api.types.TimeoutAfter; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; +import java.io.IOException; import java.time.Duration; import java.util.ArrayList; -import java.util.List; +import java.util.Collection; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.function.Function; class ContainerRunner { private static final DefaultDockerClientConfig DEFAULT_CONFIG = DefaultDockerClientConfig.createDefaultConfigBuilder().build(); - private static final DockerClient dockerClient = - DockerClientImpl.getInstance( - DEFAULT_CONFIG, - new ApacheDockerHttpClient.Builder().dockerHost(DEFAULT_CONFIG.getDockerHost()).build()); + private static class DockerClientHolder { + private static final DockerClient dockerClient = + DockerClientImpl.getInstance( + DEFAULT_CONFIG, + new ApacheDockerHttpClient.Builder() + .dockerHost(DEFAULT_CONFIG.getDockerHost()) + .build()); + } - private final CreateContainerCmd createContainerCmd; - private final Container container; - private final List propertySetters; - private final WorkflowDefinition definition; + private final Collection propertySetters; + private final Optional> timeout; + private final ContainerCleanupPolicy policy; + private final String containerImage; - private ContainerRunner( - CreateContainerCmd createContainerCmd, WorkflowDefinition definition, Container container) { - this.createContainerCmd = createContainerCmd; - this.definition = definition; - this.container = container; - this.propertySetters = new ArrayList<>(); + private ContainerRunner(ContainerRunnerBuilder builder) { + this.propertySetters = builder.propertySetters; + this.timeout = Optional.ofNullable(builder.timeout); + this.policy = builder.policy; + this.containerImage = builder.containerImage; } CompletableFuture start( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { return CompletableFuture.supplyAsync( () -> startSync(workflowContext, taskContext, input), - definition.application().executorService()); + workflowContext.definition().application().executorService()); } private WorkflowModel startSync( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - try { - var resolver = new StringExpressionResolver(workflowContext, taskContext, input); - applyPropertySetters(resolver); - pullImageIfNeeded(container.getImage()); - - String id = createAndStartContainer(); - int exit = waitAccordingToLifetime(id, workflowContext, taskContext, input); - if (exit == 0) { - return input; - } + Integer exit = executeContainer(workflowContext, taskContext, input); + if (exit == null || exit == 0) { + return input; + } else { throw mapExitCode(exit); + } + } + + private Integer executeContainer( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + try { + CreateContainerCmd containerCommand = + DockerClientHolder.dockerClient.createContainerCmd(containerImage); + pullImageIfNeeded(containerImage); + propertySetters.forEach(p -> p.accept(containerCommand, workflowContext, taskContext, input)); + String id = createAndStartContainer(containerCommand); + return waitAccordingToLifetime(id, workflowContext, taskContext, input); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw failed("Container execution failed with exit code " + ie.getMessage()); - } catch (Exception e) { + } catch (IOException e) { throw failed("Container execution failed with exit code " + e.getMessage()); } } - private void applyPropertySetters(Function resolver) { - propertySetters.forEach(setter -> setter.accept(resolver)); - } - private void pullImageIfNeeded(String imageRef) throws InterruptedException { NameParser.ReposTag rt = NameParser.parseRepositoryTag(imageRef); - NameParser.HostnameReposName hr = NameParser.resolveRepositoryName(imageRef); - - String repository = hr.reposName; - String tag = (rt.tag == null || rt.tag.isBlank()) ? "latest" : rt.tag; - dockerClient.pullImageCmd(repository).withTag(tag).start().awaitCompletion(); + DockerClientHolder.dockerClient + .pullImageCmd(NameParser.resolveRepositoryName(imageRef).reposName) + .withTag(WorkflowUtils.isValid(rt.tag) ? rt.tag : "latest") + .start() + .awaitCompletion(); } - private String createAndStartContainer() { - CreateContainerResponse resp = createContainerCmd.exec(); + private String createAndStartContainer(CreateContainerCmd containerCommand) { + CreateContainerResponse resp = containerCommand.exec(); String id = resp.getId(); if (id == null || id.isEmpty()) { throw new IllegalStateException("Container creation failed: empty ID"); } - dockerClient.startContainerCmd(id).exec(); + DockerClientHolder.dockerClient.startContainerCmd(id).exec(); return id; } - private int waitAccordingToLifetime( + private Integer waitAccordingToLifetime( String id, WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) - throws Exception { - - var lifetime = container.getLifetime(); - var policy = lifetime != null ? lifetime.getCleanup() : null; - - try (var cb = dockerClient.waitContainerCmd(id).exec(new WaitContainerResultCallback())) { + throws IOException { + try (var cb = + DockerClientHolder.dockerClient + .waitContainerCmd(id) + .exec(new WaitContainerResultCallback())) { if (policy == ContainerCleanupPolicy.EVENTUALLY) { - Duration timeout = resolveAfter(lifetime, workflowContext, taskContext, input); + Duration timeout = + this.timeout + .map(t -> t.apply(workflowContext, taskContext, input)) + .orElse(Duration.ZERO); try { - Integer exit = cb.awaitStatusCode(timeout.toMillis(), TimeUnit.MILLISECONDS); - safeStop(id); - return exit != null ? exit : 0; - } catch (DockerClientException timeoutOrOther) { + return cb.awaitStatusCode(timeout.toMillis(), TimeUnit.MILLISECONDS); + } finally { safeStop(id); } } else { @@ -141,23 +148,9 @@ private int waitAccordingToLifetime( return 0; } - private Duration resolveAfter( - io.serverlessworkflow.api.types.ContainerLifetime lifetime, - WorkflowContext workflowContext, - TaskContext taskContext, - WorkflowModel input) { - - if (lifetime == null || lifetime.getAfter() == null) { - return Duration.ZERO; - } - WorkflowValueResolver r = - WorkflowUtils.fromTimeoutAfter(definition.application(), lifetime.getAfter()); - return r.apply(workflowContext, taskContext, input); - } - private boolean isRunning(String id) { try { - var st = dockerClient.inspectContainerCmd(id).exec().getState(); + var st = DockerClientHolder.dockerClient.inspectContainerCmd(id).exec().getState(); return st != null && Boolean.TRUE.equals(st.getRunning()); } catch (Exception e) { return false; // must be already removed @@ -167,7 +160,10 @@ private boolean isRunning(String id) { private void safeStop(String id) { if (isRunning(id)) { safeStop(id, Duration.ofSeconds(10)); - try (var cb2 = dockerClient.waitContainerCmd(id).exec(new WaitContainerResultCallback())) { + try (var cb2 = + DockerClientHolder.dockerClient + .waitContainerCmd(id) + .exec(new WaitContainerResultCallback())) { cb2.awaitStatusCode(); safeRemove(id); } catch (Exception ignore) { @@ -180,7 +176,10 @@ private void safeStop(String id) { private void safeStop(String id, Duration timeout) { try { - dockerClient.stopContainerCmd(id).withTimeout((int) Math.max(1, timeout.toSeconds())).exec(); + DockerClientHolder.dockerClient + .stopContainerCmd(id) + .withTimeout((int) Math.max(1, timeout.toSeconds())) + .exec(); } catch (Exception ignore) { // we can ignore this } @@ -189,13 +188,13 @@ private void safeStop(String id, Duration timeout) { // must be removed because of withAutoRemove(true), but just in case private void safeRemove(String id) { try { - dockerClient.removeContainerCmd(id).withForce(true).exec(); + DockerClientHolder.dockerClient.removeContainerCmd(id).withForce(true).exec(); } catch (Exception ignore) { // we can ignore this } } - private static Exception mapExitCode(int exit) { + private static RuntimeException mapExitCode(int exit) { return switch (exit) { case 1 -> failed("General error (exit code 1)"); case 2 -> failed("Shell syntax error (exit code 2)"); @@ -218,8 +217,12 @@ static ContainerRunnerBuilder builder() { } public static class ContainerRunnerBuilder { - private Container container = null; - private WorkflowDefinition workflowDefinition; + private Container container; + private WorkflowDefinition definition; + private WorkflowValueResolver timeout; + private ContainerCleanupPolicy policy; + private String containerImage; + private Collection propertySetters = new ArrayList<>(); private ContainerRunnerBuilder() {} @@ -229,28 +232,31 @@ ContainerRunnerBuilder withContainer(Container container) { } public ContainerRunnerBuilder withWorkflowDefinition(WorkflowDefinition definition) { - this.workflowDefinition = definition; + this.definition = definition; return this; } ContainerRunner build() { - if (container.getImage() == null || container.getImage().isEmpty()) { + propertySetters.add(new NamePropertySetter(definition, container)); + propertySetters.add(new CommandPropertySetter(definition, container)); + propertySetters.add(new ContainerEnvironmentPropertySetter(definition, container)); + propertySetters.add(new LifetimePropertySetter(container)); + propertySetters.add(new PortsPropertySetter(container)); + propertySetters.add(new VolumesPropertySetter(definition, container)); + + containerImage = container.getImage(); + if (containerImage == null || container.getImage().isBlank()) { throw new IllegalArgumentException("Container image must be provided"); } + ContainerLifetime lifetime = container.getLifetime(); + if (lifetime != null) { + policy = lifetime.getCleanup(); + TimeoutAfter afterTimeout = lifetime.getAfter(); + if (afterTimeout != null) + timeout = WorkflowUtils.fromTimeoutAfter(definition.application(), afterTimeout); + } - CreateContainerCmd createContainerCmd = dockerClient.createContainerCmd(container.getImage()); - - ContainerRunner runner = - new ContainerRunner(createContainerCmd, workflowDefinition, container); - - runner.propertySetters.add(new CommandPropertySetter(createContainerCmd, container)); - runner.propertySetters.add( - new ContainerEnvironmentPropertySetter(createContainerCmd, container)); - runner.propertySetters.add(new NamePropertySetter(createContainerCmd, container)); - runner.propertySetters.add(new PortsPropertySetter(createContainerCmd, container)); - runner.propertySetters.add(new VolumesPropertySetter(createContainerCmd, container)); - runner.propertySetters.add(new LifetimePropertySetter(createContainerCmd, container)); - return runner; + return new ContainerRunner(this); } } } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java index 52454778..3606ae93 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java @@ -19,26 +19,29 @@ import com.github.dockerjava.api.command.CreateContainerCmd; import io.serverlessworkflow.api.types.Container; -import io.serverlessworkflow.api.types.ContainerLifetime; -import java.util.function.Function; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; -class LifetimePropertySetter extends ContainerPropertySetter { +class LifetimePropertySetter implements ContainerPropertySetter { - LifetimePropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { - super(createContainerCmd, configuration); + private final ContainerCleanupPolicy cleanupPolicy; + + LifetimePropertySetter(Container configuration) { + this.cleanupPolicy = + configuration.getLifetime() != null ? configuration.getLifetime().getCleanup() : null; } @Override - public void accept(Function resolver) { - // case of cleanup=eventually processed at ContainerRunner - if (configuration.getLifetime() != null) { - ContainerLifetime lifetime = configuration.getLifetime(); - ContainerCleanupPolicy cleanupPolicy = lifetime.getCleanup(); - if (cleanupPolicy.equals(ContainerCleanupPolicy.ALWAYS)) { - createContainerCmd.getHostConfig().withAutoRemove(true); - } else if (cleanupPolicy.equals(ContainerCleanupPolicy.NEVER)) { - createContainerCmd.getHostConfig().withAutoRemove(false); - } + public void accept( + CreateContainerCmd command, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + if (ContainerCleanupPolicy.ALWAYS.equals(cleanupPolicy)) { + command.getHostConfig().withAutoRemove(true); + } else if (ContainerCleanupPolicy.NEVER.equals(cleanupPolicy)) { + command.getHostConfig().withAutoRemove(false); } } } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java index d8c820c5..70bc8d83 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java @@ -15,21 +15,38 @@ */ package io.serverlessworkflow.impl.container.executors; +import static io.serverlessworkflow.impl.WorkflowUtils.isValid; + import com.github.dockerjava.api.command.CreateContainerCmd; import io.serverlessworkflow.api.types.Container; -import java.util.function.Function; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import java.util.Optional; + +class NamePropertySetter implements ContainerPropertySetter { -class NamePropertySetter extends ContainerPropertySetter { + private final Optional> containerName; - NamePropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { - super(createContainerCmd, configuration); + NamePropertySetter(WorkflowDefinition definition, Container container) { + String containerName = container.getName(); + this.containerName = + isValid(containerName) + ? Optional.of(WorkflowUtils.buildStringFilter(definition.application(), containerName)) + : Optional.empty(); } @Override - public void accept(Function resolver) { - if (configuration.getName() != null && !configuration.getName().isEmpty()) { - String resolvedName = resolver.apply(configuration.getName()); - createContainerCmd.withName(resolvedName); - } + public void accept( + CreateContainerCmd createContainerCmd, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + containerName + .map(c -> c.apply(workflowContext, taskContext, model)) + .ifPresent(createContainerCmd::withName); } } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java index 882abfa6..b176b110 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java @@ -19,34 +19,51 @@ import com.github.dockerjava.api.model.ExposedPort; import com.github.dockerjava.api.model.Ports; import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.function.Function; -class PortsPropertySetter extends ContainerPropertySetter { +class PortsPropertySetter implements ContainerPropertySetter { - PortsPropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { - super(createContainerCmd, configuration); - } + private Ports portBindings = new Ports(); + private List exposed = new ArrayList<>(); - @Override - public void accept(Function resolver) { + PortsPropertySetter(Container configuration) { if (configuration.getPorts() != null && configuration.getPorts().getAdditionalProperties() != null) { - Ports portBindings = new Ports(); - List exposed = new ArrayList<>(); - for (Map.Entry entry : configuration.getPorts().getAdditionalProperties().entrySet()) { - int hostPort = Integer.parseInt(entry.getKey()); - int containerPort = Integer.parseInt(entry.getValue().toString()); - ExposedPort exposedPort = ExposedPort.tcp(containerPort); - portBindings.bind(exposedPort, Ports.Binding.bindPort(hostPort)); + ExposedPort exposedPort = ExposedPort.tcp(Integer.parseInt(entry.getKey())); exposed.add(exposedPort); + portBindings.bind(exposedPort, Ports.Binding.bindPort(from(entry.getValue()))); } - createContainerCmd.withExposedPorts(exposed.toArray(new ExposedPort[0])); - createContainerCmd.getHostConfig().withPortBindings(portBindings); + } + } + + @Override + public void accept( + CreateContainerCmd command, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + command.withExposedPorts(exposed); + command.getHostConfig().withPortBindings(portBindings); + } + + private static Integer from(Object obj) { + if (obj instanceof Integer number) { + return number; + } else if (obj instanceof Number number) { + return number.intValue(); + } else if (obj instanceof String str) { + return Integer.parseInt(str); + } else if (obj != null) { + return Integer.parseInt(obj.toString()); + } else { + throw new IllegalArgumentException("Null value for port key"); } } } diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/StringExpressionResolver.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/StringExpressionResolver.java deleted file mode 100644 index e5848cee..00000000 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/StringExpressionResolver.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.container.executors; - -import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowUtils; -import io.serverlessworkflow.impl.expressions.ExpressionUtils; -import java.util.function.Function; - -class StringExpressionResolver implements Function { - - private final WorkflowContext workflowContext; - private final TaskContext taskContext; - private final WorkflowModel input; - - StringExpressionResolver( - WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - this.workflowContext = workflowContext; - this.taskContext = taskContext; - this.input = input; - } - - public String apply(String expression) { - if (ExpressionUtils.isExpr(expression)) { - return WorkflowUtils.buildStringResolver( - workflowContext.definition().application(), - expression, - taskContext.input().asJavaObject()) - .apply(workflowContext, taskContext, input); - } - return expression; - } -} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java index bb7215ca..7217742b 100644 --- a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java @@ -19,34 +19,56 @@ import com.github.dockerjava.api.model.Bind; import com.github.dockerjava.api.model.Volume; import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; import java.util.ArrayList; -import java.util.List; +import java.util.Collection; import java.util.Map; -import java.util.function.Function; +import java.util.Objects; -class VolumesPropertySetter extends ContainerPropertySetter { +class VolumesPropertySetter implements ContainerPropertySetter { - VolumesPropertySetter(CreateContainerCmd createContainerCmd, Container configuration) { - super(createContainerCmd, configuration); - } + private record HostContainer( + WorkflowValueResolver host, WorkflowValueResolver container) {} - @Override - public void accept(Function resolver) { + private final Collection binds = new ArrayList<>(); + + VolumesPropertySetter(WorkflowDefinition definition, Container configuration) { if (configuration.getVolumes() != null && configuration.getVolumes().getAdditionalProperties() != null) { - List binds = new ArrayList<>(); for (Map.Entry entry : configuration.getVolumes().getAdditionalProperties().entrySet()) { - String hostPath = entry.getKey(); - if (entry.getValue() instanceof String containerPath) { - String resolvedHostPath = resolver.apply(hostPath); - String resolvedContainerPath = resolver.apply(containerPath); - binds.add(new Bind(resolvedHostPath, new Volume(resolvedContainerPath))); - } else { - throw new IllegalArgumentException("Volume container paths must be strings"); - } + binds.add( + new HostContainer( + WorkflowUtils.buildStringFilter(definition.application(), entry.getKey()), + WorkflowUtils.buildStringFilter( + definition.application(), + Objects.requireNonNull( + entry.getValue(), "Volume value must be a not null string") + .toString()))); } - createContainerCmd.getHostConfig().withBinds(binds); } } + + @Override + public void accept( + CreateContainerCmd command, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + command + .getHostConfig() + .withBinds( + binds.stream() + .map( + r -> + new Bind( + r.host().apply(workflowContext, taskContext, model), + new Volume(r.container.apply(workflowContext, taskContext, model)))) + .toList()); + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java index 7b6b3403..6fef5a85 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java @@ -59,6 +59,10 @@ public static Optional getSchemaValidator( return Optional.empty(); } + public static boolean isValid(String str) { + return str != null && !str.isBlank(); + } + public static Optional buildWorkflowFilter( WorkflowApplication app, InputFrom from) { return from != null diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java index a30c3fab..a4d25419 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java @@ -38,29 +38,38 @@ import java.time.Duration; import java.util.Arrays; import java.util.Map; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; public class ContainerTest { - private static final DefaultDockerClientConfig DEFAULT_CONFIG = - DefaultDockerClientConfig.createDefaultConfigBuilder().build(); + private static DockerClient dockerClient; + private static WorkflowApplication app; + + @BeforeAll + static void init() { + DefaultDockerClientConfig defaultConfig = + DefaultDockerClientConfig.createDefaultConfigBuilder().build(); + dockerClient = + DockerClientImpl.getInstance( + defaultConfig, + new ApacheDockerHttpClient.Builder().dockerHost(defaultConfig.getDockerHost()).build()); + app = WorkflowApplication.builder().build(); + } - private static final DockerClient dockerClient = - DockerClientImpl.getInstance( - DEFAULT_CONFIG, - new ApacheDockerHttpClient.Builder().dockerHost(DEFAULT_CONFIG.getDockerHost()).build()); + @AfterAll + static void cleanup() throws IOException { + dockerClient.close(); + app.close(); + } @Test public void testContainer() throws IOException, InterruptedException { Workflow workflow = readWorkflowFromClasspath("workflows-samples/container/container-test-command.yaml"); - Map result; - try (WorkflowApplication app = WorkflowApplication.builder().build()) { - result = - app.workflowDefinition(workflow).instance(Map.of()).start().get().asMap().orElseThrow(); - } catch (Exception e) { - throw new RuntimeException("Workflow execution failed", e); - } + Map result = + app.workflowDefinition(workflow).instance(Map.of()).start().join().asMap().orElseThrow(); String containerName = "hello-world"; String containerId = findContainerIdByName(containerName); @@ -91,12 +100,8 @@ public void testContainerEnv() throws IOException, InterruptedException { Map input = Map.of("someValue", "Tested"); - Map result; - try (WorkflowApplication app = WorkflowApplication.builder().build()) { - result = app.workflowDefinition(workflow).instance(input).start().get().asMap().orElseThrow(); - } catch (Exception e) { - throw new RuntimeException("Workflow execution failed", e); - } + Map result = + app.workflowDefinition(workflow).instance(input).start().join().asMap().orElseThrow(); String containerName = "hello-world-envs"; ByteArrayOutputStream output = new ByteArrayOutputStream(); @@ -126,13 +131,8 @@ public void testContainerTimeout() throws IOException { Workflow workflow = readWorkflowFromClasspath("workflows-samples/container/container-timeout.yaml"); - Map result; - try (WorkflowApplication app = WorkflowApplication.builder().build()) { - result = - app.workflowDefinition(workflow).instance(Map.of()).start().get().asMap().orElseThrow(); - } catch (Exception e) { - throw new RuntimeException("Workflow execution failed", e); - } + Map result = + app.workflowDefinition(workflow).instance(Map.of()).start().join().asMap().orElseThrow(); String containerName = "hello-world-timeout"; String containerId = findContainerIdByName(containerName); @@ -146,13 +146,8 @@ public void testContainerCleanup() throws IOException { Workflow workflow = readWorkflowFromClasspath("workflows-samples/container/container-cleanup.yaml"); - Map result; - try (WorkflowApplication app = WorkflowApplication.builder().build()) { - result = - app.workflowDefinition(workflow).instance(Map.of()).start().get().asMap().orElseThrow(); - } catch (Exception e) { - throw new RuntimeException("Workflow execution failed", e); - } + Map result = + app.workflowDefinition(workflow).instance(Map.of()).start().join().asMap().orElseThrow(); String containerName = "hello-world-cleanup"; String containerId = findContainerIdByName(containerName); @@ -165,14 +160,8 @@ public void testContainerCleanupDefault() throws IOException { Workflow workflow = readWorkflowFromClasspath("workflows-samples/container/container-cleanup-default.yaml"); - Map result; - try (WorkflowApplication app = WorkflowApplication.builder().build()) { - result = - app.workflowDefinition(workflow).instance(Map.of()).start().get().asMap().orElseThrow(); - } catch (Exception e) { - throw new RuntimeException("Workflow execution failed", e); - } - + Map result = + app.workflowDefinition(workflow).instance(Map.of()).start().join().asMap().orElseThrow(); String containerName = "hello-world-cleanup-default"; String containerId = findContainerIdByName(containerName); assertFalse(isContainerGone(containerId)); @@ -188,17 +177,12 @@ void testPortBindings() throws Exception { new Thread( () -> { - try (WorkflowApplication app = WorkflowApplication.builder().build()) { - app.workflowDefinition(workflow) - .instance(Map.of()) - .start() - .get() - .asMap() - .orElseThrow(); - } catch (Exception e) { - // we can ignore exceptions here, as the workflow will end when the container is - // removed - } + app.workflowDefinition(workflow) + .instance(Map.of()) + .start() + .join() + .asMap() + .orElseThrow(); }) .start(); diff --git a/impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml b/impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml index 208b1faa..2a6cea6d 100644 --- a/impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml +++ b/impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml @@ -1,7 +1,7 @@ document: dsl: '1.0.2' namespace: test - name: run-example + name: container-cleanup-default version: '0.1.0' do: - runContainer: diff --git a/impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml b/impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml index b7501a75..968cc34c 100644 --- a/impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml +++ b/impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml @@ -1,7 +1,7 @@ document: dsl: '1.0.2' namespace: test - name: run-example + name: cointaner-cleanup version: '0.1.0' do: - runContainer: diff --git a/impl/test/src/test/resources/workflows-samples/container/container-env.yaml b/impl/test/src/test/resources/workflows-samples/container/container-env.yaml index 9876480b..71988b84 100644 --- a/impl/test/src/test/resources/workflows-samples/container/container-env.yaml +++ b/impl/test/src/test/resources/workflows-samples/container/container-env.yaml @@ -1,7 +1,7 @@ document: dsl: '1.0.2' namespace: test - name: run-example + name: container-env version: '0.1.0' do: - runContainer: diff --git a/impl/test/src/test/resources/workflows-samples/container/container-ports.yaml b/impl/test/src/test/resources/workflows-samples/container/container-ports.yaml index bc330610..0639e326 100644 --- a/impl/test/src/test/resources/workflows-samples/container/container-ports.yaml +++ b/impl/test/src/test/resources/workflows-samples/container/container-ports.yaml @@ -1,7 +1,7 @@ document: dsl: '1.0.2' namespace: test - name: run-example + name: container-ports version: '0.1.0' do: - runContainer: diff --git a/impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml b/impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml index 068137fc..874c9fbf 100644 --- a/impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml +++ b/impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml @@ -1,7 +1,7 @@ document: dsl: '1.0.2' namespace: test - name: run-example + name: container-test-command version: '0.1.0' do: - runContainer: diff --git a/impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml b/impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml index b06c0bb8..b8a3b5e0 100644 --- a/impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml +++ b/impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml @@ -1,7 +1,7 @@ document: dsl: '1.0.2' namespace: test - name: run-example + name: container-timeout version: '0.1.0' do: - runContainer: