Skip to content

Commit 4ade908

Browse files
authored
Merge pull request #244 from graphql-java/update-doco-and-reactor-new-code
Updated the documentation of the CacheMap and also some refactoring
2 parents aafdc1e + 9671d60 commit 4ade908

File tree

4 files changed

+68
-29
lines changed

4 files changed

+68
-29
lines changed

src/main/java/org/dataloader/CacheMap.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,13 @@
2828
* CacheMap is used by data loaders that use caching promises to values aka {@link CompletableFuture}<V>. A better name for this
2929
* class might have been FutureCache but that is history now.
3030
* <p>
31-
* The default implementation used by the data loader is based on a {@link java.util.LinkedHashMap}.
31+
* The default implementation used by the data loader is based on a {@link java.util.concurrent.ConcurrentHashMap} because
32+
* the data loader code requires the cache to prove atomic writes especially the {@link #putIfAbsentAtomically(Object, CompletableFuture)}
33+
* method.
34+
* <p>
35+
* The data loader code using a Compare and Swap (CAS) algorithm to decide if a cache entry is present or not. If you write your
36+
* own {@link CacheMap} implementation then you MUST provide atomic writes in this method to ensure that the same promise for a key is
37+
* returned always.
3238
* <p>
3339
* This is really a cache of completed {@link CompletableFuture}&lt;V&gt; values in memory. It is used, when caching is enabled, to
3440
* give back the same future to any code that may call it. If you need a cache of the underlying values that is possible external to the JVM
@@ -42,7 +48,7 @@
4248
*/
4349
@PublicSpi
4450
@NullMarked
45-
public interface CacheMap<K, V> {
51+
public interface CacheMap<K,V> {
4652

4753
/**
4854
* Creates a new cache map, using the default implementation that is based on a {@link java.util.LinkedHashMap}.
@@ -84,14 +90,21 @@ static <K, V> CacheMap<K, V> simpleMap() {
8490
Collection<CompletableFuture<V>> getAll();
8591

8692
/**
87-
* Creates a new cache map entry with the specified key and value, or updates the value if the key already exists.
93+
* Atomically creates a new cache map entry with the specified key and value, or updates the value if the key already exists.
94+
* <p>
95+
* The data loader code using a Compare and Swap (CAS) algorithm to decide if a cache entry is present or not. If you write your
96+
* own {@link CacheMap} implementation then you MUST provide atomic writes in this method to ensure that the same promise for a key is
97+
* returned always.
98+
* <p>
99+
* The default implementation of this method uses {@link java.util.concurrent.ConcurrentHashMap} has its implementation so these CAS
100+
* writes work as expected.
88101
*
89102
* @param key the key to cache
90103
* @param value the value to cache
91104
*
92-
* @return the cache map for fluent coding
105+
* @return atomically the previous value for the key or null if the value is not present.
93106
*/
94-
CompletableFuture<V> putIfAbsentAtomically(K key, CompletableFuture<V> value);
107+
@Nullable CompletableFuture<V> putIfAbsentAtomically(K key, CompletableFuture<V> value);
95108

96109
/**
97110
* Deletes the entry with the specified key from the cache map, if it exists.
@@ -114,7 +127,7 @@ static <K, V> CacheMap<K, V> simpleMap() {
114127
* and intended for testing and debugging.
115128
* If a cache doesn't support it, it can throw an Exception.
116129
*
117-
* @return
130+
* @return the size of the cache
118131
*/
119132
int size();
120133
}

src/main/java/org/dataloader/DataLoaderHelper.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
@Internal
5050
class DataLoaderHelper<K, V> {
5151

52-
static class LoaderQueueEntry<K, V> {
52+
private static class LoaderQueueEntry<K, V> {
5353

5454
final K key;
5555
final CompletableFuture<V> value;
@@ -155,11 +155,8 @@ CompletableFuture<V> load(K key, Object loadContext) {
155155
try {
156156
CompletableFuture<V> cachedFuture = futureCache.get(cacheKey);
157157
if (cachedFuture != null) {
158-
// We already have a promise for this key, no need to check value cache or queue up load
159-
stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext));
160-
ctx.onDispatched();
161-
cachedFuture.whenComplete(ctx::onCompleted);
162-
return cachedFuture;
158+
// We already have a promise for this key, no need to check value cache or queue this load
159+
return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture);
163160
}
164161
} catch (Exception ignored) {
165162
}
@@ -170,11 +167,8 @@ CompletableFuture<V> load(K key, Object loadContext) {
170167
if (futureCachingEnabled) {
171168
CompletableFuture<V> cachedFuture = futureCache.putIfAbsentAtomically(cacheKey, loadCallFuture);
172169
if (cachedFuture != null) {
173-
// another thread was faster and created a matching CF ... hence this is really a cachehit and we are done
174-
stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext));
175-
ctx.onDispatched();
176-
cachedFuture.whenComplete(ctx::onCompleted);
177-
return cachedFuture;
170+
// another thread was faster and created a matching CF ... hence this is really a cache hit and we are done
171+
return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture);
178172
}
179173
}
180174
addEntryToLoaderQueue(key, loadCallFuture, loadContext);
@@ -186,12 +180,9 @@ CompletableFuture<V> load(K key, Object loadContext) {
186180
CompletableFuture<V> cachedFuture = futureCache.putIfAbsentAtomically(cacheKey, loadCallFuture);
187181
if (cachedFuture != null) {
188182
// another thread was faster and the loader was invoked twice with the same key
189-
// we are disregarding the resul of our dispatch call and use the already cached value
183+
// we are disregarding the result of our dispatch call and use the already cached value
190184
// meaning this is a cache hit and we are done
191-
stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext));
192-
ctx.onDispatched();
193-
cachedFuture.whenComplete(ctx::onCompleted);
194-
return cachedFuture;
185+
return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture);
195186
}
196187
}
197188
}
@@ -201,6 +192,13 @@ CompletableFuture<V> load(K key, Object loadContext) {
201192
return loadCallFuture;
202193
}
203194

