Skip to content

Workspace entries

Dir

Bases: BaseEntry

Source code in src/agents/sandbox/entries/artifacts.py
class Dir(BaseEntry):
    """A workspace directory whose children are nested artifact entries."""

    type: Literal["dir"] = "dir"
    is_dir: bool = True
    children: dict[str | Path, BaseEntry] = Field(default_factory=dict)

    @field_validator("children", mode="before")
    @classmethod
    def _parse_children(cls, value: object) -> dict[str | Path, BaseEntry]:
        """Coerce a raw mapping (or None) into a mapping of parsed entries."""
        if value is None:
            return {}
        if isinstance(value, Mapping):
            return {name: BaseEntry.parse(raw) for name, raw in value.items()}
        raise TypeError(f"Artifact mapping must be a mapping, got {type(value).__name__}")

    @field_serializer("children", when_used="json")
    def _serialize_children(self, children: Mapping[str | Path, BaseEntry]) -> dict[str, object]:
        """Serialize children with Path keys normalized to POSIX strings."""

        def _as_key(key: str | Path) -> str:
            return key.as_posix() if isinstance(key, Path) else str(key)

        return {_as_key(key): entry.model_dump(mode="json") for key, entry in children.items()}

    def model_post_init(self, context: object, /) -> None:
        _ = context
        # A Dir always carries the directory permission flag.
        self.permissions.directory = True

    async def apply(
        self,
        session: BaseSandboxSession,
        dest: Path,
        base_dir: Path,
    ) -> list[MaterializedFile]:
        """Create `dest` in the sandbox, then materialize every child beneath it."""
        await session.mkdir(dest, parents=True)
        await self._apply_metadata(session, dest)
        batch = [(dest / Path(rel), entry) for rel, entry in self.children.items()]
        return await session._apply_entry_batch(batch, base_dir=base_dir)

File

Bases: BaseEntry

Source code in src/agents/sandbox/entries/artifacts.py
class File(BaseEntry):
    """A workspace file whose bytes are stored inline on the entry."""

    type: Literal["file"] = "file"
    content: bytes

    async def apply(
        self,
        session: BaseSandboxSession,
        dest: Path,
        base_dir: Path,
    ) -> list[MaterializedFile]:
        """Write the inline bytes to `dest` and apply the entry's metadata."""
        buffer = io.BytesIO(self.content)
        await session.write(dest, buffer)
        await self._apply_metadata(session, dest)
        # No checksum receipt is produced for inline content.
        return []

GitRepo

Bases: BaseEntry

Source code in src/agents/sandbox/entries/artifacts.py
class GitRepo(BaseEntry):
    """A workspace directory populated by cloning a git repository.

    The clone runs inside the sandbox container, so git must be installed in
    the container image. Named refs are shallow-cloned with `--branch`;
    commit-hash-looking refs are fetched directly, with a fallback to a
    named-ref clone if the direct fetch fails.
    """

    type: Literal["git_repo"] = "git_repo"
    is_dir: bool = True
    # Host portion of the HTTPS clone URL.
    host: str = "github.com"
    repo: str  # "owner/name" (or any host-specific path)
    ref: str  # tag/branch/sha
    # Optional subdirectory of the repo to copy instead of the repo root.
    subpath: str | None = None

    def model_post_init(self, context: object, /) -> None:
        _ = context
        # Git repos always materialize as directories.
        self.permissions.directory = True

    async def apply(
        self,
        session: BaseSandboxSession,
        dest: Path,
        base_dir: Path,
    ) -> list[MaterializedFile]:
        """Clone `repo` at `ref` into a container temp dir, then copy it to `dest`.

        Raises:
            GitMissingInImageError: git is not available in the container.
            GitCloneError: neither fetch strategy could obtain `ref`.
            GitCopyError: copying from the clone into `dest` failed.
        """
        # Ensure git exists in the container.
        git_check = await session.exec("command -v git >/dev/null 2>&1")
        if not git_check.ok():
            context: dict[str, object] = {"repo": self.repo, "ref": self.ref}
            image = getattr(session.state, "image", None)
            if image is not None:
                context["image"] = image
            raise GitMissingInImageError(context=context)

        # Unique per session + apply so concurrent applies never collide.
        tmp_dir = f"/tmp/sandbox-git-{session.state.session_id.hex}-{uuid.uuid4().hex}"
        url = f"https://{self.host}/{self.repo}.git"

        _ = await session.exec("rm", "-rf", "--", tmp_dir, shell=False)
        clone_error: ExecResult | None = None
        if self._looks_like_commit_ref(self.ref):
            # Commit hashes cannot be passed to `git clone --branch`; fetch directly.
            clone = await self._fetch_commit_ref(session=session, url=url, tmp_dir=tmp_dir)
            if not clone.ok():
                # The ref may only *look* like a sha; retry as a named ref.
                clone_error = clone
                _ = await session.exec("rm", "-rf", "--", tmp_dir, shell=False)
                clone = await self._clone_named_ref(session=session, url=url, tmp_dir=tmp_dir)
        else:
            clone = await self._clone_named_ref(session=session, url=url, tmp_dir=tmp_dir)
        if not clone.ok():
            # When both strategies failed, report the first (commit-fetch) error.
            if clone_error is not None:
                clone = clone_error
            raise GitCloneError(
                url=url,
                ref=self.ref,
                stderr=clone.stderr.decode("utf-8", errors="replace"),
                context={"repo": self.repo, "subpath": self.subpath},
            )

        git_src_root: str = tmp_dir
        if self.subpath is not None:
            git_src_root = f"{tmp_dir}/{self.subpath.lstrip('/')}"

        # Copy into destination in the container.
        await session.mkdir(dest, parents=True)
        copy = await session.exec("cp", "-R", "--", f"{git_src_root}/.", f"{dest}/", shell=False)
        if not copy.ok():
            raise GitCopyError(
                src_root=git_src_root,
                dest=dest,
                stderr=copy.stderr.decode("utf-8", errors="replace"),
                context={"repo": self.repo, "ref": self.ref, "subpath": self.subpath},
            )

        _ = await session.exec("rm", "-rf", "--", tmp_dir, shell=False)
        await self._apply_metadata(session, dest)

        # Receipt: leave checksums empty for now. (Computing them would
        # require reading each file back out of the container.)
        return []

    @staticmethod
    def _looks_like_commit_ref(ref: str) -> bool:
        """Return True when `ref` fully matches the module-level commit pattern.

        `_COMMIT_REF_RE` is defined elsewhere in the module — presumably a
        hex-sha pattern; confirm against the module source.
        """
        return _COMMIT_REF_RE.fullmatch(ref) is not None

    async def _clone_named_ref(
        self,
        *,
        session: BaseSandboxSession,
        url: str,
        tmp_dir: str,
    ) -> ExecResult:
        """Shallow-clone a branch/tag ref (`--depth 1 --no-tags --branch`)."""
        return await session.exec(
            "git",
            "clone",
            "--depth",
            "1",
            "--no-tags",
            "--branch",
            self.ref,
            url,
            tmp_dir,
            shell=False,
        )

    async def _fetch_commit_ref(
        self,
        *,
        session: BaseSandboxSession,
        url: str,
        tmp_dir: str,
    ) -> ExecResult:
        """Fetch a specific commit: init, add remote, shallow-fetch, detach-checkout.

        Returns the first failing ExecResult, or the final checkout result.
        """
        init = await session.exec("git", "init", tmp_dir, shell=False)
        if not init.ok():
            return init

        remote_add = await session.exec(
            "git",
            "-C",
            tmp_dir,
            "remote",
            "add",
            "origin",
            url,
            shell=False,
        )
        if not remote_add.ok():
            return remote_add

        fetch = await session.exec(
            "git",
            "-C",
            tmp_dir,
            "fetch",
            "--depth",
            "1",
            "--no-tags",
            "origin",
            self.ref,
            shell=False,
        )
        if not fetch.ok():
            return fetch

        # Detach onto the fetched commit without creating a local branch.
        return await session.exec(
            "git",
            "-C",
            tmp_dir,
            "checkout",
            "--detach",
            "FETCH_HEAD",
            shell=False,
        )

LocalDir

Bases: BaseEntry

