#!/usr/bin/env node
/**
 * fill-training-dataset.ts
 *
 * Scans the existing dataset directory and downloads any missing images
 * to reach the target counts (200 per disease, 400 for healthy).
 *
 * Does NOT re-run prevalence queries — just fills gaps from image sources.
 * Each run scans the directory, reports deficits, then fills them.
 * Interrupt-safe: re-run to pick up where you left off.
 *
 * Parallelism strategy:
 *   - Disease-level: 30 diseases processed concurrently
 *   - Per disease: all 3 DDG queries run in parallel
 *   - Per query: all search pages fetched in parallel
 *   - Per disease: DDG, iNaturalist, and Wikimedia Commons all run concurrently
 *   - A shared DDG token-bucket rate limiter prevents bans
 *
 * Usage: cd apps/web && npx tsx scripts/fill-training-dataset.ts
 */
|
||
|
||
import "dotenv/config";
import {
  existsSync,
  mkdirSync,
  readFileSync,
  readdirSync,
  renameSync,
  writeFileSync,
} from "fs";
import { extname, resolve } from "path";
|
||
|
||
// Load .env.development for DB creds.
// Each non-comment `KEY=value` line is copied into process.env unless the key
// already has a (non-empty) value there, so the real environment wins.
// NOTE(review): static `import` declarations are hoisted, so modules imported
// further down the file are evaluated before this block runs. This only works
// if those modules read process.env lazily (e.g. inside getDb()) — confirm.
const envPath = resolve(__dirname, "../.env.development");
try {
  const envText = readFileSync(envPath, "utf-8");
  for (const rawLine of envText.split("\n")) {
    const entry = rawLine.trim();
    if (!entry || entry.startsWith("#")) continue;

    const eqIdx = entry.indexOf("=");
    if (eqIdx <= 0) continue; // no "=" or an empty key — not an assignment

    const key = entry.slice(0, eqIdx).trim();
    const val = entry.slice(eqIdx + 1).trim();
    if (!process.env[key]) process.env[key] = val;
  }
} catch (err: unknown) {
  // A missing file is the normal "nothing to load" case; any other failure
  // (permissions, directory in place of file, …) is reported but not fatal.
  const code =
    typeof err === "object" && err !== null && "code" in err
      ? (err as { code?: unknown }).code
      : undefined;
  if (code !== "ENOENT") {
    console.warn(
      `Could not read ${envPath}: ${err instanceof Error ? err.message : String(err)}`,
    );
  }
}
|
||
|
||
import { getDb, closeDb } from "@/lib/db/index";
|
||
import { diseases } from "@/lib/db/schema";
|
||
|
||
// ─── Config ─────────────────────────────────────────────────────────────────
|
||
|
||
/** Root of the dataset; each class (disease id or "healthy") gets a sub-directory here. */
const DATASET_DIR = resolve(__dirname, "../data/dataset");

/** JSON file inside the dataset directory that persists the per-class seen-URL lists. */
const SEEN_CACHE_FILE = resolve(DATASET_DIR, ".fill-seen-urls.json");
|
||
|
||
/** Target images per disease */
const TARGET_PER_DISEASE = 200;

/** Target images for the "healthy" class */
const TARGET_HEALTHY = 400;

/**
 * How many diseases to process in parallel.
 * Each disease is I/O-bound (HTTP requests), so high concurrency is safe.
 * The global DDG rate limiter prevents us from overwhelming DuckDuckGo.
 */
const DISEASE_CONCURRENCY = 30;

/**
 * Max DDG requests per second (shared across all concurrent diseases).
 * DuckDuckGo is fairly tolerant, but we still want to be polite.
 * With DISEASE_CONCURRENCY=30, each disease fires 3 parallel queries, each
 * fetching MAX_DDG_PAGES (3) pages in parallel = 9 parallel DDG search
 * requests per disease at peak.
 * The rate limiter spaces these out so we don't get banned.
 */
const DDG_RATE_LIMIT_RPS = 15;

/** Max concurrent image downloads per disease */
const CONCURRENT_DOWNLOADS = 30;

/** Minimum image size in bytes to accept */
const MIN_IMAGE_SIZE = 10_000; // 10KB

/** Maximum image size in bytes */
const MAX_IMAGE_SIZE = 10 * 1024 * 1024; // 10MB

/** Allowed file extensions */
const ALLOWED_EXTENSIONS = [".jpg", ".jpeg", ".png", ".webp"];

/** User agent for requests */
const UA =
  "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36";

/** Healthy class directory name */
const HEALTHY_CLASS = "healthy";

/** How often (in diseases processed) to flush the seen-URLs cache to disk */
const SEEN_CACHE_FLUSH_INTERVAL = 20;

/**
 * Max DDG pages to fetch per query.
 * Each page returns ~100 image results, so 3 pages × 3 queries = ~900 raw URLs
 * before dedup — more than enough to find 200 unique, valid images.
 */
const MAX_DDG_PAGES = 3;

/** Maximum number of healthy-class search queries sent to DDG in one run */
const MAX_HEALTHY_QUERIES = 20;
|
||
|
||
// ─── Types ──────────────────────────────────────────────────────────────────
|
||
|
||
/** One entry of the `results` array returned by DuckDuckGo's image endpoint. */
interface DuckDuckGoImageResult {
  /** URL of the image itself; this is the value the collector prefers. */
  image: string;
  title: string;
  /** Fallback URL used by the collector when `image` is empty. */
  url: string;
  thumbnail: string;
  height: number;
  width: number;
}
|
||
|
||
/** A disease whose class directory is still short of the target image count. */
interface DiseaseInfo {
  /** Disease id from the DB; also used as the class directory name. */
  id: string;
  /** Disease name, used to build search queries. */
  name: string;
  /** Plant id the disease belongs to, used to build search queries. */
  plantId: string;
  /** Images currently on disk for this disease. */
  have: number;
  /** Images still missing to reach the per-disease target. */
  needed: number;
}
|
||
|
||
/** Outcome of asking one image source for candidate URLs. */
interface CollectResult {
  /** Newly discovered image URLs (not previously in the seen set). */
  urls: string[];
  /** True when the source yielded fewer URLs than were requested. */
  exhausted: boolean;
}
|
||
|
||
// ─── Token-Bucket Rate Limiter ──────────────────────────────────────────────
|
||
|
||
/**
 * Token-bucket rate limiter.
 *
 * The bucket starts full, holds at most `capacity` tokens and regains one
 * token every `refillInterval` milliseconds. Each `acquire()` consumes one.
 */
