Skip to content

Commit edbe2b6

Browse files
committed
removing tombstones
if version parsing / comparable versions is not enabled, then the temporary cache will effectively be disabled Signed-off-by: Steve Hawkins <shawkins@redhat.com>
1 parent 2902bff commit edbe2b6

File tree

8 files changed

+72
-146
lines changed

8 files changed

+72
-146
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -491,18 +491,16 @@ default Set<Class<? extends HasMetadata>> withPreviousAnnotationForDependentReso
491491

492492
/**
493493
* If the event logic should parse the resourceVersion to determine the ordering of dependent
494-
* resource events. This is typically not needed.
494+
* resource events.
495495
*
496-
* <p>Disabled by default as Kubernetes does not support, and discourages, this interpretation of
497-
* resourceVersions. Enable only if your api server event processing seems to lag the operator
498-
* logic, and you want to further minimize the amount of work done / updates issued by the
499-
* operator.
496+
* <p>Enabled by default as Kubernetes does support this interpretation of resourceVersions.
497+
* Disable only if your api server provides non comparable resource versions..
500498
*
501499
* @return if resource version should be parsed (as integer)
502500
* @since 4.5.0
503501
*/
504502
default boolean parseResourceVersionsForEventFilteringAndCaching() {
505-
return false;
503+
return true;
506504
}
507505

508506
/**

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,11 @@ public static <P extends HasMetadata> P addFinalizerWithSSA(
451451
}
452452
}
453453

454+
public static int compareResourceVersions(HasMetadata h1, HasMetadata h2) {
455+
return compareResourceVersions(
456+
h1.getMetadata().getResourceVersion(), h2.getMetadata().getResourceVersion());
457+
}
458+
454459
public static int compareResourceVersions(String v1, String v2) {
455460
var v1Length = v1.length();
456461
if (v1Length == 0) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public InformerEventSource(
9292
}
9393

9494
InformerEventSource(InformerEventSourceConfiguration<R> configuration, KubernetesClient client) {
95-
this(configuration, client, false);
95+
this(configuration, client, true);
9696
}
9797

9898
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -211,17 +211,7 @@ private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) {
211211
if (res.isEmpty()) {
212212
return isEventKnownFromAnnotation(newObject, oldObject);
213213
}
214-
boolean resVersionsEqual =
215-
newObject
216-
.getMetadata()
217-
.getResourceVersion()
218-
.equals(res.get().getMetadata().getResourceVersion());
219-
log.debug(
220-
"Resource found in temporal cache for id: {} resource versions equal: {}",
221-
resourceID,
222-
resVersionsEqual);
223-
return resVersionsEqual
224-
|| temporaryResourceCache.isLaterResourceVersion(resourceID, res.get(), newObject);
214+
return temporaryResourceCache.isLaterResourceVersion(resourceID, res.get(), newObject);
225215
}
226216

227217
private boolean isEventKnownFromAnnotation(R newObject, R oldObject) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,11 @@ public Optional<R> get(ResourceID resourceID) {
221221
: r);
222222
}
223223

224+
public Optional<String> getLastSyncResourceVersion(Optional<String> namespace) {
225+
return getSource(namespace.orElse(WATCH_ALL_NAMESPACES))
226+
.map(source -> source.getLastSyncResourceVersion());
227+
}
228+
224229
@Override
225230
public Stream<ResourceID> keys() {
226231
return sources.values().stream().flatMap(Cache::keys);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@ public Optional<T> get(ResourceID resourceID) {
156156
return Optional.ofNullable(cache.getByKey(getKey(resourceID)));
157157
}
158158

159+
public String getLastSyncResourceVersion() {
160+
return this.informer.lastSyncResourceVersion();
161+
}
162+
159163
private String getKey(ResourceID resourceID) {
160164
return Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null), resourceID.getName());
161165
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
3535
import io.javaoperatorsdk.operator.api.config.Informable;
3636
import io.javaoperatorsdk.operator.api.config.NamespaceChangeable;
37+
import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils;
3738
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller;
3839
import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
3940
import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator;
@@ -133,19 +134,27 @@ public void handleRecentResourceCreate(ResourceID resourceID, R resource) {
133134

134135
@Override
135136
public Optional<R> get(ResourceID resourceID) {
137+
var res = cache.get(resourceID);
136138
Optional<R> resource = temporaryResourceCache.getResourceFromCache(resourceID);
137-
if (resource.isPresent()) {
138-
log.debug("Resource found in temporary cache for Resource ID: {}", resourceID);
139+
if (resource.isPresent()
140+
&& res.filter(
141+
r ->
142+
PrimaryUpdateAndCacheUtils.compareResourceVersions(r, resource.orElseThrow())
143+
> 0)
144+
.isEmpty()) {
145+
log.debug("Latest resource found in temporary cache for Resource ID: {}", resourceID);
139146
return resource;
140-
} else {
141-
log.debug(
142-
"Resource not found in temporary cache reading it from informer cache,"
143-
+ " for Resource ID: {}",
144-
resourceID);
145-
var res = cache.get(resourceID);
146-
log.debug("Resource found in cache: {} for id: {}", res.isPresent(), resourceID);
147-
return res;
148147
}
148+
log.debug(
149+
"Resource not found, or older, in temporary cache. Found in informer cache {}, for"
150+
+ " Resource ID: {}",
151+
res.isPresent(),
152+
resourceID);
153+
return res;
154+
}
155+
156+
public Optional<String> getLastSyncResourceVersion(Optional<String> namespace) {
157+
return cache.getLastSyncResourceVersion(namespace);
149158
}
150159

151160
@SuppressWarnings("unused")

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 25 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package io.javaoperatorsdk.operator.processing.event.source.informer;
1717

18-
import java.util.LinkedHashMap;
1918
import java.util.Map;
2019
import java.util.Optional;
2120
import java.util.concurrent.ConcurrentHashMap;
@@ -25,6 +24,7 @@
2524

2625
import io.fabric8.kubernetes.api.model.HasMetadata;
2726
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
27+
import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils;
2828
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
2929
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3030

@@ -46,53 +46,10 @@
4646
*/
4747
public class TemporaryResourceCache<T extends HasMetadata> {
4848

49-
static class ExpirationCache<K> {
50-
private final LinkedHashMap<K, Long> cache;
51-
private final int ttlMs;
52-
53-
public ExpirationCache(int maxEntries, int ttlMs) {
54-
this.ttlMs = ttlMs;
55-
this.cache =
56-
new LinkedHashMap<>() {
57-
@Override
58-
protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
59-
return size() > maxEntries;
60-
}
61-
};
62-
}
63-
64-
public void add(K key) {
65-
clean();
66-
cache.putIfAbsent(key, System.currentTimeMillis());
67-
}
68-
69-
public boolean contains(K key) {
70-
clean();
71-
return cache.get(key) != null;
72-
}
73-
74-
void clean() {
75-
if (!cache.isEmpty()) {
76-
long currentTimeMillis = System.currentTimeMillis();
77-
var iter = cache.entrySet().iterator();
78-
// the order will already be from oldest to newest, clean a fixed number of entries to
79-
// amortize the cost amongst multiple calls
80-
for (int i = 0; i < 10 && iter.hasNext(); i++) {
81-
var entry = iter.next();
82-
if (currentTimeMillis - entry.getValue() > ttlMs) {
83-
iter.remove();
84-
}
85-
}
86-
}
87-
}
88-
}
89-
9049
private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);
9150

