11package io .javaoperatorsdk .operator .processing .event ;
22
3- import java .util .*;
3+ import java .util .Collections ;
4+ import java .util .HashMap ;
5+ import java .util .List ;
6+ import java .util .Map ;
7+ import java .util .Objects ;
48import java .util .concurrent .ConcurrentNavigableMap ;
59import java .util .concurrent .ConcurrentSkipListMap ;
610import java .util .stream .Collectors ;
@@ -20,15 +24,14 @@ class EventSources<R extends HasMetadata> {
2024 public static final String RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME =
2125 "RetryAndRescheduleTimerEventSource" ;
2226
23- private final ConcurrentNavigableMap <String , Map <String , EventSource >> sources =
27+ private final ConcurrentNavigableMap <String , Map <String , NamedEventSource >> sources =
2428 new ConcurrentSkipListMap <>();
2529 private final TimerEventSource <R > retryAndRescheduleTimerEventSource = new TimerEventSource <>();
2630 private ControllerResourceEventSource <R > controllerResourceEventSource ;
2731
2832
29- ControllerResourceEventSource < R > initControllerEventSource (Controller <R > controller ) {
33+ void initControllerEventSource (Controller <R > controller ) {
3034 controllerResourceEventSource = new ControllerResourceEventSource <>(controller );
31- return controllerResourceEventSource ;
3235 }
3336
3437 ControllerResourceEventSource <R > controllerResourceEventSource () {
@@ -49,7 +52,7 @@ public Stream<NamedEventSource> additionalNamedEventSources() {
4952 Stream <EventSource > additionalEventSources () {
5053 return Stream .concat (
5154 Stream .of (retryEventSource ()).filter (Objects ::nonNull ),
52- sources . values ().stream (). flatMap ( c -> c . values (). stream () ));
55+ flatMappedSources ().map ( NamedEventSource :: original ));
5356 }
5457
5558 NamedEventSource namedControllerResourceEventSource () {
@@ -58,29 +61,32 @@ NamedEventSource namedControllerResourceEventSource() {
5861 }
5962
6063 Stream <NamedEventSource > flatMappedSources () {
61- return sources .values ().stream ().flatMap (c -> c .entrySet ().stream ()
62- .map (esEntry -> new NamedEventSource (esEntry .getValue (), esEntry .getKey ())));
64+ return sources .values ().stream ().flatMap (c -> c .values ().stream ());
6365 }
6466
6567 public void clear () {
6668 sources .clear ();
6769 }
6870
69- public boolean contains (String name , EventSource source ) {
71+ private NamedEventSource existing (String name , EventSource source ) {
7072 final var eventSources = sources .get (keyFor (source ));
7173 if (eventSources == null || eventSources .isEmpty ()) {
72- return false ;
74+ return null ;
7375 }
74- return eventSources .containsKey (name );
76+ return eventSources .get (name );
7577 }
7678
77- public void add (String name , EventSource eventSource ) {
78- if (contains (name , eventSource )) {
79- throw new IllegalArgumentException ("An event source is already registered for the "
80- + keyAsString (getResourceType (eventSource ), name )
79+ public void add (NamedEventSource eventSource ) {
80+ final var name = eventSource .name ();
81+ final var original = eventSource .original ();
82+ final var existing = existing (name , original );
83+ if (existing != null && !eventSource .equals (existing )) {
84+ throw new IllegalArgumentException ("Event source " + existing .original ()
85+ + " is already registered for the "
86+ + keyAsString (getResourceType (original ), name )
8187 + " class/name combination" );
8288 }
83- sources .computeIfAbsent (keyFor (eventSource ), k -> new HashMap <>()).put (name , eventSource );
89+ sources .computeIfAbsent (keyFor (original ), k -> new HashMap <>()).put (name , eventSource );
8490 }
8591
8692 @ SuppressWarnings ("rawtypes" )
@@ -91,6 +97,10 @@ private Class<?> getResourceType(EventSource source) {
9197 }
9298
9399 private String keyFor (EventSource source ) {
100+ if (source instanceof NamedEventSource ) {
101+ source = ((NamedEventSource ) source ).original ();
102+ }
103+
94104 return keyFor (getResourceType (source ));
95105 }
96106
@@ -100,16 +110,20 @@ private String keyFor(Class<?> dependentType) {
100110
101111 @ SuppressWarnings ("unchecked" )
102112 public <S > ResourceEventSource <S , R > get (Class <S > dependentType , String name ) {
113+ if (dependentType == null ) {
114+ throw new IllegalArgumentException ("Must pass a dependent type to retrieve event sources" );
115+ }
116+
103117 final var sourcesForType = sources .get (keyFor (dependentType ));
104118 if (sourcesForType == null || sourcesForType .isEmpty ()) {
105119 throw new IllegalArgumentException (
106120 "There is no event source found for class:" + dependentType .getName ());
107121 }
108122
109123 final var size = sourcesForType .size ();
110- final EventSource source ;
124+ NamedEventSource source ;
111125 if (size == 1 && name == null ) {
112- source = sourcesForType .values ().stream ().findFirst ().orElse ( null );
126+ source = sourcesForType .values ().stream ().findFirst ().orElseThrow ( );
113127 } else {
114128 if (name == null || name .isBlank ()) {
115129 throw new IllegalArgumentException ("There are multiple EventSources registered for type "
@@ -125,15 +139,16 @@ public <S> ResourceEventSource<S, R> get(Class<S> dependentType, String name) {
125139 }
126140 }
127141
128- if (!(source instanceof ResourceEventSource )) {
142+ EventSource original = source .original ();
143+ if (!(original instanceof ResourceEventSource )) {
129144 throw new IllegalArgumentException (source + " associated with "
130145 + keyAsString (dependentType , name ) + " is not a "
131146 + ResourceEventSource .class .getSimpleName ());
132147 }
133- final var res = (ResourceEventSource <S , R >) source ;
148+ final var res = (ResourceEventSource <S , R >) original ;
134149 final var resourceClass = res .resourceType ();
135150 if (!resourceClass .isAssignableFrom (dependentType )) {
136- throw new IllegalArgumentException (source + " associated with "
151+ throw new IllegalArgumentException (original + " associated with "
137152 + keyAsString (dependentType , name )
138153 + " is handling " + resourceClass .getName () + " resources but asked for "
139154 + dependentType .getName ());
@@ -151,7 +166,12 @@ private String keyAsString(Class dependentType, String name) {
151166 @ SuppressWarnings ("unchecked" )
152167 public <S > List <ResourceEventSource <S , R >> getEventSources (Class <S > dependentType ) {
153168 final var sourcesForType = sources .get (keyFor (dependentType ));
169+ if (sourcesForType == null ) {
170+ return Collections .emptyList ();
171+ }
172+
154173 return sourcesForType .values ().stream ()
174+ .map (NamedEventSource ::original )
155175 .filter (ResourceEventSource .class ::isInstance )
156176 .map (es -> (ResourceEventSource <S , R >) es )
157177 .collect (Collectors .toList ());
0 commit comments