class TokenBucket {
  private tokens: number;
  private lastRefill: number;
  private readonly capacity: number;
  private readonly refillInterval: number; // ms per token (e.g., 100ms for 10 rps)

  /**
   * @param rps Sustained rate in tokens per second; must be a positive finite number.
   * @throws RangeError when `rps` is zero, negative, NaN or infinite — such a
   *   bucket could never hand out a token and `acquire()` would spin forever.
   */
  constructor(rps: number) {
    if (!Number.isFinite(rps) || rps <= 0) {
      throw new RangeError(`TokenBucket rate must be a positive finite number, got ${rps}`);
    }
    // A capacity below 1 would cap `tokens` under the 1-token threshold that
    // acquire() needs, so rates below 1 rps still get room for a single token.
    this.capacity = Math.max(1, rps);
    this.tokens = this.capacity;
    this.lastRefill = Date.now();
    this.refillInterval = 1000 / rps;
  }

  /** Acquire one token, waiting (asynchronously) until one is available. */
  async acquire(): Promise<void> {
    const waitMs = Math.ceil(this.refillInterval);
    for (;;) {
      this.refill();
      if (this.tokens >= 1) {
        this.tokens -= 1;
        return;
      }
      // No tokens — wait roughly one refill interval, then retry.
      await new Promise<void>((wake) => setTimeout(wake, waitMs));
    }
  }

  /** Credit the tokens earned since the last refill, capped at `capacity`. */
  private refill(): void {
    const now = Date.now();
    const elapsed = now - this.lastRefill;
    const earned = Math.floor(elapsed / this.refillInterval);
    if (earned > 0) {
      this.tokens = Math.min(this.capacity, this.tokens + earned);
      // Keep the fractional remainder so partial intervals are not lost.
      this.lastRefill = now - (elapsed % this.refillInterval);
    }
  }
}
|
||
|
||
// Global DDG rate limiter — a single bucket shared by every concurrent disease,
// refilled at DDG_RATE_LIMIT_RPS tokens per second.
const ddgLimiter = new TokenBucket(DDG_RATE_LIMIT_RPS);
|
||
|
||
// ─── Helpers ────────────────────────────────────────────────────────────────
|
||
|
||
/**
 * Return a promise that resolves after about `ms` milliseconds.
 * The callback parameter is deliberately not called `resolve`, which would
 * shadow the `resolve` imported from "path" at the top of the file.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}
|
||
|
||
/**
 * Count the entries in `dir` whose name starts with "img_".
 *
 * @param dir Directory to inspect.
 * @returns The number of matching entries, or 0 when the directory does not
 *   exist or cannot be read (readdirSync throwing covers both cases, so no
 *   separate existence check is needed).
 */
function countImagesInDir(dir: string): number {
  try {
    return readdirSync(dir).filter((name) => name.startsWith("img_")).length;
  } catch {
    return 0;
  }
}
|
||
|
||
// ─── Seen-URLs Cache ──────────────────────────────────────────────────────
|
||
|
||
/**
 * Load the per-class seen-URLs cache from disk.
 * This prevents re-fetching the same URLs across runs.
 *
 * The file content is validated rather than trusted: anything that is not a
 * plain object mapping class ids to arrays is discarded, and non-string array
 * members are dropped, so callers can safely index the result.
 *
 * @returns The cache, or an empty object when the file is missing, unreadable
 *   or malformed.
 */
function loadSeenUrlsCache(): Record<string, string[]> {
  if (!existsSync(SEEN_CACHE_FILE)) return {};

  let parsed: unknown;
  try {
    parsed = JSON.parse(readFileSync(SEEN_CACHE_FILE, "utf-8"));
  } catch (err: unknown) {
    console.warn(
      `Ignoring unreadable seen-URLs cache: ${err instanceof Error ? err.message : String(err)}`,
    );
    return {};
  }

  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) return {};

  const cache: Record<string, string[]> = {};
  for (const [classId, urls] of Object.entries(parsed)) {
    if (!Array.isArray(urls)) continue;
    cache[classId] = urls.filter((u): u is string => typeof u === "string");
  }
  return cache;
}
|
||
|
||
/**
 * Save the seen-URLs cache to disk.
 *
 * The JSON is written to a sibling temp file and then renamed over the real
 * cache file, so an interrupt in the middle of the write cannot leave a
 * truncated cache behind (which the loader would discard entirely).
 *
 * @param cache Mapping of class id to the URLs already seen for that class.
 */
function saveSeenUrlsCache(cache: Record<string, string[]>): void {
  const tmpFile = `${SEEN_CACHE_FILE}.tmp`;
  writeFileSync(tmpFile, JSON.stringify(cache, null, 2));
  renameSync(tmpFile, SEEN_CACHE_FILE);
}
|
||
|
||
// ─── DDG VQD Token Cache ──────────────────────────────────────────────────
|
||
|
||
/**
 * Simple in-memory cache for DDG VQD tokens.
 * Tokens are per-query, so the cache is keyed by the exact query string; a hit
 * lets us skip the initial HTML page fetch for that query.
 */
const vqdCache = new Map<string, { token: string; expiresAt: number }>();