Source code in src/agents/sandbox/entries/artifacts.py
class LocalDir(BaseEntry):
    """A workspace directory copied recursively from a host directory.

    Symlinks anywhere under (or on the path to) the source are rejected.
    Where the platform supports `dir_fd` opens and `O_DIRECTORY`
    (`_OPEN_SUPPORTS_DIR_FD` / `_HAS_O_DIRECTORY` module flags), traversal is
    pinned to file descriptors so concurrent path swaps are detected and
    reported as `path_changed_during_copy` rather than silently followed.
    """

    type: Literal["local_dir"] = "local_dir"
    is_dir: bool = True
    # Host source directory; None means "just create an empty directory".
    src: Path | None = Field(default=None)

    def model_post_init(self, context: object, /) -> None:
        _ = context
        # Local dirs always materialize as directories.
        self.permissions.directory = True

    async def apply(
        self,
        session: BaseSandboxSession,
        dest: Path,
        base_dir: Path,
        *,
        user: str | User | None = None,
    ) -> list[MaterializedFile]:
        """Copy the source tree into `dest`, returning receipts for copied files.

        `user`, when given, is forwarded to mkdir/write; entry metadata is only
        applied when `user` is None.

        Raises:
            LocalDirReadError: the source is missing, changed mid-copy, or
                contains a symlink; or an OSError occurred while reading it.
        """
        files: list[MaterializedFile] = []
        if self.src:
            src_root = self._resolve_local_dir_src_root(base_dir)
            # Minimal v1: copy all files recursively.
            try:
                await session.mkdir(dest, parents=True, user=user)
                files = []
                local_files = self._list_local_dir_files(base_dir=base_dir, src_root=src_root)

                # Build one deferred coroutine factory per file so the copies
                # can be throttled by the session's concurrency limit below.
                def _make_copy_task(child: Path) -> Callable[[], Awaitable[MaterializedFile]]:
                    async def _copy() -> MaterializedFile:
                        return await self._copy_local_dir_file(
                            base_dir=base_dir,
                            session=session,
                            src_root=src_root,
                            src=src_root / child,
                            dest_root=dest,
                            user=user,
                        )

                    return _copy

                copied_files = await gather_in_order(
                    [_make_copy_task(child) for child in local_files],
                    max_concurrency=session._max_local_dir_file_concurrency,
                )
                files.extend(copied_files)
            except OSError as e:
                raise LocalDirReadError(src=src_root, cause=e) from e
            if user is None:
                await self._apply_metadata(session, dest)
        else:
            # No source: just create an empty directory at `dest`.
            await session.mkdir(dest, parents=True, user=user)
            if user is None:
                await self._apply_metadata(session, dest)
        return files

    def _resolve_local_dir_src_root(self, base_dir: Path) -> Path:
        """Resolve `src` against `base_dir`, rejecting symlinks along the path.

        Every intermediate component is lstat'ed so a symlinked ancestor is
        reported as `symlink_not_supported` rather than transparently followed.
        """
        assert self.src is not None
        src_input = base_dir / self.src
        for current in self._iter_local_dir_source_paths(base_dir):
            try:
                current_stat = current.lstat()
            except FileNotFoundError:
                raise LocalDirReadError(
                    src=src_input if src_input.is_absolute() else src_input.absolute(),
                    context={"reason": "path_not_found"},
                ) from None
            except OSError as e:
                raise LocalDirReadError(src=current, cause=e) from e
            if stat.S_ISLNK(current_stat.st_mode):
                raise LocalDirReadError(
                    src=src_input,
                    context={
                        "reason": "symlink_not_supported",
                        "child": self._local_dir_source_child_label(base_dir, current),
                    },
                )
        return src_input if src_input.is_absolute() else src_input.absolute()

    def _iter_local_dir_source_paths(self, base_dir: Path) -> list[Path]:
        """Return each cumulative path prefix from the anchor down to `src`."""
        assert self.src is not None
        if self.src.is_absolute():
            current = Path(self.src.anchor)
            parts = self.src.parts[1:]
        else:
            current = base_dir
            parts = self.src.parts

        paths: list[Path] = []
        if not parts:
            # `src` is just an anchor (or empty); check the single base path.
            paths.append(current)
            return paths

        for part in parts:
            current = current / part
            paths.append(current)
        return paths

    def _local_dir_source_child_label(self, base_dir: Path, current: Path) -> str:
        """Render `current` relative to `base_dir` for error context, if possible."""
        try:
            return current.relative_to(base_dir).as_posix()
        except ValueError:
            return current.as_posix()

    def _list_local_dir_files(self, *, base_dir: Path, src_root: Path) -> list[Path]:
        """List regular files under `src_root` as paths relative to it.

        Uses the fd-pinned walk when the platform supports it; otherwise falls
        back to `rglob`, still rejecting any symlink encountered.
        """
        if _OPEN_SUPPORTS_DIR_FD and _HAS_O_DIRECTORY:
            return self._list_local_dir_files_pinned(base_dir=base_dir, src_root=src_root)

        local_files: list[Path] = []
        for child in src_root.rglob("*"):
            child_stat = child.lstat()
            if stat.S_ISLNK(child_stat.st_mode):
                raise LocalDirReadError(
                    src=src_root,
                    context={
                        "reason": "symlink_not_supported",
                        "child": child.relative_to(src_root).as_posix(),
                    },
                )
            if stat.S_ISREG(child_stat.st_mode):
                local_files.append(child.relative_to(src_root))
        return local_files

    def _list_local_dir_files_pinned(self, *, base_dir: Path, src_root: Path) -> list[Path]:
        """Fd-pinned variant of `_list_local_dir_files`; owns the root fd's lifetime."""
        root_fd: int | None = None
        try:
            root_fd = self._open_local_dir_src_root_fd(base_dir=base_dir, src_root=src_root)
            return self._list_local_dir_files_from_dir_fd(src_root=src_root, dir_fd=root_fd)
        finally:
            if root_fd is not None:
                os.close(root_fd)

    def _list_local_dir_files_from_dir_fd(
        self,
        *,
        src_root: Path,
        dir_fd: int,
        rel_dir: Path = Path(),
    ) -> list[Path]:
        """Recursively collect regular files under `dir_fd`, relative to `src_root`.

        Each subdirectory is re-opened via `dir_fd` with `O_NOFOLLOW` and
        re-verified with fstat, so an entry swapped for a symlink or non-dir
        between scandir and open raises instead of being traversed.
        """
        dir_flags = (
            os.O_RDONLY
            | getattr(os, "O_BINARY", 0)
            | getattr(os, "O_DIRECTORY", 0)
            | getattr(os, "O_NOFOLLOW", 0)
        )
        local_files: list[Path] = []
        for entry in os.scandir(dir_fd):
            rel_child = rel_dir / entry.name if rel_dir.parts else Path(entry.name)
            try:
                entry_stat = entry.stat(follow_symlinks=False)
            except FileNotFoundError:
                raise LocalDirReadError(
                    src=src_root,
                    context={"reason": "path_changed_during_copy", "child": rel_child.as_posix()},
                ) from None
            except OSError as e:
                raise LocalDirReadError(src=src_root, cause=e) from e
            if stat.S_ISLNK(entry_stat.st_mode):
                raise LocalDirReadError(
                    src=src_root,
                    context={"reason": "symlink_not_supported", "child": rel_child.as_posix()},
                )
            if stat.S_ISREG(entry_stat.st_mode):
                local_files.append(rel_child)
                continue
            if not stat.S_ISDIR(entry_stat.st_mode):
                # Sockets, FIFOs, devices, etc. are silently skipped.
                continue

            child_fd: int | None = None
            try:
                child_fd = os.open(entry.name, dir_flags, dir_fd=dir_fd)
                child_stat = os.fstat(child_fd)
                if not stat.S_ISDIR(child_stat.st_mode):
                    raise LocalDirReadError(
                        src=src_root,
                        context={
                            "reason": "path_changed_during_copy",
                            "child": rel_child.as_posix(),
                        },
                    )
                local_files.extend(
                    self._list_local_dir_files_from_dir_fd(
                        src_root=src_root,
                        dir_fd=child_fd,
                        rel_dir=rel_child,
                    )
                )
            except FileNotFoundError:
                raise LocalDirReadError(
                    src=src_root,
                    context={"reason": "path_changed_during_copy", "child": rel_child.as_posix()},
                ) from None
            except OSError as e:
                raise self._local_dir_open_error(
                    src_root=src_root,
                    parent_fd=dir_fd,
                    entry_name=entry.name,
                    rel_child=rel_child,
                    expect_dir=True,
                    error=e,
                ) from e
            finally:
                if child_fd is not None:
                    os.close(child_fd)
        return local_files

    async def _copy_local_dir_file(
        self,
        *,
        base_dir: Path,
        session: BaseSandboxSession,
        src_root: Path,
        src: Path,
        dest_root: Path,
        user: str | User | None = None,
    ) -> MaterializedFile:
        """Copy one source file into the sandbox and return its receipt.

        The checksum is computed from the same open handle that is written, so
        the receipt matches the bytes actually copied.
        """
        rel_child = src.relative_to(src_root)
        child_dest = dest_root / rel_child
        fd: int | None = None
        try:
            fd = self._open_local_dir_file_for_copy(
                base_dir=base_dir,
                src_root=src_root,
                rel_child=rel_child,
            )
            with os.fdopen(fd, "rb") as f:
                # fdopen now owns the fd; clear it so `finally` won't double-close.
                fd = None
                checksum = _sha256_handle(f)
                f.seek(0)
                await session.mkdir(child_dest.parent, parents=True, user=user)
                await session.write(child_dest, f, user=user)
        except OSError as e:
            raise LocalFileReadError(src=src, cause=e) from e
        finally:
            if fd is not None:
                os.close(fd)
        return MaterializedFile(path=child_dest, sha256=checksum)

    def _open_local_dir_file_for_copy(
        self, *, base_dir: Path, src_root: Path, rel_child: Path
    ) -> int:
        """Open `rel_child` under `src_root` for reading, symlink- and race-safely.

        Walks component-by-component with `dir_fd` opens so no path segment can
        be swapped for a symlink mid-walk. Returns the leaf file descriptor;
        the caller owns it.
        """
        if not _OPEN_SUPPORTS_DIR_FD or not _HAS_O_DIRECTORY:
            return self._open_local_dir_file_for_copy_fallback(
                src_root=src_root,
                rel_child=rel_child,
            )

        dir_flags = (
            os.O_RDONLY
            | getattr(os, "O_BINARY", 0)
            | getattr(os, "O_DIRECTORY", 0)
            | getattr(os, "O_NOFOLLOW", 0)
        )
        file_flags = os.O_RDONLY | getattr(os, "O_BINARY", 0) | getattr(os, "O_NOFOLLOW", 0)
        dir_fds: list[int] = []
        current_rel = Path()
        try:
            current_fd = self._open_local_dir_src_root_fd(base_dir=base_dir, src_root=src_root)
            dir_fds.append(current_fd)
            for part in rel_child.parts[:-1]:
                current_rel = current_rel / part if current_rel.parts else Path(part)
                try:
                    next_fd = os.open(part, dir_flags, dir_fd=current_fd)
                except OSError as e:
                    raise self._local_dir_open_error(
                        src_root=src_root,
                        parent_fd=current_fd,
                        entry_name=part,
                        rel_child=current_rel,
                        expect_dir=True,
                        error=e,
                    ) from e
                next_stat = os.fstat(next_fd)
                if not stat.S_ISDIR(next_stat.st_mode):
                    raise LocalDirReadError(
                        src=src_root,
                        context={
                            "reason": "path_changed_during_copy",
                            "child": rel_child.as_posix(),
                        },
                    )
                dir_fds.append(next_fd)
                current_fd = next_fd

            try:
                leaf_fd = os.open(rel_child.name, file_flags, dir_fd=current_fd)
            except OSError as e:
                raise self._local_dir_open_error(
                    src_root=src_root,
                    parent_fd=current_fd,
                    entry_name=rel_child.name,
                    rel_child=rel_child,
                    expect_dir=False,
                    error=e,
                ) from e
            leaf_stat = os.fstat(leaf_fd)
            if not stat.S_ISREG(leaf_stat.st_mode):
                os.close(leaf_fd)
                raise LocalDirReadError(
                    src=src_root,
                    context={"reason": "path_changed_during_copy", "child": rel_child.as_posix()},
                )
            return leaf_fd
        except FileNotFoundError:
            raise LocalDirReadError(
                src=src_root,
                context={"reason": "path_changed_during_copy", "child": rel_child.as_posix()},
            ) from None
        except OSError as e:
            # ELOOP means O_NOFOLLOW hit a symlink.
            if e.errno == errno.ELOOP:
                raise LocalDirReadError(
                    src=src_root,
                    context={"reason": "symlink_not_supported", "child": rel_child.as_posix()},
                ) from e
            raise LocalFileReadError(src=src_root / rel_child, cause=e) from e
        finally:
            # Close intermediate directory fds; the returned leaf fd is not in this list.
            for dir_fd in reversed(dir_fds):
                os.close(dir_fd)

    def _open_local_dir_src_root_fd(self, *, base_dir: Path, src_root: Path) -> int:
        """Open the source root directory fd, walking components with `O_NOFOLLOW`.

        Returns the final directory fd (popped off the cleanup list so the
        `finally` block does not close it); the caller owns it.
        """
        assert self.src is not None

        dir_flags = (
            os.O_RDONLY
            | getattr(os, "O_BINARY", 0)
            | getattr(os, "O_DIRECTORY", 0)
            | getattr(os, "O_NOFOLLOW", 0)
        )
        dir_fds: list[int] = []
        current_rel = Path()
        if self.src.is_absolute():
            current_path = Path(self.src.anchor)
            parts = self.src.parts[1:]
        else:
            current_path = base_dir
            parts = self.src.parts

        try:
            current_fd = os.open(current_path, dir_flags)
            dir_fds.append(current_fd)
            for part in parts:
                current_rel = current_rel / part if current_rel.parts else Path(part)
                try:
                    next_fd = os.open(part, dir_flags, dir_fd=current_fd)
                except OSError as e:
                    raise self._local_dir_open_error(
                        src_root=src_root,
                        parent_fd=current_fd,
                        entry_name=part,
                        rel_child=current_rel,
                        expect_dir=True,
                        error=e,
                    ) from e
                next_stat = os.fstat(next_fd)
                if not stat.S_ISDIR(next_stat.st_mode):
                    raise LocalDirReadError(
                        src=src_root,
                        context={
                            "reason": "path_changed_during_copy",
                            "child": current_rel.as_posix(),
                        },
                    )
                dir_fds.append(next_fd)
                current_fd = next_fd
            # Pop so the finally block leaves the returned fd open.
            return dir_fds.pop()
        except FileNotFoundError:
            raise LocalDirReadError(
                src=src_root, context={"reason": "path_changed_during_copy"}
            ) from None
        except OSError as e:
            raise LocalDirReadError(src=src_root, cause=e) from e
        finally:
            for dir_fd in reversed(dir_fds):
                os.close(dir_fd)

    def _local_dir_open_error(
        self,
        *,
        src_root: Path,
        parent_fd: int,
        entry_name: str,
        rel_child: Path,
        expect_dir: bool,
        error: OSError,
    ) -> LocalDirReadError:
        """Translate an open failure into the most specific LocalDirReadError.

        Re-stats the entry via `parent_fd` to distinguish "became a symlink",
        "changed kind", and "disappeared" from a plain I/O error.
        """
        try:
            entry_stat = os.stat(entry_name, dir_fd=parent_fd, follow_symlinks=False)
        except (AttributeError, NotImplementedError, TypeError):
            # Platform lacks dir_fd stat support; fall through to errno-based checks.
            entry_stat = None
        except FileNotFoundError:
            return LocalDirReadError(
                src=src_root,
                context={"reason": "path_changed_during_copy", "child": rel_child.as_posix()},
            )
        except OSError:
            entry_stat = None

        if entry_stat is not None and stat.S_ISLNK(entry_stat.st_mode):
            return LocalDirReadError(
                src=src_root,
                context={"reason": "symlink_not_supported", "child": rel_child.as_posix()},
            )
        if entry_stat is not None and (
            (expect_dir and not stat.S_ISDIR(entry_stat.st_mode))
            or (not expect_dir and not stat.S_ISREG(entry_stat.st_mode))
        ):
            return LocalDirReadError(
                src=src_root,
                context={"reason": "path_changed_during_copy", "child": rel_child.as_posix()},
            )
        if error.errno == errno.ELOOP:
            # O_NOFOLLOW turned a symlink open into ELOOP.
            return LocalDirReadError(
                src=src_root,
                context={"reason": "symlink_not_supported", "child": rel_child.as_posix()},
            )
        return LocalDirReadError(src=src_root, cause=error)

    def _open_local_dir_file_for_copy_fallback(self, *, src_root: Path, rel_child: Path) -> int:
        """Path-based fallback open for platforms without dir_fd support.

        Lstats first to reject symlinks, then re-verifies with `samestat` that
        the opened fd refers to the same inode that was checked.
        """
        src = src_root / rel_child
        try:
            src_stat = src.lstat()
        except FileNotFoundError:
            raise LocalDirReadError(
                src=src_root,
                context={"reason": "path_changed_during_copy", "child": rel_child.as_posix()},
            ) from None
        except OSError as e:
            raise LocalDirReadError(src=src_root, cause=e) from e
        if stat.S_ISLNK(src_stat.st_mode):
            raise LocalDirReadError(
                src=src_root,
                context={"reason": "symlink_not_supported", "child": rel_child.as_posix()},
            )
        if not stat.S_ISREG(src_stat.st_mode):
            raise LocalDirReadError(
                src=src_root,
                context={"reason": "path_changed_during_copy", "child": rel_child.as_posix()},
            )

        file_flags = os.O_RDONLY | getattr(os, "O_BINARY", 0) | getattr(os, "O_NOFOLLOW", 0)
        try:
            leaf_fd = os.open(src, file_flags)
            leaf_stat = os.fstat(leaf_fd)
            if not stat.S_ISREG(leaf_stat.st_mode) or not os.path.samestat(src_stat, leaf_stat):
                os.close(leaf_fd)
                raise LocalDirReadError(
                    src=src_root,
                    context={"reason": "path_changed_during_copy", "child": rel_child.as_posix()},
                )
            return leaf_fd
        except FileNotFoundError:
            raise LocalDirReadError(
                src=src_root,
                context={"reason": "path_changed_during_copy", "child": rel_child.as_posix()},
            ) from None
        except OSError as e:
            if e.errno == errno.ELOOP:
                raise LocalDirReadError(
                    src=src_root,
                    context={"reason": "symlink_not_supported", "child": rel_child.as_posix()},
                ) from e
            raise LocalFileReadError(src=src, cause=e) from e
