Skip to content

Commit cdc2557

Browse files
committed
Add initial RunContainer Task support
Signed-off-by: Dmitrii Tikhomirov <chani.liet@gmail.com> Signed-off-by: Dmitrii Tikhomirov <chani.liet@gmail.com>
1 parent 0d27f2e commit cdc2557

File tree

16 files changed

+691
-0
lines changed

16 files changed

+691
-0
lines changed

impl/container/pom.xml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<parent>
7+
<groupId>io.serverlessworkflow</groupId>
8+
<artifactId>serverlessworkflow-impl</artifactId>
9+
<version>8.0.0-SNAPSHOT</version>
10+
</parent>
11+
<artifactId>serverlessworkflow-impl-container</artifactId>
12+
<name>Serverless Workflow :: Impl :: OpenAPI</name>
13+
14+
<dependencies>
15+
<dependency>
16+
<groupId>io.serverlessworkflow</groupId>
17+
<artifactId>serverlessworkflow-impl-core</artifactId>
18+
</dependency>
19+
<dependency>
20+
<groupId>io.serverlessworkflow</groupId>
21+
<artifactId>serverlessworkflow-types</artifactId>
22+
</dependency>
23+
<dependency>
24+
<groupId>com.github.docker-java</groupId>
25+
<artifactId>docker-java-core</artifactId>
26+
</dependency>
27+
<dependency>
28+
<groupId>com.github.docker-java</groupId>
29+
<artifactId>docker-java-transport-httpclient5</artifactId>
30+
</dependency>
31+
</dependencies>
32+
33+
</project>
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.container.executors;
17+
18+
import com.github.dockerjava.api.command.CreateContainerCmd;
19+
import io.serverlessworkflow.api.types.Container;
20+
21+
class CommandPropertySetter extends ContainerPropertySetter {
22+
23+
CommandPropertySetter(CreateContainerCmd createContainerCmd, Container configuration) {
24+
super(createContainerCmd, configuration);
25+
}
26+
27+
@Override
28+
public void accept(StringExpressionResolver resolver) {
29+
if (configuration.getCommand() != null && !configuration.getCommand().isEmpty()) {
30+
createContainerCmd.withCmd("sh", "-c", configuration.getCommand());
31+
}
32+
}
33+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.container.executors;
17+
18+
import com.github.dockerjava.api.command.CreateContainerCmd;
19+
import io.serverlessworkflow.api.types.Container;
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
class ContainerEnvironmentPropertySetter extends ContainerPropertySetter {
25+
26+
ContainerEnvironmentPropertySetter(
27+
CreateContainerCmd createContainerCmd, Container configuration) {
28+
super(createContainerCmd, configuration);
29+
}
30+
31+
@Override
32+
public void accept(StringExpressionResolver resolver) {
33+
if (!(configuration.getEnvironment() == null
34+
|| configuration.getEnvironment().getAdditionalProperties() == null)) {
35+
List<String> envs = new ArrayList<>();
36+
for (Map.Entry<String, Object> entry :
37+
configuration.getEnvironment().getAdditionalProperties().entrySet()) {
38+
String key = entry.getKey();
39+
if (entry.getValue() instanceof String value) {
40+
String resolvedValue = resolver.resolve(value);
41+
envs.add(key + "=" + resolvedValue);
42+
} else {
43+
throw new IllegalArgumentException("Environment variable values must be strings");
44+
}
45+
}
46+
createContainerCmd.withEnv(envs.toArray(new String[0]));
47+
}
48+
}
49+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.container.executors;
17+
18+
import com.github.dockerjava.api.command.CreateContainerCmd;
19+
import io.serverlessworkflow.api.types.Container;
20+
import java.util.function.Consumer;
21+
22+
abstract class ContainerPropertySetter implements Consumer<StringExpressionResolver> {
23+
24+
protected final CreateContainerCmd createContainerCmd;
25+
protected final Container configuration;
26+
27+
ContainerPropertySetter(CreateContainerCmd createContainerCmd, Container configuration) {
28+
this.createContainerCmd = createContainerCmd;
29+
this.configuration = configuration;
30+
}
31+
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.container.executors;
17+
18+
import static io.serverlessworkflow.api.types.ContainerLifetime.ContainerCleanupPolicy.*;
19+
20+
import com.github.dockerjava.api.DockerClient;
21+
import com.github.dockerjava.api.command.CreateContainerCmd;
22+
import com.github.dockerjava.api.command.CreateContainerResponse;
23+
import com.github.dockerjava.api.command.WaitContainerResultCallback;
24+
import com.github.dockerjava.api.exception.DockerClientException;
25+
import com.github.dockerjava.core.DefaultDockerClientConfig;
26+
import com.github.dockerjava.core.DockerClientImpl;
27+
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
28+
import io.serverlessworkflow.api.types.Container;
29+
import io.serverlessworkflow.impl.TaskContext;
30+
import io.serverlessworkflow.impl.WorkflowContext;
31+
import io.serverlessworkflow.impl.WorkflowDefinition;
32+
import io.serverlessworkflow.impl.WorkflowModel;
33+
import io.serverlessworkflow.impl.WorkflowUtils;
34+
import io.serverlessworkflow.impl.WorkflowValueResolver;
35+
import java.io.IOException;
36+
import java.time.Duration;
37+
import java.util.ArrayList;
38+
import java.util.List;
39+
import java.util.concurrent.CompletableFuture;
40+
import java.util.concurrent.TimeUnit;
41+
42+
class ContainerRunner {
43+
44+
private static final DefaultDockerClientConfig DEFAULT_CONFIG =
45+
DefaultDockerClientConfig.createDefaultConfigBuilder().build();
46+
47+
private static final DockerClient dockerClient =
48+
DockerClientImpl.getInstance(
49+
DEFAULT_CONFIG,
50+
new ApacheDockerHttpClient.Builder().dockerHost(DEFAULT_CONFIG.getDockerHost()).build());
51+
52+
private final CreateContainerCmd createContainerCmd;
53+
private final Container container;
54+
private final List<ContainerPropertySetter> propertySetters;
55+
private final WorkflowDefinition definition;
56+
57+
private ContainerRunner(
58+
CreateContainerCmd createContainerCmd, WorkflowDefinition definition, Container container) {
59+
this.createContainerCmd = createContainerCmd;
60+
this.definition = definition;
61+
this.container = container;
62+
this.propertySetters = new ArrayList<>();
63+
}
64+
65+
/**
66+
* Blocking container execution according to the lifetime policy. Returns an already completed
67+
* CompletableFuture: - completedFuture(input) if exitCode == 0 - exceptionally completed if the
68+
* exit code is non-zero or an error occurs. The method blocks the calling thread until the
69+
* container finishes or the timeout expires.
70+
*/
71+
CompletableFuture<WorkflowModel> startSync(
72+
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
73+
74+
StringExpressionResolver resolver =
75+
new StringExpressionResolver(workflowContext, taskContext, input);
76+
77+
propertySetters.forEach(setter -> setter.accept(resolver));
78+
79+
CreateContainerResponse createContainerResponse = createContainerCmd.exec();
80+
String containerId = createContainerResponse.getId();
81+
82+
if (containerId == null || containerId.isEmpty()) {
83+
return failed("Container creation failed: empty container ID");
84+
}
85+
86+
dockerClient.startContainerCmd(containerId).exec();
87+
88+
int exitCode;
89+
try (WaitContainerResultCallback resultCallback =
90+
dockerClient.waitContainerCmd(containerId).exec(new WaitContainerResultCallback())) {
91+
if (container.getLifetime() != null
92+
&& container.getLifetime().getCleanup() != null
93+
&& container.getLifetime().getCleanup().equals(EVENTUALLY)) {
94+
try {
95+
WorkflowValueResolver<Duration> durationResolver =
96+
WorkflowUtils.fromTimeoutAfter(
97+
definition.application(), container.getLifetime().getAfter());
98+
Duration timeout = durationResolver.apply(workflowContext, taskContext, input);
99+
exitCode = resultCallback.awaitStatusCode(timeout.toMillis(), TimeUnit.MILLISECONDS);
100+
} catch (DockerClientException e) {
101+
return failed(
102+
String.format("Error while waiting for container to finish: %s ", e.getMessage()));
103+
} finally {
104+
dockerClient.removeContainerCmd(containerId).withForce(true).exec();
105+
}
106+
} else {
107+
exitCode = resultCallback.awaitStatusCode();
108+
}
109+
} catch (IOException e) {
110+
return failed(
111+
String.format("Error while waiting for container to finish: %s ", e.getMessage()));
112+
}
113+
114+
return switch (exitCode) {
115+
case 0 -> CompletableFuture.completedFuture(input);
116+
case 1 -> failed("General error (exit code 1)");
117+
case 2 -> failed("Shell syntax error (exit code 2)");
118+
case 126 -> failed("Command found but not executable (exit code 126)");
119+
case 127 -> failed("Command not found (exit code 127)");
120+
case 130 -> failed("Interrupted by SIGINT (exit code 130)");
121+
case 137 -> failed("Killed by SIGKILL (exit code 137)");
122+
case 139 -> failed("Segmentation fault (exit code 139)");
123+
case 143 -> failed("Terminated by SIGTERM (exit code 143)");
124+
default -> failed("Process exited with code " + exitCode);
125+
};
126+
}
127+
128+
private static <T> CompletableFuture<T> failed(String message) {
129+
CompletableFuture<T> f = new CompletableFuture<>();
130+
f.completeExceptionally(new RuntimeException(message));
131+
return f;
132+
}
133+
134+
static ContainerRunnerBuilder builder() {
135+
return new ContainerRunnerBuilder();
136+
}
137+
138+
public static class ContainerRunnerBuilder {
139+
private Container container = null;
140+
private WorkflowDefinition workflowDefinition;
141+
142+
private ContainerRunnerBuilder() {}
143+
144+
ContainerRunnerBuilder withContainer(Container container) {
145+
this.container = container;
146+
return this;
147+
}
148+
149+
public ContainerRunnerBuilder withWorkflowDefinition(WorkflowDefinition definition) {
150+
this.workflowDefinition = definition;
151+
return this;
152+
}
153+
154+
ContainerRunner build() {
155+
if (container.getImage() == null || container.getImage().isEmpty()) {
156+
throw new IllegalArgumentException("Container image must be provided");
157+
}
158+
159+
CreateContainerCmd createContainerCmd = dockerClient.createContainerCmd(container.getImage());
160+
161+
ContainerRunner runner =
162+
new ContainerRunner(createContainerCmd, workflowDefinition, container);
163+
164+
runner.propertySetters.add(new CommandPropertySetter(createContainerCmd, container));
165+
runner.propertySetters.add(
166+
new ContainerEnvironmentPropertySetter(createContainerCmd, container));
167+
runner.propertySetters.add(new NamePropertySetter(createContainerCmd, container));
168+
runner.propertySetters.add(new PortsPropertySetter(createContainerCmd, container));
169+
runner.propertySetters.add(new VolumesPropertySetter(createContainerCmd, container));
170+
runner.propertySetters.add(new LifetimePropertySetter(createContainerCmd, container));
171+
return runner;
172+
}
173+
}
174+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.container.executors;
17+
18+
import static io.serverlessworkflow.api.types.ContainerLifetime.*;
19+
20+
import com.github.dockerjava.api.command.CreateContainerCmd;
21+
import io.serverlessworkflow.api.types.Container;
22+
import io.serverlessworkflow.api.types.ContainerLifetime;
23+
24+
class LifetimePropertySetter extends ContainerPropertySetter {
25+
26+
LifetimePropertySetter(CreateContainerCmd createContainerCmd, Container configuration) {
27+
super(createContainerCmd, configuration);
28+
}
29+
30+
@Override
31+
public void accept(StringExpressionResolver resolver) {
32+
// case of cleanup=eventually processed at ContainerRunner
33+
if (configuration.getLifetime() != null) {
34+
ContainerLifetime lifetime = configuration.getLifetime();
35+
ContainerCleanupPolicy cleanupPolicy = lifetime.getCleanup();
36+
if (cleanupPolicy.equals(ContainerCleanupPolicy.ALWAYS)) {
37+
createContainerCmd.getHostConfig().withAutoRemove(true);
38+
} else if (cleanupPolicy.equals(ContainerCleanupPolicy.NEVER)) {
39+
createContainerCmd.getHostConfig().withAutoRemove(false);
40+
}
41+
}
42+
}
43+
}

0 commit comments

Comments
 (0)