/**
 * Look up a cached token for `query`.
 *
 * @returns The token if one is cached and has not expired; otherwise
 *   `undefined` (an expired entry is removed as a side effect).
 */
function getCachedVqd(query: string): string | undefined {
  const entry = vqdCache.get(query);
  if (entry === undefined) return undefined;
  if (entry.expiresAt > Date.now()) return entry.token;
  vqdCache.delete(query);
  return undefined;
}

/**
 * Cache `token` for `query` for five minutes, evicting the oldest entry when
 * the cache exceeds 500 entries.
 */
function setCachedVqd(query: string, token: string): void {
  // VQD tokens seem to be valid for a few minutes; cache for 5 min
  const ttlMs = 5 * 60 * 1000;
  const maxEntries = 500;

  // Delete first so a refreshed key moves to the end of the Map's insertion
  // order; the first key is then always the least recently written one.
  vqdCache.delete(query);
  vqdCache.set(query, { token, expiresAt: Date.now() + ttlMs });

  // Evict the oldest entry if the cache grows too large (unlikely but safe).
  // Checking `done` rather than truthiness also evicts an empty-string key.
  if (vqdCache.size > maxEntries) {
    const oldest = vqdCache.keys().next();
    if (!oldest.done) vqdCache.delete(oldest.value);
  }
}
|
||
|
||
// ─── DuckDuckGo API ─────────────────────────────────────────────────────────
|
||
|
||
/**
 * Obtain the `vqd` token DuckDuckGo requires for image searches on `query`.
 *
 * A cached token is returned when available; otherwise the DDG search page is
 * fetched and the token is extracted from its HTML and cached.
 *
 * NOTE(review): this request goes to duckduckgo.com without passing through
 * the shared rate limiter used for the search requests — confirm whether that
 * is intended.
 *
 * @throws Error when the page request fails or no token is found in the HTML.
 */
async function getVqdToken(query: string): Promise<string> {
  const cached = getCachedVqd(query);
  if (cached) return cached;

  const pageUrl = `https://duckduckgo.com/?q=${encodeURIComponent(query)}&t=h_&iax=images&ia=images`;
  const res = await fetch(pageUrl, {
    headers: { "User-Agent": UA, Accept: "text/html" },
    signal: AbortSignal.timeout(15_000),
  });
  if (!res.ok) throw new Error(`Failed to get vqd token: ${res.status}`);

  const html = await res.text();
  const found = /vqd['"]?\s*[:=]\s*['"]([a-f0-9-]+)['"]/.exec(html);
  const token = found?.[1];
  if (!token) throw new Error(`Could not extract vqd token for "${query}"`);

  setCachedVqd(query, token);
  return token;
}
|
||
|
||
/**
 * Fetch one page of DuckDuckGo image results for `query`.
 *
 * @param query Search text.
 * @param vqd Token previously obtained for this query.
 * @param page 1-based page number.
 * @param retriesLeft How many more times a 429 response may be retried
 *   (default 1, i.e. "retry once"). Bounding this prevents unbounded
 *   recursion while DDG keeps answering 429.
 * @returns The page's results, or an empty array on any non-OK response.
 *   Network errors and timeouts still reject; callers use allSettled.
 */
async function searchImagesDuckDuckGo(
  query: string,
  vqd: string,
  page: number,
  retriesLeft = 1,
): Promise<DuckDuckGoImageResult[]> {
  // Rate-limit before making the request
  await ddgLimiter.acquire();

  const encodedQuery = encodeURIComponent(query);
  // NOTE(review): the unofficial i.js endpoint appears to paginate through the
  // `s` result offset (its own `next` links carry s=100, s=200, …) while `p`
  // looks like a safe-search switch. `p` is kept as before and `s` is added so
  // pages 2+ do not silently repeat page 1 — confirm against live responses.
  const resultOffset = Math.max(0, page - 1) * 100;
  const url =
    `https://duckduckgo.com/i.js?q=${encodedQuery}` +
    `&vqd=${encodeURIComponent(vqd)}&o=json&p=${page}&s=${resultOffset}&f=,,,`;

  const res = await fetch(url, {
    headers: {
      "User-Agent": UA,
      Accept: "application/json",
      Referer: `https://duckduckgo.com/?q=${encodedQuery}&t=h_&iax=images&ia=images`,
    },
    signal: AbortSignal.timeout(15_000),
  });

  if (res.status === 429 && retriesLeft > 0) {
    // Rate limited — back off, then retry with one fewer attempt remaining
    await sleep(5_000);
    return searchImagesDuckDuckGo(query, vqd, page, retriesLeft - 1);
  }

  // Don't throw for 403 / transient HTTP errors — just return empty
  if (!res.ok) return [];

  const data = (await res.json()) as { results?: DuckDuckGoImageResult[] } | null;
  const results = data?.results;
  return Array.isArray(results) ? results : [];
}
|
||
|
||
/**
 * Collect images from DDG for a single query.
 * Fetches up to MAX_DDG_PAGES pages in PARALLEL (rate-limited via ddgLimiter).
 *
 * @param query Search text.
 * @param target Maximum number of new URLs to return.
 * @param seenUrls URLs to skip; every URL returned is added to this set.
 * @returns New http(s) image URLs and whether fewer than `target` were found.
 *   A token failure is reported as an exhausted, empty result.
 */
