Batch Screening
Run periodic fraud audits by screening your customer database against updated blocklists.
Business Scenario
On a regular schedule (daily/weekly), you want to:
- Screen all active customer VPAs against blocklists
- Identify newly flagged accounts
- Generate compliance reports
- Take automated actions on flagged accounts
Architecture
Implementation
Step 1: Create Batch Screening Service
Copy
// services/batch-screening.ts
import { TxnCheckClient } from './fraud-buster';
/** One customer/VPA pairing supplied as input to a screening run. */
export interface CustomerVPA {
/** Identifier of the customer the VPA is linked to. */
customerId: string;
/** The VPA to screen. */
vpa: string;
/** Account status carried alongside the VPA; BatchScreeningService does not read it. */
accountStatus: string;
}
/** Summary of one screening run as returned by BatchScreeningService.screenAllCustomers. */
export interface ScreeningResult {
/** Identifier of the run (the batch service generates it as `screen-<epoch ms>`). */
runId: string;
/** When the run started. */
startTime: Date;
/** When the run finished; the batch service initialises it to startTime and overwrites it at the end. */
endTime: Date;
/** Number of customer records passed to the run. */
totalVPAs: number;
/** Number of VPAs the API reported as clean. */
cleanCount: number;
/** Number of VPAs the API reported as blocklisted. */
blocklistedCount: number;
/** Number of VPAs that belonged to batches which failed. */
errorCount: number;
/** Blocklisted VPAs that could be matched back to a customer. */
blocklisted: BlocklistedVPA[];
/** One entry per failed batch. */
errors: ScreeningError[];
}
/** A blocklisted VPA matched back to the customer it is linked to. */
export interface BlocklistedVPA {
/** Customer the flagged VPA is linked to. */
customerId: string;
/** The flagged VPA as returned by the API. */
vpa: string;
/** Blocklist source reported by the API; the batch service falls back to 'unknown' when none is given. */
source: string;
/** When the batch service processed the hit (not when the VPA was added to the blocklist). */
detectedAt: Date;
}
/** Record of one batch that could not be screened. */
export interface ScreeningError {
/** 1-based position of the failed batch within the run. */
batch: number;
/** Error message, or 'Unknown error' when the thrown value was not an Error. */
error: string;
/** The VPAs that were in the failed batch (candidates for a retry). */
vpas: string[];
}
/**
 * Screens customer VPAs against the blocklist API in fixed-size batches.
 *
 * Batches are sent one at a time with a pause between requests so the run
 * stays within the API rate limit. A failed batch is recorded in the result
 * and the run continues with the next batch.
 */
export class BatchScreeningService {
  private client: TxnCheckClient;
  /** Number of VPAs sent per bulk-check request. */
  private readonly BATCH_SIZE = 100;
  /** Pause between requests: 60s / 6s = 10 requests per minute. */
  private readonly BATCH_DELAY_MS = 6000; // 10 requests/minute rate limit

  constructor(apiKey: string) {
    this.client = new TxnCheckClient({ apiKey });
  }

