248 lines
11 KiB
Python
248 lines
11 KiB
Python
import tkinter as tk
|
|
from tkinter import ttk, filedialog, messagebox, scrolledtext
|
|
import sv_ttk # Import the library
|
|
import subprocess
|
|
import threading
|
|
import platform
|
|
import queue # For thread-safe communication
|
|
import locale # For getting system's default encoding
|
|
|
|
# --- Configuration ---
|
|
# Robocopy Flags for Mirroring (/MIR)
|
|
# /MIR :: MIRror a directory tree (equivalent to /E plus /PURGE).
|
|
# /Z :: copy files in restartable mode.
|
|
# /R:3 :: number of Retries on failed copies (default 1 million). Set to 3.
|
|
# /W:5 :: Wait time between retries (default 30 seconds). Set to 5.
|
|
# /NP :: No Progress - don't display % copied. Makes output cleaner.
|
|
# /TEE :: output to console window, as well as the log file. Essential for GUI.
|
|
# /V :: Verbose output, showing skipped and *deleted* files. Recommended for /MIR.
|
|
# Add other flags if needed, but /MIR is the core for this function.
|
|
MIRROR_ROBOCOPY_FLAGS = ["/MIR", "/Z", "/R:3", "/W:5", "/NP", "/TEE", "/V"]
|
|
|
|
# --- Main Application Class ---
|
|
class RobocopyMirrorApp: # Renamed class for clarity
|
|
def __init__(self, root):
|
|
self.root = root
|
|
# Updated Title
|
|
self.root.title("Robocopy Mirror GUI")
|
|
self.root.geometry("700x550") # Initial size
|
|
self.root.minsize(700, 550) # Set minimum window size
|
|
|
|
# Check OS
|
|
if platform.system() != "Windows":
|
|
messagebox.showerror("OS Error", "Robocopy is a Windows utility. This app only works on Windows.")
|
|
self.root.destroy()
|
|
return
|
|
|
|
self.source_var = tk.StringVar()
|
|
self.target_var = tk.StringVar()
|
|
self.output_queue = queue.Queue() # Queue for thread-safe GUI updates
|
|
self.process = None # To hold the running subprocess
|
|
self.is_running = False
|
|
|
|
# --- GUI Elements ---
|
|
# Main content frame (to allow disclaimer to be packed at the bottom of root)
|
|
main_content_frame = ttk.Frame(root)
|
|
main_content_frame.pack(fill=tk.BOTH, expand=True)
|
|
|
|
|
|
# Frame for inputs
|
|
input_frame = ttk.Frame(main_content_frame, padding="15 15 15 10") # Increased padding
|
|
input_frame.pack(fill=tk.X, padx=10, pady=(10,0)) # Added horizontal padding for the frame itself
|
|
|
|
# Source Row
|
|
ttk.Label(input_frame, text="Source:").grid(row=0, column=0, padx=5, pady=10, sticky=tk.W) # Increased pady
|
|
self.source_entry = ttk.Entry(input_frame, textvariable=self.source_var, width=50)
|
|
self.source_entry.grid(row=0, column=1, padx=5, pady=10, sticky=tk.EW) # Increased pady
|
|
self.source_button = ttk.Button(input_frame, text="Browse...", command=self.browse_source)
|
|
self.source_button.grid(row=0, column=2, padx=5, pady=10) # Increased pady
|
|
|
|
# Target Row
|
|
ttk.Label(input_frame, text="Target:").grid(row=1, column=0, padx=5, pady=10, sticky=tk.W) # Increased pady
|
|
self.target_entry = ttk.Entry(input_frame, textvariable=self.target_var, width=50)
|
|
self.target_entry.grid(row=1, column=1, padx=5, pady=10, sticky=tk.EW) # Increased pady
|
|
self.target_button = ttk.Button(input_frame, text="Browse...", command=self.browse_target)
|
|
self.target_button.grid(row=1, column=2, padx=5, pady=10) # Increased pady
|
|
|
|
# Configure input frame column weights for resizing
|
|
input_frame.columnconfigure(1, weight=1)
|
|
|
|
# Run Button Frame for better padding control
|
|
run_button_frame = ttk.Frame(main_content_frame)
|
|
run_button_frame.pack(pady=(15, 10)) # Increased pady
|
|
|
|
self.run_button = ttk.Button(run_button_frame, text="Run Robocopy Mirror", command=self.start_robocopy_thread)
|
|
self.run_button.pack()
|
|
|
|
|
|
# Output Area Frame
|
|
output_frame = ttk.Frame(main_content_frame, padding="15 15 15 10") # Increased padding
|
|
output_frame.pack(fill=tk.BOTH, expand=True)
|
|
|
|
ttk.Label(output_frame, text="Robocopy Output:").pack(anchor=tk.W)
|
|
self.output_text = scrolledtext.ScrolledText(output_frame, wrap=tk.WORD, height=15, state='disabled')
|
|
self.output_text.pack(fill=tk.BOTH, expand=True, pady=5)
|
|
|
|
# --- Disclaimer Label ---
|
|
# Packed directly into root, so it stays at the very bottom and can be centered
|
|
disclaimer_label = ttk.Label(root, text="Ver. 2.1 @杨昱幸. All Rights Reserved.", font=("Arial", 8))
|
|
disclaimer_label.pack(side=tk.BOTTOM, pady=(5, 10)) # Centered by default when using pack with side=BOTTOM
|
|
|
|
|
|
# --- Start queue check ---
|
|
self.check_queue() # Start checking the queue for updates
|
|
|
|
def browse_source(self):
|
|
directory = filedialog.askdirectory(title="Select Source Directory")
|
|
if directory:
|
|
self.source_var.set(directory)
|
|
|
|
def browse_target(self):
|
|
directory = filedialog.askdirectory(title="Select Target Directory (will be mirrored)")
|
|
if directory:
|
|
self.target_var.set(directory)
|
|
|
|
def start_robocopy_thread(self):
|
|
"""Starts the robocopy mirror process in a separate thread."""
|
|
if self.is_running:
|
|
messagebox.showwarning("Busy", "Robocopy is already running.")
|
|
return
|
|
|
|
source = self.source_var.get().strip()
|
|
target = self.target_var.get().strip()
|
|
|
|
if not source or not target:
|
|
messagebox.showerror("Error", "Please select both Source and Target directories.")
|
|
return
|
|
|
|
# Confirmation for Mirroring (IMPORTANT!)
|
|
confirm = messagebox.askyesno(
|
|
"Confirm Mirror",
|
|
f"WARNING: Mirroring will delete files/folders in the TARGET directory:\n\n{target}\n\n if they do not exist in the SOURCE directory:\n\n{source}\n\n Are you sure you want to proceed?"
|
|
)
|
|
if not confirm:
|
|
return
|
|
|
|
# Disable run button, enable text area
|
|
self.run_button.config(state='disabled')
|
|
self.output_text.config(state='normal')
|
|
self.output_text.delete('1.0', tk.END)
|
|
self.add_output("Starting Robocopy Mirror operation...\n")
|
|
self.add_output(f"Source: {source}\n")
|
|
self.add_output(f"Target: {target}\n")
|
|
# Display the specific flags being used
|
|
self.add_output(f"Flags: {' '.join(MIRROR_ROBOCOPY_FLAGS)}\n")
|
|
self.add_output("-" * 30 + "\n")
|
|
|
|
self.is_running = True
|
|
|
|
# Create and start the thread
|
|
self.thread = threading.Thread(target=self.run_robocopy_process, args=(source, target), daemon=True)
|
|
self.thread.start()
|
|
|
|
def run_robocopy_process(self, source, target):
|
|
"""The actual function running in the thread."""
|
|
try:
|
|
# Determine the console's output encoding, crucial for non-English filenames
|
|
output_encoding = locale.getpreferredencoding(False)
|
|
# Use the MIRROR_ROBOCOPY_FLAGS
|
|
command = ["robocopy", source, target] + MIRROR_ROBOCOPY_FLAGS
|
|
|
|
self.process = subprocess.Popen(
|
|
command,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT, # Redirect stderr to stdout
|
|
# text=True and encoding are removed to read raw bytes
|
|
bufsize=1,
|
|
creationflags=subprocess.CREATE_NO_WINDOW # Prevent cmd window popup
|
|
)
|
|
|
|
# Read output line by line as bytes and decode
|
|
for line in iter(self.process.stdout.readline, b''):
|
|
if not line: # Check if process has exited
|
|
break
|
|
# Decode using the system's preferred encoding and put into the queue
|
|
self.output_queue.put(line.decode(output_encoding, errors='replace'))
|
|
if self.process.poll() is not None and not line: # Double check if process ended and no more output
|
|
break
|
|
|
|
|
|
if self.process.stdout:
|
|
self.process.stdout.close()
|
|
return_code = self.process.wait()
|
|
|
|
# Send completion message based on return code
|
|
self.output_queue.put("\n" + "-" * 30 + "\n")
|
|
# Robocopy success codes (0-7) indicate completion, possibly with minor issues
|
|
# like skipped files, but the overall operation is considered successful.
|
|
# Codes 8 and higher indicate failures or significant issues.
|
|
if return_code < 8:
|
|
self.output_queue.put(f"Robocopy Mirror finished successfully (Return Code: {return_code}).\n")
|
|
else:
|
|
self.output_queue.put(f"Robocopy Mirror finished with errors (Return Code: {return_code}).\n")
|
|
|
|
except FileNotFoundError:
|
|
self.output_queue.put("\nERROR: 'robocopy' command not found. Is it in your system's PATH?\n")
|
|
except Exception as e:
|
|
self.output_queue.put(f"\nAn error occurred: {e}\n")
|
|
finally:
|
|
# Signal completion to the main thread
|
|
self.output_queue.put(None) # Sentinel value
|
|
|
|
def check_queue(self):
|
|
"""Checks the queue for messages from the worker thread."""
|
|
try:
|
|
while True:
|
|
line = self.output_queue.get_nowait()
|
|
if line is None:
|
|
self.on_robocopy_complete()
|
|
break
|
|
else:
|
|
self.add_output(line)
|
|
except queue.Empty:
|
|
pass
|
|
finally:
|
|
self.root.after(100, self.check_queue)
|
|
|
|
def add_output(self, text):
|
|
"""Appends text to the output area."""
|
|
self.output_text.insert(tk.END, text)
|
|
self.output_text.see(tk.END)
|
|
|
|
def on_robocopy_complete(self):
|
|
"""Called when the robocopy process finishes."""
|
|
self.run_button.config(state='normal')
|
|
# self.output_text.config(state='disabled') # Keep enabled to allow copy-paste
|
|
self.process = None
|
|
self.is_running = False
|
|
messagebox.showinfo("Finished", "Robocopy Mirror process has completed.")
|
|
|
|
def on_closing(self):
|
|
"""Handle window closing event."""
|
|
if self.is_running and self.process:
|
|
if messagebox.askyesno("Quit", "Robocopy might still be running. Terminate it and exit?"):
|
|
try:
|
|
self.process.terminate() # Ask nicely first
|
|
try:
|
|
self.process.wait(timeout=2) # Give it a moment
|
|
except subprocess.TimeoutExpired:
|
|
self.process.kill() # Force kill if necessary
|
|
except Exception: # Handle other wait errors
|
|
pass
|
|
except Exception: # Handle errors during terminate/kill (e.g., process already ended)
|
|
pass
|
|
self.root.destroy()
|
|
else:
|
|
return # Don't close if user says no
|
|
else:
|
|
self.root.destroy()
|
|
|
|
# --- Main Execution ---
|
|
if __name__ == "__main__":
|
|
root = tk.Tk()
|
|
sv_ttk.set_theme("dark") # Set the dark theme
|
|
# Use the renamed class
|
|
app = RobocopyMirrorApp(root)
|
|
root.protocol("WM_DELETE_WINDOW", app.on_closing)
|
|
root.mainloop()
|