LocalFile

Bases: BaseEntry

Source code in src/agents/sandbox/entries/artifacts.py
class LocalFile(BaseEntry):
    """A workspace file copied from a single path on the host filesystem."""

    type: Literal["local_file"] = "local_file"
    src: Path

    async def apply(
        self,
        session: BaseSandboxSession,
        dest: Path,
        base_dir: Path,
    ) -> list[MaterializedFile]:
        """Copy the host file at `src` (resolved against `base_dir`) to `dest`."""
        resolved = (base_dir / self.src).resolve()
        # Checksum first so a read failure surfaces before any sandbox writes.
        try:
            digest = sha256_file(resolved)
        except OSError as e:
            raise LocalChecksumError(src=resolved, cause=e) from e
        await session.mkdir(Path(dest).parent, parents=True)
        try:
            with resolved.open("rb") as handle:
                await session.write(dest, handle)
        except OSError as e:
            raise LocalFileReadError(src=resolved, cause=e) from e
        await self._apply_metadata(session, dest)
        return [MaterializedFile(path=dest, sha256=digest)]

Mount

Bases: BaseEntry

A manifest entry that exposes external storage inside the sandbox workspace.

Mount holds strategy-independent mount metadata and delegates lifecycle behavior to mount_strategy. Provider subclasses describe what to mount; the strategy describes how the backend should make it available.