  /**
   * Screens every distinct VPA in `customers` and reports which customers
   * hold a blocklisted VPA.
   *
   * VPAs are compared case-insensitively, both when de-duplicating the input
   * and when matching API results back to customers. When several customers
   * share a VPA, every one of them is reported.
   *
   * @param customers - Customer/VPA pairs to screen.
   * @param options - `onProgress` is invoked after each batch with the
   *   percentage of batches processed so far (rounded, 1-100).
   * @returns The run summary. `totalVPAs` is the number of input records;
   *   `cleanCount`, `blocklistedCount` and `errorCount` count distinct VPAs;
   *   `blocklisted` holds one entry per affected customer.
   */
  async screenAllCustomers(
    customers: CustomerVPA[],
    options: { onProgress?: (progress: number) => void } = {}
  ): Promise<ScreeningResult> {
    const runId = `screen-${Date.now()}`;
    const startTime = new Date();
    const result: ScreeningResult = {
      runId,
      startTime,
      endTime: startTime,
      totalVPAs: customers.length,
      cleanCount: 0,
      blocklistedCount: 0,
      errorCount: 0,
      blocklisted: [],
      errors: [],
    };

    // Index customers by normalised VPA. A Set per VPA keeps every owner, so a
    // shared VPA flags all linked customers instead of only the last one seen.
    const customersByVpa = new Map<string, Set<string>>();
    // Distinct VPAs in first-seen order; the first-seen spelling is what is sent.
    const uniqueVpas: string[] = [];
    for (const { vpa, customerId } of customers) {
      const key = vpa.toLowerCase();
      let owners = customersByVpa.get(key);
      if (!owners) {
        owners = new Set<string>();
        customersByVpa.set(key, owners);
        uniqueVpas.push(vpa);
      }
      owners.add(customerId);
    }

    // Batch VPAs into groups of BATCH_SIZE
    const batches: string[][] = [];
    for (let i = 0; i < uniqueVpas.length; i += this.BATCH_SIZE) {
      batches.push(uniqueVpas.slice(i, i + this.BATCH_SIZE));
    }
    console.log(`Starting screening run ${runId}: ${uniqueVpas.length} VPAs in ${batches.length} batches`);

    for (let i = 0; i < batches.length; i++) {
      const batch = batches[i];
      try {
        // Use sync mode for immediate results
        const response = await this.client.bulkVpaCheck(batch, { sync: true });
        if (response.status !== 'COMPLETED' || !response.result) {
          throw new Error(`Unexpected status: ${response.status}`);
        }

        const blocklisted = response.result.blocklisted || [];
        for (const item of blocklisted) {
          const owners = customersByVpa.get(item.vpa.toLowerCase());
          if (!owners) {
            continue; // the API returned a VPA that was not part of the input
          }
          for (const customerId of owners) {
            result.blocklisted.push({
              customerId,
              vpa: item.vpa,
              source: item.source || 'unknown',
              detectedAt: new Date(),
            });
          }
        }
        result.blocklistedCount += blocklisted.length;
        result.cleanCount += (response.result.clean || []).length;
      } catch (error) {
        // Record the failure and keep going so one bad batch does not abort the run.
        console.error(`Batch ${i + 1} failed:`, error);
        result.errors.push({
          batch: i + 1,
          error: error instanceof Error ? error.message : 'Unknown error',
          vpas: batch,
        });
        result.errorCount += batch.length;
      }

      // Report progress
      const progress = Math.round(((i + 1) / batches.length) * 100);
      options.onProgress?.(progress);

      // Rate limiting delay (except for last batch)
      if (i < batches.length - 1) {
        await this.sleep(this.BATCH_DELAY_MS);
      }
    }

    result.endTime = new Date();
    console.log(`Screening run ${runId} completed in ${result.endTime.getTime() - startTime.getTime()}ms`);
    return result;
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
Step 2: Scheduled Job Implementation
Copy
// jobs/daily-screening.ts
import { BatchScreeningService, ScreeningResult } from '../services/batch-screening';
/** Contract for a schedulable screening job. */
interface ScreeningJob {
/** Runs one complete screening pass; the returned promise settles when the pass ends. */
runScreening(): Promise<void>;
}
/** Shape of the report built from a screening run and passed to save/email. */
interface ScreeningReport {
  runId: string;
  /** Run duration in milliseconds. */
  executionTime: number;
  summary: {
    totalScreened: number;
    clean: number;
    blocklisted: number;
    errors: number;
    /** Percentage with two decimals, e.g. "97.50". */
    successRate: string;
  };
  blocklistedDetails: { customerId: string; vpa: string; source: string }[];
  errors: ScreeningResult['errors'];
  /** ISO-8601 time at which the report was generated. */
  timestamp: string;
}

/**
 * Daily job: loads active customers, screens their VPAs, acts on blocklisted
 * hits and produces a report.
 */
export class DailyScreeningJob implements ScreeningJob {
  private screeningService: BatchScreeningService;

  constructor(apiKey: string) {
    this.screeningService = new BatchScreeningService(apiKey);
  }

  /**
   * Runs the whole pipeline once.
   *
   * @throws Re-throws whatever made the run fail, after attempting to send a
   *   failure alert. A failure of the alert itself is logged and does not
   *   replace the original error.
   */
  async runScreening(): Promise<void> {
    console.log('Starting daily VPA screening...');
    try {
      // Step 1: Load active customers with VPAs
      const customers = await this.loadActiveCustomers();
      console.log(`Loaded ${customers.length} customers for screening`);

      // Step 2: Run screening
      const result = await this.screeningService.screenAllCustomers(customers, {
        onProgress: (progress) => {
          console.log(`Screening progress: ${progress}%`);
        },
      });

      // Step 3: Process results
      await this.processResults(result);

      // Step 4: Generate report
      await this.generateReport(result);
      console.log('Daily screening completed successfully');
    } catch (error) {
      console.error('Daily screening failed:', error);
      try {
        await this.alertOnFailure(error);
      } catch (alertError) {
        // Keep the original failure as the error the caller sees.
        console.error('Failed to send screening failure alert:', alertError);
      }
      throw error;
    }
  }

  /** Loads the customer/VPA pairs to screen. Placeholder: returns an empty list. */
  private async loadActiveCustomers(): Promise<{ customerId: string; vpa: string; accountStatus: string }[]> {
    // Query your database for active customers with VPAs
    // Example using Prisma:
    /*
    return await prisma.customer.findMany({
      where: {
        status: 'active',
        linkedVpas: { isEmpty: false },
      },
      select: {
        id: true,
        linkedVpas: true,
        status: true,
      },
    }).then(customers =>
      customers.flatMap(c =>
        c.linkedVpas.map(vpa => ({
          customerId: c.id,
          vpa,
          accountStatus: c.status,
        }))
      )
    );
    */
    // Placeholder
    return [];
  }

  /** Groups blocklisted hits by customer and handles each affected customer in turn. */
  private async processResults(result: ScreeningResult): Promise<void> {
    if (result.blocklisted.length === 0) {
      console.log('No blocklisted VPAs found');
      return;
    }
    console.log(`Found ${result.blocklisted.length} blocklisted VPAs`);

    // Group by customer
    const byCustomer = new Map<string, typeof result.blocklisted>();
    for (const item of result.blocklisted) {
      const existing = byCustomer.get(item.customerId) || [];
      existing.push(item);
      byCustomer.set(item.customerId, existing);
    }

    // Process each affected customer
    for (const [customerId, vpas] of byCustomer) {
      await this.handleBlocklistedCustomer(customerId, vpas);
    }
  }

  /**
   * Applies the remediation steps for one customer. As written, all four
   * options run in sequence; keep only the ones your policy requires.
   */
  private async handleBlocklistedCustomer(
    customerId: string,
    blocklisted: { vpa: string; source: string }[]
  ): Promise<void> {
    console.log(`Processing blocklisted customer: ${customerId}`);

    // Option 1: Suspend account
    await this.suspendAccount(customerId, 'VPA blocklisted');

    // Option 2: Remove blocklisted VPAs
    for (const item of blocklisted) {
      await this.removeVpaFromAccount(customerId, item.vpa);
    }

    // Option 3: Alert compliance team
    await this.createComplianceTicket(customerId, blocklisted);

    // Option 4: Notify customer
    await this.notifyCustomer(customerId, blocklisted.length);
  }

  /** Placeholder: logs the suspension. */
  private async suspendAccount(customerId: string, reason: string): Promise<void> {
    // Update customer status in database
    console.log(`Suspending account ${customerId}: ${reason}`);
  }

  /** Placeholder: logs the removal. */
  private async removeVpaFromAccount(customerId: string, vpa: string): Promise<void> {
    // Remove VPA from customer's linked VPAs
    console.log(`Removing VPA ${vpa} from customer ${customerId}`);
  }

  /** Placeholder: logs the ticket creation. */
  private async createComplianceTicket(
    customerId: string,
    blocklisted: { vpa: string; source: string }[]
  ): Promise<void> {
    // Create ticket in your ticketing system
    console.log(`Creating compliance ticket for customer ${customerId}`);
  }

  /** Placeholder: logs the notification. */
  private async notifyCustomer(customerId: string, vpaCount: number): Promise<void> {
    // Send notification to customer
    console.log(`Notifying customer ${customerId} about ${vpaCount} flagged VPAs`);
  }

  /** Builds the run report, then saves and emails it. */
  private async generateReport(result: ScreeningResult): Promise<void> {
    const screened = result.cleanCount + result.blocklistedCount;
    // An empty run has nothing to divide by; report 0.00 instead of "NaN".
    const successRate = result.totalVPAs > 0 ? (screened / result.totalVPAs) * 100 : 0;

    const report: ScreeningReport = {
      runId: result.runId,
      executionTime: result.endTime.getTime() - result.startTime.getTime(),
      summary: {
        totalScreened: result.totalVPAs,
        clean: result.cleanCount,
        blocklisted: result.blocklistedCount,
        errors: result.errorCount,
        successRate: successRate.toFixed(2),
      },
      blocklistedDetails: result.blocklisted.map((b) => ({
        customerId: b.customerId,
        vpa: b.vpa,
        source: b.source,
      })),
      errors: result.errors,
      timestamp: new Date().toISOString(),
    };

    // Save report
    await this.saveReport(report);

    // Send to stakeholders
    await this.emailReport(report);
  }

  /** Placeholder: logs the run id of the saved report. */
  private async saveReport(report: ScreeningReport): Promise<void> {
    // Save to database or file storage
    console.log('Report saved:', report.runId);
  }

  /** Placeholder: logs that the report was emailed. */
  private async emailReport(report: ScreeningReport): Promise<void> {
    // Send email to compliance team
    console.log('Report emailed');
  }

  /** Placeholder: logs that an alert was sent; `error` is not yet included in it. */
  private async alertOnFailure(error: unknown): Promise<void> {
    // Send alert on job failure
    console.error('Screening job failure alert sent');
  }
}
// Schedule with node-cron
import cron from 'node-cron';
const job = new DailyScreeningJob(process.env.FRAUD_BUSTER_API_KEY!);

/**
 * Runs one screening pass and logs, rather than propagates, any failure so a
 * bad run never surfaces as an unhandled rejection inside the scheduler.
 */
const runScheduledScreening = (): Promise<void> =>
  job.runScreening().catch((failure) => {
    console.error('Scheduled screening failed:', failure);
  });

// '0 2 * * *' = minute 0 of hour 2, every day (2 AM).
cron.schedule('0 2 * * *', runScheduledScreening);
Step 3: API for Manual Runs
Copy
// routes/screening.ts
import express from 'express';
import { BatchScreeningService } from '../services/batch-screening';
// Router that the screening endpoints in this file are registered on.
const router = express.Router();
// Single service instance shared by all requests handled by this router.
// NOTE(review): the trailing `!` only silences the compiler; if
// FRAUD_BUSTER_API_KEY is unset, `undefined` is passed as the API key.
const screeningService = new BatchScreeningService(process.env.FRAUD_BUSTER_API_KEY!);
// Trigger manual screening run.
// Body: { customerIds?: string[] } - screens only those customers, or all
// active customers when the list is omitted or empty.
// Responds immediately with { jobId, status: 'started', totalVPAs }; the
// screening itself continues in the background.
router.post('/run', async (req, res, next) => {
  try {
    // req.body is undefined when no body parser ran or no body was sent.
    const { customerIds } = req.body ?? {};
    if (customerIds != null && !Array.isArray(customerIds)) {
      res.status(400).json({ error: 'customerIds must be an array' });
      return;
    }

    // Load specific customers or all if not specified
    const customers =
      customerIds && customerIds.length > 0
        ? await loadCustomersByIds(customerIds)
        : await loadAllActiveCustomers();

    const jobId = `manual-${Date.now()}`;

    // Don't await - run in background. Every link of the chain is returned so
    // a failure while persisting the outcome is caught instead of becoming an
    // unhandled rejection.
    void screeningService
      .screenAllCustomers(customers, {
        onProgress: (progress) => {
          // Update job status in database
          updateJobProgress(jobId, progress);
        },
      })
      .then((result) => saveScreeningResult(jobId, result))
      .catch((error) => saveScreeningError(jobId, error))
      .catch((error) => {
        console.error(`Failed to record outcome of screening job ${jobId}:`, error);
      });

    res.json({
      jobId,
      status: 'started',
      totalVPAs: customers.length,
    });
  } catch (error) {
    // Hand load failures to the Express error handler instead of leaving the
    // request hanging on a rejected promise.
    next(error);
  }
});
// Get screening job status.
// Responds with the stored job, or 404 when no job has the given id.
router.get('/jobs/:jobId', async (req, res, next) => {
  try {
    const { jobId } = req.params;
    const job = await getScreeningJob(jobId);
    if (!job) {
      res.status(404).json({ error: 'Job not found' });
      return;
    }
    res.json(job);
  } catch (error) {
    // Forward lookup failures to the Express error handler.
    next(error);
  }
});
// Get screening history.
// Query: limit (default 10) and offset (default 0); both must be
// non-negative integers, otherwise the request is rejected with 400.
router.get('/history', async (req, res, next) => {
  try {
    // Query values are untrusted and may be strings, arrays or nested objects;
    // Number() turns anything that is not a plain numeric string into NaN.
    const limit = req.query.limit === undefined ? 10 : Number(req.query.limit);
    const offset = req.query.offset === undefined ? 0 : Number(req.query.offset);
    const isValid = (value: number): boolean => Number.isSafeInteger(value) && value >= 0;
    if (!isValid(limit) || !isValid(offset)) {
      res.status(400).json({ error: 'limit and offset must be non-negative integers' });
      return;
    }

    const history = await getScreeningHistory(limit, offset);
    res.json(history);
  } catch (error) {
    // Forward query failures to the Express error handler.
    next(error);
  }
});
export default router;
Handling Large Datasets
For millions of VPAs, implement streaming and parallel processing:
Copy
// services/large-batch-screening.ts
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';
/**
 * Screens a large VPA stream by splitting it into batches and distributing
 * the batches across worker threads.
 *
 * NOTE(review): all batches are collected in memory before any worker starts,
 * and nothing here coordinates the API rate limit between workers - confirm
 * both are acceptable for your data volume and plan.
 */
export class LargeBatchScreeningService {
  private readonly WORKER_COUNT = 4;
  private readonly BATCH_SIZE = 100;

  /**
   * Consumes `vpaStream`, screens it using up to WORKER_COUNT workers and
   * merges their results.
   *
   * @param vpaStream - Customer/VPA pairs to screen.
   * @returns Merged totals. `totalVPAs` is the number of records read from the
   *   stream; `startTime`/`endTime` span the whole call.
   * @throws If any worker errors, exits with a non-zero code, or exits without
   *   posting a result.
   */
  async screenLargeDataset(vpaStream: AsyncIterable<CustomerVPA>): Promise<ScreeningResult> {
    const startTime = new Date();
    const batches: string[][] = [];
    let currentBatch: string[] = [];
    let totalVPAs = 0;

    // Stream VPAs into batches
    for await (const customer of vpaStream) {
      totalVPAs++;
      currentBatch.push(customer.vpa);
      if (currentBatch.length >= this.BATCH_SIZE) {
        batches.push(currentBatch);
        currentBatch = [];
      }
    }
    if (currentBatch.length > 0) {
      batches.push(currentBatch);
    }

    // Distribute batches across workers in contiguous slices
    const batchesPerWorker = Math.ceil(batches.length / this.WORKER_COUNT);
    const workerPromises: Promise<Partial<ScreeningResult>>[] = [];
    for (let i = 0; i < this.WORKER_COUNT; i++) {
      const workerBatches = batches.slice(i * batchesPerWorker, (i + 1) * batchesPerWorker);
      if (workerBatches.length > 0) {
        workerPromises.push(this.runWorker(workerBatches));
      }
    }

    // Collect results from all workers
    const results = await Promise.all(workerPromises);
    return this.mergeResults(results, startTime, totalVPAs);
  }

  /**
   * Runs one worker over `batches` and resolves with the first message it posts.
   * Rejects on a worker error, a non-zero exit code, or an exit without a message.
   */
  private runWorker(batches: string[][]): Promise<Partial<ScreeningResult>> {
    return new Promise((resolve, reject) => {
      const worker = new Worker('./screening-worker.js', {
        workerData: {
          batches,
          apiKey: process.env.FRAUD_BUSTER_API_KEY,
        },
      });
      let resultReceived = false;
      worker.on('message', (message) => {
        resultReceived = true;
        resolve(message);
      });
      worker.on('error', reject);
      worker.on('exit', (code) => {
        if (code !== 0) {
          reject(new Error(`Worker exited with code ${code}`));
        } else if (!resultReceived) {
          // Without this the promise would never settle and the run would hang.
          reject(new Error('Worker exited without reporting a result'));
        }
      });
    });
  }

  /**
   * Combines per-worker results into one ScreeningResult.
   *
   * @param results - Results posted by the workers; missing fields count as 0 / empty.
   * @param startTime - When the run started (defaults to now).
   * @param totalVPAs - Number of VPAs submitted; when omitted, the workers'
   *   own `totalVPAs` values are summed.
   */
  private mergeResults(
    results: Partial<ScreeningResult>[],
    startTime: Date = new Date(),
    totalVPAs?: number
  ): ScreeningResult {
    const merged: ScreeningResult = {
      runId: `large-${Date.now()}`,
      startTime,
      endTime: new Date(),
      totalVPAs: totalVPAs ?? 0,
      cleanCount: 0,
      blocklistedCount: 0,
      errorCount: 0,
      blocklisted: [],
      errors: [],
    };

    for (const result of results) {
      if (totalVPAs === undefined) {
        merged.totalVPAs += result.totalVPAs ?? 0;
      }
      merged.cleanCount += result.cleanCount ?? 0;
      merged.blocklistedCount += result.blocklistedCount ?? 0;
      // Failures must survive the merge so they can be reported and retried.
      merged.errorCount += result.errorCount ?? 0;
      merged.blocklisted.push(...(result.blocklisted ?? []));
      merged.errors.push(...(result.errors ?? []));
    }
    return merged;
  }
}
Compliance Reporting
Generate reports for regulatory compliance:
Copy
// services/compliance-report.ts
/** Aggregated compliance figures for the screening runs within a reporting period. */
export interface ComplianceReport {
/** Start and end of the reporting period. */
period: { start: Date; end: Date };
/** Number of screening runs found in the period. */
screeningRuns: number;
/** Sum of totalVPAs over those runs (a VPA screened in several runs is counted each time). */
totalVPAsScreened: number;
/** Number of distinct customer ids appearing in any run's blocklisted list. */
uniqueCustomersAffected: number;
/** Counts of remediation actions taken in the period. */
actionsTaken: {
accountsSuspended: number;
vpasRemoved: number;
ticketsCreated: number;
};
/** Trend metrics computed from the runs. NOTE(review): units and scale (fraction vs. percent, time unit) are not visible here - confirm against calculateTrends. */
trendsAnalysis: {
blocklistGrowthRate: number;
falsePositiveRate: number;
avgDetectionTime: number;
};
}
/**
 * Compiles the compliance report for the screening runs between `startDate`
 * and `endDate`.
 *
 * @param startDate - Start of the reporting period.
 * @param endDate - End of the reporting period.
 * @returns Run totals, remediation counts and trend metrics for the period.
 */
async function generateComplianceReport(
  startDate: Date,
  endDate: Date
): Promise<ComplianceReport> {
  const runs = await getScreeningRunsInPeriod(startDate, endDate);

  // Single pass over the runs: total screened VPAs and distinct affected customers.
  let screenedTotal = 0;
  const affectedCustomerIds = new Set();
  for (const run of runs) {
    screenedTotal += run.totalVPAs;
    for (const hit of run.blocklisted) {
      affectedCustomerIds.add(hit.customerId);
    }
  }

  // Remediation counts are fetched one after another, then the trends.
  const accountsSuspended = await countSuspensions(startDate, endDate);
  const vpasRemoved = await countVpaRemovals(startDate, endDate);
  const ticketsCreated = await countTickets(startDate, endDate);
  const trendsAnalysis = await calculateTrends(runs);

  return {
    period: { start: startDate, end: endDate },
    screeningRuns: runs.length,
    totalVPAsScreened: screenedTotal,
    uniqueCustomersAffected: affectedCustomerIds.size,
    actionsTaken: { accountsSuspended, vpasRemoved, ticketsCreated },
    trendsAnalysis,
  };
}
Production Recommendations
Off-Peak Scheduling
Run batch jobs during off-peak hours (2-5 AM) to avoid impacting production
Incremental Screening
Screen only new/changed VPAs daily, full screening weekly
Retry Failed Batches
Implement retry logic for failed batches with exponential backoff
Result Caching
Cache screening results to avoid re-checking recently verified VPAs
