Bases: SessionABC
Encrypted wrapper for Session implementations with TTL-based expiration.
This class wraps any SessionABC implementation to provide transparent
encryption/decryption of stored items using Fernet encryption with
per-session key derivation and automatic expiration of old data.
When items expire (exceed TTL), they are silently skipped during retrieval.
Note: Expired tokens are rejected based on the system clock of the application server.
To avoid valid tokens being rejected due to clock drift, ensure all servers in
your environment are synchronized using NTP.
Source code in src/agents/extensions/memory/encrypt_session.py
class EncryptedSession(SessionABC):
    """Encrypted wrapper for Session implementations with TTL-based expiration.

    This class wraps any SessionABC implementation to provide transparent
    encryption/decryption of stored items using Fernet encryption with
    per-session key derivation and automatic expiration of old data.

    When items expire (exceed TTL), they are silently skipped during retrieval.

    Note: Expired tokens are rejected based on the system clock of the application server.
    To avoid valid tokens being rejected due to clock drift, ensure all servers in
    your environment are synchronized using NTP.
    """

    def __init__(
        self,
        session_id: str,
        underlying_session: SessionABC,
        encryption_key: str,
        ttl: int = 600,
    ):
        """
        Args:
            session_id: ID for this session; also passed to the per-session key derivation
            underlying_session: The real session store (e.g. SQLiteSession, SQLAlchemySession)
            encryption_key: Master key (Fernet key or raw secret)
            ttl: Token time-to-live in seconds (default 10 min)
        """
        self.session_id = session_id
        self.underlying_session = underlying_session
        self.ttl = ttl
        master = _ensure_fernet_key_bytes(encryption_key)
        self.cipher = _derive_session_fernet_key(master, session_id)
        # Metadata written into every envelope produced by _wrap().
        self._kid = "hkdf-v1"
        self._ver = 1

    def __getattr__(self, name: str):
        """Delegate attributes not found on the wrapper to the underlying session.

        Raises:
            AttributeError: If ``underlying_session`` itself has not been set yet,
                or if the underlying session lacks ``name``.
        """
        # __getattr__ only runs after normal lookup has failed. If the missing name
        # is `underlying_session` (instance built via __new__, e.g. by copy/pickle,
        # or __init__ failed early), reading `self.underlying_session` below would
        # re-enter this method until RecursionError. Fail fast instead.
        if name == "underlying_session":
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )
        return getattr(self.underlying_session, name)

    def _wrap(self, item: TResponseInputItem) -> EncryptedEnvelope:
        """Encrypt ``item`` and return it as an envelope dict.

        The item is first reduced to a mapping (a dict is used as-is, otherwise
        ``model_dump()``, ``__dict__`` or ``dict(item)`` in that order), serialized
        with ``_to_json_bytes`` and encrypted with the session cipher.
        """
        if isinstance(item, dict):
            payload = item
        elif hasattr(item, "model_dump"):
            payload = item.model_dump()
        elif hasattr(item, "__dict__"):
            payload = item.__dict__
        else:
            payload = dict(item)

        token = self.cipher.encrypt(_to_json_bytes(payload)).decode("utf-8")
        return {"__enc__": 1, "v": self._ver, "kid": self._kid, "payload": token}

    def _unwrap(self, item: TResponseInputItem | EncryptedEnvelope) -> TResponseInputItem | None:
        """Decrypt an envelope back into the original item.

        Returns:
            ``item`` unchanged when ``_is_encrypted_envelope`` does not recognise it;
            the decrypted item otherwise; ``None`` when decryption raises
            ``InvalidToken`` or the envelope has no ``"payload"`` key.
        """
        # Items that are not envelopes are passed through untouched.
        if not _is_encrypted_envelope(item):
            return cast(TResponseInputItem, item)

        try:
            token = item["payload"].encode("utf-8")
            # Passing ttl makes the cipher reject tokens older than self.ttl seconds.
            plaintext = self.cipher.decrypt(token, ttl=self.ttl)
            return cast(TResponseInputItem, _from_json_bytes(plaintext))
        except (InvalidToken, KeyError):
            # Undecryptable or expired items are dropped by the callers, not raised.
            return None

    async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
        """Fetch items from the underlying session and return the ones that decrypt.

        ``limit`` is forwarded to the underlying session before filtering, so the
        result may contain fewer than ``limit`` items when some are skipped.
        """
        encrypted_items = await self.underlying_session.get_items(limit)
        return [
            item
            for enc in encrypted_items
            if (item := self._unwrap(enc)) is not None
        ]

    async def add_items(self, items: list[TResponseInputItem]) -> None:
        """Encrypt every item and append the envelopes to the underlying session."""
        wrapped: list[EncryptedEnvelope] = [self._wrap(it) for it in items]
        # The underlying store is typed for plain items; envelopes are stored in their place.
        await self.underlying_session.add_items(cast(list[TResponseInputItem], wrapped))

    async def pop_item(self) -> TResponseInputItem | None:
        """Pop from the underlying session until an item decrypts, or it runs out.

        Returns:
            The first popped item that unwraps successfully, or ``None`` once the
            underlying ``pop_item`` returns a falsy value.
        """
        while True:
            enc = await self.underlying_session.pop_item()
            if not enc:
                return None
            item = self._unwrap(enc)
            if item is not None:
                return item
            # Item could not be unwrapped (e.g. expired): keep popping.

    async def clear_session(self) -> None:
        """Clear the underlying session."""
        await self.underlying_session.clear_session()
|
__init__
__init__(
session_id: str,
underlying_session: SessionABC,
encryption_key: str,
ttl: int = 600,
)
Parameters:
Name |
Type |
Description |
Default |
session_id
|
str
|
ID for this session
|
required
|
underlying_session
|
SessionABC
|
The real session store (e.g. SQLiteSession, SQLAlchemySession)
|
required
|
encryption_key
|
str
|
Master key (Fernet key or raw secret)
|
required
|
ttl
|
int
|
Token time-to-live in seconds (default 10 min)
|
600
|
Source code in src/agents/extensions/memory/encrypt_session.py
def __init__(
    self,
    session_id: str,
    underlying_session: SessionABC,
    encryption_key: str,
    ttl: int = 600,
):
    """Set up an encrypting wrapper around ``underlying_session``.

    Args:
        session_id: ID for this session; also passed to the per-session key derivation
        underlying_session: The real session store (e.g. SQLiteSession, SQLAlchemySession)
        encryption_key: Master key (Fernet key or raw secret)
        ttl: Token time-to-live in seconds (default 10 min)
    """
    self.session_id = session_id
    self.underlying_session = underlying_session
    self.ttl = ttl
    # Normalise the master key, then derive the session-specific cipher from it.
    self.cipher = _derive_session_fernet_key(
        _ensure_fernet_key_bytes(encryption_key), session_id
    )
    # Key-derivation identifier and envelope version kept alongside the cipher.
    self._kid, self._ver = "hkdf-v1", 1
|