Skip to main content

Batch Screening

Run periodic fraud audits by screening your customer database against updated blocklists.

Business Scenario

On a regular schedule (daily/weekly), you want to:
  1. Screen all active customer VPAs against blocklists
  2. Identify newly flagged accounts
  3. Generate compliance reports
  4. Take automated actions on flagged accounts

Architecture

Implementation

Step 1: Create Batch Screening Service

// services/batch-screening.ts
import { TxnCheckClient } from './fraud-buster';

/**
 * One customer-to-VPA link submitted for screening.
 */
export interface CustomerVPA {
  /** Identifier of the customer the VPA belongs to. */
  customerId: string;
  /** The VPA to screen; it is lowercased when results are mapped back to customers. */
  vpa: string;
  /** Account status carried with the record; the screening code shown here does not read it. */
  accountStatus: string;
}

/**
 * Aggregate outcome of one screening run.
 */
export interface ScreeningResult {
  /** Identifier of the run. */
  runId: string;
  /** When the run started. */
  startTime: Date;
  /** When the run finished. */
  endTime: Date;
  /** Size of the screened input. */
  totalVPAs: number;
  /** Number of VPAs the API reported as clean. */
  cleanCount: number;
  /** Number of VPAs the API reported as blocklisted. */
  blocklistedCount: number;
  /** Number of VPAs that belonged to batches which failed. */
  errorCount: number;
  /** Blocklisted VPAs that could be matched to a customer. */
  blocklisted: BlocklistedVPA[];
  /** One entry per failed batch. */
  errors: ScreeningError[];
}

/**
 * A blocklisted VPA together with the customer it was matched to.
 */
export interface BlocklistedVPA {
  /** Customer the blocklisted VPA is linked to. */
  customerId: string;
  /** The VPA as returned by the screening API. */
  vpa: string;
  /** Blocklist source reported by the API; 'unknown' when the API gives none. */
  source: string;
  /** Time at which the screening service processed the hit. */
  detectedAt: Date;
}

/**
 * Record of a batch that could not be screened.
 */
export interface ScreeningError {
  /** 1-based position of the failed batch within the run. */
  batch: number;
  /** Error message, or 'Unknown error' when the thrown value was not an Error. */
  error: string;
  /** The VPAs contained in the failed batch (useful for a later retry). */
  vpas: string[];
}

/**
 * Screens customer VPAs against the blocklist API in sequential, rate-limited
 * batches and aggregates the outcome into a {@link ScreeningResult}.
 *
 * VPAs are treated case-insensitively: records that differ only in letter case
 * are screened once, and a hit is attributed to every customer linked to it.
 */
export class BatchScreeningService {
  private client: TxnCheckClient;
  private readonly BATCH_SIZE = 100;
  private readonly BATCH_DELAY_MS = 6000; // one request every 6 s = 10 requests/minute

  /**
   * @param apiKey API key handed to the underlying `TxnCheckClient`.
   */
  constructor(apiKey: string) {
    this.client = new TxnCheckClient({ apiKey });
  }

  /**
   * Screens every distinct VPA found in `customers`.
   *
   * A failing batch does not abort the run: it is recorded in `errors`, its
   * VPAs are added to `errorCount`, and processing continues with the next batch.
   *
   * @param customers Customer/VPA records to screen; duplicates are allowed.
   * @param options.onProgress Called after each batch with the completed percentage (0-100).
   * @returns The aggregated result. `totalVPAs`, `cleanCount`, `blocklistedCount`
   *   and `errorCount` all count distinct VPAs, while `blocklisted` holds one
   *   entry per affected customer (so it can be longer than `blocklistedCount`
   *   when several customers share a VPA).
   */
  async screenAllCustomers(
    customers: CustomerVPA[],
    options: { onProgress?: (progress: number) => void } = {}
  ): Promise<ScreeningResult> {
    const runId = `screen-${Date.now()}`;
    const startTime = new Date();

    // Index customers by lowercased VPA. The same key drives de-duplication and
    // the result lookup, so case variants are sent once and every customer
    // sharing a VPA is remembered (not just the last one seen).
    const customersByVpa = new Map<string, string[]>();
    const uniqueVpas: string[] = [];
    for (const { customerId, vpa } of customers) {
      const key = vpa.toLowerCase();
      const owners = customersByVpa.get(key);
      if (owners === undefined) {
        customersByVpa.set(key, [customerId]);
        uniqueVpas.push(vpa); // first-seen spelling is what gets sent to the API
      } else if (!owners.includes(customerId)) {
        owners.push(customerId);
      }
    }

    const result: ScreeningResult = {
      runId,
      startTime,
      endTime: startTime,
      // Distinct VPAs, so that clean + blocklisted + errors can add up to the total.
      totalVPAs: uniqueVpas.length,
      cleanCount: 0,
      blocklistedCount: 0,
      errorCount: 0,
      blocklisted: [],
      errors: [],
    };

    // Batch VPAs into groups of BATCH_SIZE
    const batches: string[][] = [];
    for (let i = 0; i < uniqueVpas.length; i += this.BATCH_SIZE) {
      batches.push(uniqueVpas.slice(i, i + this.BATCH_SIZE));
    }

    console.log(`Starting screening run ${runId}: ${uniqueVpas.length} VPAs in ${batches.length} batches`);

    // Process batches
    for (let i = 0; i < batches.length; i++) {
      const batch = batches[i];

      try {
        // Use sync mode for immediate results
        const response = await this.client.bulkVpaCheck(batch, { sync: true });

        if (response.status === 'COMPLETED' && response.result) {
          const blocklisted = response.result.blocklisted || [];
          for (const item of blocklisted) {
            const owners = customersByVpa.get(item.vpa.toLowerCase()) ?? [];
            const detectedAt = new Date();
            for (const customerId of owners) {
              result.blocklisted.push({
                customerId,
                vpa: item.vpa,
                source: item.source || 'unknown',
                detectedAt,
              });
            }
          }

          result.blocklistedCount += blocklisted.length;
          result.cleanCount += (response.result.clean || []).length;
        } else {
          // Routed to the catch below so the batch is recorded as failed.
          throw new Error(`Unexpected status: ${response.status}`);
        }
      } catch (error) {
        console.error(`Batch ${i + 1} failed:`, error);
        result.errors.push({
          batch: i + 1,
          error: error instanceof Error ? error.message : 'Unknown error',
          vpas: batch,
        });
        result.errorCount += batch.length;
      }

      // Report progress
      const progress = Math.round(((i + 1) / batches.length) * 100);
      options.onProgress?.(progress);

      // Rate limiting delay (except for last batch)
      if (i < batches.length - 1) {
        await this.sleep(this.BATCH_DELAY_MS);
      }
    }

    result.endTime = new Date();
    console.log(`Screening run ${runId} completed in ${result.endTime.getTime() - startTime.getTime()}ms`);

    return result;
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}

Step 2: Scheduled Job Implementation

// jobs/daily-screening.ts
import { BatchScreeningService, ScreeningResult } from '../services/batch-screening';

/**
 * Contract for a job that performs one complete screening run.
 */
interface ScreeningJob {
  /** Executes the run; the returned promise settles when the run has finished or failed. */
  runScreening(): Promise<void>;
}

/**
 * Scheduled job that loads active customers, screens their VPAs, acts on
 * blocklisted hits and produces a run report.
 *
 * The data-access, account-action and notification methods in this class are
 * placeholders that only log; replace them with real integrations.
 */
export class DailyScreeningJob implements ScreeningJob {
  private screeningService: BatchScreeningService;

  /**
   * @param apiKey API key forwarded to the `BatchScreeningService`.
   */
  constructor(apiKey: string) {
    this.screeningService = new BatchScreeningService(apiKey);
  }

  /**
   * Runs the full pipeline: load customers, screen, process hits, report.
   *
   * @throws Rethrows whatever error aborted the run, after attempting to send
   *   a failure alert. A failure of the alert itself is logged and never
   *   replaces the original error.
   */
  async runScreening(): Promise<void> {
    console.log('Starting daily VPA screening...');

    try {
      // Step 1: Load active customers with VPAs
      const customers = await this.loadActiveCustomers();
      console.log(`Loaded ${customers.length} customers for screening`);

      // Step 2: Run screening
      const result = await this.screeningService.screenAllCustomers(customers, {
        onProgress: (progress) => {
          console.log(`Screening progress: ${progress}%`);
        },
      });

      // Step 3: Process results
      await this.processResults(result);

      // Step 4: Generate report
      await this.generateReport(result);

      console.log('Daily screening completed successfully');
    } catch (error) {
      console.error('Daily screening failed:', error);
      try {
        await this.alertOnFailure(error);
      } catch (alertError) {
        // Keep the root cause: the caller must see the screening error, not the alerting one.
        console.error('Failed to send screening failure alert:', alertError);
      }
      throw error;
    }
  }

  /**
   * Loads the customer/VPA records to screen.
   *
   * Placeholder: currently returns an empty list.
   */
  private async loadActiveCustomers(): Promise<{ customerId: string; vpa: string; accountStatus: string }[]> {
    // Query your database for active customers with VPAs
    // Example using Prisma:
    /*
    return await prisma.customer.findMany({
      where: {
        status: 'active',
        linkedVpas: { isEmpty: false },
      },
      select: {
        id: true,
        linkedVpas: true,
        status: true,
      },
    }).then(customers => 
      customers.flatMap(c => 
        c.linkedVpas.map(vpa => ({
          customerId: c.id,
          vpa,
          accountStatus: c.status,
        }))
      )
    );
    */
    
    // Placeholder
    return [];
  }

  /**
   * Groups blocklisted hits by customer and handles each affected customer in turn.
   *
   * @param result Outcome of the screening run.
   */
  private async processResults(result: ScreeningResult): Promise<void> {
    if (result.blocklisted.length === 0) {
      console.log('No blocklisted VPAs found');
      return;
    }

    console.log(`Found ${result.blocklisted.length} blocklisted VPAs`);

    // Group by customer
    const byCustomer = new Map<string, typeof result.blocklisted>();
    for (const item of result.blocklisted) {
      const existing = byCustomer.get(item.customerId) || [];
      existing.push(item);
      byCustomer.set(item.customerId, existing);
    }

    // Process each affected customer
    for (const [customerId, vpas] of byCustomer) {
      await this.handleBlocklistedCustomer(customerId, vpas);
    }
  }

  /**
   * Applies the automated actions for one affected customer.
   *
   * Note that all four "options" below are executed, in order; remove the ones
   * that do not match your policy.
   *
   * @param customerId Customer with at least one blocklisted VPA.
   * @param blocklisted That customer's blocklisted VPAs.
   */
  private async handleBlocklistedCustomer(
    customerId: string,
    blocklisted: { vpa: string; source: string }[]
  ): Promise<void> {
    console.log(`Processing blocklisted customer: ${customerId}`);

    // Option 1: Suspend account
    await this.suspendAccount(customerId, 'VPA blocklisted');

    // Option 2: Remove blocklisted VPAs
    for (const item of blocklisted) {
      await this.removeVpaFromAccount(customerId, item.vpa);
    }

    // Option 3: Alert compliance team
    await this.createComplianceTicket(customerId, blocklisted);

    // Option 4: Notify customer
    await this.notifyCustomer(customerId, blocklisted.length);
  }

  /** Placeholder: logs the suspension instead of updating the customer record. */
  private async suspendAccount(customerId: string, reason: string): Promise<void> {
    // Update customer status in database
    console.log(`Suspending account ${customerId}: ${reason}`);
  }

  /** Placeholder: logs the removal instead of unlinking the VPA. */
  private async removeVpaFromAccount(customerId: string, vpa: string): Promise<void> {
    // Remove VPA from customer's linked VPAs
    console.log(`Removing VPA ${vpa} from customer ${customerId}`);
  }

  /** Placeholder: logs instead of creating a ticket; `blocklisted` is not used yet. */
  private async createComplianceTicket(
    customerId: string,
    blocklisted: { vpa: string; source: string }[]
  ): Promise<void> {
    // Create ticket in your ticketing system
    console.log(`Creating compliance ticket for customer ${customerId}`);
  }

  /** Placeholder: logs instead of sending a customer notification. */
  private async notifyCustomer(customerId: string, vpaCount: number): Promise<void> {
    // Send notification to customer
    console.log(`Notifying customer ${customerId} about ${vpaCount} flagged VPAs`);
  }

  /**
   * Builds the run report, then saves and emails it.
   *
   * `summary.successRate` is the percentage of `totalVPAs` that came back as
   * clean or blocklisted, formatted with two decimals. For an empty run
   * (`totalVPAs` of 0) it is reported as '0.00' rather than 'NaN'.
   *
   * @param result Outcome of the screening run.
   */
  private async generateReport(result: ScreeningResult): Promise<void> {
    const screened = result.cleanCount + result.blocklistedCount;
    // Guard the division: 0 / 0 would otherwise serialise as "NaN".
    const successRate =
      result.totalVPAs > 0 ? ((screened / result.totalVPAs) * 100).toFixed(2) : '0.00';

    const report = {
      runId: result.runId,
      executionTime: result.endTime.getTime() - result.startTime.getTime(),
      summary: {
        totalScreened: result.totalVPAs,
        clean: result.cleanCount,
        blocklisted: result.blocklistedCount,
        errors: result.errorCount,
        successRate,
      },
      blocklistedDetails: result.blocklisted.map((b) => ({
        customerId: b.customerId,
        vpa: b.vpa,
        source: b.source,
      })),
      errors: result.errors,
      timestamp: new Date().toISOString(),
    };

    // Save report
    await this.saveReport(report);

    // Send to stakeholders
    await this.emailReport(report);
  }

  /** Placeholder: logs the run id instead of persisting the report. */
  private async saveReport(report: { runId: string }): Promise<void> {
    // Save to database or file storage
    console.log('Report saved:', report.runId);
  }

  /** Placeholder: logs instead of emailing; `report` is not used yet. */
  private async emailReport(report: unknown): Promise<void> {
    // Send email to compliance team
    console.log('Report emailed');
  }

  /** Placeholder: logs instead of alerting; `error` is not used yet. */
  private async alertOnFailure(error: unknown): Promise<void> {
    // Send alert on job failure
    console.error('Screening job failure alert sent');
  }
}

// Schedule with node-cron
import cron from 'node-cron';

const job = new DailyScreeningJob(process.env.FRAUD_BUSTER_API_KEY!);

/**
 * Cron callback: runs one screening and logs a failure instead of letting the
 * rejection escape the scheduler.
 */
const runScheduledScreening = (): Promise<void> =>
  job.runScreening().catch((error: unknown) => {
    console.error('Scheduled screening failed:', error);
  });

// Run daily at 2 AM (cron fields: minute 0, hour 2, every day)
cron.schedule('0 2 * * *', runScheduledScreening);

Step 3: API for Manual Runs

// routes/screening.ts
import express from 'express';
import { BatchScreeningService } from '../services/batch-screening';

const router = express.Router();
const screeningService = new BatchScreeningService(process.env.FRAUD_BUSTER_API_KEY!);

/**
 * Reads a pagination query value as a non-negative integer.
 *
 * @param raw Value taken from `req.query` (a string for a single occurrence).
 * @param fallback Returned when the parameter is absent.
 * @returns The parsed value, or `undefined` when the parameter is present but
 *   is not a non-negative integer (including repeated or empty parameters).
 */
function parsePageParam(raw: unknown, fallback: number): number | undefined {
  if (raw === undefined) {
    return fallback;
  }
  if (typeof raw !== 'string' || raw.trim() === '') {
    return undefined;
  }
  const value = Number(raw);
  return Number.isInteger(value) && value >= 0 ? value : undefined;
}

// Trigger manual screening run.
// Responds as soon as the job is started; the screening itself continues in the background.
router.post('/run', async (req, res, next) => {
  try {
    const { customerIds } = req.body ?? {};

    // The body is untrusted: reject anything that is not a list before using it.
    if (customerIds != null && !Array.isArray(customerIds)) {
      return res.status(400).json({ error: 'customerIds must be an array' });
    }

    // Load specific customers or all if not specified
    const customers =
      customerIds && customerIds.length > 0
        ? await loadCustomersByIds(customerIds)
        : await loadAllActiveCustomers();

    const jobId = `manual-${Date.now()}`;

    // Don't await - run in background. Every branch ends in a catch so a failure
    // while persisting the outcome cannot surface as an unhandled rejection.
    void screeningService
      .screenAllCustomers(customers, {
        onProgress: (progress) => {
          // Update job status in database
          updateJobProgress(jobId, progress);
        },
      })
      .then((result) => saveScreeningResult(jobId, result))
      .catch((error) => saveScreeningError(jobId, error))
      .catch((error) => {
        console.error(`Failed to record outcome of screening job ${jobId}:`, error);
      });

    res.json({
      jobId,
      status: 'started',
      totalVPAs: customers.length,
    });
  } catch (error) {
    // Express 4 does not catch rejections from async handlers; forward explicitly.
    next(error);
  }
});

// Get screening job status
router.get('/jobs/:jobId', async (req, res, next) => {
  try {
    const { jobId } = req.params;
    const job = await getScreeningJob(jobId);

    if (!job) {
      return res.status(404).json({ error: 'Job not found' });
    }

    res.json(job);
  } catch (error) {
    next(error);
  }
});

// Get screening history (defaults: limit=10, offset=0)
router.get('/history', async (req, res, next) => {
  try {
    const limit = parsePageParam(req.query.limit, 10);
    const offset = parsePageParam(req.query.offset, 0);

    if (limit === undefined || offset === undefined) {
      return res.status(400).json({ error: 'limit and offset must be non-negative integers' });
    }

    const history = await getScreeningHistory(limit, offset);
    res.json(history);
  } catch (error) {
    next(error);
  }
});

export default router;

Handling Large Datasets

For millions of VPAs, implement streaming and parallel processing:
// services/large-batch-screening.ts
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';
import { CustomerVPA, ScreeningResult } from './batch-screening';

/**
 * Screens a large stream of customer VPAs by splitting it into batches and
 * handing contiguous slices of those batches to worker threads.
 *
 * Things to be aware of when adapting this sample:
 * - All batches are collected in memory before any worker starts.
 * - Only the VPA strings are sent to the workers; customer ids are not.
 * - NOTE(review): the worker script is not shown here, so any API rate
 *   limiting has to be enforced inside it — confirm before running workers
 *   in parallel against a rate-limited API.
 */
export class LargeBatchScreeningService {
  private readonly WORKER_COUNT = 4;
  private readonly BATCH_SIZE = 100;

  /**
   * Screens every VPA produced by `vpaStream`.
   *
   * @param vpaStream Async source of customer/VPA records.
   * @returns The merged worker results. `startTime`/`endTime` bracket the whole
   *   call and `totalVPAs` is the number of records read from the stream.
   * @throws Rejects if any worker errors, exits with a non-zero code, or exits
   *   without posting a result.
   */
  async screenLargeDataset(vpaStream: AsyncIterable<CustomerVPA>): Promise<ScreeningResult> {
    const startTime = new Date();
    const batches: string[][] = [];
    let currentBatch: string[] = [];
    let totalVPAs = 0;

    // Stream VPAs into batches
    for await (const customer of vpaStream) {
      currentBatch.push(customer.vpa);
      totalVPAs += 1;

      if (currentBatch.length >= this.BATCH_SIZE) {
        batches.push(currentBatch);
        currentBatch = [];
      }
    }

    if (currentBatch.length > 0) {
      batches.push(currentBatch);
    }

    // Distribute batches across workers
    const batchesPerWorker = Math.ceil(batches.length / this.WORKER_COUNT);
    const workerPromises: Promise<Partial<ScreeningResult>>[] = [];

    for (let i = 0; i < this.WORKER_COUNT; i++) {
      const workerBatches = batches.slice(
        i * batchesPerWorker,
        (i + 1) * batchesPerWorker
      );

      // Fewer batches than workers: do not spawn idle workers.
      if (workerBatches.length > 0) {
        workerPromises.push(this.runWorker(workerBatches));
      }
    }

    // Collect results from all workers
    const results = await Promise.all(workerPromises);

    // Run-level fields are known only here, not to the individual workers.
    return {
      ...this.mergeResults(results),
      startTime,
      endTime: new Date(),
      totalVPAs,
    };
  }

  /**
   * Runs one worker thread over `batches` and resolves with the first message it posts.
   *
   * The worker script path is relative, so it is resolved against the current
   * working directory of the process.
   */
  private runWorker(batches: string[][]): Promise<Partial<ScreeningResult>> {
    return new Promise((resolve, reject) => {
      const worker = new Worker('./screening-worker.js', {
        workerData: {
          batches,
          apiKey: process.env.FRAUD_BUSTER_API_KEY,
        },
      });

      let resultReceived = false;

      worker.on('message', (message) => {
        resultReceived = true;
        resolve(message);
      });
      worker.on('error', reject);
      worker.on('exit', (code) => {
        if (code !== 0) {
          reject(new Error(`Worker exited with code ${code}`));
        } else if (!resultReceived) {
          // A clean exit without a message would otherwise leave this promise pending forever.
          reject(new Error('Worker exited without posting a result'));
        }
      });
    });
  }

  /**
   * Sums the counters and concatenates the detail lists of all worker results.
   *
   * Missing fields in a worker result are treated as zero / empty. `runId` is
   * generated here; `startTime`, `endTime` and `totalVPAs` are left at neutral
   * defaults for the caller to fill in.
   */
  private mergeResults(results: Partial<ScreeningResult>[]): ScreeningResult {
    const now = new Date();
    const merged: ScreeningResult = {
      runId: `large-${Date.now()}`,
      startTime: now,
      endTime: now,
      totalVPAs: 0,
      cleanCount: 0,
      blocklistedCount: 0,
      errorCount: 0,
      blocklisted: [],
      errors: [],
    };

    for (const partial of results) {
      merged.cleanCount += partial.cleanCount ?? 0;
      merged.blocklistedCount += partial.blocklistedCount ?? 0;
      merged.errorCount += partial.errorCount ?? 0;
      // concat instead of push(...spread): spreading very large arrays can overflow the call stack.
      merged.blocklisted = merged.blocklisted.concat(partial.blocklisted ?? []);
      merged.errors = merged.errors.concat(partial.errors ?? []);
    }

    return merged;
  }
}

Compliance Reporting

Generate reports for regulatory compliance:
// services/compliance-report.ts
/**
 * Summary of screening activity over a reporting period.
 */
export interface ComplianceReport {
  /** Start and end of the reporting period. */
  period: { start: Date; end: Date };
  /** Number of screening runs found in the period. */
  screeningRuns: number;
  /** Sum of `totalVPAs` over those runs. */
  totalVPAsScreened: number;
  /** Number of distinct customer ids appearing in the runs' blocklisted entries. */
  uniqueCustomersAffected: number;
  /** Counts of automated actions taken in the period. */
  actionsTaken: {
    accountsSuspended: number;
    vpasRemoved: number;
    ticketsCreated: number;
  };
  /**
   * Trend metrics derived from the runs.
   * NOTE(review): units and exact definitions are not established in this file — confirm
   * against the trend calculation before publishing these figures.
   */
  trendsAnalysis: {
    blocklistGrowthRate: number;
    falsePositiveRate: number;
    avgDetectionTime: number;
  };
}

/**
 * Compiles the compliance report for the given period.
 *
 * The action counts and the trend analysis do not depend on one another, so
 * they are requested concurrently once the screening runs have been loaded.
 *
 * @param startDate Start of the reporting period.
 * @param endDate End of the reporting period.
 * @returns The assembled report.
 * @throws Rejects with the first failure of any underlying lookup.
 */
async function generateComplianceReport(
  startDate: Date,
  endDate: Date
): Promise<ComplianceReport> {
  // Query screening history and compile report
  const runs = await getScreeningRunsInPeriod(startDate, endDate);

  const [accountsSuspended, vpasRemoved, ticketsCreated, trendsAnalysis] = await Promise.all([
    countSuspensions(startDate, endDate),
    countVpaRemovals(startDate, endDate),
    countTickets(startDate, endDate),
    calculateTrends(runs),
  ]);

  return {
    period: { start: startDate, end: endDate },
    screeningRuns: runs.length,
    totalVPAsScreened: runs.reduce((sum, r) => sum + r.totalVPAs, 0),
    // A customer flagged in several runs is counted once.
    uniqueCustomersAffected: new Set(
      runs.flatMap((r) => r.blocklisted.map((b) => b.customerId))
    ).size,
    actionsTaken: {
      accountsSuspended,
      vpasRemoved,
      ticketsCreated,
    },
    trendsAnalysis,
  };
}

Production Recommendations

Off-Peak Scheduling

Run batch jobs during off-peak hours (2-5 AM) to avoid impacting production

Incremental Screening

Screen only new or changed VPAs daily, and run a full screening weekly.

Retry Failed Batches

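Implement retry logic for failed batches with exponential backoff. Each entry in `ScreeningResult.errors` records the VPAs of the failed batch, so a retry can resubmit exactly those VPAs.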

Result Caching

Cache screening results to avoid re-checking recently verified VPAs