async function collectFromDdgQuery(
  query: string,
  target: number,
  seenUrls: Set<string>,
): Promise<CollectResult> {
  let vqd: string;
  try {
    vqd = await getVqdToken(query);
  } catch (err) {
    console.warn(` ⚠ DDG token failed: ${err instanceof Error ? err.message : "unknown"}`);
    return { urls: [], exhausted: true };
  }

  // Fetch all pages in parallel
  const pageFetches = Array.from({ length: MAX_DDG_PAGES }, (_, i) =>
    searchImagesDuckDuckGo(query, vqd, i + 1),
  );
  const pageOutcomes = await Promise.allSettled(pageFetches);

  const fresh: string[] = [];
  for (const outcome of pageOutcomes) {
    if (outcome.status !== "fulfilled") continue;
    if (fresh.length >= target) break;

    for (const hit of outcome.value) {
      if (fresh.length >= target) break;

      const candidate = hit.image || hit.url;
      if (!candidate || typeof candidate !== "string") continue;
      if (seenUrls.has(candidate)) continue;

      // Only absolute http(s) URLs can be downloaded; `new URL` alone would
      // also accept schemes such as data: or javascript:.
      let parsed: URL;
      try {
        parsed = new URL(candidate);
      } catch {
        continue;
      }
      if (parsed.protocol !== "http:" && parsed.protocol !== "https:") continue;

      seenUrls.add(candidate);
      fresh.push(candidate);
    }
  }

  return { urls: fresh.slice(0, target), exhausted: fresh.length < target };
}
|
||
|
||
/**
 * Collect images from DDG across ALL queries for a disease.
 * Runs all queries in PARALLEL, then merges deduplicated results.
 *
 * Each query may itself return up to `target` URLs and marks all of them as
 * seen. Only the first `target` merged URLs are handed back for download, so
 * the surplus is removed from `seenUrls` again — otherwise those never-tried
 * URLs would be skipped on every later run.
 *
 * @param queries Search texts to run.
 * @param target Maximum number of URLs to return.
 * @param seenUrls Shared dedup set; ends up containing exactly the returned
 *   URLs in addition to its previous content.
 */
async function collectImagesDuckDuckGo(
  queries: string[],
  target: number,
  seenUrls: Set<string>,
): Promise<{ urls: string[]; exhausted: boolean }> {
  // Run all queries in parallel
  const outcomes = await Promise.allSettled(
    queries.map((q) => collectFromDdgQuery(q, target, seenUrls)),
  );

  // Merge results — seenUrls already deduplicates across queries
  const collected: string[] = [];
  for (const outcome of outcomes) {
    if (outcome.status === "fulfilled") collected.push(...outcome.value.urls);
  }

  const keepCount = Math.max(0, target);
  const kept = collected.slice(0, keepCount);
  for (const surplus of collected.slice(keepCount)) {
    seenUrls.delete(surplus);
  }

  return { urls: kept, exhausted: collected.length < target };
}
|
||
|
||
// ─── iNaturalist API ───────────────────────────────────────────────────────
|
||
|
||
/**
 * Rewrite an iNaturalist photo URL to its full-size ("original") variant.
 * Photo URLs end in `/<size>.<ext>`; the API may hand back any of the reduced
 * sizes, so every known size name is mapped, not just "medium".
 */
function toOriginalInatPhotoUrl(url: string): string {
  return url.replace(/\/(?:square|thumb|small|medium|large)\./, "/original.");
}

/**
 * Search iNaturalist research-grade observations for photos matching `query`.
 *
 * @param query Free-text search.
 * @param target Maximum number of new URLs to return (one request, at most
 *   200 observations).
 * @param seenUrls URLs to skip; every URL returned is added to this set.
 * @returns Full-size photo URLs. `exhausted` is true when the request
 *   succeeded but produced fewer than `target` new URLs, and false when the
 *   request failed (the source may still have more).
 */
async function searchImagesInaturalist(
  query: string,
  target: number,
  seenUrls: Set<string>,
): Promise<CollectResult> {
  const results: string[] = [];
  if (target <= 0) return { urls: results, exhausted: false };

  const perPage = Math.min(target, 200);
  const apiUrl =
    `https://api.inaturalist.org/v1/observations` +
    `?q=${encodeURIComponent(query)}` +
    `&photos_only=true` +
    `&quality_grade=research` +
    `&per_page=${perPage}` +
    `&order_by=observed_on&order=desc`;

  try {
    const res = await fetch(apiUrl, {
      headers: { "User-Agent": UA, Accept: "application/json" },
      signal: AbortSignal.timeout(15_000),
    });
    if (!res.ok) return { urls: [], exhausted: false };

    const data = (await res.json()) as {
      results?: Array<{ photos?: Array<{ url?: string }> }>;
    } | null;

    for (const observation of data?.results ?? []) {
      if (results.length >= target) break;
      for (const photo of observation.photos ?? []) {
        if (results.length >= target) break;
        if (!photo.url) continue;

        // Dedup on the URL that is actually stored and downloaded; checking
        // the raw API URL would never match what earlier runs recorded.
        const fullUrl = toOriginalInatPhotoUrl(photo.url);
        if (seenUrls.has(fullUrl)) continue;
        seenUrls.add(fullUrl);
        results.push(fullUrl);
      }
    }

    return { urls: results, exhausted: results.length < target };
  } catch {
    // Network error, timeout or bad JSON: keep whatever was gathered.
    return { urls: results, exhausted: false };
  }
}
|
||
|
||
// ─── Wikimedia Commons API ─────────────────────────────────────────────────
|
||
|
||
/**
 * Search Wikimedia Commons (File namespace) for images matching `query`.
 *
 * Pages through the MediaWiki search API 50 hits at a time until `target` new
 * URLs are gathered, the results run out, or a request fails.
 *
 * @param query Free-text search.
 * @param target Maximum number of new URLs to return.
 * @param seenUrls URLs to skip; every URL returned is added to this set.
 * @returns `Special:FilePath` URLs for JPEG/PNG/WebP files only, plus whether
 *   fewer than `target` were found.
 */