Source code in src/agents/sandbox/entries/mounts/base.py
class Mount(BaseEntry):
    """A manifest entry that exposes external storage inside the sandbox workspace.

    `Mount` holds strategy-independent mount metadata and delegates lifecycle behavior to
    `mount_strategy`. Provider subclasses describe what to mount; the strategy describes how the
    backend should make it available.
    """

    is_dir: bool = True
    # Marks this class as a non-instantiable base in the entry registry.
    _abstract_entry_base: ClassVar[bool] = True
    # Explicit location inside the workspace; None means "derive from dest".
    mount_path: Path | None = None
    # Mounts are runtime-attached external filesystems, not durable workspace state, so
    # snapshots must always treat them as ephemeral.
    ephemeral: bool = True
    read_only: bool = Field(default=True)
    mount_strategy: MountStrategy

    @field_validator("mount_strategy", mode="before")
    @classmethod
    def _parse_mount_strategy(cls, value: object) -> MountStrategyBase:
        """Coerce raw strategy input into a concrete MountStrategyBase instance."""
        return MountStrategyBase.parse(value)

    def model_post_init(self, context: object, /) -> None:
        """Normalize mount metadata and validate that the active strategy fits this mount type."""

        _ = context

        # 755-style defaults; any deviation is warned about and reset below.
        default_permissions = Permissions(
            owner=FileMode.ALL,
            group=FileMode.READ | FileMode.EXEC,
            other=FileMode.READ | FileMode.EXEC,
        )
        if (
            self.permissions.owner != default_permissions.owner
            or self.permissions.group != default_permissions.group
            or self.permissions.other != default_permissions.other
        ):
            warnings.warn(
                "Mount permissions are not enforced. "
                "Please configure access in the cloud provider instead; "
                "mount-level permissions can be unreliable.",
                stacklevel=2,
            )
            # Force permissions back to the defaults since they are not enforced.
            self.permissions.owner = default_permissions.owner
            self.permissions.group = default_permissions.group
            self.permissions.other = default_permissions.other
        self.permissions.directory = True
        # A mount type that supports neither strategy can never be activated.
        if (
            not self.supported_in_container_patterns()
            and not self.supported_docker_volume_drivers()
        ):
            raise MountConfigError(
                message="mount type must support at least one mount strategy",
                context={"mount_type": self.type},
            )
        self.mount_strategy.validate_mount(self)

    def in_container_adapter(self) -> InContainerMountAdapter:
        """Return the strategy adapter for in-container mount lifecycle.

        Mount subclasses that do not support in-container mounts inherit this default unsupported
        implementation.
        """

        raise MountConfigError(
            message="in-container mounts are not supported for this mount type",
            context={"mount_type": self.type},
        )

    def docker_volume_adapter(self) -> DockerVolumeMountAdapter:
        """Return the strategy adapter for Docker volume lifecycle."""

        return DockerVolumeMountAdapter(self)

    async def apply(
        self,
        session: BaseSandboxSession,
        dest: Path,
        base_dir: Path,
    ) -> list[MaterializedFile]:
        """Activate this mount for a manifest application pass.

        In-container strategies run a live mount command here. Docker-volume strategies are
        intentionally no-ops because the backend attaches them before the session starts.
        """

        return await self.mount_strategy.activate(self, session, dest, base_dir)

    async def unmount(
        self,
        session: BaseSandboxSession,
        dest: Path,
        base_dir: Path,
    ) -> None:
        """Deactivate this mount for manifest teardown."""

        await self.mount_strategy.deactivate(self, session, dest, base_dir)

    async def build_in_container_mount_config(
        self,
        session: BaseSandboxSession,
        pattern: MountPattern,
        *,
        include_config_text: bool,
    ) -> MountPatternConfig | None:
        """Return pattern runtime config for provider-backed in-container mounts."""

        # Base implementation: no provider config; subclasses override.
        _ = (session, pattern, include_config_text)
        return None

    def supported_in_container_patterns(self) -> tuple[builtins.type[MountPatternBase], ...]:
        """Return the `MountPattern` classes accepted by `InContainerMountStrategy`."""

        return ()

    def supported_docker_volume_drivers(self) -> frozenset[str]:
        """Return Docker volume driver names accepted by `DockerVolumeMountStrategy`."""

        return frozenset()

    def build_docker_volume_driver_config(
        self,
        strategy: DockerVolumeMountStrategy,
    ) -> tuple[str, dict[str, str], bool]:
        """Build the Docker volume driver tuple for Docker-volume mounts.

        Mount subclasses that do not support Docker volumes inherit this default unsupported
        implementation.
        """

        _ = strategy
        raise MountConfigError(
            message="docker-volume mounts are not supported for this mount type",
            context={"mount_type": self.type},
        )

    def _resolve_mount_path(
        self,
        session: BaseSandboxSession,
        dest: Path,
    ) -> Path:
        """Resolve the concrete path where this mount should appear in the active workspace."""

        # Fall back to "/" when the manifest does not expose a root.
        manifest_root = Path(getattr(session.state.manifest, "root", "/"))
        return self._resolve_mount_path_for_root(manifest_root, dest)

    def _resolve_mount_path_for_root(
        self,
        manifest_root: Path,
        dest: Path,
    ) -> Path:
        """Resolve a mount path against an explicit manifest root.

        This helper is used both by live sessions and by container-creation code that only has the
        manifest root, not a started session.
        """

        if self.mount_path is not None:
            mount_path = Path(self.mount_path)
            if mount_path.is_absolute():
                return mount_path
            # Relative explicit mount paths are interpreted inside the active workspace root so a
            # manifest can stay portable across backends with different concrete root prefixes.
            return manifest_root / mount_path

        if dest.is_absolute():
            try:
                rel_dest = dest.relative_to(manifest_root)
            except ValueError:
                # `dest` lives outside the workspace root; use it as-is.
                return dest
            # `dest` may already be normalized to an absolute workspace path; re-anchor it to the
            # current manifest root instead of nesting the root twice.
            return manifest_root / rel_dest
        return manifest_root / dest
model_post_init

model_post_init(context: object) -> None

Normalize mount metadata and validate that the active strategy fits this mount type.

Source code in src/agents/sandbox/entries/mounts/base.py
def model_post_init(self, context: object, /) -> None:
    """Normalize mount metadata and validate that the active strategy fits this mount type."""

    _ = context

    defaults = Permissions(
        owner=FileMode.ALL,
        group=FileMode.READ | FileMode.EXEC,
        other=FileMode.READ | FileMode.EXEC,
    )
    current = (self.permissions.owner, self.permissions.group, self.permissions.other)
    if current != (defaults.owner, defaults.group, defaults.other):
        warnings.warn(
            "Mount permissions are not enforced. "
            "Please configure access in the cloud provider instead; "
            "mount-level permissions can be unreliable.",
            stacklevel=2,
        )
        # Overwrite with the canonical mount permissions so downstream code sees one shape.
        self.permissions.owner = defaults.owner
        self.permissions.group = defaults.group
        self.permissions.other = defaults.other
    self.permissions.directory = True

    has_any_strategy = bool(
        self.supported_in_container_patterns() or self.supported_docker_volume_drivers()
    )
    if not has_any_strategy:
        raise MountConfigError(
            message="mount type must support at least one mount strategy",
            context={"mount_type": self.type},
        )
    self.mount_strategy.validate_mount(self)

in_container_adapter

in_container_adapter() -> InContainerMountAdapter

Return the strategy adapter for in-container mount lifecycle.

Mount subclasses that do not support in-container mounts inherit this default unsupported implementation.

Source code in src/agents/sandbox/entries/mounts/base.py
def in_container_adapter(self) -> InContainerMountAdapter:
    """Return the strategy adapter for in-container mount lifecycle.

    Subclasses without in-container support inherit this implementation, which always
    rejects the request.
    """

    error_context = {"mount_type": self.type}
    raise MountConfigError(
        message="in-container mounts are not supported for this mount type",
        context=error_context,
    )

docker_volume_adapter

docker_volume_adapter() -> DockerVolumeMountAdapter

Return the strategy adapter for Docker volume lifecycle.

Source code in src/agents/sandbox/entries/mounts/base.py
def docker_volume_adapter(self) -> DockerVolumeMountAdapter:
    """Build and return the adapter driving the Docker volume lifecycle for this mount."""

    adapter = DockerVolumeMountAdapter(self)
    return adapter

apply async

apply(
    session: BaseSandboxSession, dest: Path, base_dir: Path
) -> list[MaterializedFile]

Activate this mount for a manifest application pass.

In-container strategies run a live mount command here. Docker-volume strategies are intentionally no-ops because the backend attaches them before the session starts.

Source code in src/agents/sandbox/entries/mounts/base.py
async def apply(
    self,
    session: BaseSandboxSession,
    dest: Path,
    base_dir: Path,
) -> list[MaterializedFile]:
    """Activate this mount for a manifest application pass.

    In-container strategies run a live mount command here; Docker-volume strategies are
    intentionally no-ops because the backend attaches them before the session starts.
    """

    strategy = self.mount_strategy
    return await strategy.activate(self, session, dest, base_dir)

unmount async

unmount(
    session: BaseSandboxSession, dest: Path, base_dir: Path
) -> None

Deactivate this mount for manifest teardown.

Source code in src/agents/sandbox/entries/mounts/base.py
async def unmount(
    self,
    session: BaseSandboxSession,
    dest: Path,
    base_dir: Path,
) -> None:
    """Deactivate this mount for manifest teardown."""

    strategy = self.mount_strategy
    await strategy.deactivate(self, session, dest, base_dir)

build_in_container_mount_config async

build_in_container_mount_config(
    session: BaseSandboxSession,
    pattern: MountPattern,
    *,
    include_config_text: bool,
) -> MountPatternConfig | None

Return pattern runtime config for provider-backed in-container mounts.

Source code in src/agents/sandbox/entries/mounts/base.py
async def build_in_container_mount_config(
    self,
    session: BaseSandboxSession,
    pattern: MountPattern,
    *,
    include_config_text: bool,
) -> MountPatternConfig | None:
    """Return pattern runtime config for provider-backed in-container mounts.

    The base implementation carries no provider-specific config and always returns None.
    """

    del session, pattern, include_config_text
    return None

supported_in_container_patterns

supported_in_container_patterns() -> tuple[
    type[MountPatternBase], ...
]

Return the MountPattern classes accepted by InContainerMountStrategy.

Source code in src/agents/sandbox/entries/mounts/base.py
def supported_in_container_patterns(self) -> tuple[builtins.type[MountPatternBase], ...]:
    """Return the `MountPattern` classes accepted by `InContainerMountStrategy`.

    The base mount supports none; provider subclasses override this.
    """

    return ()

supported_docker_volume_drivers

supported_docker_volume_drivers() -> frozenset[str]

Return Docker volume driver names accepted by DockerVolumeMountStrategy.

Source code in src/agents/sandbox/entries/mounts/base.py
def supported_docker_volume_drivers(self) -> frozenset[str]:
    """Return Docker volume driver names accepted by `DockerVolumeMountStrategy`.

    The base mount supports none; provider subclasses override this.
    """

    return frozenset()

build_docker_volume_driver_config

build_docker_volume_driver_config(
    strategy: DockerVolumeMountStrategy,
) -> tuple[str, dict[str, str], bool]

Build the Docker volume driver tuple for Docker-volume mounts.

Mount subclasses that do not support Docker volumes inherit this default unsupported implementation.

Source code in src/agents/sandbox/entries/mounts/base.py
def build_docker_volume_driver_config(
    self,
    strategy: DockerVolumeMountStrategy,
) -> tuple[str, dict[str, str], bool]:
    """Build the Docker volume driver tuple for Docker-volume mounts.

    Subclasses without Docker volume support inherit this implementation, which always
    rejects the request.
    """

    _ = strategy
    error_context = {"mount_type": self.type}
    raise MountConfigError(
        message="docker-volume mounts are not supported for this mount type",
        context=error_context,
    )

AzureBlobMount

Bases: _ConfiguredMount

