Skip to content

Custom Feed Implementation

Learn how to create custom threat intelligence feeds for TrojanHorse.js, integrating with proprietary sources, internal systems, and community feeds.

Overview

TrojanHorse.js supports custom feed implementations, allowing you to integrate with any threat intelligence source, internal security tools, or proprietary databases. This guide shows you how to build robust, production-ready custom feeds.

graph TB
    A[Custom Feed] --> B[Feed Interface]
    A --> C[Data Transformation]
    A --> D[Error Handling]
    A --> E[Performance Optimization]

    B --> B1[fetchThreatData]
    B --> B2[batchQuery]
    B --> B3[getHealthStatus]

    C --> C1[Input Validation]
    C --> C2[Response Mapping]
    C --> C3[Confidence Scoring]

    D --> D1[Retry Logic]
    D --> D2[Circuit Breaker]
    D --> D3[Graceful Degradation]

    E --> E1[Caching]
    E --> E2[Rate Limiting]
    E --> E3[Connection Pooling]

Basic Custom Feed

Simple Implementation

import { ThreatFeed } from 'trojanhorse-js/feeds';
import axios from 'axios';

class SimpleThreatFeed extends ThreatFeed {
  constructor(config) {
    super(config);
    this.name = 'SimpleThreatFeed';
    this.version = '1.0.0';
    this.baseUrl = config.baseUrl;
    this.apiKey = config.apiKey;

    // Configure HTTP client
    this.client = axios.create({
      baseURL: this.baseUrl,
      timeout: config.timeout || 10000,
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'User-Agent': `TrojanHorse.js/${this.version}`,
        'Content-Type': 'application/json'
      }
    });
  }

  // Required: Implement fetchThreatData method
  async fetchThreatData(indicator, options = {}) {
    try {
      this.validateIndicator(indicator);

      const response = await this.client.get(`/threat/${encodeURIComponent(indicator)}`, {
        params: {
          format: 'json',
          include_details: options.includeMetadata || false,
          max_age: options.maxAge || 86400
        }
      });

      return this.transformToStandardFormat(response.data, indicator);
    } catch (error) {
      return this.handleApiError(error, indicator);
    }
  }

  // Transform API response to TrojanHorse standard format
  transformToStandardFormat(apiResponse, indicator) {
    return {
      indicator: indicator,
      type: this.detectIndicatorType(indicator),
      threat: apiResponse.is_malicious || false,
      confidence: this.normalizeConfidence(apiResponse.score),
      sources: [this.name],
      timestamp: new Date(apiResponse.scan_date || Date.now()).toISOString(),

      // Core metadata
      metadata: {
        source: this.name,
        version: this.version,
        scan_id: apiResponse.scan_id,
        threat_types: apiResponse.categories || [],
        raw_score: apiResponse.score,
        last_updated: apiResponse.updated_at
      },

      // Additional correlation fields
      correlationScore: this.calculateCorrelationScore(apiResponse),
      consensusLevel: this.determineConsensusLevel(apiResponse),
      sources: [this.name]
    };
  }

  // Normalize confidence score to 0-100 range
  normalizeConfidence(score) {
    if (typeof score !== 'number') return 0;

    // Assuming API returns 0-1 range, convert to 0-100
    if (score >= 0 && score <= 1) {
      return Math.round(score * 100);
    }

    // If already 0-100 range
    if (score >= 0 && score <= 100) {
      return Math.round(score);
    }

    // Default fallback
    return 0;
  }

  // Calculate correlation score for threat correlation
  calculateCorrelationScore(apiResponse) {
    let score = 0;

    // Factor in threat categories
    if (apiResponse.categories && apiResponse.categories.length > 0) {
      score += apiResponse.categories.length * 10;
    }

    // Factor in confidence
    score += this.normalizeConfidence(apiResponse.score);

    // Factor in recency
    const daysSinceScan = (Date.now() - new Date(apiResponse.scan_date)) / (1000 * 60 * 60 * 24);
    score += Math.max(0, 30 - daysSinceScan); // More recent = higher score

    return Math.min(100, score);
  }

  // Determine consensus level
  determineConsensusLevel(apiResponse) {
    const confidence = this.normalizeConfidence(apiResponse.score);

    if (confidence >= 90) return 'consensus';
    if (confidence >= 70) return 'strong';
    if (confidence >= 50) return 'moderate';
    return 'weak';
  }

  // Validate indicator format
  validateIndicator(indicator) {
    if (!indicator || typeof indicator !== 'string') {
      throw new Error('Invalid indicator: must be a non-empty string');
    }

    if (indicator.length > 2048) {
      throw new Error('Invalid indicator: too long');
    }

    // Basic validation - extend as needed
    const validPatterns = [
      /^[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$/, // Domain
      /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/, // IPv4
      /^https?:\/\/.+/, // URL
      /^[a-fA-F0-9]{32,128}$/ // Hash
    ];

    const isValid = validPatterns.some(pattern => pattern.test(indicator));
    if (!isValid) {
      throw new Error(`Invalid indicator format: ${indicator}`);
    }
  }

  // Handle API errors gracefully
  handleApiError(error, indicator) {
    console.error(`${this.name} API error for ${indicator}:`, error.message);

    // Return empty result instead of throwing
    return {
      indicator: indicator,
      type: this.detectIndicatorType(indicator),
      threat: false,
      confidence: 0,
      sources: [this.name],
      timestamp: new Date().toISOString(),
      metadata: {
        source: this.name,
        error: error.message,
        status: 'error'
      },
      correlationScore: 0,
      consensusLevel: 'weak',
      sources: [this.name]
    };
  }
}

