import { Injectable, Logger } from '@nestjs/common'; export interface MemoryStats { heapUsed: number; // Used heap memory (MB) heapTotal: number; // Total heap memory (MB) external: number; // External memory (MB) rss: number; // RSS (Resident Set Size) (MB) timestamp: Date; } @Injectable() export class MemoryMonitorService { private readonly logger = new Logger(MemoryMonitorService.name); private readonly MAX_MEMORY_MB: number; private readonly BATCH_SIZE: number; private readonly GC_THRESHOLD_MB: number; constructor() { // Load config from env vars. Default values for memory optimization this.MAX_MEMORY_MB = parseInt(process.env.MAX_MEMORY_USAGE_MB || '1024'); // 1GB limit this.BATCH_SIZE = parseInt(process.env.CHUNK_BATCH_SIZE || '100'); // 100 chunks per batch this.GC_THRESHOLD_MB = parseInt(process.env.GC_THRESHOLD_MB || '800'); // Trigger GC at 800MB this.logger.log( `Memory monitor initialized: limit=${this.MAX_MEMORY_MB}MB, batchSize=${this.BATCH_SIZE}, GCThreshold=${this.GC_THRESHOLD_MB}MB`, ); } /** * Get current memory usage */ getMemoryUsage(): MemoryStats { const usage = process.memoryUsage(); return { heapUsed: Math.round(usage.heapUsed / 1024 / 1024), heapTotal: Math.round(usage.heapTotal / 1024 / 1024), external: Math.round((usage.external || 0) / 1024 / 1024), rss: Math.round(usage.rss / 1024 / 1024), timestamp: new Date(), }; } /** * Check if memory is approaching limit */ isMemoryHigh(): boolean { const usage = this.getMemoryUsage(); return usage.heapUsed > this.MAX_MEMORY_MB * 0.85; // 85% threshold } /** * Wait for memory to become available (with timeout) */ async waitForMemoryAvailable(timeoutMs: number = 30000): Promise { const startTime = Date.now(); while (this.isMemoryHigh()) { if (Date.now() - startTime > timeoutMs) { throw new Error( `Memory wait timeout: current ${this.getMemoryUsage().heapUsed}MB > ${this.MAX_MEMORY_MB * 0.85}MB`, ); } this.logger.warn( `Memory usage too high. Waiting for release... ${this.getMemoryUsage().heapUsed}/${this.MAX_MEMORY_MB}MB`, ); // Force garbage collection (if available) if (global.gc) { this.logger.log('Running forced garbage collection...'); global.gc(); } await new Promise((resolve) => setTimeout(resolve, 1000)); } } /** * Force garbage collection (if available) */ forceGC(): void { if (global.gc) { const before = this.getMemoryUsage(); global.gc(); const after = this.getMemoryUsage(); this.logger.log( `GC completed: ${before.heapUsed}MB → ${after.heapUsed}MB (${before.heapUsed - after.heapUsed}MB freed)`, ); } } /** * Dynamically adjust batch size */ getDynamicBatchSize(currentMemoryMB: number): number { const baseBatchSize = this.BATCH_SIZE; if (currentMemoryMB > this.GC_THRESHOLD_MB) { // Memory pressure, reduce batch size const reduced = Math.max(10, Math.floor(baseBatchSize * 0.5)); this.logger.warn( `Memory pressure (${currentMemoryMB}MB), adjusting batch size: ${baseBatchSize} → ${reduced}`, ); return reduced; } else if (currentMemoryMB < this.MAX_MEMORY_MB * 0.4) { // Enough memory, increase batch size const increased = Math.min(200, Math.floor(baseBatchSize * 1.2)); if (increased > baseBatchSize) { this.logger.log( `Memory available (${currentMemoryMB}MB), adjusting batch size: ${baseBatchSize} → ${increased}`, ); } return increased; } return baseBatchSize; } /** * Process large data: auto-batching and memory control */ async processInBatches( items: T[], processor: (batch: T[], batchIndex: number) => Promise, options?: { onBatchComplete?: ( batchIndex: number, totalBatches: number, results: R[], ) => Promise | void; maxConcurrency?: number; }, ): Promise { const totalItems = items.length; if (totalItems === 0) return []; const startTime = Date.now(); this.logger.log(`Starting batch processing: ${totalItems} items`); const allResults: R[] = []; let processedCount = 0; for (let i = 0; i < totalItems; ) { // Check memory state and wait await this.waitForMemoryAvailable(); // Dynamically adjust batch size const currentMem = this.getMemoryUsage().heapUsed; const batchSize = this.getDynamicBatchSize(currentMem); // Get current batch const batch = items.slice(i, i + batchSize); const batchIndex = Math.floor(i / batchSize) + 1; const totalBatches = Math.ceil(totalItems / batchSize); this.logger.log( `Processing batch ${batchIndex}/${totalBatches}: ${batch.length} items (cumulative ${processedCount}/${totalItems})`, ); // Process batch const batchResults = await processor(batch, batchIndex); allResults.push(...batchResults); processedCount += batch.length; // Callback notification if (options?.onBatchComplete) { await options.onBatchComplete(batchIndex, totalBatches, batchResults); } // Force GC if memory near threshold if (currentMem > this.GC_THRESHOLD_MB) { this.forceGC(); } // Clear references to help GC batch.length = 0; i += batchSize; } const duration = ((Date.now() - startTime) / 1000).toFixed(2); const finalMem = this.getMemoryUsage(); this.logger.log( `Batch processing completed: ${totalItems} items, duration ${duration}s, final memory ${finalMem.heapUsed}MB`, ); return allResults; } /** * Estimate memory required for processing */ estimateMemoryUsage( itemCount: number, itemSizeBytes: number, vectorDim: number, ): number { // Text content memory const textMemory = itemCount * itemSizeBytes; // Vector memory (dimension * 4 bytes per vector) const vectorMemory = itemCount * vectorDim * 4; // Object overhead (~100 bytes per object) const overhead = itemCount * 100; const totalMB = Math.round( (textMemory + vectorMemory + overhead) / 1024 / 1024, ); return totalMB; } /** * Check if batching should be used */ shouldUseBatching( itemCount: number, itemSizeBytes: number, vectorDim: number, ): boolean { const estimatedMB = this.estimateMemoryUsage( itemCount, itemSizeBytes, vectorDim, ); const threshold = this.MAX_MEMORY_MB * 0.7; // 70% threshold if (estimatedMB > threshold) { this.logger.warn( `Estimated memory ${estimatedMB}MB exceeds threshold ${threshold}MB, using batch processing`, ); return true; } return false; } /** * Get recommended batch size */ getRecommendedBatchSize(itemSizeBytes: number, vectorDim: number): number { // Goal: max 200MB memory per batch const targetMemoryMB = 200; const targetMemoryBytes = targetMemoryMB * 1024 * 1024; // Memory per item = text + vector + overhead const singleItemMemory = itemSizeBytes + vectorDim * 4 + 100; const batchSize = Math.floor(targetMemoryBytes / singleItemMemory); // Limit between 10-200 return Math.max(10, Math.min(200, batchSize)); } }