Source code in src/agents/sandbox/entries/mounts/providers/azure_blob.py
class AzureBlobMount(_ConfiguredMount):
    """Mount an Azure Blob Storage container via rclone or FUSE."""

    type: Literal["azure_blob_mount"] = "azure_blob_mount"
    account: str  # AZURE_STORAGE_ACCOUNT
    container: str  # AZURE_STORAGE_CONTAINER
    endpoint: str | None = None
    identity_client_id: str | None = None  # AZURE_CLIENT_ID
    account_key: str | None = None  # AZURE_STORAGE_ACCOUNT_KEY

    def supported_in_container_patterns(self) -> tuple[builtins.type[MountPattern], ...]:
        """In-container mounts may use either the rclone or the FUSE pattern."""

        return (RcloneMountPattern, FuseMountPattern)

    def supported_docker_volume_drivers(self) -> frozenset[str]:
        """Only the rclone Docker volume driver understands azureblob remotes."""

        return frozenset({"rclone"})

    def build_docker_volume_driver_config(
        self,
        strategy: DockerVolumeMountStrategy,
    ) -> tuple[str, dict[str, str], bool]:
        """Assemble the rclone volume-driver option dict for this blob container."""

        options = {
            "type": "azureblob",
            "path": self.container,
            "azureblob-account": self.account,
        }
        optional = {
            "azureblob-endpoint": self.endpoint,
            "azureblob-msi-client-id": self.identity_client_id,
            "azureblob-key": self.account_key,
        }
        options.update({key: value for key, value in optional.items() if value is not None})
        return strategy.driver, options | strategy.driver_options, self.read_only

    async def build_in_container_mount_config(
        self,
        session: BaseSandboxSession,
        pattern: MountPattern,
        *,
        include_config_text: bool,
    ) -> MountPatternConfig:
        """Translate a mount pattern into its runtime config for this blob container."""

        if isinstance(pattern, RcloneMountPattern):
            remote_name = pattern.resolve_remote_name(
                session_id=self._require_session_id_hex(session, self.type),
                remote_kind="azureblob",
                mount_type=self.type,
            )
            return await self._build_rclone_config(
                session=session,
                pattern=pattern,
                remote_kind="azureblob",
                remote_path=self.container,
                required_lines=self._rclone_required_lines(remote_name),
                include_config_text=include_config_text,
            )
        if isinstance(pattern, FuseMountPattern):
            return FuseMountConfig(
                account=self.account,
                container=self.container,
                endpoint=self.endpoint,
                identity_client_id=self.identity_client_id,
                account_key=self.account_key,
                mount_type=self.type,
                read_only=self.read_only,
            )
        raise MountConfigError(
            message="invalid mount_pattern type",
            context={"type": self.type},
        )

    def _rclone_required_lines(self, remote_name: str) -> list[str]:
        """Render the rclone config section lines for the generated remote.

        A shared key takes precedence; otherwise fall back to managed-identity auth.
        """

        lines = [
            f"[{remote_name}]",
            "type = azureblob",
            f"account = {self.account}",
        ]
        if self.endpoint:
            lines.append(f"endpoint = {self.endpoint}")
        if self.account_key:
            lines.append(f"key = {self.account_key}")
            return lines
        lines.append("use_msi = true")
        if self.identity_client_id:
            lines.append(f"msi_client_id = {self.identity_client_id}")
        return lines

apply async

apply(
    session: BaseSandboxSession, dest: Path, base_dir: Path
) -> list[MaterializedFile]

Activate this mount for a manifest application pass.

In-container strategies run a live mount command here. Docker-volume strategies are intentionally no-ops because the backend attaches them before the session starts.

Source code in src/agents/sandbox/entries/mounts/base.py
async def apply(
    self,
    session: BaseSandboxSession,
    dest: Path,
    base_dir: Path,
) -> list[MaterializedFile]:
    """Activate this mount for a manifest application pass.

    In-container strategies run a live mount command here; Docker-volume strategies are
    intentionally no-ops because the backend attaches them before the session starts.
    """

    strategy = self.mount_strategy
    return await strategy.activate(self, session, dest, base_dir)

model_post_init

model_post_init(context: object) -> None

Normalize mount metadata and validate that the active strategy fits this mount type.

Source code in src/agents/sandbox/entries/mounts/base.py
def model_post_init(self, context: object, /) -> None:
    """Normalize mount metadata and validate that the active strategy fits this mount type."""

    _ = context

    defaults = Permissions(
        owner=FileMode.ALL,
        group=FileMode.READ | FileMode.EXEC,
        other=FileMode.READ | FileMode.EXEC,
    )
    current = (self.permissions.owner, self.permissions.group, self.permissions.other)
    if current != (defaults.owner, defaults.group, defaults.other):
        warnings.warn(
            "Mount permissions are not enforced. "
            "Please configure access in the cloud provider instead; "
            "mount-level permissions can be unreliable.",
            stacklevel=2,
        )
        # Overwrite with the canonical mount permissions so downstream code sees one shape.
        self.permissions.owner = defaults.owner
        self.permissions.group = defaults.group
        self.permissions.other = defaults.other
    self.permissions.directory = True

    has_any_strategy = bool(
        self.supported_in_container_patterns() or self.supported_docker_volume_drivers()
    )
    if not has_any_strategy:
        raise MountConfigError(
            message="mount type must support at least one mount strategy",
            context={"mount_type": self.type},
        )
    self.mount_strategy.validate_mount(self)

in_container_adapter

in_container_adapter() -> InContainerMountAdapter

Use pattern-driven in-container behavior for built-in provider mounts.

Source code in src/agents/sandbox/entries/mounts/providers/base.py
def in_container_adapter(self) -> InContainerMountAdapter:
    """Use pattern-driven in-container behavior for built-in provider mounts."""

    adapter = InContainerMountAdapter(self)
    return adapter

docker_volume_adapter

docker_volume_adapter() -> DockerVolumeMountAdapter

Use Docker volume-driver behavior for built-in provider mounts.

Source code in src/agents/sandbox/entries/mounts/providers/base.py
def docker_volume_adapter(self) -> DockerVolumeMountAdapter:
    """Use Docker volume-driver behavior for built-in provider mounts."""

    adapter = DockerVolumeMountAdapter(self)
    return adapter

unmount async

unmount(
    session: BaseSandboxSession, dest: Path, base_dir: Path
) -> None

Deactivate this mount for manifest teardown.

Source code in src/agents/sandbox/entries/mounts/base.py
async def unmount(
    self,
    session: BaseSandboxSession,
    dest: Path,
    base_dir: Path,
) -> None:
    """Deactivate this mount for manifest teardown."""

    strategy = self.mount_strategy
    await strategy.deactivate(self, session, dest, base_dir)

GCSMount

Bases: _ConfiguredMount

Source code in src/agents/sandbox/entries/mounts/providers/gcs.py
class GCSMount(_ConfiguredMount):
    """Mount a Google Cloud Storage bucket via rclone or mountpoint."""

    type: Literal["gcs_mount"] = "gcs_mount"
    bucket: str
    access_id: str | None = None
    secret_access_key: str | None = None
    prefix: str | None = None
    region: str | None = None
    endpoint_url: str | None = None
    service_account_file: str | None = None
    service_account_credentials: str | None = None
    access_token: str | None = None

    def supported_in_container_patterns(self) -> tuple[builtins.type[MountPattern], ...]:
        """In-container mounts may use either the rclone or the mountpoint pattern."""

        return (RcloneMountPattern, MountpointMountPattern)

    def supported_docker_volume_drivers(self) -> frozenset[str]:
        """Both the mountpoint and rclone Docker volume drivers can serve GCS."""

        return frozenset({"mountpoint", "rclone"})

    def _use_s3_compatible_rclone(self) -> bool:
        """Return true when this mount has GCS HMAC credentials for rclone's S3 backend."""

        return self.access_id is not None and self.secret_access_key is not None

    def _rclone_remote_kind(self) -> str:
        """Pick the generated remote-name namespace for this mount's rclone backend."""

        # Keep HMAC-auth GCS mounts in a distinct generated remote-name namespace from
        # real S3 mounts. The config backend is still rclone's S3 backend, but the remote
        # section/file name must not collide with `S3Mount` in the same session.
        return "gcs_s3" if self._use_s3_compatible_rclone() else "gcs"

    def build_docker_volume_driver_config(
        self,
        strategy: DockerVolumeMountStrategy,
    ) -> tuple[str, dict[str, str], bool]:
        """Assemble driver options for either the rclone or the mountpoint volume driver."""

        if strategy.driver != "rclone":
            return self._mountpoint_driver_config(strategy)
        if self._use_s3_compatible_rclone():
            return self._hmac_rclone_driver_config(strategy)
        return self._native_rclone_driver_config(strategy)

    def _hmac_rclone_driver_config(
        self,
        strategy: DockerVolumeMountStrategy,
    ) -> tuple[str, dict[str, str], bool]:
        """rclone S3-backend driver options for HMAC-credentialed GCS access."""

        assert self.access_id is not None
        assert self.secret_access_key is not None
        options: dict[str, str] = {
            "type": "s3",
            "path": self._join_remote_path(self.bucket, self.prefix),
            "s3-provider": "GCS",
            "s3-access-key-id": self.access_id,
            "s3-secret-access-key": self.secret_access_key,
            "s3-endpoint": self.endpoint_url or "https://storage.googleapis.com",
        }
        if self.region is not None:
            options["s3-region"] = self.region
        return strategy.driver, options | strategy.driver_options, self.read_only

    def _native_rclone_driver_config(
        self,
        strategy: DockerVolumeMountStrategy,
    ) -> tuple[str, dict[str, str], bool]:
        """rclone native GCS-backend driver options."""

        options: dict[str, str] = {
            "type": "google cloud storage",
            "path": self._join_remote_path(self.bucket, self.prefix),
        }
        if self.service_account_file is not None:
            options["gcs-service-account-file"] = self.service_account_file
        if self.service_account_credentials is not None:
            options["gcs-service-account-credentials"] = self.service_account_credentials
        if self.access_token is not None:
            options["gcs-access-token"] = self.access_token
        return strategy.driver, options | strategy.driver_options, self.read_only

    def _mountpoint_driver_config(
        self,
        strategy: DockerVolumeMountStrategy,
    ) -> tuple[str, dict[str, str], bool]:
        """mountpoint driver options using GCS's S3-compatible endpoint."""

        options: dict[str, str] = {
            "bucket": self.bucket,
            "endpoint_url": self.endpoint_url or "https://storage.googleapis.com",
        }
        if self.access_id is not None:
            options["access_key_id"] = self.access_id
        if self.secret_access_key is not None:
            options["secret_access_key"] = self.secret_access_key
        if self.region is not None:
            options["region"] = self.region
        if self.prefix is not None:
            options["prefix"] = self.prefix
        return strategy.driver, options | strategy.driver_options, self.read_only

    async def build_in_container_mount_config(
        self,
        session: BaseSandboxSession,
        pattern: MountPattern,
        *,
        include_config_text: bool,
    ) -> MountPatternConfig:
        """Translate a mount pattern into its runtime config for this bucket."""

        if isinstance(pattern, RcloneMountPattern):
            remote_kind = self._rclone_remote_kind()
            remote_name = pattern.resolve_remote_name(
                session_id=self._require_session_id_hex(session, self.type),
                remote_kind=remote_kind,
                mount_type=self.type,
            )
            # HMAC credentials route through rclone's S3 backend; otherwise use the
            # native GCS backend config lines.
            if self._use_s3_compatible_rclone():
                required_lines = self._s3_compatible_rclone_required_lines(remote_name)
            else:
                required_lines = self._rclone_required_lines(remote_name)
            return await self._build_rclone_config(
                session=session,
                pattern=pattern,
                remote_kind=remote_kind,
                remote_path=self._join_remote_path(self.bucket, self.prefix),
                required_lines=required_lines,
                include_config_text=include_config_text,
            )
        if isinstance(pattern, MountpointMountPattern):
            options = pattern.options
            return MountpointMountConfig(
                bucket=self.bucket,
                access_key_id=self.access_id,
                secret_access_key=self.secret_access_key,
                session_token=None,
                prefix=self.prefix or options.prefix,
                region=self.region or options.region,
                endpoint_url=(
                    self.endpoint_url or options.endpoint_url or "https://storage.googleapis.com"
                ),
                mount_type=self.type,
                read_only=self.read_only,
            )
        raise MountConfigError(
            message="invalid mount_pattern type",
            context={"type": self.type},
        )

    def _rclone_required_lines(self, remote_name: str) -> list[str]:
        """Render native GCS-backend rclone config lines for the generated remote."""

        lines = [
            f"[{remote_name}]",
            "type = google cloud storage",
        ]
        if self.service_account_file:
            lines.append(f"service_account_file = {self.service_account_file}")
        if self.service_account_credentials:
            lines.append(f"service_account_credentials = {self.service_account_credentials}")
        if self.access_token:
            lines.append(f"access_token = {self.access_token}")
        # Only fall back to ambient environment auth when no credential field is set at
        # all; note this intentionally tests `is None`, not truthiness.
        explicitly_configured = not (
            self.service_account_file is None
            and self.service_account_credentials is None
            and self.access_token is None
        )
        lines.append("env_auth = false" if explicitly_configured else "env_auth = true")
        return lines

    def _s3_compatible_rclone_required_lines(self, remote_name: str) -> list[str]:
        """Render S3-backend rclone config lines for HMAC-credentialed GCS access."""

        endpoint = self.endpoint_url or "https://storage.googleapis.com"
        lines = [
            f"[{remote_name}]",
            "type = s3",
            "provider = GCS",
            "env_auth = false",
            f"access_key_id = {self.access_id}",
            f"secret_access_key = {self.secret_access_key}",
            f"endpoint = {endpoint}",
        ]
        if self.region:
            lines.append(f"region = {self.region}")
        return lines

