diff --git a/scheduling/cpu_scheduling_interface.py b/scheduling/cpu_scheduling_interface.py new file mode 100644 index 000000000000..fb51194125f7 --- /dev/null +++ b/scheduling/cpu_scheduling_interface.py @@ -0,0 +1,366 @@ +import copy +import threading +import time +import tkinter as tk +from tkinter import messagebox, ttk + +import matplotlib.pyplot as plt +from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg + + +# ===================== Scheduler Engine ===================== # +class SchedulerEngine: + def __init__(self, processes, algorithm, quantum: int = 2): + self.original = copy.deepcopy(processes) + self.processes = [p.copy() for p in processes] + self.algorithm = algorithm + self.quantum = quantum + for p in self.processes: + p["remaining"] = p["burst"] + self.timeline: list[tuple[int, str]] = [] # [(time, pid)] + self.stats: list[tuple] = [] + + def simulate(self): + algo = self.algorithm.lower() + if algo == "fcfs": + yield from self._simulate_fcfs() + elif algo == "sjf (non-preemptive)": + yield from self._simulate_sjf_np() + elif algo == "sjf (preemptive)": + yield from self._simulate_sjf_p() + elif algo == "priority (non-preemptive)": + yield from self._simulate_priority_np() + elif algo == "priority (preemptive)": + yield from self._simulate_priority_p() + elif algo == "round robin": + yield from self._simulate_rr() + self._calculate_stats() + + # first come first serve + def _simulate_fcfs(self): + t = 0 + processes = sorted(self.processes, key=lambda p: p["arrival"]) + for p in processes: + t = max(t, p["arrival"]) + for _ in range(p["burst"]): + self.timeline.append((t, p["pid"])) + yield (t, p["pid"], []) + t += 1 + p["completion"] = t + + # shortest job first non preemptive + def _simulate_sjf_np(self): + t = 0 + processes = sorted(self.processes, key=lambda p: p["arrival"]) + done = 0 + while done < len(processes): + ready = [ + p for p in processes if p["arrival"] <= t and "completion" not in p + ] + + if not ready: + t += 1 + yield (t, None, []) + continue + p = min(ready, key=lambda x: x["burst"]) + for _ in range(p["burst"]): + self.timeline.append((t, p["pid"])) + yield (t, p["pid"], []) + t += 1 + p["completion"] = t + done += 1 + + # shortest job first preemptive + def _simulate_sjf_p(self): + t = 0 + processes = sorted(self.processes, key=lambda p: p["arrival"]) + done = 0 + while done < len(processes): + ready = [p for p in processes if p["arrival"] <= t and p["remaining"] > 0] + if not ready: + t += 1 + yield (t, None, []) + continue + p = min(ready, key=lambda x: x["remaining"]) + self.timeline.append((t, p["pid"])) + yield (t, p["pid"], []) + p["remaining"] -= 1 + if p["remaining"] == 0: + p["completion"] = t + 1 + done += 1 + t += 1 + + # priority non preemptive + def _simulate_priority_np(self): + t = 0 + done = 0 + while done < len(self.processes): + ready = [ + p for p in self.processes if p["arrival"] <= t and "completion" not in p + ] + + if not ready: + t += 1 + yield (t, None, []) + continue + p = min(ready, key=lambda x: x["priority"]) + for _ in range(p["burst"]): + self.timeline.append((t, p["pid"])) + yield (t, p["pid"], []) + t += 1 + p["completion"] = t + done += 1 + + # priority preemptive + def _simulate_priority_p(self): + t = 0 + done = 0 + while done < len(self.processes): + ready = [ + p for p in self.processes if p["arrival"] <= t and p["remaining"] > 0 + ] + + if not ready: + t += 1 + yield (t, None, []) + continue + p = min(ready, key=lambda x: x["priority"]) + self.timeline.append((t, p["pid"])) + yield (t, p["pid"], []) + p["remaining"] -= 1 + if p["remaining"] == 0: + p["completion"] = t + 1 + done += 1 + t += 1 + + # round robin + def _simulate_rr(self): + t = 0 + q: list[dict] = [] + processes = sorted(self.processes, key=lambda p: p["arrival"]) + i = 0 + done = 0 + while done < len(processes): + while i < len(processes) and processes[i]["arrival"] <= t: + q.append(processes[i]) + i += 1 + if not q: + t += 1 + yield (t, None, []) + continue + p = q.pop(0) + burst = min(self.quantum, p["remaining"]) + for _ in range(burst): + self.timeline.append((t, p["pid"])) + yield (t, p["pid"], [x["pid"] for x in q]) + t += 1 + p["remaining"] -= 1 + while i < len(processes) and processes[i]["arrival"] <= t: + q.append(processes[i]) + i += 1 + if p["remaining"] > 0: + q.append(p) + else: + p["completion"] = t + done += 1 + + def _calculate_stats(self): + for p in self.processes: + pid = p["pid"] + arrival = p["arrival"] + burst = p["burst"] + completion = p["completion"] + first_exec = next((t for t, pid2 in self.timeline if pid2 == pid), arrival) + tat = completion - arrival + wt = tat - burst + rt = first_exec - arrival + self.stats.append((pid, arrival, burst, completion, tat, wt, rt)) + + +# Interface +class CPUSchedulerGUI: + def __init__(self, root): + self.root = root + self.root.title("CPU Scheduling Visualizer") + self.root.geometry("1000x700") + + self.processes: list[dict] = [] + self.setup_ui() + + def setup_ui(self): + top_frame = ttk.Frame(self.root) + top_frame.pack(pady=10) + + self.tree = ttk.Treeview( + top_frame, + columns=("pid", "arrival", "burst", "priority"), + show="headings", + ) + + for col in self.tree["columns"]: + self.tree.heading(col, text=col.capitalize()) + self.tree.pack(side="left") + + form = ttk.Frame(top_frame) + form.pack(side="left", padx=10) + ttk.Label(form, text="PID").grid(row=0, column=0) + ttk.Label(form, text="Arrival").grid(row=1, column=0) + ttk.Label(form, text="Burst").grid(row=2, column=0) + ttk.Label(form, text="Priority").grid(row=3, column=0) + self.pid_e = ttk.Entry(form) + self.arrival_e = ttk.Entry(form) + self.burst_e = ttk.Entry(form) + self.priority_e = ttk.Entry(form) + self.pid_e.grid(row=0, column=1) + self.arrival_e.grid(row=1, column=1) + self.burst_e.grid(row=2, column=1) + self.priority_e.grid(row=3, column=1) + ttk.Button(form, text="Add", command=self.add_process).grid( + row=4, column=0, pady=5 + ) + + ttk.Button(form, text="Delete", command=self.delete_process).grid( + row=4, column=1 + ) + + algo_frame = ttk.Frame(self.root) + algo_frame.pack(pady=10) + ttk.Label(algo_frame, text="Algorithm:").pack(side="left") + self.algo_cb = ttk.Combobox( + algo_frame, + values=[ + "FCFS", + "SJF (Non-Preemptive)", + "SJF (Preemptive)", + "Priority (Non-Preemptive)", + "Priority (Preemptive)", + "Round Robin", + ], + ) + self.algo_cb.current(0) + self.algo_cb.pack(side="left", padx=5) + ttk.Label(algo_frame, text="Quantum:").pack(side="left") + self.quantum_e = ttk.Entry(algo_frame, width=5) + self.quantum_e.insert(0, "2") + self.quantum_e.pack(side="left") + ttk.Button(algo_frame, text="Run", command=self.run_scheduling).pack( + side="left", padx=10 + ) + + self.ready_label = ttk.Label(self.root, text="Ready Queue:") + self.ready_list = tk.Listbox(self.root, height=3) + self.ready_label.pack_forget() + self.ready_list.pack_forget() + + self.figure, self.ax = plt.subplots(figsize=(8, 3)) + self.canvas = FigureCanvasTkAgg(self.figure, master=self.root) + self.canvas.get_tk_widget().pack() + + self.result_box = ttk.Treeview( + self.root, + columns=("pid", "arrival", "burst", "completion", "tat", "wt", "rt"), + show="headings", + height=6, + ) + + for col in self.result_box["columns"]: + self.result_box.heading(col, text=col.upper()) + self.result_box.pack(pady=10) + + self.avg_label = ttk.Label(self.root, text="", font=("Arial", 11, "bold")) + self.avg_label.pack() + + def add_process(self): + try: + pid = self.pid_e.get() + arrival = int(self.arrival_e.get()) + burst = int(self.burst_e.get()) + priority = int(self.priority_e.get() or 0) + self.processes.append( + { + "pid": pid, + "arrival": arrival, + "burst": burst, + "priority": priority, + } + ) + self.tree.insert("", "end", values=(pid, arrival, burst, priority)) + except ValueError: + messagebox.showerror("Error", "Invalid input") + + def delete_process(self): + if sel := self.tree.selection(): + pid = self.tree.item(sel[0])["values"][0] + self.processes = [p for p in self.processes if p["pid"] != pid] + self.tree.delete(sel[0]) + + def run_scheduling(self): + algo = self.algo_cb.get() + quantum = int(self.quantum_e.get() or 2) + if algo.lower() == "round robin": + self.ready_label.pack() + self.ready_list.pack() + else: + self.ready_label.pack_forget() + self.ready_list.pack_forget() + + self.engine = SchedulerEngine(self.processes, algo, quantum) + threading.Thread(target=self.animate, daemon=True).start() + + def animate(self): + self.ax.clear() + x, colors, current_pid = 0, {}, None + for step in self.engine.simulate(): + _, pid, rq = step + if pid: + if pid != current_pid: + self.ax.axvline(x, color="black", linewidth=0.8) + current_pid = pid + colors.setdefault(pid, plt.cm.tab20(len(colors) % 20)) + self.ax.barh(0, 1, left=x, color=colors[pid]) + self.ax.text( + x + 0.5, + 0, + pid, + ha="center", + va="center", + color="white", + fontsize=9, + ) + + x += 1 + self.ax.set_xticks(range(x + 1)) + self.ax.set_yticks([]) + self.ax.set_xlabel("Time") + self.canvas.draw() + if rq: + self.ready_list.delete(0, tk.END) + for pid_r in rq: + self.ready_list.insert(tk.END, pid_r) + time.sleep(0.3) + self.show_results() + + def show_results(self): + for item in self.result_box.get_children(): + self.result_box.delete(item) + total_wt = total_tat = total_rt = 0 + for row in self.engine.stats: + self.result_box.insert("", "end", values=row) + total_wt += row[5] + total_tat += row[4] + total_rt += row[6] + n = len(self.engine.stats) or 1 + self.avg_label.config( + text=( + f"AVG WT = {total_wt / n:.2f} | " + f"AVG TAT = {total_tat / n:.2f} | " + f"AVG RT = {total_rt / n:.2f}" + ) + ) + + +# main call +if __name__ == "__main__": + root = tk.Tk() + CPUSchedulerGUI(root) + root.mainloop() diff --git a/scheduling/cpuschedulinginterface.md b/scheduling/cpuschedulinginterface.md new file mode 100644 index 000000000000..5ab91f56c9c3 --- /dev/null +++ b/scheduling/cpuschedulinginterface.md @@ -0,0 +1,118 @@ +