POSA_LEAKSMS/firmware/test/host/parity_test.py
유창욱 90f121e14c chore: import codebase with security hardening
SHT30 온습도 모니터링 시스템 전체 소스(서버 PHP, STM32 펌웨어, SQL, 테스트).
전체 코드리뷰에서 도출된 보안 하드닝 10건 반영:
- 요청 서명 HMAC-SHA256 전환(펌웨어 sig.c/서버 config.php/호스트 패리티 동시)
- 재전송 방어 + 기본 API_KEY fail-closed + 디바이스 문자열 정제(api/sensor_data.php)
- 오프라인 SMS 중복 발송 경합 제거(cron_heartbeat.php, 원자적 선점)
- CSV 수식 주입 방지(monthly_report.php), 감사로그 회전 락(retention_cleanup.php)
- 브루트포스 카운터 원자화(login.php), 예시 TOTP 비밀키 무효화, 마이그레이션 멱등화

_backup/(하드코딩 실 비밀값 포함)·config.local.php·런타임 상태는 .gitignore 제외.
2026-06-20 09:37:40 +09:00

152 lines
5.6 KiB
Python

#!/usr/bin/env python3
"""
parity_test.py - 펌웨어 wire 계약을 서버(PHP)와 교차검증한다.
검증 항목:
1. SHT30 본문이 유효한 JSON 이고 서버 필수 필드를 포함한다.
2. raw-body 서명(HMAC-SHA256(key=API_KEY, msg=body))이 서버 verify_signature_raw 로 통과한다.
3. 본문/서명 변조 시 서버가 거부한다.
4. SHT3x CRC8 및 변환식이 데이터시트 벡터와 일치한다.
5. fixed2() 소수 포맷이 기대 문자열과 일치한다(펌웨어 jb_fixed2 와 동일).
C 펌웨어(jsonbody.c/sig.c/sht30_convert.c)는 reference.py 와 동일 알고리즘으로
작성되어 있어, 본 테스트 통과는 펌웨어 wire 계약이 서버와 호환됨을 의미한다.
"""
from __future__ import annotations
import json
import subprocess
import sys
from pathlib import Path
import reference as ref
HERE = Path(__file__).resolve().parent
PHP_VERIFY = HERE / "php_verify.php"
failures: list[str] = []
def check(cond: bool, msg: str) -> None:
if not cond:
failures.append(msg)
def php_api_key() -> str:
# bytes 모드로 받아 UTF-8 로 디코드 (Windows 로캘 인코딩 영향 제거)
out = subprocess.run(["php", str(PHP_VERIFY), "--print-key"],
capture_output=True, check=True)
return out.stdout.decode("utf-8")
def php_verify(body: str, sig: str) -> str:
# 본문은 반드시 UTF-8 바이트로 전달해야 한다. 서명도 동일 UTF-8 본문에 대해
# HMAC 으로 계산되므로(sign_raw: hmac key=API_KEY, msg=body.encode("utf-8")),
# 바이트가 일치해야 통과한다.
out = subprocess.run(["php", str(PHP_VERIFY), sig],
input=body.encode("utf-8"), capture_output=True, check=True)
return out.stdout.decode("utf-8").strip()
REQUIRED = ["device_id", "sensor_id", "event_type", "timestamp"]
def assert_server_accepts(body: str, api_key: str, label: str) -> None:
# 유효한 JSON + 필수 필드
try:
obj = json.loads(body)
except Exception as exc: # noqa: BLE001
check(False, f"{label}: body is not valid JSON: {exc}")
return
for field in REQUIRED:
check(field in obj, f"{label}: missing required field '{field}'")
# 올바른 서명 -> 서버 통과
good = ref.sign_raw(api_key, body)
check(php_verify(body, good) == "OK", f"{label}: valid signature should pass server verify")
# 본문 1바이트 변조 -> 거부
check(php_verify(body + " ", good) == "FAIL", f"{label}: tampered body must fail")
# 서명 변조 -> 거부
bad = ("0" * 64) if good != "0" * 64 else ("1" * 64)
check(php_verify(body, bad) == "FAIL", f"{label}: wrong signature must fail")
def main() -> int:
api_key = php_api_key()
check(api_key != "", "PHP API_KEY should be readable")
# ── 1~3. SHT30 이벤트 (startup/periodic, 정상/경계/음수/정수값 float) ─────
sht_cases = [
("startup", 24.0, 48.5, "normal"),
("periodic", 24.0, 48.5, "normal"),
("periodic", -3.245, 0.0, "normal"),
("periodic", 100.0, 100.0, "normal"),
("periodic", 125.0, 0.004, "out_of_range"),
]
for ev, t, h, st in sht_cases:
body = ref.sht30_event_body(
device_id="stm32-sht30-01",
device_location="서버실",
sensor_id=2,
sensor_name="2번 센서 (SHT30)",
event_type=ev,
timestamp=1700000123,
temperature_c=t,
humidity_percent=h,
metric_status=st,
app_version="v2606-sht30",
)
assert_server_accepts(body, api_key, f"sht30/{ev}/{t}/{h}")
# ── 4. SHT3x CRC8 / 변환 데이터시트 벡터 ─────────────────────────────────
# 데이터시트: CRC8(0xBE,0xEF) = 0x92
check(ref.sht30_crc8(bytes([0xBE, 0xEF])) == 0x92, "SHT3x CRC8 datasheet vector (0xBEEF -> 0x92)")
# raw_t=0x6635 -> 약 25.0C, raw_rh=0x7E66 -> 약 49.4%RH (근사 검증)
raw_t = 0x6635
raw_rh = 0x7E66
frame = bytes([
(raw_t >> 8) & 0xFF, raw_t & 0xFF, ref.sht30_crc8(bytes([(raw_t >> 8) & 0xFF, raw_t & 0xFF])),
(raw_rh >> 8) & 0xFF, raw_rh & 0xFF, ref.sht30_crc8(bytes([(raw_rh >> 8) & 0xFF, raw_rh & 0xFF])),
])
temp_c, rh = ref.sht30_parse(frame)
check(abs(temp_c - (-45.0 + 175.0 * raw_t / 65535.0)) < 1e-9, "SHT30 temp conversion")
check(abs(rh - (100.0 * raw_rh / 65535.0)) < 1e-9, "SHT30 humidity conversion")
check(24.0 < temp_c < 26.0, "SHT30 temp in expected ballpark (~25C)")
# CRC 변조 -> 예외
bad_frame = bytearray(frame)
bad_frame[2] ^= 0xFF
try:
ref.sht30_parse(bytes(bad_frame))
check(False, "SHT30 bad CRC should raise")
except ValueError:
pass
# ── 5. fixed2 포맷 (펌웨어 jb_fixed2 와 동일해야 함) ──────────────────────
fixed2_vectors = {
24.0: "24.00",
48.5: "48.50",
-3.245: "-3.25", # round half up
0.0: "0.00",
100.0: "100.00",
0.004: "0.00",
-0.001: "0.00", # 반올림 결과 0 -> 음수 부호 없음
9.999: "10.00",
}
for v, expected in fixed2_vectors.items():
got = ref.fixed2(v)
check(got == expected, f"fixed2({v}) expected {expected} got {got}")
if failures:
sys.stderr.write("Parity test FAILURES:\n")
for f in failures:
sys.stderr.write(f"- {f}\n")
return 1
print("Firmware<->server parity checks passed.")
return 0
if __name__ == "__main__":
raise SystemExit(main())