195+
private CompletableFuture<V> incrementCacheHitAndReturnCF(DataLoaderInstrumentationContext<Object> ctx, K key, Object loadContext, CompletableFuture<V> cachedFuture) {
196+
stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext));
197+
ctx.onDispatched();
198+
cachedFuture.whenComplete(ctx::onCompleted);
199+
return cachedFuture;
200+
}
201+
204202
private void addEntryToLoaderQueue(K key, CompletableFuture<V> future, Object loadContext) {
205203
while (true) {
206204
LoaderQueueEntry<K, V> prev = loaderQueue.get();
@@ -223,6 +221,7 @@ Object getCacheKeyWithContext(K key, Object context) {
223221
loaderOptions.cacheKeyFunction().get().getKeyWithContext(key, context) : key;
224222
}
225223

224+
@SuppressWarnings("unchecked")
226225
DispatchResult<V> dispatch() {
227226
DataLoaderInstrumentationContext<DispatchResult<?>> instrCtx = ctxOrNoopCtx(instrumentation().beginDispatch(dataLoader));
228227

@@ -232,6 +231,8 @@ DispatchResult<V> dispatch() {
232231
while (true) {
233232
loaderQueueEntryHead = loaderQueue.get();
234233
if (loaderQueue.compareAndSet(loaderQueueEntryHead, null)) {
234+
// one or more threads competed and this one won. This thread holds
235+
// the loader queue root in the local variable loaderQueueEntryHead
235236
break;
236237
}
237238
}

src/main/java/org/dataloader/impl/DefaultCacheMap.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import org.dataloader.CacheMap;
2020
import org.dataloader.annotations.Internal;
21+
import org.jspecify.annotations.NullMarked;
22+
import org.jspecify.annotations.Nullable;
2123

2224
import java.util.Collection;
2325
import java.util.concurrent.CompletableFuture;
@@ -32,6 +34,7 @@
3234
* @author <a href="https://github.com/aschrijver/">Arnold Schrijver</a>
3335
*/
3436
@Internal
37+
@NullMarked
3538
public class DefaultCacheMap<K, V> implements CacheMap<K, V> {
3639

3740
private final ConcurrentHashMap<K, CompletableFuture<V>> cache;
@@ -56,7 +59,7 @@ public boolean containsKey(K key) {
5659
* {@inheritDoc}
5760
*/
5861
@Override
59-
public CompletableFuture<V> get(K key) {
62+
public @Nullable CompletableFuture<V> get(K key) {
6063
return cache.get(key);
6164
}
6265

@@ -72,7 +75,7 @@ public Collection<CompletableFuture<V>> getAll() {
7275
* {@inheritDoc}
7376
*/
7477
@Override
75-
public CompletableFuture<V> putIfAbsentAtomically(K key, CompletableFuture<V> value) {
78+
public @Nullable CompletableFuture<V> putIfAbsentAtomically(K key, CompletableFuture<V> value) {
7679
return cache.putIfAbsent(key, value);
7780
}
7881

src/test/java/org/dataloader/DataLoaderCacheMapTest.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
/**
1515
* Tests for cacheMap functionality..
1616
*/
17+
@SuppressWarnings("NullableProblems")
1718
public class DataLoaderCacheMapTest {
1819

1920
private <T> BatchLoader<T, T> keysAsValues() {
@@ -24,12 +25,33 @@ private <T> BatchLoader<T, T> keysAsValues() {
2425
public void should_provide_all_futures_from_cache() {
2526
DataLoader<Integer, Integer> dataLoader = newDataLoader(keysAsValues());
2627

27-
dataLoader.load(1);
28-
dataLoader.load(2);
29-
dataLoader.load(1);
28+
CompletableFuture<Integer> cf1 = dataLoader.load(1);
29+
CompletableFuture<Integer> cf2 = dataLoader.load(2);
30+
CompletableFuture<Integer> cf3 = dataLoader.load(3);
31+
32+
CacheMap<Object, Integer> cacheMap = dataLoader.getCacheMap();
33+
Collection<CompletableFuture<Integer>> futures = cacheMap.getAll();
34+
assertThat(futures.size(), equalTo(3));
35+
36+
37+
assertThat(cacheMap.get(1), equalTo(cf1));
38+
assertThat(cacheMap.get(2), equalTo(cf2));
39+
assertThat(cacheMap.get(3), equalTo(cf3));
40+
assertThat(cacheMap.containsKey(1), equalTo(true));
41+
assertThat(cacheMap.containsKey(2), equalTo(true));
42+
assertThat(cacheMap.containsKey(3), equalTo(true));
43+
assertThat(cacheMap.containsKey(4), equalTo(false));
44+
45+
cacheMap.delete(1);
46+
assertThat(cacheMap.containsKey(1), equalTo(false));
47+
assertThat(cacheMap.containsKey(2), equalTo(true));
48+
49+
cacheMap.clear();
50+
assertThat(cacheMap.containsKey(1), equalTo(false));
51+
assertThat(cacheMap.containsKey(2), equalTo(false));
52+
assertThat(cacheMap.containsKey(3), equalTo(false));
53+
assertThat(cacheMap.containsKey(4), equalTo(false));
3054

31-
Collection<CompletableFuture<Integer>> futures = dataLoader.getCacheMap().getAll();
32-
assertThat(futures.size(), equalTo(2));
3355
}
3456

3557
@Test

0 commit comments

Comments
 (0)