Skip to content

Commit 76b39b1

Browse files
committed
removing tombstones
Signed-off-by: Steve Hawkins <shawkins@redhat.com>
1 parent 27f050c commit 76b39b1

File tree

11 files changed

+123
-202
lines changed

11 files changed

+123
-202
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -491,18 +491,16 @@ default Set<Class<? extends HasMetadata>> withPreviousAnnotationForDependentReso
491491

492492
/**
493493
* If the event logic should parse the resourceVersion to determine the ordering of dependent
494-
* resource events. This is typically not needed.
494+
* resource events.
495495
*
496-
* <p>Disabled by default as Kubernetes does not support, and discourages, this interpretation of
497-
* resourceVersions. Enable only if your api server event processing seems to lag the operator
498-
* logic, and you want to further minimize the amount of work done / updates issued by the
499-
* operator.
496+
* <p>Enabled by default as Kubernetes does support this interpretation of resourceVersions.
497+
* Disable only if your api server provides non comparable resource versions..
500498
*
501499
* @return if resource version should be parsed (as integer)
502500
* @since 4.5.0
503501
*/
504502
default boolean parseResourceVersionsForEventFilteringAndCaching() {
505-
return false;
503+
return true;
506504
}
507505

508506
/**

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,11 @@ public static <P extends HasMetadata> P addFinalizerWithSSA(
451451
}
452452
}
453453

454+
public static int compareResourceVersions(HasMetadata h1, HasMetadata h2) {
455+
return compareResourceVersions(
456+
h1.getMetadata().getResourceVersion(), h2.getMetadata().getResourceVersion());
457+
}
458+
454459
public static int compareResourceVersions(String v1, String v2) {
455460
var v1Length = v1.length();
456461
if (v1Length == 0) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class ControllerEventSource<T extends HasMetadata>
4747

4848
@SuppressWarnings({"unchecked", "rawtypes"})
4949
public ControllerEventSource(Controller<T> controller) {
50-
super(NAME, controller.getCRClient(), controller.getConfiguration(), false);
50+
super(NAME, controller.getCRClient(), controller.getConfiguration(), true);
5151
this.controller = controller;
5252

5353
final var config = controller.getConfiguration();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public InformerEventSource(
9292
}
9393

9494
InformerEventSource(InformerEventSourceConfiguration<R> configuration, KubernetesClient client) {
95-
this(configuration, client, false);
95+
this(configuration, client, true);
9696
}
9797

9898
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -207,21 +207,8 @@ private synchronized void onAddOrUpdate(
207207
}
208208

209209
private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) {
210-
var res = temporaryResourceCache.getResourceFromCache(resourceID);
211-
if (res.isEmpty()) {
212-
return isEventKnownFromAnnotation(newObject, oldObject);
213-
}
214-
boolean resVersionsEqual =
215-
newObject
216-
.getMetadata()
217-
.getResourceVersion()
218-
.equals(res.get().getMetadata().getResourceVersion());
219-
log.debug(
220-
"Resource found in temporal cache for id: {} resource versions equal: {}",
221-
resourceID,
222-
resVersionsEqual);
223-
return resVersionsEqual
224-
|| temporaryResourceCache.isLaterResourceVersion(resourceID, res.get(), newObject);
210+
return temporaryResourceCache.canSkipEvent(resourceID, newObject)
211+
|| isEventKnownFromAnnotation(newObject, oldObject);
225212
}
226213

227214
private boolean isEventKnownFromAnnotation(R newObject, R oldObject) {
@@ -301,11 +288,7 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res
301288

302289
private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) {
303290
primaryToSecondaryIndex.onAddOrUpdate(newResource);
304-
temporaryResourceCache.putResource(
305-
newResource,
306-
Optional.ofNullable(oldResource)
307-
.map(r -> r.getMetadata().getResourceVersion())
308-
.orElse(null));
291+
temporaryResourceCache.putResource(newResource);
309292
}
310293

311294
private boolean useSecondaryToPrimaryIndex() {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,11 @@ public Optional<R> get(ResourceID resourceID) {
221221
: r);
222222
}
223223

224+
public Optional<String> getLastSyncResourceVersion(Optional<String> namespace) {
225+
return getSource(namespace.orElse(WATCH_ALL_NAMESPACES))
226+
.map(source -> source.getLastSyncResourceVersion());
227+
}
228+
224229
@Override
225230
public Stream<ResourceID> keys() {
226231
return sources.values().stream().flatMap(Cache::keys);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@ public Optional<T> get(ResourceID resourceID) {
156156
return Optional.ofNullable(cache.getByKey(getKey(resourceID)));
157157
}
158158

159+
public String getLastSyncResourceVersion() {
160+
return this.informer.lastSyncResourceVersion();
161+
}
162+
159163
private String getKey(ResourceID resourceID) {
160164
return Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null), resourceID.getName());
161165
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
3535
import io.javaoperatorsdk.operator.api.config.Informable;
3636
import io.javaoperatorsdk.operator.api.config.NamespaceChangeable;
37+
import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils;
3738
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller;
3839
import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
3940
import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator;
@@ -122,30 +123,38 @@ public synchronized void stop() {
122123
@Override
123124
public void handleRecentResourceUpdate(
124125
ResourceID resourceID, R resource, R previousVersionOfResource) {
125-
temporaryResourceCache.putResource(
126-
resource, previousVersionOfResource.getMetadata().getResourceVersion());
126+
temporaryResourceCache.putResource(resource);
127127
}
128128

129129
@Override
130130
public void handleRecentResourceCreate(ResourceID resourceID, R resource) {
131-
temporaryResourceCache.putAddedResource(resource);
131+
temporaryResourceCache.putResource(resource);
132132
}
133133

134134
@Override
135135
public Optional<R> get(ResourceID resourceID) {
136+
var res = cache.get(resourceID);
136137
Optional<R> resource = temporaryResourceCache.getResourceFromCache(resourceID);
137-
if (resource.isPresent()) {
138-
log.debug("Resource found in temporary cache for Resource ID: {}", resourceID);
138+
if (parseResourceVersions
139+
&& resource.isPresent()
140+
&& res.filter(
141+
r ->
142+
PrimaryUpdateAndCacheUtils.compareResourceVersions(r, resource.orElseThrow())
143+
> 0)
144+
.isEmpty()) {
145+
log.debug("Latest resource found in temporary cache for Resource ID: {}", resourceID);
139146
return resource;
140-
} else {
141-
log.debug(
142-
"Resource not found in temporary cache reading it from informer cache,"
143-
+ " for Resource ID: {}",
144-
resourceID);
145-
var res = cache.get(resourceID);
146-
log.debug("Resource found in cache: {} for id: {}", res.isPresent(), resourceID);
147-
return res;
148147
}
148+
log.debug(
149+
"Resource not found, or older, in temporary cache. Found in informer cache {}, for"
150+
+ " Resource ID: {}",
151+
res.isPresent(),
152+
resourceID);
153+
return res;
154+
}
155+
156+
public Optional<String> getLastSyncResourceVersion(Optional<String> namespace) {
157+
return cache.getLastSyncResourceVersion(namespace);
149158
}
150159

151160
@SuppressWarnings("unused")

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 45 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package io.javaoperatorsdk.operator.processing.event.source.informer;
1717

18-
import java.util.LinkedHashMap;
1918
import java.util.Map;
2019
import java.util.Optional;
2120
import java.util.concurrent.ConcurrentHashMap;
@@ -24,7 +23,7 @@
2423
import org.slf4j.LoggerFactory;
2524

2625
import io.fabric8.kubernetes.api.model.HasMetadata;
27-
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
26+
import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils;
2827
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
2928
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3029

@@ -46,53 +45,10 @@
4645
*/
4746
public class TemporaryResourceCache<T extends HasMetadata> {
4847

49-
static class ExpirationCache<K> {
50-
private final LinkedHashMap<K, Long> cache;
51-
private final int ttlMs;
52-
53-
public ExpirationCache(int maxEntries, int ttlMs) {
54-
this.ttlMs = ttlMs;
55-
this.cache =
56-
new LinkedHashMap<>() {
57-
@Override
58-
protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
59-
return size() > maxEntries;
60-
}
61-
};
62-
}
63-
64-
public void add(K key) {
65-
clean();
66-
cache.putIfAbsent(key, System.currentTimeMillis());
67-
}
68-
69-
public boolean contains(K key) {
70-
clean();
71-
return cache.get(key) != null;
72-
}
73-
74-
void clean() {
75-
if (!cache.isEmpty()) {
76-
long currentTimeMillis = System.currentTimeMillis();
77-
var iter = cache.entrySet().iterator();
78-
// the order will already be from oldest to newest, clean a fixed number of entries to
79-
// amortize the cost amongst multiple calls
80-
for (int i = 0; i < 10 && iter.hasNext(); i++) {
81-
var entry = iter.next();
82-
if (currentTimeMillis - entry.getValue() > ttlMs) {
83-
iter.remove();
84-
}
85-
}
86-
}
87-
}
88-
}
89-
9048
private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);
9149

