SHT30 온습도 모니터링 시스템 전체 소스(서버 PHP, STM32 펌웨어, SQL, 테스트). 전체 코드리뷰에서 도출된 보안 하드닝 10건 반영: - 요청 서명 HMAC-SHA256 전환(펌웨어 sig.c/서버 config.php/호스트 패리티 동시) - 재전송 방어 + 기본 API_KEY fail-closed + 디바이스 문자열 정제(api/sensor_data.php) - 오프라인 SMS 중복 발송 경합 제거(cron_heartbeat.php, 원자적 선점) - CSV 수식 주입 방지(monthly_report.php), 감사로그 회전 락(retention_cleanup.php) - 브루트포스 카운터 원자화(login.php), 예시 TOTP 비밀키 무효화, 마이그레이션 멱등화 _backup/(하드코딩 실 비밀값 포함)·config.local.php·런타임 상태는 .gitignore 제외.
94 lines
3.2 KiB
Python
94 lines
3.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
reference.py - 펌웨어 wire 포맷의 Python 레퍼런스.
|
|
|
|
firmware/common/jsonbody.c 및 sig.c 와 *바이트 단위로 동일*한 출력을 만든다.
|
|
이 레퍼런스를 PHP verify_signature_raw 로 교차검증하면(parity_test.py),
|
|
동일 알고리즘을 쓰는 펌웨어 C 코드의 wire 계약이 서버와 호환됨을 입증한다.
|
|
|
|
대응:
|
|
jb_fixed2() <-> fixed2()
|
|
jb_sht30_event() <-> sht30_event_body()
|
|
sig_raw_body() <-> sign_raw()
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import hmac
|
|
|
|
|
|
def fixed2(v: float) -> str:
|
|
"""jb_fixed2() 와 동일한 소수 2자리 고정 포맷 (round-half-up, locale 무관)."""
|
|
neg = v < 0
|
|
if neg:
|
|
v = -v
|
|
scaled = int(v * 100.0 + 0.5) # round half up (C: (long long)(v*100+0.5))
|
|
ip = scaled // 100
|
|
fp = scaled % 100
|
|
sign = "-" if (neg and scaled != 0) else ""
|
|
return f"{sign}{ip}.{fp:02d}"
|
|
|
|
|
|
def _esc(s: str) -> str:
|
|
"""jb_estr() 와 동일: '\"','\\\\', 제어문자(<0x20)만 이스케이프. '/'는 그대로."""
|
|
out = []
|
|
for ch in s:
|
|
o = ord(ch)
|
|
if ch == '"':
|
|
out.append('\\"')
|
|
elif ch == '\\':
|
|
out.append('\\\\')
|
|
elif o < 0x20:
|
|
out.append('\\u%04x' % o)
|
|
else:
|
|
out.append(ch)
|
|
return "".join(out)
|
|
|
|
|
|
def sht30_event_body(device_id: str, device_location: str, sensor_id: int,
|
|
sensor_name: str, event_type: str, timestamp: int,
|
|
temperature_c: float, humidity_percent: float,
|
|
metric_status: str, app_version: str) -> str:
|
|
return (
|
|
'{'
|
|
f'"device_id":"{_esc(device_id)}",'
|
|
f'"device_location":"{_esc(device_location)}",'
|
|
f'"sensor_id":{int(sensor_id)},'
|
|
f'"sensor_name":"{_esc(sensor_name)}",'
|
|
f'"event_type":"{_esc(event_type)}",'
|
|
f'"timestamp":{int(timestamp)},'
|
|
f'"metric_type":"sht30",'
|
|
f'"temperature_c":{fixed2(temperature_c)},'
|
|
f'"humidity_percent":{fixed2(humidity_percent)},'
|
|
f'"metric_status":"{_esc(metric_status)}",'
|
|
f'"app_version":"{_esc(app_version)}"'
|
|
'}'
|
|
)
|
|
|
|
|
|
def sign_raw(api_key: str, body: str) -> str:
|
|
"""sig_raw_body(): lowercase_hex(HMAC-SHA256(key=api_key, msg=body))."""
|
|
return hmac.new(api_key.encode("utf-8"), body.encode("utf-8"), hashlib.sha256).hexdigest()
|
|
|
|
|
|
# ── SHT3x CRC8 (sht30_convert.c 의 sht30_crc8 과 동일) ───────────────────────
|
|
def sht30_crc8(data: bytes) -> int:
|
|
crc = 0xFF
|
|
for b in data:
|
|
crc ^= b
|
|
for _ in range(8):
|
|
crc = ((crc << 1) ^ 0x31) & 0xFF if (crc & 0x80) else (crc << 1) & 0xFF
|
|
return crc
|
|
|
|
|
|
def sht30_parse(frame: bytes):
|
|
"""sht30_parse(): (temp_c, rh) 또는 예외."""
|
|
if sht30_crc8(frame[0:2]) != frame[2]:
|
|
raise ValueError("temperature CRC mismatch")
|
|
if sht30_crc8(frame[3:5]) != frame[5]:
|
|
raise ValueError("humidity CRC mismatch")
|
|
raw_t = (frame[0] << 8) | frame[1]
|
|
raw_rh = (frame[3] << 8) | frame[4]
|
|
temp_c = -45.0 + 175.0 * (raw_t / 65535.0)
|
|
rh = 100.0 * (raw_rh / 65535.0)
|
|
return temp_c, rh
|