diff --git a/impl/container/pom.xml b/impl/container/pom.xml new file mode 100644 index 00000000..51ced574 --- /dev/null +++ b/impl/container/pom.xml @@ -0,0 +1,33 @@ + + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-impl + 8.0.0-SNAPSHOT + + serverlessworkflow-impl-container + Serverless Workflow :: Impl :: Container + + + + io.serverlessworkflow + serverlessworkflow-impl-core + + + io.serverlessworkflow + serverlessworkflow-types + + + com.github.docker-java + docker-java-core + + + com.github.docker-java + docker-java-transport-httpclient5 + + + + \ No newline at end of file diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java new file mode 100644 index 00000000..b0af3191 --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/CommandPropertySetter.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import static io.serverlessworkflow.impl.WorkflowUtils.isValid; + +import com.github.dockerjava.api.command.CreateContainerCmd; +import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import java.util.Optional; + +class CommandPropertySetter implements ContainerPropertySetter { + + private Optional> command; + + CommandPropertySetter(WorkflowDefinition definition, Container configuration) { + String commandName = configuration.getCommand(); + command = + isValid(commandName) + ? Optional.of(WorkflowUtils.buildStringFilter(definition.application(), commandName)) + : Optional.empty(); + } + + @Override + public void accept( + CreateContainerCmd containerCmd, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + command + .map(c -> c.apply(workflowContext, taskContext, model)) + .ifPresent(c -> containerCmd.withCmd("sh", "-c", c)); + } +} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java new file mode 100644 index 00000000..c4f143a3 --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerEnvironmentPropertySetter.java @@ -0,0 +1,59 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import com.github.dockerjava.api.command.CreateContainerCmd; +import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import java.util.Map; +import java.util.Optional; + +class ContainerEnvironmentPropertySetter implements ContainerPropertySetter { + + private final Optional>> envResolver; + + ContainerEnvironmentPropertySetter(WorkflowDefinition definition, Container configuration) { + + this.envResolver = + configuration.getEnvironment() != null + && configuration.getEnvironment().getAdditionalProperties() != null + ? Optional.of( + WorkflowUtils.buildMapResolver( + definition.application(), + null, + configuration.getEnvironment().getAdditionalProperties())) + : Optional.empty(); + } + + @Override + public void accept( + CreateContainerCmd command, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + envResolver + .map(env -> env.apply(workflowContext, taskContext, model)) + .ifPresent( + envs -> + command.withEnv( + envs.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).toList())); + } +} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java new file mode 100644 index 00000000..9524a92a --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerPropertySetter.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import com.github.dockerjava.api.command.CreateContainerCmd; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; + +interface ContainerPropertySetter { + abstract void accept( + CreateContainerCmd command, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model); +} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java new file mode 100644 index 00000000..5c14a3ac --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java @@ -0,0 +1,266 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import static io.serverlessworkflow.api.types.ContainerLifetime.*; +import static io.serverlessworkflow.impl.WorkflowUtils.isValid; + +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.command.CreateContainerCmd; +import com.github.dockerjava.api.command.CreateContainerResponse; +import com.github.dockerjava.api.command.WaitContainerResultCallback; +import com.github.dockerjava.api.exception.DockerClientException; +import com.github.dockerjava.api.exception.NotFoundException; +import com.github.dockerjava.core.DefaultDockerClientConfig; +import com.github.dockerjava.core.DockerClientImpl; +import com.github.dockerjava.core.NameParser; +import com.github.dockerjava.httpclient5.ApacheDockerHttpClient; +import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.api.types.ContainerLifetime; +import io.serverlessworkflow.api.types.TimeoutAfter; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +class ContainerRunner { + + private static final DefaultDockerClientConfig DEFAULT_CONFIG = + DefaultDockerClientConfig.createDefaultConfigBuilder().build(); + + private static class DockerClientHolder { + private static final DockerClient dockerClient = + DockerClientImpl.getInstance( + DEFAULT_CONFIG, + new ApacheDockerHttpClient.Builder() + .dockerHost(DEFAULT_CONFIG.getDockerHost()) + .build()); + } + + private final Collection propertySetters; + private final Optional> timeout; + private final ContainerCleanupPolicy policy; + private final String containerImage; + + private ContainerRunner(ContainerRunnerBuilder builder) { + this.propertySetters = builder.propertySetters; + this.timeout = Optional.ofNullable(builder.timeout); + this.policy = builder.policy; + this.containerImage = builder.containerImage; + } + + CompletableFuture start( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + return CompletableFuture.supplyAsync( + () -> startSync(workflowContext, taskContext, input), + workflowContext.definition().application().executorService()); + } + + private WorkflowModel startSync( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + Integer exit = executeContainer(workflowContext, taskContext, input); + if (exit == null || exit == 0) { + return input; + } else { + throw mapExitCode(exit); + } + } + + private Integer executeContainer( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + try { + pullImageIfNeeded(containerImage); + CreateContainerCmd containerCommand = + DockerClientHolder.dockerClient.createContainerCmd(containerImage); + propertySetters.forEach(p -> p.accept(containerCommand, workflowContext, taskContext, input)); + return waitAccordingToLifetime( + createAndStartContainer(containerCommand), workflowContext, taskContext, input); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw failed("Container execution failed with exit code " + ie.getMessage()); + } catch (IOException e) { + throw failed("Container execution failed with exit code " + e.getMessage()); + } + } + + private void pullImageIfNeeded(String imageRef) throws InterruptedException { + NameParser.ReposTag rt = NameParser.parseRepositoryTag(imageRef); + DockerClientHolder.dockerClient + .pullImageCmd(NameParser.resolveRepositoryName(imageRef).reposName) + .withTag(WorkflowUtils.isValid(rt.tag) ? rt.tag : "latest") + .start() + .awaitCompletion(); + } + + private String createAndStartContainer(CreateContainerCmd containerCommand) { + CreateContainerResponse resp = containerCommand.exec(); + String id = resp.getId(); + if (!isValid(id)) { + throw new IllegalStateException("Container creation failed: empty ID"); + } + DockerClientHolder.dockerClient.startContainerCmd(id).exec(); + return id; + } + + private Integer waitAccordingToLifetime( + String id, WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) + throws IOException { + try (var cb = + DockerClientHolder.dockerClient + .waitContainerCmd(id) + .exec(new WaitContainerResultCallback())) { + if (policy == ContainerCleanupPolicy.EVENTUALLY) { + Duration timeout = + this.timeout + .map(t -> t.apply(workflowContext, taskContext, input)) + .orElse(Duration.ZERO); + try { + Integer exit = cb.awaitStatusCode(timeout.toMillis(), TimeUnit.MILLISECONDS); + safeStop(id); + return exit; + } catch (DockerClientException timeoutOrOther) { + safeStop(id); + } + } else { + return cb.awaitStatusCode(); + } + } catch (NotFoundException e) { + // container already removed + } + return 0; + } + + private boolean isRunning(String id) { + try { + var st = DockerClientHolder.dockerClient.inspectContainerCmd(id).exec().getState(); + return st != null && Boolean.TRUE.equals(st.getRunning()); + } catch (Exception e) { + return false; // must be already removed + } + } + + private void safeStop(String id) { + if (isRunning(id)) { + safeStop(id, Duration.ofSeconds(10)); + try (var cb2 = + DockerClientHolder.dockerClient + .waitContainerCmd(id) + .exec(new WaitContainerResultCallback())) { + cb2.awaitStatusCode(); + safeRemove(id); + } catch (Exception ignore) { + // we can ignore this + } + } else { + safeRemove(id); + } + } + + private void safeStop(String id, Duration timeout) { + try { + DockerClientHolder.dockerClient + .stopContainerCmd(id) + .withTimeout((int) Math.max(1, timeout.toSeconds())) + .exec(); + } catch (Exception ignore) { + // we can ignore this + } + } + + // must be removed because of withAutoRemove(true), but just in case + private void safeRemove(String id) { + try { + DockerClientHolder.dockerClient.removeContainerCmd(id).withForce(true).exec(); + } catch (Exception ignore) { + // we can ignore this + } + } + + private static RuntimeException mapExitCode(int exit) { + return switch (exit) { + case 1 -> failed("General error (exit code 1)"); + case 2 -> failed("Shell syntax error (exit code 2)"); + case 126 -> failed("Command found but not executable (exit code 126)"); + case 127 -> failed("Command not found (exit code 127)"); + case 130 -> failed("Interrupted by SIGINT (exit code 130)"); + case 137 -> failed("Killed by SIGKILL (exit code 137)"); + case 139 -> failed("Segmentation fault (exit code 139)"); + case 143 -> failed("Terminated by SIGTERM (exit code 143)"); + default -> failed("Process exited with code " + exit); + }; + } + + private static RuntimeException failed(String message) { + return new RuntimeException(message); + } + + static ContainerRunnerBuilder builder() { + return new ContainerRunnerBuilder(); + } + + public static class ContainerRunnerBuilder { + private Container container; + private WorkflowDefinition definition; + private WorkflowValueResolver timeout; + private ContainerCleanupPolicy policy; + private String containerImage; + private Collection propertySetters = new ArrayList<>(); + + private ContainerRunnerBuilder() {} + + ContainerRunnerBuilder withContainer(Container container) { + this.container = container; + return this; + } + + public ContainerRunnerBuilder withWorkflowDefinition(WorkflowDefinition definition) { + this.definition = definition; + return this; + } + + ContainerRunner build() { + propertySetters.add(new NamePropertySetter(definition, container)); + propertySetters.add(new CommandPropertySetter(definition, container)); + propertySetters.add(new ContainerEnvironmentPropertySetter(definition, container)); + propertySetters.add(new LifetimePropertySetter(container)); + propertySetters.add(new PortsPropertySetter(container)); + propertySetters.add(new VolumesPropertySetter(definition, container)); + + containerImage = container.getImage(); + if (containerImage == null || container.getImage().isBlank()) { + throw new IllegalArgumentException("Container image must be provided"); + } + ContainerLifetime lifetime = container.getLifetime(); + if (lifetime != null) { + policy = lifetime.getCleanup(); + TimeoutAfter afterTimeout = lifetime.getAfter(); + if (afterTimeout != null) + timeout = WorkflowUtils.fromTimeoutAfter(definition.application(), afterTimeout); + } + + return new ContainerRunner(this); + } + } +} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java new file mode 100644 index 00000000..3606ae93 --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/LifetimePropertySetter.java @@ -0,0 +1,47 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import static io.serverlessworkflow.api.types.ContainerLifetime.*; + +import com.github.dockerjava.api.command.CreateContainerCmd; +import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; + +class LifetimePropertySetter implements ContainerPropertySetter { + + private final ContainerCleanupPolicy cleanupPolicy; + + LifetimePropertySetter(Container configuration) { + this.cleanupPolicy = + configuration.getLifetime() != null ? configuration.getLifetime().getCleanup() : null; + } + + @Override + public void accept( + CreateContainerCmd command, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + if (ContainerCleanupPolicy.ALWAYS.equals(cleanupPolicy)) { + command.getHostConfig().withAutoRemove(true); + } else if (ContainerCleanupPolicy.NEVER.equals(cleanupPolicy)) { + command.getHostConfig().withAutoRemove(false); + } + } +} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java new file mode 100644 index 00000000..70bc8d83 --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/NamePropertySetter.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import static io.serverlessworkflow.impl.WorkflowUtils.isValid; + +import com.github.dockerjava.api.command.CreateContainerCmd; +import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import java.util.Optional; + +class NamePropertySetter implements ContainerPropertySetter { + + private final Optional> containerName; + + NamePropertySetter(WorkflowDefinition definition, Container container) { + String containerName = container.getName(); + this.containerName = + isValid(containerName) + ? Optional.of(WorkflowUtils.buildStringFilter(definition.application(), containerName)) + : Optional.empty(); + } + + @Override + public void accept( + CreateContainerCmd createContainerCmd, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + containerName + .map(c -> c.apply(workflowContext, taskContext, model)) + .ifPresent(createContainerCmd::withName); + } +} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java new file mode 100644 index 00000000..b176b110 --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/PortsPropertySetter.java @@ -0,0 +1,69 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import com.github.dockerjava.api.command.CreateContainerCmd; +import com.github.dockerjava.api.model.ExposedPort; +import com.github.dockerjava.api.model.Ports; +import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +class PortsPropertySetter implements ContainerPropertySetter { + + private Ports portBindings = new Ports(); + private List exposed = new ArrayList<>(); + + PortsPropertySetter(Container configuration) { + if (configuration.getPorts() != null + && configuration.getPorts().getAdditionalProperties() != null) { + for (Map.Entry entry : + configuration.getPorts().getAdditionalProperties().entrySet()) { + ExposedPort exposedPort = ExposedPort.tcp(Integer.parseInt(entry.getKey())); + exposed.add(exposedPort); + portBindings.bind(exposedPort, Ports.Binding.bindPort(from(entry.getValue()))); + } + } + } + + @Override + public void accept( + CreateContainerCmd command, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + command.withExposedPorts(exposed); + command.getHostConfig().withPortBindings(portBindings); + } + + private static Integer from(Object obj) { + if (obj instanceof Integer number) { + return number; + } else if (obj instanceof Number number) { + return number.intValue(); + } else if (obj instanceof String str) { + return Integer.parseInt(str); + } else if (obj != null) { + return Integer.parseInt(obj.toString()); + } else { + throw new IllegalArgumentException("Null value for port key"); + } + } +} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java new file mode 100644 index 00000000..adf7482b --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.api.types.RunContainer; +import io.serverlessworkflow.api.types.RunTaskConfiguration; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.executors.RunnableTask; +import java.util.concurrent.CompletableFuture; + +public class RunContainerExecutor implements RunnableTask { + + private ContainerRunner containerRunner; + + public void init(RunContainer taskConfiguration, WorkflowDefinition definition) { + Container container = taskConfiguration.getContainer(); + containerRunner = + ContainerRunner.builder() + .withContainer(container) + .withWorkflowDefinition(definition) + .build(); + } + + @Override + public CompletableFuture apply( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + return containerRunner.start(workflowContext, taskContext, input); + } + + @Override + public boolean accept(Class clazz) { + return RunContainer.class.equals(clazz); + } +} diff --git a/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java new file mode 100644 index 00000000..63830f34 --- /dev/null +++ b/impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/VolumesPropertySetter.java @@ -0,0 +1,74 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.container.executors; + +import com.github.dockerjava.api.command.CreateContainerCmd; +import com.github.dockerjava.api.model.Bind; +import com.github.dockerjava.api.model.Volume; +import io.serverlessworkflow.api.types.Container; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; + +class VolumesPropertySetter implements ContainerPropertySetter { + + private static record HostContainer( + WorkflowValueResolver host, WorkflowValueResolver container) {} + + private final Collection binds = new ArrayList<>(); + + VolumesPropertySetter(WorkflowDefinition definition, Container configuration) { + if (configuration.getVolumes() != null + && configuration.getVolumes().getAdditionalProperties() != null) { + for (Map.Entry entry : + configuration.getVolumes().getAdditionalProperties().entrySet()) { + binds.add( + new HostContainer( + WorkflowUtils.buildStringFilter(definition.application(), entry.getKey()), + WorkflowUtils.buildStringFilter( + definition.application(), + Objects.requireNonNull( + entry.getValue(), "Volume value must be a not null string") + .toString()))); + } + } + } + + @Override + public void accept( + CreateContainerCmd command, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + command + .getHostConfig() + .withBinds( + binds.stream() + .map( + r -> + new Bind( + r.host().apply(workflowContext, taskContext, model), + new Volume(r.container.apply(workflowContext, taskContext, model)))) + .toList()); + } +} diff --git a/impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask b/impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask new file mode 100644 index 00000000..c1450d21 --- /dev/null +++ b/impl/container/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.RunnableTask @@ -0,0 +1 @@ +io.serverlessworkflow.impl.container.executors.RunContainerExecutor \ No newline at end of file diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java index 7b6b3403..6fef5a85 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java @@ -59,6 +59,10 @@ public static Optional getSchemaValidator( return Optional.empty(); } + public static boolean isValid(String str) { + return str != null && !str.isBlank(); + } + public static Optional buildWorkflowFilter( WorkflowApplication app, InputFrom from) { return from != null diff --git a/impl/pom.xml b/impl/pom.xml index e4698812..c59df5fa 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -15,6 +15,7 @@ 1.6.0 3.1.11 9.2.1 + 3.6.0 @@ -93,6 +94,11 @@ serverlessworkflow-impl-openapi ${project.version} + + io.serverlessworkflow + serverlessworkflow-impl-container + ${project.version} + net.thisptr jackson-jq @@ -130,6 +136,16 @@ cron-utils ${version.com.cronutils} + + com.github.docker-java + docker-java-core + ${version.docker.java} + + + com.github.docker-java + docker-java-transport-httpclient5 + ${version.docker.java} + @@ -145,6 +161,7 @@ model lifecycleevent validation + container test diff --git a/impl/test/pom.xml b/impl/test/pom.xml index 7672bbef..f4e846cb 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -41,6 +41,10 @@ io.serverlessworkflow serverlessworkflow-impl-openapi + + io.serverlessworkflow + serverlessworkflow-impl-container + org.glassfish.jersey.core jersey-client diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java new file mode 100644 index 00000000..989ae671 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ContainerTest.java @@ -0,0 +1,263 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.command.InspectContainerResponse; +import com.github.dockerjava.api.model.Container; +import com.github.dockerjava.api.model.ExposedPort; +import com.github.dockerjava.api.model.Frame; +import com.github.dockerjava.api.model.Ports; +import com.github.dockerjava.core.DefaultDockerClientConfig; +import com.github.dockerjava.core.DockerClientImpl; +import com.github.dockerjava.core.command.LogContainerResultCallback; +import com.github.dockerjava.httpclient5.ApacheDockerHttpClient; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Map; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@DisabledIf("checkDocker") +public class ContainerTest { + + private static DockerClient dockerClient; + private static Logger logger = LoggerFactory.getLogger(ContainerTest.class); + + { + DefaultDockerClientConfig defaultConfig = + DefaultDockerClientConfig.createDefaultConfigBuilder().build(); + dockerClient = + DockerClientImpl.getInstance( + defaultConfig, + new ApacheDockerHttpClient.Builder().dockerHost(defaultConfig.getDockerHost()).build()); + } + + @SuppressWarnings("unused") + private static boolean checkDocker() { + try { + dockerClient.pingCmd().exec(); + return false; + } catch (Exception ex) { + logger.warn("Docker is not running, disabling container test"); + return true; + } + } + + private static WorkflowApplication app; + + @BeforeAll + static void init() { + app = WorkflowApplication.builder().build(); + } + + @AfterAll + static void cleanup() throws IOException { + app.close(); + } + + @Test + public void testContainer() throws IOException, InterruptedException { + Workflow workflow = + readWorkflowFromClasspath("workflows-samples/container/container-test-command.yaml"); + String containerName = "hello-world"; + try { + Map result = + app.workflowDefinition(workflow).instance(Map.of()).start().join().asMap().orElseThrow(); + + String containerId = findContainerIdByName(containerName); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + + dockerClient + .logContainerCmd(containerId) + .withStdOut(true) + .withStdErr(true) + .withTimestamps(true) + .exec( + new LogContainerResultCallback() { + @Override + public void onNext(Frame frame) { + output.writeBytes(frame.getPayload()); + } + }) + .awaitCompletion(); + + assertTrue(output.toString().contains("Hello World")); + assertNotNull(result); + } finally { + dockerClient.removeContainerCmd(findContainerIdByName(containerName)).withForce(true).exec(); + } + } + + @Test + public void testContainerEnv() throws IOException, InterruptedException { + Workflow workflow = readWorkflowFromClasspath("workflows-samples/container/container-env.yaml"); + String containerName = "hello-world-envs"; + + Map input = Map.of("someValue", "Tested"); + + try { + Map result = + app.workflowDefinition(workflow).instance(input).start().join().asMap().orElseThrow(); + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + + dockerClient + .logContainerCmd(findContainerIdByName(containerName)) + .withStdOut(true) + .withStdErr(true) + .withTimestamps(true) + .exec( + new LogContainerResultCallback() { + @Override + public void onNext(Frame frame) { + output.writeBytes(frame.getPayload()); + } + }) + .awaitCompletion(); + assertTrue(output.toString().contains("BAR=FOO")); + assertTrue(output.toString().contains("FOO=Tested")); + assertNotNull(result); + } finally { + dockerClient.removeContainerCmd(findContainerIdByName(containerName)).withForce(true).exec(); + } + } + + @Test + public void testContainerTimeout() throws IOException { + String containerName = "hello-world-timeout"; + try { + Workflow workflow = + readWorkflowFromClasspath("workflows-samples/container/container-timeout.yaml"); + + Map result = + app.workflowDefinition(workflow).instance(Map.of()).start().join().asMap().orElseThrow(); + + String containerId = findContainerIdByName(containerName); + + assertTrue(isContainerGone(containerId)); + assertNotNull(result); + } finally { + dockerClient.removeContainerCmd(findContainerIdByName(containerName)).withForce(true).exec(); + } + } + + @Test + public void testContainerCleanup() throws IOException { + String containerName = "hello-world-cleanup"; + try { + Workflow workflow = + readWorkflowFromClasspath("workflows-samples/container/container-cleanup.yaml"); + + Map result = + app.workflowDefinition(workflow).instance(Map.of()).start().join().asMap().orElseThrow(); + + String containerId = findContainerIdByName(containerName); + assertTrue(isContainerGone(containerId)); + assertNotNull(result); + } finally { + dockerClient.removeContainerCmd(findContainerIdByName(containerName)).withForce(true).exec(); + } + } + + @Test + public void testContainerCleanupDefault() throws IOException { + String containerName = "hello-world-cleanup-default"; + try { + Workflow workflow = + readWorkflowFromClasspath("workflows-samples/container/container-cleanup-default.yaml"); + + Map result = + app.workflowDefinition(workflow).instance(Map.of()).start().join().asMap().orElseThrow(); + String containerId = findContainerIdByName(containerName); + assertFalse(isContainerGone(containerId)); + assertNotNull(result); + } finally { + dockerClient.removeContainerCmd(findContainerIdByName(containerName)).withForce(true).exec(); + } + } + + @Test + void testPortBindings() throws Exception { + Workflow workflow = + readWorkflowFromClasspath("workflows-samples/container/container-ports.yaml"); + String containerName = "hello-world-ports"; + + try { + new Thread( + () -> { + app.workflowDefinition(workflow) + .instance(Map.of()) + .start() + .join() + .asMap() + .orElseThrow(); + }) + .start(); + + await() + .pollInterval(Duration.ofSeconds(1)) + .atMost(Duration.ofSeconds(10)) + .until(() -> findContainerIdByName(containerName) != null); + + String containerId = findContainerIdByName(containerName); + InspectContainerResponse inspect = dockerClient.inspectContainerCmd(containerId).exec(); + Map ports = + inspect.getNetworkSettings().getPorts().getBindings(); + + assertTrue(ports.containsKey(ExposedPort.tcp(8880))); + assertTrue(ports.containsKey(ExposedPort.tcp(8881))); + assertTrue(ports.containsKey(ExposedPort.tcp(8882))); + } finally { + dockerClient.removeContainerCmd(findContainerIdByName(containerName)).withForce(true).exec(); + } + } + + private static String findContainerIdByName(String containerName) { + var containers = dockerClient.listContainersCmd().withShowAll(true).exec(); + + return containers.stream() + .filter( + c -> + c.getNames() != null + && Arrays.stream(c.getNames()).anyMatch(n -> n.equals("/" + containerName))) + .map(Container::getId) + .findFirst() + .orElse(null); + } + + private static boolean isContainerGone(String id) { + if (id == null) { + return true; + } + var containers = dockerClient.listContainersCmd().withShowAll(true).exec(); + return containers.stream().noneMatch(c -> c.getId().startsWith(id)); + } +} diff --git a/impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml b/impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml new file mode 100644 index 00000000..a1a10b80 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/container/container-cleanup-default.yaml @@ -0,0 +1,12 @@ +document: + dsl: '1.0.2' + namespace: test + name: container-cleanup-default + version: '0.1.0' +do: + - runContainer: + run: + container: + image: busybox:latest + command: echo Hello World + name: hello-world-cleanup-default diff --git a/impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml b/impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml new file mode 100644 index 00000000..aa5680f4 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/container/container-cleanup.yaml @@ -0,0 +1,14 @@ +document: + dsl: '1.0.2' + namespace: test + name: cointaner-cleanup + version: '0.1.0' +do: + - runContainer: + run: + container: + image: busybox:latest + command: echo Hello World + name: hello-world-cleanup + lifetime: + cleanup: always \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/container/container-env.yaml b/impl/test/src/test/resources/workflows-samples/container/container-env.yaml new file mode 100644 index 00000000..8d50dbab --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/container/container-env.yaml @@ -0,0 +1,17 @@ +document: + dsl: '1.0.2' + namespace: test + name: container-env + version: '0.1.0' +do: + - runContainer: + run: + container: + image: busybox:latest + command: printenv + name: hello-world-envs + lifetime: + cleanup: never + environment: + FOO: ${ .someValue } + BAR: FOO diff --git a/impl/test/src/test/resources/workflows-samples/container/container-ports.yaml b/impl/test/src/test/resources/workflows-samples/container/container-ports.yaml new file mode 100644 index 00000000..3840b876 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/container/container-ports.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.2' + namespace: test + name: container-ports + version: '0.1.0' +do: + - runContainer: + run: + container: + image: busybox:latest + command: sleep 300 + name: hello-world-ports + ports: + 8880: 8880 + 8881: 8881 + 8882: 8882 + lifetime: + cleanup: never \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml b/impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml new file mode 100644 index 00000000..54887ecb --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/container/container-test-command.yaml @@ -0,0 +1,14 @@ +document: + dsl: '1.0.2' + namespace: test + name: container-test-command + version: '0.1.0' +do: + - runContainer: + run: + container: + image: busybox:3.20 + command: echo Hello World + name: hello-world + lifetime: + cleanup: never diff --git a/impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml b/impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml new file mode 100644 index 00000000..82f18891 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/container/container-timeout.yaml @@ -0,0 +1,16 @@ +document: + dsl: '1.0.2' + namespace: test + name: container-timeout + version: '0.1.0' +do: + - runContainer: + run: + container: + image: busybox:latest + command: sleep 300 + name: hello-world-timeout + lifetime: + cleanup: eventually + after: + seconds: 1 \ No newline at end of file