Technical architecture of multi-source KYC integrations: INPI/Sirene/RBE APIs, intelligent deduplication, quality monitoring and optimized costs. Complete 2025 guide.
In short: master the architecture of multi-source KYC integrations: native APIs, intelligent deduplication, real-time quality monitoring and cost optimization. Discover our architecture →
Official French sources:
Source | Data type | Coverage | Update frequency | Native API | Indicative cost |
---|---|---|---|---|---|
INPI | Companies, trademarks, patents | 2.8M companies | Real time | Yes | €0.02/request |
Sirene (INSEE) | Establishments, activities | 12M establishments | Daily | Yes | Free |
RBE | Beneficial owners | 1.2M declarations | Weekly | Limited | €0.05/request |
BODACC | Legal events | 500k notices/year | Daily | No | €0.01/request |
ORIAS | Insurance intermediaries | 65k intermediaries | Real time | Yes | €0.03/request |
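To wire these sources into an ingestion pipeline, the table above can be captured as a small source registry. This is a minimal sketch; the `SourceConfig` dataclass and its field names are ours, and the cost values are simply the indicative figures quoted above.
from dataclasses import dataclass

@dataclass(frozen=True)
class SourceConfig:
    name: str
    update_frequency: str    # "real_time", "daily", "weekly"
    has_native_api: bool
    cost_per_request: float  # indicative €, see table above

OFFICIAL_FR_SOURCES = {
    "inpi":   SourceConfig("INPI", "real_time", True, 0.02),
    "sirene": SourceConfig("Sirene (INSEE)", "daily", True, 0.00),
    "rbe":    SourceConfig("RBE", "weekly", False, 0.05),   # API access is limited
    "bodacc": SourceConfig("BODACC", "daily", False, 0.01),
    "orias":  SourceConfig("ORIAS", "real_time", True, 0.03),
}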
Critical sanctions lists (a minimal loading sketch follows below):
OFAC (Office of Foreign Assets Control):
European Union:
United Nations:
Premium sources:
Public sources:
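Because the public lists (OFAC SDN, EU consolidated list, UN) are distributed as downloadable files, a first screening layer can simply ingest them and fuzzy-match names. The sketch below assumes the classic OFAC SDN CSV URL and its column layout (both to be verified against current OFAC documentation); `screen_name` and the 0.9 cut-off are illustrative, not a production tuning.
import csv
import io
from difflib import SequenceMatcher

import requests

OFAC_SDN_CSV = "https://www.treasury.gov/ofac/downloads/sdn.csv"  # assumed publication URL

def load_sdn_names() -> list[str]:
    """Download the SDN list and keep the primary name column (2nd field, per the assumed layout)."""
    resp = requests.get(OFAC_SDN_CSV, timeout=30)
    resp.raise_for_status()
    reader = csv.reader(io.StringIO(resp.text))
    return [row[1].strip() for row in reader if len(row) > 1]

def screen_name(candidate: str, sdn_names: list[str], threshold: float = 0.9) -> list[str]:
    """Return SDN entries whose similarity to the candidate name exceeds the threshold."""
    candidate = candidate.upper()
    return [
        name for name in sdn_names
        if SequenceMatcher(None, candidate, name.upper()).ratio() >= threshold
    ]

# hits = screen_name("Jean MARTIN", load_sdn_names())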
PEP classification criteria:
pep_categories = {
"domestic_pep": {
"functions": [
"head_of_state", "head_of_government",
"government_minister", "senior_civil_servant",
"judicial_authority", "military_commander"
],
"threshold_level": "national_regional_local"
},
"foreign_pep": {
"same_functions": "domestic_pep",
"international_org": [
"european_commission", "un_senior_management",
"world_bank_group", "international_monetary_fund"
]
},
"family_members": {
"relationship": ["spouse", "children", "parents"],
"inheritance_risk": True
},
"close_associates": {
"business_relationship": "beneficial_owner_together",
"social_relationship": "known_close_association"
}
}
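The dictionary above can drive a simple rule-based classifier. A minimal sketch, assuming an input record with illustrative keys (`function`, `country`, `international_org`, `relationship_to_pep`, `associate_of_pep`); the `classify_pep` helper is ours, not an existing API.
from typing import Dict, Optional

def classify_pep(person: Dict) -> Optional[str]:
    """Return the PEP category of a person record, or None if no PEP criterion matches."""
    function = person.get("function")
    if function in pep_categories["domestic_pep"]["functions"]:
        # Same functions, classified as foreign PEP when exercised outside France
        return "domestic_pep" if person.get("country") == "FR" else "foreign_pep"
    if person.get("international_org") in pep_categories["foreign_pep"]["international_org"]:
        return "foreign_pep"
    if person.get("relationship_to_pep") in pep_categories["family_members"]["relationship"]:
        return "family_member"
    if person.get("associate_of_pep"):
        return "close_associate"
    return None

# classify_pep({"function": "government_minister", "country": "FR"})  -> "domestic_pep"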
graph TB
subgraph "Data Ingestion Layer"
A[INPI Connector]
B[Sirene Connector]
C[RBE Connector]
D[Sanctions Connector]
E[BODACC Connector]
end
subgraph "Processing Layer"
F[Entity Resolution Engine]
G[Data Normalization]
H[Quality Validator]
I[Deduplication Service]
end
subgraph "Storage Layer"
J[Master Data Store]
K[Cache Layer Redis]
L[Search Index Elasticsearch]
M[Audit Trail DB]
end
subgraph "API Layer"
N[GraphQL Gateway]
O[REST APIs v1]
P[Webhooks Manager]
Q[Rate Limiter]
end
A --> F
B --> G
C --> H
D --> I
E --> F
F --> J
G --> K
H --> L
I --> M
J --> N
K --> O
L --> P
M --> Q
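To make the layering concrete, here is a minimal sketch of the read path exposed by the API layer: check the Redis cache first, then fall back to the master data store. FastAPI and redis-py (asyncio) are assumptions for illustration, and `fetch_from_master_store` is a placeholder for the real storage access.
import json

from fastapi import FastAPI, HTTPException
from redis.asyncio import Redis

app = FastAPI()
cache = Redis(host="localhost", port=6379, decode_responses=True)

async def fetch_from_master_store(entity_id: str) -> dict | None:
    """Placeholder for the Master Data Store lookup (PostgreSQL, Elasticsearch, ...)."""
    raise NotImplementedError

@app.get("/v1/entities/{entity_id}")
async def get_entity(entity_id: str) -> dict:
    cached = await cache.get(f"entity:{entity_id}")
    if cached:
        return json.loads(cached)  # cache hit: no source cost incurred
    entity = await fetch_from_master_store(entity_id)
    if entity is None:
        raise HTTPException(status_code=404, detail="unknown entity")
    await cache.set(f"entity:{entity_id}", json.dumps(entity), ex=3600)  # 1h TTL
    return entity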
1. Circuit Breaker Pattern
import asyncio
from enum import Enum

class SourceUnavailableError(Exception):
    """Raised when a source's circuit is open and calls are short-circuited."""

class SourceTemporaryError(Exception):
    """Raised when a single call to an external source fails."""
class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Circuit open, calls are short-circuited
    HALF_OPEN = "half_open"  # Probing for recovery
class SourceCircuitBreaker:
def __init__(self, source_name, failure_threshold=5, recovery_timeout=60):
self.source_name = source_name
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
async def call_source(self, func, *args, **kwargs):
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise SourceUnavailableError(f"{self.source_name} circuit is open")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise SourceTemporaryError(f"{self.source_name} failed: {e}")
def _on_success(self):
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = asyncio.get_event_loop().time()
if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def _should_attempt_reset(self) -> bool:
        """Allow a half-open probe once the recovery timeout has elapsed."""
        return (
            self.last_failure_time is not None
            and asyncio.get_event_loop().time() - self.last_failure_time >= self.recovery_timeout
        )
2. Retry with Exponential Backoff
import asyncio
import random
from typing import Callable, Any
async def retry_with_backoff(
func: Callable,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
backoff_factor: float = 2.0,
jitter: bool = True
) -> Any:
for attempt in range(max_retries + 1):
try:
return await func()
except Exception as e:
if attempt == max_retries:
raise e # Final attempt failed
# Calculate delay with exponential backoff
delay = min(base_delay * (backoff_factor ** attempt), max_delay)
# Add jitter to prevent thundering herd
if jitter:
delay *= (0.5 + random.random() / 2)
await asyncio.sleep(delay)
# Usage with source APIs
async def resilient_inpi_call(siren: str):
return await retry_with_backoff(
lambda: inpi_api.get_company(siren),
max_retries=3,
base_delay=1.0
)
Multi-region configuration:
sources_config:
inpi:
primary:
endpoint: "https://api.inpi.fr/v1"
region: "europe-west1"
weight: 100
fallback:
endpoint: "https://api-backup.inpi.fr/v1"
region: "europe-west3"
weight: 0 # Activated only on primary failure
world_check:
primary:
endpoint: "https://api.refinitiv.com/v1"
region: "europe-west1"
rate_limit: "1000/hour"
secondary:
endpoint: "https://api-eu.refinitiv.com/v1"
region: "europe-west2"
rate_limit: "800/hour"
load_balance_weight: 30 # 30% of traffic
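This YAML can be consumed by a small failover helper that tries the primary endpoint and only falls back when it fails. A sketch under the assumption that the file is loaded with PyYAML, that both endpoints expose the same paths, and reusing the `SourceUnavailableError` introduced with the circuit breaker above.
import aiohttp
import yaml

with open("sources_config.yaml") as f:
    SOURCES = yaml.safe_load(f)["sources_config"]

async def get_with_failover(source: str, path: str) -> dict:
    """Try the primary endpoint, then any fallback/secondary endpoint declared for the source."""
    cfg = SOURCES[source]
    endpoints = [cfg["primary"]] + [cfg[k] for k in ("fallback", "secondary") if k in cfg]
    last_error: Exception | None = None
    for endpoint in endpoints:
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{endpoint['endpoint']}{path}",
                    timeout=aiohttp.ClientTimeout(total=10),
                ) as resp:
                    resp.raise_for_status()
                    return await resp.json()
        except Exception as exc:  # fall through to the next endpoint
            last_error = exc
    raise SourceUnavailableError(f"{source}: all endpoints failed ({last_error})")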
Main endpoints:
# 1. Full company record
GET https://api.inpi.fr/v1/companies/{siren}
{
"siren": "123456789",
"denomination": "ACME CORPORATION SAS",
"legal_form": "SAS",
"status": "active",
"creation_date": "2020-01-15",
"address": {
"street": "123 Avenue des Champs",
"city": "PARIS",
"postal_code": "75008",
"country": "FRANCE"
},
"capital": {
"amount": 100000,
"currency": "EUR"
},
"activity": {
"ape_code": "6201Z",
"description": "Programmation informatique"
},
"directors": [
{
"name": "Jean MARTIN",
"function": "Président",
"appointment_date": "2020-01-15",
"birth_date": "1975-03-20",
"nationality": "FR"
}
]
}
# 2. Change monitoring
GET https://api.inpi.fr/v1/companies/{siren}/changes?since=2025-01-01
{
"changes": [
{
"date": "2025-01-10",
"type": "director_appointment",
"details": {
"new_director": "Marie DUBOIS",
"function": "Directrice Générale"
}
},
{
"date": "2025-01-12",
"type": "capital_increase",
"details": {
"previous_amount": 100000,
"new_amount": 150000
}
}
]
}
INPI connector implementation:
import aiohttp
import asyncio
from typing import Optional, Dict, List
from dataclasses import dataclass
from datetime import datetime
@dataclass
class INPICompanyInfo:
siren: str
denomination: str
legal_form: str
status: str
directors: List[Dict]
last_updated: datetime
class INPIConnector:
def __init__(self, api_key: str, base_url: str = "https://api.inpi.fr/v1"):
self.api_key = api_key
self.base_url = base_url
self.session = None
self.circuit_breaker = SourceCircuitBreaker("INPI", failure_threshold=5)
async def __aenter__(self):
self.session = aiohttp.ClientSession(
headers={"Authorization": f"Bearer {self.api_key}"},
timeout=aiohttp.ClientTimeout(total=30)
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def get_company(self, siren: str) -> Optional[INPICompanyInfo]:
async def _api_call():
url = f"{self.base_url}/companies/{siren}"
async with self.session.get(url) as response:
if response.status == 404:
return None
response.raise_for_status()
data = await response.json()
return INPICompanyInfo(
siren=data['siren'],
denomination=data['denomination'],
legal_form=data['legal_form'],
status=data['status'],
directors=data.get('directors', []),
last_updated=datetime.now()
)
return await self.circuit_breaker.call_source(_api_call)
async def monitor_changes(self, siren: str, since: datetime) -> List[Dict]:
async def _api_call():
url = f"{self.base_url}/companies/{siren}/changes"
params = {"since": since.isoformat()}
async with self.session.get(url, params=params) as response:
response.raise_for_status()
data = await response.json()
return data.get('changes', [])
return await self.circuit_breaker.call_source(_api_call)
# Example usage
async def monitor_company_changes():
async with INPIConnector(api_key="your_api_key") as inpi:
changes = await inpi.monitor_changes(
siren="123456789",
since=datetime(2025, 1, 1)
)
for change in changes:
print(f"Change detected: {change['type']} on {change['date']}")
INSEE Sirene API specifics:
class SireneConnector:
"""
    Connector for the INSEE Sirene API.
    Free of charge, but rate-limited: 30 requests/minute.
"""
def __init__(self):
self.base_url = "https://api.insee.fr/entreprises/sirene/V3"
self.rate_limiter = AsyncRateLimiter(max_calls=25, time_window=60)
async def search_establishments(self, siren: str) -> List[Dict]:
"""Récupère tous les établissements d'une entreprise"""
async with self.rate_limiter:
url = f"{self.base_url}/siret"
params = {
"q": f"siren:{siren}",
"nombre": 1000 # Max par requête
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
data = await response.json()
establishments = []
for etab in data.get('etablissements', []):
establishments.append({
'siret': etab['siret'],
'is_headquarters': etab['etablissementSiege'],
'status': etab['etatAdministratifEtablissement'],
'activity_code': etab['activitePrincipaleEtablissement'],
'address': self._format_address(etab['adresseEtablissement']),
'employees': etab.get('trancheEffectifsEtablissement')
})
return establishments
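The Sirene connector (and the Fintech architecture further down) relies on an `AsyncRateLimiter` helper that the article does not define. Here is one possible minimal implementation, sketched as an async context manager with a sliding window; the constructor matches the calls used elsewhere (`AsyncRateLimiter(max_calls=25, time_window=60)`).
import asyncio
import time
from collections import deque

class AsyncRateLimiter:
    """Sliding-window limiter: at most max_calls entries per time_window seconds."""

    def __init__(self, max_calls: int, time_window: float = 60.0):
        self.max_calls = max_calls
        self.time_window = time_window
        self._timestamps: deque[float] = deque()
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        async with self._lock:
            while True:
                now = time.monotonic()
                # Drop timestamps that fell out of the window
                while self._timestamps and now - self._timestamps[0] >= self.time_window:
                    self._timestamps.popleft()
                if len(self._timestamps) < self.max_calls:
                    self._timestamps.append(now)
                    return self
                # Wait until the oldest call leaves the window
                await asyncio.sleep(self.time_window - (now - self._timestamps[0]))

    async def __aexit__(self, exc_type, exc, tb):
        return False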
RBE technical challenges:
class RBEConnector:
"""
    Beneficial Ownership Register (Registre des Bénéficiaires Effectifs).
    Restricted access, sensitive data.
"""
async def get_beneficial_owners(self, siren: str) -> Optional[Dict]:
try:
            # API call with strengthened authentication (JWT + client certificate)
url = f"https://api.rbe.fr/v1/declarations/{siren}"
headers = {
"Authorization": f"Bearer {self.jwt_token}",
"X-Client-Certificate": self.client_cert_fingerprint
}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status == 404:
                        # No RBE declaration = SME below the threshold, or a missing filing
return None
data = await response.json()
return {
'declaration_date': data['declarationDate'],
'beneficial_owners': [
{
'name': bo['nom'],
'birth_date': bo['dateNaissance'],
'nationality': bo['nationalite'],
'control_percentage': bo['pourcentageDetention'],
'control_type': bo['typeDetention'], # direct/indirect
'is_natural_person': bo['personnePhysique']
}
for bo in data['beneficiaires']
]
}
except Exception as e:
logger.warning(f"RBE API failed for {siren}: {e}")
return None # Graceful degradation
Webhook architecture for instant notifications:
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from typing import Any, Dict, List, Optional
class WebhookEvent(BaseModel):
event_type: str # "company_updated", "sanction_added", "pep_identified"
entity_id: str
source: str # "inpi", "ofac", "world_check"
timestamp: str
data: Dict[str, Any]
confidence_score: Optional[float] = None
class WebhookManager:
def __init__(self):
self.subscribers = {} # client_id -> webhook_url
self.event_queue = asyncio.Queue()
async def register_webhook(self, client_id: str, webhook_url: str,
events: List[str], secret: str):
"""Register client webhook for specific events"""
self.subscribers[client_id] = {
'url': webhook_url,
'events': events,
'secret': secret,
'active': True
}
async def send_event(self, event: WebhookEvent):
"""Send event to all subscribed clients"""
for client_id, config in self.subscribers.items():
if event.event_type in config['events'] and config['active']:
await self._deliver_webhook(client_id, config, event)
async def _deliver_webhook(self, client_id: str, config: Dict, event: WebhookEvent):
"""Deliver webhook with retry logic"""
payload = event.dict()
signature = self._generate_signature(payload, config['secret'])
headers = {
'Content-Type': 'application/json',
'X-Swiftgum-Signature': signature,
'X-Swiftgum-Event': event.event_type
}
# Retry with exponential backoff
max_retries = 3
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.post(
config['url'],
json=payload,
headers=headers,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status == 200:
logger.info(f"Webhook delivered to {client_id}")
return
else:
raise aiohttp.ClientResponseError(
request_info=response.request_info,
history=response.history,
status=response.status
)
except Exception as e:
wait_time = (2 ** attempt) + random.uniform(0, 1)
await asyncio.sleep(wait_time)
if attempt == max_retries - 1:
logger.error(f"Failed to deliver webhook to {client_id} after {max_retries} attempts: {e}")
# Disable webhook after repeated failures
config['active'] = False
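`_deliver_webhook` relies on a `_generate_signature` method that is not shown. A common approach, sketched here as an assumption rather than the author's implementation, is an HMAC-SHA256 over a canonical (sorted-keys) JSON serialization of the payload; the client must re-serialize with the same canonicalization before comparing against the `X-Swiftgum-Signature` header.
import hashlib
import hmac
import json

# Method to add to WebhookManager
def _generate_signature(self, payload: dict, secret: str) -> str:
    """HMAC-SHA256 signature of the canonical (sorted-keys) JSON payload."""
    body = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()

# Client-side verification (illustrative):
# expected = hmac.new(secret.encode(), canonical_body, hashlib.sha256).hexdigest()
# assert hmac.compare_digest(expected, request.headers["X-Swiftgum-Signature"])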
The deduplication problem: the same entities appear under different forms across sources (name variants, legal forms, punctuation).
Multi-level matching algorithm:
import jellyfish  # Levenshtein distance, Soundex, Jaro-Winkler, etc.
import re
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
@dataclass
class MatchResult:
entity1: str
entity2: str
similarity_score: float
match_type: str # "exact", "high_confidence", "possible", "different"
matching_fields: List[str]
class EntityMatcher:
def __init__(self):
self.exact_threshold = 1.0
self.high_confidence_threshold = 0.85
self.possible_match_threshold = 0.60
def match_companies(self, company1: Dict, company2: Dict) -> MatchResult:
"""Match two company records from different sources"""
# 1. Exact matches (SIREN, SIRET)
if self._exact_identifier_match(company1, company2):
return MatchResult(
entity1=company1['name'],
entity2=company2['name'],
similarity_score=1.0,
match_type="exact",
matching_fields=["siren"]
)
# 2. Company name fuzzy matching
name_score = self._fuzzy_match_name(
company1['name'],
company2['name']
)
# 3. Address matching (if available)
address_score = 0.0
if 'address' in company1 and 'address' in company2:
address_score = self._match_addresses(
company1['address'],
company2['address']
)
# 4. Directors matching
directors_score = 0.0
if 'directors' in company1 and 'directors' in company2:
directors_score = self._match_directors(
company1['directors'],
company2['directors']
)
# Weighted composite score
composite_score = (
name_score * 0.5 +
address_score * 0.25 +
directors_score * 0.25
)
# Classification
if composite_score >= self.high_confidence_threshold:
match_type = "high_confidence"
elif composite_score >= self.possible_match_threshold:
match_type = "possible"
else:
match_type = "different"
return MatchResult(
entity1=company1['name'],
entity2=company2['name'],
similarity_score=composite_score,
match_type=match_type,
matching_fields=self._identify_matching_fields(company1, company2)
)
def _fuzzy_match_name(self, name1: str, name2: str) -> float:
"""Advanced fuzzy matching for company names"""
# Preprocessing: normalize names
norm_name1 = self._normalize_company_name(name1)
norm_name2 = self._normalize_company_name(name2)
# Multiple similarity metrics
jaro_winkler = jellyfish.jaro_winkler_similarity(norm_name1, norm_name2)
levenshtein = 1 - (jellyfish.levenshtein_distance(norm_name1, norm_name2) /
max(len(norm_name1), len(norm_name2)))
# Token-based similarity (for word order variations)
tokens1 = set(norm_name1.split())
tokens2 = set(norm_name2.split())
token_similarity = len(tokens1 & tokens2) / len(tokens1 | tokens2)
# Weighted average
return (jaro_winkler * 0.5 + levenshtein * 0.3 + token_similarity * 0.2)
def _normalize_company_name(self, name: str) -> str:
"""Normalize company name for better matching"""
# Convert to uppercase
normalized = name.upper()
# Remove common legal forms and punctuation
legal_forms = [
"SAS", "SARL", "SA", "SASU", "EURL", "SNC", "SCI",
"SOCIETE", "SOCIÉTÉ", "COMPAGNIE", "ENTREPRISE"
]
for form in legal_forms:
normalized = re.sub(r'\b' + form + r'\b', '', normalized)
# Remove punctuation and extra spaces
normalized = re.sub(r'[^\w\s]', '', normalized)
normalized = re.sub(r'\s+', ' ', normalized).strip()
return normalized
def _match_directors(self, directors1: List[Dict], directors2: List[Dict]) -> float:
"""Match directors between two company records"""
if not directors1 or not directors2:
return 0.0
matches = 0
total_directors = max(len(directors1), len(directors2))
for dir1 in directors1:
best_match = 0.0
for dir2 in directors2:
name_similarity = jellyfish.jaro_winkler_similarity(
dir1.get('name', '').upper(),
dir2.get('name', '').upper()
)
# Bonus for matching birth date
if (dir1.get('birth_date') and dir2.get('birth_date') and
dir1['birth_date'] == dir2['birth_date']):
name_similarity = min(name_similarity + 0.2, 1.0)
best_match = max(best_match, name_similarity)
if best_match > 0.8: # High confidence director match
matches += 1
return matches / total_directors
Entity resolution engine:
class EntityResolutionEngine:
def __init__(self):
self.matcher = EntityMatcher()
self.confidence_threshold = 0.85
async def resolve_entity(self, entity_id: str) -> Dict:
"""Resolve entity across all sources and return consolidated view"""
# 1. Collect data from all sources
sources_data = await self._collect_from_sources(entity_id)
# 2. Find matches across sources
matches = await self._find_cross_source_matches(sources_data)
# 3. Consolidate information
consolidated = await self._consolidate_entity_data(matches)
# 4. Calculate confidence score
consolidated['confidence_score'] = self._calculate_confidence(consolidated)
return consolidated
async def _collect_from_sources(self, entity_id: str) -> Dict[str, Any]:
"""Collect data from all available sources"""
sources_data = {}
# Parallel API calls to all sources
tasks = [
self._get_from_inpi(entity_id),
self._get_from_sirene(entity_id),
self._get_from_rbe(entity_id),
self._get_from_sanctions_lists(entity_id)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
source_names = ['inpi', 'sirene', 'rbe', 'sanctions']
for i, result in enumerate(results):
if not isinstance(result, Exception) and result is not None:
sources_data[source_names[i]] = result
return sources_data
    async def _consolidate_entity_data(self, matches: Dict[str, Any]) -> Dict:
"""Create consolidated entity view with data quality scoring"""
consolidated = {
'entity_id': None,
'names': [],
'identifiers': {},
'addresses': [],
'directors': [],
'activities': [],
'beneficial_owners': [],
'sanctions_status': 'clear',
'pep_status': False,
'data_sources': [],
'last_updated': datetime.now().isoformat(),
'data_quality_score': 0.0
}
# Priority order for data sources (most authoritative first)
source_priority = ['inpi', 'sirene', 'rbe', 'sanctions', 'world_check']
for source in source_priority:
if source in matches:
self._merge_source_data(consolidated, matches[source], source)
# Calculate data quality score
consolidated['data_quality_score'] = self._calculate_data_quality(consolidated)
return consolidated
def _calculate_data_quality(self, entity: Dict) -> float:
"""Calculate data quality score based on completeness and freshness"""
quality_factors = {
'has_official_identifier': 0.3, # SIREN/SIRET
'has_current_address': 0.2,
'has_directors_info': 0.15,
'has_activity_code': 0.1,
'sanctions_screened': 0.15,
'data_freshness': 0.1
}
score = 0.0
# Check each quality factor
if entity.get('identifiers', {}).get('siren'):
score += quality_factors['has_official_identifier']
if entity.get('addresses') and len(entity['addresses']) > 0:
score += quality_factors['has_current_address']
if entity.get('directors') and len(entity['directors']) > 0:
score += quality_factors['has_directors_info']
if entity.get('activities') and len(entity['activities']) > 0:
score += quality_factors['has_activity_code']
if 'sanctions' in entity.get('data_sources', []):
score += quality_factors['sanctions_screened']
# Data freshness (based on most recent update)
# Implementation would check timestamps from sources
score += quality_factors['data_freshness'] * 0.8 # Assume recent
return min(score, 1.0)
Critical metrics per source:
from dataclasses import dataclass
from enum import Enum
from datetime import datetime, timedelta
class SourceStatus(Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
DOWN = "down"
MAINTENANCE = "maintenance"
@dataclass
class SourceMetrics:
source_name: str
status: SourceStatus
uptime_percentage: float # Last 24h
avg_response_time: float # milliseconds
p95_response_time: float
p99_response_time: float
error_rate: float # percentage
requests_per_minute: int
data_freshness: timedelta # How old is the data
coverage_percentage: float # % entities with data from this source
cost_per_request: float
monthly_quota_used: float # percentage of monthly quota used
class SourceMonitor:
def __init__(self):
self.metrics_store = {} # In production: use InfluxDB/Prometheus
self.alert_thresholds = {
'uptime_min': 99.5,
'response_time_max': 2000, # 2 seconds
'error_rate_max': 1.0, # 1%
'data_freshness_max': timedelta(hours=24)
}
async def collect_metrics(self) -> Dict[str, SourceMetrics]:
"""Collect current metrics from all sources"""
sources = ['inpi', 'sirene', 'rbe', 'ofac', 'world_check', 'bodacc']
tasks = [self._collect_source_metrics(source) for source in sources]
results = await asyncio.gather(*tasks, return_exceptions=True)
metrics = {}
for i, result in enumerate(results):
if not isinstance(result, Exception):
metrics[sources[i]] = result
return metrics
async def _collect_source_metrics(self, source_name: str) -> SourceMetrics:
"""Collect metrics for a specific source"""
# Get metrics from monitoring system (Prometheus, DataDog, etc.)
# This is a simplified example
now = datetime.now()
start_time = now - timedelta(hours=24)
# Query metrics store
uptime = await self._get_uptime_percentage(source_name, start_time, now)
response_times = await self._get_response_times(source_name, start_time, now)
error_rate = await self._get_error_rate(source_name, start_time, now)
request_rate = await self._get_request_rate(source_name)
# Determine status
status = self._determine_status(uptime, response_times.p95, error_rate)
return SourceMetrics(
source_name=source_name,
status=status,
uptime_percentage=uptime,
avg_response_time=response_times.avg,
p95_response_time=response_times.p95,
p99_response_time=response_times.p99,
error_rate=error_rate,
requests_per_minute=request_rate,
data_freshness=await self._get_data_freshness(source_name),
coverage_percentage=await self._get_coverage_percentage(source_name),
cost_per_request=self._get_cost_per_request(source_name),
monthly_quota_used=await self._get_quota_usage(source_name)
)
def _determine_status(self, uptime: float, p95_response: float, error_rate: float) -> SourceStatus:
"""Determine overall source status based on metrics"""
if uptime < 95.0 or p95_response > 5000 or error_rate > 5.0:
return SourceStatus.DOWN
elif uptime < 99.0 or p95_response > 2000 or error_rate > 1.0:
return SourceStatus.DEGRADED
else:
return SourceStatus.HEALTHY
class IntelligentAlerting:
def __init__(self):
self.alert_channels = {
'slack': SlackNotifier(),
'email': EmailNotifier(),
'pagerduty': PagerDutyNotifier(),
'webhook': WebhookNotifier()
}
# Different escalation levels
self.escalation_rules = {
'warning': ['slack'],
'critical': ['slack', 'email'],
'emergency': ['slack', 'email', 'pagerduty']
}
async def evaluate_alerts(self, metrics: Dict[str, SourceMetrics]):
"""Evaluate metrics and trigger alerts if needed"""
for source_name, metric in metrics.items():
alerts = self._check_metric_thresholds(source_name, metric)
for alert in alerts:
await self._send_alert(alert)
def _check_metric_thresholds(self, source_name: str, metric: SourceMetrics) -> List[Alert]:
"""Check if metrics breach thresholds"""
alerts = []
# Uptime alert
if metric.uptime_percentage < 99.0:
severity = 'critical' if metric.uptime_percentage < 95.0 else 'warning'
alerts.append(Alert(
source=source_name,
type='uptime_low',
severity=severity,
message=f"{source_name} uptime is {metric.uptime_percentage:.1f}% (threshold: 99.0%)",
value=metric.uptime_percentage,
threshold=99.0
))
# Response time alert
if metric.p95_response_time > 2000:
severity = 'critical' if metric.p95_response_time > 5000 else 'warning'
alerts.append(Alert(
source=source_name,
type='response_time_high',
severity=severity,
message=f"{source_name} P95 response time is {metric.p95_response_time:.0f}ms (threshold: 2000ms)",
value=metric.p95_response_time,
threshold=2000
))
# Error rate alert
if metric.error_rate > 1.0:
severity = 'critical' if metric.error_rate > 5.0 else 'warning'
alerts.append(Alert(
source=source_name,
type='error_rate_high',
severity=severity,
message=f"{source_name} error rate is {metric.error_rate:.1f}% (threshold: 1.0%)",
value=metric.error_rate,
threshold=1.0
))
# Data freshness alert
max_freshness = timedelta(hours=24)
if metric.data_freshness > max_freshness:
alerts.append(Alert(
source=source_name,
type='data_stale',
severity='warning',
message=f"{source_name} data is {metric.data_freshness} old (threshold: 24h)",
value=metric.data_freshness.total_seconds(),
threshold=max_freshness.total_seconds()
))
return alerts
Operational dashboards:
# SLA configuration per source and client segment
SLA_CONFIG = {
"enterprise": { # Clients enterprise
"inpi": {
"uptime": 99.9,
"response_time_p95": 500, # ms
"data_freshness": timedelta(minutes=15)
},
"world_check": {
"uptime": 99.95,
"response_time_p95": 200,
"data_freshness": timedelta(minutes=5)
}
},
"standard": { # Clients standard
"inpi": {
"uptime": 99.5,
"response_time_p95": 1000,
"data_freshness": timedelta(hours=1)
},
"sirene": {
"uptime": 99.0, # Service gratuit
"response_time_p95": 2000,
"data_freshness": timedelta(hours=24)
}
}
}
class SLAReporter:
def __init__(self):
self.sla_config = SLA_CONFIG
async def generate_sla_report(self, period: timedelta = timedelta(days=30)) -> Dict:
"""Generate SLA compliance report"""
end_time = datetime.now()
start_time = end_time - period
report = {
'period': {
'start': start_time.isoformat(),
'end': end_time.isoformat()
},
'sources': {},
'overall_compliance': 0.0
}
total_sla_score = 0.0
source_count = 0
for source in ['inpi', 'sirene', 'rbe', 'ofac', 'world_check']:
source_metrics = await self._get_historical_metrics(source, start_time, end_time)
sla_compliance = self._calculate_sla_compliance(source, source_metrics)
report['sources'][source] = {
'compliance_percentage': sla_compliance.overall,
'uptime_compliance': sla_compliance.uptime,
'performance_compliance': sla_compliance.performance,
'freshness_compliance': sla_compliance.freshness,
'incidents': sla_compliance.incidents
}
total_sla_score += sla_compliance.overall
source_count += 1
report['overall_compliance'] = total_sla_score / source_count
return report
Cost structure:
Source | Pricing model | Unit cost | Cost per 1k entities | Volume threshold | Discounted rate |
---|---|---|---|---|---|
INPI API | Pay-per-request | €0.02/request | €20 | 50k requests | €0.015/request |
Sirene INSEE | Free + rate limiting | €0 | €0 | N/A | €0 |
RBE | Subscription + usage | €0.05/request | €50 | 20k requests | €0.03/request |
World-Check | Annual subscription | €0.15/profile | €150 | 100k profiles | €0.08/profile |
OFAC/EU Sanctions | Free (public) | €0 | €0 | N/A | €0 |
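The degressive thresholds in this table translate directly into a small cost model. A sketch, assuming as a billing simplification that the reduced rate applies only to requests beyond the threshold; `PRICING` and `monthly_source_cost` are illustrative names.
# Base rate, threshold, reduced rate per the table above (free sources omitted)
PRICING = {
    "inpi":        (0.02, 50_000, 0.015),
    "rbe":         (0.05, 20_000, 0.03),
    "world_check": (0.15, 100_000, 0.08),
}

def monthly_source_cost(source: str, requests: int) -> float:
    """Estimated monthly cost in € for a paid source; 0.0 for free ones."""
    if source not in PRICING:
        return 0.0
    base_rate, threshold, reduced_rate = PRICING[source]
    if requests <= threshold:
        return requests * base_rate
    return threshold * base_rate + (requests - threshold) * reduced_rate

# monthly_source_cost("inpi", 80_000) -> 50_000 * 0.02 + 30_000 * 0.015 = 1450.0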
class CostOptimizer:
def __init__(self):
self.cost_per_source = {
'inpi': 0.02,
'rbe': 0.05,
'world_check': 0.15,
            'sirene': 0.0,  # Free
            'ofac': 0.0     # Free
}
self.cache_ttl = {
            'inpi': timedelta(hours=24),        # Static data
            'rbe': timedelta(days=7),           # Updated weekly
            'world_check': timedelta(hours=6),  # Critical data
            'sirene': timedelta(days=1),        # Semi-static data
            'ofac': timedelta(hours=1)          # Critical data
}
async def optimize_data_collection(self, entity_id: str, required_sources: List[str]) -> Dict:
"""Optimize data collection strategy to minimize costs"""
strategy = {
'sources_to_query': [],
'sources_from_cache': [],
'estimated_cost': 0.0,
'cache_strategy': {}
}
for source in required_sources:
cached_data = await self._check_cache(entity_id, source)
if cached_data and not self._is_cache_stale(cached_data, source):
# Use cached data
strategy['sources_from_cache'].append(source)
else:
# Need fresh API call
strategy['sources_to_query'].append(source)
strategy['estimated_cost'] += self.cost_per_source[source]
# Batch optimization for expensive sources
if 'world_check' in strategy['sources_to_query']:
strategy['cache_strategy']['world_check'] = 'aggressive' # Cache longer
return strategy
async def batch_requests_optimization(self, entity_ids: List[str]) -> Dict:
"""Optimize batch requests to minimize API costs"""
# Group by source capabilities
batch_capable = ['world_check', 'ofac'] # Support batch requests
individual_only = ['inpi', 'rbe', 'sirene'] # Individual requests only
optimization_plan = {
'batch_requests': {},
'individual_requests': {},
'total_cost_before': 0.0,
'total_cost_after': 0.0
}
for source in batch_capable:
# Batch requests have bulk discounts
batch_size = min(len(entity_ids), 100) # Max batch size
cost_per_entity = self.cost_per_source[source]
# Volume discount for batch requests
if batch_size >= 50:
cost_per_entity *= 0.7 # 30% discount
elif batch_size >= 20:
cost_per_entity *= 0.8 # 20% discount
optimization_plan['batch_requests'][source] = {
'entity_count': len(entity_ids),
'batch_size': batch_size,
'cost_per_entity': cost_per_entity,
'total_cost': len(entity_ids) * cost_per_entity
}
return optimization_plan
class SmartCache:
"""Intelligent caching system with cost-aware TTL"""
def __init__(self):
self.redis_client = Redis() # Redis for fast cache
self.cost_per_source = CostOptimizer().cost_per_source
async def cache_with_smart_ttl(self, key: str, data: Dict, source: str,
request_cost: float = None):
"""Cache data with smart TTL based on cost and data type"""
if request_cost is None:
request_cost = self.cost_per_source.get(source, 0.0)
# Longer TTL for expensive sources
if request_cost > 0.1:
ttl = timedelta(hours=12) # Cache expensive data longer
elif request_cost > 0.05:
ttl = timedelta(hours=6)
elif request_cost > 0.0:
ttl = timedelta(hours=2)
else:
ttl = timedelta(minutes=30) # Short cache for free sources
# Adjust TTL based on data criticality
if source in ['ofac', 'sanctions']:
ttl = min(ttl, timedelta(hours=1)) # Never cache sanctions too long
await self.redis_client.setex(
key,
int(ttl.total_seconds()),
json.dumps(data)
)
Cost-benefit analysis per source:
class SourceROIAnalyzer:
def __init__(self):
        self.cost_per_source = CostOptimizer().cost_per_source  # unit costs defined above
        self.source_value_metrics = {
            'detection_rate': {},       # % of risks detected by this source
            'false_positive_rate': {},  # % of false positives
            'coverage': {},             # % of entities with data available
            'data_quality': {}          # data quality score
        }
async def calculate_source_roi(self, source: str, time_period: timedelta) -> Dict:
"""Calculate ROI for a specific source over time period"""
# Costs
total_requests = await self._get_request_count(source, time_period)
cost_per_request = self.cost_per_source[source]
total_cost = total_requests * cost_per_request
# Benefits
risks_detected = await self._get_risks_detected(source, time_period)
false_positives = await self._get_false_positives(source, time_period)
# Value calculation
value_per_risk_detected = 1000 # Estimated value of preventing one risk
cost_per_false_positive = 50 # Cost to investigate false positive
total_value = (risks_detected * value_per_risk_detected) - \
(false_positives * cost_per_false_positive)
roi = ((total_value - total_cost) / total_cost) * 100 if total_cost > 0 else 0
return {
'source': source,
'period_days': time_period.days,
'total_requests': total_requests,
'total_cost': total_cost,
'risks_detected': risks_detected,
'false_positives': false_positives,
'total_value': total_value,
'roi_percentage': roi,
'cost_per_risk_detected': total_cost / risks_detected if risks_detected > 0 else float('inf'),
'recommendation': self._get_recommendation(roi, source)
}
def _get_recommendation(self, roi: float, source: str) -> str:
"""Get optimization recommendation based on ROI"""
if roi > 200:
return f"Excellent ROI for {source}. Consider increasing usage."
elif roi > 100:
return f"Good ROI for {source}. Current usage is optimal."
elif roi > 0:
return f"Positive but low ROI for {source}. Monitor closely."
else:
return f"Negative ROI for {source}. Consider reducing usage or finding alternatives."
Optimized CGP configuration:
cgp_optimized_config:
priority_sources:
    tier_1: # Mandatory critical sources
      - "sirene" # Free, full coverage of French entities
      - "ofac" # Free, critical sanctions
      - "eu_sanctions" # Free, EU sanctions
    tier_2: # Important sources when budget allows
      - "inpi" # Paid, but rich director data
      - "rbe" # Mandatory for UBOs above the 25% threshold
    tier_3: # Premium sources for complex cases
      - "world_check" # Expensive but very comprehensive PEP/sanctions coverage
  automation_level: 70 # 70% automation, 30% human review
integrations:
crm:
primary: "salesforce_financial"
sync_frequency: "real_time"
fields_mapped: ["kyc_status", "risk_score", "last_check"]
notifications:
channels: ["email", "sms"]
urgency_levels:
high: "immediate" # Sanctions, PEP
medium: "daily_digest" # Changements dirigeants
low: "weekly_summary" # Mises à jour mineures
Optimized CGP workflow:
class CGPOptimizedWorkflow:
def __init__(self):
self.free_sources = ['sirene', 'ofac', 'eu_sanctions']
self.paid_sources = ['inpi', 'rbe', 'world_check']
        self.budget_threshold = 50  # €/month budget for paid sources
async def screen_client(self, client_data: Dict) -> Dict:
"""Screening optimisé pour CGP avec contrôle des coûts"""
results = {
'client_id': client_data['id'],
'risk_level': 'unknown',
'sources_checked': [],
'total_cost': 0.0,
'findings': []
}
        # 1. ALWAYS check free sources first
for source in self.free_sources:
try:
finding = await self._check_source(source, client_data)
if finding:
results['findings'].append(finding)
results['sources_checked'].append(source)
except Exception as e:
logger.warning(f"Free source {source} failed: {e}")
        # 2. Risk assessment based on free sources only
initial_risk = self._assess_risk_from_findings(results['findings'])
        # 3. Paid sources IF the risk is elevated or the client is significant
if initial_risk > 0.3 or client_data.get('patrimony', 0) > 500000:
for source in self.paid_sources:
cost = self._get_source_cost(source)
if results['total_cost'] + cost <= self.budget_threshold:
try:
finding = await self._check_paid_source(source, client_data)
if finding:
results['findings'].append(finding)
results['sources_checked'].append(source)
results['total_cost'] += cost
except Exception as e:
logger.warning(f"Paid source {source} failed: {e}")
        # 4. Final risk classification
results['risk_level'] = self._final_risk_assessment(results['findings'])
return results
async def _check_source(self, source: str, client_data: Dict) -> Optional[Dict]:
"""Vérification source avec gestion d'erreurs gracieuse"""
if source == 'sirene':
return await self._check_sirene(client_data.get('siren'))
elif source == 'ofac':
return await self._check_ofac(client_data.get('name'))
elif source == 'eu_sanctions':
return await self._check_eu_sanctions(client_data.get('name'))
return None
High-performance architecture for Fintechs:
class FintechScalableArchitecture:
def __init__(self):
self.async_executor = AsyncioExecutor(max_workers=100)
self.batch_processor = BatchProcessor(batch_size=50)
self.rate_limiters = self._setup_rate_limiters()
def _setup_rate_limiters(self) -> Dict[str, AsyncRateLimiter]:
"""Configure rate limiters per source API"""
return {
'inpi': AsyncRateLimiter(1000, 60), # 1000/minute
'world_check': AsyncRateLimiter(100, 60), # 100/minute
'sirene': AsyncRateLimiter(25, 60), # 25/minute (free limit)
'rbe': AsyncRateLimiter(200, 60) # 200/minute
}
async def bulk_screening(self, entities: List[Dict],
required_sources: List[str]) -> List[Dict]:
"""High-performance bulk screening for Fintech volumes"""
# 1. Pre-processing: group entities by type and risk level
entity_groups = self._group_entities_by_risk(entities)
# 2. Parallel processing by risk level
tasks = []
for risk_level, entity_group in entity_groups.items():
task = self._process_entity_group(entity_group, required_sources, risk_level)
tasks.append(task)
# 3. Execute all groups in parallel
group_results = await asyncio.gather(*tasks, return_exceptions=True)
# 4. Flatten results
all_results = []
for group_result in group_results:
if not isinstance(group_result, Exception):
all_results.extend(group_result)
return all_results
async def _process_entity_group(self, entities: List[Dict],
sources: List[str], risk_level: str) -> List[Dict]:
"""Process entity group with appropriate optimization"""
if risk_level == 'low':
# Low risk: use cache aggressively, fewer sources
optimized_sources = [s for s in sources if s in ['sirene', 'ofac']]
cache_ttl = timedelta(hours=24)
elif risk_level == 'medium':
# Medium risk: balanced approach
optimized_sources = sources
cache_ttl = timedelta(hours=6)
else:
# High risk: all sources, fresh data
optimized_sources = sources + ['world_check'] # Add premium source
cache_ttl = timedelta(hours=1)
# Batch processing for supported sources
batch_results = []
individual_results = []
for source in optimized_sources:
if self._supports_batch(source):
batch_result = await self._batch_process_source(
entities, source, cache_ttl
)
batch_results.extend(batch_result)
else:
# Process individually with rate limiting
async with self.rate_limiters[source]:
individual_result = await self._process_individual_source(
entities, source, cache_ttl
)
individual_results.extend(individual_result)
# Merge results
return self._merge_source_results(batch_results + individual_results)
async def real_time_webhook_processing(self, webhook_data: Dict):
"""Process webhook events in real-time for instant decisions"""
entity_id = webhook_data['entity_id']
event_type = webhook_data['event_type']
# Determine required sources based on event type
if event_type == 'sanctions_update':
required_sources = ['ofac', 'eu_sanctions']
elif event_type == 'pep_update':
required_sources = ['world_check']
elif event_type == 'company_change':
required_sources = ['inpi', 'rbe']
else:
required_sources = ['sirene'] # Default minimal check
# Fast-track processing with circuit breakers
results = {}
for source in required_sources:
try:
async with asyncio.timeout(5.0): # 5 second timeout
result = await self._quick_check_source(entity_id, source)
results[source] = result
except asyncio.TimeoutError:
logger.warning(f"Source {source} timed out for entity {entity_id}")
results[source] = {'status': 'timeout', 'data': None}
except Exception as e:
logger.error(f"Source {source} failed for entity {entity_id}: {e}")
results[source] = {'status': 'error', 'data': None}
# Send results via webhook to client
await self._send_client_webhook(entity_id, results)
return results
Specialized Asset Management configuration:
class AssetManagementSpecialized:
def __init__(self):
self.specialized_sources = {
'ubo_tracking': ['rbe', 'bodacc', 'inpi'],
'mandataires': ['inpi', 'bodacc', 'orias'],
'institutional_screening': ['world_check', 'ofac', 'eu_sanctions'],
'fund_specific': ['amf_database', 'esma_database']
}
async def comprehensive_investor_screening(self, investor_data: Dict,
fund_type: str) -> Dict:
"""Comprehensive screening for asset management investors"""
screening_config = self._get_screening_config(fund_type)
results = {
'investor_id': investor_data['id'],
'fund_type': fund_type,
'screening_level': screening_config['level'],
'ubo_analysis': {},
'mandataires_verification': {},
'sanctions_pep_screening': {},
'regulatory_compliance': {},
'final_assessment': {}
}
# 1. UBO Analysis (most complex part)
if screening_config['requires_ubo']:
results['ubo_analysis'] = await self._deep_ubo_analysis(investor_data)
# 2. Mandataires verification
if 'mandataires' in investor_data:
results['mandataires_verification'] = await self._verify_mandataires(
investor_data['mandataires']
)
# 3. Sanctions/PEP screening with enhanced thresholds
results['sanctions_pep_screening'] = await self._enhanced_sanctions_screening(
investor_data, screening_config['pep_threshold']
)
# 4. Regulatory compliance checks
results['regulatory_compliance'] = await self._regulatory_compliance_check(
investor_data, fund_type
)
# 5. Final assessment and risk rating
results['final_assessment'] = self._compute_final_assessment(results)
return results
async def _deep_ubo_analysis(self, investor_data: Dict) -> Dict:
"""Deep analysis of Ultimate Beneficial Owners"""
siren = investor_data.get('siren')
if not siren:
return {'status': 'no_siren', 'ubo_identified': False}
# Multi-source UBO investigation
ubo_sources = {}
# 1. Official RBE declaration
try:
rbe_data = await self._get_rbe_declaration(siren)
ubo_sources['rbe'] = rbe_data
except Exception as e:
logger.warning(f"RBE failed for {siren}: {e}")
ubo_sources['rbe'] = None
# 2. INPI corporate structure
try:
inpi_data = await self._get_inpi_ownership(siren)
ubo_sources['inpi'] = inpi_data
except Exception as e:
logger.warning(f"INPI failed for {siren}: {e}")
ubo_sources['inpi'] = None
# 3. BODACC historical changes
try:
bodacc_data = await self._get_bodacc_ownership_changes(siren)
ubo_sources['bodacc'] = bodacc_data
except Exception as e:
logger.warning(f"BODACC failed for {siren}: {e}")
ubo_sources['bodacc'] = None
# 4. Cross-validate and reconcile UBO information
reconciled_ubo = self._reconcile_ubo_sources(ubo_sources)
return {
'status': 'completed',
'sources_checked': list(ubo_sources.keys()),
'ubo_count': len(reconciled_ubo.get('beneficial_owners', [])),
'beneficial_owners': reconciled_ubo.get('beneficial_owners', []),
'ownership_structure': reconciled_ubo.get('structure', {}),
'confidence_score': reconciled_ubo.get('confidence', 0.0),
'flags': reconciled_ubo.get('flags', [])
}
Q1 2025: solid foundations
Q2 2025: AI and machine learning
Q3 2025: international sources
Q4 2025: automated compliance
Graph databases for complex ownership relationships:
# Neo4j to map complex UBO relationship structures
from neo4j import GraphDatabase
class UBOGraphAnalyzer:
def __init__(self, uri, user, password):
self.driver = GraphDatabase.driver(uri, auth=(user, password))
async def map_ownership_structure(self, root_company_siren: str) -> Dict:
"""Create graph representation of ownership structure"""
with self.driver.session() as session:
# Create nodes for companies and individuals
query = """
MATCH (root:Company {siren: $siren})
OPTIONAL MATCH (root)-[r:OWNS*1..5]->(subsidiary:Company)
OPTIONAL MATCH (person:Person)-[owns:OWNS]->(root)
RETURN root, collect(DISTINCT subsidiary) as subsidiaries,
collect(DISTINCT person) as owners,
collect(DISTINCT r) as ownership_chain
"""
result = session.run(query, siren=root_company_siren)
# Process graph data to find ultimate beneficial owners
ownership_graph = self._process_graph_results(result)
return {
'root_company': ownership_graph['root'],
'ownership_levels': ownership_graph['levels'],
'ultimate_beneficial_owners': ownership_graph['ubos'],
'risk_indicators': self._analyze_ownership_risks(ownership_graph)
}
Blockchain-backed audit trail:
class BlockchainAuditTrail:
def __init__(self, blockchain_provider: str = "ethereum"):
self.provider = blockchain_provider
self.smart_contract_address = "0x..."
async def record_screening_decision(self, decision_data: Dict) -> str:
"""Record KYC decision on blockchain for immutable audit trail"""
# Create hash of decision data
decision_hash = hashlib.sha256(
json.dumps(decision_data, sort_keys=True).encode()
).hexdigest()
# Record on blockchain
transaction_hash = await self._submit_to_blockchain({
'decision_hash': decision_hash,
'timestamp': datetime.now().isoformat(),
'entity_id': decision_data['entity_id'],
'decision': decision_data['final_decision'],
'sources_used': decision_data['sources'],
'risk_score': decision_data['risk_score']
})
return transaction_hash
async def verify_audit_trail(self, entity_id: str) -> List[Dict]:
"""Verify complete audit trail for an entity"""
# Query blockchain for all records
records = await self._query_blockchain_records(entity_id)
# Verify integrity of each record
verified_records = []
for record in records:
is_valid = await self._verify_record_integrity(record)
verified_records.append({
**record,
'integrity_verified': is_valid,
'blockchain_confirmed': True
})
return verified_records
Load projections for 2025-2026:
SCALABILITY_PROJECTIONS = {
"2025": {
"entities_monitored": 1_000_000,
"requests_per_second": 500,
"data_ingestion_gb_day": 100,
"api_calls_per_day": 2_000_000
},
"2026": {
"entities_monitored": 5_000_000,
"requests_per_second": 2000,
"data_ingestion_gb_day": 500,
"api_calls_per_day": 10_000_000
}
}
class ScalabilityPlanner:
def __init__(self):
self.current_capacity = self._assess_current_capacity()
def plan_infrastructure_scaling(self, target_year: str) -> Dict:
"""Plan infrastructure scaling for target projections"""
target_metrics = SCALABILITY_PROJECTIONS[target_year]
scaling_plan = {
'compute_scaling': self._plan_compute_scaling(target_metrics),
'storage_scaling': self._plan_storage_scaling(target_metrics),
'network_scaling': self._plan_network_scaling(target_metrics),
'database_sharding': self._plan_database_sharding(target_metrics),
'cost_projections': self._calculate_infrastructure_costs(target_metrics)
}
return scaling_plan
def _plan_compute_scaling(self, target_metrics: Dict) -> Dict:
"""Plan compute resource scaling"""
current_rps = self.current_capacity['requests_per_second']
target_rps = target_metrics['requests_per_second']
scaling_factor = target_rps / current_rps
return {
'kubernetes_nodes': {
'current': 10,
'target': int(10 * scaling_factor * 1.2), # 20% buffer
'node_type': 'c5.2xlarge'
},
'auto_scaling': {
'min_nodes': int(5 * scaling_factor),
'max_nodes': int(20 * scaling_factor),
'cpu_threshold': 70,
'memory_threshold': 80
},
'load_balancing': {
'algorithm': 'least_connections',
'health_checks': True,
'sticky_sessions': False
}
}
Mastering multi-source integrations turns KYC from a fragmented process into intelligent orchestration. Financial institutions that excel at this integration gain in reliability, cost efficiency and scalability.
The 3 pillars of technical success:
🔧 Resilient architecture
💰 Economic optimization
🚀 Scalability by design
Multi-source integration is no longer a technical "nice-to-have" but the architectural foundation of high-performing perpetual KYC.