- Add MediaType(StrEnum) and MediaAttachment(BaseModel) to shared/models/message.py - Add media: list[MediaAttachment] field to MessageContent - Add whatsapp_app_secret, whatsapp_verify_token, and MinIO settings to shared/config.py - Add normalize_whatsapp_event() to gateway/normalize.py (text, image, document support) - Create whatsapp.py adapter with verify_whatsapp_signature() and verify_hub_challenge() - 30 new passing tests (signature verification + normalizer)
144 lines
5.0 KiB
Python
144 lines
5.0 KiB
Python
"""
|
|
Unit tests for WhatsApp webhook signature verification.
|
|
|
|
Tests CHAN-03: Webhook signature is verified via HMAC-SHA256 on raw body bytes
|
|
before any JSON parsing.
|
|
|
|
Covers:
|
|
- Valid signature passes (returns raw body bytes)
|
|
- Invalid signature raises HTTPException(403)
|
|
- Timing-safe comparison used (hmac.compare_digest)
|
|
- Hub challenge verification (GET endpoint)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
|
|
import pytest
|
|
from fastapi import HTTPException
|
|
|
|
|
|
def make_valid_signature(body: bytes, app_secret: str) -> str:
    """Build a valid ``X-Hub-Signature-256`` header value for ``body``.

    Test helper: computes the HMAC-SHA256 of the raw request body keyed with
    the app secret and formats it as the ``sha256=<hex digest>`` header value.

    Args:
        body: Raw request body bytes to sign (may be empty).
        app_secret: Shared secret; encoded with ``str.encode()`` (UTF-8)
            before being used as the HMAC key.

    Returns:
        The header value, i.e. ``"sha256="`` followed by the lowercase
        64-character hex digest.
    """
    sig = hmac.new(app_secret.encode(), body, hashlib.sha256).hexdigest()
    return f"sha256={sig}"
|
|
|
|
|
|
class TestWhatsAppSignatureVerification:
    """Tests for verify_whatsapp_signature function.

    Each test calls ``verify_whatsapp_signature(body, signature, app_secret)``
    and expects either the raw body bytes back (accepted) or an
    ``HTTPException`` with status 403 (rejected).

    The function under test is imported inside each test method rather than
    at module level, so an import failure surfaces as a failure of the
    individual test instead of breaking collection of the whole module.
    """

    def test_valid_signature_returns_body(self) -> None:
        """verify_whatsapp_signature must return raw body bytes when signature is valid."""
        from gateway.channels.whatsapp import verify_whatsapp_signature

        app_secret = "test_app_secret_abc123"
        body = b'{"object":"whatsapp_business_account"}'
        sig = make_valid_signature(body, app_secret)

        result = verify_whatsapp_signature(body, sig, app_secret)
        assert result == body

    def test_invalid_signature_raises_403(self) -> None:
        """verify_whatsapp_signature must raise HTTPException(403) when signature is invalid."""
        from gateway.channels.whatsapp import verify_whatsapp_signature

        app_secret = "test_app_secret_abc123"
        body = b'{"object":"whatsapp_business_account"}'
        # Carries the expected 'sha256=' prefix, but the digest is arbitrary
        # and does not correspond to this body/secret pair.
        bad_sig = "sha256=deadbeef0000000000000000000000000000000000000000000000000000000"

        with pytest.raises(HTTPException) as exc_info:
            verify_whatsapp_signature(body, bad_sig, app_secret)

        # Checked after the ``with`` block: statements following the raising
        # call inside the block would never execute.
        assert exc_info.value.status_code == 403

    def test_missing_sha256_prefix_raises_403(self) -> None:
        """Signature without 'sha256=' prefix must raise HTTPException(403)."""
        from gateway.channels.whatsapp import verify_whatsapp_signature

        app_secret = "test_app_secret_abc123"
        body = b'{"test": "data"}'
        # No 'sha256=' prefix
        bad_sig = "abcdef1234567890" * 4

        with pytest.raises(HTTPException) as exc_info:
            verify_whatsapp_signature(body, bad_sig, app_secret)

        assert exc_info.value.status_code == 403

    def test_empty_body_valid_signature(self) -> None:
        """verify_whatsapp_signature works on empty body with correct signature."""
        from gateway.channels.whatsapp import verify_whatsapp_signature

        app_secret = "secret"
        body = b""
        sig = make_valid_signature(body, app_secret)

        result = verify_whatsapp_signature(body, sig, app_secret)
        assert result == body

    def test_tampered_body_raises_403(self) -> None:
        """Signature computed on original body fails if body is modified."""
        from gateway.channels.whatsapp import verify_whatsapp_signature

        app_secret = "secret"
        original_body = b'{"message": "hello"}'
        sig = make_valid_signature(original_body, app_secret)

        # Tamper with the body
        tampered_body = b'{"message": "injected"}'

        with pytest.raises(HTTPException) as exc_info:
            verify_whatsapp_signature(tampered_body, sig, app_secret)

        assert exc_info.value.status_code == 403
|
|
|
|
|
|
class TestWhatsAppHubVerification:
    """Tests for WhatsApp webhook GET verification (hub challenge handshake).

    Each test calls ``verify_hub_challenge`` with keyword arguments
    ``hub_mode``, ``hub_verify_token``, ``hub_challenge`` and
    ``expected_token`` and expects either the challenge string back
    (accepted) or an ``HTTPException`` with status 403 (rejected).

    The function under test is imported inside each test method rather than
    at module level, so an import failure surfaces as a failure of the
    individual test instead of breaking collection of the whole module.
    """

    def test_valid_token_returns_challenge(self) -> None:
        """GET webhook with valid verify_token must return hub.challenge as plain text."""
        from gateway.channels.whatsapp import verify_hub_challenge

        token = "my_verify_token_xyz"
        challenge = "challenge_abc_123"

        result = verify_hub_challenge(
            hub_mode="subscribe",
            hub_verify_token=token,
            hub_challenge=challenge,
            expected_token=token,
        )
        assert result == challenge

    def test_invalid_token_raises_403(self) -> None:
        """GET webhook with wrong verify_token must raise HTTPException(403)."""
        from gateway.channels.whatsapp import verify_hub_challenge

        with pytest.raises(HTTPException) as exc_info:
            verify_hub_challenge(
                hub_mode="subscribe",
                hub_verify_token="wrong_token",
                hub_challenge="challenge_123",
                expected_token="correct_token",
            )

        # Checked after the ``with`` block: statements following the raising
        # call inside the block would never execute.
        assert exc_info.value.status_code == 403

    def test_wrong_mode_raises_403(self) -> None:
        """GET webhook with mode != 'subscribe' must raise HTTPException(403)."""
        from gateway.channels.whatsapp import verify_hub_challenge

        # The token matches here, so only the mode can cause the rejection.
        with pytest.raises(HTTPException) as exc_info:
            verify_hub_challenge(
                hub_mode="unsubscribe",
                hub_verify_token="correct_token",
                hub_challenge="challenge_123",
                expected_token="correct_token",
            )

        assert exc_info.value.status_code == 403
|