Files
purposed_apps/Robocopy_GUI/robocopy_mirror_gui.py
2025-05-13 18:05:04 +08:00

220 lines
9.0 KiB
Python

import tkinter as tk
from tkinter import ttk, filedialog, messagebox, scrolledtext
import subprocess
import threading
import platform
import queue # For thread-safe communication
# --- Configuration ---
# Robocopy flags appended to every mirror run, after the source and target
# paths. This must stay a list: it is concatenated with the
# ["robocopy", source, target] list when the command line is built.
#
# /MIR :: MIRror a directory tree (equivalent to /E plus /PURGE); files and
#         folders in the target that are absent from the source are deleted.
# /Z   :: Copy files in restartable mode.
# /R:3 :: Number of retries on failed copies (robocopy default: 1 million).
# /W:5 :: Wait time in seconds between retries (robocopy default: 30).
# /NP  :: No Progress - don't display % copied. Keeps the output cleaner.
# /TEE :: Output to the console window as well as to the log file.
#         NOTE(review): no /LOG flag is passed here, so output presumably
#         already goes to stdout and /TEE may be redundant - confirm.
# /V   :: Verbose output, showing skipped files. Recommended for /MIR.
# Add other flags if needed, but /MIR is the core for this function.
MIRROR_ROBOCOPY_FLAGS = ["/MIR", "/Z", "/R:3", "/W:5", "/NP", "/TEE", "/V"]
# --- Main Application Class ---
class RobocopyMirrorApp:
    """Tk window that mirrors a source directory onto a target with robocopy.

    Robocopy runs in a worker thread. Its output lines are pushed onto a
    ``queue.Queue`` and appended to the output pane by a poller scheduled on
    the Tk main loop, so widgets are only ever touched from the main thread.
    A ``None`` item on the queue is the sentinel marking the end of a run.
    """

    # Delay, in milliseconds, between two polls of the output queue.
    POLL_INTERVAL_MS = 100

    def __init__(self, root):
        """Configure *root*, build the widgets and start polling the queue.

        On a non-Windows platform an error dialog is shown and the window is
        scheduled for destruction; no widgets are built in that case.

        Args:
            root: The ``tk.Tk`` (or toplevel) window hosting the application.
        """
        self.root = root
        self.root.title("Robocopy Mirror GUI")
        self.root.geometry("700x500")  # Adjust size as needed

        # Plain state is initialised before the OS check so that on_closing()
        # stays callable even when the constructor bails out early.
        self.output_queue = queue.Queue()  # Worker thread -> GUI messages
        self.process = None  # The running robocopy subprocess, if any
        self.thread = None  # The worker thread of the current/last run
        self.is_running = False

        if platform.system() != "Windows":
            messagebox.showerror("OS Error", "Robocopy is a Windows utility. This app only works on Windows.")
            # Destroy from the event loop rather than immediately: callers
            # typically keep using the root right after construction
            # (root.protocol(...), root.mainloop()), which raises TclError
            # on an already-destroyed application.
            self.root.after(0, self.root.destroy)
            return

        self.source_var = tk.StringVar()
        self.target_var = tk.StringVar()

        self._build_widgets()

        # Start checking the queue for updates from the worker thread.
        self.check_queue()

    def _build_widgets(self):
        """Create and lay out the path inputs, run button and output pane."""
        root = self.root

        # Frame for inputs
        input_frame = ttk.Frame(root, padding="10")
        input_frame.pack(fill=tk.X)

        # Source row
        ttk.Label(input_frame, text="Source:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W)
        self.source_entry = ttk.Entry(input_frame, textvariable=self.source_var, width=50)
        self.source_entry.grid(row=0, column=1, padx=5, pady=5, sticky=tk.EW)
        self.source_button = ttk.Button(input_frame, text="Browse...", command=self.browse_source)
        self.source_button.grid(row=0, column=2, padx=5, pady=5)

        # Target row
        ttk.Label(input_frame, text="Target:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W)
        self.target_entry = ttk.Entry(input_frame, textvariable=self.target_var, width=50)
        self.target_entry.grid(row=1, column=1, padx=5, pady=5, sticky=tk.EW)
        self.target_button = ttk.Button(input_frame, text="Browse...", command=self.browse_target)
        self.target_button.grid(row=1, column=2, padx=5, pady=5)

        # Let the entry column absorb horizontal resizing.
        input_frame.columnconfigure(1, weight=1)

        self.run_button = ttk.Button(root, text="Run Robocopy Mirror", command=self.start_robocopy_thread)
        self.run_button.pack(pady=10)

        # Output area
        output_frame = ttk.Frame(root, padding="10")
        output_frame.pack(fill=tk.BOTH, expand=True)
        ttk.Label(output_frame, text="Robocopy Output:").pack(anchor=tk.W)
        # The pane is kept disabled (read-only); add_output() enables it only
        # for the duration of an insert.
        self.output_text = scrolledtext.ScrolledText(output_frame, wrap=tk.WORD, height=15, state='disabled')
        self.output_text.pack(fill=tk.BOTH, expand=True, pady=5)

    def browse_source(self):
        """Ask for a directory and store it as the source path if one is chosen."""
        directory = filedialog.askdirectory(title="Select Source Directory")
        if directory:
            self.source_var.set(directory)

    def browse_target(self):
        """Ask for a directory and store it as the target path if one is chosen."""
        directory = filedialog.askdirectory(title="Select Target Directory (will be mirrored)")
        if directory:
            self.target_var.set(directory)

    def start_robocopy_thread(self):
        """Validate the inputs, ask for confirmation and start the worker thread.

        Does nothing beyond showing a dialog when a run is already active,
        when either path is empty, or when the user declines the confirmation.
        """
        if self.is_running:
            messagebox.showwarning("Busy", "Robocopy is already running.")
            return

        source = self.source_var.get().strip()
        target = self.target_var.get().strip()
        if not source or not target:
            messagebox.showerror("Error", "Please select both Source and Target directories.")
            return

        # Mirroring deletes data in the target, so always confirm first.
        confirm = messagebox.askyesno(
            "Confirm Mirror",
            "WARNING: Mirroring will delete files/folders in the TARGET directory:"
            f"\n\n{target}\n\n if they do not exist in the SOURCE directory:"
            f"\n\n{source}\n\n Are you sure you want to proceed?"
        )
        if not confirm:
            return

        self.run_button.config(state='disabled')

        # Clear the previous run's output; the pane has to be enabled for
        # delete() to take effect and is put back to read-only right after.
        self.output_text.config(state='normal')
        self.output_text.delete('1.0', tk.END)
        self.output_text.config(state='disabled')

        self.add_output("Starting Robocopy Mirror operation...\n")
        self.add_output(f"Source: {source}\n")
        self.add_output(f"Target: {target}\n")
        self.add_output(f"Flags: {' '.join(MIRROR_ROBOCOPY_FLAGS)}\n")
        self.add_output("-" * 30 + "\n")

        self.is_running = True
        self.thread = threading.Thread(target=self.run_robocopy_process, args=(source, target), daemon=True)
        self.thread.start()

    def run_robocopy_process(self, source, target):
        """Run robocopy and forward its output to the queue (worker thread).

        Every output line is put on ``self.output_queue``, followed by a
        summary line derived from the exit code. A ``None`` sentinel is always
        queued last, including on failure, so the GUI can finish the run.

        Args:
            source: Source directory passed to robocopy.
            target: Target directory passed to robocopy.
        """
        try:
            command = ["robocopy", source, target] + MIRROR_ROBOCOPY_FLAGS
            # NOTE(review): output is decoded as UTF-8 with replacement
            # characters; if robocopy writes in the console (OEM) code page,
            # non-ASCII file names will be garbled - confirm the encoding.
            process = subprocess.Popen(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # Merge stderr into stdout
                text=True,
                encoding='utf-8',
                errors='replace',
                bufsize=1,  # Line-buffered so lines arrive as they are written
                creationflags=subprocess.CREATE_NO_WINDOW  # Prevent cmd window popup
            )
            self.process = process

            # Iterating the pipe yields one line at a time until EOF; the
            # context manager closes it even if queueing fails.
            with process.stdout as stream:
                for line in stream:
                    self.output_queue.put(line)
            return_code = process.wait()

            self.output_queue.put("\n" + "-" * 30 + "\n")
            # Robocopy exit codes 0-7 indicate completion, possibly with minor
            # issues such as skipped files; 8 and above indicate failures.
            outcome = "successfully" if return_code < 8 else "with errors"
            self.output_queue.put(f"Robocopy Mirror finished {outcome} (Return Code: {return_code}).\n")
        except FileNotFoundError:
            self.output_queue.put("\nERROR: 'robocopy' command not found. Is it in your system's PATH?\n")
        except Exception as e:
            # Top-level boundary of the worker thread: report instead of
            # letting the thread die silently.
            self.output_queue.put(f"\nAn error occurred: {e}\n")
        finally:
            # Signal completion to the main thread.
            self.output_queue.put(None)

    def check_queue(self):
        """Drain pending worker messages into the output pane and reschedule.

        All lines available at this tick are appended with a single insert.
        When the ``None`` sentinel is found, draining stops and the run is
        finalised. The next poll is always scheduled, even if appending fails.
        """
        pending = []
        finished = False
        try:
            while True:
                item = self.output_queue.get_nowait()
                if item is None:
                    finished = True
                    break
                pending.append(item)
        except queue.Empty:
            pass

        try:
            if pending:
                self.add_output("".join(pending))
            if finished:
                self.on_robocopy_complete()
        finally:
            self.root.after(self.POLL_INTERVAL_MS, self.check_queue)

    def add_output(self, text):
        """Append *text* to the output pane and scroll to the end.

        The pane is enabled only while inserting and is left read-only
        afterwards, so text is never dropped because the widget happened to
        be disabled and the user cannot type into the log.
        """
        self.output_text.config(state='normal')
        try:
            self.output_text.insert(tk.END, text)
            self.output_text.see(tk.END)
        finally:
            self.output_text.config(state='disabled')

    def on_robocopy_complete(self):
        """Reset the GUI and run state after a run, then notify the user."""
        self.run_button.config(state='normal')
        self.output_text.config(state='disabled')
        self.process = None
        self.is_running = False
        messagebox.showinfo("Finished", "Robocopy Mirror process has completed.")

    def on_closing(self):
        """Handle the window-close request.

        If a robocopy process is running, ask whether to terminate it; the
        window stays open when the user declines. Otherwise, or once the
        process has been dealt with, the root window is destroyed.
        """
        # Capture the process first: the confirmation dialog below keeps the
        # event loop running, so the run may complete (and reset
        # self.process to None) while the dialog is still open.
        process = self.process
        if self.is_running and process is not None:
            if not messagebox.askyesno("Quit", "Robocopy might still be running. Terminate it and exit?"):
                return
            try:
                self._terminate_process(process)
            finally:
                # Closing the window must not depend on the cleanup succeeding.
                self.root.destroy()
            return
        self.root.destroy()

    @staticmethod
    def _terminate_process(process):
        """Terminate *process*, killing it if it does not exit within 2 seconds."""
        try:
            process.terminate()
            process.wait(timeout=2)
        except subprocess.TimeoutExpired:
            process.kill()
        except OSError:
            # Best effort: nothing more can be done for a process that
            # cannot be signalled.
            pass
# --- Main Execution ---
if __name__ == "__main__":
    root = tk.Tk()
    app = RobocopyMirrorApp(root)
    try:
        # Route the window-close button through the app so that a running
        # robocopy process can be terminated first.
        root.protocol("WM_DELETE_WINDOW", app.on_closing)
    except tk.TclError:
        # The constructor may already have destroyed the root window (it
        # refuses to run on non-Windows platforms); there is nothing left
        # to show, so exit quietly instead of crashing with a traceback.
        pass
    else:
        root.mainloop()