apply async

apply(
    session: BaseSandboxSession, dest: Path, base_dir: Path
) -> list[MaterializedFile]

Activate this mount for a manifest application pass.

In-container strategies run a live mount command here. Docker-volume strategies are intentionally no-ops because the backend attaches them before the session starts.

Source code in src/agents/sandbox/entries/mounts/base.py
async def apply(
    self,
    session: BaseSandboxSession,
    dest: Path,
    base_dir: Path,
) -> list[MaterializedFile]:
    """Activate this mount for a manifest application pass.

    In-container strategies run a live mount command here; Docker-volume strategies are
    intentionally no-ops because the backend attaches them before the session starts.
    """

    strategy = self.mount_strategy
    return await strategy.activate(self, session, dest, base_dir)

model_post_init

model_post_init(context: object) -> None

Normalize mount metadata and validate that the active strategy fits this mount type.

Source code in src/agents/sandbox/entries/mounts/base.py
def model_post_init(self, context: object, /) -> None:
    """Normalize mount metadata and validate that the active strategy fits this mount type."""

    _ = context

    defaults = Permissions(
        owner=FileMode.ALL,
        group=FileMode.READ | FileMode.EXEC,
        other=FileMode.READ | FileMode.EXEC,
    )
    current = (self.permissions.owner, self.permissions.group, self.permissions.other)
    if current != (defaults.owner, defaults.group, defaults.other):
        warnings.warn(
            "Mount permissions are not enforced. "
            "Please configure access in the cloud provider instead; "
            "mount-level permissions can be unreliable.",
            stacklevel=2,
        )
        # Overwrite with the canonical mount permissions so downstream code sees one shape.
        self.permissions.owner = defaults.owner
        self.permissions.group = defaults.group
        self.permissions.other = defaults.other
    self.permissions.directory = True

    has_any_strategy = bool(
        self.supported_in_container_patterns() or self.supported_docker_volume_drivers()
    )
    if not has_any_strategy:
        raise MountConfigError(
            message="mount type must support at least one mount strategy",
            context={"mount_type": self.type},
        )
    self.mount_strategy.validate_mount(self)

in_container_adapter

in_container_adapter() -> InContainerMountAdapter

Use pattern-driven in-container behavior for built-in provider mounts.

Source code in src/agents/sandbox/entries/mounts/providers/base.py
def in_container_adapter(self) -> InContainerMountAdapter:
    """Use pattern-driven in-container behavior for built-in provider mounts."""

    adapter = InContainerMountAdapter(self)
    return adapter

docker_volume_adapter

docker_volume_adapter() -> DockerVolumeMountAdapter

Use Docker volume-driver behavior for built-in provider mounts.

Source code in src/agents/sandbox/entries/mounts/providers/base.py
def docker_volume_adapter(self) -> DockerVolumeMountAdapter:
    """Use Docker volume-driver behavior for built-in provider mounts."""

    adapter = DockerVolumeMountAdapter(self)
    return adapter

unmount async

unmount(
    session: BaseSandboxSession, dest: Path, base_dir: Path
) -> None

Deactivate this mount for manifest teardown.

Source code in src/agents/sandbox/entries/mounts/base.py
async def unmount(
    self,
    session: BaseSandboxSession,
    dest: Path,
    base_dir: Path,
) -> None:
    """Deactivate this mount for manifest teardown."""

    strategy = self.mount_strategy
    await strategy.deactivate(self, session, dest, base_dir)

R2Mount

Bases: _ConfiguredMount

Source code in src/agents/sandbox/entries/mounts/providers/r2.py
class R2Mount(_ConfiguredMount):
    """Mount a Cloudflare R2 bucket through rclone's S3-compatible backend."""

    type: Literal["r2_mount"] = "r2_mount"
    bucket: str
    account_id: str
    access_key_id: str | None = None
    secret_access_key: str | None = None
    custom_domain: str | None = None

    def _endpoint(self) -> str:
        """Resolved S3 endpoint: the custom domain, else the account's default R2 URL."""

        return self.custom_domain or f"https://{self.account_id}.r2.cloudflarestorage.com"

    def _validate_credential_pair(self) -> None:
        """Reject configs that supply only one half of the HMAC credential pair."""

        if (self.access_key_id is None) != (self.secret_access_key is None):
            raise MountConfigError(
                message="r2 credentials must include both access_key_id and secret_access_key",
                context={"type": self.type},
            )

    def supported_in_container_patterns(self) -> tuple[builtins.type[MountPattern], ...]:
        """Only the rclone pattern is available for in-container R2 mounts."""

        return (RcloneMountPattern,)

    def supported_docker_volume_drivers(self) -> frozenset[str]:
        """Only the rclone Docker volume driver understands R2 remotes."""

        return frozenset({"rclone"})

    def build_docker_volume_driver_config(
        self,
        strategy: DockerVolumeMountStrategy,
    ) -> tuple[str, dict[str, str], bool]:
        """Assemble the rclone volume-driver option dict for this R2 bucket."""

        self._validate_credential_pair()
        options: dict[str, str] = {
            "type": "s3",
            "path": self.bucket,
            "s3-provider": "Cloudflare",
            "s3-endpoint": self._endpoint(),
        }
        if self.access_key_id is not None:
            options["s3-access-key-id"] = self.access_key_id
        if self.secret_access_key is not None:
            options["s3-secret-access-key"] = self.secret_access_key
        return strategy.driver, options | strategy.driver_options, self.read_only

    async def build_in_container_mount_config(
        self,
        session: BaseSandboxSession,
        pattern: MountPattern,
        *,
        include_config_text: bool,
    ) -> MountPatternConfig:
        """Translate a mount pattern into its runtime config for this R2 bucket."""

        self._validate_credential_pair()
        if not isinstance(pattern, RcloneMountPattern):
            raise MountConfigError(
                message="invalid mount_pattern type",
                context={"type": self.type},
            )
        remote_name = pattern.resolve_remote_name(
            session_id=self._require_session_id_hex(session, self.type),
            remote_kind="r2",
            mount_type=self.type,
        )
        return await self._build_rclone_config(
            session=session,
            pattern=pattern,
            remote_kind="r2",
            remote_path=self.bucket,
            required_lines=self._rclone_required_lines(remote_name),
            include_config_text=include_config_text,
        )

    def _rclone_required_lines(self, remote_name: str) -> list[str]:
        """Render the rclone config section lines for the generated remote."""

        lines = [
            f"[{remote_name}]",
            "type = s3",
            "provider = Cloudflare",
            f"endpoint = {self._endpoint()}",
            "acl = private",
        ]
        if self.access_key_id and self.secret_access_key:
            lines.extend(
                [
                    "env_auth = false",
                    f"access_key_id = {self.access_key_id}",
                    f"secret_access_key = {self.secret_access_key}",
                ]
            )
        else:
            lines.append("env_auth = true")
        return lines

apply async

apply(
    session: BaseSandboxSession, dest: Path, base_dir: Path
) -> list[MaterializedFile]

Activate this mount for a manifest application pass.

In-container strategies run a live mount command here. Docker-volume strategies are intentionally no-ops because the backend attaches them before the session starts.

Source code in src/agents/sandbox/entries/mounts/base.py
async def apply(
    self,
    session: BaseSandboxSession,
    dest: Path,
    base_dir: Path,
) -> list[MaterializedFile]:
    """Activate this mount for a manifest application pass.

    In-container strategies run a live mount command here; Docker-volume strategies are
    intentionally no-ops because the backend attaches them before the session starts.
    """

    strategy = self.mount_strategy
    return await strategy.activate(self, session, dest, base_dir)

model_post_init

model_post_init(context: object) -> None

Normalize mount metadata and validate that the active strategy fits this mount type.

