🏭 Central collector that receives metrics and logs via Rednet and exports to OTLP.

This is the heart of your telemetry infrastructure! The collector automatically receives telemetry data from clients, intelligently batches it for efficiency, and exports to external monitoring systems using the OpenTelemetry Protocol.

The collector handles all the complex details of metric aggregation, counter state management, and memory cleanup so you can focus on analyzing your data.

const collector = new RednetCollector({
batchInterval: 30000,
maxBatchSize: 1000,
otlpMetricsEndpoint: "http://prometheus:9090/api/v1/otlp/v1/metrics"
});

collector.start();

// The collector now automatically processes incoming telemetry
runOsEventLoop();

Constructors

  • Create a new Rednet collector.

    Parameters

    Returns RednetCollector

    const collector = new RednetCollector({
    batchInterval: 30000, // Process batches every 30 seconds
    maxBatchSize: 1000, // Max 1000 items per batch
    otlpMetricsEndpoint: "http://prometheus:9090/api/v1/otlp/v1/metrics",
    otlpLogsEndpoint: "http://loki:3100/otlp/v1/logs",
    otlpHeaders: {
    "Authorization": "Bearer your-api-key",
    "Content-Type": "application/x-protobuf"
    },
    protocol: "otel_telemetry",
    counterStateTTL: 3600000, // 1 hour counter state TTL
    maxCounterStates: 10000 // Max 10k counter states
    });

Methods

  • 🚀 Flush all pending telemetry immediately.

    Forces immediate processing and export of all pending telemetry data, regardless of batch interval or size settings.

    Returns { logs: LogsData; metrics: MetricsData }

    Object containing the flushed metrics and logs data

    // Force immediate flush before shutdown
    const flushedData = collector.flush();

    console.log(`Flushed ${flushedData.metrics.resourceMetrics.length} metric resources`);
    console.log(`Flushed ${flushedData.logs.resourceLogs.length} log resources`);

    // Now safe to stop the collector
    collector.stop();
  • 📝 Get collected logs from clients.

    Returns all log data that has been collected from clients but not yet exported. Useful for custom processing or analysis.

    Returns LogsData

    LogsData containing collected client logs

    const clientLogs = collector.getCollectedLogs();

    // Custom log analysis
    for (const resourceLog of clientLogs.resourceLogs) {
    for (const scopeLog of resourceLog.scopeLogs) {
    for (const logRecord of scopeLog.logRecords) {
    // Check for error patterns
    if (logRecord.body.includes("error") || logRecord.body.includes("failed")) {
    console.log("Error detected:", logRecord.body);
    }
    }
    }
    }
  • 📋 Get internal collector logs.

    Returns logs generated by the collector itself for monitoring and debugging.

    Returns LogsData

    LogsData containing collector internal logs

    const collectorLogs = collector.getCollectorLogs();

    // Analyze collector performance
    for (const resourceLog of collectorLogs.resourceLogs) {
    for (const scopeLog of resourceLog.scopeLogs) {
    for (const logRecord of scopeLog.logRecords) {
    if (logRecord.severityNumber >= SeverityNumber.WARN) {
    console.log("Collector warning/error:", logRecord.body);
    }
    }
    }
    }
  • 📊 Get collector statistics and status.

    Returns detailed information about the collector's current state, including pending telemetry, memory usage, and configuration.

    Returns {
        config: CollectorConfig;
        counterStates: number;
        isRunning: boolean;
        memoryUsage: {
            counterStatesCount: number;
            counterStateTTL: number;
            maxCounterStates: number;
            oldestCounterStateAge?: number;
        };
        pendingLogs: number;
        pendingMetrics: number;
        pendingTelemetry: number;
    }

    Collector statistics object

    const stats = collector.getStats();

    console.log(`Running: ${stats.isRunning}`);
    console.log(`Pending telemetry: ${stats.pendingTelemetry}`);
    console.log(`Counter states: ${stats.counterStates}`);
    console.log(`Memory usage: ${stats.memoryUsage.counterStatesCount}/${stats.memoryUsage.maxCounterStates}`);

    // Check if we're approaching limits
    if (stats.memoryUsage.counterStatesCount > stats.memoryUsage.maxCounterStates * 0.8) {
    console.log("Warning: Approaching counter state limit");
    }
  • 🔄 Legacy method for backward compatibility.

    Parameters

    Returns void

    Use processTelemetry instead

  • 📦 Process received telemetry from Rednet.

    This is called automatically when telemetry data is received over Rednet. You can also call it manually to inject telemetry data for testing.

    Parameters

    Returns void

    // Manual telemetry injection (useful for testing)
    const testMetric: RednetTelemetry = {
    telemetryType: "metric",
    type: "counter",
    name: "test_counter",
    description: "Test counter",
    unit: "count",
    value: 42,
    attributes: {},
    timestamp: os.epoch("utc"),
    resource: { "service.name": "test" },
    scope: { name: "test", version: "1.0.0" }
    };

    collector.processTelemetry(testMetric);
  • 🚀 Start the collector - begins listening for Rednet messages.

    This starts the collector's main processing loop. It will begin listening for telemetry data on the configured Rednet protocol and start the batching timer for periodic exports.

    Returns void

    const collector = new RednetCollector(config);

    // Start the collector
    collector.start();

    // Now it's listening for telemetry data
    console.log("Collector started and listening...");

    // Make sure you have an event loop running!
    runOsEventLoop();
  • 🛑 Stop the collector and clean up resources.

    Gracefully shuts down the collector, processes any remaining telemetry, and cleans up memory. Call this when you're done collecting data.

    Returns void

    // Graceful shutdown
    collector.stop();
    console.log("Collector stopped gracefully");
  • ⏰ Process a single tick of the collector.

    Manually trigger one cycle of the collector's processing logic. This checks if it's time to process a batch and does so if needed.

    Returns void

    // Manual tick processing (useful for testing or custom scheduling)
    collector.tick();

    // Or in a custom loop
    setInterval(() => {
    collector.tick();
    }, 5000);