// Usage
const customFeed = new SimpleThreatFeed({
  baseUrl: 'https://api.mythreatfeed.com',
  apiKey: process.env.CUSTOM_FEED_API_KEY,
  timeout: 5000
});

// Test the feed
const result = await customFeed.fetchThreatData('malicious-domain.com');
console.log('Threat result:', result);

Advanced Custom Feed

Enterprise-Grade Implementation

import { ThreatFeed, FeedError, RateLimitError } from 'trojanhorse-js/feeds';
import { CircuitBreaker } from 'trojanhorse-js/core';
import Redis from 'ioredis';

class EnterpriseThreatFeed extends ThreatFeed {
  constructor(config) {
    super(config);
    this.name = config.name || 'EnterpriseFeed';
    this.version = config.version || '1.0.0';
    this.baseUrl = config.baseUrl;
    this.apiKey = config.apiKey;

    // Initialize components
    this.setupHttpClient(config);
    this.setupCaching(config);
    this.setupCircuitBreaker(config);
    this.setupMetrics();
  }

  setupHttpClient(config) {
    this.client = axios.create({
      baseURL: this.baseUrl,
      timeout: config.timeout || 10000,
      maxRedirects: 3,
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'User-Agent': `TrojanHorse.js/${this.version}`,
        'Accept': 'application/json',
        'Content-Type': 'application/json'
      }
    });

    // Request interceptor for rate limiting
    this.client.interceptors.request.use(async (request) => {
      await this.checkRateLimit();
      return request;
    });

