diff --git a/Robocopy镜像文件小程序/robocopy_mirror_gui.py b/Robocopy镜像文件小程序/robocopy_mirror_gui.py new file mode 100644 index 0000000..8f82b51 --- /dev/null +++ b/Robocopy镜像文件小程序/robocopy_mirror_gui.py @@ -0,0 +1,220 @@ +import tkinter as tk +from tkinter import ttk, filedialog, messagebox, scrolledtext +import subprocess +import threading +import platform +import queue # For thread-safe communication + +# --- Configuration --- +# Robocopy Flags for Mirroring (/MIR) +# /MIR :: MIRror a directory tree (equivalent to /E plus /PURGE). +# /Z :: copy files in restartable mode. +# /R:3 :: number of Retries on failed copies (default 1 million). Set to 3. +# /W:5 :: Wait time between retries (default 30 seconds). Set to 5. +# /NP :: No Progress - don't display % copied. Makes output cleaner. +# /TEE :: output to console window, as well as the log file. Essential for GUI. +# /V :: Verbose output, showing skipped and *deleted* files. Recommended for /MIR. +# Add other flags if needed, but /MIR is the core for this function. +MIRROR_ROBOCOPY_FLAGS = ["/MIR", "/Z", "/R:3", "/W:5", "/NP", "/TEE", "/V"] + +# --- Main Application Class --- +class RobocopyMirrorApp: # Renamed class for clarity + def __init__(self, root): + self.root = root + # Updated Title + self.root.title("Robocopy Mirror GUI") + self.root.geometry("700x500") # Adjust size as needed + + # Check OS + if platform.system() != "Windows": + messagebox.showerror("OS Error", "Robocopy is a Windows utility. This app only works on Windows.") + self.root.destroy() + return + + self.source_var = tk.StringVar() + self.target_var = tk.StringVar() + self.output_queue = queue.Queue() # Queue for thread-safe GUI updates + self.process = None # To hold the running subprocess + self.is_running = False + + # --- GUI Elements --- + # Frame for inputs + input_frame = ttk.Frame(root, padding="10") + input_frame.pack(fill=tk.X) + + # Source Row + ttk.Label(input_frame, text="Source:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W) + self.source_entry = ttk.Entry(input_frame, textvariable=self.source_var, width=50) + self.source_entry.grid(row=0, column=1, padx=5, pady=5, sticky=tk.EW) + self.source_button = ttk.Button(input_frame, text="Browse...", command=self.browse_source) + self.source_button.grid(row=0, column=2, padx=5, pady=5) + + # Target Row + ttk.Label(input_frame, text="Target:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W) + self.target_entry = ttk.Entry(input_frame, textvariable=self.target_var, width=50) + self.target_entry.grid(row=1, column=1, padx=5, pady=5, sticky=tk.EW) + self.target_button = ttk.Button(input_frame, text="Browse...", command=self.browse_target) + self.target_button.grid(row=1, column=2, padx=5, pady=5) + + # Configure input frame column weights for resizing + input_frame.columnconfigure(1, weight=1) + + # Run Button (Text could be changed to "Run Mirror" but "Run Robocopy" is fine) + self.run_button = ttk.Button(root, text="Run Robocopy Mirror", command=self.start_robocopy_thread) + self.run_button.pack(pady=10) + + # Output Area Frame + output_frame = ttk.Frame(root, padding="10") + output_frame.pack(fill=tk.BOTH, expand=True) + + ttk.Label(output_frame, text="Robocopy Output:").pack(anchor=tk.W) + self.output_text = scrolledtext.ScrolledText(output_frame, wrap=tk.WORD, height=15, state='disabled') + self.output_text.pack(fill=tk.BOTH, expand=True, pady=5) + + # --- Start queue check --- + self.check_queue() # Start checking the queue for updates + + def browse_source(self): + directory = filedialog.askdirectory(title="Select Source Directory") + if directory: + self.source_var.set(directory) + + def browse_target(self): + directory = filedialog.askdirectory(title="Select Target Directory (will be mirrored)") + if directory: + self.target_var.set(directory) + + def start_robocopy_thread(self): + """Starts the robocopy mirror process in a separate thread.""" + if self.is_running: + messagebox.showwarning("Busy", "Robocopy is already running.") + return + + source = self.source_var.get().strip() + target = self.target_var.get().strip() + + if not source or not target: + messagebox.showerror("Error", "Please select both Source and Target directories.") + return + + # Confirmation for Mirroring (IMPORTANT!) + confirm = messagebox.askyesno( + "Confirm Mirror", + f"WARNING: Mirroring will delete files/folders in the TARGET directory:\n\n{target}\n\nif they do not exist in the SOURCE directory:\n\n{source}\n\nAre you sure you want to proceed?" + ) + if not confirm: + return + + # Disable run button, enable text area + self.run_button.config(state='disabled') + self.output_text.config(state='normal') + self.output_text.delete('1.0', tk.END) + self.add_output("Starting Robocopy Mirror operation...\n") + self.add_output(f"Source: {source}\n") + self.add_output(f"Target: {target}\n") + # Display the specific flags being used + self.add_output(f"Flags: {' '.join(MIRROR_ROBOCOPY_FLAGS)}\n") + self.add_output("-" * 30 + "\n") + + self.is_running = True + + # Create and start the thread + self.thread = threading.Thread(target=self.run_robocopy_process, args=(source, target), daemon=True) + self.thread.start() + + def run_robocopy_process(self, source, target): + """The actual function running in the thread.""" + try: + # Use the MIRROR_ROBOCOPY_FLAGS + command = ["robocopy", source, target] + MIRROR_ROBOCOPY_FLAGS + + self.process = subprocess.Popen( + command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, # Redirect stderr to stdout + text=True, + encoding='utf-8', + errors='replace', + bufsize=1, + creationflags=subprocess.CREATE_NO_WINDOW # Prevent cmd window popup + ) + + # Read output line by line + for line in iter(self.process.stdout.readline, ''): + if not line: + break + self.output_queue.put(line) + + self.process.stdout.close() + return_code = self.process.wait() + + # Send completion message based on return code + self.output_queue.put("\n" + "-" * 30 + "\n") + # Robocopy success codes (0-7) indicate completion, possibly with minor issues + # like skipped files, but the overall operation is considered successful. + # Codes 8 and higher indicate failures or significant issues. + if return_code < 8: + self.output_queue.put(f"Robocopy Mirror finished successfully (Return Code: {return_code}).\n") + else: + self.output_queue.put(f"Robocopy Mirror finished with errors (Return Code: {return_code}).\n") + + except FileNotFoundError: + self.output_queue.put("\nERROR: 'robocopy' command not found. Is it in your system's PATH?\n") + except Exception as e: + self.output_queue.put(f"\nAn error occurred: {e}\n") + finally: + # Signal completion to the main thread + self.output_queue.put(None) # Sentinel value + + def check_queue(self): + """Checks the queue for messages from the worker thread.""" + try: + while True: + line = self.output_queue.get_nowait() + if line is None: + self.on_robocopy_complete() + break + else: + self.add_output(line) + except queue.Empty: + pass + finally: + self.root.after(100, self.check_queue) + + def add_output(self, text): + """Appends text to the output area.""" + self.output_text.insert(tk.END, text) + self.output_text.see(tk.END) + + def on_robocopy_complete(self): + """Called when the robocopy process finishes.""" + self.run_button.config(state='normal') + self.output_text.config(state='disabled') + self.process = None + self.is_running = False + messagebox.showinfo("Finished", "Robocopy Mirror process has completed.") + + def on_closing(self): + """Handle window closing event.""" + if self.is_running and self.process: + if messagebox.askyesno("Quit", "Robocopy might still be running. Terminate it and exit?"): + try: + self.process.terminate() + self.process.wait(timeout=2) + except subprocess.TimeoutExpired: + self.process.kill() + except Exception: + pass + self.root.destroy() + else: + return + else: + self.root.destroy() + +# --- Main Execution --- +if __name__ == "__main__": + root = tk.Tk() + # Use the renamed class + app = RobocopyMirrorApp(root) + root.protocol("WM_DELETE_WINDOW", app.on_closing) + root.mainloop() \ No newline at end of file