9250
private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
9351

94-
// keep up to the last million deletions for up to 10 minutes
95-
private final ExpirationCache<String> tombstones = new ExpirationCache<>(1000000, 1200000);
9652
private final ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;
9753
private final boolean parseResourceVersions;
9854

@@ -104,7 +60,6 @@ public TemporaryResourceCache(
10460
}
10561

10662
public synchronized void onDeleteEvent(T resource, boolean unknownState) {
107-
tombstones.add(resource.getMetadata().getUid());
10863
onEvent(resource, unknownState);
10964
}
11065

@@ -116,74 +71,68 @@ synchronized void onEvent(T resource, boolean unknownState) {
11671
cache.computeIfPresent(
11772
ResourceID.fromResource(resource),
11873
(id, cached) ->
119-
(unknownState || !isLaterResourceVersion(id, cached, resource)) ? null : cached);
120-
}
121-
122-
public synchronized void putAddedResource(T newResource) {
123-
putResource(newResource, null);
74+
(unknownState
75+
|| PrimaryUpdateAndCacheUtils.compareResourceVersions(resource, cached) >= 0)
76+
? null
77+
: cached);
12478
}
12579

12680
/**
12781
* put the item into the cache if the previousResourceVersion matches the current state. If not
12882
* the currently cached item is removed.
129-
*
130-
* @param previousResourceVersion null indicates an add
13183
*/
132-
public synchronized void putResource(T newResource, String previousResourceVersion) {
84+
public synchronized void putResource(T newResource) {
85+
if (!parseResourceVersions) {
86+
return;
87+
}
88+
13389
var resourceId = ResourceID.fromResource(newResource);
134-
var cachedResource = managedInformerEventSource.get(resourceId).orElse(null);
13590

136-
boolean moveAhead = false;
137-
if (previousResourceVersion == null && cachedResource == null) {
138-
if (tombstones.contains(newResource.getMetadata().getUid())) {
139-
log.debug(
140-
"Won't resurrect uid {} for resource id: {}",
141-
newResource.getMetadata().getUid(),
142-
resourceId);
143-
return;
144-
}
145-
// we can skip further checks as this is a simple add and there's no previous entry to
146-
// consider
147-
moveAhead = true;
91+
if (newResource.getMetadata().getResourceVersion() == null) {
92+
log.warn(
93+
"Resource {}: with no resourceVersion put in temporary cache. This is not the expected"
94+
+ " usage pattern, only resources returned from the api server should be put in the"
95+
+ " cache.",
96+
resourceId);
97+
return;
14898
}
14999

150-
if (moveAhead
151-
|| (cachedResource != null
152-
&& (cachedResource
153-
.getMetadata()
154-
.getResourceVersion()
155-
.equals(previousResourceVersion))
156-
|| isLaterResourceVersion(resourceId, newResource, cachedResource))) {
100+
// first check against the source in general - this also prevents resurrecting resources when
101+
// we've already seen the deletion event
102+
String latest =
103+
managedInformerEventSource
104+
.getLastSyncResourceVersion(resourceId.getNamespace())
105+
.orElse(null);
106+
if (latest != null
107+
&& PrimaryUpdateAndCacheUtils.compareResourceVersions(
108+
latest, newResource.getMetadata().getResourceVersion())
109+
> 0) {
157110
log.debug(
158-
"Temporarily moving ahead to target version {} for resource id: {}",
111+
"Resource {}: resourceVersion {} is not later than latest {}",
112+
resourceId,
159113
newResource.getMetadata().getResourceVersion(),
160-
resourceId);
161-
cache.put(resourceId, newResource);
162-
} else if (cache.remove(resourceId) != null) {
163-
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
114+
latest);
115+
return;
164116
}
165-
}
166117

167-
/**
168-
* @return true if {@link ConfigurationService#parseResourceVersionsForEventFilteringAndCaching()}
169-
* is enabled and the resourceVersion of newResource is numerically greater than
170-
* cachedResource, otherwise false
171-
*/
172-
public boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) {
173-
try {
174-
if (parseResourceVersions
175-
&& Long.parseLong(newResource.getMetadata().getResourceVersion())
176-
> Long.parseLong(cachedResource.getMetadata().getResourceVersion())) {
177-
return true;
178-
}
179-
} catch (NumberFormatException e) {
118+
var cachedResource = managedInformerEventSource.get(resourceId).orElse(null);
119+
120+
if (cachedResource == null
121+
|| PrimaryUpdateAndCacheUtils.compareResourceVersions(newResource, cachedResource) >= 0) {
180122
log.debug(
181-
"Could not compare resourceVersions {} and {} for {}",
123+
"Temporarily moving ahead to target version {} for resource id: {}",
182124
newResource.getMetadata().getResourceVersion(),
183-
cachedResource.getMetadata().getResourceVersion(),
184125
resourceId);
126+
cache.put(resourceId, newResource);
185127
}
186-
return false;
128+
}
129+
130+
public boolean canSkipEvent(ResourceID resourceID, T resource) {
131+
return parseResourceVersions
132+
&& getResourceFromCache(resourceID)
133+
.filter(
134+
cached -> PrimaryUpdateAndCacheUtils.compareResourceVersions(cached, resource) >= 0)
135+
.isPresent();
187136
}
188137

189138
public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,12 @@ public synchronized void start() {}
9494
}
9595

9696
@Test
97-
void skipsEventPropagationIfResourceWithSameVersionInResourceCache() {
97+
void skipsEventPropagation() {
9898
when(temporaryResourceCacheMock.getResourceFromCache(any()))
9999
.thenReturn(Optional.of(testDeployment()));
100100

101+
when(temporaryResourceCacheMock.canSkipEvent(any(), any())).thenReturn(true);
102+
101103
informerEventSource.onAdd(testDeployment());
102104
informerEventSource.onUpdate(testDeployment(), testDeployment());
103105

0 commit comments

Comments
 (0)