diff --git a/impl/container/pom.xml b/impl/container/pom.xml
new file mode 100644
index 00000000..51ced574
--- /dev/null
+++ b/impl/container/pom.xml
@@ -0,0 +1,33 @@
+
+
+ 4.0.0
+
+ io.serverlessworkflow
+ serverlessworkflow-impl
+ 8.0.0-SNAPSHOT
+
+ serverlessworkflow-impl-container
+ Serverless Workflow :: Impl :: Container
+
+
+
+ io.serverlessworkflow
+ serverlessworkflow-impl-core
+
+
+ io.serverlessworkflow
+ serverlessworkflow-types
+
+
+ com.github.docker-java
+ docker-java-core
+
+
+ com.github.docker-java
+ docker-java-transport-httpclient5
+
+
+
+
\ No newline at end of file
diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java
new file mode 100644
index 00000000..b0af3191
--- /dev/null
+++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.container.executors;
+
+import static io.serverlessworkflow.impl.WorkflowUtils.isValid;
+
+import com.github.dockerjava.api.command.CreateContainerCmd;
+import io.serverlessworkflow.api.types.Container;
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import io.serverlessworkflow.impl.WorkflowModel;
+import io.serverlessworkflow.impl.WorkflowUtils;
+import io.serverlessworkflow.impl.WorkflowValueResolver;
+import java.util.Optional;
+
+class CommandPropertySetter implements ContainerPropertySetter {
+
+ private Optional> command;
+
+ CommandPropertySetter(WorkflowDefinition definition, Container configuration) {
+ String commandName = configuration.getCommand();
+ command =
+ isValid(commandName)
+ ? Optional.of(WorkflowUtils.buildStringFilter(definition.application(), commandName))
+ : Optional.empty();
+ }
+
+ @Override
+ public void accept(
+ CreateContainerCmd containerCmd,
+ WorkflowContext workflowContext,
+ TaskContext taskContext,
+ WorkflowModel model) {
+ command
+ .map(c -> c.apply(workflowContext, taskContext, model))
+ .ifPresent(c -> containerCmd.withCmd("sh", "-c", c));
+ }
+}
diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java
new file mode 100644
index 00000000..c4f143a3
--- /dev/null
+++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.container.executors;
+
+import com.github.dockerjava.api.command.CreateContainerCmd;
+import io.serverlessworkflow.api.types.Container;
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import io.serverlessworkflow.impl.WorkflowModel;
+import io.serverlessworkflow.impl.WorkflowUtils;
+import io.serverlessworkflow.impl.WorkflowValueResolver;
+import java.util.Map;
+import java.util.Optional;
+
+class ContainerEnvironmentPropertySetter implements ContainerPropertySetter {
+
+ private final Optional>> envResolver;
+
+ ContainerEnvironmentPropertySetter(WorkflowDefinition definition, Container configuration) {
+
+ this.envResolver =
+ configuration.getEnvironment() != null
+ && configuration.getEnvironment().getAdditionalProperties() != null
+ ? Optional.of(
+ WorkflowUtils.buildMapResolver(
+ definition.application(),
+ null,
+ configuration.getEnvironment().getAdditionalProperties()))
+ : Optional.empty();
+ }
+
+ @Override
+ public void accept(
+ CreateContainerCmd command,
+ WorkflowContext workflowContext,
+ TaskContext taskContext,
+ WorkflowModel model) {
+ envResolver
+ .map(env -> env.apply(workflowContext, taskContext, model))
+ .ifPresent(
+ envs ->
+ command.withEnv(
+ envs.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).toList()));
+ }
+}
diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java
new file mode 100644
index 00000000..9524a92a
--- /dev/null
+++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.container.executors;
+
+import com.github.dockerjava.api.command.CreateContainerCmd;
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowModel;
+
+interface ContainerPropertySetter {
+ abstract void accept(
+ CreateContainerCmd command,
+ WorkflowContext workflowContext,
+ TaskContext taskContext,
+ WorkflowModel model);
+}
diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java
new file mode 100644
index 00000000..5c14a3ac
--- /dev/null
+++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.container.executors;
+
+import static io.serverlessworkflow.api.types.ContainerLifetime.*;
+import static io.serverlessworkflow.impl.WorkflowUtils.isValid;
+
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.command.CreateContainerCmd;
+import com.github.dockerjava.api.command.CreateContainerResponse;
+import com.github.dockerjava.api.command.WaitContainerResultCallback;
+import com.github.dockerjava.api.exception.DockerClientException;
+import com.github.dockerjava.api.exception.NotFoundException;
+import com.github.dockerjava.core.DefaultDockerClientConfig;
+import com.github.dockerjava.core.DockerClientImpl;
+import com.github.dockerjava.core.NameParser;
+import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
+import io.serverlessworkflow.api.types.Container;
+import io.serverlessworkflow.api.types.ContainerLifetime;
+import io.serverlessworkflow.api.types.TimeoutAfter;
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import io.serverlessworkflow.impl.WorkflowModel;
+import io.serverlessworkflow.impl.WorkflowUtils;
+import io.serverlessworkflow.impl.WorkflowValueResolver;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+class ContainerRunner {
+
+ private static final DefaultDockerClientConfig DEFAULT_CONFIG =
+ DefaultDockerClientConfig.createDefaultConfigBuilder().build();
+
+ private static class DockerClientHolder {
+ private static final DockerClient dockerClient =
+ DockerClientImpl.getInstance(
+ DEFAULT_CONFIG,
+ new ApacheDockerHttpClient.Builder()
+ .dockerHost(DEFAULT_CONFIG.getDockerHost())
+ .build());
+ }
+
+ private final Collection propertySetters;
+ private final Optional> timeout;
+ private final ContainerCleanupPolicy policy;
+ private final String containerImage;
+
+ private ContainerRunner(ContainerRunnerBuilder builder) {
+ this.propertySetters = builder.propertySetters;
+ this.timeout = Optional.ofNullable(builder.timeout);
+ this.policy = builder.policy;
+ this.containerImage = builder.containerImage;
+ }
+
+ CompletableFuture start(
+ WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
+ return CompletableFuture.supplyAsync(
+ () -> startSync(workflowContext, taskContext, input),
+ workflowContext.definition().application().executorService());
+ }
+
+ private WorkflowModel startSync(
+ WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
+ Integer exit = executeContainer(workflowContext, taskContext, input);
+ if (exit == null || exit == 0) {
+ return input;
+ } else {
+ throw mapExitCode(exit);
+ }
+ }
+
+ private Integer executeContainer(
+ WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
+ try {
+ pullImageIfNeeded(containerImage);
+ CreateContainerCmd containerCommand =
+ DockerClientHolder.dockerClient.createContainerCmd(containerImage);
+ propertySetters.forEach(p -> p.accept(containerCommand, workflowContext, taskContext, input));
+ return waitAccordingToLifetime(
+ createAndStartContainer(containerCommand), workflowContext, taskContext, input);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw failed("Container execution failed with exit code " + ie.getMessage());
+ } catch (IOException e) {
+ throw failed("Container execution failed with exit code " + e.getMessage());
+ }
+ }
+
+ private void pullImageIfNeeded(String imageRef) throws InterruptedException {
+ NameParser.ReposTag rt = NameParser.parseRepositoryTag(imageRef);
+ DockerClientHolder.dockerClient
+ .pullImageCmd(NameParser.resolveRepositoryName(imageRef).reposName)
+ .withTag(WorkflowUtils.isValid(rt.tag) ? rt.tag : "latest")
+ .start()
+ .awaitCompletion();
+ }
+
+ private String createAndStartContainer(CreateContainerCmd containerCommand) {
+ CreateContainerResponse resp = containerCommand.exec();
+ String id = resp.getId();
+ if (!isValid(id)) {
+ throw new IllegalStateException("Container creation failed: empty ID");
+ }
+ DockerClientHolder.dockerClient.startContainerCmd(id).exec();
+ return id;
+ }
+
+ private Integer waitAccordingToLifetime(
+ String id, WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input)
+ throws IOException {
+ try (var cb =
+ DockerClientHolder.dockerClient
+ .waitContainerCmd(id)
+ .exec(new WaitContainerResultCallback())) {
+ if (policy == ContainerCleanupPolicy.EVENTUALLY) {
+ Duration timeout =
+ this.timeout
+ .map(t -> t.apply(workflowContext, taskContext, input))
+ .orElse(Duration.ZERO);
+ try {
+ Integer exit = cb.awaitStatusCode(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ safeStop(id);
+ return exit;
+ } catch (DockerClientException timeoutOrOther) {
+ safeStop(id);
+ }
+ } else {
+ return cb.awaitStatusCode();
+ }
+ } catch (NotFoundException e) {
+ // container already removed
+ }
+ return 0;
+ }
+
+ private boolean isRunning(String id) {
+ try {
+ var st = DockerClientHolder.dockerClient.inspectContainerCmd(id).exec().getState();
+ return st != null && Boolean.TRUE.equals(st.getRunning());
+ } catch (Exception e) {
+ return false; // must be already removed
+ }
+ }
+
+ private void safeStop(String id) {
+ if (isRunning(id)) {
+ safeStop(id, Duration.ofSeconds(10));
+ try (var cb2 =
+ DockerClientHolder.dockerClient
+ .waitContainerCmd(id)
+ .exec(new WaitContainerResultCallback())) {
+ cb2.awaitStatusCode();
+ safeRemove(id);
+ } catch (Exception ignore) {
+ // we can ignore this
+ }
+ } else {
+ safeRemove(id);
+ }
+ }
+
+ private void safeStop(String id, Duration timeout) {
+ try {
+ DockerClientHolder.dockerClient
+ .stopContainerCmd(id)
+ .withTimeout((int) Math.max(1, timeout.toSeconds()))
+ .exec();
+ } catch (Exception ignore) {
+ // we can ignore this
+ }
+ }
+
+ // must be removed because of withAutoRemove(true), but just in case
+ private void safeRemove(String id) {
+ try {
+ DockerClientHolder.dockerClient.removeContainerCmd(id).withForce(true).exec();
+ } catch (Exception ignore) {
+ // we can ignore this
+ }
+ }
+
+ private static RuntimeException mapExitCode(int exit) {
+ return switch (exit) {
+ case 1 -> failed("General error (exit code 1)");
+ case 2 -> failed("Shell syntax error (exit code 2)");
+ case 126 -> failed("Command found but not executable (exit code 126)");
+ case 127 -> failed("Command not found (exit code 127)");
+ case 130 -> failed("Interrupted by SIGINT (exit code 130)");
+ case 137 -> failed("Killed by SIGKILL (exit code 137)");
+ case 139 -> failed("Segmentation fault (exit code 139)");
+ case 143 -> failed("Terminated by SIGTERM (exit code 143)");
+ default -> failed("Process exited with code " + exit);
+ };
+ }
+
+ private static RuntimeException failed(String message) {
+ return new RuntimeException(message);
+ }
+
+ static ContainerRunnerBuilder builder() {
+ return new ContainerRunnerBuilder();
+ }
+
+ public static class ContainerRunnerBuilder {
+ private Container container;
+ private WorkflowDefinition definition;
+ private WorkflowValueResolver timeout;
+ private ContainerCleanupPolicy policy;
+ private String containerImage;
+ private Collection propertySetters = new ArrayList<>();
+
+ private ContainerRunnerBuilder() {}
+
+ ContainerRunnerBuilder withContainer(Container container) {
+ this.container = container;
+ return this;
+ }
+
+ public ContainerRunnerBuilder withWorkflowDefinition(WorkflowDefinition definition) {
+ this.definition = definition;
+ return this;
+ }
+
+ ContainerRunner build() {
+ propertySetters.add(new NamePropertySetter(definition, container));
+ propertySetters.add(new CommandPropertySetter(definition, container));
+ propertySetters.add(new ContainerEnvironmentPropertySetter(definition, container));
+ propertySetters.add(new LifetimePropertySetter(container));
+ propertySetters.add(new PortsPropertySetter(container));
+ propertySetters.add(new VolumesPropertySetter(definition, container));
+
+ containerImage = container.getImage();
+ if (containerImage == null || container.getImage().isBlank()) {
+ throw new IllegalArgumentException("Container image must be provided");
+ }
+ ContainerLifetime lifetime = container.getLifetime();
+ if (lifetime != null) {
+ policy = lifetime.getCleanup();
+ TimeoutAfter afterTimeout = lifetime.getAfter();
+ if (afterTimeout != null)
+ timeout = WorkflowUtils.fromTimeoutAfter(definition.application(), afterTimeout);
+ }
+
+ return new ContainerRunner(this);
+ }
+ }
+}
diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java
new file mode 100644
index 00000000..3606ae93
--- /dev/null
+++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.container.executors;
+
+import static io.serverlessworkflow.api.types.ContainerLifetime.*;
+
+import com.github.dockerjava.api.command.CreateContainerCmd;
+import io.serverlessworkflow.api.types.Container;
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowModel;
+
+class LifetimePropertySetter implements ContainerPropertySetter {
+
+ private final ContainerCleanupPolicy cleanupPolicy;
+
+ LifetimePropertySetter(Container configuration) {
+ this.cleanupPolicy =
+ configuration.getLifetime() != null ? configuration.getLifetime().getCleanup() : null;
+ }
+
+ @Override
+ public void accept(
+ CreateContainerCmd command,
+ WorkflowContext workflowContext,
+ TaskContext taskContext,
+ WorkflowModel model) {
+ if (ContainerCleanupPolicy.ALWAYS.equals(cleanupPolicy)) {
+ command.getHostConfig().withAutoRemove(true);
+ } else if (ContainerCleanupPolicy.NEVER.equals(cleanupPolicy)) {
+ command.getHostConfig().withAutoRemove(false);
+ }
+ }
+}
diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java
new file mode 100644
index 00000000..70bc8d83
--- /dev/null
+++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.container.executors;
+
+import static io.serverlessworkflow.impl.WorkflowUtils.isValid;
+
+import com.github.dockerjava.api.command.CreateContainerCmd;
+import io.serverlessworkflow.api.types.Container;
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import io.serverlessworkflow.impl.WorkflowModel;
+import io.serverlessworkflow.impl.WorkflowUtils;
+import io.serverlessworkflow.impl.WorkflowValueResolver;
+import java.util.Optional;
+
+class NamePropertySetter implements ContainerPropertySetter {
+
+ private final Optional> containerName;
+
+ NamePropertySetter(WorkflowDefinition definition, Container container) {
+ String containerName = container.getName();
+ this.containerName =
+ isValid(containerName)
+ ? Optional.of(WorkflowUtils.buildStringFilter(definition.application(), containerName))
+ : Optional.empty();
+ }
+
+ @Override
+ public void accept(
+ CreateContainerCmd createContainerCmd,
+ WorkflowContext workflowContext,
+ TaskContext taskContext,
+ WorkflowModel model) {
+ containerName
+ .map(c -> c.apply(workflowContext, taskContext, model))
+ .ifPresent(createContainerCmd::withName);
+ }
+}
diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java
new file mode 100644
index 00000000..b176b110
--- /dev/null
+++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.container.executors;
+
+import com.github.dockerjava.api.command.CreateContainerCmd;
+import com.github.dockerjava.api.model.ExposedPort;
+import com.github.dockerjava.api.model.Ports;
+import io.serverlessworkflow.api.types.Container;
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowModel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+class PortsPropertySetter implements ContainerPropertySetter {
+
+ private Ports portBindings = new Ports();
+ private List exposed = new ArrayList<>();
+
+ PortsPropertySetter(Container configuration) {
+ if (configuration.getPorts() != null
+ && configuration.getPorts().getAdditionalProperties() != null) {
+ for (Map.Entry entry :
+ configuration.getPorts().getAdditionalProperties().entrySet()) {
+ ExposedPort exposedPort = ExposedPort.tcp(Integer.parseInt(entry.getKey()));
+ exposed.add(exposedPort);
+ portBindings.bind(exposedPort, Ports.Binding.bindPort(from(entry.getValue())));
+ }
+ }
+ }
+
+ @Override
+ public void accept(
+ CreateContainerCmd command,
+ WorkflowContext workflowContext,
+ TaskContext taskContext,
+ WorkflowModel model) {
+ command.withExposedPorts(exposed);
+ command.getHostConfig().withPortBindings(portBindings);
+ }
+
+ private static Integer from(Object obj) {
+ if (obj instanceof Integer number) {
+ return number;
+ } else if (obj instanceof Number number) {
+ return number.intValue();
+ } else if (obj instanceof String str) {
+ return Integer.parseInt(str);
+ } else if (obj != null) {
+ return Integer.parseInt(obj.toString());
+ } else {
+ throw new IllegalArgumentException("Null value for port key");
+ }
+ }
+}
diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java
new file mode 100644
index 00000000..adf7482b
--- /dev/null
+++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.container.executors;
+
+import io.serverlessworkflow.api.types.Container;
+import io.serverlessworkflow.api.types.RunContainer;
+import io.serverlessworkflow.api.types.RunTaskConfiguration;
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import io.serverlessworkflow.impl.WorkflowModel;
+import io.serverlessworkflow.impl.executors.RunnableTask;
+import java.util.concurrent.CompletableFuture;
+
+public class RunContainerExecutor implements RunnableTask {
+
+ private ContainerRunner containerRunner;
+
+ public void init(RunContainer taskConfiguration, WorkflowDefinition definition) {
+ Container container = taskConfiguration.getContainer();
+ containerRunner =
+ ContainerRunner.builder()
+ .withContainer(container)
+ .withWorkflowDefinition(definition)
+ .build();
+ }
+
+ @Override
+ public CompletableFuture apply(
+ WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
+ return containerRunner.start(workflowContext, taskContext, input);
+ }
+
+ @Override
+ public boolean accept(Class extends RunTaskConfiguration> clazz) {
+ return RunContainer.class.equals(clazz);
+ }
+}
diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java
new file mode 100644
index 00000000..63830f34
--- /dev/null
+++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.container.executors;
+
+import com.github.dockerjava.api.command.CreateContainerCmd;
+import com.github.dockerjava.api.model.Bind;
+import com.github.dockerjava.api.model.Volume;
+import io.serverlessworkflow.api.types.Container;
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import io.serverlessworkflow.impl.WorkflowModel;
+import io.serverlessworkflow.impl.WorkflowUtils;
+import io.serverlessworkflow.impl.WorkflowValueResolver;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+
+class VolumesPropertySetter implements ContainerPropertySetter {
+
+ private static record HostContainer(
+ WorkflowValueResolver host, WorkflowValueResolver container) {}
+
+ private final Collection binds = new ArrayList<>();
+
+ VolumesPropertySetter(WorkflowDefinition definition, Container configuration) {
+ if (configuration.getVolumes() != null
+ && configuration.getVolumes().getAdditionalProperties() != null) {
+ for (Map.Entry entry :
+ configuration.getVolumes().getAdditionalProperties().entrySet()) {
+ binds.add(
+ new HostContainer(
+ WorkflowUtils.buildStringFilter(definition.application(), entry.getKey()),
+ WorkflowUtils.buildStringFilter(
+ definition.application(),
+ Objects.requireNonNull(
+ entry.getValue(), "Volume value must be a not null string")
+ .toString())));
+ }
+ }
+ }
+
+ @Override
+ public void accept(
+ CreateContainerCmd command,
+ WorkflowContext workflowContext,
+ TaskContext taskContext,
+ WorkflowModel model) {
+ command
+ .getHostConfig()
+ .withBinds(
+ binds.stream()
+ .map(
+ r ->
+ new Bind(
+ r.host().apply(workflowContext, taskContext, model),
+ new Volume(r.container.apply(workflowContext, taskContext, model))))
+ .toList());
+ }
+}
diff --git a/impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask b/impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask
new file mode 100644
index 00000000..c1450d21
--- /dev/null
+++ b/impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask
@@ -0,0 +1 @@
+io.serverlessworkflow.impl.container.executors.RunContainerExecutor
\ No newline at end of file
diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java
index 7b6b3403..6fef5a85 100644
--- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java
+++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java
@@ -59,6 +59,10 @@ public static Optional getSchemaValidator(
return Optional.empty();
}
+ public static boolean isValid(String str) {
+ return str != null && !str.isBlank();
+ }
+
public static Optional buildWorkflowFilter(
WorkflowApplication app, InputFrom from) {
return from != null
diff --git a/impl/pom.xml b/impl/pom.xml
index e4698812..c59df5fa 100644
--- a/impl/pom.xml
+++ b/impl/pom.xml
@@ -15,6 +15,7 @@
1.6.0
3.1.11
9.2.1
+ 3.6.0
@@ -93,6 +94,11 @@
serverlessworkflow-impl-openapi
${project.version}
+
+ io.serverlessworkflow
+ serverlessworkflow-impl-container
+ ${project.version}
+
net.thisptr
jackson-jq
@@ -130,6 +136,16 @@
cron-utils
${version.com.cronutils}
+
+ com.github.docker-java
+ docker-java-core
+ ${version.docker.java}
+
+
+ com.github.docker-java
+ docker-java-transport-httpclient5
+ ${version.docker.java}
+
@@ -145,6 +161,7 @@
model
lifecycleevent
validation
+ container
test
diff --git a/impl/test/pom.xml b/impl/test/pom.xml
index 7672bbef..f4e846cb 100644
--- a/impl/test/pom.xml
+++ b/impl/test/pom.xml
@@ -41,6 +41,10 @@
io.serverlessworkflow
serverlessworkflow-impl-openapi
+
+ io.serverlessworkflow
+ serverlessworkflow-impl-container
+
org.glassfish.jersey.core
jersey-client
diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java
new file mode 100644
index 00000000..989ae671
--- /dev/null
+++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java
@@ -0,0 +1,263 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.test;
+
+import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import com.github.dockerjava.api.model.Container;
+import com.github.dockerjava.api.model.ExposedPort;
+import com.github.dockerjava.api.model.Frame;
+import com.github.dockerjava.api.model.Ports;
+import com.github.dockerjava.core.DefaultDockerClientConfig;
+import com.github.dockerjava.core.DockerClientImpl;
+import com.github.dockerjava.core.command.LogContainerResultCallback;
+import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
+import io.serverlessworkflow.api.types.Workflow;
+import io.serverlessworkflow.impl.WorkflowApplication;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Map;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@DisabledIf("checkDocker")
+public class ContainerTest {
+
+ private static DockerClient dockerClient;
+ private static Logger logger = LoggerFactory.getLogger(ContainerTest.class);
+
+ {
+ DefaultDockerClientConfig defaultConfig =
+ DefaultDockerClientConfig.createDefaultConfigBuilder().build();
+ dockerClient =
+ DockerClientImpl.getInstance(
+ defaultConfig,
+ new ApacheDockerHttpClient.Builder().dockerHost(defaultConfig.getDockerHost()).build());
+ }
+
+ @SuppressWarnings("unused")
+ private static boolean checkDocker() {
+ try {
+ dockerClient.pingCmd().exec();
+ return false;
+ } catch (Exception ex) {
+ logger.warn("Docker is not running, disabling container test");
+ return true;
+ }
+ }
+
+ private static WorkflowApplication app;
+
+ @BeforeAll
+ static void init() {
+ app = WorkflowApplication.builder().build();
+ }
+
+ @AfterAll
+ static void cleanup() throws IOException {
+ app.close();
+ }
+
+ @Test
+ public void testContainer() throws IOException, InterruptedException {
+ Workflow workflow =
+ readWorkflowFromClasspath("workflows-samples/container/container-test-command.yaml");
+ String containerName = "hello-world";
+ try {
+ Map result =
+ app.workflowDefinition(workflow).instance(Map.of()).start().join().asMap().orElseThrow();
+
+ String containerId = findContainerIdByName(containerName);
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+ dockerClient
+ .logContainerCmd(containerId)
+ .withStdOut(true)
+ .withStdErr(true)
+ .withTimestamps(true)
+ .exec(
+ new LogContainerResultCallback() {
+ @Override
+ public void onNext(Frame frame) {
+ output.writeBytes(frame.getPayload());
+ }
+ })
+ .awaitCompletion();
+
+ assertTrue(output.toString().contains("Hello World"));
+ assertNotNull(result);
+ } finally {
+ dockerClient.removeContainerCmd(findContainerIdByName(containerName)).withForce(true).exec();
+ }
+ }
+
+ @Test
+ public void testContainerEnv() throws IOException, InterruptedException {
+ Workflow workflow = readWorkflowFromClasspath("workflows-samples/container/container-env.yaml");
+ String containerName = "hello-world-envs";
+
+ Map input = Map.of("someValue", "Tested");
+
+ try {
+ Map result =
+ app.workflowDefinition(workflow).instance(input).start().join().asMap().orElseThrow();
+
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+ dockerClient
+ .logContainerCmd(findContainerIdByName(containerName))
+ .withStdOut(true)
+ .withStdErr(true)
+ .withTimestamps(true)
+ .exec(
+ new LogContainerResultCallback() {
+ @Override
+ public void onNext(Frame frame) {
+ output.writeBytes(frame.getPayload());
+ }
+ })
+ .awaitCompletion();
+ assertTrue(output.toString().contains("BAR=FOO"));
+ assertTrue(output.toString().contains("FOO=Tested"));
+ assertNotNull(result);
+ } finally {
+ dockerClient.removeContainerCmd(findContainerIdByName(containerName)).withForce(true).exec();
+ }
+ }
+
+ @Test
+ public void testContainerTimeout() throws IOException {
+ String containerName = "hello-world-timeout";
+ try {
+ Workflow workflow =
+ readWorkflowFromClasspath("workflows-samples/container/container-timeout.yaml");
+
+ Map result =
+ app.workflowDefinition(workflow).instance(Map.of()).start().join().asMap().orElseThrow();
+
+ String containerId = findContainerIdByName(containerName);
+
+ assertTrue(isContainerGone(containerId));
+ assertNotNull(result);
+ } finally {
+ dockerClient.removeContainerCmd(findContainerIdByName(containerName)).withForce(true).exec();
+ }
+ }
+
+ @Test
+ public void testContainerCleanup() throws IOException {
+ String containerName = "hello-world-cleanup";
+ try {
+ Workflow workflow =
+ readWorkflowFromClasspath("workflows-samples/container/container-cleanup.yaml");
+
+ Map result =
+ app.workflowDefinition(workflow).instance(Map.of()).start().join().asMap().orElseThrow();
+
+ String containerId = findContainerIdByName(containerName);
+ assertTrue(isContainerGone(containerId));
+ assertNotNull(result);
+ } finally {
+ dockerClient.removeContainerCmd(findContainerIdByName(containerName)).withForce(true).exec();
+ }
+ }
+
+ @Test
+ public void testContainerCleanupDefault() throws IOException {
+ String containerName = "hello-world-cleanup-default";
+ try {
+ Workflow workflow =
+ readWorkflowFromClasspath("workflows-samples/container/container-cleanup-default.yaml");
+
+ Map result =
+ app.workflowDefinition(workflow).instance(Map.of()).start().join().asMap().orElseThrow();
+ String containerId = findContainerIdByName(containerName);
+ assertFalse(isContainerGone(containerId));
+ assertNotNull(result);
+ } finally {
+ dockerClient.removeContainerCmd(findContainerIdByName(containerName)).withForce(true).exec();
+ }
+ }
+
+ @Test
+ void testPortBindings() throws Exception {
+ Workflow workflow =
+ readWorkflowFromClasspath("workflows-samples/container/container-ports.yaml");
+ String containerName = "hello-world-ports";
+
+ try {
+ new Thread(
+ () -> {
+ app.workflowDefinition(workflow)
+ .instance(Map.of())
+ .start()
+ .join()
+ .asMap()
+ .orElseThrow();
+ })
+ .start();
+
+ await()
+ .pollInterval(Duration.ofSeconds(1))
+ .atMost(Duration.ofSeconds(10))
+ .until(() -> findContainerIdByName(containerName) != null);
+
+ String containerId = findContainerIdByName(containerName);
+ InspectContainerResponse inspect = dockerClient.inspectContainerCmd(containerId).exec();
+ Map ports =
+ inspect.getNetworkSettings().getPorts().getBindings();
+
+ assertTrue(ports.containsKey(ExposedPort.tcp(8880)));
+ assertTrue(ports.containsKey(ExposedPort.tcp(8881)));
+ assertTrue(ports.containsKey(ExposedPort.tcp(8882)));
+ } finally {
+ dockerClient.removeContainerCmd(findContainerIdByName(containerName)).withForce(true).exec();
+ }
+ }
+
+ private static String findContainerIdByName(String containerName) {
+ var containers = dockerClient.listContainersCmd().withShowAll(true).exec();
+
+ return containers.stream()
+ .filter(
+ c ->
+ c.getNames() != null
+ && Arrays.stream(c.getNames()).anyMatch(n -> n.equals("/" + containerName)))
+ .map(Container::getId)
+ .findFirst()
+ .orElse(null);
+ }
+
+ private static boolean isContainerGone(String id) {
+ if (id == null) {
+ return true;
+ }
+ var containers = dockerClient.listContainersCmd().withShowAll(true).exec();
+ return containers.stream().noneMatch(c -> c.getId().startsWith(id));
+ }
+}
diff --git a/impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml b/impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml
new file mode 100644
index 00000000..a1a10b80
--- /dev/null
+++ b/impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml
@@ -0,0 +1,12 @@
+document:
+ dsl: '1.0.2'
+ namespace: test
+ name: container-cleanup-default
+ version: '0.1.0'
+do:
+ - runContainer:
+ run:
+ container:
+ image: busybox:latest
+ command: echo Hello World
+ name: hello-world-cleanup-default
diff --git a/impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml b/impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml
new file mode 100644
index 00000000..aa5680f4
--- /dev/null
+++ b/impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml
@@ -0,0 +1,14 @@
+document:
+ dsl: '1.0.2'
+ namespace: test
+ name: cointaner-cleanup
+ version: '0.1.0'
+do:
+ - runContainer:
+ run:
+ container:
+ image: busybox:latest
+ command: echo Hello World
+ name: hello-world-cleanup
+ lifetime:
+ cleanup: always
\ No newline at end of file
diff --git a/impl/test/src/test/resources/workflows-samples/container/container-env.yaml b/impl/test/src/test/resources/workflows-samples/container/container-env.yaml
new file mode 100644
index 00000000..8d50dbab
--- /dev/null
+++ b/impl/test/src/test/resources/workflows-samples/container/container-env.yaml
@@ -0,0 +1,17 @@
+document:
+ dsl: '1.0.2'
+ namespace: test
+ name: container-env
+ version: '0.1.0'
+do:
+ - runContainer:
+ run:
+ container:
+ image: busybox:latest
+ command: printenv
+ name: hello-world-envs
+ lifetime:
+ cleanup: never
+ environment:
+ FOO: ${ .someValue }
+ BAR: FOO
diff --git a/impl/test/src/test/resources/workflows-samples/container/container-ports.yaml b/impl/test/src/test/resources/workflows-samples/container/container-ports.yaml
new file mode 100644
index 00000000..3840b876
--- /dev/null
+++ b/impl/test/src/test/resources/workflows-samples/container/container-ports.yaml
@@ -0,0 +1,18 @@
+document:
+ dsl: '1.0.2'
+ namespace: test
+ name: container-ports
+ version: '0.1.0'
+do:
+ - runContainer:
+ run:
+ container:
+ image: busybox:latest
+ command: sleep 300
+ name: hello-world-ports
+ ports:
+ 8880: 8880
+ 8881: 8881
+ 8882: 8882
+ lifetime:
+ cleanup: never
\ No newline at end of file
diff --git a/impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml b/impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml
new file mode 100644
index 00000000..54887ecb
--- /dev/null
+++ b/impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml
@@ -0,0 +1,14 @@
+document:
+ dsl: '1.0.2'
+ namespace: test
+ name: container-test-command
+ version: '0.1.0'
+do:
+ - runContainer:
+ run:
+ container:
+ image: busybox:3.20
+ command: echo Hello World
+ name: hello-world
+ lifetime:
+ cleanup: never
diff --git a/impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml b/impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml
new file mode 100644
index 00000000..82f18891
--- /dev/null
+++ b/impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml
@@ -0,0 +1,16 @@
+document:
+ dsl: '1.0.2'
+ namespace: test
+ name: container-timeout
+ version: '0.1.0'
+do:
+ - runContainer:
+ run:
+ container:
+ image: busybox:latest
+ command: sleep 300
+ name: hello-world-timeout
+ lifetime:
+ cleanup: eventually
+ after:
+ seconds: 1
\ No newline at end of file