    // Response interceptor for error handling
    this.client.interceptors.response.use(
      (response) => response,
      (error) => this.handleHttpError(error)
    );
  }

  setupCaching(config) {
    if (config.cache && config.cache.enabled) {
      this.cache = new Redis({
        host: config.cache.host || 'localhost',
        port: config.cache.port || 6379,
        password: config.cache.password,
        db: config.cache.db || 0,
        retryDelayOnFailover: 100,
        maxRetriesPerRequest: 3
      });

      this.cacheTTL = config.cache.ttl || 3600; // 1 hour default
    }
  }

  setupCircuitBreaker(config) {
    this.circuitBreaker = new CircuitBreaker({
      threshold: config.circuitBreaker?.threshold || 5,
      timeout: config.circuitBreaker?.timeout || 60000,
      resetTimeout: config.circuitBreaker?.resetTimeout || 30000,

      onOpen: () => {
        console.warn(`${this.name}: Circuit breaker opened`);
        this.metrics.circuitBreakerOpens++;
      },

      onClose: () => {
        console.info(`${this.name}: Circuit breaker closed`);
        this.metrics.circuitBreakerCloses++;
      }
    });
  }

  setupMetrics() {
    this.metrics = {
      requestCount: 0,
      errorCount: 0,
      cacheHits: 0,
      cacheMisses: 0,
      circuitBreakerOpens: 0,
      circuitBreakerCloses: 0,
      averageResponseTime: 0,
      lastSuccessfulRequest: null,
      responseTimeHistory: []
    };

    // Clear old response time history every hour
    setInterval(() => {
      this.metrics.responseTimeHistory = this.metrics.responseTimeHistory.slice(-100);
    }, 3600000);
  }

  async fetchThreatData(indicator, options = {}) {
    const startTime = Date.now();
    this.metrics.requestCount++;

    try {
      this.validateIndicator(indicator);

      // Check cache first
      if (this.cache && !options.ignoreCache) {
        const cached = await this.getCachedResult(indicator);
        if (cached) {
          this.metrics.cacheHits++;
          return cached;
        }
        this.metrics.cacheMisses++;
      }

      // Use circuit breaker for API call
      const result = await this.circuitBreaker.execute(async () => {
        const response = await this.client.get(`/api/v2/threat/${encodeURIComponent(indicator)}`, {
          params: this.buildRequestParams(options),
          timeout: options.timeout || 10000
        });

        return this.transformToStandardFormat(response.data, indicator);
      });

      // Cache successful result
      if (this.cache && result && result.confidence > 0) {
        await this.cacheResult(indicator, result);
      }

      // Update metrics
      this.updateMetrics(startTime, true);

      return result;
    } catch (error) {
      this.updateMetrics(startTime, false);
      return this.handleError(error, indicator);
    }
  }

  async batchQuery(indicators, options = {}) {
    const batchSize = options.batchSize || 100;
    const concurrency = options.concurrency || 5;
    const results = [];

    // Process in batches
    for (let i = 0; i < indicators.length; i += batchSize) {
      const batch = indicators.slice(i, i + batchSize);

      // Process batch with concurrency control
      const batchPromises = [];
      for (let j = 0; j < batch.length; j += concurrency) {
        const concurrentBatch = batch.slice(j, j + concurrency);
        const concurrentPromises = concurrentBatch.map(indicator =>
          this.fetchThreatData(indicator, options).catch(error => ({
            indicator,
            error: error.message,
            threat: false,
            confidence: 0
          }))
        );

        batchPromises.push(Promise.all(concurrentPromises));
      }

      const batchResults = await Promise.all(batchPromises);
      results.push(...batchResults.flat());

      // Rate limiting delay between batches
      if (i + batchSize < indicators.length) {
        await this.delay(options.waitBetweenBatches || 1000);
      }
    }

    return results;
  }

  buildRequestParams(options) {
    const params = {
      format: 'json',
      api_version: '2.0'
    };

    if (options.includeMetadata) {
      params.include_metadata = true;
    }

    if (options.includeHistory) {
      params.include_history = true;
    }

    if (options.maxAge) {
      params.max_age = options.maxAge;
    }

    if (options.confidence) {
      params.min_confidence = options.confidence;
    }

    return params;
  }

  transformToStandardFormat(apiResponse, indicator) {
    // Enhanced transformation with validation
    const result = {
      indicator: indicator,
      type: this.detectIndicatorType(indicator),
      threat: Boolean(apiResponse.is_threat),
      confidence: this.normalizeConfidence(apiResponse.confidence_score),
      sources: [this.name],
      timestamp: new Date(apiResponse.scan_timestamp || Date.now()).toISOString(),

      metadata: {
        source: this.name,
        version: this.version,
        scan_id: apiResponse.scan_id,
        engine_version: apiResponse.engine_version,
        threat_categories: apiResponse.threat_categories || [],
        risk_score: apiResponse.risk_score,
        geographical_risk: apiResponse.geo_risk,
        temporal_risk: apiResponse.temporal_risk,
        behavioral_indicators: apiResponse.behavioral_indicators,
        infrastructure_analysis: apiResponse.infrastructure,
        attribution: apiResponse.attribution,
        campaigns: apiResponse.associated_campaigns,
        raw_response: options.includeRawResponse ? apiResponse : undefined
      },

      // Enhanced correlation fields
      correlationScore: this.calculateAdvancedCorrelationScore(apiResponse),
      consensusLevel: this.determineConsensusLevel(apiResponse),
      sources: [this.name],

      // Additional fields for advanced analysis
      riskFactors: this.extractRiskFactors(apiResponse),
      contextualInfo: this.extractContextualInfo(apiResponse),
      recommendedActions: this.generateRecommendedActions(apiResponse)
    };

    // Validate result before returning
    this.validateResult(result);
    return result;
  }

  calculateAdvancedCorrelationScore(apiResponse) {
    let score = 0;
    const weights = {
      confidence: 0.3,
      categories: 0.2,
      recency: 0.15,
      geography: 0.1,
      behavior: 0.15,
      infrastructure: 0.1
    };

    // Confidence component
    score += this.normalizeConfidence(apiResponse.confidence_score) * weights.confidence;

    // Threat categories component
    const categoryCount = (apiResponse.threat_categories || []).length;
    score += Math.min(categoryCount * 10, 50) * weights.categories;

    // Recency component
    const scanDate = new Date(apiResponse.scan_timestamp);
    const daysSinceScan = (Date.now() - scanDate.getTime()) / (1000 * 60 * 60 * 24);
    const recencyScore = Math.max(0, 100 - (daysSinceScan * 2));
    score += recencyScore * weights.recency;

    // Geographic risk component
    if (apiResponse.geo_risk) {
      score += apiResponse.geo_risk * weights.geography;
    }

    // Behavioral indicators component
    if (apiResponse.behavioral_indicators) {
      const behaviorScore = Object.values(apiResponse.behavioral_indicators).reduce((sum, val) => sum + val, 0);
      score += Math.min(behaviorScore, 100) * weights.behavior;
    }

    // Infrastructure analysis component
    if (apiResponse.infrastructure && apiResponse.infrastructure.risk_score) {
      score += apiResponse.infrastructure.risk_score * weights.infrastructure;
    }

    return Math.min(100, Math.round(score));
  }

  extractRiskFactors(apiResponse) {
    const riskFactors = [];

    if (apiResponse.threat_categories) {
      riskFactors.push(...apiResponse.threat_categories.map(cat => ({
        type: 'threat_category',
        value: cat,
        impact: 'high'
      })));
    }

    if (apiResponse.geo_risk && apiResponse.geo_risk > 70) {
      riskFactors.push({
        type: 'geographical_risk',
        value: 'high_risk_country',
        impact: 'medium'
      });
    }

    if (apiResponse.infrastructure && apiResponse.infrastructure.shared_hosting) {
      riskFactors.push({
        type: 'infrastructure',
        value: 'shared_hosting',
        impact: 'low'
      });
    }

    return riskFactors;
  }

  extractContextualInfo(apiResponse) {
    return {
      firstSeen: apiResponse.first_seen,
      lastSeen: apiResponse.last_seen,
      reportingOrganizations: apiResponse.reporting_orgs || [],
      associatedMalwareFamilies: apiResponse.malware_families || [],
      attackVectors: apiResponse.attack_vectors || [],
      victimSectors: apiResponse.victim_sectors || [],
      geographicalTargets: apiResponse.geo_targets || []
    };
  }

  generateRecommendedActions(apiResponse) {
    const actions = [];
    const confidence = this.normalizeConfidence(apiResponse.confidence_score);

    if (confidence >= 90) {
      actions.push('immediate_block');
      actions.push('alert_security_team');
      actions.push('initiate_incident_response');
    } else if (confidence >= 70) {
      actions.push('enhanced_monitoring');
      actions.push('add_to_watchlist');
      actions.push('notify_analysts');
    } else if (confidence >= 50) {
      actions.push('log_for_investigation');
      actions.push('periodic_recheck');
    }

    if (apiResponse.threat_categories && apiResponse.threat_categories.includes('malware')) {
      actions.push('scan_systems');
      actions.push('update_signatures');
    }

    return actions;
  }

  async getCachedResult(indicator) {
    if (!this.cache) return null;

    try {
      const cached = await this.cache.get(`threat:${this.name}:${indicator}`);
      return cached ? JSON.parse(cached) : null;
    } catch (error) {
      console.warn(`Cache retrieval error: ${error.message}`);
      return null;
    }
  }

  async cacheResult(indicator, result) {
    if (!this.cache) return;

    try {
      await this.cache.setex(
        `threat:${this.name}:${indicator}`,
        this.cacheTTL,
        JSON.stringify(result)
      );
    } catch (error) {
      console.warn(`Cache storage error: ${error.message}`);
    }
  }

  async checkRateLimit() {
    // Implement rate limiting logic
    // This is a simple example - use more sophisticated rate limiting in production
    const rateLimitKey = `rate_limit:${this.name}`;

    if (this.cache) {
      const currentCount = await this.cache.get(rateLimitKey) || 0;
      const maxRequests = this.config.rateLimit?.requestsPerMinute || 60;

      if (currentCount >= maxRequests) {
        throw new RateLimitError(`Rate limit exceeded: ${maxRequests} requests per minute`);
      }

      // Increment counter with expiration
      await this.cache.multi()
        .incr(rateLimitKey)
        .expire(rateLimitKey, 60)
        .exec();
    }
  }

  updateMetrics(startTime, success) {
    const responseTime = Date.now() - startTime;
    this.metrics.responseTimeHistory.push(responseTime);

    // Calculate moving average
    const recentTimes = this.metrics.responseTimeHistory.slice(-50);
    this.metrics.averageResponseTime = recentTimes.reduce((sum, time) => sum + time, 0) / recentTimes.length;

    if (success) {
      this.metrics.lastSuccessfulRequest = new Date().toISOString();
    } else {
      this.metrics.errorCount++;
    }
  }

  async getHealthStatus() {
    try {
      // Test API connectivity
      const healthResponse = await this.client.get('/health', { timeout: 5000 });

      return {
        status: 'healthy',
        lastSuccessfulRequest: this.metrics.lastSuccessfulRequest,
        requestCount: this.metrics.requestCount,
        errorRate: this.metrics.requestCount > 0 ? this.metrics.errorCount / this.metrics.requestCount : 0,
        averageResponseTime: Math.round(this.metrics.averageResponseTime),
        cacheHitRate: this.metrics.cacheHits / (this.metrics.cacheHits + this.metrics.cacheMisses) || 0,
        circuitBreakerStatus: this.circuitBreaker.getState(),
        apiVersion: healthResponse.data?.version,
        capabilities: healthResponse.data?.capabilities || []
      };
    } catch (error) {
      return {
        status: 'unhealthy',
        error: error.message,
        lastSuccessfulRequest: this.metrics.lastSuccessfulRequest,
        requestCount: this.metrics.requestCount,
        errorRate: this.metrics.requestCount > 0 ? this.metrics.errorCount / this.metrics.requestCount : 0
      };
    }
  }

  validateResult(result) {
    const required = ['indicator', 'type', 'threat', 'confidence', 'sources', 'timestamp'];

    for (const field of required) {
      if (result[field] === undefined || result[field] === null) {
        throw new Error(`Missing required field in result: ${field}`);
      }
    }

    if (typeof result.confidence !== 'number' || result.confidence < 0 || result.confidence > 100) {
      throw new Error(`Invalid confidence score: ${result.confidence}`);
    }
  }

  handleHttpError(error) {
    if (error.response) {
      // API responded with error status
      const status = error.response.status;
      const data = error.response.data;

      if (status === 429) {
        const retryAfter = error.response.headers['retry-after'] || 60;
        throw new RateLimitError(`Rate limit exceeded. Retry after ${retryAfter} seconds.`, retryAfter);
      } else if (status === 401 || status === 403) {
        throw new ApiKeyError(`Authentication failed: ${data?.message || 'Invalid API key'}`);
      } else if (status >= 500) {
        throw new FeedError(`Server error: ${status} - ${data?.message || 'Internal server error'}`);
      } else {
        throw new FeedError(`API error: ${status} - ${data?.message || 'Unknown error'}`);
      }
    } else if (error.request) {
      // Request made but no response received
      throw new NetworkError('No response received from API');
    } else {
      // Error in request setup
      throw new FeedError(`Request setup error: ${error.message}`);
    }
  }
}

