Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL.
*
* The MIT License
* Copyright © 2014-2022 Ilkka Seppälä
Expand All @@ -22,95 +22,117 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package com.iluwatar.activeobject;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** ActiveCreature class is the base of the active object example. */
/**
* 🧠 The ActiveCreature class represents an "Active Object" pattern.
* Each creature has its own thread and request queue.
* Instead of performing actions directly, we submit them as Runnables to its queue.
* This helps separate method execution from method invocation (asynchronous behavior).
*/
public abstract class ActiveCreature {

// Logger to print activity messages
private static final Logger logger = LoggerFactory.getLogger(ActiveCreature.class.getName());

private BlockingQueue<Runnable> requests;
// Queue to store tasks (Runnables) that the creature will execute
private final BlockingQueue<Runnable> requests;

private String name;
// Name of the creature
private final String name;

private Thread thread; // Thread of execution.
// Thread on which this creature executes its actions
private final Thread thread;

private int status; // status of the thread of execution.
// Status of the thread (0 = OK, non-zero = error/interrupted)
private int status;

/** Constructor and initialization. */
/**
* 🏗️ Constructor initializes creature name, status, and starts its own thread.
* The thread continuously takes Runnables from the queue and executes them.
*/
protected ActiveCreature(String name) {
this.name = name;
this.status = 0;
this.requests = new LinkedBlockingQueue<>();
thread =
new Thread(
() -> {
boolean infinite = true;
while (infinite) {
try {
requests.take().run();
} catch (InterruptedException e) {
if (this.status != 0) {
logger.error("Thread was interrupted. --> {}", e.getMessage());
}
infinite = false;
Thread.currentThread().interrupt();
}
}
});

// Creating and starting a new thread for this creature
thread = new Thread(() -> {
boolean running = true;
while (running) {
try {
// Take next task from the queue and execute it
requests.take().run();
} catch (InterruptedException e) {
// If thread interrupted, log and stop the loop
if (this.status != 0) {
logger.error("Thread was interrupted. --> {}", e.getMessage());
}
running = false;
Thread.currentThread().interrupt();
}
}
});

// Start the creature's background thread
thread.start();
}

/**
* Eats the porridge.
*
* @throws InterruptedException due to firing a new Runnable.
* 🍲 The creature eats asynchronously.
* Instead of executing immediately, a Runnable is added to the queue.
*/
public void eat() throws InterruptedException {
requests.put(
() -> {
logger.info("{} is eating!", name());
logger.info("{} has finished eating!", name());
});
requests.put(() -> {
logger.info("{} is eating!", name());
try {
Thread.sleep(1000); // simulate eating delay
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
logger.info("{} has finished eating!", name());
});
}

/**
* Roam the wastelands.
*
* @throws InterruptedException due to firing a new Runnable.
* 🚶 The creature roams asynchronously.
*/
public void roam() throws InterruptedException {
requests.put(() -> logger.info("{} has started to roam in the wastelands.", name()));
requests.put(() -> {
logger.info("{} has started to roam in the wastelands.", name());
try {
Thread.sleep(1500); // simulate roaming delay
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
logger.info("{} has stopped roaming.", name());
});
}

/**
* Returns the name of the creature.
*
* @return the name of the creature.
* 📛 Returns the name of the creature.
*/
public String name() {
return this.name;
}

/**
* Kills the thread of execution.
*
* @param status of the thread of execution. 0 == OK, the rest is logging an error.
* 💀 Kills the thread of execution.
* @param status 0 = OK, other values indicate errors or manual stop.
*/
public void kill(int status) {
this.status = status;
this.thread.interrupt();
this.thread.interrupt(); // stops the creature's thread
}

/**
* Returns the status of the thread of execution.
*
* @return the status of the thread of execution.
* 📊 Returns the status of the creature's thread.
*/
public int getStatus() {
return this.status;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
* This project is licensed under the MIT license.
* Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
*
* The MIT License
* Copyright © 2014-2022 Ilkka Seppälä
Expand All @@ -16,8 +17,8 @@
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
Expand All @@ -28,10 +29,16 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import lombok.extern.slf4j.Slf4j;

/** Abstract class of all the instance implementation classes. */
/**
* Abstract base class for all instance implementations in the leader election system.
*
* Each instance runs on its own thread and processes incoming messages asynchronously.
* This version fixes the busy loop problem by adding a short sleep when no messages are present.
*/
@Slf4j
public abstract class AbstractInstance implements Instance, Runnable {

// Interval between heartbeats in milliseconds.
protected static final int HEARTBEAT_INTERVAL = 5000;
private static final String INSTANCE = "Instance ";

Expand All @@ -41,7 +48,13 @@ public abstract class AbstractInstance implements Instance, Runnable {
protected int leaderId;
protected boolean alive;

/** Constructor of BullyInstance. */
/**
* Constructor initializing the instance.
*
* @param messageManager manager to send/receive messages
* @param localId ID of this instance
* @param leaderId current leader ID
*/
public AbstractInstance(MessageManager messageManager, int localId, int leaderId) {
this.messageManager = messageManager;
this.messageQueue = new ConcurrentLinkedQueue<>();
Expand All @@ -50,20 +63,31 @@ public AbstractInstance(MessageManager messageManager, int localId, int leaderId
this.alive = true;
}

/** The instance will execute the message in its message queue periodically once it is alive. */
/**
* Thread run loop — continuously processes messages while instance is alive.
*
* 🟢 FIXED: Added small sleep when queue is empty to avoid busy looping.
*/
@Override
@SuppressWarnings("squid:S2189")
public void run() {
while (true) {
while (alive) {
if (!this.messageQueue.isEmpty()) {
this.processMessage(this.messageQueue.remove());
processMessage(this.messageQueue.poll());
} else {
try {
Thread.sleep(100); // 🔸 Prevents busy loop CPU overuse
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn(INSTANCE + localId + " thread interrupted.");
break;
}
}
}
LOGGER.info(INSTANCE + localId + " stopped running.");
}

/**
* Once messages are sent to the certain instance, it will firstly be added to the queue and wait
* to be executed.
* Add a new message to the queue to be processed.
*
* @param message Message sent by other instances
*/
Expand All @@ -72,74 +96,58 @@ public void onMessage(Message message) {
messageQueue.offer(message);
}

/**
* Check if the instance is alive or not.
*
* @return {@code true} if the instance is alive.
*/
/** Check if this instance is alive. */
@Override
public boolean isAlive() {
return alive;
}

/**
* Set the health status of the certain instance.
*
* @param alive {@code true} for alive.
*/
/** Update the alive status of this instance. */
@Override
public void setAlive(boolean alive) {
this.alive = alive;
}

/**
* Process the message according to its type.
* Process the given message according to its type.
*
* @param message Message polled from queue.
* @param message message to process
*/
private void processMessage(Message message) {
switch (message.getType()) {
case ELECTION -> {
LOGGER.info(INSTANCE + localId + " - Election Message handling...");
LOGGER.info("{}{} - Handling Election Message...", INSTANCE, localId);
handleElectionMessage(message);
}
case LEADER -> {
LOGGER.info(INSTANCE + localId + " - Leader Message handling...");
LOGGER.info("{}{} - Handling Leader Message...", INSTANCE, localId);
handleLeaderMessage(message);
}
case HEARTBEAT -> {
LOGGER.info(INSTANCE + localId + " - Heartbeat Message handling...");
LOGGER.info("{}{} - Handling Heartbeat Message...", INSTANCE, localId);
handleHeartbeatMessage(message);
}
case ELECTION_INVOKE -> {
LOGGER.info(INSTANCE + localId + " - Election Invoke Message handling...");
LOGGER.info("{}{} - Handling Election Invoke...", INSTANCE, localId);
handleElectionInvokeMessage();
}
case LEADER_INVOKE -> {
LOGGER.info(INSTANCE + localId + " - Leader Invoke Message handling...");
LOGGER.info("{}{} - Handling Leader Invoke...", INSTANCE, localId);
handleLeaderInvokeMessage();
}
case HEARTBEAT_INVOKE -> {
LOGGER.info(INSTANCE + localId + " - Heartbeat Invoke Message handling...");
LOGGER.info("{}{} - Handling Heartbeat Invoke...", INSTANCE, localId);
handleHeartbeatInvokeMessage();
}
default -> {}
default -> LOGGER.warn("{}{} - Unknown message type received.", INSTANCE, localId);
}
}

/**
* Abstract methods to handle different types of message. These methods need to be implemented in
* concrete instance class to implement corresponding leader-selection pattern.
*/
// Abstract methods for handling various message types — to be implemented by subclasses.
protected abstract void handleElectionMessage(Message message);

protected abstract void handleElectionInvokeMessage();

protected abstract void handleLeaderMessage(Message message);

protected abstract void handleLeaderInvokeMessage();

protected abstract void handleHeartbeatMessage(Message message);

protected abstract void handleHeartbeatInvokeMessage();
}
Loading