Source code in src/agents/sandbox/entries/mounts/base.py
def model_post_init(self, context: object, /) -> None:
    """Normalize mount metadata and validate that the active strategy fits this mount type."""

    _ = context

    defaults = Permissions(
        owner=FileMode.ALL,
        group=FileMode.READ | FileMode.EXEC,
        other=FileMode.READ | FileMode.EXEC,
    )
    current = (self.permissions.owner, self.permissions.group, self.permissions.other)
    if current != (defaults.owner, defaults.group, defaults.other):
        warnings.warn(
            "Mount permissions are not enforced. "
            "Please configure access in the cloud provider instead; "
            "mount-level permissions can be unreliable.",
            stacklevel=2,
        )
        # Overwrite with the canonical mount permissions so downstream code sees one shape.
        self.permissions.owner = defaults.owner
        self.permissions.group = defaults.group
        self.permissions.other = defaults.other
    self.permissions.directory = True

    has_any_strategy = bool(
        self.supported_in_container_patterns() or self.supported_docker_volume_drivers()
    )
    if not has_any_strategy:
        raise MountConfigError(
            message="mount type must support at least one mount strategy",
            context={"mount_type": self.type},
        )
    self.mount_strategy.validate_mount(self)

in_container_adapter

in_container_adapter() -> InContainerMountAdapter

Use pattern-driven in-container behavior for built-in provider mounts.

Source code in src/agents/sandbox/entries/mounts/providers/base.py
def in_container_adapter(self) -> InContainerMountAdapter:
    """Use pattern-driven in-container behavior for built-in provider mounts."""

    adapter = InContainerMountAdapter(self)
    return adapter

docker_volume_adapter

docker_volume_adapter() -> DockerVolumeMountAdapter

Use Docker volume-driver behavior for built-in provider mounts.

Source code in src/agents/sandbox/entries/mounts/providers/base.py
def docker_volume_adapter(self) -> DockerVolumeMountAdapter:
    """Use Docker volume-driver behavior for built-in provider mounts."""

    adapter = DockerVolumeMountAdapter(self)
    return adapter

unmount async

unmount(
    session: BaseSandboxSession, dest: Path, base_dir: Path
) -> None

Deactivate this mount for manifest teardown.

Source code in src/agents/sandbox/entries/mounts/base.py
async def unmount(
    self,
    session: BaseSandboxSession,
    dest: Path,
    base_dir: Path,
) -> None:
    """Deactivate this mount for manifest teardown."""

    strategy = self.mount_strategy
    await strategy.deactivate(self, session, dest, base_dir)

S3Mount

Bases: _ConfiguredMount

Source code in src/agents/sandbox/entries/mounts/providers/s3.py
class S3Mount(_ConfiguredMount):
    """Mount an S3 (or S3-compatible) bucket inside the sandbox.

    Supports both mount strategies: in-container mounting via the rclone or
    Mountpoint patterns, and Docker volume drivers ("rclone"/"mountpoint").
    """

    type: Literal["s3_mount"] = "s3_mount"
    # Target bucket; the only required connection setting.
    bucket: str
    # Static credentials; all optional. When the key id/secret pair is absent,
    # the rclone config falls back to environment auth (_rclone_required_lines).
    access_key_id: str | None = None
    secret_access_key: str | None = None
    session_token: str | None = None
    # Optional key prefix: mount a subtree of the bucket instead of its root.
    prefix: str | None = None
    region: str | None = None
    # Custom endpoint for S3-compatible object stores.
    endpoint_url: str | None = None
    # rclone "provider" value; defaults to AWS proper.
    s3_provider: str = "AWS"

    def supported_in_container_patterns(self) -> tuple[builtins.type[MountPattern], ...]:
        """In-container mounting works through the rclone or Mountpoint patterns."""
        return (RcloneMountPattern, MountpointMountPattern)

    def supported_docker_volume_drivers(self) -> frozenset[str]:
        """Docker-volume mounting accepts the "mountpoint" and "rclone" drivers."""
        return frozenset({"mountpoint", "rclone"})

    def build_docker_volume_driver_config(
        self,
        strategy: DockerVolumeMountStrategy,
    ) -> tuple[str, dict[str, str], bool]:
        """Build the ``(driver, options, read_only)`` tuple for a Docker volume.

        The "rclone" driver gets rclone-style ``s3-*`` option keys; any other
        driver gets plain keys with ``prefix`` carried separately. In both
        branches ``strategy.driver_options`` is merged last (right operand of
        ``|``), so caller-supplied driver options override field-derived ones.
        """
        if strategy.driver == "rclone":
            options: dict[str, str] = {
                "type": "s3",
                "s3-provider": self.s3_provider,
                # rclone addresses the remote tree as "<bucket>[/<prefix>]".
                "path": self._join_remote_path(self.bucket, self.prefix),
            }
            # Only forward settings that were explicitly configured.
            if self.access_key_id is not None:
                options["s3-access-key-id"] = self.access_key_id
            if self.secret_access_key is not None:
                options["s3-secret-access-key"] = self.secret_access_key
            if self.session_token is not None:
                options["s3-session-token"] = self.session_token
            if self.endpoint_url is not None:
                options["s3-endpoint"] = self.endpoint_url
            if self.region is not None:
                options["s3-region"] = self.region
            return strategy.driver, options | strategy.driver_options, self.read_only

        # Non-rclone driver: bucket and prefix travel as separate option keys.
        options = {"bucket": self.bucket}
        if self.access_key_id is not None:
            options["access_key_id"] = self.access_key_id
        if self.secret_access_key is not None:
            options["secret_access_key"] = self.secret_access_key
        if self.session_token is not None:
            options["session_token"] = self.session_token
        if self.endpoint_url is not None:
            options["endpoint_url"] = self.endpoint_url
        if self.region is not None:
            options["region"] = self.region
        if self.prefix is not None:
            options["prefix"] = self.prefix
        return strategy.driver, options | strategy.driver_options, self.read_only

    async def build_in_container_mount_config(
        self,
        session: BaseSandboxSession,
        pattern: MountPattern,
        *,
        include_config_text: bool,
    ) -> MountPatternConfig:
        """Build the in-container mount config for the selected pattern.

        Raises:
            MountConfigError: if ``pattern`` is neither an rclone nor a
                Mountpoint pattern.
        """
        if isinstance(pattern, RcloneMountPattern):
            # Delegate to the shared rclone-config builder; this mount only
            # supplies the S3-specific remote path and config lines.
            return await self._build_rclone_config(
                session=session,
                pattern=pattern,
                remote_kind="s3",
                remote_path=self._join_remote_path(self.bucket, self.prefix),
                required_lines=self._rclone_required_lines(
                    pattern.resolve_remote_name(
                        session_id=self._require_session_id_hex(session, self.type),
                        remote_kind="s3",
                        mount_type=self.type,
                    )
                ),
                include_config_text=include_config_text,
            )
        if isinstance(pattern, MountpointMountPattern):
            options = pattern.options
            # Mount-level fields win; pattern options are only the fallback.
            return MountpointMountConfig(
                bucket=self.bucket,
                access_key_id=self.access_key_id,
                secret_access_key=self.secret_access_key,
                session_token=self.session_token,
                prefix=self.prefix or options.prefix,
                region=self.region or options.region,
                endpoint_url=self.endpoint_url or options.endpoint_url,
                mount_type=self.type,
                read_only=self.read_only,
            )
        raise MountConfigError(
            message="invalid mount_pattern type",
            context={"type": self.type},
        )

    def _rclone_required_lines(self, remote_name: str) -> list[str]:
        """Lines that must appear in the rclone config section for this remote.

        Static-key auth (``env_auth = false``) is emitted only when both the
        access key id and secret are set; otherwise rclone is told to pull
        credentials from the environment (``env_auth = true``).
        """
        lines = [
            f"[{remote_name}]",
            "type = s3",
            f"provider = {self.s3_provider}",
        ]
        if self.endpoint_url is not None:
            lines.append(f"endpoint = {self.endpoint_url}")
        if self.region is not None:
            lines.append(f"region = {self.region}")
        if self.access_key_id and self.secret_access_key:
            lines.append("env_auth = false")
            lines.append(f"access_key_id = {self.access_key_id}")
            lines.append(f"secret_access_key = {self.secret_access_key}")
            if self.session_token:
                lines.append(f"session_token = {self.session_token}")
        else:
            lines.append("env_auth = true")
        return lines

apply async

apply(
    session: BaseSandboxSession, dest: Path, base_dir: Path
) -> list[MaterializedFile]

Activate this mount for a manifest application pass.

In-container strategies run a live mount command here. Docker-volume strategies are intentionally no-ops because the backend attaches them before the session starts.

Source code in src/agents/sandbox/entries/mounts/base.py
async def apply(
    self,
    session: BaseSandboxSession,
    dest: Path,
    base_dir: Path,
) -> list[MaterializedFile]:
    """Bring this mount up during a manifest application pass.

    In-container strategies issue a live mount command here; Docker-volume
    strategies do nothing because the backend attaches those volumes before
    the session starts.
    """

    strategy = self.mount_strategy
    return await strategy.activate(self, session, dest, base_dir)

model_post_init

model_post_init(context: object) -> None

Normalize mount metadata and validate that the active strategy fits this mount type.

Source code in src/agents/sandbox/entries/mounts/base.py
def model_post_init(self, context: object, /) -> None:
    """Normalize mount metadata and validate that the active strategy fits this mount type."""

    _ = context  # pydantic hook argument; unused here

    # Canonical mount permissions (rwxr-xr-x). Mounts do not honor caller-set
    # permission bits, so anything else is reset to this default below.
    default_permissions = Permissions(
        owner=FileMode.ALL,
        group=FileMode.READ | FileMode.EXEC,
        other=FileMode.READ | FileMode.EXEC,
    )
    if (
        self.permissions.owner != default_permissions.owner
        or self.permissions.group != default_permissions.group
        or self.permissions.other != default_permissions.other
    ):
        # Warn rather than raise: the mount still works, but the requested
        # bits are discarded in favor of the defaults.
        warnings.warn(
            "Mount permissions are not enforced. "
            "Please configure access in the cloud provider instead; "
            "mount-level permissions can be unreliable.",
            stacklevel=2,
        )
        self.permissions.owner = default_permissions.owner
        self.permissions.group = default_permissions.group
        self.permissions.other = default_permissions.other
    # Mount points always materialize as directories.
    self.permissions.directory = True
    # A mount type that offers neither an in-container pattern nor a Docker
    # volume driver could never be activated, so reject it up front.
    if (
        not self.supported_in_container_patterns()
        and not self.supported_docker_volume_drivers()
    ):
        raise MountConfigError(
            message="mount type must support at least one mount strategy",
            context={"mount_type": self.type},
        )
    # Finally let the configured strategy confirm it supports this mount type.
    self.mount_strategy.validate_mount(self)