// Usage
const enterpriseFeed = new EnterpriseThreatFeed({
  name: 'EnterpriseThreatIntel',
  version: '2.1.0',
  baseUrl: 'https://api.enterprise-threat-intel.com',
  apiKey: process.env.ENTERPRISE_API_KEY,
  timeout: 15000,

  cache: {
    enabled: true,
    host: 'redis.company.com',
    port: 6379,
    password: process.env.REDIS_PASSWORD,
    ttl: 3600
  },

  circuitBreaker: {
    threshold: 5,
    timeout: 60000,
    resetTimeout: 30000
  },

  rateLimit: {
    requestsPerMinute: 100
  }
});

Database Integration Feed

Internal Database Feed

import { ThreatFeed } from 'trojanhorse-js/feeds';
import { Pool } from 'pg'; // PostgreSQL client

class DatabaseThreatFeed extends ThreatFeed {
  constructor(config) {
    super(config);
    this.name = 'InternalDatabase';
    this.version = '1.0.0';

    // Initialize database connection pool
    this.pool = new Pool({
      host: config.database.host,
      port: config.database.port,
      database: config.database.name,
      user: config.database.user,
      password: config.database.password,
      max: config.database.maxConnections || 20,
      idleTimeoutMillis: 30000,
      connectionTimeoutMillis: 2000,
    });

    this.setupQueries();
  }

