"""Core hashing and normalization utilities for IPC provenance tracking."""
from __future__ import annotations
import hashlib
import json
import re
from dataclasses import dataclass
from pathlib import PurePosixPath
from typing import Any, Protocol
_POLICY_TREE_HASH_VERSION = "policy_tree_hash_v1"
[docs]
class SupportsModelDump(Protocol):
"""Protocol for objects exposing Pydantic-style ``model_dump()``."""
[docs]
def model_dump(self) -> dict[str, Any]:
"""Return a plain dictionary representation."""
[docs]
@dataclass(frozen=True, slots=True)
class PolicyHashEntry:
"""Canonical file entry used to compute deterministic policy tree digests."""
relative_path: str
content_hash: str
[docs]
@dataclass(frozen=True, slots=True)
class DirectoryHashEntry:
"""Deterministic hash summary for one policy directory subtree."""
path: str
file_count: int
hash: str
def _normalise_system_prompt(text: str) -> str:
"""Normalize system prompt text for stable hashing."""
lines = [line.strip() for line in text.splitlines()]
return "\n".join(lines).strip()
def _normalise_output(text: str) -> str:
"""Normalize output text while preserving structure and case."""
stripped = text.strip()
return re.sub(r" {2,}", " ", stripped)
def _normalise_policy_relative_path(relative_path: str) -> str:
"""Normalize and validate policy-relative paths used in hash payloads."""
as_posix = PurePosixPath(relative_path.replace("\\", "/")).as_posix()
if as_posix.startswith("../") or "/../" in f"/{as_posix}":
raise ValueError(f"Policy relative path must not traverse upwards: {relative_path!r}")
normalized = as_posix.lstrip("./")
if normalized in {"", "."}:
raise ValueError("Policy relative path must not be empty")
return normalized
[docs]
def compute_payload_hash(payload_dict: dict) -> str:
"""Return SHA-256 hex digest for canonicalized payload JSON."""
canonical = json.dumps(payload_dict, sort_keys=True, ensure_ascii=False)
return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
[docs]
def compute_system_prompt_hash(prompt_text: str) -> str:
"""Return SHA-256 hex digest for normalized system prompt text."""
normalised = _normalise_system_prompt(prompt_text)
return hashlib.sha256(normalised.encode("utf-8")).hexdigest()
[docs]
def compute_output_hash(output_text: str) -> str:
"""Return SHA-256 hex digest for normalized output text."""
normalised = _normalise_output(output_text)
return hashlib.sha256(normalised.encode("utf-8")).hexdigest()
[docs]
def compute_ipc_id(
*,
input_hash: str,
system_prompt_hash: str,
model: str,
temperature: float,
max_tokens: int,
seed: int,
) -> str:
"""Return IPC identifier digest using explicit colon-delimited provenance fields."""
parts = [
input_hash,
system_prompt_hash,
model,
str(temperature),
str(max_tokens),
str(seed),
]
combined = ":".join(parts)
return hashlib.sha256(combined.encode("utf-8")).hexdigest()
[docs]
def compute_policy_file_hash(relative_path: str, content_bytes: bytes) -> str:
"""Return deterministic hash for one policy file path and its raw bytes."""
normalized_path = _normalise_policy_relative_path(relative_path)
payload = {
"hash_version": _POLICY_TREE_HASH_VERSION,
"relative_path": normalized_path,
"content_bytes_hex": content_bytes.hex(),
}
return compute_payload_hash(payload)
[docs]
def compute_policy_tree_hash(file_entries: list[PolicyHashEntry]) -> str:
"""Return deterministic digest for a full policy file-set snapshot."""
normalized_entries = [
{
"relative_path": _normalise_policy_relative_path(entry.relative_path),
"content_hash": entry.content_hash,
}
for entry in file_entries
]
normalized_entries.sort(key=lambda entry: entry["relative_path"])
payload = {
"hash_version": _POLICY_TREE_HASH_VERSION,
"entries": normalized_entries,
}
return compute_payload_hash(payload)
[docs]
def compute_policy_directory_hashes(
file_entries: list[PolicyHashEntry],
) -> list[DirectoryHashEntry]:
"""Return subtree hashes for each non-root directory in the policy tree."""
grouped: dict[str, list[PolicyHashEntry]] = {}
for entry in file_entries:
normalized_path = _normalise_policy_relative_path(entry.relative_path)
normalized_entry = PolicyHashEntry(
relative_path=normalized_path,
content_hash=entry.content_hash,
)
current = PurePosixPath(normalized_path).parent
while True:
directory = current.as_posix()
grouped.setdefault(directory, []).append(normalized_entry)
if directory == ".":
break
current = current.parent
results: list[DirectoryHashEntry] = []
for directory, directory_entries in grouped.items():
if directory == ".":
continue
results.append(
DirectoryHashEntry(
path=directory,
file_count=len(directory_entries),
hash=compute_policy_tree_hash(directory_entries),
)
)
return sorted(results, key=lambda entry: entry.path)
[docs]
def payload_hash(payload: SupportsModelDump) -> str:
"""Convenience wrapper: hash an object exposing ``model_dump()``."""
payload_dict = payload.model_dump()
if not isinstance(payload_dict, dict):
raise TypeError("payload.model_dump() must return a dict")
return compute_payload_hash(payload_dict)