async function searchImagesCommons(
  query: string,
  target: number,
  seenUrls: Set<string>,
): Promise<CollectResult> {
  const results: string[] = [];
  // The File namespace also holds PDFs, SVGs, audio and video; only raster
  // formats the dataset accepts are worth a download attempt.
  const rasterTitle = /\.(?:jpe?g|png|webp)$/i;
  let sroffset = 0;

  while (results.length < target) {
    const params = new URLSearchParams({
      action: "query",
      list: "search",
      srsearch: query,
      srnamespace: "6",
      srlimit: "50",
      sroffset: String(sroffset),
      format: "json",
    });
    const url = `https://commons.wikimedia.org/w/api.php?${params.toString()}`;

    try {
      const res = await fetch(url, {
        headers: { "User-Agent": UA },
        signal: AbortSignal.timeout(10_000),
      });
      if (!res.ok) break;

      const data = (await res.json()) as {
        query?: { search?: Array<{ title: string }> };
        continue?: { sroffset?: number };
      };

      const hits = data.query?.search ?? [];
      if (hits.length === 0) break;

      for (const hit of hits) {
        if (results.length >= target) break;

        const filename = hit.title.replace(/^File:/, "");
        if (!rasterTitle.test(filename)) continue;

        const imgUrl = `https://commons.wikimedia.org/wiki/Special:FilePath/${encodeURIComponent(
          filename,
        )}`;
        if (seenUrls.has(imgUrl)) continue;
        seenUrls.add(imgUrl);
        results.push(imgUrl);
      }

      // No continuation offset means this was the last page; stop instead of
      // issuing one more request just to receive an empty list.
      const nextOffset = data.continue?.sroffset;
      if (nextOffset === undefined) break;
      sroffset = nextOffset;
    } catch {
      break;
    }
  }

  return { urls: results, exhausted: results.length < target };
}
|
||
|
||
// ─── Image Download ─────────────────────────────────────────────────────────
|
||
|
||
/**
 * Identify a JPEG, PNG or WebP payload from its leading "magic" bytes.
 *
 * @returns The canonical extension for the detected format, or `undefined`
 *   when the bytes are none of the three.
 */
function sniffImageExtension(bytes: Uint8Array): ".jpg" | ".png" | ".webp" | undefined {
  const startsWith = (signature: number[], offset = 0): boolean =>
    bytes.length >= offset + signature.length &&
    signature.every((byte, i) => bytes[offset + i] === byte);

  if (startsWith([0xff, 0xd8, 0xff])) return ".jpg";
  if (startsWith([0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a])) return ".png";
  // WebP is a RIFF container: "RIFF" <4-byte size> "WEBP"
  if (startsWith([0x52, 0x49, 0x46, 0x46]) && startsWith([0x57, 0x45, 0x42, 0x50], 8)) {
    return ".webp";
  }
  return undefined;
}

/**
 * Download `url` and store it next to `destPath`.
 *
 * The payload is accepted only if it is within the configured size limits and
 * its bytes really are a JPEG, PNG or WebP image; previously any non-HTML
 * response (GIF, SVG, PDF, JSON error bodies, …) was saved with an image
 * extension. The written file keeps `destPath`'s name but takes the extension
 * of the detected format.
 *
 * @param url Image URL to fetch.
 * @param destPath Target path; its extension is replaced.
 * @returns true when a file was written; false on any rejection or error
 *   (this function never throws).
 */
async function downloadImage(url: string, destPath: string): Promise<boolean> {
  try {
    const res = await fetch(url, {
      headers: { "User-Agent": UA, Accept: "image/webp,image/png,image/jpeg,*/*" },
      signal: AbortSignal.timeout(8_000),
    });
    if (!res.ok) return false;

    const contentType = res.headers.get("content-type") || "";
    if (contentType.includes("text/html")) return false;

    // If the server announces an oversized body, skip it without buffering.
    const declaredLength = Number(res.headers.get("content-length"));
    if (Number.isFinite(declaredLength) && declaredLength > MAX_IMAGE_SIZE) {
      void res.body?.cancel().catch(() => undefined);
      return false;
    }

    const buffer = Buffer.from(await res.arrayBuffer());
    if (buffer.length < MIN_IMAGE_SIZE) return false;
    if (buffer.length > MAX_IMAGE_SIZE) return false;

    const ext = sniffImageExtension(buffer);
    if (ext === undefined || !ALLOWED_EXTENSIONS.includes(ext)) return false;

    const filePath = destPath.replace(/\.\w+$/, ext);
    writeFileSync(filePath, buffer);
    return true;
  } catch {
    return false;
  }
}
|
||
|
||
/**
 * Download `urls` into `classDir`, CONCURRENT_DOWNLOADS at a time, naming the
 * files `img_NNNN.<ext>` with consecutive indices starting at `startIndex`.
 *
 * Every URL gets its own index, reserved synchronously before the download
 * starts. (Reading the counter before the `await` and incrementing it after
 * made every download of a chunk target the same file name, so they overwrote
 * one another.) A failed download leaves its index unused.
 *
 * @param urls Image URLs to fetch.
 * @param classDir Destination directory.
 * @param startIndex Index used for the first URL.
 * @returns Success and failure counts, and `lastIndex` = the first index not
 *   reserved by this call (`startIndex + urls.length`).
 */
async function downloadBatch(
  urls: string[],
  classDir: string,
  startIndex: number,
): Promise<{ downloaded: number; failed: number; lastIndex: number }> {
  let downloaded = 0;
  let failed = 0;
  let nextIndex = startIndex;

  for (let offset = 0; offset < urls.length; offset += CONCURRENT_DOWNLOADS) {
    const chunk = urls.slice(offset, offset + CONCURRENT_DOWNLOADS);

    const outcomes = await Promise.all(
      chunk.map((url) => {
        const reserved = nextIndex++;
        const paddedIndex = String(reserved).padStart(4, "0");
        const destPath = resolve(classDir, `img_${paddedIndex}.jpg`);
        return downloadImage(url, destPath);
      }),
    );

    for (const succeeded of outcomes) {
      if (succeeded) downloaded++;
      else failed++;
    }
  }

  return { downloaded, failed, lastIndex: nextIndex };
}
|
||
|
||
// ─── Query Building ─────────────────────────────────────────────────────────
|
||
|
||
/**
 * Build the three search queries used to find images of a disease.
 *
 * @param name Disease name.
 * @param plant Plant id; hyphens are turned into spaces, as the healthy-class
 *   query builder does, so a slug such as "bell-pepper" is searched as words.
 * @returns Queries ordered from most to least specific.
 */
