使用AList和Rclone实现网盘挂载并批量下载

篇文章分享如何利用AList和Rclone实现网盘挂载、输出网盘内资源目录,并利用python实现本地自动批量下载。这一套方法对于日常网盘使用场景来说没有太大用处,但适用于“网盘中存放了500T图书资源,需要快速搜索并批量下载”这种需求。

安装AList并挂载百度网盘

这一步直接参考这篇知乎文章就可以。

安装并配置Rclone

(1)下载rclone安装包并解压

(2)配置rclone
在解压后的文件夹上方路径地址栏输入cmd并回车,打开该路径下的cmd;

输入 rclone config ,然后输入n开始新建远程连接,新连接命名为alist;

接着输入 webdav 或对应的编号 63 来指定webdav服务;

接着根据提示依次输入:
url: http://127.0.0.1:5244/dav/
vendor: other
user:输入AList的用户名
password:输入AList的登录密码(注意:输入的密码不显示,正常回车即可)
设置完成后退出。

导出资源目录

继续在cmd窗口输入:

.\rclone lsf --files-only -R "alist:/读 秀DX书库(2.0-5.0)" | Out-File -Encoding utf8 "D:\读秀500T.txt"

就可以在指定文件夹将所有图书目录导出为一个单独的txt文件。

用python实现批量下载

利用python编写了一个批量下载图书的可视化界面,实现如下功能:

(1)指定各个工具路径;
(2)打开、关闭AList服务;
(2)调用notepad++进行图书检索;
(3)将搜索到的路径粘贴到文本框;
(4)点击下载后进行批量下载。


代码如下,可继续根据自己需求进一步优化。

import tkinter as tk
from tkinter import filedialog, messagebox, scrolledtext
import subprocess
import os
import webbrowser
import threading
import sys

