use chrono::{DateTime, Local, TimeZone, Utc}; use clap::{Parser, Subcommand}; use colored::Colorize; use indicatif::{ProgressBar, ProgressStyle}; use robotstxt::DefaultMatcher; use search_hub::config::{config_file_path, Config}; use search_hub::importer::chrome::ChromeImporter; use search_hub::importer::firefox::FirefoxImporter; use search_hub::importer::zen::ZenImporter; use search_hub::importer::Importer; use search_hub::models::Bookmark; use search_hub::storage; use search_hub::tagging::{default_tags, TagDef, TaggingEngine}; use search_hub::web; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, VecDeque}; use std::fs; use std::path::PathBuf; use std::sync::{Arc, Mutex, RwLock}; use tracing::{error, info}; const USER_AGENT: &str = concat!("search_hub/", env!("CARGO_PKG_VERSION")); #[derive(Serialize, Deserialize)] struct CacheEntry { body: String, fetched_at: DateTime<Utc>, } struct Fetcher { client: reqwest::Client, rt: tokio::runtime::Runtime, robots_cache: Mutex<HashMap<String, CacheEntry>>, cache_path: PathBuf, } impl Fetcher { fn new(cache_path: PathBuf) -> anyhow::Result<Self> { let client = reqwest::Client::builder() .user_agent(USER_AGENT) .build()?; let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; let cache = Self::load_cache(&cache_path); Ok(Self { client, rt, robots_cache: Mutex::new(cache), cache_path }) } fn load_cache(path: &PathBuf) -> HashMap<String, CacheEntry> { if path.exists() { match fs::read_to_string(path) { Ok(data) => { serde_json::from_str(&data).unwrap_or_default() } Err(_) => HashMap::new(), } } else { HashMap::new() } } fn save_cache(&self) { if let Ok(cache) = self.robots_cache.lock() { if let Ok(data) = serde_json::to_string(&*cache) { let _ = fs::write(&self.cache_path, data); } } } fn fetch(&mut self, url: &str) -> Result<String, String> { self.rt.block_on(self.fetch_async(url)) } async fn fetch_async(&self, url: &str) -> Result<String, String> { let parsed = url::Url::parse(url).map_err(|e| format!("invalid url: {e}"))?; let domain = parsed.host_str().unwrap_or("").to_string(); let port = parsed.port(); let one_month = chrono::Duration::days(30); // Check cache (no await while holding lock) let need_fetch = { let cache = self.robots_cache.lock() .map_err(|_| "internal error: lock poisoned".to_string())?; let expired = cache.get(&domain) .is_none_or(|e| Utc::now() - e.fetched_at > one_month); !cache.contains_key(&domain) || expired }; // Fetch robots.txt if needed (no lock held during await) if need_fetch { let robots_url = match port { Some(p) => format!("{}://{}:{}/robots.txt", parsed.scheme(), domain, p), None => format!("{}://{}/robots.txt", parsed.scheme(), domain), }; let body = match self.client.get(&robots_url).send().await { Ok(resp) => resp.text().await.unwrap_or_default(), Err(_) => String::new(), }; // Brief lock to update cache (no await) let mut cache = self.robots_cache.lock() .map_err(|_| "internal error: lock poisoned".to_string())?; cache.insert(domain.clone(), CacheEntry { body, fetched_at: Utc::now() }); } // Check if allowed (brief lock, no await) let body = { let cache = self.robots_cache.lock() .map_err(|_| "internal error: lock poisoned".to_string())?; cache.get(&domain).map(|e| e.body.clone()).unwrap_or_default() }; if !DefaultMatcher::default() .one_agent_allowed_by_robots(&body, USER_AGENT, url) { return Err("blocked by robots.txt".to_string()); } let resp = self.client.get(url).send().await .map_err(|e| format!("failed to fetch {url}: {e}"))?; resp.text().await.map_err(|e| format!("failed to read body for {url}: {e}")) } } impl Drop for Fetcher { fn drop(&mut self) { self.save_cache(); } } #[derive(Parser)] #[command(author, version, about)] struct Args { #[arg(short, long, global = true)] /// Path to config file (default: ~/.config/search_hub/config.toml) config: Option<String>, #[command(subcommand)] command: Command, } #[derive(Subcommand)] enum Command { /// Start the web server Serve { #[arg(short, long, default_value_t = 8080)] port: u16, #[arg(long)] /// Bookmark database path (default: platform data directory) db_path: Option<String>, }, /// Add a bookmark Insert { title: String, url: String, #[arg(short, long)] description: Option<String>, #[arg(long)] /// Bookmark database path (default: platform data directory) db_path: Option<String>, }, /// List all bookmarks List { #[arg(long)] /// Bookmark database path (default: platform data directory) db_path: Option<String>, }, /// Delete a bookmark by ID Remove { #[arg(short, long)] id: i32, #[arg(long)] /// Bookmark database path (default: platform data directory) db_path: Option<String>, }, /// Search bookmarks from the terminal Search { /// FTS5 search query query: String, #[arg(long)] /// Bookmark database path (default: platform data directory) db_path: Option<String>, }, /// Re-run tagging on bookmarks Retag { #[arg(short, long)] /// IDs of bookmarks to retag (comma-separated). If empty without --all, enter interactive mode. id: Vec<i64>, /// Retag all bookmarks that have content #[arg(long)] all: bool, #[arg(long)] /// Bookmark database path (default: platform data directory) db_path: Option<String>, }, /// Import data from a browser Import { #[command(subcommand)] action: ImportAction, }, /// Create a default config file at the default config path InitConfig, /// Check for updates and apply them automatically SelfUpdate { #[arg(long)] /// Release feed URL (default: abbaye Atom feed) feed_url: Option<String>, #[arg(long)] /// Target triple for binary download (default: auto-detected) target: Option<String>, #[arg(long)] /// Check for updates without downloading dry_run: bool, }, } #[derive(Subcommand)] enum ImportAction { /// Import bookmarks from a browser Bookmarks { /// Browser to import from (e.g. "firefox") source: String, #[arg(short, long)] /// Path to the browser profile directory (auto-discovered if omitted) profile: Option<String>, #[arg(long)] /// Target database path (default: platform data directory) db_path: Option<String>, }, /// Import history from a browser History { /// Browser to import from (e.g. "firefox") source: String, #[arg(short, long)] /// Path to the browser profile directory (auto-discovered if omitted) profile: Option<String>, #[arg(long)] /// Target database path (default: platform data directory) db_path: Option<String>, }, /// Import both bookmarks and history from a browser All { /// Browser to import from (e.g. "firefox") source: String, #[arg(short, long)] /// Path to the browser profile directory (auto-discovered if omitted) profile: Option<String>, #[arg(long)] /// Target database path (default: platform data directory) db_path: Option<String>, }, } fn print_bookmark(b: &Bookmark) { let local_time = Local.from_utc_datetime(&b.created_at.naive_utc()); let id_str = format!("#{}", b.id).bold().cyan(); let title = b.title.bold(); let url = b.url.dimmed(); let time = local_time.format("%Y-%m-%d %H:%M:%S").to_string().dimmed(); println!("{id_str} {title} {url}"); if let Some(tags) = b.tags.as_ref().filter(|t| !t.is_empty()) { let tag_parts: Vec<_> = tags.split(", ").filter_map(|t| { let trimmed = t.trim(); if trimmed.is_empty() { None } else { Some(format!("[{trimmed}]").yellow().to_string()) } }).collect(); if !tag_parts.is_empty() { println!(" {} {}", time, tag_parts.join(" ")); } } else { println!(" {time}"); } } fn expand_path(s: &str) -> String { let home = std::env::var("HOME").unwrap_or_default(); let user = std::env::var("USER").unwrap_or_default(); let s = s.replace("~", &home); let s = s.replace("$HOME", &home); s.replace("$USER", &user) } fn resolve_db_path(cli_path: Option<String>, config_db_path: Option<&str>) -> PathBuf { if let Some(p) = cli_path { PathBuf::from(p) } else if let Some(p) = config_db_path { PathBuf::from(expand_path(p)) } else { storage::default_db_path() } } #[tokio::main] async fn main() { // Increase thread stack size so blocking tasks (fastembed ONNX model, // htmd HTML conversion) don't overflow on deep call trees. std::env::set_var("RUST_MIN_STACK", "8388608"); let args = Args::parse(); match &args.command { Command::Serve { .. } => { tracing_subscriber::fmt() .with_target(false) .init(); } _ => { tracing_subscriber::fmt() .with_target(false) .without_time() .init(); } } let config_path = args.config.as_ref().map(PathBuf::from); let config = match &config_path { Some(p) => Config::load_from(p), None => Config::load(), }; let engines = config.engines.clone(); let tagging_enabled = config.tagging_enabled.unwrap_or(false); let tag_threshold: f32 = config.tagging_threshold.map(|t| t as f32).unwrap_or(0.60); let exclude_hosts: Vec<String> = config.exclude_urls.clone().unwrap_or_else(|| { vec!["localhost".into(), "127.0.0.1".into(), "::1".into()] }); let tags: Vec<TagDef> = if config.tags.is_empty() { default_tags() } else { config.tags.clone() }; let bind_address = config.bind_address.clone().unwrap_or_else(|| "127.0.0.1".into()); let page_size = config.page_size.unwrap_or(20); let workers = config.workers.unwrap_or(2); let onnx_model = config.onnx_model.clone().unwrap_or_else(|| "BGESmallENV15".into()); let truncation = config.truncation.unwrap_or(2000); let max_tags = config.max_tags.unwrap_or(5); let cache_dir = directories::ProjectDirs::from("com", "search_hub", "search_hub") .map(|d| d.cache_dir().to_path_buf()) .unwrap_or_else(|| { let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".into()); PathBuf::from(home).join(".cache").join("search_hub") }); let _ = fs::create_dir_all(&cache_dir); let cache_path = cache_dir.join("robots_cache.json"); match args.command { Command::Serve { port, db_path } => { let db_path = resolve_db_path(db_path, config.db_path.as_deref()); let engines = Arc::new(RwLock::new(engines)); info!("Starting server on {}:{}", bind_address, port); let srv_cfg = web::ServerConfig { port, bind_address, page_size, workers }; let reload_engines = engines.clone(); let config_path = config_path.clone(); tokio::spawn(async move { use tokio::signal::unix::{signal, SignalKind}; let mut sig = signal(SignalKind::hangup()).expect("failed to register SIGHUP"); loop { sig.recv().await; info!("SIGHUP received, reloading engines from config"); let new_config = match &config_path { Some(p) => Config::load_from(p), None => Config::load(), }; let mut guard = reload_engines.write().unwrap(); *guard = new_config.engines; info!("Reloaded {} engines from config", guard.len()); } }); // Panics if two engines share the same shortcode (config error). let shortcuts = config.resolve_shortcuts(); if let Err(e) = web::run_server(&db_path.to_string_lossy(), srv_cfg, engines, shortcuts).await { error!("Server error: {}", e); } } Command::Insert { title, url, description, db_path } => { let db_path = resolve_db_path(db_path, config.db_path.as_deref()); let conn = storage::init_db(&db_path.to_string_lossy()).expect("Failed to open database"); if url::Url::parse(&url).is_err() { eprintln!("Error: invalid URL '{url}'"); return; } let mut fetcher = match Fetcher::new(cache_path.clone()) { Ok(f) => f, Err(e) => { eprintln!("Warning: failed to create HTTP client: {e}"); let bookmark = Bookmark { id: 0, title, url, description, source: "bookmark".into(), content: None, tags: None, created_at: Utc::now(), }; storage::insert_bookmark(&conn, &bookmark).expect("Failed to insert bookmark"); println!("Bookmark inserted successfully (skipped HTTP fetch)."); return; } }; let content = if is_excluded_url(&url, &exclude_hosts) { info!("skipping fetch for excluded URL: {}", url); None } else { fetch_and_convert(&mut fetcher, &url, None) }; let md = content.as_ref().and_then(|c| { if !tagging_enabled { info!("tagging disabled via config"); return None; } info!("tagging content..."); match TaggingEngine::new(&tags, tag_threshold, max_tags, truncation, &onnx_model) { Ok(mut engine) => { let tags = engine.tags_for(c).unwrap_or_default(); if tags.is_empty() { info!("no tags matched"); None } else { info!("tags: {}", tags.join(", ")); Some(tags.join(", ")) } } Err(e) => { eprintln!("Warning: failed to initialize tagger: {e}"); None } } }); let bookmark = Bookmark { id: 0, title, url, description, source: "bookmark".into(), content, tags: md, created_at: Utc::now(), }; storage::insert_bookmark(&conn, &bookmark).expect("Failed to insert bookmark"); println!("Bookmark inserted successfully."); } Command::List { db_path } => { let db_path = resolve_db_path(db_path, config.db_path.as_deref()); let conn = storage::init_db(&db_path.to_string_lossy()).expect("Failed to open database"); let bookmarks = storage::list_bookmarks(&conn, 1, 10000).expect("Failed to list bookmarks"); for b in bookmarks { print_bookmark(&b); } } Command::Remove { id, db_path } => { let db_path = resolve_db_path(db_path, config.db_path.as_deref()); let conn = storage::init_db(&db_path.to_string_lossy()).expect("Failed to open database"); storage::delete_bookmark(&conn, id).expect("Failed to delete bookmark"); println!("Bookmark removed successfully."); } Command::Search { query, db_path } => { let db_path = resolve_db_path(db_path, config.db_path.as_deref()); let conn = storage::init_db(&db_path.to_string_lossy()).expect("Failed to open database"); let bookmarks = storage::search_bookmarks(&conn, &query, 1, 10000).expect("Failed to search"); if bookmarks.is_empty() { println!("No results for \"{query}\""); return; } println!("{} result{} for \"{}\":", bookmarks.len(), if bookmarks.len() == 1 { "" } else { "s" }, query); println!(); for b in bookmarks { print_bookmark(&b); } } Command::Retag { id, all, db_path } => { if !tagging_enabled { println!("Tagging is disabled in config. Enable it with tagging_enabled = true."); return; } let db_path = resolve_db_path(db_path, config.db_path.as_deref()); let conn = storage::init_db(&db_path.to_string_lossy()).expect("Failed to open database"); let mut engine = match TaggingEngine::new(&tags, tag_threshold, max_tags, truncation, &onnx_model) { Ok(e) => e, Err(e) => { eprintln!("Warning: failed to initialize tagger: {e}"); return; } }; let ids: Vec<i64> = if all { let bookmarks = storage::list_bookmarks(&conn, 1, 10000).expect("Failed to list bookmarks"); bookmarks.into_iter() .filter(|b| b.content.is_some()) .map(|b| b.id as i64) .collect() } else if !id.is_empty() { id } else { let bookmarks = storage::list_bookmarks(&conn, 1, 10000).expect("Failed to list bookmarks"); if bookmarks.is_empty() { println!("No bookmarks found."); return; } println!("Bookmarks:"); for (i, b) in bookmarks.iter().enumerate() { let local_time = Local.from_utc_datetime(&b.created_at.naive_utc()); let time = local_time.format("%Y-%m-%d %H:%M").to_string(); let tags = b.tags.as_deref().unwrap_or("(no tags)"); println!(" {}. #{} {} {} [{}]", i + 1, b.id, b.title, time, tags); } println!(); print!("Enter IDs (comma-separated) or 'all': "); std::io::Write::flush(&mut std::io::stdout()).unwrap(); let mut input = String::new(); std::io::stdin().read_line(&mut input).unwrap(); let input = input.trim(); if input.is_empty() { println!("No IDs entered."); return; } if input == "all" { bookmarks.into_iter() .filter(|b| b.content.is_some()) .map(|b| b.id as i64) .collect() } else { input.split(',') .filter_map(|s| s.trim().parse::<i64>().ok()) .collect() } }; if ids.is_empty() { println!("No bookmarks to retag."); return; } let pb = ProgressBar::new(ids.len() as u64); pb.set_style( ProgressStyle::default_bar() .template("{spinner:.green} [{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} {msg}") .unwrap() .progress_chars("##-"), ); let mut tagged = 0u64; let mut skipped = 0u64; for rowid in &ids { match storage::get_bookmark(&conn, *rowid) { Ok(Some(b)) => { match b.content { Some(ref content) => { match engine.tags_for(content) { Ok(tags) => { let tags_str = if tags.is_empty() { None } else { Some(tags.join(", ")) }; storage::update_bookmark_tags(&conn, *rowid, tags_str.as_deref()) .unwrap_or_else(|e| eprintln!("Warning: failed to update tags: {e}")); tagged += 1; pb.set_message(format!("{} tagged", "✓")); } Err(e) => { eprintln!("Warning: tagging failed for #{rowid}: {e}"); skipped += 1; pb.set_message(format!("{} failed", "✘")); } } } None => { eprintln!("Warning: #{rowid} has no content, skipping"); skipped += 1; pb.set_message(format!("{} no content", "✘")); } } } Ok(None) => { eprintln!("Warning: bookmark #{rowid} not found"); skipped += 1; } Err(e) => { eprintln!("Warning: failed to read bookmark #{rowid}: {e}"); skipped += 1; } } pb.inc(1); } pb.finish_with_message(format!("{} {} tagged, {} {} skipped", "✔", tagged, "✘", skipped)); } Command::InitConfig => { let path = config_path.clone().unwrap_or_else(config_file_path); if path.exists() { eprintln!("Config file already exists at {path:?}"); return; } let default_db = storage::default_db_path(); let home = std::env::var("HOME").unwrap_or_default(); let default_display = default_db.to_string_lossy(); let default_display = if let Some(rest) = default_display.strip_prefix(&home) { format!("~{rest}") } else { default_display.to_string() }; let content = format!( "# SearchHub configuration\n\ \n\ # Bookmark database path (default: platform data directory)\n\ # db_path = \"{default_display}\"\n\ \n\ # Custom tags override the built-in defaults\n\ # [[tags]]\n\ # name = \"my-custom-tag\"\n\ # examples = [\"example text one\", \"example text two\"]\n\ \n\ \n\ # Whether auto-tagging is enabled (default: false)\n\ # tagging_enabled = true\n\ \n\ # Minimum confidence for auto-tagging (0.0 to 1.0, default: 0.6)\n\ # tagging_threshold = 0.6\n\ \n\ # Hosts to skip when fetching content for bookmarking (default: localhost addresses)\n\ # exclude_urls = [\"localhost\", \"127.0.0.1\", \"::1\"]\n\ \n\ # Per-engine configuration (optional)\n\ # [[engines]]\n\ # type = \"searxng\"\n\ # instance = \"https://search.kael.ink\"\n\ # Best: use an existing public instance (see https://searx.space).\n\ # Also possible: run your own with Docker:\n\ # docker run -d --name searxng -p 8888:8080 searxng/searxng\n" ); if let Some(parent) = path.parent() { tokio::fs::create_dir_all(parent).await.expect("Failed to create config directory"); } tokio::fs::write(&path, content).await.expect("Failed to write config file"); println!("Default config created at {path:?}"); } Command::SelfUpdate { feed_url, target, dry_run } => { let mut updater = search_hub::self_update::SelfUpdate::new() .dry_run(dry_run); if let Some(url) = feed_url { updater = updater.with_feed_url(url.clone()); } if let Some(t) = target { updater = updater.with_target(t.clone()); } if let Err(e) = updater.run().await { error!("Self-update failed: {}", e); } } Command::Import { action } => { match action { ImportAction::Bookmarks { source, profile, db_path } => { let db_path = resolve_db_path(db_path, config.db_path.as_deref()); run_import(&source, profile, &db_path.to_string_lossy(), &config, ImportKind::Bookmarks, cache_path.clone()).await; } ImportAction::History { source, profile, db_path } => { let db_path = resolve_db_path(db_path, config.db_path.as_deref()); run_import(&source, profile, &db_path.to_string_lossy(), &config, ImportKind::History, cache_path.clone()).await; } ImportAction::All { source, profile, db_path } => { let db_path = resolve_db_path(db_path, config.db_path.as_deref()); run_import(&source, profile, &db_path.to_string_lossy(), &config, ImportKind::All, cache_path.clone()).await; } } } } } enum ImportKind { Bookmarks, History, All } fn resolve_profiles(importer: &(impl Importer + ?Sized), profile: Option<String>) -> Vec<PathBuf> { match profile { Some(p) => vec![PathBuf::from(p)], None => { let profiles = importer.discover_profiles(); if profiles.is_empty() { eprintln!("No {} profile found. Specify --profile.", importer.name()); std::process::exit(1); } if profiles.len() == 1 { println!("Using {} profile: {:?}", importer.name(), profiles[0]); profiles } else { println!("Found {} {} profiles:", profiles.len(), importer.name()); for (i, p) in profiles.iter().enumerate() { println!(" {}. {:?}", i + 1, p); } println!(); print!("Enter numbers (comma-separated) or 'all': "); std::io::Write::flush(&mut std::io::stdout()).unwrap(); let mut input = String::new(); std::io::stdin().read_line(&mut input).unwrap(); let input = input.trim(); if input.is_empty() { eprintln!("No profiles selected."); std::process::exit(1); } if input == "all" { profiles } else { let selected: Vec<PathBuf> = input.split(',') .filter_map(|s| { let idx: usize = s.trim().parse().ok()?; if idx >= 1 && idx <= profiles.len() { Some(profiles[idx - 1].clone()) } else { None } }) .collect(); if selected.is_empty() { eprintln!("No valid profiles selected."); std::process::exit(1); } selected } } } } } async fn run_import(source: &str, profile: Option<String>, db_path: &str, config: &Config, kind: ImportKind, cache_path: PathBuf) { let tagging_enabled = config.tagging_enabled.unwrap_or(false); let tag_threshold: f32 = config.tagging_threshold.map(|t| t as f32).unwrap_or(0.60); let tags: Vec<TagDef> = if config.tags.is_empty() { default_tags() } else { config.tags.clone() }; let exclude_hosts: Vec<String> = config.exclude_urls.clone().unwrap_or_else(|| { vec!["localhost".into(), "127.0.0.1".into(), "::1".into()] }); let max_tags = config.max_tags.unwrap_or(5); let truncation = config.truncation.unwrap_or(2000); let onnx_model = config.onnx_model.clone().unwrap_or_else(|| "BGESmallENV15".into()); let importer: Box<dyn Importer> = match source { "firefox" => Box::new(FirefoxImporter), "zen" => Box::new(ZenImporter), "chrome" | "chromium" => Box::new(ChromeImporter), other => { eprintln!("Unknown browser source: '{other}'. Supported: firefox, zen, chrome, chromium"); std::process::exit(1); } }; let label = match kind { ImportKind::Bookmarks => "bookmarks", ImportKind::History => "history", ImportKind::All => "bookmarks and history", }; let selected_profiles = resolve_profiles(importer.as_ref(), profile); let conn = storage::init_db(db_path) .expect("Failed to open target database"); let mut all_entries = Vec::new(); for profile_path in &selected_profiles { let results: [anyhow::Result<Vec<Bookmark>>; 2] = match kind { ImportKind::Bookmarks => [importer.import(profile_path), Ok(Vec::new())], ImportKind::History => [importer.import_history(profile_path), Ok(Vec::new())], ImportKind::All => [importer.import(profile_path), importer.import_history(profile_path)], }; for result in results { match result { Ok(mut entries) => all_entries.append(&mut entries), Err(e) => { eprintln!("Warning: failed to import {label} from {profile_path:?}: {e}"); } } } } for entry in &mut all_entries { entry.url = strip_fragment(&entry.url); } let total = all_entries.len(); if total == 0 { println!("No {label} found to import."); return; } // First pass: validate, deduplicate, insert let pb = ProgressBar::new(total as u64); pb.enable_steady_tick(std::time::Duration::from_millis(100)); pb.set_style( ProgressStyle::default_bar() .template("{spinner:.green} [{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} {msg}") .unwrap() .progress_chars("##-"), ); let mut imported = 0u64; let mut skipped = 0u64; let mut invalid = 0u64; let mut pending: Vec<(i64, String)> = Vec::new(); for entry in all_entries { if url::Url::parse(&entry.url).is_err() { invalid += 1; pb.set_message(format!("{} invalid url", "✗")); } else if storage::bookmark_exists_by_url(&conn, &entry.url).unwrap_or(false) { skipped += 1; pb.set_message(format!("{} skipped", "✗")); } else { let rowid = storage::insert_bookmark(&conn, &entry) .expect("Failed to insert entry"); imported += 1; pending.push((rowid, entry.url.clone())); pb.set_message(format!("{} inserted", "✓")); } pb.inc(1); } if pending.is_empty() { pb.finish_with_message(format!("{} {} imported, {} {} skipped, {} {} invalid", "✔", imported, "✘", skipped, "⚠", invalid)); return; } pb.finish_and_clear(); // Second pass: parallel fetch + tag via worker pool let pb = ProgressBar::new(pending.len() as u64); pb.enable_steady_tick(std::time::Duration::from_millis(100)); pb.set_style( ProgressStyle::default_bar() .template("{spinner:.green} [{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} {msg}") .unwrap() .progress_chars("##-"), ); pb.set_message("fetching and tagging..."); let (tx, rx) = std::sync::mpsc::channel::<String>(); let bar = Arc::new(pb.clone()); let db = db_path.to_string(); let num_threads = std::thread::available_parallelism() .map(|n| n.get()) .unwrap_or(4) .max(1); let chunk_size = pending.len().div_ceil(num_threads); let tasks: Vec<_> = pending.chunks(chunk_size).enumerate().map(|(task_id, chunk)| { let owned: Vec<_> = chunk.to_vec(); let tx = tx.clone(); let bar = bar.clone(); let db = db.clone(); let task_tags = tags.clone(); let task_threshold = tag_threshold; let task_exclude = exclude_hosts.to_vec(); let task_tagging_enabled = tagging_enabled; let task_cache = cache_path.clone(); let task_onnx = onnx_model.clone(); tokio::task::spawn_blocking(move || { let mut fetcher = match Fetcher::new(task_cache) { Ok(f) => f, Err(e) => { let _ = tx.send(format!("[{task_id}] failed to create HTTP client: {e}")); return; } }; let mut tagger = if task_tagging_enabled { TaggingEngine::new(&task_tags, task_threshold, max_tags, truncation, &task_onnx).ok() } else { None }; let conn = storage::init_db(&db) .expect("Failed to open target database"); for (rowid, url) in &owned { if is_excluded_url(url, &task_exclude) { bar.inc(1); continue; } if let Some(md) = fetch_and_convert(&mut fetcher, url, Some(task_id)) { let entry_tags = tagger.as_mut() .and_then(|e| e.tags_for(&md).ok()) .unwrap_or_default(); let tags_str = if entry_tags.is_empty() { None } else { Some(entry_tags.join(", ")) }; storage::update_bookmark_content_tags( &conn, *rowid, Some(&md), tags_str.as_deref(), ).unwrap_or_else(|e| { let _ = tx.send(format!("failed to update bookmark {rowid}: {e}")); }); } bar.inc(1); } }) }).collect(); // Read errors from channel, keep ringbuffer of last 10, print live let error_handle = tokio::task::spawn_blocking(move || { let mut error_buffer: VecDeque<String> = VecDeque::with_capacity(10); while let Ok(err) = rx.recv() { if error_buffer.len() < 10 { error_buffer.push_back(err.clone()); eprintln!("{err}"); } else { error_buffer.pop_front(); error_buffer.push_back(err); } } }); for task in tasks { task.await.unwrap(); } drop(tx); error_handle.await.unwrap(); pb.finish_with_message(format!("{} {} imported, {} {} skipped, {} {} invalid", "✔", imported, "✘", skipped, "⚠", invalid)); } fn strip_fragment(url: &str) -> String { url::Url::parse(url) .ok() .map(|mut u| { u.set_fragment(None); u.into() }) .unwrap_or_else(|| url.to_string()) } fn is_excluded_url(url: &str, exclude_hosts: &[String]) -> bool { url::Url::parse(url) .ok() .and_then(|u| match u.host() { Some(url::Host::Domain(h)) => Some(h.to_lowercase()), Some(url::Host::Ipv4(ip)) => Some(ip.to_string()), Some(url::Host::Ipv6(ip)) => Some(ip.to_string()), None => None, }) .map(|host| exclude_hosts.contains(&host)) .unwrap_or(false) } fn fetch_and_convert(fetcher: &mut Fetcher, url: &str, task_id: Option<usize>) -> Option<String> { let prefix = task_id.map(|id| format!("[importer-{id}] ")).unwrap_or_default(); match fetcher.fetch(url) { Ok(html) => match htmd::convert(&html) { Ok(md) => Some(md), Err(e) => { eprintln!("{prefix}Warning: failed to convert HTML to Markdown: {e}"); None } }, Err(e) => { eprintln!("{prefix}Warning: failed to fetch URL '{url}': {e}"); None } } } #[cfg(test)] mod tests { use super::*; use std::io::{Read, Write}; use std::net::TcpListener; fn test_server() -> (TcpListener, u16) { let listener = TcpListener::bind("127.0.0.1:0").unwrap(); let port = listener.local_addr().unwrap().port(); (listener, port) } fn handle_connection(mut stream: std::net::TcpStream) { use std::time::Duration; let mut buf = [0; 4096]; if stream.read(&mut buf).is_err() { return; } let request = String::from_utf8_lossy(&buf[..]); let (status, body) = if request.contains("/robots.txt") { ("200 OK", "User-agent: *\nDisallow: /private/\n") } else if request.contains("/private/") { ("200 OK", "<html><body>private</body></html>") } else { ("200 OK", "<html><body>hello</body></html>") }; let response = format!( "HTTP/1.1 {status}\r\nContent-Type: text/plain\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}", body.len() ); let _ = stream.write_all(response.as_bytes()); let _ = stream.flush(); // Give the client time to consume the body before we close std::thread::sleep(Duration::from_millis(10)); } #[test] fn fetcher_fetch_basic_page() { let (listener, port) = test_server(); std::thread::spawn(move || { for stream in listener.incoming().flatten() { handle_connection(stream); } }); let cache_dir = tempfile::tempdir().unwrap(); let cache_path = cache_dir.path().join("robots_cache.json"); let mut fetcher = Fetcher::new(cache_path).unwrap(); let result = fetcher.fetch(&format!("http://127.0.0.1:{port}/page")); assert!(result.is_ok(), "fetch should succeed: {result:?}"); assert!(result.unwrap().contains("hello")); } #[test] fn fetcher_blocks_disallowed_path() { let (listener, port) = test_server(); std::thread::spawn(move || { for stream in listener.incoming().flatten() { handle_connection(stream); } }); let cache_dir = tempfile::tempdir().unwrap(); let cache_path = cache_dir.path().join("robots_cache.json"); let mut fetcher = Fetcher::new(cache_path).unwrap(); // First request fetches robots.txt let result = fetcher.fetch(&format!("http://127.0.0.1:{port}/page")); assert!(result.is_ok(), "first fetch should succeed: {result:?}"); // Second request to disallowed path should be blocked let result = fetcher.fetch(&format!("http://127.0.0.1:{port}/private/page")); assert!(result.is_err(), "disallowed path should be blocked"); assert!(result.unwrap_err().contains("blocked by robots.txt")); } #[test] fn fetcher_caches_robots_txt() { let (listener, port) = test_server(); let request_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); let count = request_count.clone(); std::thread::spawn(move || { for stream in listener.incoming().flatten() { let mut stream = stream; count.fetch_add(1, std::sync::atomic::Ordering::SeqCst); let mut buf = [0; 4096]; let _ = stream.read(&mut buf); let request = String::from_utf8_lossy(&buf[..]); let (status, body) = if request.contains("/robots.txt") { ("200 OK", "User-agent: *\nDisallow:\n") } else { ("200 OK", "<html><body>hello</body></html>") }; let response = format!( "HTTP/1.1 {status}\r\nContent-Type: text/plain\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}", body.len() ); let _ = stream.write_all(response.as_bytes()); let _ = stream.flush(); } }); let cache_dir = tempfile::tempdir().unwrap(); let cache_path = cache_dir.path().join("robots_cache.json"); let mut fetcher = Fetcher::new(cache_path).unwrap(); // First fetch fetches robots.txt + page assert!(fetcher.fetch(&format!("http://127.0.0.1:{port}/page")).is_ok()); let first_count = request_count.load(std::sync::atomic::Ordering::SeqCst); // Second fetch to same domain should NOT re-fetch robots.txt (use cached) assert!(fetcher.fetch(&format!("http://127.0.0.1:{port}/other")).is_ok()); let second_count = request_count.load(std::sync::atomic::Ordering::SeqCst); assert_eq!(second_count, first_count + 1, "only one additional HTTP request (not two)"); } #[test] fn strip_fragment_removes_hash() { assert_eq!(strip_fragment("https://example.com/page#section"), "https://example.com/page"); } #[test] fn strip_fragment_no_change_without_hash() { assert_eq!(strip_fragment("https://example.com/page"), "https://example.com/page"); } #[test] fn strip_fragment_keeps_query() { assert_eq!(strip_fragment("https://example.com/page?q=test#section"), "https://example.com/page?q=test"); } #[test] fn strip_fragment_empty_fragment() { assert_eq!(strip_fragment("https://example.com/page#"), "https://example.com/page"); } #[test] fn strip_fragment_invalid_url_unchanged() { assert_eq!(strip_fragment("not a url"), "not a url"); } #[test] fn is_excluded_matches_localhost() { let hosts = vec!["localhost".into(), "127.0.0.1".into(), "::1".into()]; assert!(is_excluded_url("http://localhost:8080/page", &hosts)); assert!(is_excluded_url("http://127.0.0.1:3000/", &hosts)); assert!(is_excluded_url("http://[::1]:8080/page", &hosts)); } #[test] fn is_excluded_does_not_match_real_domains() { let hosts = vec!["localhost".into(), "127.0.0.1".into(), "::1".into()]; assert!(!is_excluded_url("https://example.com", &hosts)); assert!(!is_excluded_url("https://rust-lang.org", &hosts)); } #[test] fn is_excluded_empty_list_allows_all() { let hosts: Vec<String> = vec![]; assert!(!is_excluded_url("http://localhost:8080/", &hosts)); } #[test] fn is_excluded_custom_hosts() { let hosts = vec!["my-internal.dev".into()]; assert!(is_excluded_url("http://my-internal.dev/api", &hosts)); assert!(!is_excluded_url("https://example.com", &hosts)); } }