@@ -51,6 +51,7 @@ public void failedEvent(String uid, Event event) {}
5151 private final ExecutorService executor ;
5252 private final String controllerName ;
5353 private final ReentrantLock lock = new ReentrantLock ();
54+ private volatile boolean running ;
5455 private DefaultEventSourceManager <R > eventSourceManager ;
5556
5657 public DefaultEventHandler (ConfiguredController <R > controller ) {
@@ -67,6 +68,7 @@ public DefaultEventHandler(ConfiguredController<R> controller) {
6768
6869 private DefaultEventHandler (ExecutorService executor , String relatedControllerName ,
6970 EventDispatcher <R > eventDispatcher , Retry retry ) {
71+ this .running = true ;
7072 this .executor =
7173 executor == null
7274 ? new ScheduledThreadPoolExecutor (
@@ -75,27 +77,28 @@ private DefaultEventHandler(ExecutorService executor, String relatedControllerNa
7577 this .controllerName = relatedControllerName ;
7678 this .eventDispatcher = eventDispatcher ;
7779 this .retry = retry ;
78- eventBuffer = new EventBuffer ();
79- }
80-
81- public void setEventSourceManager (DefaultEventSourceManager <R > eventSourceManager ) {
82- this .eventSourceManager = eventSourceManager ;
80+ this .eventBuffer = new EventBuffer ();
8381 }
8482
8583 public static void setEventMonitor (EventMonitor monitor ) {
8684 DefaultEventHandler .monitor = monitor ;
8785 }
8886
89- public interface EventMonitor {
90- void processedEvent (String uid , Event event );
91-
92- void failedEvent (String uid , Event event );
87+ public void setEventSourceManager (DefaultEventSourceManager <R > eventSourceManager ) {
88+ this .eventSourceManager = eventSourceManager ;
9389 }
9490
9591 @ Override
9692 public void handleEvent (Event event ) {
93+
9794 try {
9895 lock .lock ();
96+
97+ if (!this .running ) {
98+ log .debug ("Skipping event: {} because the event handler is shutting down" , event );
99+ return ;
100+ }
101+
99102 log .debug ("Received event: {}" , event );
100103
101104 final Predicate <CustomResource > selector = event .getCustomResourcesSelector ();
@@ -109,6 +112,16 @@ public void handleEvent(Event event) {
109112 }
110113 }
111114
115+ @ Override
116+ public void close () {
117+ try {
118+ lock .lock ();
119+ this .running = false ;
120+ } finally {
121+ lock .unlock ();
122+ }
123+ }
124+
112125 private void executeBufferedEvents (String customResourceUid ) {
113126 boolean newEventForResourceId = eventBuffer .containsEvents (customResourceUid );
114127 boolean controllerUnderExecution = isControllerUnderExecution (customResourceUid );
@@ -143,6 +156,10 @@ void eventProcessingFinished(
143156 ExecutionScope <R > executionScope , PostExecutionControl <R > postExecutionControl ) {
144157 try {
145158 lock .lock ();
159+ if (!running ) {
160+ return ;
161+ }
162+
146163 log .debug (
147164 "Event processing finished. Scope: {}, PostExecutionControl: {}" ,
148165 executionScope ,
@@ -279,6 +296,12 @@ private void unsetUnderExecution(String customResourceUid) {
279296 underProcessing .remove (customResourceUid );
280297 }
281298
299+ public interface EventMonitor {
300+ void processedEvent (String uid , Event event );
301+
302+ void failedEvent (String uid , Event event );
303+ }
304+
282305 private class ControllerExecution implements Runnable {
283306 private final ExecutionScope <R > executionScope ;
284307
0 commit comments