function buildSearchQueries(name: string, plant: string): string[] {
  const plantName = plant.replace(/-/g, " ");
  return [
    `${name} ${plantName} leaf disease`,
    `${plantName} ${name} symptoms`,
    `${name} ${plantName}`,
  ];
}
|
||
|
||
/**
 * Build the four search queries used to find images of a healthy plant.
 *
 * @param plant Plant id; hyphens are replaced by spaces before use.
 */
function buildHealthyQueries(plant: string): string[] {
  const plantName = plant.replace(/-/g, " ");
  return [
    `healthy ${plantName} leaf`,
    `${plantName} leaf closeup`,
    `healthy ${plantName} plant`,
    `${plantName} foliage`,
  ];
}
|
||
|
||
// ─── Fill Logic ─────────────────────────────────────────────────────────────
|
||
|
||
/**
 * Return the lowest index that is higher than every `img_<number>` file
 * already in `dir` (0 when there are none or the directory is unreadable).
 */
function nextFreeImageIndex(dir: string): number {
  let names: string[];
  try {
    names = readdirSync(dir);
  } catch {
    return 0;
  }
  let next = 0;
  for (const name of names) {
    const numbered = /^img_(\d+)/.exec(name);
    if (numbered) next = Math.max(next, Number(numbered[1]) + 1);
  }
  return next;
}

/**
 * Try to collect up to `needed` images for a disease by hitting all three
 * sources IN PARALLEL. Returns how many new image files ended up on disk.
 *
 * Sources (DDG with its internal queries, iNat, Commons) all run concurrently.
 * As soon as any source completes, its URLs are downloaded immediately while
 * other sources are still searching (pipeline). Each source is asked for up
 * to `needed` URLs, so the class can end up above its target.
 *
 * @param _diseaseId Unused; kept for the caller's signature.
 * @param queries Search queries; the first one is also used for iNat/Commons.
 * @param needed Number of images still missing.
 * @param classDir Directory for this class (created if absent).
 * @param seenUrls Dedup set shared by all sources; updated in place.
 */
async function fillClass(
  _diseaseId: string,
  queries: string[],
  needed: number,
  classDir: string,
  seenUrls: Set<string>,
): Promise<number> {
  if (needed <= 0) return 0;

  const primaryQuery = queries[0];
  if (primaryQuery === undefined) return 0;

  mkdirSync(classDir, { recursive: true });
  const startCount = countImagesInDir(classDir);

  // Each source gets a DEDICATED index range so concurrent writers never pick
  // the same file name. Ranges start above the highest index already on disk
  // (not at the file count): failed downloads leave gaps in the numbering, so
  // a count-based start would overwrite images kept from an earlier run.
  // A range is at least `needed` wide because a source returns at most
  // `needed` URLs.
  const baseIndex = nextFreeImageIndex(classDir);
  const rangeWidth = Math.max(TARGET_PER_DISEASE, needed);

  const sources: Array<{ label: string; collect: () => Promise<CollectResult> }> = [
    { label: "DDG", collect: () => collectImagesDuckDuckGo(queries, needed, seenUrls) },
    { label: "iNat", collect: () => searchImagesInaturalist(primaryQuery, needed, seenUrls) },
    { label: "Commons", collect: () => searchImagesCommons(primaryQuery, needed, seenUrls) },
  ];

  let totalDownloaded = 0;
  let totalFailed = 0;

  await Promise.allSettled(
    sources.map(async ({ label, collect }, slot) => {
      const found = await collect();
      if (found.urls.length === 0) return;
      console.log(` ${label}: ${found.urls.length} new URLs`);

      const { downloaded, failed } = await downloadBatch(
        found.urls,
        classDir,
        baseIndex + slot * rangeWidth,
      );
      totalDownloaded += downloaded;
      totalFailed += failed;
    }),
  );

  if (totalDownloaded === 0) {
    console.log(` ✗ No new images found from any source`);
    return 0;
  }

  const gained = countImagesInDir(classDir) - startCount;
  console.log(
    ` ✓ ${totalDownloaded}/${totalDownloaded + totalFailed} downloaded` +
      ` (${totalFailed} failed, ${gained} new files)`,
  );

  return gained;
}
|
||
|
||
// ─── Directory Scanner ─────────────────────────────────────────────────────
|
||
|
||
/** What a scan of the dataset directory found on disk. */
interface ScanResult {
  /** Disease id → how many images currently on disk */
  diseaseCounts: Map<string, number>;
  /** How many healthy images on disk */
  healthyCount: number;
}
|
||
|
||
/**
 * Count the images currently stored for every class in DATASET_DIR.
 *
 * Hidden entries and non-directories are ignored. The directory named
 * HEALTHY_CLASS feeds `healthyCount`; every other directory is treated as a
 * disease id and recorded only when it holds at least one image.
 *
 * @returns Empty counts when the dataset directory does not exist.
 */
function scanDataset(): ScanResult {
  const diseaseCounts = new Map<string, number>();
  let healthyCount = 0;

  if (!existsSync(DATASET_DIR)) {
    return { diseaseCounts, healthyCount };
  }

  for (const entry of readdirSync(DATASET_DIR, { withFileTypes: true })) {
    if (!entry.isDirectory() || entry.name.startsWith(".")) continue;

    const count = countImagesInDir(resolve(DATASET_DIR, entry.name));
    if (entry.name === HEALTHY_CLASS) {
      healthyCount = count;
    } else if (count > 0) {
      diseaseCounts.set(entry.name, count);
    }
  }

  return { diseaseCounts, healthyCount };
}
|
||
|
||
// ─── CLI Flags ──────────────────────────────────────────────────────────────
|
||
|
||
/**
 * Read the command-line flags this script understands.
 *
 * @returns `reverse` is true when `--reverse` or `-r` was passed.
 */