  setupQueries() {
    this.queries = {
      findThreat: `
        SELECT 
          indicator,
          threat_type,
          confidence_score,
          first_seen,
          last_seen,
          source_systems,
          threat_categories,
          risk_level,
          metadata,
          created_at,
          updated_at
        FROM threat_intelligence 
        WHERE indicator = $1 
          AND active = true 
          AND (expires_at IS NULL OR expires_at > NOW())
        ORDER BY confidence_score DESC, updated_at DESC
        LIMIT 1
      `,

      findRelatedThreats: `
        SELECT 
          indicator,
          threat_type,
          confidence_score,
          correlation_score
        FROM threat_intelligence 
        WHERE (
          domain_root = get_domain_root($1) 
          OR ip_subnet = get_ip_subnet($1)
          OR sha256_prefix = LEFT($1, 8)
        )
          AND indicator != $1
          AND active = true
          AND confidence_score > 50
        ORDER BY correlation_score DESC
        LIMIT 10
      `,

      insertThreat: `
        INSERT INTO threat_intelligence (
          indicator, threat_type, confidence_score, first_seen, last_seen,
          source_systems, threat_categories, risk_level, metadata, created_at, updated_at
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW(), NOW())
        ON CONFLICT (indicator) DO UPDATE SET
          confidence_score = GREATEST(threat_intelligence.confidence_score, EXCLUDED.confidence_score),
          last_seen = EXCLUDED.last_seen,
          source_systems = EXCLUDED.source_systems,
          metadata = EXCLUDED.metadata,
          updated_at = NOW()
        RETURNING id
      `,

      getHealthMetrics: `
        SELECT 
          COUNT(*) as total_indicators,
          COUNT(*) FILTER (WHERE confidence_score >= 90) as high_confidence,
          COUNT(*) FILTER (WHERE confidence_score >= 70) as medium_confidence,
          COUNT(*) FILTER (WHERE updated_at > NOW() - INTERVAL '24 hours') as recent_updates,
          AVG(confidence_score) as avg_confidence
        FROM threat_intelligence 
        WHERE active = true
      `
    };
  }

  async fetchThreatData(indicator, options = {}) {
    const client = await this.pool.connect();

    try {
      this.validateIndicator(indicator);

      // Query main threat data
      const threatResult = await client.query(this.queries.findThreat, [indicator]);

      if (threatResult.rows.length === 0) {
        return this.createEmptyResult(indicator);
      }

      const threatData = threatResult.rows[0];

      // Optionally get related threats for correlation
      let relatedThreats = [];
      if (options.includeCorrelation) {
        const correlationResult = await client.query(this.queries.findRelatedThreats, [indicator]);
        relatedThreats = correlationResult.rows;
      }

      return this.transformDatabaseResult(threatData, relatedThreats, indicator);
    } catch (error) {
      console.error(`Database query error for ${indicator}:`, error.message);
      return this.createErrorResult(indicator, error.message);
    } finally {
      client.release();
    }
  }

  transformDatabaseResult(threatData, relatedThreats, indicator) {
    const confidence = parseInt(threatData.confidence_score) || 0;

    return {
      indicator: indicator,
      type: this.detectIndicatorType(indicator),
      threat: confidence > 50,
      confidence: confidence,
      sources: [this.name, ...(threatData.source_systems || [])],
      timestamp: new Date(threatData.updated_at).toISOString(),

      metadata: {
        source: this.name,
        database_id: threatData.id,
        threat_type: threatData.threat_type,
        risk_level: threatData.risk_level,
        threat_categories: threatData.threat_categories || [],
        first_seen: threatData.first_seen,
        last_seen: threatData.last_seen,
        source_systems: threatData.source_systems || [],
        related_threats_count: relatedThreats.length,
        database_metadata: threatData.metadata || {},
        query_timestamp: new Date().toISOString()
      },

      correlationScore: this.calculateDatabaseCorrelationScore(threatData, relatedThreats),
      consensusLevel: this.determineDatabaseConsensusLevel(threatData, relatedThreats),
      sources: [this.name, ...(threatData.source_systems || [])],

      // Include related threats for correlation analysis
      relatedThreats: relatedThreats.map(rt => ({
        indicator: rt.indicator,
        type: rt.threat_type,
        confidence: parseInt(rt.confidence_score),
        correlationScore: parseInt(rt.correlation_score) || 0
      }))
    };
  }

  calculateDatabaseCorrelationScore(threatData, relatedThreats) {
    let score = parseInt(threatData.confidence_score) || 0;

    // Boost score based on number of related threats
    score += Math.min(relatedThreats.length * 5, 25);

    // Boost score based on multiple source systems
    const sourceSystems = threatData.source_systems || [];
    score += Math.min(sourceSystems.length * 3, 15);

    // Boost score based on threat categories
    const categories = threatData.threat_categories || [];
    score += Math.min(categories.length * 2, 10);

    return Math.min(100, score);
  }

