task to get this here done

This commit is contained in:
2026-06-12 13:20:33 -04:00
parent 6379860123
commit 34855eff55
7 changed files with 1307 additions and 85 deletions

View File

@@ -22,6 +22,7 @@
import "dotenv/config";
import { readFileSync, readdirSync, writeFileSync, existsSync, mkdirSync } from "fs";
import { resolve, extname } from "path";
import { Agent, setGlobalDispatcher } from "undici";
// Load .env.development for DB creds
const envPath = resolve(__dirname, "../.env.development");
@@ -41,7 +42,7 @@ try {
} catch {}
import { getDb, closeDb } from "@/lib/db/index";
import { diseases } from "@/lib/db/schema";
import { plants, diseases } from "@/lib/db/schema";
// ─── Config ─────────────────────────────────────────────────────────────────
@@ -49,17 +50,18 @@ const DATASET_DIR = resolve(__dirname, "../data/dataset");
const SEEN_CACHE_FILE = resolve(DATASET_DIR, ".fill-seen-urls.json");
/** Target images per disease */
const TARGET_PER_DISEASE = 200;
const TARGET_PER_DISEASE = 100;
/** Target images for the "healthy" class */
const TARGET_HEALTHY = 400;
/** Target images per plant for the "healthy" class */
const TARGET_HEALTHY_PER_PLANT = 400;
/**
* How many diseases to process in parallel.
* Each disease is I/O-bound (HTTP requests), so high concurrency is safe.
* The global DDG rate limiter prevents us from overwhelming DuckDuckGo.
* Reduced from 50 to 5 to prevent overwhelming undici's connection pool.
* Each disease fires multiple concurrent requests, so high concurrency causes
* connection exhaustion and undici crashes.
*/
const DISEASE_CONCURRENCY = 50;
const DISEASE_CONCURRENCY = 5;
/**
* Max DDG requests per second (shared across all concurrent diseases).
@@ -70,8 +72,11 @@ const DISEASE_CONCURRENCY = 50;
*/
const DDG_RATE_LIMIT_RPS = 6;
/** Max concurrent image downloads per disease */
const CONCURRENT_DOWNLOADS = 50;
/** Max concurrent image downloads per disease.
* Reduced from 50 to 10 to prevent connection pool exhaustion.
* With 5 diseases × 10 downloads = 50 concurrent downloads (manageable).
*/
const CONCURRENT_DOWNLOADS = 10;
/** Minimum image size in bytes to accept */
const MIN_IMAGE_SIZE = 10_000; // 10KB
@@ -86,9 +91,16 @@ const ALLOWED_EXTENSIONS = [".jpg", ".jpeg", ".png", ".webp"];
const UA =
"Mozilla/5.0 (iPhone; CPU iPhone OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Mobile/15E148 Safari/604.1";
/** Healthy class directory name */
/** Healthy class parent directory name */
const HEALTHY_CLASS = "healthy";
/**
* How many plants to process in parallel for healthy image collection.
* Each plant runs the same fillClass pipeline (3 sources), so keep this
* modest to avoid connection pool exhaustion.
*/
const MAX_HEALTHY_CONCURRENCY = 5;
/** How often (in diseases processed) to flush the seen-URLs cache to disk */
const SEEN_CACHE_FLUSH_INTERVAL = 20;
@@ -98,9 +110,6 @@ const SEEN_CACHE_FLUSH_INTERVAL = 20;
* the seen-URLs cache accumulates across runs. */
const MAX_DDG_PAGES = 5;
/** Healthy source queries limit */
const MAX_HEALTHY_QUERIES = 20;
// ─── Types ──────────────────────────────────────────────────────────────────
interface DuckDuckGoImageResult {
@@ -169,6 +178,45 @@ const ddgLimiter = new TokenBucket(DDG_RATE_LIMIT_RPS);
// ─── Helpers ────────────────────────────────────────────────────────────────
/**
* Custom undici agent with connection pooling to prevent overwhelming the network stack.
* Limits concurrent connections per host to prevent undici crashes (the "onResponseError" bug).
* setGlobalDispatcher() makes ALL fetch() calls use this agent automatically.
*/
const fetchAgent = new Agent({
connections: 50, // Max 50 concurrent connections total
connect: {
timeout: 10_000, // 10s connection timeout
},
bodyTimeout: 30_000, // 30s body timeout
headersTimeout: 15_000, // 15s headers timeout
keepAliveTimeout: 30_000, // 30s keep-alive
keepAliveMaxTimeout: 60_000, // 60s max keep-alive
});
setGlobalDispatcher(fetchAgent);
/**
* Wrapper around global fetch() with retry + exponential backoff for transient errors.
* The connection pooling is handled by the global undici dispatcher (set above).
*/
async function safeFetch(url: string | URL, init?: RequestInit, retries = 2): Promise<Response> {
let lastError: Error | undefined;
for (let attempt = 0; attempt <= retries; attempt++) {
try {
return await fetch(url, init);
} catch (err: unknown) {
lastError = err instanceof Error ? err : new Error(String(err));
// Don't retry on abort/timeout (user-initiated)
if (err instanceof Error && err.name === "AbortError") throw err;
if (attempt < retries) {
// Exponential backoff: 500ms, 2000ms
await sleep(500 * (attempt + 1) * (attempt + 1));
}
}
}
throw lastError ?? new Error(`fetch failed after ${retries + 1} attempts`);
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
@@ -240,7 +288,7 @@ async function getVqdToken(query: string): Promise<string> {
const url = `https://duckduckgo.com/?q=${encodeURIComponent(query)}&t=h_&iax=images&ia=images`;
const res = await fetch(url, {
const res = await safeFetch(url, {
headers: { "User-Agent": UA, Accept: "text/html" },
signal: AbortSignal.timeout(15_000),
});
@@ -267,7 +315,7 @@ async function searchImagesDuckDuckGo(
query,
)}&vqd=${vqd}&o=json&p=${page}&f=,,,`;
const res = await fetch(url, {
const res = await safeFetch(url, {
headers: {
"User-Agent": UA,
Accept: "application/json",
@@ -289,7 +337,7 @@ async function searchImagesDuckDuckGo(
const freshVqd = await getVqdToken(query);
await ddgLimiter.acquire();
const retryUrl = url.replace(/vqd=[^&]+/, `vqd=${freshVqd}`);
const retryRes = await fetch(retryUrl, {
const retryRes = await safeFetch(retryUrl, {
headers: {
"User-Agent": UA,
Accept: "application/json",
@@ -410,7 +458,7 @@ async function searchImagesInaturalist(
`&order_by=observed_on&order=desc`;
try {
const res = await fetch(apiUrl, {
const res = await safeFetch(apiUrl, {
headers: { "User-Agent": UA, Accept: "application/json" },
signal: AbortSignal.timeout(15_000),
});
@@ -462,7 +510,7 @@ async function searchImagesCommons(
const url = `https://commons.wikimedia.org/w/api.php?${params}`;
try {
const res = await fetch(url, {
const res = await safeFetch(url, {
headers: { "User-Agent": UA },
signal: AbortSignal.timeout(10_000),
});
@@ -500,7 +548,7 @@ async function searchImagesCommons(
async function downloadImage(url: string, destPath: string): Promise<boolean> {
try {
const res = await fetch(url, {
const res = await safeFetch(url, {
headers: { "User-Agent": UA, Accept: "image/webp,image/png,image/jpeg,*/*" },
signal: AbortSignal.timeout(8_000),
});
@@ -667,16 +715,16 @@ async function fillClass(
interface ScanResult {
/** Disease id → how many images currently on disk */
diseaseCounts: Map<string, number>;
/** How many healthy images on disk */
healthyCount: number;
/** Plant id → how many healthy images currently on disk */
healthyCounts: Map<string, number>;
}
function scanDataset(): ScanResult {
const diseaseCounts = new Map<string, number>();
let healthyCount = 0;
const healthyCounts = new Map<string, number>();
if (!existsSync(DATASET_DIR)) {
return { diseaseCounts, healthyCount: 0 };
return { diseaseCounts, healthyCounts };
}
const entries = readdirSync(DATASET_DIR, { withFileTypes: true });
@@ -686,7 +734,16 @@ function scanDataset(): ScanResult {
if (entry.name.startsWith(".")) continue;
if (entry.name === HEALTHY_CLASS) {
healthyCount = countImagesInDir(resolve(DATASET_DIR, entry.name));
// Scan per-plant subdirectories under healthy/
const healthyDir = resolve(DATASET_DIR, entry.name);
const plantDirs = readdirSync(healthyDir, { withFileTypes: true });
for (const pd of plantDirs) {
if (!pd.isDirectory() || pd.name.startsWith(".")) continue;
const count = countImagesInDir(resolve(healthyDir, pd.name));
if (count > 0) {
healthyCounts.set(pd.name, count);
}
}
} else {
const count = countImagesInDir(resolve(DATASET_DIR, entry.name));
if (count > 0) {
@@ -695,7 +752,7 @@ function scanDataset(): ScanResult {
}
}
return { diseaseCounts, healthyCount };
return { diseaseCounts, healthyCounts };
}
// ─── CLI Flags ──────────────────────────────────────────────────────────────
@@ -722,11 +779,14 @@ async function main() {
// ── Step 1: Scan what we already have ────────────────────────────────────
console.log("\nScanning existing dataset...");
const { diseaseCounts, healthyCount } = scanDataset();
console.log(` Found ${diseaseCounts.size} disease directories, ${healthyCount} healthy images`);
const { diseaseCounts, healthyCounts } = scanDataset();
const totalHealthyImages = [...healthyCounts.values()].reduce((s, c) => s + c, 0);
console.log(
` Found ${diseaseCounts.size} disease directories, ${healthyCounts.size} plants with healthy images (${totalHealthyImages} total)`,
);
// ── Step 2: Load disease info from DB ────────────────────────────────────
console.log("\nLoading disease info from database...");
// ── Step 2: Load disease info and plant info from DB ─────────────────────
console.log("\nLoading data from database...");
const db = getDb();
const allDiseases = await db
@@ -746,6 +806,15 @@ async function main() {
}
console.log(` Loaded ${diseaseInfo.size} unique diseases from DB`);
// Load all plants from DB for healthy class generation
const allPlants = await db
.select({
id: plants.id,
commonName: plants.commonName,
})
.from(plants);
console.log(` Loaded ${allPlants.length} plants from DB`);
// ── Step 3: Build deficit list ──────────────────────────────────────────
const deficits: DiseaseInfo[] = [];
@@ -764,14 +833,24 @@ async function main() {
// direction when the front of the queue keeps hitting dead URLs)
if (flags.reverse) deficits.reverse();
const healthyDeficit = TARGET_HEALTHY - healthyCount;
// Build per-plant healthy deficits
const healthyDeficits: Array<{ id: string; have: number; needed: number }> = [];
for (const plant of allPlants) {
const have = healthyCounts.get(plant.id) ?? 0;
const needed = TARGET_HEALTHY_PER_PLANT - have;
if (needed > 0) {
healthyDeficits.push({ id: plant.id, have, needed });
}
}
const totalHealthyDeficit = healthyDeficits.reduce((s, d) => s + d.needed, 0);
console.log(`\n${"=".repeat(60)}`);
console.log("DEFICIT REPORT");
console.log(`${"=".repeat(60)}`);
console.log(` Diseases needing images: ${deficits.length}/${diseaseInfo.size}`);
console.log(` Total images missing: ${deficits.reduce((s, d) => s + d.needed, 0)}`);
console.log(` Healthy deficit: ${Math.max(0, healthyDeficit)}`);
console.log(` Plants needing healthy: ${healthyDeficits.length}/${allPlants.length}`);
console.log(` Total healthy images missing: ${totalHealthyDeficit}`);
console.log(` Parallelism: ${DISEASE_CONCURRENCY} diseases at once`);
console.log(` DDG rate limit: ${DDG_RATE_LIMIT_RPS} req/s (shared)`);
console.log(
@@ -779,7 +858,7 @@ async function main() {
);
console.log(`${"=".repeat(60)}`);
if (deficits.length === 0 && healthyDeficit <= 0) {
if (deficits.length === 0 && healthyDeficits.length === 0) {
console.log("\n ✓ Nothing to do — all targets met!\n");
await closeDb();
return;
@@ -788,7 +867,6 @@ async function main() {
// ── Step 4: Load seen-URLs cache ────────────────────────────────────────
const seenUrlsCache = loadSeenUrlsCache();
let totalDownloaded = 0;
let totalFailed = 0;
let diseasesProcessed = 0;
const startTime = Date.now();
@@ -875,64 +953,78 @@ async function main() {
}
}
// ── Step 6: Fill healthy deficit ────────────────────────────────────────
if (healthyDeficit > 0) {
// ── Step 6: Fill healthy deficits per plant ────────────────────────────
if (healthyDeficits.length > 0) {
console.log("\n" + "─".repeat(60));
console.log(`FILLING HEALTHY CLASS (target: ${TARGET_HEALTHY})`);
console.log(
`FILLING ${healthyDeficits.length} PLANTS\' HEALTHY CLASSES` +
` (target: ${TARGET_HEALTHY_PER_PLANT} each)`,
);
console.log("─".repeat(60));
const healthyDir = resolve(DATASET_DIR, HEALTHY_CLASS);
mkdirSync(healthyDir, { recursive: true });
let healthyProcessed = 0;
// Collect all unique plants from the disease info
const allPlants = [...new Set(diseaseInfo.values())].map((d) => d.plantId);
const allHealthyQueries: string[] = [];
for (const plant of allPlants) {
allHealthyQueries.push(...buildHealthyQueries(plant));
}
// Process in parallel batches
for (let i = 0; i < healthyDeficits.length; i += MAX_HEALTHY_CONCURRENCY) {
const batch = healthyDeficits.slice(i, i + MAX_HEALTHY_CONCURRENCY);
const batchNum = Math.floor(i / MAX_HEALTHY_CONCURRENCY) + 1;
const totalBatches = Math.ceil(healthyDeficits.length / MAX_HEALTHY_CONCURRENCY);
const healthySeen = new Set<string>(seenUrlsCache[HEALTHY_CLASS] ?? []);
const healthyNeeded = TARGET_HEALTHY - countImagesInDir(healthyDir);
console.log(`\n[Batch ${batchNum}/${totalBatches}] Processing ${batch.length} plants...`);
// Run all 3 sources in parallel for the healthy class too
const [ddgUrls, inatUrls, commonsUrls] = await Promise.allSettled([
collectImagesDuckDuckGo(
allHealthyQueries.slice(0, MAX_HEALTHY_QUERIES),
healthyNeeded,
healthySeen,
),
searchImagesInaturalist(allHealthyQueries[0], healthyNeeded, healthySeen),
searchImagesCommons(allHealthyQueries[0], healthyNeeded, healthySeen),
]);
const STAGGER_MS = 200;
const batchResults = await Promise.allSettled(
batch.map((p, idx) =>
(async () => {
if (idx > 0) await sleep(idx * STAGGER_MS);
const allUrls: string[] = [];
for (const settled of [ddgUrls, inatUrls, commonsUrls]) {
if (settled.status === "fulfilled") {
allUrls.push(...settled.value.urls);
}
}
const classDir = resolve(DATASET_DIR, HEALTHY_CLASS, p.id);
const queries = buildHealthyQueries(p.id);
const seen = new Set<string>();
if (allUrls.length > 0) {
console.log(`\n Downloading ${allUrls.length} healthy images...`);
const startIdx = countImagesInDir(healthyDir);
const { downloaded, failed } = await downloadBatch(allUrls, healthyDir, startIdx);
console.log(` [healthy/${p.id}] have ${p.have}, need ${p.needed} more`);
const newTotal = countImagesInDir(healthyDir);
const gained = newTotal - healthyCount;
totalDownloaded += gained;
totalFailed += failed;
const gained = await fillClass(`healthy/${p.id}`, queries, p.needed, classDir, seen);
console.log(
` ${downloaded > 0 ? "✓" : "✗"} Got ${downloaded} images.` +
` Total healthy: ${newTotal}/${TARGET_HEALTHY} (${gained} new)`,
// Update seen-URLs cache for this plant's healthy images
const cacheKey = `${HEALTHY_CLASS}/${p.id}`;
const existing = seenUrlsCache[cacheKey] ?? [];
const merged = [...new Set([...existing, ...Array.from(seen)])];
seenUrlsCache[cacheKey] = merged.slice(-500);
return gained;
})(),
),
);
} else {
console.log(`\n ✗ No healthy images found`);
}
// Update seen-URLs cache
seenUrlsCache[HEALTHY_CLASS] = Array.from(healthySeen);
saveSeenUrlsCache(seenUrlsCache);
for (const result of batchResults) {
if (result.status === "fulfilled") {
totalDownloaded += result.value;
} else {
console.error(` ✗ Plant healthy fill failed: ${result.reason}`);
}
}
healthyProcessed += batch.length;
// Flush seen-URLs cache to disk periodically
if (
healthyProcessed % SEEN_CACHE_FLUSH_INTERVAL < batch.length ||
i + batch.length >= healthyDeficits.length
) {
saveSeenUrlsCache(seenUrlsCache);
}
const elapsed = Math.round((Date.now() - startTime) / 1000);
const rate = healthyProcessed / Math.max(1, elapsed);
const remaining = healthyDeficits.length - healthyProcessed;
const eta = remaining / Math.max(0.01, rate);
console.log(
` [Batch ${batchNum}/${totalBatches}] checkpoint — ` +
`${totalDownloaded} downloaded (healthy), ` +
`${healthyProcessed}/${healthyDeficits.length} plants (${rate.toFixed(1)}/s, ` +
`ETA: ${Math.round(eta)}s)`,
);
}
}
// ── Summary ──────────────────────────────────────────────────────────────
@@ -946,16 +1038,21 @@ async function main() {
const atTarget = [...finalScan.diseaseCounts.values()].filter(
(c) => c >= TARGET_PER_DISEASE,
).length;
const healthyAtTarget = [...finalScan.healthyCounts.values()].filter(
(c) => c >= TARGET_HEALTHY_PER_PLANT,
).length;
const totalHealthyFinal = [...finalScan.healthyCounts.values()].reduce((s, c) => s + c, 0);
console.log("\n" + "=".repeat(60));
console.log(" ✅ FILL COMPLETE");
console.log("=".repeat(60));
console.log(` Time: ${hrs}h ${mins % 60}m`);
console.log(` Diseases at target: ${atTarget}/${diseaseInfo.size}`);
console.log(` Total images: ${totalHave}`);
console.log(` Healthy images: ${finalScan.healthyCount}/${TARGET_HEALTHY}`);
console.log(` New downloads: ${totalDownloaded}`);
console.log(` Dataset dir: ${DATASET_DIR}/`);
console.log(` Time: ${hrs}h ${mins % 60}m`);
console.log(` Diseases at target: ${atTarget}/${diseaseInfo.size}`);
console.log(` Total disease images: ${totalHave}`);
console.log(` Plants at healthy target: ${healthyAtTarget}/${allPlants.length}`);
console.log(` Total healthy images: ${totalHealthyFinal}`);
console.log(` New downloads: ${totalDownloaded}`);
console.log(` Dataset dir: ${DATASET_DIR}/`);
await closeDb();
console.log("=".repeat(60));