function parseFlags(): { reverse: boolean } {
  const cliArgs = process.argv.slice(2);
  const reverse = cliArgs.some((arg) => arg === "--reverse" || arg === "-r");
  return { reverse };
}
|
||
|
||
// ─── Main ───────────────────────────────────────────────────────────────────
|
||
|
||
/**
 * Script entry point: scan the dataset, load the disease list from the DB,
 * report per-class deficits, then fill the disease classes (in parallel
 * batches) and the healthy class, and print a summary.
 *
 * The DB connection is closed on every exit path, including errors.
 */
async function main(): Promise<void> {
  const flags = parseFlags();
  const heavyRule = "=".repeat(60);
  const lightRule = "─".repeat(60);

  console.log(heavyRule);
  console.log("TRAINING DATASET FILL — Parallelized gap-filling download");
  if (flags.reverse) console.log(" (reverse order — processing from lowest deficit first)");
  console.log(heavyRule);

  // Ensure dataset directory exists
  mkdirSync(DATASET_DIR, { recursive: true });

  // ── Step 1: Scan what we already have ────────────────────────────────────
  console.log("\nScanning existing dataset...");
  const { diseaseCounts, healthyCount } = scanDataset();
  console.log(` Found ${diseaseCounts.size} disease directories, ${healthyCount} healthy images`);

  // ── Step 2: Load disease info from DB ────────────────────────────────────
  console.log("\nLoading disease info from database...");
  const db = getDb();

  try {
    const allDiseases = await db
      .select({
        id: diseases.id,
        plantId: diseases.plantId,
        name: diseases.name,
      })
      .from(diseases);

    // Build a deduplicated map: disease id → first disease info found
    const diseaseInfo = new Map<string, { name: string; plantId: string }>();
    for (const d of allDiseases) {
      if (!diseaseInfo.has(d.id)) {
        diseaseInfo.set(d.id, { name: d.name, plantId: d.plantId });
      }
    }
    console.log(` Loaded ${diseaseInfo.size} unique diseases from DB`);

    // ── Step 3: Build deficit list ──────────────────────────────────────────
    const deficits: DiseaseInfo[] = [];
    for (const [id, info] of diseaseInfo) {
      const have = diseaseCounts.get(id) ?? 0;
      const needed = TARGET_PER_DISEASE - have;
      if (needed > 0) {
        deficits.push({ id, name: info.name, plantId: info.plantId, have, needed });
      }
    }

    // Sort by deficit size (largest first) so we prioritize the neediest diseases
    deficits.sort((a, b) => b.needed - a.needed);

    // Reverse order if --reverse/-r flag is set (useful to try a different
    // direction when the front of the queue keeps hitting dead URLs)
    if (flags.reverse) deficits.reverse();

    const healthyDeficit = TARGET_HEALTHY - healthyCount;

    console.log(`\n${heavyRule}`);
    console.log("DEFICIT REPORT");
    console.log(heavyRule);
    console.log(` Diseases needing images: ${deficits.length}/${diseaseInfo.size}`);
    console.log(` Total images missing: ${deficits.reduce((s, d) => s + d.needed, 0)}`);
    console.log(` Healthy deficit: ${Math.max(0, healthyDeficit)}`);
    console.log(` Parallelism: ${DISEASE_CONCURRENCY} diseases at once`);
    console.log(` DDG rate limit: ${DDG_RATE_LIMIT_RPS} req/s (shared)`);
    console.log(
      ` Order: ${flags.reverse ? "reverse (--reverse)" : "normal (deficit-first)"}`,
    );
    console.log(heavyRule);

    if (deficits.length === 0 && healthyDeficit <= 0) {
      console.log("\n ✓ Nothing to do — all targets met!\n");
      return; // the finally block closes the DB
    }

    // ── Step 4: Load seen-URLs cache ────────────────────────────────────────
    const seenUrlsCache = loadSeenUrlsCache();
    let totalDownloaded = 0;
    let diseasesProcessed = 0;
    const startTime = Date.now();

    // ── Step 5: Fill disease deficits ───────────────────────────────────────
    if (deficits.length > 0) {
      console.log("\n" + lightRule);
      console.log(`FILLING ${deficits.length} DISEASES (target: ${TARGET_PER_DISEASE} each)`);
      console.log(lightRule);

      // Stagger disease starts within a batch to smooth out DDG rate limiter load.
      // Without staggering, every disease of a batch queues its DDG requests
      // behind the rate limiter at the same instant, giving the first disease a
      // huge head start and the last one a long tail. Staggering by 200ms each
      // spreads the load more evenly.
      const STAGGER_MS = 200;
      const totalBatches = Math.ceil(deficits.length / DISEASE_CONCURRENCY);

      // Process in parallel batches
      for (let i = 0; i < deficits.length; i += DISEASE_CONCURRENCY) {
        const batch = deficits.slice(i, i + DISEASE_CONCURRENCY);
        const batchNum = Math.floor(i / DISEASE_CONCURRENCY) + 1;

        console.log(`\n[Batch ${batchNum}/${totalBatches}] Processing ${batch.length} diseases...`);

        const batchResults = await Promise.allSettled(
          batch.map(async (d, idx) => {
            if (idx > 0) await sleep(idx * STAGGER_MS);

            const classDir = resolve(DATASET_DIR, d.id);
            const queries = buildSearchQueries(d.name, d.plantId);
            const seen = new Set<string>(seenUrlsCache[d.id] ?? []);

            console.log(
              ` [${d.id}] have ${d.have}, need ${d.needed} more` + ` (${d.name} / ${d.plantId})`,
            );

            try {
              return await fillClass(d.id, queries, d.needed, classDir, seen);
            } finally {
              // Record what was seen even if this disease failed part-way.
              seenUrlsCache[d.id] = Array.from(seen);
            }
          }),
        );

        // Aggregate batch results
        for (const result of batchResults) {
          if (result.status === "fulfilled") {
            totalDownloaded += result.value;
          } else {
            console.error(` ✗ Disease failed: ${result.reason}`);
          }
        }

        diseasesProcessed += batch.length;

        // Flush seen-URLs cache to disk periodically (not after every disease)
        const isLastBatch = i + batch.length >= deficits.length;
        if (diseasesProcessed % SEEN_CACHE_FLUSH_INTERVAL < batch.length || isLastBatch) {
          saveSeenUrlsCache(seenUrlsCache);
        }

        const elapsedSec = Math.round((Date.now() - startTime) / 1000);
        const rate = diseasesProcessed / Math.max(1, elapsedSec);
        const remaining = deficits.length - diseasesProcessed;
        const eta = remaining / Math.max(0.01, rate);
        console.log(
          ` [Batch ${batchNum}/${totalBatches}] checkpoint — ` +
            `${totalDownloaded} downloaded, ` +
            `${diseasesProcessed}/${deficits.length} diseases (${rate.toFixed(1)}/s, ` +
            `ETA: ${Math.round(eta)}s)`,
        );
      }
    }

    // ── Step 6: Fill healthy deficit ────────────────────────────────────────
    if (healthyDeficit > 0) {
      console.log("\n" + lightRule);
      console.log(`FILLING HEALTHY CLASS (target: ${TARGET_HEALTHY})`);
      console.log(lightRule);

      const healthyDir = resolve(DATASET_DIR, HEALTHY_CLASS);
      mkdirSync(healthyDir, { recursive: true });

      // Collect all unique plants from the disease info. The Set must be built
      // from the plant id strings: a Set of the info objects never removes
      // anything because every object is distinct.
      const allPlants = [...new Set([...diseaseInfo.values()].map((d) => d.plantId))];
      const allHealthyQueries = allPlants.flatMap((plant) => buildHealthyQueries(plant));

      const healthySeen = new Set<string>(seenUrlsCache[HEALTHY_CLASS] ?? []);
      const healthyNeeded = TARGET_HEALTHY - countImagesInDir(healthyDir);

      const allUrls: string[] = [];
      const primaryHealthyQuery = allHealthyQueries[0];
      // With no plants there is nothing sensible to search for.
      if (primaryHealthyQuery !== undefined) {
        // Run all 3 sources in parallel for the healthy class too
        const sourceOutcomes = await Promise.allSettled([
          collectImagesDuckDuckGo(
            allHealthyQueries.slice(0, MAX_HEALTHY_QUERIES),
            healthyNeeded,
            healthySeen,
          ),
          searchImagesInaturalist(primaryHealthyQuery, healthyNeeded, healthySeen),
          searchImagesCommons(primaryHealthyQuery, healthyNeeded, healthySeen),
        ]);
        for (const outcome of sourceOutcomes) {
          if (outcome.status === "fulfilled") allUrls.push(...outcome.value.urls);
        }
      }

      if (allUrls.length > 0) {
        console.log(`\n Downloading ${allUrls.length} healthy images...`);

        // Start above the highest existing index rather than at the file
        // count: numbering has gaps wherever a download failed, so a
        // count-based start could overwrite images from an earlier run.
        const startIdx = readdirSync(healthyDir).reduce((next, name) => {
          const numbered = /^img_(\d+)/.exec(name);
          return numbered ? Math.max(next, Number(numbered[1]) + 1) : next;
        }, 0);
        const { downloaded } = await downloadBatch(allUrls, healthyDir, startIdx);

        const newTotal = countImagesInDir(healthyDir);
        const gained = newTotal - healthyCount;
        totalDownloaded += gained;

        console.log(
          ` ${downloaded > 0 ? "✓" : "✗"} Got ${downloaded} images.` +
            ` Total healthy: ${newTotal}/${TARGET_HEALTHY} (${gained} new)`,
        );
      } else {
        console.log(`\n ✗ No healthy images found`);
      }

      // Update seen-URLs cache
      seenUrlsCache[HEALTHY_CLASS] = Array.from(healthySeen);
      saveSeenUrlsCache(seenUrlsCache);
    }

    // ── Summary ──────────────────────────────────────────────────────────────
    const elapsed = Math.round((Date.now() - startTime) / 1000);
    const mins = Math.floor(elapsed / 60);
    const hrs = Math.floor(mins / 60);

    // Final scan
    const finalScan = scanDataset();
    const finalCounts = [...finalScan.diseaseCounts.values()];
    const totalHave = finalCounts.reduce((s, c) => s + c, 0);
    const atTarget = finalCounts.filter((c) => c >= TARGET_PER_DISEASE).length;

    console.log("\n" + heavyRule);
    console.log(" ✅ FILL COMPLETE");
    console.log(heavyRule);
    console.log(` Time: ${hrs}h ${mins % 60}m`);
    console.log(` Diseases at target: ${atTarget}/${diseaseInfo.size}`);
    console.log(` Total images: ${totalHave}`);
    console.log(` Healthy images: ${finalScan.healthyCount}/${TARGET_HEALTHY}`);
    console.log(` New downloads: ${totalDownloaded}`);
    console.log(` Dataset dir: ${DATASET_DIR}/`);
  } finally {
    await closeDb();
  }

  console.log(heavyRule);
}
|
||
|
||
// Run the script. Any uncaught failure is printed with its stack trace (the
// plain string form of an Error drops it) and the process exits non-zero.
main().catch((err: unknown) => {
  const detail = err instanceof Error ? (err.stack ?? err.message) : String(err);
  console.error("\nFatal error:", `\n${detail}`);
  process.exit(1);
});
|