  determineDatabaseConsensusLevel(threatData, relatedThreats) {
    const confidence = parseInt(threatData.confidence_score) || 0;
    const sourceSystems = (threatData.source_systems || []).length;
    const relatedCount = relatedThreats.length;

    if (confidence >= 90 && sourceSystems >= 3 && relatedCount >= 3) {
      return 'consensus';
    } else if (confidence >= 80 && sourceSystems >= 2) {
      return 'strong';
    } else if (confidence >= 60) {
      return 'moderate';
    } else {
      return 'weak';
    }
  }

  async storeThreatData(threatResult) {
    const client = await this.pool.connect();

    try {
      await client.query(this.queries.insertThreat, [
        threatResult.indicator,
        threatResult.type,
        threatResult.confidence,
        new Date(threatResult.metadata.first_seen || threatResult.timestamp),
        new Date(threatResult.metadata.last_seen || threatResult.timestamp),
        threatResult.sources,
        threatResult.metadata.threat_categories || [],
        threatResult.metadata.risk_level || 'medium',
        JSON.stringify(threatResult.metadata)
      ]);

      console.log(`Stored threat data for: ${threatResult.indicator}`);
    } catch (error) {
      console.error(`Error storing threat data: ${error.message}`);
      throw error;
    } finally {
      client.release();
    }
  }

  async batchQuery(indicators, options = {}) {
    const batchSize = options.batchSize || 1000;
    const results = [];

    for (let i = 0; i < indicators.length; i += batchSize) {
      const batch = indicators.slice(i, i + batchSize);

      const client = await this.pool.connect();
      try {
        // Use SQL IN clause for batch querying
        const placeholders = batch.map((_, index) => `$${index + 1}`).join(',');
        const query = `
          SELECT 
            indicator, threat_type, confidence_score, first_seen, last_seen,
            source_systems, threat_categories, risk_level, metadata, updated_at
          FROM threat_intelligence 
          WHERE indicator IN (${placeholders})
            AND active = true 
            AND (expires_at IS NULL OR expires_at > NOW())
          ORDER BY confidence_score DESC
        `;

        const result = await client.query(query, batch);

        // Transform all results
        const batchResults = result.rows.map(row => 
          this.transformDatabaseResult(row, [], row.indicator)
        );

        // Add empty results for indicators not found
        const foundIndicators = new Set(result.rows.map(row => row.indicator));
        const missingResults = batch
          .filter(indicator => !foundIndicators.has(indicator))
          .map(indicator => this.createEmptyResult(indicator));

        results.push(...batchResults, ...missingResults);
      } finally {
        client.release();
      }
    }

    return results;
  }

  async getHealthStatus() {
    const client = await this.pool.connect();

    try {
      const result = await client.query(this.queries.getHealthMetrics);
      const metrics = result.rows[0];

      return {
        status: 'healthy',
        database: {
          connected: true,
          total_indicators: parseInt(metrics.total_indicators),
          high_confidence_indicators: parseInt(metrics.high_confidence),
          medium_confidence_indicators: parseInt(metrics.medium_confidence),
          recent_updates: parseInt(metrics.recent_updates),
          average_confidence: parseFloat(metrics.avg_confidence)
        },
        connection_pool: {
          total_connections: this.pool.totalCount,
          idle_connections: this.pool.idleCount,
          waiting_clients: this.pool.waitingCount
        },
        last_query: new Date().toISOString()
      };
    } catch (error) {
      return {
        status: 'unhealthy',
        error: error.message,
        database: { connected: false }
      };
    } finally {
      client.release();
    }
  }

  async close() {
    await this.pool.end();
  }
}

// Usage
const databaseFeed = new DatabaseThreatFeed({
  database: {
    host: 'threat-db.company.com',
    port: 5432,
    name: 'threat_intelligence',
    user: process.env.DB_USER,
    password: process.env.DB_PASSWORD,
    maxConnections: 20
  }
});

File-Based Feed

CSV/JSON File Feed

import { ThreatFeed } from 'trojanhorse-js/feeds';
import fs from 'fs/promises';
import csv from 'csv-parser';
import { createReadStream } from 'fs';

class FileThreatFeed extends ThreatFeed {
  constructor(config) {
    super(config);
    this.name = 'FileFeed';
    this.version = '1.0.0';
    this.filePath = config.filePath;
    this.fileFormat = config.fileFormat || 'csv'; // 'csv', 'json', 'txt'
    this.watchForChanges = config.watchForChanges || false;

    this.data = new Map();
    this.lastModified = null;

    this.initializeFileWatcher();
  }

  async initializeFileWatcher() {
    // Load initial data
    await this.loadDataFromFile();

    // Watch for file changes
    if (this.watchForChanges) {
      const chokidar = await import('chokidar');
      this.watcher = chokidar.watch(this.filePath);

      this.watcher.on('change', async () => {
        console.log(`File ${this.filePath} changed, reloading data...`);
        await this.loadDataFromFile();
      });
    }
  }

