[PYTHON] 폴더 동기화(단방향,양방향) 코드

python


import sys

import os

import shutil

import threading

from datetime import datetime

import time

from concurrent.futures import ThreadPoolExecutor

from queue import Queue

from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,

                             QPushButton, QLabel, QFileDialog, QLineEdit, 

                             QTextEdit, QProgressBar, QRadioButton)

from PyQt6.QtCore import Qt, QTimer

from PyQt6.QtGui import QFont, QIcon


class SyncWindow(QMainWindow):
    """Main window of a one-way / two-way folder synchronisation tool.

    The GUI thread owns every widget. A background thread started by
    :meth:`start_sync` runs :meth:`sync_folders`, which scans both folders,
    plans the work and copies/deletes files. The worker never touches a
    widget: it reports through ``result_queue`` (``(log message, MB)``
    tuples) and signals the end of a run through an internal
    ``threading.Event``. A ``QTimer`` on the GUI thread polls both in
    :meth:`update_ui`.

    Deletions are propagated only in two-way mode and only relative to the
    snapshot recorded at the end of the previous run for the same folder
    pair, so the first run after choosing folders never deletes anything.
    """

    _BYTES_PER_MB = 1024 * 1024

    # Per-file actions produced by _plan_action().
    _COPY_TO_DEST = "copy_to_dest"
    _COPY_TO_SRC = "copy_to_src"
    _DELETE_DEST = "delete_dest"
    _DELETE_SRC = "delete_src"

    def __init__(self):
        """Build the widgets and initialise the synchronisation state."""
        super().__init__()
        self.setWindowTitle("폴더 동기화")
        self.setGeometry(100, 100, 800, 600)

        # The log view is created first so the icon lookup can report into it.
        self.log_text = QTextEdit()
        self.log_text.setReadOnly(True)
        self._load_window_icon()

        base_font = QFont()
        # pointSize() is -1 when the font size is specified in pixels;
        # scaling that would request an invalid point size.
        if base_font.pointSize() > 0:
            base_font.setPointSize(int(base_font.pointSize() * 1.5))
        self.setFont(base_font)

        main_widget = QWidget()
        self.setCentralWidget(main_widget)
        layout = QVBoxLayout(main_widget)

        (folder1_widget, self.folder1_label,
         self.folder1_path, self.folder1_btn) = self._build_folder_row(
            "Src 폴더:", self.select_folder1)
        (folder2_widget, self.folder2_label,
         self.folder2_path, self.folder2_btn) = self._build_folder_row(
            "Dest 폴더:", self.select_folder2)

        mode_widget = QWidget()
        mode_layout = QHBoxLayout(mode_widget)
        self.one_way_radio = QRadioButton("단방향 (Src -> Dest)")
        self.two_way_radio = QRadioButton("양방향 (Src <-> Dest)")
        self.one_way_radio.setChecked(True)
        # Mirror the radio state into a plain bool so worker threads never
        # have to query a widget.
        self.is_two_way = False
        self.two_way_radio.toggled.connect(self._on_mode_toggled)
        mode_layout.addWidget(self.one_way_radio)
        mode_layout.addWidget(self.two_way_radio)

        self.sync_btn = QPushButton("동기화 시작")
        self.sync_btn.clicked.connect(self.start_sync)

        self.progress_bar = QProgressBar()
        self.progress_bar.setValue(0)
        self.progress_label = QLabel("진행률: 0 MB / 0 MB")
        self.time_label = QLabel("경과 시간: 00:00 | 남은 시간: --초")

        layout.addWidget(folder1_widget)
        layout.addWidget(folder2_widget)
        layout.addWidget(mode_widget)
        layout.addSpacing(20)
        layout.addWidget(self.sync_btn)
        layout.addWidget(self.progress_bar)
        layout.addWidget(self.progress_label)
        layout.addWidget(self.time_label)
        layout.addWidget(self.log_text)

        self.folder1 = ""
        self.folder2 = ""
        self.total_size = 0
        # Snapshots taken at the end of the last run (empty before any run).
        self.previous_files1 = {}
        self.previous_files2 = {}
        self.current_files1 = {}
        self.current_files2 = {}
        self.synced_size = 0
        self.processed_size = 0
        self.total_work_size = 0
        self.total_files = 0
        self.result_queue = Queue()
        self.lock = threading.Lock()
        self.LARGE_FILE_THRESHOLD = 10 * 1024 * 1024
        self.CHUNK_SIZE = 1000
        self.start_time = None

        # Set by the worker when a run ends, successfully or not.
        self._sync_done = threading.Event()
        # Message of the error that aborted the run, or None.
        self._sync_error = None

        # One timer reused for every run instead of a new one per run.
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.update_ui)

    # ------------------------------------------------------------------
    # UI helpers
    # ------------------------------------------------------------------
    def _load_window_icon(self):
        """Set the window icon from ``icon.icns`` and log where it was sought."""
        if getattr(sys, 'frozen', False):
            # PyInstaller exposes its bundle directory as sys._MEIPASS; fall
            # back to the executable's directory when that attribute is absent.
            base_path = getattr(sys, '_MEIPASS',
                                os.path.dirname(sys.executable))
        else:
            base_path = os.path.dirname(os.path.abspath(__file__))
        icon_path = os.path.join(base_path, "icon.icns")
        if os.path.exists(icon_path):
            self.setWindowIcon(QIcon(icon_path))
            self.log_text.append(f"Icon loaded from: {icon_path}")
        else:
            self.log_text.append(f"Icon not found at: {icon_path}")

    def _build_folder_row(self, label_text, on_click):
        """Create one "label / read-only path / select button" row.

        Returns:
            ``(row_widget, label, path_edit, button)``.
        """
        row = QWidget()
        row_layout = QHBoxLayout(row)
        label = QLabel(label_text)
        path_edit = QLineEdit()
        path_edit.setReadOnly(True)
        button = QPushButton("선택")
        button.setFixedWidth(80)
        button.clicked.connect(on_click)
        row_layout.addWidget(label)
        row_layout.addWidget(path_edit)
        row_layout.addWidget(button)
        return row, label, path_edit, button

    def _on_mode_toggled(self, checked):
        """Keep ``is_two_way`` in step with the two-way radio button."""
        self.is_two_way = bool(checked)

    def _set_controls_enabled(self, enabled):
        """Enable or disable every control that must not change mid-run."""
        for control in (self.sync_btn, self.folder1_btn, self.folder2_btn,
                        self.one_way_radio, self.two_way_radio):
            control.setEnabled(enabled)

    def _forget_previous_snapshot(self):
        """Drop the last-run snapshots; they describe another folder pair."""
        self.previous_files1 = {}
        self.previous_files2 = {}

    def select_folder1(self):
        """Let the user choose the Src folder."""
        folder = QFileDialog.getExistingDirectory(self, "Src 폴더 선택")
        if not folder:
            return
        if folder != self.folder1:
            self._forget_previous_snapshot()
        self.folder1 = folder
        self.folder1_path.setText(folder)
        self.folder1_label.setText("Src 폴더:")

    def select_folder2(self):
        """Let the user choose the Dest folder."""
        folder = QFileDialog.getExistingDirectory(self, "Dest 폴더 선택")
        if not folder:
            return
        if folder != self.folder2:
            self._forget_previous_snapshot()
        self.folder2 = folder
        self.folder2_path.setText(folder)
        self.folder2_label.setText("Dest 폴더:")

    # ------------------------------------------------------------------
    # Scanning
    # ------------------------------------------------------------------
    def calculate_folder_size(self, folder):
        """Return ``(total size in MB, file count)`` for everything under *folder*.

        Files whose size cannot be read (for example a broken symlink or a
        file removed during the walk) are skipped and not counted.
        """
        total_bytes = 0
        file_count = 0
        for dirpath, _, filenames in os.walk(folder):
            for name in filenames:
                try:
                    total_bytes += os.path.getsize(os.path.join(dirpath, name))
                except OSError:
                    continue
                file_count += 1
        return total_bytes / self._BYTES_PER_MB, file_count

    def get_file_info_generator(self, folder):
        """Yield ``(relative path, {"size": bytes, "mtime": datetime})`` per file.

        A "Scanning files in ..." message is queued when iteration starts.
        Files that cannot be stat'ed are reported to the log queue and skipped
        instead of aborting the whole scan.
        """
        self.result_queue.put((f"Scanning files in {folder}", 0))
        for dirpath, _, filenames in os.walk(folder):
            rel_dir = os.path.relpath(dirpath, folder)
            for name in filenames:
                rel_path = name if rel_dir == "." else os.path.join(rel_dir, name)
                try:
                    stat = os.stat(os.path.join(dirpath, name))
                except OSError as exc:
                    self.result_queue.put(
                        (f"Error scanning {rel_path}: {exc}", 0))
                    continue
                yield rel_path, {
                    "size": stat.st_size,
                    "mtime": datetime.fromtimestamp(stat.st_mtime),
                }

    # ------------------------------------------------------------------
    # Planning
    # ------------------------------------------------------------------
    @staticmethod
    def _deleted_since_last_sync(rel_path, survivor, prev_missing_side,
                                 prev_survivor_side):
        """Return True if the missing copy of *rel_path* was deleted by the user.

        That is assumed only when both sides had the file at the end of the
        previous run and the surviving copy has not been modified since;
        a modified survivor is treated as new content and copied instead.
        """
        if rel_path not in prev_missing_side:
            return False
        recorded = prev_survivor_side.get(rel_path)
        if recorded is None:
            return False
        return survivor["mtime"] <= recorded["mtime"]

    def _plan_action(self, rel_path, files1, files2, prev_files1, prev_files2,
                     is_two_way):
        """Decide what to do with one relative path.

        Returns:
            ``(action, size_in_bytes)`` where ``action`` is one of the
            ``_COPY_*`` / ``_DELETE_*`` constants, or ``(None, 0)`` when the
            path needs no work. Exactly one action is chosen per path, so a
            file is never both copied and deleted in the same run.
        """
        file1 = files1.get(rel_path)
        file2 = files2.get(rel_path)

        if file1 is not None and file2 is not None:
            if file1["mtime"] > file2["mtime"]:
                return self._COPY_TO_DEST, file1["size"]
            if is_two_way and file2["mtime"] > file1["mtime"]:
                return self._COPY_TO_SRC, file2["size"]
            return None, 0

        if file1 is not None:  # present only in Src
            if is_two_way and self._deleted_since_last_sync(
                    rel_path, file1, prev_files2, prev_files1):
                return self._DELETE_SRC, file1["size"]
            return self._COPY_TO_DEST, file1["size"]

        if file2 is not None and is_two_way:  # present only in Dest
            if self._deleted_since_last_sync(
                    rel_path, file2, prev_files1, prev_files2):
                return self._DELETE_DEST, file2["size"]
            return self._COPY_TO_SRC, file2["size"]

        return None, 0

    def calculate_total_work_size(self, files1, files2, prev_files1, prev_files2):
        """Return the total size in MB of all files the run will copy or delete."""
        is_two_way = self.is_two_way
        total_bytes = sum(
            self._plan_action(rel_path, files1, files2,
                              prev_files1, prev_files2, is_two_way)[1]
            for rel_path in set(files1) | set(files2)
        )
        return total_bytes / self._BYTES_PER_MB

    # ------------------------------------------------------------------
    # Execution (worker thread)
    # ------------------------------------------------------------------
    @staticmethod
    def _copy_file(src, dst):
        """Copy *src* to *dst* with metadata, creating parent folders."""
        os.makedirs(os.path.dirname(dst), exist_ok=True)
        shutil.copy2(src, dst)

    def sync_file(self, rel_path, files1, files2):
        """Apply the planned action for *rel_path* and report the outcome.

        The file's size is added to ``processed_size`` whether the action
        succeeds or fails, so the progress display still reaches the end
        when individual files cannot be processed.
        """
        action, size_bytes = self._plan_action(
            rel_path, files1, files2,
            self.previous_files1, self.previous_files2, self.is_two_way)
        if action is None:
            return

        size_mb = size_bytes / self._BYTES_PER_MB
        path1 = os.path.join(self.folder1, rel_path)
        path2 = os.path.join(self.folder2, rel_path)
        exists_on_both = rel_path in files1 and rel_path in files2

        try:
            if action == self._COPY_TO_DEST:
                self._copy_file(path1, path2)
                suffix = "(최신)" if exists_on_both else "복사"
                result = (f"{rel_path}: Src -> Dest {suffix}", size_mb)
            elif action == self._COPY_TO_SRC:
                self._copy_file(path2, path1)
                suffix = "(최신)" if exists_on_both else "복사"
                result = (f"{rel_path}: Dest -> Src {suffix}", size_mb)
            elif action == self._DELETE_DEST:
                os.remove(path2)
                result = (f"{rel_path}: Src에서 삭제 -> Dest 삭제", 0)
            else:  # self._DELETE_SRC
                os.remove(path1)
                result = (f"{rel_path}: Dest에서 삭제 -> Src 삭제", 0)
        except Exception as e:
            # Per-file boundary: one bad file must not abort the whole run.
            # The failure is surfaced in the log instead.
            result = (f"Error processing {rel_path}: {str(e)}", 0)

        self.result_queue.put(result)
        with self.lock:
            self.processed_size += size_mb

    def process_chunk(self, chunk, files1, files2):
        """Synchronise one batch of paths.

        Paths whose file exceeds ``LARGE_FILE_THRESHOLD`` on either side are
        handled concurrently in a thread pool; the rest run sequentially.
        """
        threshold = self.LARGE_FILE_THRESHOLD

        def is_large(rel_path):
            return any(files.get(rel_path, {}).get("size", 0) > threshold
                       for files in (files1, files2))

        def sync_one(rel_path):
            self.sync_file(rel_path, files1, files2)

        # Single pass partition; avoids a list-membership test per path.
        large_files = []
        small_files = []
        for rel_path in chunk:
            (large_files if is_large(rel_path) else small_files).append(rel_path)

        if large_files:
            with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
                executor.map(sync_one, large_files)

        for rel_path in small_files:
            sync_one(rel_path)

    def _run_sync(self):
        """Scan both folders, plan, execute and record the new snapshot."""
        mode = "양방향" if self.is_two_way else "단방향"
        self.result_queue.put((f"Starting folder scan ({mode})...", 0))
        self.current_files1 = dict(self.get_file_info_generator(self.folder1))
        self.current_files2 = dict(self.get_file_info_generator(self.folder2))

        # Folder totals come from the scan just made rather than extra walks.
        size1 = sum(info["size"] for info in self.current_files1.values())
        size2 = sum(info["size"] for info in self.current_files2.values())
        size1 /= self._BYTES_PER_MB
        size2 /= self._BYTES_PER_MB
        self.total_size = max(size1, size2)
        self.result_queue.put((
            f"Src: {len(self.current_files1)} files, {size1:.2f} MB; "
            f"Dest: {len(self.current_files2)} files, {size2:.2f} MB", 0))

        # Sorted so the log order is reproducible between runs.
        all_files = sorted(set(self.current_files1) | set(self.current_files2))
        self.total_files = len(all_files)
        self.total_work_size = self.calculate_total_work_size(
            self.current_files1, self.current_files2,
            self.previous_files1, self.previous_files2
        )
        self.result_queue.put((
            f"Total files: {self.total_files}, "
            f"Work size: {self.total_work_size:.2f} MB", 0))

        for start in range(0, len(all_files), self.CHUNK_SIZE):
            chunk = all_files[start:start + self.CHUNK_SIZE]
            self.result_queue.put((
                f"Processing chunk {start // self.CHUNK_SIZE + 1} "
                f"({len(chunk)} files)...", 0))
            self.process_chunk(chunk, self.current_files1, self.current_files2)

        # Record what both folders look like now; the next two-way run
        # compares against this to recognise deletions.
        snapshot1 = dict(self.get_file_info_generator(self.folder1))
        snapshot2 = dict(self.get_file_info_generator(self.folder2))
        self.previous_files1 = snapshot1
        self.previous_files2 = snapshot2

    def sync_folders(self):
        """Worker-thread entry point: run one synchronisation.

        Always signals completion, even when the run aborts, so the GUI can
        re-enable its controls.
        """
        try:
            self._run_sync()
            self.result_queue.put(("Sync completed", 0))
        except Exception as exc:
            # Thread boundary: report instead of dying silently.
            self._sync_error = str(exc)
            self.result_queue.put((f"Sync aborted: {exc}", 0))
        finally:
            self._sync_done.set()

    # ------------------------------------------------------------------
    # GUI thread: progress polling and start
    # ------------------------------------------------------------------
    def update_ui(self):
        """Timer slot: flush queued log lines and refresh progress widgets."""
        # Read the flag BEFORE draining: the worker queues its last message
        # before setting it, so a set flag means the drain below sees all.
        finished = self._sync_done.is_set()

        while not self.result_queue.empty():
            log_msg, size_mb = self.result_queue.get()
            self.log_text.append(log_msg)
            self.synced_size += size_mb

        total = self.total_work_size
        fraction = min(self.processed_size / total, 1.0) if total > 0 else 0.0

        if self.start_time is not None:
            elapsed_time = time.time() - self.start_time
            elapsed_minutes, elapsed_seconds = divmod(int(elapsed_time), 60)
            elapsed_str = f"{elapsed_minutes:02d}:{elapsed_seconds:02d}"
            if fraction > 0:
                remaining_time = max(0, int(elapsed_time / fraction - elapsed_time))
                remaining_str = f"{remaining_time}초"
            else:
                remaining_str = "--초"
            self.time_label.setText(
                f"경과 시간: {elapsed_str} | 남은 시간: {remaining_str}")

        if total > 0:
            self.progress_bar.setValue(int(fraction * 100))
            self.progress_label.setText(
                f"진행률: {self.processed_size:.2f} MB / {total:.2f} MB")

        if finished:
            self.timer.stop()
            if self._sync_error is None:
                self.progress_bar.setValue(100)
                self.log_text.append("동기화 완료!")
            self._set_controls_enabled(True)

    def start_sync(self):
        """Validate the selection and launch a run on a background thread."""
        if not self.folder1 or not self.folder2:
            self.log_text.append("두 폴더를 모두 선택하세요!")
            return

        same_folder = (
            os.path.normcase(os.path.realpath(self.folder1))
            == os.path.normcase(os.path.realpath(self.folder2))
        )
        if same_folder:
            self.log_text.append("Src와 Dest 폴더가 같습니다!")
            return

        self.log_text.clear()
        self.log_text.append("동기화 시작...")
        # Folders and mode must stay fixed while the worker is running.
        self._set_controls_enabled(False)
        self.synced_size = 0
        self.processed_size = 0
        self.total_work_size = 0  # no stale total from an earlier run
        self.progress_bar.setValue(0)
        self.result_queue = Queue()
        self.start_time = time.time()
        self.is_two_way = self.two_way_radio.isChecked()
        self._sync_error = None
        self._sync_done.clear()

        # All scanning happens on the worker so the window stays responsive.
        threading.Thread(target=self.sync_folders).start()
        self.timer.start(500)

def main():
    """Create the Qt application, show the sync window and run the event loop.

    The process exits with the status returned by the event loop.
    """
    application = QApplication(sys.argv)
    sync_window = SyncWindow()
    sync_window.show()
    exit_code = application.exec()
    sys.exit(exit_code)


# Start the GUI only when this file is executed as a script, not on import.
if __name__ == "__main__":

    main()



#python #rsync #폴더동기화 #단방향 #양방향 #앱 #생산성

댓글 쓰기

다음 이전