diff --git a/apps/web/scripts/fill-training-dataset.ts b/apps/web/scripts/fill-training-dataset.ts index fe16aaf..163d51d 100644 --- a/apps/web/scripts/fill-training-dataset.ts +++ b/apps/web/scripts/fill-training-dataset.ts @@ -9,6 +9,13 @@ * Each run scans the directory, reports deficits, then fills them. * Interrupt-safe: re-run to pick up where you left off. * + * Parallelism strategy: + * - Disease-level: 30 diseases processed concurrently + * - Per disease: all 3 DDG queries run in parallel + * - Per query: all search pages fetched in parallel + * - Per disease: DDG, iNaturalist, and Wikimedia Commons all run concurrently + * - A shared DDG token-bucket rate limiter prevents bans + * * Usage: cd apps/web && npx tsx scripts/fill-training-dataset.ts */ @@ -35,7 +42,6 @@ try { import { getDb, closeDb } from "@/lib/db/index"; import { diseases } from "@/lib/db/schema"; -import { sql } from "drizzle-orm"; // ─── Config ───────────────────────────────────────────────────────────────── @@ -48,15 +54,25 @@ const TARGET_PER_DISEASE = 200; /** Target images for the "healthy" class */ const TARGET_HEALTHY = 400; -/** Delay between DuckDuckGo search API calls (ms) */ -const SEARCH_DELAY = 1500; +/** + * How many diseases to process in parallel. + * Each disease is I/O-bound (HTTP requests), so high concurrency is safe. + * The global DDG rate limiter prevents us from overwhelming DuckDuckGo. + */ +const DISEASE_CONCURRENCY = 30; + +/** + * Max DDG requests per second (shared across all concurrent diseases). + * DuckDuckGo is fairly tolerant, but we still want to be polite. + * With DISEASE_CONCURRENCY=30, each disease fires 3 parallel queries with + * parallel pages = 9 parallel DDG requests per disease at peak. + * The rate limiter serializes this so we don't get banned. + */ +const DDG_RATE_LIMIT_RPS = 15; /** Max concurrent image downloads per disease */ const CONCURRENT_DOWNLOADS = 30; -/** Number of diseases to process in parallel */ -const DISEASE_CONCURRENCY = 5; - /** Minimum image size in bytes to accept */ const MIN_IMAGE_SIZE = 10_000; // 10KB @@ -73,6 +89,17 @@ const UA = /** Healthy class directory name */ const HEALTHY_CLASS = "healthy"; +/** How often (in diseases processed) to flush the seen-URLs cache to disk */ +const SEEN_CACHE_FLUSH_INTERVAL = 20; + +/** Max DDG pages to fetch per query. + * Each page returns ~100 image results, so 3 pages × 3 queries = ~900 raw URLs + * before dedup — more than enough to find 200 unique, valid images. */ +const MAX_DDG_PAGES = 3; + +/** Healthy source queries limit */ +const MAX_HEALTHY_QUERIES = 20; + // ─── Types ────────────────────────────────────────────────────────────────── interface DuckDuckGoImageResult { @@ -92,6 +119,53 @@ interface DiseaseInfo { needed: number; } +interface CollectResult { + urls: string[]; + exhausted: boolean; +} + +// ─── Token-Bucket Rate Limiter ────────────────────────────────────────────── + +class TokenBucket { + private tokens: number; + private lastRefill: number; + private readonly capacity: number; + private readonly refillInterval: number; // ms per token (e.g., 100ms for 10 rps) + + constructor(rps: number) { + this.capacity = rps; + this.tokens = rps; + this.lastRefill = Date.now(); + this.refillInterval = 1000 / rps; + } + + /** Acquire one token, blocking until one is available. */ + async acquire(): Promise { + while (true) { + this.refill(); + if (this.tokens >= 1) { + this.tokens -= 1; + return; + } + // No tokens — wait for the next one to arrive, then retry + await sleep(Math.ceil(this.refillInterval)); + } + } + + private refill(): void { + const now = Date.now(); + const elapsed = now - this.lastRefill; + const newTokens = Math.floor(elapsed / this.refillInterval); + if (newTokens > 0) { + this.tokens = Math.min(this.capacity, this.tokens + newTokens); + this.lastRefill = now - (elapsed % this.refillInterval); + } + } +} + +// Global DDG rate limiter — all concurrent diseases share this +const ddgLimiter = new TokenBucket(DDG_RATE_LIMIT_RPS); + // ─── Helpers ──────────────────────────────────────────────────────────────── function sleep(ms: number): Promise { @@ -109,13 +183,6 @@ function countImagesInDir(dir: string): number { } } -/** Format bytes for display */ -function formatBytes(bytes: number): string { - if (bytes < 1024) return `${bytes} B`; - if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`; - return `${(bytes / (1024 * 1024)).toFixed(1)} MB`; -} - // ─── Seen-URLs Cache ────────────────────────────────────────────────────── /** @@ -138,9 +205,38 @@ function saveSeenUrlsCache(cache: Record): void { writeFileSync(SEEN_CACHE_FILE, JSON.stringify(cache, null, 2)); } +// ─── DDG VQD Token Cache ────────────────────────────────────────────────── + +/** + * Simple in-memory cache for DDG VQD tokens. + * Tokens are per-query, but if we've fetched one for a similar query recently, + * we can skip the initial HTML page fetch. + */ +const vqdCache = new Map(); + +function getCachedVqd(query: string): string | undefined { + const entry = vqdCache.get(query); + if (entry && entry.expiresAt > Date.now()) return entry.token; + vqdCache.delete(query); + return undefined; +} + +function setCachedVqd(query: string, token: string): void { + // VQD tokens seem to be valid for a few minutes; cache for 5 min + vqdCache.set(query, { token, expiresAt: Date.now() + 5 * 60 * 1000 }); + // Evict oldest entries if cache grows too large (unlikely but safe) + if (vqdCache.size > 500) { + const firstKey = vqdCache.keys().next().value; + if (firstKey) vqdCache.delete(firstKey); + } +} + // ─── DuckDuckGo API ───────────────────────────────────────────────────────── async function getVqdToken(query: string): Promise { + const cached = getCachedVqd(query); + if (cached) return cached; + const url = `https://duckduckgo.com/?q=${encodeURIComponent(query)}&t=h_&iax=images&ia=images`; const res = await fetch(url, { @@ -154,6 +250,7 @@ async function getVqdToken(query: string): Promise { const match = html.match(/vqd['"]?\s*[:=]\s*['"]([a-f0-9-]+)['"]/); if (!match) throw new Error(`Could not extract vqd token for "${query}"`); + setCachedVqd(query, match[1]); return match[1]; } @@ -162,6 +259,9 @@ async function searchImagesDuckDuckGo( vqd: string, page: number, ): Promise { + // Rate-limit before making the request + await ddgLimiter.acquire(); + const url = `https://duckduckgo.com/i.js?q=${encodeURIComponent( query, )}&vqd=${vqd}&o=json&p=${page}&f=,,,`; @@ -177,27 +277,29 @@ async function searchImagesDuckDuckGo( if (!res.ok) { if (res.status === 429) { - console.warn(" ⚠ DDG rate limited (429). Waiting 10s..."); - await sleep(10_000); + // Rate limited — wait and retry once + await sleep(5_000); return searchImagesDuckDuckGo(query, vqd, page); } if (res.status === 403) return []; - throw new Error(`DuckDuckGo search failed: ${res.status}`); + // Don't throw for transient errors — just return empty + return []; } const data = (await res.json()) as { results: DuckDuckGoImageResult[] }; return data.results ?? []; } -async function collectImagesDuckDuckGo( +/** + * Collect images from DDG for a single query. + * Fetches up to MAX_DDG_PAGES pages in PARALLEL (rate-limited via ddgLimiter). + */ +async function collectFromDdgQuery( query: string, target: number, seenUrls: Set, -): Promise<{ urls: string[]; exhausted: boolean }> { +): Promise { const results: string[] = []; - let page = 1; - let exhausted = false; - let consecutiveEmpty = 0; let vqd: string; try { @@ -207,34 +309,19 @@ async function collectImagesDuckDuckGo( return { urls: [], exhausted: true }; } - const MAX_PAGES = 5; - let lowNoveltyCount = 0; + // Fetch all pages in parallel + const pageFetches: Promise[] = []; + for (let page = 1; page <= MAX_DDG_PAGES; page++) { + pageFetches.push(searchImagesDuckDuckGo(query, vqd, page)); + } - while (results.length < target && page <= MAX_PAGES) { - await sleep(SEARCH_DELAY); + const pageResults = await Promise.allSettled(pageFetches); - let pageResults: DuckDuckGoImageResult[]; - try { - pageResults = await searchImagesDuckDuckGo(query, vqd, page); - } catch (err) { - console.warn(` ⚠ DDG error: ${err instanceof Error ? err.message : "unknown"}`); - break; - } + for (const settled of pageResults) { + if (settled.status !== "fulfilled") continue; + if (results.length >= target) break; - if (!pageResults || pageResults.length === 0) { - consecutiveEmpty++; - if (consecutiveEmpty >= 3) { - exhausted = true; - break; - } - page++; - continue; - } - - consecutiveEmpty = 0; - let newCount = 0; - - for (const r of pageResults) { + for (const r of settled.value) { if (results.length >= target) break; const imgUrl = r.image || r.url; if (!imgUrl || typeof imgUrl !== "string") continue; @@ -246,21 +333,36 @@ async function collectImagesDuckDuckGo( } seenUrls.add(imgUrl); results.push(imgUrl); - newCount++; } - - const newRatio = newCount / pageResults.length; - if (newRatio < 0.05) { - lowNoveltyCount++; - if (lowNoveltyCount >= 2) break; - } else { - lowNoveltyCount = 0; - } - - if (results.length < target) page++; } - return { urls: results.slice(0, target), exhausted }; + return { urls: results.slice(0, target), exhausted: results.length < target }; +} + +/** + * Collect images from DDG across ALL queries for a disease. + * Runs all queries in PARALLEL, then merges deduplicated results. + */ +async function collectImagesDuckDuckGo( + queries: string[], + target: number, + seenUrls: Set, +): Promise<{ urls: string[]; exhausted: boolean }> { + // Run all queries in parallel + const queryResults = await Promise.allSettled( + queries.map((q) => collectFromDdgQuery(q, target, seenUrls)), + ); + + // Merge results — seenUrls already deduplicates across queries + const merged: string[] = []; + for (const settled of queryResults) { + if (settled.status === "fulfilled") { + merged.push(...settled.value.urls); + if (merged.length >= target) break; + } + } + + return { urls: merged.slice(0, target), exhausted: merged.length < target }; } // ─── iNaturalist API ─────────────────────────────────────────────────────── @@ -269,7 +371,7 @@ async function searchImagesInaturalist( query: string, target: number, seenUrls: Set, -): Promise<{ urls: string[]; exhausted: boolean }> { +): Promise { const results: string[] = []; const perPage = Math.min(target, 200); @@ -316,7 +418,7 @@ async function searchImagesCommons( query: string, target: number, seenUrls: Set, -): Promise<{ urls: string[]; exhausted: boolean }> { +): Promise { const results: string[] = []; let sroffset = 0; @@ -374,7 +476,7 @@ async function downloadImage(url: string, destPath: string): Promise { try { const res = await fetch(url, { headers: { "User-Agent": UA, Accept: "image/webp,image/png,image/jpeg,*/*" }, - signal: AbortSignal.timeout(15_000), + signal: AbortSignal.timeout(8_000), }); if (!res.ok) return false; @@ -426,13 +528,7 @@ async function downloadBatch( if (r.success) downloaded++; else failed++; } - - const total = downloaded + failed; - if (total % 30 === 0 || total === urls.length) { - process.stdout.write(`\r Progress: ${downloaded}/${urls.length} (${failed} failed)`); - } } - console.log(); return { downloaded, failed, lastIndex: index }; } @@ -457,10 +553,14 @@ function buildHealthyQueries(plant: string): string[] { /** * Try to collect up to `needed` images for a disease by hitting all three - * sources in order. Returns how many new images were actually downloaded. + * sources IN PARALLEL. Returns how many new images were actually downloaded. + * + * Sources (DDG with its 3 internal queries, iNat, Commons) all run concurrently. + * As soon as any source completes, its URLs are downloaded immediately while + * other sources are still searching (pipeline). */ async function fillClass( - diseaseId: string, + _diseaseId: string, queries: string[], needed: number, classDir: string, @@ -469,51 +569,63 @@ async function fillClass( if (needed <= 0) return 0; mkdirSync(classDir, { recursive: true }); + const startCount = countImagesInDir(classDir); - const allUrls: string[] = []; + // ── Run all sources in parallel, pipelining downloads ────────────────── + // Start downloading from each source as soon as it returns results, rather + // than waiting for all sources to complete. DDG is (by far) the richest + // source, so its results start saving to disk while iNat and Commons are + // still searching. + // + // Each source gets a DEDICATED index range so there's no race condition + // writing files. DDG gets [startCount, startCount+199], iNat gets + // [startCount+200, startCount+399], Commons gets [startCount+400,...]. + // The 4-digit filename supports up to 9999, well beyond our 200 target. - // ── Source 1: DuckDuckGo ─────────────────────────────────────────────── - if (allUrls.length < needed) { - for (const query of queries) { - if (allUrls.length >= needed) break; - process.stdout.write(` DDG: "${query.substring(0, 40)}"... `); - const result = await collectImagesDuckDuckGo(query, needed - allUrls.length, seenUrls); - allUrls.push(...result.urls); - console.log(`${result.urls.length} new`); - if (result.exhausted) break; - } - } + let totalDownloaded = 0; + let totalFailed = 0; + let anySuccess = false; - // ── Source 2: iNaturalist ────────────────────────────────────────────── - if (allUrls.length < needed) { - process.stdout.write(` iNat: Searching... `); - const result = await searchImagesInaturalist(queries[0], needed - allUrls.length, seenUrls); - allUrls.push(...result.urls); - console.log(`${result.urls.length} new`); - } + const collectAndDownload = async ( + label: string, + collector: () => Promise, + indexOffset: number, + ): Promise => { + const result = await collector(); + if (result.urls.length === 0) return; + console.log(` ${label}: ${result.urls.length} new URLs`); - // ── Source 3: Wikimedia Commons ──────────────────────────────────────── - if (allUrls.length < needed) { - process.stdout.write(` Commons: Searching... `); - const result = await searchImagesCommons(queries[0], needed - allUrls.length, seenUrls); - allUrls.push(...result.urls); - console.log(`${result.urls.length} new`); - } + // Each source writes to its own non-overlapping range + const { downloaded, failed } = await downloadBatch(result.urls, classDir, indexOffset); + totalDownloaded += downloaded; + totalFailed += failed; + if (downloaded > 0) anySuccess = true; + }; - if (allUrls.length === 0) { + await Promise.allSettled([ + collectAndDownload("DDG", () => collectImagesDuckDuckGo(queries, needed, seenUrls), startCount), + collectAndDownload( + "iNat", + () => searchImagesInaturalist(queries[0], needed, seenUrls), + startCount + TARGET_PER_DISEASE, + ), + collectAndDownload( + "Commons", + () => searchImagesCommons(queries[0], needed, seenUrls), + startCount + 2 * TARGET_PER_DISEASE, + ), + ]); + + if (!anySuccess) { console.log(` ✗ No new images found from any source`); return 0; } - console.log(` Downloading ${allUrls.length} images...`); - const startIndex = countImagesInDir(classDir); - const { downloaded, failed } = await downloadBatch(allUrls, classDir, startIndex); - const newTotal = countImagesInDir(classDir); - const gained = newTotal - startIndex; + const gained = newTotal - startCount; console.log( - ` ${downloaded > 0 ? "✓" : "✗"} Downloaded ${downloaded}/${allUrls.length}` + - ` (${failed} failed, ${gained} new files)`, + ` ✓ ${totalDownloaded}/${totalDownloaded + totalFailed} downloaded` + + ` (${totalFailed} failed, ${gained} new files)`, ); return gained; @@ -559,7 +671,7 @@ function scanDataset(): ScanResult { async function main() { console.log("=".repeat(60)); - console.log("TRAINING DATASET FILL — Gap-filling download"); + console.log("TRAINING DATASET FILL — Parallelized gap-filling download"); console.log("=".repeat(60)); // Ensure dataset directory exists @@ -613,6 +725,8 @@ async function main() { console.log(` Diseases needing images: ${deficits.length}/${diseaseInfo.size}`); console.log(` Total images missing: ${deficits.reduce((s, d) => s + d.needed, 0)}`); console.log(` Healthy deficit: ${Math.max(0, healthyDeficit)}`); + console.log(` Parallelism: ${DISEASE_CONCURRENCY} diseases at once`); + console.log(` DDG rate limit: ${DDG_RATE_LIMIT_RPS} req/s (shared)`); console.log(`${"=".repeat(60)}`); if (deficits.length === 0 && healthyDeficit <= 0) { @@ -625,6 +739,7 @@ async function main() { const seenUrlsCache = loadSeenUrlsCache(); let totalDownloaded = 0; let totalFailed = 0; + let diseasesProcessed = 0; const startTime = Date.now(); // ── Step 5: Fill disease deficits ─────────────────────────────────────── @@ -641,33 +756,62 @@ async function main() { console.log(`\n[Batch ${batchNum}/${totalBatches}] Processing ${batch.length} diseases...`); - await Promise.all( - batch.map(async (d) => { - const classDir = resolve(DATASET_DIR, d.id); - const queries = buildSearchQueries(d.name, d.plantId); - const seen = new Set(seenUrlsCache[d.id] ?? []); + // Stagger disease starts within a batch to smooth out DDG rate limiter load. + // Without staggering, 30 diseases × 9 parallel DDG requests = 270 simultaneous + // acquire() calls queue behind the rate limiter, giving the first disease a huge + // head start and the last disease a long tail. Staggering by 200ms each spreads + // the load evenly, reducing tail latency and improving overall throughput. + const STAGGER_MS = 200; + const batchResults = await Promise.allSettled( + batch.map((d, idx) => + (async () => { + if (idx > 0) await sleep(idx * STAGGER_MS); - console.log( - ` [${d.id}] have ${d.have}, need ${d.needed} more` + ` (${d.name} / ${d.plantId})`, - ); + const classDir = resolve(DATASET_DIR, d.id); + const queries = buildSearchQueries(d.name, d.plantId); + const seen = new Set(seenUrlsCache[d.id] ?? []); - const gained = await fillClass(d.id, queries, d.needed, classDir, seen); + console.log( + ` [${d.id}] have ${d.have}, need ${d.needed} more` + ` (${d.name} / ${d.plantId})`, + ); - // Update seen-URLs cache for this disease - seenUrlsCache[d.id] = Array.from(seen); - saveSeenUrlsCache(seenUrlsCache); + const gained = await fillClass(d.id, queries, d.needed, classDir, seen); - totalDownloaded += gained; - }), + // Update seen-URLs cache for this disease + seenUrlsCache[d.id] = Array.from(seen); + return gained; + })(), + ), ); - // Save seen cache after every batch - saveSeenUrlsCache(seenUrlsCache); + // Aggregate batch results + for (const result of batchResults) { + if (result.status === "fulfilled") { + totalDownloaded += result.value; + } else { + console.error(` ✗ Disease failed: ${result.reason}`); + } + } + + diseasesProcessed += batch.length; + + // Flush seen-URLs cache to disk periodically (not after every disease) + if ( + diseasesProcessed % SEEN_CACHE_FLUSH_INTERVAL < batch.length || + i + batch.length >= deficits.length + ) { + saveSeenUrlsCache(seenUrlsCache); + } const elapsed = Math.round((Date.now() - startTime) / 1000); + const rate = diseasesProcessed / Math.max(1, elapsed); + const remaining = deficits.length - diseasesProcessed; + const eta = remaining / Math.max(0.01, rate); console.log( ` [Batch ${batchNum}/${totalBatches}] checkpoint — ` + - `${totalDownloaded} downloaded so far (${elapsed}s elapsed)`, + `${totalDownloaded} downloaded, ` + + `${diseasesProcessed}/${deficits.length} diseases (${rate.toFixed(1)}/s, ` + + `ETA: ${Math.round(eta)}s)`, ); } } @@ -690,26 +834,22 @@ async function main() { const healthySeen = new Set(seenUrlsCache[HEALTHY_CLASS] ?? []); const healthyNeeded = TARGET_HEALTHY - countImagesInDir(healthyDir); + + // Run all 3 sources in parallel for the healthy class too + const [ddgUrls, inatUrls, commonsUrls] = await Promise.allSettled([ + collectImagesDuckDuckGo( + allHealthyQueries.slice(0, MAX_HEALTHY_QUERIES), + healthyNeeded, + healthySeen, + ), + searchImagesInaturalist(allHealthyQueries[0], healthyNeeded, healthySeen), + searchImagesCommons(allHealthyQueries[0], healthyNeeded, healthySeen), + ]); + const allUrls: string[] = []; - - // Try each source with up to 20 healthy queries - const sources = [ - { name: "DDG", collector: collectImagesDuckDuckGo }, - { name: "iNat", collector: searchImagesInaturalist }, - { name: "Commons", collector: searchImagesCommons }, - ] as const; - - for (const source of sources) { - if (allUrls.length >= healthyNeeded) break; - console.log(`\n Source: ${source.name}`); - - for (const query of allHealthyQueries.slice(0, 20)) { - if (allUrls.length >= healthyNeeded) break; - - process.stdout.write(` "${query}"... `); - const result = await source.collector(query, healthyNeeded - allUrls.length, healthySeen); - allUrls.push(...result.urls); - console.log(`${result.urls.length} new`); + for (const settled of [ddgUrls, inatUrls, commonsUrls]) { + if (settled.status === "fulfilled") { + allUrls.push(...settled.value.urls); } } @@ -763,6 +903,6 @@ async function main() { } main().catch((err) => { - console.error("\nFatal error:", err); + console.error("\nFatal error:", `\n${err}`); process.exit(1); }); diff --git a/apps/web/scripts/verify-images.py b/apps/web/scripts/verify-images.py deleted file mode 100644 index ea5a94a..0000000 --- a/apps/web/scripts/verify-images.py +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python3 -"""Verify all plant image URLs work.""" -import json -import urllib.parse -import urllib.request -import urllib.error -import time -import sys - -with open('src/data/plants.json') as f: - plants = json.load(f) - -def check_url(url): - time.sleep(0.5) - req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}) - try: - resp = urllib.request.urlopen(req, timeout=10) - ct = resp.headers.get('Content-Type', '') - cl = resp.headers.get('Content-Length', '?') - if 'image' in ct: - return (True, f'HTTP {resp.status} {ct} {cl}B') - return (False, f'not image: {ct}') - except urllib.error.HTTPError as e: - return (False, f'HTTP {e.code}') - except Exception as e: - return (False, str(e)) - -all_ok = True -for plant in plants: - ok, msg = check_url(plant['imageUrl']) - status = '✅' if ok else '❌' - if not ok: - all_ok = False - print(f' {plant["id"]:20s} {status} {msg}') - -sys.exit(0 if all_ok else 1)