  async loadDataFromFile() {
    try {
      const stats = await fs.stat(this.filePath);

      // Skip if file hasn't changed
      if (this.lastModified && stats.mtime <= this.lastModified) {
        return;
      }

      this.lastModified = stats.mtime;

      switch (this.fileFormat) {
        case 'csv':
          await this.loadCSVData();
          break;
        case 'json':
          await this.loadJSONData();
          break;
        case 'txt':
          await this.loadTextData();
          break;
        default:
          throw new Error(`Unsupported file format: ${this.fileFormat}`);
      }

      console.log(`Loaded ${this.data.size} threat indicators from ${this.filePath}`);
    } catch (error) {
      console.error(`Error loading data from file: ${error.message}`);
      throw error;
    }
  }

  async loadCSVData() {
    return new Promise((resolve, reject) => {
      const newData = new Map();

      createReadStream(this.filePath)
        .pipe(csv())
        .on('data', (row) => {
          try {
            const indicator = row.indicator || row.ioc || row.value;
            if (!indicator) return;

            const threatData = {
              indicator: indicator.trim(),
              type: row.type || this.detectIndicatorType(indicator),
              threat: this.parseBoolean(row.threat || row.malicious),
              confidence: parseInt(row.confidence || row.score || 50),
              categories: this.parseArray(row.categories || row.tags),
              first_seen: row.first_seen || row.date,
              source: row.source || 'file',
              description: row.description || row.comment,
              metadata: {
                line_number: newData.size + 1,
                raw_data: row
              }
            };

            newData.set(indicator.toLowerCase(), threatData);
          } catch (error) {
            console.warn(`Error parsing CSV row: ${error.message}`);
          }
        })
        .on('end', () => {
          this.data = newData;
          resolve();
        })
        .on('error', reject);
    });
  }

  async loadJSONData() {
    const fileContent = await fs.readFile(this.filePath, 'utf-8');
    const jsonData = JSON.parse(fileContent);

    const newData = new Map();

    // Handle different JSON structures
    const indicators = Array.isArray(jsonData) ? jsonData : jsonData.indicators || jsonData.threats;

    if (!Array.isArray(indicators)) {
      throw new Error('JSON file must contain an array of indicators');
    }

    indicators.forEach((item, index) => {
      try {
        const indicator = item.indicator || item.ioc || item.value;
        if (!indicator) return;

        const threatData = {
          indicator: indicator.trim(),
          type: item.type || this.detectIndicatorType(indicator),
          threat: Boolean(item.threat || item.malicious || item.is_malicious),
          confidence: parseInt(item.confidence || item.score || 50),
          categories: Array.isArray(item.categories) ? item.categories : (item.tags || []),
          first_seen: item.first_seen || item.date,
          source: item.source || 'file',
          description: item.description || item.comment,
          metadata: {
            index: index,
            raw_data: item
          }
        };

        newData.set(indicator.toLowerCase(), threatData);
      } catch (error) {
        console.warn(`Error parsing JSON item at index ${index}: ${error.message}`);
      }
    });

    this.data = newData;
  }

  async loadTextData() {
    const fileContent = await fs.readFile(this.filePath, 'utf-8');
    const lines = fileContent.split('\n').filter(line => line.trim());

    const newData = new Map();

    lines.forEach((line, index) => {
      const indicator = line.trim();
      if (!indicator || indicator.startsWith('#')) return; // Skip comments

      const threatData = {
        indicator: indicator,
        type: this.detectIndicatorType(indicator),
        threat: true, // Assume all indicators in file are threats
        confidence: 75, // Default confidence
        categories: ['file_feed'],
        first_seen: this.lastModified?.toISOString(),
        source: 'file',
        description: 'Indicator from threat feed file',
        metadata: {
          line_number: index + 1
        }
      };

      newData.set(indicator.toLowerCase(), threatData);
    });

    this.data = newData;
  }

  async fetchThreatData(indicator, options = {}) {
    try {
      this.validateIndicator(indicator);

      const threatData = this.data.get(indicator.toLowerCase());

      if (!threatData) {
        return this.createEmptyResult(indicator);
      }

      return this.transformFileResult(threatData, indicator);
    } catch (error) {
      return this.createErrorResult(indicator, error.message);
    }
  }

  transformFileResult(threatData, indicator) {
    return {
      indicator: indicator,
      type: threatData.type,
      threat: threatData.threat,
      confidence: threatData.confidence,
      sources: [this.name],
      timestamp: new Date().toISOString(),

      metadata: {
        source: this.name,
        file_path: this.filePath,
        file_format: this.fileFormat,
        categories: threatData.categories,
        first_seen: threatData.first_seen,
        source_system: threatData.source,
        description: threatData.description,
        file_metadata: threatData.metadata,
        last_file_update: this.lastModified?.toISOString()
      },

      correlationScore: this.calculateFileCorrelationScore(threatData),
      consensusLevel: this.determineFileConsensusLevel(threatData),
      sources: [this.name]
    };
  }

  calculateFileCorrelationScore(threatData) {
    let score = threatData.confidence;

    // Boost score based on categories
    if (threatData.categories && threatData.categories.length > 0) {
      score += threatData.categories.length * 5;
    }

    // Boost score if description is provided
    if (threatData.description && threatData.description.length > 10) {
      score += 10;
    }

    return Math.min(100, score);
  }

  determineFileConsensusLevel(threatData) {
    if (threatData.confidence >= 90) return 'consensus';
    if (threatData.confidence >= 70) return 'strong';
    if (threatData.confidence >= 50) return 'moderate';
    return 'weak';
  }

