SHT30 온습도 모니터링 시스템 전체 소스(서버 PHP, STM32 펌웨어, SQL, 테스트). 전체 코드리뷰에서 도출된 보안 하드닝 10건 반영: - 요청 서명 HMAC-SHA256 전환(펌웨어 sig.c/서버 config.php/호스트 패리티 동시) - 재전송 방어 + 기본 API_KEY fail-closed + 디바이스 문자열 정제(api/sensor_data.php) - 오프라인 SMS 중복 발송 경합 제거(cron_heartbeat.php, 원자적 선점) - CSV 수식 주입 방지(monthly_report.php), 감사로그 회전 락(retention_cleanup.php) - 브루트포스 카운터 원자화(login.php), 예시 TOTP 비밀키 무효화, 마이그레이션 멱등화 _backup/(하드코딩 실 비밀값 포함)·config.local.php·런타임 상태는 .gitignore 제외.
152 lines
5.6 KiB
Python
152 lines
5.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
parity_test.py - 펌웨어 wire 계약을 서버(PHP)와 교차검증한다.
|
|
|
|
검증 항목:
|
|
1. SHT30 본문이 유효한 JSON 이고 서버 필수 필드를 포함한다.
|
|
2. raw-body 서명(HMAC-SHA256(key=API_KEY, msg=body))이 서버 verify_signature_raw 로 통과한다.
|
|
3. 본문/서명 변조 시 서버가 거부한다.
|
|
4. SHT3x CRC8 및 변환식이 데이터시트 벡터와 일치한다.
|
|
5. fixed2() 소수 포맷이 기대 문자열과 일치한다(펌웨어 jb_fixed2 와 동일).
|
|
|
|
C 펌웨어(jsonbody.c/sig.c/sht30_convert.c)는 reference.py 와 동일 알고리즘으로
|
|
작성되어 있어, 본 테스트 통과는 펌웨어 wire 계약이 서버와 호환됨을 의미한다.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import reference as ref
|
|
|
|
HERE = Path(__file__).resolve().parent
|
|
PHP_VERIFY = HERE / "php_verify.php"
|
|
|
|
failures: list[str] = []
|
|
|
|
|
|
def check(cond: bool, msg: str) -> None:
|
|
if not cond:
|
|
failures.append(msg)
|
|
|
|
|
|
def php_api_key() -> str:
|
|
# bytes 모드로 받아 UTF-8 로 디코드 (Windows 로캘 인코딩 영향 제거)
|
|
out = subprocess.run(["php", str(PHP_VERIFY), "--print-key"],
|
|
capture_output=True, check=True)
|
|
return out.stdout.decode("utf-8")
|
|
|
|
|
|
def php_verify(body: str, sig: str) -> str:
|
|
# 본문은 반드시 UTF-8 바이트로 전달해야 한다. 서명도 동일 UTF-8 본문에 대해
|
|
# HMAC 으로 계산되므로(sign_raw: hmac key=API_KEY, msg=body.encode("utf-8")),
|
|
# 바이트가 일치해야 통과한다.
|
|
out = subprocess.run(["php", str(PHP_VERIFY), sig],
|
|
input=body.encode("utf-8"), capture_output=True, check=True)
|
|
return out.stdout.decode("utf-8").strip()
|
|
|
|
|
|
REQUIRED = ["device_id", "sensor_id", "event_type", "timestamp"]
|
|
|
|
|
|
def assert_server_accepts(body: str, api_key: str, label: str) -> None:
|
|
# 유효한 JSON + 필수 필드
|
|
try:
|
|
obj = json.loads(body)
|
|
except Exception as exc: # noqa: BLE001
|
|
check(False, f"{label}: body is not valid JSON: {exc}")
|
|
return
|
|
for field in REQUIRED:
|
|
check(field in obj, f"{label}: missing required field '{field}'")
|
|
|
|
# 올바른 서명 -> 서버 통과
|
|
good = ref.sign_raw(api_key, body)
|
|
check(php_verify(body, good) == "OK", f"{label}: valid signature should pass server verify")
|
|
|
|
# 본문 1바이트 변조 -> 거부
|
|
check(php_verify(body + " ", good) == "FAIL", f"{label}: tampered body must fail")
|
|
# 서명 변조 -> 거부
|
|
bad = ("0" * 64) if good != "0" * 64 else ("1" * 64)
|
|
check(php_verify(body, bad) == "FAIL", f"{label}: wrong signature must fail")
|
|
|
|
|
|
def main() -> int:
|
|
api_key = php_api_key()
|
|
check(api_key != "", "PHP API_KEY should be readable")
|
|
|
|
# ── 1~3. SHT30 이벤트 (startup/periodic, 정상/경계/음수/정수값 float) ─────
|
|
sht_cases = [
|
|
("startup", 24.0, 48.5, "normal"),
|
|
("periodic", 24.0, 48.5, "normal"),
|
|
("periodic", -3.245, 0.0, "normal"),
|
|
("periodic", 100.0, 100.0, "normal"),
|
|
("periodic", 125.0, 0.004, "out_of_range"),
|
|
]
|
|
for ev, t, h, st in sht_cases:
|
|
body = ref.sht30_event_body(
|
|
device_id="stm32-sht30-01",
|
|
device_location="서버실",
|
|
sensor_id=2,
|
|
sensor_name="2번 센서 (SHT30)",
|
|
event_type=ev,
|
|
timestamp=1700000123,
|
|
temperature_c=t,
|
|
humidity_percent=h,
|
|
metric_status=st,
|
|
app_version="v2606-sht30",
|
|
)
|
|
assert_server_accepts(body, api_key, f"sht30/{ev}/{t}/{h}")
|
|
|
|
# ── 4. SHT3x CRC8 / 변환 데이터시트 벡터 ─────────────────────────────────
|
|
# 데이터시트: CRC8(0xBE,0xEF) = 0x92
|
|
check(ref.sht30_crc8(bytes([0xBE, 0xEF])) == 0x92, "SHT3x CRC8 datasheet vector (0xBEEF -> 0x92)")
|
|
|
|
# raw_t=0x6635 -> 약 25.0C, raw_rh=0x7E66 -> 약 49.4%RH (근사 검증)
|
|
raw_t = 0x6635
|
|
raw_rh = 0x7E66
|
|
frame = bytes([
|
|
(raw_t >> 8) & 0xFF, raw_t & 0xFF, ref.sht30_crc8(bytes([(raw_t >> 8) & 0xFF, raw_t & 0xFF])),
|
|
(raw_rh >> 8) & 0xFF, raw_rh & 0xFF, ref.sht30_crc8(bytes([(raw_rh >> 8) & 0xFF, raw_rh & 0xFF])),
|
|
])
|
|
temp_c, rh = ref.sht30_parse(frame)
|
|
check(abs(temp_c - (-45.0 + 175.0 * raw_t / 65535.0)) < 1e-9, "SHT30 temp conversion")
|
|
check(abs(rh - (100.0 * raw_rh / 65535.0)) < 1e-9, "SHT30 humidity conversion")
|
|
check(24.0 < temp_c < 26.0, "SHT30 temp in expected ballpark (~25C)")
|
|
# CRC 변조 -> 예외
|
|
bad_frame = bytearray(frame)
|
|
bad_frame[2] ^= 0xFF
|
|
try:
|
|
ref.sht30_parse(bytes(bad_frame))
|
|
check(False, "SHT30 bad CRC should raise")
|
|
except ValueError:
|
|
pass
|
|
|
|
# ── 5. fixed2 포맷 (펌웨어 jb_fixed2 와 동일해야 함) ──────────────────────
|
|
fixed2_vectors = {
|
|
24.0: "24.00",
|
|
48.5: "48.50",
|
|
-3.245: "-3.25", # round half up
|
|
0.0: "0.00",
|
|
100.0: "100.00",
|
|
0.004: "0.00",
|
|
-0.001: "0.00", # 반올림 결과 0 -> 음수 부호 없음
|
|
9.999: "10.00",
|
|
}
|
|
for v, expected in fixed2_vectors.items():
|
|
got = ref.fixed2(v)
|
|
check(got == expected, f"fixed2({v}) expected {expected} got {got}")
|
|
|
|
if failures:
|
|
sys.stderr.write("Parity test FAILURES:\n")
|
|
for f in failures:
|
|
sys.stderr.write(f"- {f}\n")
|
|
return 1
|
|
|
|
print("Firmware<->server parity checks passed.")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|