class AListDuxiuTools:
    def __init__(self, root):
        self.root = root
        self.root.title("百度网盘文件下载工具")
        self.root.geometry("750x800") 
        
        # 内部进程变量
        self.alist_process = None
        self.download_thread = None
        
        # ================= 默认路径配置=================
        self.default_alist_path = r"D:\软件\alist-windows-amd64\alist.exe"
        self.default_rclone_path = r"D:\软件\rclone-v1.74.3-windows-amd64\rclone.exe"
        self.default_save_dir = r"D:\图书下载"
        self.default_notepad_path = r"D:\Program Files\portables\npp.8.9.6.2.portable.x64\notepad++.exe"
        self.default_total_list_path = r"D:\读秀500T.txt"
        
        # 固定默认路径前缀
        self.default_prefix = r"alist:/读 秀DX书库(2.0-5.0)/"

        self.create_widgets()

    def create_widgets(self):
        # 1. 核心路径配置区
        config_frame = tk.LabelFrame(self.root, text=" 基础环境配置 ", padx=10, pady=10)
        config_frame.pack(fill="x", padx=15, pady=10)

        # Alist 路径
        tk.Label(config_frame, text="AList 运行路径:").grid(row=0, column=0, sticky="w")
        self.alist_path_var = tk.StringVar(value=self.default_alist_path)
        tk.Entry(config_frame, textvariable=self.alist_path_var, width=60).grid(row=0, column=1, padx=5, pady=5)
        tk.Button(config_frame, text="浏览...", command=self.browse_alist).grid(row=0, column=2, padx=5)

        # Rclone 路径
        tk.Label(config_frame, text="Rclone 运行路径:").grid(row=1, column=0, sticky="w")
        self.rclone_path_var = tk.StringVar(value=self.default_rclone_path)
        tk.Entry(config_frame, textvariable=self.rclone_path_var, width=60).grid(row=1, column=1, padx=5, pady=5)
        tk.Button(config_frame, text="浏览...", command=self.browse_rclone).grid(row=1, column=2, padx=5)

        # Notepad++ 路径配置
        tk.Label(config_frame, text="Notepad++ 路径:").grid(row=2, column=0, sticky="w")
        self.notepad_path_var = tk.StringVar(value=self.default_notepad_path)
        tk.Entry(config_frame, textvariable=self.notepad_path_var, width=60).grid(row=2, column=1, padx=5, pady=5)
        tk.Button(config_frame, text="浏览...", command=self.browse_notepad).grid(row=2, column=2, padx=5)

        # 2. AList 服务开关区
        alist_frame = tk.LabelFrame(self.root, text=" AList 服务控制开关 ", padx=10, pady=10)
        alist_frame.pack(fill="x", padx=15, pady=5)

        self.lbl_status = tk.Label(alist_frame, text="AList 状态: 已关闭", fg="red", font=("Helvetica", 10, "bold"))
        self.lbl_status.pack(side="left", padx=10)

        self.btn_start = tk.Button(alist_frame, text="▶ 开启 AList 服务", bg="#4CAF50", fg="white", padx=10, command=self.start_alist)
        self.btn_start.pack(side="left", padx=5)

        self.btn_stop = tk.Button(alist_frame, text="■ 关闭 AList 服务", bg="#f44336", fg="white", padx=10, state="disabled", command=self.stop_alist)
        self.btn_stop.pack(side="left", padx=5)

        self.btn_browser = tk.Button(alist_frame, text="🌐 自动跳转网页后台", bg="#2196F3", fg="white", padx=10, command=self.open_browser)
        self.btn_browser.pack(side="left", padx=5)

        # 3. 下载区
        download_frame = tk.LabelFrame(self.root, text=" 批量下载 ", padx=10, pady=10)
        download_frame.pack(fill="x", padx=15, pady=5)

        prefix_frame = tk.Frame(download_frame)
        prefix_frame.pack(fill="x", pady=2)
        tk.Label(prefix_frame, text="路径前缀 (自动填写/可修改):").pack(side="left")
        self.prefix_var = tk.StringVar(value=self.default_prefix)
        tk.Entry(prefix_frame, textvariable=self.prefix_var, width=65).pack(side="left", padx=5)

        # 远程路径输入(升级为多行文本框,满足意见2)
        tk.Label(download_frame, text="请输入网盘图书地址 (每行一个路径,支持批量复制粘贴):\n注:若单行路径不以 alist:/ 开头,系统将自动在行首补齐上方的前缀。", justify="left", fg="#555555").pack(anchor="w", pady=4)
        
        self.remote_paths_text = scrolledtext.ScrolledText(download_frame, height=6, width=90, font=("Consolas", 10))
        self.remote_paths_text.pack(fill="x", padx=5, pady=5)
        # 默认填入一个示例
        self.remote_paths_text.insert(tk.END, "5.0(新书)/5/5/12887097.pdf\n")

        # 本地保存路径
        path_select_frame = tk.Frame(download_frame)
        path_select_frame.pack(fill="x", pady=5)
        tk.Label(path_select_frame, text="本地保存文件夹:").pack(side="left")
        self.save_dir_var = tk.StringVar(value=self.default_save_dir)
        tk.Entry(path_select_frame, textvariable=self.save_dir_var, width=65).pack(side="left", padx=5)
        tk.Button(path_select_frame, text="选择文件夹...", command=self.browse_save_dir).pack(side="left", padx=5)

        # 下载执行按钮
        self.btn_download = tk.Button(download_frame, text="开始下载", bg="#FF9800", fg="white", font=("Helvetica", 11, "bold"), pady=5, command=self.start_download_thread)
        self.btn_download.pack(fill="x", padx=5, pady=5)

        # 4. 快捷辅助工具面板
        tools_frame = tk.LabelFrame(self.root, text=" 快捷辅助工具 ", padx=10, pady=10)
        tools_frame.pack(fill="x", padx=15, pady=5)

        tk.Label(tools_frame, text="500T图书目录:").pack(side="left")
        self.total_list_var = tk.StringVar(value=self.default_total_list_path)
        tk.Entry(tools_frame, textvariable=self.total_list_var, width=50).pack(side="left", padx=5)
        tk.Button(tools_frame, text="浏览...", command=self.browse_total_list).pack(side="left", padx=5)
        
        tk.Button(tools_frame, text="用 Notepad++ 打开目录", bg="#9C27B0", fg="white", font=("Helvetica", 9, "bold"), padx=10, command=self.open_in_notepad).pack(side="left", padx=10)

        # 5. 实时日志显示区
        log_frame = tk.LabelFrame(self.root, text=" 实时执行日志与下载进度 ")
        log_frame.pack(fill="both", expand=True, padx=15, pady=10)
        
        self.log_box = scrolledtext.ScrolledText(log_frame, bg="#1e1e1e", fg="#00ff00", font=("Consolas", 9))
        self.log_box.pack(fill="both", expand=True, padx=5, pady=5)
        self.log("请输入图书路径(每行一个),然后启动批量队列下载。")

    # ================= 业务逻辑控制 =================
    def log(self, message):
        self.log_box.insert(tk.END, message + "\n")
        self.log_box.see(tk.END)

    def browse_alist(self):
        filename = filedialog.askopenfilename(filetypes=[("Executable", "*.exe")])
        if filename: self.alist_path_var.set(filename)

    def browse_rclone(self):
        filename = filedialog.askopenfilename(filetypes=[("Executable", "*.exe")])
        if filename: self.rclone_path_var.set(filename)

    def browse_save_dir(self):
        directory = filedialog.askdirectory()
        if directory: self.save_dir_var.set(directory)

    def browse_notepad(self):
        filename = filedialog.askopenfilename(filetypes=[("Executable", "*.exe")])
        if filename: self.notepad_path_var.set(filename)

    def browse_total_list(self):
        filename = filedialog.askopenfilename(filetypes=[("Text Files", "*.txt"), ("All Files", "*.*")])
        if filename: self.total_list_var.set(filename)

    def open_in_notepad(self):
        notepad_exe = self.notepad_path_var.get().strip()
        total_list_path = self.total_list_var.get().strip()

        if not os.path.exists(notepad_exe):
            messagebox.showerror("错误", "在指定路径下找不到 notepad++.exe!")
            return
        if not os.path.exists(total_list_path):
            messagebox.showerror("错误", f"找不到图书目录文件:\n{total_list_path}")
            return

        try:
            subprocess.Popen([notepad_exe, total_list_path])
            self.log(f"[+] 成功联调:已唤醒 Notepad++ 并自动载入图书目录 {os.path.basename(total_list_path)}")
        except Exception as e:
            self.log(f"[-] 联动 Notepad++ 失败,原因: {str(e)}")

    def open_browser(self):
        webbrowser.open("http://localhost:5244")
        self.log("[*] 已尝试在浏览器中打开 http://localhost:5244")

    def start_alist(self):
        alist_exe = self.alist_path_var.get()
        if not os.path.exists(alist_exe):
            messagebox.showerror("错误", "找不到 alist.exe!")
            return
        
        alist_cwd = os.path.dirname(alist_exe)
        
        try:
            self.alist_process = subprocess.Popen(
                [alist_exe, "server"],
                cwd=alist_cwd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
            )
            
            self.lbl_status.config(text="AList 状态: 运行中", fg="green")
            self.btn_start.config(state="disabled")
            self.btn_stop.config(state="normal")
            self.log("[+] AList 服务后台开启成功!")
            
            threading.Thread(target=self.read_process_output, args=(self.alist_process, "[AList Service] "), daemon=True).start()
        except Exception as e:
            self.log(f"[-] AList 启动失败: {str(e)}")

    def stop_alist(self):
        if self.alist_process:
            subprocess.call(['taskkill', '/F', '/T', '/PID', str(self.alist_process.pid)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            self.alist_process = None
            self.lbl_status.config(text="AList 状态: 已关闭", fg="red")
            self.btn_start.config(state="normal")
            self.btn_stop.config(state="disabled")
            self.log("[■] AList 服务已主动断开终止。")

    def read_process_output(self, process, prefix):
        while process and process.poll() is None:
            line = process.stdout.readline()
            if line:
                self.root.after(0, self.log, prefix + line.strip())

    # ================= 队列下载核心控制逻辑 =================
    def start_download_thread(self):
        # 提取多行文本框中的所有路径,并过滤空行
        raw_text = self.remote_paths_text.get("1.0", tk.END)
        remote_paths = [line.strip() for line in raw_text.split('\n') if line.strip()]
        
        save_dir = self.save_dir_var.get().strip()
        rclone_exe = self.rclone_path_var.get().strip()
        prefix = self.prefix_var.get().strip() # 获取当前用户设定的前缀

        if not remote_paths or not save_dir:
            messagebox.showwarning("提示", "请输入网盘图书地址并选择本地保存文件夹!")
            return
        if not os.path.exists(rclone_exe):
            messagebox.showerror("错误", "找不到 rclone.exe,请检查配置!")
            return

        self.btn_download.config(state="disabled", text="⏳ 队列下载中...")
        # 异步队列多线程抛出
        self.download_thread = threading.Thread(target=self.execute_queue_download, args=(rclone_exe, remote_paths, prefix, save_dir), daemon=True)
        self.download_thread.start()

    def execute_queue_download(self, rclone_exe, remote_paths, prefix, save_dir):
        total_files = len(remote_paths)
        self.root.after(0, self.log, f"\n启动下载任务,共计 {total_files} 个文件...")
        
        # 遍历路径队列
        for idx, path in enumerate(remote_paths, 1):
            # 自动前缀补齐逻辑
            full_remote_path = path
            if not path.startswith("alist:/"):
                base_prefix = prefix if prefix.endswith("/") else prefix + "/"
                sub_path = path[1:] if path.startswith("/") else path
                full_remote_path = base_prefix + sub_path

            self.root.after(0, self.log, f"\n[*] 正在下载 [{idx}/{total_files}]: {full_remote_path}")
            
            filename = full_remote_path.split('/')[-1]
            if not filename:
                self.root.after(0, self.log, f"[-] [{idx}/{total_files}] 路径解析失败,跳过该行。")
                continue
                
            local_final_path = os.path.join(save_dir, filename)

            startupinfo = None
            if sys.platform == "win32":
                startupinfo = subprocess.STARTUPINFO()
                startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
                startupinfo.wShowWindow = subprocess.SW_HIDE

            cmd = [rclone_exe, "copyto", full_remote_path, local_final_path, "-v", "--progress"]
            
            try:
                process = subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    text=True,
                    encoding='utf-8',
                    errors='ignore',
                    startupinfo=startupinfo
                )

                while True:
                    line = process.stdout.readline()
                    if not line and process.poll() is not None:
                        break
                    if line:
                        clean_line = line.strip()
                        if "Transferred" in clean_line or "Elapsed" in clean_line or "ETA" in clean_line or "ERROR" in clean_line:
                            self.root.after(0, self.log, f"[Queue {idx}] {clean_line}")

                if process.returncode == 0:
                    self.root.after(0, self.log, f"▲【成功】[{idx}/{total_files}] 书籍已送达:{local_final_path}")
                else:
                    self.root.after(0, self.log, f"▼【失败】[{idx}/{total_files}] Rclone 错误代码: {process.returncode}")
            except Exception as e:
                self.root.after(0, self.log, f"▼【异常】[{idx}/{total_files}] 运行报错: {str(e)}")
        
        self.root.after(0, self.log, "\n所有队列下载任务已全部执行完毕!")
        self.root.after(0, lambda: messagebox.showinfo("完成", f"批量下载结束!\n共成功处理 {total_files} 个文件任务。"))
        self.root.after(0, self.reset_download_btn)

    def reset_download_btn(self):
        self.btn_download.config(state="normal", text="开始下载")

# ================= 启动主循环 =================
if __name__ == "__main__":
    root = tk.Tk()
    app = AListDuxiuTools(root)
    
    def on_closing():
        if app.alist_process:
            app.stop_alist()
        root.destroy()
        
    root.protocol("WM_DELETE_WINDOW", on_closing)
    root.mainloop()