feat(02-03): add MediaAttachment model, WhatsApp normalizer, and signature verification
- Add MediaType(StrEnum) and MediaAttachment(BaseModel) to shared/models/message.py - Add media: list[MediaAttachment] field to MessageContent - Add whatsapp_app_secret, whatsapp_verify_token, and MinIO settings to shared/config.py - Add normalize_whatsapp_event() to gateway/normalize.py (text, image, document support) - Create whatsapp.py adapter with verify_whatsapp_signature() and verify_hub_challenge() - 30 new passing tests (signature verification + normalizer)
This commit is contained in:
143
tests/unit/test_whatsapp_verify.py
Normal file
143
tests/unit/test_whatsapp_verify.py
Normal file
@@ -0,0 +1,143 @@
|
||||
"""
|
||||
Unit tests for WhatsApp webhook signature verification.
|
||||
|
||||
Tests CHAN-03: Webhook signature is verified via HMAC-SHA256 on raw body bytes
|
||||
before any JSON parsing.
|
||||
|
||||
Covers:
|
||||
- Valid signature passes (returns raw body bytes)
|
||||
- Invalid signature raises HTTPException(403)
|
||||
- Timing-safe comparison used (hmac.compare_digest)
|
||||
- Hub challenge verification (GET endpoint)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
|
||||
import pytest
|
||||
from fastapi import HTTPException
|
||||
|
||||
|
||||
def make_valid_signature(body: bytes, app_secret: str) -> str:
|
||||
"""Helper: generate a valid X-Hub-Signature-256 header value."""
|
||||
sig = hmac.new(app_secret.encode(), body, hashlib.sha256).hexdigest()
|
||||
return f"sha256={sig}"
|
||||
|
||||
|
||||
class TestWhatsAppSignatureVerification:
|
||||
"""Tests for verify_whatsapp_signature function."""
|
||||
|
||||
def test_valid_signature_returns_body(self) -> None:
|
||||
"""verify_whatsapp_signature must return raw body bytes when signature is valid."""
|
||||
from gateway.channels.whatsapp import verify_whatsapp_signature
|
||||
|
||||
app_secret = "test_app_secret_abc123"
|
||||
body = b'{"object":"whatsapp_business_account"}'
|
||||
sig = make_valid_signature(body, app_secret)
|
||||
|
||||
result = verify_whatsapp_signature(body, sig, app_secret)
|
||||
assert result == body
|
||||
|
||||
def test_invalid_signature_raises_403(self) -> None:
|
||||
"""verify_whatsapp_signature must raise HTTPException(403) when signature is invalid."""
|
||||
from gateway.channels.whatsapp import verify_whatsapp_signature
|
||||
|
||||
app_secret = "test_app_secret_abc123"
|
||||
body = b'{"object":"whatsapp_business_account"}'
|
||||
bad_sig = "sha256=deadbeef0000000000000000000000000000000000000000000000000000000"
|
||||
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
verify_whatsapp_signature(body, bad_sig, app_secret)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
|
||||
def test_missing_sha256_prefix_raises_403(self) -> None:
|
||||
"""Signature without 'sha256=' prefix must raise HTTPException(403)."""
|
||||
from gateway.channels.whatsapp import verify_whatsapp_signature
|
||||
|
||||
app_secret = "test_app_secret_abc123"
|
||||
body = b'{"test": "data"}'
|
||||
# No 'sha256=' prefix
|
||||
bad_sig = "abcdef1234567890" * 4
|
||||
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
verify_whatsapp_signature(body, bad_sig, app_secret)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
|
||||
def test_empty_body_valid_signature(self) -> None:
|
||||
"""verify_whatsapp_signature works on empty body with correct signature."""
|
||||
from gateway.channels.whatsapp import verify_whatsapp_signature
|
||||
|
||||
app_secret = "secret"
|
||||
body = b""
|
||||
sig = make_valid_signature(body, app_secret)
|
||||
|
||||
result = verify_whatsapp_signature(body, sig, app_secret)
|
||||
assert result == body
|
||||
|
||||
def test_tampered_body_raises_403(self) -> None:
|
||||
"""Signature computed on original body fails if body is modified."""
|
||||
from gateway.channels.whatsapp import verify_whatsapp_signature
|
||||
|
||||
app_secret = "secret"
|
||||
original_body = b'{"message": "hello"}'
|
||||
sig = make_valid_signature(original_body, app_secret)
|
||||
|
||||
# Tamper with the body
|
||||
tampered_body = b'{"message": "injected"}'
|
||||
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
verify_whatsapp_signature(tampered_body, sig, app_secret)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
|
||||
|
||||
class TestWhatsAppHubVerification:
|
||||
"""Tests for WhatsApp webhook GET verification (hub challenge handshake)."""
|
||||
|
||||
def test_valid_token_returns_challenge(self) -> None:
|
||||
"""GET webhook with valid verify_token must return hub.challenge as plain text."""
|
||||
from gateway.channels.whatsapp import verify_hub_challenge
|
||||
|
||||
token = "my_verify_token_xyz"
|
||||
challenge = "challenge_abc_123"
|
||||
|
||||
result = verify_hub_challenge(
|
||||
hub_mode="subscribe",
|
||||
hub_verify_token=token,
|
||||
hub_challenge=challenge,
|
||||
expected_token=token,
|
||||
)
|
||||
assert result == challenge
|
||||
|
||||
def test_invalid_token_raises_403(self) -> None:
|
||||
"""GET webhook with wrong verify_token must raise HTTPException(403)."""
|
||||
from gateway.channels.whatsapp import verify_hub_challenge
|
||||
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
verify_hub_challenge(
|
||||
hub_mode="subscribe",
|
||||
hub_verify_token="wrong_token",
|
||||
hub_challenge="challenge_123",
|
||||
expected_token="correct_token",
|
||||
)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
|
||||
def test_wrong_mode_raises_403(self) -> None:
|
||||
"""GET webhook with mode != 'subscribe' must raise HTTPException(403)."""
|
||||
from gateway.channels.whatsapp import verify_hub_challenge
|
||||
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
verify_hub_challenge(
|
||||
hub_mode="unsubscribe",
|
||||
hub_verify_token="correct_token",
|
||||
hub_challenge="challenge_123",
|
||||
expected_token="correct_token",
|
||||
)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
Reference in New Issue
Block a user