  async batchQuery(indicators, options = {}) {
    return indicators.map(indicator => {
      try {
        return this.fetchThreatData(indicator, options);
      } catch (error) {
        return this.createErrorResult(indicator, error.message);
      }
    });
  }

  async getHealthStatus() {
    try {
      const stats = await fs.stat(this.filePath);

      return {
        status: 'healthy',
        file: {
          path: this.filePath,
          format: this.fileFormat,
          size: stats.size,
          last_modified: stats.mtime.toISOString(),
          indicators_loaded: this.data.size
        },
        watching_for_changes: this.watchForChanges,
        last_reload: this.lastModified?.toISOString()
      };
    } catch (error) {
      return {
        status: 'unhealthy',
        error: error.message,
        file: { path: this.filePath }
      };
    }
  }

  parseBoolean(value) {
    if (typeof value === 'boolean') return value;
    if (typeof value === 'string') {
      return ['true', '1', 'yes', 'malicious', 'threat'].includes(value.toLowerCase());
    }
    return Boolean(value);
  }

  parseArray(value) {
    if (Array.isArray(value)) return value;
    if (typeof value === 'string') {
      return value.split(',').map(item => item.trim()).filter(Boolean);
    }
    return [];
  }

  async close() {
    if (this.watcher) {
      await this.watcher.close();
    }
  }
}

// Usage
const fileFeed = new FileThreatFeed({
  filePath: './threat-feeds/malware-domains.csv',
  fileFormat: 'csv',
  watchForChanges: true
});

Integration with TrojanHorse

Register Custom Feeds

import { TrojanHorse } from 'trojanhorse-js';

// Initialize TrojanHorse with custom feeds
const trojan = new TrojanHorse({
  sources: [], // Will register custom feeds separately
  customFeeds: {
    enterprise: enterpriseFeed,
    database: databaseFeed,
    file: fileFeed
  }
});

// Or register feeds individually
trojan.registerFeed('enterprise', enterpriseFeed);
trojan.registerFeed('database', databaseFeed);
trojan.registerFeed('file', fileFeed);

// Use custom feeds in queries
const results = await trojan.scout('malicious-domain.com', {
  sources: ['enterprise', 'database', 'file']
});

console.log('Custom feed results:', results);

// Test all custom feeds
const healthStatus = await trojan.getHealthStatus();
console.log('Feed health:', healthStatus.feeds);

Feed Management

// Create feed manager for advanced operations
class CustomFeedManager {
  constructor() {
    this.feeds = new Map();
    this.metrics = new Map();
  }

  registerFeed(name, feed) {
    this.feeds.set(name, feed);
    this.metrics.set(name, {
      queries: 0,
      errors: 0,
      averageResponseTime: 0,
      lastQuery: null
    });
  }

  async queryAllFeeds(indicator, options = {}) {
    const results = [];
    const promises = [];

    for (const [name, feed] of this.feeds) {
      const promise = this.queryFeedWithMetrics(name, feed, indicator, options);
      promises.push(promise);
    }

    const settledResults = await Promise.allSettled(promises);

    return settledResults.map((result, index) => {
      const feedName = Array.from(this.feeds.keys())[index];

      if (result.status === 'fulfilled') {
        return { feed: feedName, ...result.value };
      } else {
        return {
          feed: feedName,
          error: result.reason.message,
          indicator: indicator,
          threat: false,
          confidence: 0
        };
      }
    });
  }

  async queryFeedWithMetrics(name, feed, indicator, options) {
    const startTime = Date.now();
    const metrics = this.metrics.get(name);

    try {
      metrics.queries++;
      const result = await feed.fetchThreatData(indicator, options);

      // Update metrics
      const responseTime = Date.now() - startTime;
      metrics.averageResponseTime = (metrics.averageResponseTime + responseTime) / 2;
      metrics.lastQuery = new Date().toISOString();

      return result;
    } catch (error) {
      metrics.errors++;
      throw error;
    }
  }

  getMetrics() {
    const metricsReport = {};

    for (const [name, metrics] of this.metrics) {
      metricsReport[name] = {
        ...metrics,
        errorRate: metrics.queries > 0 ? metrics.errors / metrics.queries : 0
      };
    }

    return metricsReport;
  }

  async healthCheck() {
    const healthReport = {};

    for (const [name, feed] of this.feeds) {
      try {
        healthReport[name] = await feed.getHealthStatus();
      } catch (error) {
        healthReport[name] = {
          status: 'unhealthy',
          error: error.message
        };
      }
    }

    return healthReport;
  }
}

// Usage
const feedManager = new CustomFeedManager();
feedManager.registerFeed('enterprise', enterpriseFeed);
feedManager.registerFeed('database', databaseFeed);
feedManager.registerFeed('file', fileFeed);

// Query all custom feeds
const allResults = await feedManager.queryAllFeeds('malicious-domain.com');
console.log('All feed results:', allResults);

// Get performance metrics
const metrics = feedManager.getMetrics();
console.log('Feed metrics:', metrics);

// Health check all feeds
const health = await feedManager.healthCheck();
console.log('Feed health:', health);

Next Steps: - Review Advanced Examples for complex enterprise integrations - Check Enterprise Examples for large-scale deployments - Explore Basic Examples for simple use cases