9251
private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
9352

94-
// keep up to the last million deletions for up to 10 minutes
95-
private final ExpirationCache<String> tombstones = new ExpirationCache<>(1000000, 1200000);
9653
private final ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;
9754
private final boolean parseResourceVersions;
9855

@@ -104,7 +61,6 @@ public TemporaryResourceCache(
10461
}
10562

10663
public synchronized void onDeleteEvent(T resource, boolean unknownState) {
107-
tombstones.add(resource.getMetadata().getUid());
10864
onEvent(resource, unknownState);
10965
}
11066

@@ -130,37 +86,37 @@ public synchronized void putAddedResource(T newResource) {
13086
* @param previousResourceVersion null indicates an add
13187
*/
13288
public synchronized void putResource(T newResource, String previousResourceVersion) {
89+
if (!parseResourceVersions) {
90+
return;
91+
}
92+
13393
var resourceId = ResourceID.fromResource(newResource);
134-
var cachedResource = managedInformerEventSource.get(resourceId).orElse(null);
13594

136-
boolean moveAhead = false;
137-
if (previousResourceVersion == null && cachedResource == null) {
138-
if (tombstones.contains(newResource.getMetadata().getUid())) {
139-
log.debug(
140-
"Won't resurrect uid {} for resource id: {}",
141-
newResource.getMetadata().getUid(),
142-
resourceId);
143-
return;
144-
}
145-
// we can skip further checks as this is a simple add and there's no previous entry to
146-
// consider
147-
moveAhead = true;
95+
String latest =
96+
managedInformerEventSource
97+
.getLastSyncResourceVersion(resourceId.getNamespace())
98+
.orElse(null);
99+
if (latest != null
100+
&& PrimaryUpdateAndCacheUtils.compareResourceVersions(
101+
latest, newResource.getMetadata().getResourceVersion())
102+
>= 0) {
103+
log.debug(
104+
"Resource {}: resourceVersion {} is not later than latest {}",
105+
resourceId,
106+
newResource.getMetadata().getResourceVersion(),
107+
latest);
108+
return;
148109
}
149110

150-
if (moveAhead
151-
|| (cachedResource != null
152-
&& (cachedResource
153-
.getMetadata()
154-
.getResourceVersion()
155-
.equals(previousResourceVersion))
156-
|| isLaterResourceVersion(resourceId, newResource, cachedResource))) {
111+
var cachedResource = managedInformerEventSource.get(resourceId).orElse(null);
112+
113+
if (cachedResource == null
114+
|| PrimaryUpdateAndCacheUtils.compareResourceVersions(newResource, cachedResource) > 0) {
157115
log.debug(
158116
"Temporarily moving ahead to target version {} for resource id: {}",
159117
newResource.getMetadata().getResourceVersion(),
160118
resourceId);
161119
cache.put(resourceId, newResource);
162-
} else if (cache.remove(resourceId) != null) {
163-
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
164120
}
165121
}
166122

@@ -170,20 +126,8 @@ public synchronized void putResource(T newResource, String previousResourceVersi
170126
* cachedResource, otherwise false
171127
*/
172128
public boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) {
173-
try {
174-
if (parseResourceVersions
175-
&& Long.parseLong(newResource.getMetadata().getResourceVersion())
176-
> Long.parseLong(cachedResource.getMetadata().getResourceVersion())) {
177-
return true;
178-
}
179-
} catch (NumberFormatException e) {
180-
log.debug(
181-
"Could not compare resourceVersions {} and {} for {}",
182-
newResource.getMetadata().getResourceVersion(),
183-
cachedResource.getMetadata().getResourceVersion(),
184-
resourceId);
185-
}
186-
return false;
129+
return parseResourceVersions
130+
&& PrimaryUpdateAndCacheUtils.compareResourceVersions(newResource, cachedResource) > 0;
187131
}
188132

189133
public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java

Lines changed: 8 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,14 @@
1717

1818
import java.util.Map;
1919
import java.util.Optional;
20-
import java.util.concurrent.TimeUnit;
2120

22-
import org.awaitility.Awaitility;
2321
import org.junit.jupiter.api.BeforeEach;
2422
import org.junit.jupiter.api.Test;
2523

2624
import io.fabric8.kubernetes.api.model.ConfigMap;
2725
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
2826
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
2927
import io.javaoperatorsdk.operator.processing.event.ResourceID;
30-
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.ExpirationCache;
3128

3229
import static org.assertj.core.api.Assertions.assertThat;
3330
import static org.mockito.ArgumentMatchers.any;
@@ -46,17 +43,17 @@ class TemporaryPrimaryResourceCacheTest {
4643
@BeforeEach
4744
void setup() {
4845
informerEventSource = mock(InformerEventSource.class);
49-
temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, false);
46+
temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, true);
5047
}
5148

5249
@Test
5350
void updateAddsTheResourceIntoCacheIfTheInformerHasThePreviousResourceVersion() {
5451
var testResource = testResource();
5552
var prevTestResource = testResource();
56-
prevTestResource.getMetadata().setResourceVersion("0");
53+
prevTestResource.getMetadata().setResourceVersion("1");
5754
when(informerEventSource.get(any())).thenReturn(Optional.of(prevTestResource));
5855

59-
temporaryResourceCache.putResource(testResource, "0");
56+
temporaryResourceCache.putResource(testResource, "2");
6057

6158
var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource));
6259
assertThat(cached).isPresent();
@@ -66,10 +63,10 @@ void updateAddsTheResourceIntoCacheIfTheInformerHasThePreviousResourceVersion()
6663
void updateNotAddsTheResourceIntoCacheIfTheInformerHasOtherVersion() {
6764
var testResource = testResource();
6865
var informerCachedResource = testResource();
69-
informerCachedResource.getMetadata().setResourceVersion("x");
66+
informerCachedResource.getMetadata().setResourceVersion("2");
7067
when(informerEventSource.get(any())).thenReturn(Optional.of(informerCachedResource));
7168

72-
temporaryResourceCache.putResource(testResource, "0");
69+
temporaryResourceCache.putResource(testResource, "1");
7370

7471
var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource));
7572
assertThat(cached).isNotPresent();
@@ -143,41 +140,15 @@ void rapidDeletion() {
143140
.endMetadata()
144141
.build(),
145142
false);
143+
when(informerEventSource.getLastSyncResourceVersion(
144+
Optional.of(testResource.getMetadata().getNamespace())))
145+
.thenReturn(Optional.of("3"));
146146
temporaryResourceCache.putAddedResource(testResource);
147147

148148
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
149149
.isEmpty();
150150
}
151151

152-
@Test
153-
void expirationCacheMax() {
154-
ExpirationCache<Integer> cache = new ExpirationCache<>(2, Integer.MAX_VALUE);
155-
156-
cache.add(1);
157-
cache.add(2);
158-
cache.add(3);
159-
160-
assertThat(cache.contains(1)).isFalse();
161-
assertThat(cache.contains(2)).isTrue();
162-
assertThat(cache.contains(3)).isTrue();
163-
}
164-
165-
@Test
166-
void expirationCacheTtl() {
167-
ExpirationCache<Integer> cache = new ExpirationCache<>(2, 1);
168-
169-
cache.add(1);
170-
cache.add(2);
171-
172-
Awaitility.await()
173-
.atMost(1, TimeUnit.SECONDS)
174-
.untilAsserted(
175-
() -> {
176-
assertThat(cache.contains(1)).isFalse();
177-
assertThat(cache.contains(2)).isFalse();
178-
});
179-
}
180-
181152
private ConfigMap propagateTestResourceToCache() {
182153
var testResource = testResource();
183154
when(informerEventSource.get(any())).thenReturn(Optional.empty());

0 commit comments

Comments
 (0)