in_container_adapter

in_container_adapter() -> InContainerMountAdapter

Use pattern-driven in-container behavior for built-in provider mounts.

Source code in src/agents/sandbox/entries/mounts/providers/base.py
def in_container_adapter(self) -> InContainerMountAdapter:
    """Return the adapter that drives pattern-based, in-container mounting."""

    adapter = InContainerMountAdapter(self)
    return adapter

docker_volume_adapter

docker_volume_adapter() -> DockerVolumeMountAdapter

Use Docker volume-driver behavior for built-in provider mounts.

Source code in src/agents/sandbox/entries/mounts/providers/base.py
def docker_volume_adapter(self) -> DockerVolumeMountAdapter:
    """Return the adapter that drives Docker volume-driver mounting."""

    adapter = DockerVolumeMountAdapter(self)
    return adapter

unmount async

unmount(
    session: BaseSandboxSession, dest: Path, base_dir: Path
) -> None

Deactivate this mount for manifest teardown.

Source code in src/agents/sandbox/entries/mounts/base.py
async def unmount(
    self,
    session: BaseSandboxSession,
    dest: Path,
    base_dir: Path,
) -> None:
    """Tear this mount down as part of manifest teardown."""

    strategy = self.mount_strategy
    await strategy.deactivate(self, session, dest, base_dir)

S3FilesMount

Bases: _ConfiguredMount

Mount an existing Amazon S3 Files file system inside the sandbox.

S3 Files exposes objects in an S3 bucket through an S3 file system that is mounted with the Linux s3files file-system type. AWS documents the mount helper at https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-files-mounting.html.

This mount does not create the S3 Files file system, mount target, VPC, or bucket configuration. It expects those resources to already exist and the sandbox container to run where the S3 Files mount target is reachable. In practice, run the container on infrastructure that has network access to a mount target in the S3 Files file system's VPC/AZ, and pass the file-system region when it cannot be discovered from the container's AWS environment. At mount time, the selected S3FilesMountPattern runs mount -t s3files inside the sandbox using file_system_id as the device, optional subpath as the file-system subdirectory, and any supplied mount-helper options such as mount_target_ip, access_point, region, or extra_options.

Source code in src/agents/sandbox/entries/mounts/providers/s3_files.py
class S3FilesMount(_ConfiguredMount):
    """Mount an existing Amazon S3 Files file system inside the sandbox.

    S3 Files exposes objects in an S3 bucket through an S3 file system that is
    mounted with the Linux `s3files` file-system type. AWS documents the mount
    helper at https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-files-mounting.html.

    This mount does not create the S3 Files file system, mount target, VPC, or
    bucket configuration. It expects those resources to already exist and the
    sandbox container to run where the S3 Files mount target is reachable. In
    practice, run the container on infrastructure that has network access to a
    mount target in the S3 Files file system's VPC/AZ, and pass the file-system
    region when it cannot be discovered from the container's AWS environment.
    At mount time, the selected `S3FilesMountPattern` runs `mount -t s3files`
    inside the sandbox using `file_system_id` as the device, optional `subpath`
    as the file-system subdirectory, and any supplied mount-helper options such
    as `mount_target_ip`, `access_point`, `region`, or `extra_options`.
    """

    type: Literal["s3_files_mount"] = "s3_files_mount"
    # ID of the existing S3 Files file system; required.
    file_system_id: str
    # Optional subdirectory of the file system to mount instead of its root.
    subpath: str | None = None
    # Each of these falls back to the pattern's option value when left unset.
    mount_target_ip: str | None = None
    access_point: str | None = None
    region: str | None = None
    # Extra mount-helper options; merged over the pattern's extra_options.
    extra_options: dict[str, str | None] = Field(default_factory=dict)

    def supported_in_container_patterns(self) -> tuple[builtins.type[MountPattern], ...]:
        """Only the in-container S3 Files pattern can activate this mount type."""
        return (S3FilesMountPattern,)

    async def build_in_container_mount_config(
        self,
        session: BaseSandboxSession,
        pattern: MountPattern,
        *,
        include_config_text: bool,
    ) -> MountPatternConfig:
        """Build the mount-helper config for the S3 Files pattern.

        Raises:
            MountConfigError: if ``pattern`` is not an ``S3FilesMountPattern``.
        """
        _ = (session, include_config_text)  # unused for this mount type
        if isinstance(pattern, S3FilesMountPattern):
            options = pattern.options
            # Mount-level fields win; pattern options are only the fallback.
            # For extra_options, this mount's entries override the pattern's
            # (right operand of ``|`` takes precedence on key collisions).
            return S3FilesMountConfig(
                file_system_id=self.file_system_id,
                subpath=self.subpath,
                mount_target_ip=self.mount_target_ip or options.mount_target_ip,
                access_point=self.access_point or options.access_point,
                region=self.region or options.region,
                extra_options=options.extra_options | self.extra_options,
                mount_type=self.type,
                read_only=self.read_only,
            )
        raise MountConfigError(
            message="invalid mount_pattern type",
            context={"type": self.type},
        )

apply async

apply(
    session: BaseSandboxSession, dest: Path, base_dir: Path
) -> list[MaterializedFile]

Activate this mount for a manifest application pass.

In-container strategies run a live mount command here. Docker-volume strategies are intentionally no-ops because the backend attaches them before the session starts.

Source code in src/agents/sandbox/entries/mounts/base.py
async def apply(
    self,
    session: BaseSandboxSession,
    dest: Path,
    base_dir: Path,
) -> list[MaterializedFile]:
    """Bring this mount up during a manifest application pass.

    In-container strategies issue a live mount command here; Docker-volume
    strategies do nothing because the backend attaches those volumes before
    the session starts.
    """

    strategy = self.mount_strategy
    return await strategy.activate(self, session, dest, base_dir)

model_post_init

model_post_init(context: object) -> None

Normalize mount metadata and validate that the active strategy fits this mount type.

Source code in src/agents/sandbox/entries/mounts/base.py
def model_post_init(self, context: object, /) -> None:
    """Normalize mount metadata and validate that the active strategy fits this mount type."""

    _ = context  # pydantic hook argument; unused here

    # Canonical mount permissions (rwxr-xr-x). Mounts do not honor caller-set
    # permission bits, so anything else is reset to this default below.
    default_permissions = Permissions(
        owner=FileMode.ALL,
        group=FileMode.READ | FileMode.EXEC,
        other=FileMode.READ | FileMode.EXEC,
    )
    if (
        self.permissions.owner != default_permissions.owner
        or self.permissions.group != default_permissions.group
        or self.permissions.other != default_permissions.other
    ):
        # Warn rather than raise: the mount still works, but the requested
        # bits are discarded in favor of the defaults.
        warnings.warn(
            "Mount permissions are not enforced. "
            "Please configure access in the cloud provider instead; "
            "mount-level permissions can be unreliable.",
            stacklevel=2,
        )
        self.permissions.owner = default_permissions.owner
        self.permissions.group = default_permissions.group
        self.permissions.other = default_permissions.other
    # Mount points always materialize as directories.
    self.permissions.directory = True
    # A mount type that offers neither an in-container pattern nor a Docker
    # volume driver could never be activated, so reject it up front.
    if (
        not self.supported_in_container_patterns()
        and not self.supported_docker_volume_drivers()
    ):
        raise MountConfigError(
            message="mount type must support at least one mount strategy",
            context={"mount_type": self.type},
        )
    # Finally let the configured strategy confirm it supports this mount type.
    self.mount_strategy.validate_mount(self)

in_container_adapter

in_container_adapter() -> InContainerMountAdapter

Use pattern-driven in-container behavior for built-in provider mounts.

Source code in src/agents/sandbox/entries/mounts/providers/base.py
def in_container_adapter(self) -> InContainerMountAdapter:
    """Return the adapter that drives pattern-based, in-container mounting."""

    adapter = InContainerMountAdapter(self)
    return adapter

docker_volume_adapter

docker_volume_adapter() -> DockerVolumeMountAdapter

Use Docker volume-driver behavior for built-in provider mounts.

Source code in src/agents/sandbox/entries/mounts/providers/base.py
def docker_volume_adapter(self) -> DockerVolumeMountAdapter:
    """Return the adapter that drives Docker volume-driver mounting."""

    adapter = DockerVolumeMountAdapter(self)
    return adapter

unmount async

unmount(
    session: BaseSandboxSession, dest: Path, base_dir: Path
) -> None

Deactivate this mount for manifest teardown.

Source code in src/agents/sandbox/entries/mounts/base.py
async def unmount(
    self,
    session: BaseSandboxSession,
    dest: Path,
    base_dir: Path,
) -> None:
    """Tear this mount down as part of manifest teardown."""

    strategy = self.mount_strategy
    await strategy.deactivate(self, session, dest, base_dir)

supported_docker_volume_drivers

supported_docker_volume_drivers() -> frozenset[str]

Return Docker volume driver names accepted by DockerVolumeMountStrategy.

Source code in src/agents/sandbox/entries/mounts/base.py
def supported_docker_volume_drivers(self) -> frozenset[str]:
    """Names of Docker volume drivers this mount accepts; none by default."""

    supported: frozenset[str] = frozenset()
    return supported

build_docker_volume_driver_config

build_docker_volume_driver_config(
    strategy: DockerVolumeMountStrategy,
) -> tuple[str, dict[str, str], bool]

Build the Docker volume driver tuple for Docker-volume mounts.

Mount subclasses that do not support Docker volumes inherit this default unsupported implementation.

Source code in src/agents/sandbox/entries/mounts/base.py
def build_docker_volume_driver_config(
    self,
    strategy: DockerVolumeMountStrategy,
) -> tuple[str, dict[str, str], bool]:
    """Default implementation: this mount type has no Docker volume support.

    Provider subclasses that can be attached as Docker volumes override this
    with a real ``(driver, options, read_only)`` tuple; every other mount
    type lands here and fails loudly.
    """

    _ = strategy  # accepted only to match the overridable signature
    raise MountConfigError(
        message="docker-volume mounts are not supported for this mount type",
        context={"mount_type": self.type},
    )