class RemoteSnapshot(SnapshotBase):
type: Literal["remote"] = "remote"
client_dependency_key: str
async def persist(self, data: io.IOBase, *, dependencies: Dependencies | None = None) -> None:
try:
upload = await self._require_client_method("upload", dependencies)
await _maybe_await(upload(self.id, data))
except Exception as e:
raise SnapshotPersistError(
snapshot_id=self.id,
path=self._remote_path(),
cause=e,
) from e
async def restore(self, *, dependencies: Dependencies | None = None) -> io.IOBase:
try:
download = await self._require_client_method("download", dependencies)
restored = await _maybe_await(download(self.id))
except Exception as e:
raise SnapshotRestoreError(
snapshot_id=self.id,
path=self._remote_path(),
cause=e,
) from e
if not isinstance(restored, io.IOBase):
raise SnapshotRestoreError(
snapshot_id=self.id,
path=self._remote_path(),
cause=TypeError("Remote snapshot client download() must return an IOBase stream"),
)
return restored
async def restorable(self, *, dependencies: Dependencies | None = None) -> bool:
check = await self._require_client_method("exists", dependencies)
result = await _maybe_await(check(self.id))
return bool(result)
async def _require_client_method(
self, method_name: str, dependencies: Dependencies | None
) -> Callable[..., object]:
if dependencies is None:
raise RuntimeError(
f"RemoteSnapshot(id={self.id!r}) requires session dependencies to resolve "
f"remote client `{self.client_dependency_key}`"
)
client = await dependencies.require(self.client_dependency_key, consumer="RemoteSnapshot")
method = getattr(client, method_name, None)
if not callable(method):
raise TypeError(
f"Remote snapshot client must implement `{method_name}(snapshot_id, ...)`"
)
return cast(Callable[..., object], method)
def _remote_path(self) -> Path:
return Path(f"<remote:{self.client_dependency_key}>")