Skip to content

Base Sandbox Session

BaseSandboxSession

Bases: ABC

Source code in src/agents/sandbox/session/base_sandbox_session.py
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
class BaseSandboxSession(abc.ABC):
    """Abstract base for sandbox sessions: start/stop lifecycle, command exec, and workspace IO."""

    # Mutable session state (manifest, snapshot, exposed ports, readiness flags).
    state: SandboxSessionState
    # Lazily created session-scoped dependencies; closed once in _aclose_dependencies().
    _dependencies: Dependencies | None = None
    _dependencies_closed: bool = False
    # Runtime-registered workspace-relative paths to exclude from workspace persistence.
    _runtime_persist_workspace_skip_relpaths: set[Path] | None = None
    # Async hooks run once before stop() persists the workspace.
    _pre_stop_hooks: list[Callable[[], Awaitable[None]]] | None = None
    _pre_stop_hooks_ran: bool = False
    # Install paths of runtime helper scripts believed present in the backend.
    _runtime_helpers_installed: set[Path] | None = None
    # Invalidation key for the helper-install cache; the sentinel means "never synced".
    _runtime_helper_cache_key: object = _RUNTIME_HELPER_CACHE_KEY_UNSET
    # (root, policy) pair caching the WorkspacePathPolicy for the current manifest root.
    _workspace_path_policy_cache: tuple[str, WorkspacePathPolicy] | None = None
    # True when start() is reusing a backend whose workspace files may still be present.
    # This controls whether start() can avoid a full manifest apply for non-snapshot resumes.
    _start_workspace_state_preserved: bool = False
    # True when start() is reusing a backend whose OS users and groups may still be present.
    # This controls whether snapshot restore needs to reprovision manifest-managed accounts.
    _start_system_state_preserved: bool = False
    # Snapshot of serialized workspace readiness after backend startup/reconnect.
    # Providers may set this to True during start only after a preserved-backend probe succeeds.
    _start_workspace_root_ready: bool | None = None
    # Concurrency caps for manifest/local-dir work (presumably None disables the cap — TODO confirm).
    _max_manifest_entry_concurrency: int | None = DEFAULT_MAX_MANIFEST_ENTRY_CONCURRENCY
    _max_local_dir_file_concurrency: int | None = DEFAULT_MAX_LOCAL_DIR_FILE_CONCURRENCY

    async def start(self) -> None:
        """Start the session: bring up the backend, then restore/apply workspace state.

        On any failure, runs provider failure bookkeeping and re-raises (optionally wrapped
        via ``_wrap_start_error``); on success, runs ``_after_start`` and marks the
        workspace root ready.
        """
        try:
            await self._ensure_backend_started()
            # Capture readiness as observed right after backend startup/reconnect so later
            # reuse checks see the pre-start value even after the probe mutates state.
            self._start_workspace_root_ready = self.state.workspace_root_ready
            await self._probe_workspace_root_for_preserved_resume()
            await self._prepare_backend_workspace()
            await self._ensure_runtime_helpers()
            await self._start_workspace()
        except Exception as e:
            await self._after_start_failed()
            wrapped = self._wrap_start_error(e)
            if wrapped is e:
                # Provider declined to wrap: preserve the original traceback.
                raise
            raise wrapped from e
        await self._after_start()
        self.state.workspace_root_ready = True

    def _set_concurrency_limits(self, limits: SandboxConcurrencyLimits) -> None:
        """Validate *limits* and adopt them as this session's concurrency caps."""
        limits.validate()
        self._max_local_dir_file_concurrency = limits.local_dir_files
        self._max_manifest_entry_concurrency = limits.manifest_entries

    async def _ensure_backend_started(self) -> None:
        """Hook: start, reconnect, or recreate the backend before workspace setup.

        No-op in the base session; providers override as needed.
        """

    async def _prepare_backend_workspace(self) -> None:
        """Hook: set up provider-specific workspace prerequisites ahead of manifest/snapshot work.

        No-op in the base session; providers override as needed.
        """

    async def _probe_workspace_root_for_preserved_resume(self) -> bool:
        """Check whether a preserved backend already exposes a usable workspace root."""

        preserved = self._workspace_state_preserved_on_start()
        if not preserved or self._start_workspace_root_ready:
            # Nothing to probe: either there is no preserved state, or readiness is
            # already known from startup.
            return self._can_reuse_preserved_workspace_on_resume()

        try:
            probe = await self.exec(
                "test",
                "-d",
                self.state.manifest.root,
                timeout=_WORKSPACE_ROOT_PROBE_TIMEOUT_S,
                shell=False,
            )
        except Exception:
            # A failed probe just means the root is not proven ready.
            return False
        if not probe.ok():
            return False

        self._mark_workspace_root_ready_from_probe()
        return True

    def _mark_workspace_root_ready_from_probe(self) -> None:
        """Record that the probe proved the preserved-backend workspace root usable."""

        self._start_workspace_root_ready = True
        self.state.workspace_root_ready = True

    def _set_start_state_preserved(self, workspace: bool, *, system: bool | None = None) -> None:
        """Record whether this start begins with preserved backend state."""

        self._start_workspace_state_preserved = workspace
        if system is None:
            # By default system-state preservation tracks workspace preservation.
            self._start_system_state_preserved = workspace
        else:
            self._start_system_state_preserved = system

    def _workspace_state_preserved_on_start(self) -> bool:
        """Report whether this start began with previously persisted workspace state."""
        return self._start_workspace_state_preserved

    def _system_state_preserved_on_start(self) -> bool:
        """Report whether this start began with previously provisioned OS/user state."""
        return self._start_system_state_preserved

    async def _start_workspace(self) -> None:
        """Restore snapshot or apply manifest state after backend startup is complete.

        Chooses among: reusing the preserved workspace and rebuilding only ephemeral
        manifest state, restoring the durable snapshot first, or materializing the full
        manifest on a fresh backend.
        """

        if await self.state.snapshot.restorable(dependencies=self.dependencies):
            can_reuse_workspace = await self._can_reuse_restorable_snapshot_workspace()
            if can_reuse_workspace:
                # The preserved workspace already matches the snapshot, so only rebuild ephemeral
                # manifest state that intentionally was not persisted.
                await self._reapply_ephemeral_manifest_on_resume()
            else:
                # Fresh workspaces and drifted preserved workspaces both need the durable snapshot
                # restored before ephemeral state is rebuilt.
                await self._restore_snapshot_into_workspace_on_resume()
                if self.should_provision_manifest_accounts_on_resume():
                    await self.provision_manifest_accounts()
                await self._reapply_ephemeral_manifest_on_resume()
        elif self._can_reuse_preserved_workspace_on_resume():
            # There is no durable snapshot to restore, but a reconnected backend may still need
            # ephemeral mounts/files refreshed without reapplying the full manifest.
            await self._reapply_ephemeral_manifest_on_resume()
        else:
            # A fresh backend without a restorable snapshot needs the full manifest materialized.
            await self._apply_manifest(
                provision_accounts=self.should_provision_manifest_accounts_on_resume()
            )

    async def _can_reuse_restorable_snapshot_workspace(self) -> bool:
        """Report whether the preserved workspace lets this start skip restoring the snapshot."""

        if not self._can_reuse_preserved_workspace_on_resume():
            return False
        running = await self.running()
        return await self._can_skip_snapshot_restore_on_resume(is_running=running)

    def _can_reuse_preserved_workspace_on_resume(self) -> bool:
        """Report whether preserved workspace state has been proven safe to reuse."""

        # Prefer the readiness captured at start; fall back to current session state.
        ready = self._start_workspace_root_ready
        if ready is None:
            ready = self.state.workspace_root_ready
        return self._workspace_state_preserved_on_start() and ready

    async def _after_start(self) -> None:
        """Hook: provider bookkeeping once workspace setup succeeds. No-op by default."""

    async def _after_start_failed(self) -> None:
        """Hook: provider bookkeeping when workspace setup fails. No-op by default."""

    def _wrap_start_error(self, error: Exception) -> Exception:
        """Hook: translate a start failure into a provider-specific error.

        Returning *error* unchanged means "do not wrap".
        """
        return error

    async def stop(self) -> None:
        """Persist/snapshot the workspace without tearing anything down.

        ``stop()`` is persistence-only by design. Sandboxes that must release backend
        resources (Docker containers, remote sessions, etc.) should implement
        ``shutdown()`` instead.
        """
        try:
            try:
                await self._before_stop()
                await self._persist_snapshot()
            except Exception as error:
                wrapped_error = self._wrap_stop_error(error)
                if wrapped_error is not error:
                    raise wrapped_error from error
                raise
        finally:
            # Provider bookkeeping runs whether persistence succeeded or failed.
            await self._after_stop()

    async def _before_stop(self) -> None:
        """Clean up transient processes (PTY sessions) before the snapshot is persisted."""
        await self.pty_terminate_all()

    async def _persist_snapshot(self) -> None:
        """Persist/snapshot the workspace.

        Fingerprinting is best-effort: a fingerprint failure never blocks persistence,
        but a persistence failure rolls back any cached fingerprint.
        """

        if isinstance(self.state.snapshot, NoopSnapshot):
            # Nothing to persist for no-op snapshots.
            return

        fingerprint_record: dict[str, str] | None = None
        try:
            fingerprint_record = await self._compute_and_cache_snapshot_fingerprint()
        except Exception:
            # Fingerprinting is advisory; fall back to "no fingerprint" on any failure.
            fingerprint_record = None

        workspace_archive = await self.persist_workspace()
        try:
            await self.state.snapshot.persist(workspace_archive, dependencies=self.dependencies)
        except Exception:
            if fingerprint_record is not None:
                # Drop the cached fingerprint so it cannot describe a snapshot that was
                # never actually persisted.
                await self._delete_cached_snapshot_fingerprint_best_effort()
            raise
        finally:
            try:
                workspace_archive.close()
            except Exception:
                # Closing the archive is best-effort cleanup.
                pass

        if fingerprint_record is None:
            self.state.snapshot_fingerprint = None
            self.state.snapshot_fingerprint_version = None
            return

        self.state.snapshot_fingerprint = fingerprint_record["fingerprint"]
        self.state.snapshot_fingerprint_version = fingerprint_record["version"]

    def _wrap_stop_error(self, error: Exception) -> Exception:
        """Hook: translate a stop failure into a provider-specific error.

        Returning *error* unchanged means "do not wrap".
        """
        return error

    async def _after_stop(self) -> None:
        """Hook: provider bookkeeping once stop finishes or fails. No-op by default."""

    def supports_docker_volume_mounts(self) -> bool:
        """Report whether the backend attaches Docker volume mounts before the manifest apply.

        Base sessions do not.
        """
        return False

    def supports_pty(self) -> bool:
        """Report whether this session supports PTY execution; base sessions do not."""
        return False

    async def shutdown(self) -> None:
        """Tear down sandbox resources (best-effort).

        The default lifecycle only invokes the shutdown hooks, which are no-ops here;
        sandbox-specific sessions (e.g. Docker) should override the hooks or this method.
        """
        await self._before_shutdown()
        await self._shutdown_backend()
        await self._after_shutdown()

    async def _before_shutdown(self) -> None:
        """Clean up transient processes (PTY sessions) before the backend is torn down."""
        await self.pty_terminate_all()

    async def _shutdown_backend(self) -> None:
        """Hook: release provider-specific backend resources. No-op by default."""

    async def _after_shutdown(self) -> None:
        """Hook: provider bookkeeping once backend shutdown completes. No-op by default."""

    async def __aenter__(self) -> Self:
        """Start the session on context entry and return it."""
        await self.start()
        return self

    async def aclose(self) -> None:
        """Run the session cleanup lifecycle outside of ``async with``.

        This performs the same session-owned cleanup as ``__aexit__()``: persist/snapshot the
        workspace via ``stop()``, tear down session resources via ``shutdown()``, and close
        session-scoped dependencies. If the session came from a sandbox client, call the client's
        ``delete()`` separately for backend-specific deletion such as removing a Docker container
        or deleting a temporary host workspace.
        """
        try:
            await self.run_pre_stop_hooks()
            await self.stop()
            await self.shutdown()
        finally:
            # Dependencies are closed even when hooks, stop(), or shutdown() fail.
            await self._aclose_dependencies()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: object | None,
    ) -> None:
        """Delegate context-manager exit to ``aclose()`` regardless of any in-flight exception."""
        del exc_type, exc, tb  # Cleanup does not inspect the exception.
        await self.aclose()

    @property
    def dependencies(self) -> Dependencies:
        """Session-scoped dependencies, created lazily on first access."""
        if self._dependencies is None:
            self._dependencies = Dependencies()
            self._dependencies_closed = False
        return self._dependencies

    def set_dependencies(self, dependencies: Dependencies | None) -> None:
        """Adopt externally supplied dependencies; a ``None`` argument is ignored."""
        if dependencies is not None:
            self._dependencies = dependencies
            self._dependencies_closed = False

    def register_pre_stop_hook(self, hook: Callable[[], Awaitable[None]]) -> None:
        """Register an async hook to run once before the session workspace is persisted."""

        if self._pre_stop_hooks is None:
            self._pre_stop_hooks = []
        self._pre_stop_hooks.append(hook)
        # Re-arm hook execution so hooks registered after a previous run fire again.
        self._pre_stop_hooks_ran = False

    async def run_pre_stop_hooks(self) -> None:
        """Run registered pre-stop hooks once before workspace persistence."""

        hooks = self._pre_stop_hooks
        if hooks is None or self._pre_stop_hooks_ran:
            return
        self._pre_stop_hooks_ran = True

        first_failure: BaseException | None = None
        for hook in hooks:
            try:
                await hook()
            except BaseException as hook_exc:
                # Keep running remaining hooks; surface only the first failure.
                if first_failure is None:
                    first_failure = hook_exc
        if first_failure is not None:
            raise first_failure

    async def _run_pre_stop_hooks(self) -> None:
        """Private alias for ``run_pre_stop_hooks()``."""
        await self.run_pre_stop_hooks()

    async def _aclose_dependencies(self) -> None:
        """Close session-scoped dependencies at most once."""
        dependencies = self._dependencies
        if dependencies is not None and not self._dependencies_closed:
            # Mark closed before awaiting so a reentrant call cannot double-close.
            self._dependencies_closed = True
            await dependencies.aclose()

    @staticmethod
    def _workspace_relpaths_overlap(lhs: Path, rhs: Path) -> bool:
        """Return True when one relative path equals or lexically contains the other."""
        return lhs.is_relative_to(rhs) or rhs.is_relative_to(lhs)

    def _mount_relpaths_within_workspace(self) -> set[Path]:
        """Collect mount targets that fall inside the workspace root, as workspace relpaths."""
        workspace_root = Path(self.state.manifest.root)
        relpaths: set[Path] = set()
        for _entry, target in self.state.manifest.mount_targets():
            try:
                relpaths.add(target.relative_to(workspace_root))
            except ValueError:
                # Mount target lies outside the workspace root; ignore it.
                pass
        return relpaths

    def _overlapping_mount_relpaths(self, rel_path: Path) -> set[Path]:
        """Return workspace mount relpaths that equal, contain, or are contained by *rel_path*."""
        overlapping: set[Path] = set()
        for mount_relpath in self._mount_relpaths_within_workspace():
            if self._workspace_relpaths_overlap(rel_path, mount_relpath):
                overlapping.add(mount_relpath)
        return overlapping

    def _native_snapshot_requires_tar_fallback(self) -> bool:
        """Return True when any mount cannot be detached natively during snapshotting."""
        return any(
            not entry.mount_strategy.supports_native_snapshot_detach(entry)
            for entry, _target in self.state.manifest.mount_targets()
        )

    def register_persist_workspace_skip_path(self, path: Path | str) -> Path:
        """Exclude a runtime-created workspace path from future workspace snapshots.

        Use this for session side effects that are not part of durable workspace state, such as
        generated mount config or ephemeral sink output.

        :param path: Workspace-relative path to exclude; coerced and validated as a relpath.
        :returns: The validated relative path that was registered.
        :raises ValueError: If the path is empty or refers to the workspace root itself.
        :raises MountConfigError: If the path overlaps a mount target inside the workspace.
        """

        rel_path = Manifest._coerce_rel_path(path)
        Manifest._validate_rel_path(rel_path)
        if rel_path in (Path(""), Path(".")):
            raise ValueError("Persist workspace skip paths must target a concrete relative path.")
        overlapping_mounts = self._overlapping_mount_relpaths(rel_path)
        if overlapping_mounts:
            # Report the shallowest overlapping mount deterministically (fewest parts, then name).
            overlapping_mount = min(overlapping_mounts, key=lambda p: (len(p.parts), p.as_posix()))
            raise MountConfigError(
                message="persist workspace skip path must not overlap mount path",
                context={
                    "skip_path": rel_path.as_posix(),
                    "mount_path": overlapping_mount.as_posix(),
                },
            )

        if self._runtime_persist_workspace_skip_relpaths is None:
            self._runtime_persist_workspace_skip_relpaths = set()
        self._runtime_persist_workspace_skip_relpaths.add(rel_path)
        return rel_path

    def _persist_workspace_skip_relpaths(self) -> set[Path]:
        """Union of manifest-declared and runtime-registered persistence skip paths."""
        skip_paths = set(self.state.manifest.ephemeral_persistence_paths())
        runtime_skips = self._runtime_persist_workspace_skip_relpaths
        if runtime_skips:
            skip_paths |= runtime_skips
        return skip_paths

    async def exec(
        self,
        *command: str | Path,
        timeout: float | None = None,
        shell: bool | list[str] = True,
        user: str | User | None = None,
    ) -> ExecResult:
        """Execute a command inside the session.

        :param command: Command and args (stringified before execution).
        :param timeout: Optional wall-clock timeout in seconds.
        :param shell: Whether to run this command in a shell. ``True`` prefixes the
            command with ``sh -lc``; a list supplies a custom shell prefix.
        :param user: Optional sandbox user to run the command as.

        :returns: An ``ExecResult`` containing stdout/stderr and exit code.

        :raises TimeoutError: If the sandbox cannot complete within `timeout`.
        """

        prepared = self._prepare_exec_command(*command, shell=shell, user=user)
        return await self._exec_internal(*prepared, timeout=timeout)

    async def resolve_exposed_port(self, port: int) -> ExposedPortEndpoint:
        """Resolve a configured exposed *port* to its backend endpoint."""
        self._assert_exposed_port_configured(port)
        return await self._resolve_exposed_port(port)

    def _assert_exposed_port_configured(self, port: int) -> None:
        """Raise unless *port* was declared among the session's exposed ports."""
        if port in self.state.exposed_ports:
            return
        raise ExposedPortUnavailableError(
            port=port,
            exposed_ports=self.state.exposed_ports,
            reason="not_configured",
        )

    def _prepare_exec_command(
        self,
        *command: str | Path,
        shell: bool | list[str],
        user: str | User | None,
    ) -> list[str]:
        """Stringify *command* and wrap it for shell execution and/or a target user."""
        argv = [str(part) for part in command]

        if shell:
            # A single argument is passed to the shell verbatim; multiple args are quoted.
            script = argv[0] if len(argv) == 1 else shlex.join(argv)
            prefix = shell if isinstance(shell, list) else ["sh", "-lc"]
            argv = prefix + [script]

        if user:
            user_name = user.name if isinstance(user, User) else user
            assert isinstance(user_name, str)
            argv = ["sudo", "-u", user_name, "--"] + argv

        return argv

    def _resolve_pty_session_entry(
        self, *, pty_processes: Mapping[int, _PtyEntryT], session_id: int
    ) -> _PtyEntryT:
        """Look up a PTY session entry, raising when *session_id* is unknown."""
        found = pty_processes.get(session_id)
        if found is not None:
            return found
        raise PtySessionNotFoundError(session_id=session_id)

    async def pty_exec_start(
        self,
        *command: str | Path,
        timeout: float | None = None,
        shell: bool | list[str] = True,
        user: str | User | None = None,
        tty: bool = False,
        yield_time_s: float | None = None,
        max_output_tokens: int | None = None,
    ) -> PtyExecUpdate:
        """Start a PTY-backed command; unsupported by the base session."""
        del command, timeout, shell, user, tty, yield_time_s, max_output_tokens
        raise NotImplementedError("PTY execution is not supported by this sandbox session")

    async def pty_write_stdin(
        self,
        *,
        session_id: int,
        chars: str,
        yield_time_s: float | None = None,
        max_output_tokens: int | None = None,
    ) -> PtyExecUpdate:
        """Write to a PTY session's stdin; unsupported by the base session."""
        del session_id, chars, yield_time_s, max_output_tokens
        raise NotImplementedError("PTY execution is not supported by this sandbox session")

    async def pty_terminate_all(self) -> None:
        """Terminate all PTY sessions; the base session has none to stop."""

    @abc.abstractmethod
    async def _exec_internal(
        self,
        *command: str | Path,
        timeout: float | None = None,
    ) -> ExecResult:
        """Run an already-prepared argv in the backend and return its ``ExecResult``.

        Implementations receive the exact argv produced by ``_prepare_exec_command``.
        """
        ...

    async def _resolve_exposed_port(self, port: int) -> ExposedPortEndpoint:
        """Fallback resolver: this backend cannot resolve exposed ports."""
        raise ExposedPortUnavailableError(
            port=port,
            exposed_ports=self.state.exposed_ports,
            reason="backend_unavailable",
            context={"backend": type(self).__name__},
        )

    def _runtime_helpers(self) -> tuple[RuntimeHelperScript, ...]:
        """Hook: helper scripts to install during start. Empty by default."""
        return ()

    def _current_runtime_helper_cache_key(self) -> object | None:
        """Hook: key used to invalidate the helper-install cache when the backend changes."""
        return None

    def _sync_runtime_helper_install_cache(self) -> None:
        """Invalidate the helper-install cache when the provider cache key changes."""
        current_key = self._current_runtime_helper_cache_key()
        previous_key = self._runtime_helper_cache_key
        if previous_key is _RUNTIME_HELPER_CACHE_KEY_UNSET:
            # First sync: adopt the key without invalidating anything.
            self._runtime_helper_cache_key = current_key
        elif previous_key != current_key:
            self._runtime_helpers_installed = None
            self._runtime_helper_cache_key = current_key

    def _synced_runtime_helper_install_set(self) -> set[Path]:
        """Sync the helper-install cache and return the (possibly reset) installed-path set."""
        self._sync_runtime_helper_install_cache()
        installed = self._runtime_helpers_installed
        if installed is None:
            installed = set()
            self._runtime_helpers_installed = installed
        return installed

    async def _ensure_runtime_helper_installed(self, helper: RuntimeHelperScript) -> Path:
        """Install *helper* in the backend if needed and return its install path.

        A cached install is re-probed before being trusted; a failed probe evicts the
        cache entry and falls through to a reinstall.

        :param helper: The helper script to ensure is present.
        :returns: The helper's install path inside the backend.
        :raises ExecNonZeroError: If the install command exits non-zero.
        """
        installed = self._synced_runtime_helper_install_set()
        install_path = helper.install_path

        if install_path in installed:
            probe = await self.exec(*helper.present_command(), shell=False)
            if probe.ok():
                return install_path
            # The helper vanished (e.g. backend restart); drop the stale cache entry.
            # Re-fetch the set because the probe may have invalidated the cache.
            self._synced_runtime_helper_install_set().discard(install_path)

        result = await self.exec(*helper.install_command(), shell=False)
        if not result.ok():
            raise ExecNonZeroError(
                result,
                command=("install_runtime_helper", str(install_path)),
            )

        self._synced_runtime_helper_install_set().add(install_path)
        return install_path

    async def _ensure_runtime_helpers(self) -> None:
        """Install every helper script declared by ``_runtime_helpers()``."""
        for helper_script in self._runtime_helpers():
            await self._ensure_runtime_helper_installed(helper_script)

    def _workspace_path_policy(self) -> WorkspacePathPolicy:
        """Return the path policy for the current manifest root, cached per root."""
        root = self.state.manifest.root
        cached = self._workspace_path_policy_cache
        if cached is None or cached[0] != root:
            cached = (root, WorkspacePathPolicy(root=root))
            self._workspace_path_policy_cache = cached
        return cached[1]

    async def _normalize_path_for_io(self, path: Path | str) -> Path:
        """Normalize *path* before IO by delegating to ``normalize_path``."""
        return self.normalize_path(path)

    async def _normalize_path_for_remote_io(self, path: Path | str) -> Path:
        """Validate a workspace path against the remote sandbox filesystem before IO.

        The returned path is the normalized workspace path, not the resolved realpath. This keeps
        safe leaf symlink operations working normally, such as removing a symlink instead of its
        target, while still rejecting paths whose resolved remote target escapes the workspace.

        :raises InvalidManifestPathError: If the resolved remote target escapes the workspace.
        :raises ExecTransportError: If the helper succeeds but produces no output.
        :raises ExecNonZeroError: For any other helper failure.
        """

        original_path = Path(path)
        root = Path(self.state.manifest.root)
        workspace_path = self._workspace_path_policy().absolute_workspace_path(original_path)
        helper_path = await self._ensure_runtime_helper_installed(RESOLVE_WORKSPACE_PATH_HELPER)
        command = (str(helper_path), str(root), str(workspace_path))
        result = await self.exec(*command, shell=False)
        if result.ok():
            resolved = result.stdout.decode("utf-8", errors="replace").strip()
            if resolved:
                # Preserve the requested workspace path so leaf symlinks keep their normal
                # semantics while the remote realpath check still enforces workspace confinement.
                return workspace_path
            # Success with empty output means the helper misbehaved: treat as a transport fault.
            raise ExecTransportError(
                command=("resolve_workspace_path", str(root), str(workspace_path)),
                context={
                    "reason": "empty_stdout",
                    "exit_code": result.exit_code,
                    "stdout": "",
                    "stderr": result.stderr.decode("utf-8", errors="replace"),
                },
            )

        reason: Literal["absolute", "escape_root"] = (
            "absolute" if original_path.is_absolute() else "escape_root"
        )
        # NOTE(review): exit code 111 appears to be the helper's sentinel for a resolved path
        # that escapes the workspace root — confirm against the helper script.
        if result.exit_code == 111:
            raise InvalidManifestPathError(
                rel=original_path,
                reason=reason,
                context={
                    "resolved_path": result.stderr.decode("utf-8", errors="replace").strip(),
                },
            )
        raise ExecNonZeroError(
            result, command=("resolve_workspace_path", str(root), str(workspace_path))
        )

    @abc.abstractmethod
    async def read(self, path: Path, *, user: str | User | None = None) -> io.IOBase:
        """Read a file from the session's workspace.

        :param path: Absolute path in the container or path relative to the
                workspace root.
        :param user: Optional sandbox user to perform the read as.
        :returns: A readable file-like object.
        :raises FileNotFoundError: If the path does not exist.
        """

    @abc.abstractmethod
    async def write(
        self,
        path: Path,
        data: io.IOBase,
        *,
        user: str | User | None = None,
    ) -> None:
        """Write a file into the session's workspace.

        :param path: Absolute path in the container or path relative to the
                workspace root.
        :param data: A file-like object positioned at the start of the payload.
        :param user: Optional sandbox user to perform the write as.
        """

    async def _check_read_with_exec(self, path: Path, *, user: str | User | None = None) -> Path:
        """Confirm *path* is readable (optionally as *user*) and return it normalized.

        :raises WorkspaceReadNotFoundError: If the readability check fails.
        """
        workspace_path = await self._normalize_path_for_io(path)
        check = ("sh", "-lc", '[ -r "$1" ]', "sh", str(workspace_path))
        result = await self.exec(*check, shell=False, user=user)
        if result.ok():
            return workspace_path
        raise WorkspaceReadNotFoundError(
            path=path,
            context={
                "command": ["sh", "-lc", "<read_access_check>", str(workspace_path)],
                "stdout": result.stdout.decode("utf-8", errors="replace"),
                "stderr": result.stderr.decode("utf-8", errors="replace"),
            },
        )

    async def _check_write_with_exec(self, path: Path, *, user: str | User | None = None) -> Path:
        """Verify via the write-access shell script that *path* is writable.

        :param path: Path to probe (absolute or workspace-relative).
        :param user: Optional sandbox user to probe as.
        :returns: The normalized workspace path when the check succeeds.
        :raises WorkspaceArchiveWriteError: If the write-access check fails.
        """
        resolved = await self._normalize_path_for_io(path)
        probe = await self.exec(
            "sh", "-lc", _WRITE_ACCESS_CHECK_SCRIPT, "sh", str(resolved), shell=False, user=user
        )
        if probe.ok():
            return resolved
        raise WorkspaceArchiveWriteError(
            path=resolved,
            context={
                "command": ["sh", "-lc", "<write_access_check>", str(resolved)],
                "stdout": probe.stdout.decode("utf-8", errors="replace"),
                "stderr": probe.stderr.decode("utf-8", errors="replace"),
            },
        )

    async def _check_mkdir_with_exec(
        self,
        path: Path | str,
        *,
        parents: bool = False,
        user: str | User | None = None,
    ) -> Path:
        """Probe whether a directory could be created at *path* inside the sandbox.

        :param path: Candidate directory path (absolute or workspace-relative).
        :param parents: Whether missing parents would be created (passed as "1"/"0").
        :param user: Optional sandbox user to probe as.
        :returns: The normalized workspace path when the check succeeds.
        :raises WorkspaceArchiveWriteError: If the access-check script fails.
        """
        resolved = await self._normalize_path_for_io(path)
        parents_arg = "1" if parents else "0"
        probe = await self.exec(
            "sh",
            "-lc",
            _MKDIR_ACCESS_CHECK_SCRIPT,
            "sh",
            str(resolved),
            parents_arg,
            shell=False,
            user=user,
        )
        if probe.ok():
            return resolved
        raise WorkspaceArchiveWriteError(
            path=resolved,
            context={
                "command": [
                    "sh",
                    "-lc",
                    "<mkdir_access_check>",
                    str(resolved),
                    parents_arg,
                ],
                "stdout": probe.stdout.decode("utf-8", errors="replace"),
                "stderr": probe.stderr.decode("utf-8", errors="replace"),
            },
        )

    async def _check_rm_with_exec(
        self,
        path: Path | str,
        *,
        recursive: bool = False,
        user: str | User | None = None,
    ) -> Path:
        """Probe whether *path* could be removed inside the sandbox.

        :param path: Candidate path (absolute or workspace-relative).
        :param recursive: Whether removal would be recursive (passed as "1"/"0").
        :param user: Optional sandbox user to probe as.
        :returns: The normalized workspace path when the check succeeds.
        :raises WorkspaceArchiveWriteError: If the access-check script fails.
        """
        resolved = await self._normalize_path_for_io(path)
        recursive_arg = "1" if recursive else "0"
        probe = await self.exec(
            "sh",
            "-lc",
            _RM_ACCESS_CHECK_SCRIPT,
            "sh",
            str(resolved),
            recursive_arg,
            shell=False,
            user=user,
        )
        if probe.ok():
            return resolved
        raise WorkspaceArchiveWriteError(
            path=resolved,
            context={
                "command": [
                    "sh",
                    "-lc",
                    "<rm_access_check>",
                    str(resolved),
                    recursive_arg,
                ],
                "stdout": probe.stdout.decode("utf-8", errors="replace"),
                "stderr": probe.stderr.decode("utf-8", errors="replace"),
            },
        )

    @abc.abstractmethod
    async def running(self) -> bool:
        """
        Report liveness of the backing sandbox.

        :returns: whether the underlying sandbox is currently running.
        """

    @abc.abstractmethod
    async def persist_workspace(self) -> io.IOBase:
        """Serialize the session's workspace into a byte stream.

        Abstract: backends choose the archive format; `hydrate_workspace` must
        accept whatever this produces.

        :returns: A readable byte stream representing the workspace contents.
            Portable tar streams must use workspace-relative member paths rather than
            embedding the source backend's workspace root directory.
        """

    @abc.abstractmethod
    async def hydrate_workspace(self, data: io.IOBase) -> None:
        """Populate the session's workspace from a serialized byte stream.

        Abstract: the inverse of `persist_workspace` for the same backend.

        :param data: A readable byte stream as produced by `persist_workspace`.
            Portable tar streams are extracted underneath this session's workspace root.
        """

    async def ls(
        self,
        path: Path | str,
        *,
        user: str | User | None = None,
    ) -> list[FileEntry]:
        """List directory contents by running `ls -la` inside the sandbox.

        :param path: Path to list.
        :param user: Optional sandbox user to list as.
        :returns: A list of `FileEntry` objects.
        :raises ExecNonZeroError: If the listing command fails.
        """
        target = await self._normalize_path_for_io(path)

        listing_cmd = ("ls", "-la", "--", str(target))
        result = await self.exec(*listing_cmd, shell=False, user=user)
        if result.ok():
            return parse_ls_la(result.stdout.decode("utf-8", errors="replace"), base=str(target))
        raise ExecNonZeroError(result, command=listing_cmd)

    async def rm(
        self,
        path: Path | str,
        *,
        recursive: bool = False,
        user: str | User | None = None,
    ) -> None:
        """Remove a file or directory inside the sandbox.

        :param path: Path to remove.
        :param recursive: If true, remove directories recursively.
        :param user: Optional sandbox user to remove as.
        :raises ExecNonZeroError: If the removal command fails.
        """
        target = await self._normalize_path_for_io(path)

        flags = ["-rf"] if recursive else []
        # `--` guards against targets that begin with a dash.
        command = ["rm", *flags, "--", str(target)]

        result = await self.exec(*command, shell=False, user=user)
        if not result.ok():
            raise ExecNonZeroError(result, command=command)

    async def mkdir(
        self,
        path: Path | str,
        *,
        parents: bool = False,
        user: str | User | None = None,
    ) -> None:
        """Create a directory.

        :param path: Directory to create on the remote.
        :param parents: If true, create missing parents.
        :param user: Optional sandbox user to create the directory as.
        """
        path = await self._normalize_path_for_io(path)

        cmd: list[str] = ["mkdir"]
        if parents:
            cmd.append("-p")
        cmd.append(str(path))

        result = await self.exec(*cmd, shell=False, user=user)
        if not result.ok():
            raise ExecNonZeroError(result, command=cmd)

    async def extract(
        self,
        path: Path | str,
        data: io.IOBase,
        *,
        compression_scheme: Literal["tar", "zip"] | None = None,
    ) -> None:
        """
        Upload an archive to `path` on the remote and unpack it beside itself.

        :param path: Destination archive path on the remote.
        :param data: a file-like io stream with the archive bytes.
        :param compression_scheme: either "tar" or "zip". If not provided,
            it will try to infer from the path.
        :raises InvalidCompressionSchemeError: If no supported scheme is found.
        """
        archive = Path(path) if isinstance(path, str) else path

        scheme = compression_scheme
        if scheme is None:
            inferred = archive.suffix.removeprefix(".")
            if inferred:
                scheme = cast(Literal["tar", "zip"], inferred)

        # `not in` also rejects a scheme that stayed None.
        if scheme not in ("zip", "tar"):
            raise InvalidCompressionSchemeError(path=archive, scheme=scheme)

        remote_path = await self._normalize_path_for_io(archive)
        unpack_root = remote_path.parent

        # Materialize the archive into a local spool once because both `write()` and the
        # extraction step consume the stream, and zip extraction may require seeking.
        with tempfile.SpooledTemporaryFile(max_size=16 * 1024 * 1024, mode="w+b") as spool:
            shutil.copyfileobj(data, spool)
            spool.seek(0)
            await self.write(remote_path, spool)
            spool.seek(0)

            if scheme == "tar":
                await self._extract_tar_archive(
                    archive_path=remote_path,
                    destination_root=unpack_root,
                    data=spool,
                )
            else:
                await self._extract_zip_archive(
                    archive_path=remote_path,
                    destination_root=unpack_root,
                    data=spool,
                )

    async def apply_patch(
        self,
        operations: ApplyPatchOperation
        | dict[str, object]
        | list[ApplyPatchOperation | dict[str, object]],
        *,
        patch_format: PatchFormat | Literal["v4a"] = "v4a",
    ) -> str:
        """Apply one or more patch operations to the workspace via `WorkspaceEditor`."""
        editor = WorkspaceEditor(self)
        return await editor.apply_patch(operations, patch_format=patch_format)

    def normalize_path(self, path: Path | str) -> Path:
        """Resolve *path* to an absolute workspace path via the path policy."""
        policy = self._workspace_path_policy()
        return policy.absolute_workspace_path(path)

    def describe(self) -> str:
        """Return the manifest's human-readable description for this session."""
        manifest = self.state.manifest
        return manifest.describe()

    async def _extract_tar_archive(
        self,
        *,
        archive_path: Path,
        destination_root: Path,
        data: io.IOBase,
    ) -> None:
        """Unpack a tar archive into the workspace using this session's file primitives."""
        workspace_extractor = WorkspaceArchiveExtractor(
            mkdir=lambda target: self.mkdir(target, parents=True),
            write=self.write,
            ls=lambda target: self.ls(target),
        )
        await workspace_extractor.extract_tar_archive(
            archive_path=archive_path,
            destination_root=destination_root,
            data=data,
        )

    async def _extract_zip_archive(
        self,
        *,
        archive_path: Path,
        destination_root: Path,
        data: io.IOBase,
    ) -> None:
        """Unpack a zip archive into the workspace using this session's file primitives."""
        workspace_extractor = WorkspaceArchiveExtractor(
            mkdir=lambda target: self.mkdir(target, parents=True),
            write=self.write,
            ls=lambda target: self.ls(target),
        )
        await workspace_extractor.extract_zip_archive(
            archive_path=archive_path,
            destination_root=destination_root,
            data=data,
        )

    @staticmethod
    def _safe_zip_member_rel_path(member) -> Path | None:
        # Thin indirection over the module-level helper; `member` is presumably a
        # `zipfile.ZipInfo` — TODO confirm against `safe_zip_member_rel_path`.
        # Returns None when the member path is not safe to extract.
        return safe_zip_member_rel_path(member)

    async def _apply_manifest(
        self,
        *,
        only_ephemeral: bool = False,
        provision_accounts: bool = True,
    ) -> MaterializationResult:
        """Materialize this session's manifest through a `ManifestApplier`.

        :param only_ephemeral: Restrict application to ephemeral entries.
        :param provision_accounts: Also provision manifest-declared accounts.
        """
        applier = ManifestApplier(
            mkdir=lambda target: self.mkdir(target, parents=True),
            exec_checked_nonzero=self._exec_checked_nonzero,
            apply_entry=lambda artifact, dest, base: artifact.apply(self, dest, base),
            max_entry_concurrency=self._max_manifest_entry_concurrency,
        )
        return await applier.apply_manifest(
            self.state.manifest,
            only_ephemeral=only_ephemeral,
            provision_accounts=provision_accounts,
            base_dir=self._manifest_base_dir(),
        )

    async def apply_manifest(self, *, only_ephemeral: bool = False) -> MaterializationResult:
        """Apply the manifest; account provisioning is skipped on ephemeral-only runs."""
        provision = not only_ephemeral
        return await self._apply_manifest(
            only_ephemeral=only_ephemeral,
            provision_accounts=provision,
        )

    async def provision_manifest_accounts(self) -> None:
        """Provision manifest-declared accounts without materializing file entries."""
        applier = ManifestApplier(
            mkdir=lambda target: self.mkdir(target, parents=True),
            exec_checked_nonzero=self._exec_checked_nonzero,
            apply_entry=lambda artifact, dest, base: artifact.apply(self, dest, base),
        )
        await applier.provision_accounts(self.state.manifest)

    def should_provision_manifest_accounts_on_resume(self) -> bool:
        """Return whether resume should reprovision manifest-managed users and groups.

        Reprovisioning is only needed when the backend does not preserve system
        state across a stop/start cycle.
        """
        preserved = self._system_state_preserved_on_start()
        return not preserved

    async def _reapply_ephemeral_manifest_on_resume(self) -> None:
        """Rebuild ephemeral manifest state without touching persisted workspace files."""

        # Durable files come back from the snapshot; only ephemeral entries are
        # reapplied here.
        await self.apply_manifest(only_ephemeral=True)

    async def _restore_snapshot_into_workspace_on_resume(self) -> None:
        """Clear the live workspace contents and repopulate them from the persisted snapshot."""
        await self._clear_workspace_root_on_resume()
        archive = await self.state.snapshot.restore(dependencies=self.dependencies)
        try:
            await self.hydrate_workspace(archive)
        finally:
            # Best-effort close: a failing close must not mask a hydrate error.
            try:
                archive.close()
            except Exception:
                pass

    async def _live_workspace_matches_snapshot_on_resume(self) -> bool:
        """Return whether the running sandbox workspace definitely matches the stored snapshot."""
        expected_fingerprint = self.state.snapshot_fingerprint
        expected_version = self.state.snapshot_fingerprint_version
        if not (expected_fingerprint and expected_version):
            return False

        try:
            record = await self._compute_and_cache_snapshot_fingerprint()
        except Exception:
            # If fingerprinting fails for any reason we cannot prove a match.
            return False

        fingerprint_ok = record.get("fingerprint") == expected_fingerprint
        version_ok = record.get("version") == expected_version
        return fingerprint_ok and version_ok

    async def _can_skip_snapshot_restore_on_resume(self, *, is_running: bool) -> bool:
        """Return whether resume can safely reuse the running workspace without restore."""
        if is_running:
            return await self._live_workspace_matches_snapshot_on_resume()
        return False

    def _snapshot_fingerprint_cache_path(self) -> Path:
        """Return the runtime-owned path for this session's cached snapshot fingerprint."""

        return (
            Path("/tmp/openai-agents/session-state")
            / self.state.session_id.hex
            / "fingerprint.json"
        )

    def _workspace_fingerprint_skip_relpaths(self) -> set[Path]:
        """Return workspace paths that should be omitted from snapshot fingerprinting."""

        skip_paths = self._persist_workspace_skip_relpaths()
        skip_paths.update(self._workspace_resume_mount_skip_relpaths())
        return skip_paths

    async def _compute_and_cache_snapshot_fingerprint(self) -> dict[str, str]:
        """Compute the current workspace fingerprint in-container and atomically cache it."""
        helper = await self._ensure_runtime_helper_installed(WORKSPACE_FINGERPRINT_HELPER)
        # Skip paths are sorted by posix form so the helper invocation is stable.
        skip_args = sorted(
            rel.as_posix() for rel in self._workspace_fingerprint_skip_relpaths()
        )
        command = [
            str(helper),
            str(self.state.manifest.root),
            self._snapshot_fingerprint_version(),
            str(self._snapshot_fingerprint_cache_path()),
            self._resume_manifest_digest(),
            *skip_args,
        ]
        result = await self.exec(*command, shell=False)
        if not result.ok():
            raise ExecNonZeroError(result, command=("compute_workspace_fingerprint", *command[1:]))
        return self._parse_snapshot_fingerprint_record(result.stdout)

    async def _read_cached_snapshot_fingerprint(self) -> dict[str, str]:
        """Read the cached snapshot fingerprint record from the running sandbox."""
        cache_path = str(self._snapshot_fingerprint_cache_path())
        result = await self.exec("cat", "--", cache_path, shell=False)
        if not result.ok():
            raise ExecNonZeroError(result, command=("cat", cache_path))
        return self._parse_snapshot_fingerprint_record(result.stdout)

    def _parse_snapshot_fingerprint_record(
        self, payload: bytes | bytearray | str
    ) -> dict[str, str]:
        """Validate and normalize a cached snapshot fingerprint JSON payload."""

        raw = payload.decode("utf-8") if isinstance(payload, bytes | bytearray) else payload
        data = json.loads(raw)
        if not isinstance(data, dict):
            raise ValueError("snapshot fingerprint payload must be a JSON object")
        fingerprint = data.get("fingerprint")
        version = data.get("version")
        if not isinstance(fingerprint, str) or not fingerprint:
            raise ValueError("snapshot fingerprint payload is missing `fingerprint`")
        if not isinstance(version, str) or not version:
            raise ValueError("snapshot fingerprint payload is missing `version`")
        return {"fingerprint": fingerprint, "version": version}

    async def _delete_cached_snapshot_fingerprint_best_effort(self) -> None:
        """Remove the cached snapshot fingerprint file without raising on cleanup failure."""
        try:
            cache_path = str(self._snapshot_fingerprint_cache_path())
            await self.exec("rm", "-f", "--", cache_path, shell=False)
        except Exception:
            # Cleanup is best-effort by contract; swallow failures deliberately.
            return

    def _snapshot_fingerprint_version(self) -> str:
        """Return the version tag for the current snapshot fingerprint algorithm."""

        # Bumping the module constant invalidates previously cached fingerprints.
        return _SNAPSHOT_FINGERPRINT_VERSION

    def _resume_manifest_digest(self) -> str:
        """Return a stable digest of the manifest state that affects resume correctness."""

        manifest_payload = json.dumps(
            self.state.manifest.model_dump(mode="json"),
            sort_keys=True,
            separators=(",", ":"),
        ).encode("utf-8")
        return hashlib.sha256(manifest_payload).hexdigest()

    async def _apply_entry_batch(
        self,
        entries: Sequence[tuple[Path, BaseEntry]],
        *,
        base_dir: Path,
    ) -> list[MaterializedFile]:
        """Apply a batch of manifest entries through a throwaway `ManifestApplier`."""

        def _apply(artifact, dest, entry_base_dir):
            return artifact.apply(self, dest, entry_base_dir)

        applier = ManifestApplier(
            mkdir=lambda target: self.mkdir(target, parents=True),
            exec_checked_nonzero=self._exec_checked_nonzero,
            apply_entry=_apply,
            max_entry_concurrency=self._max_manifest_entry_concurrency,
        )
        # Deliberately delegates to the applier's private batch API.
        return await applier._apply_entry_batch(entries, base_dir=base_dir)

    def _manifest_base_dir(self) -> Path:
        return Path.cwd()

    async def _exec_checked_nonzero(self, *command: str | Path) -> ExecResult:
        """Run *command* without a shell, raising `ExecNonZeroError` on failure."""
        result = await self.exec(*command, shell=False)
        if result.ok():
            return result
        raise ExecNonZeroError(result, command=command)

    async def _clear_workspace_root_on_resume(self) -> None:
        """
        Best-effort cleanup step for snapshot resume.

        We intentionally clear *contents* of the workspace root rather than deleting the root
        directory itself. Some sandboxes configure their process working directory to the workspace
        root (e.g. Modal sandboxes), and deleting the directory can make subsequent exec() calls
        fail with "failed to find initial working directory".
        """
        skip = self._workspace_resume_mount_skip_relpaths()
        root_markers = (Path(""), Path("."))
        if any(rel in root_markers for rel in skip):
            # A mount target covers the workspace root itself; nothing can be cleared.
            return

        await self._clear_workspace_dir_on_resume_pruned(
            current_dir=Path(self.state.manifest.root),
            skip_rel_paths=skip,
        )

    def _workspace_resume_mount_skip_relpaths(self) -> set[Path]:
        root = Path(self.state.manifest.root)
        skip_rel_paths: set[Path] = set()
        for _mount, mount_path in self.state.manifest.ephemeral_mount_targets():
            try:
                skip_rel_paths.add(mount_path.relative_to(root))
            except ValueError:
                continue
        return skip_rel_paths

    async def _clear_workspace_dir_on_resume_pruned(
        self,
        *,
        current_dir: Path,
        skip_rel_paths: set[Path],
    ) -> None:
        """Recursively delete `current_dir` contents while preserving `skip_rel_paths` subtrees.

        `skip_rel_paths` are workspace-root-relative paths (ephemeral mount targets)
        that must survive the resume-time wipe.
        """
        root = Path(self.state.manifest.root)
        try:
            entries = await self.ls(current_dir)
        except ExecNonZeroError:
            # If the root or subtree doesn't exist (or isn't listable), treat it as empty and let
            # hydrate/apply create it as needed.
            return

        for entry in entries:
            child = Path(entry.path)
            try:
                child_rel = child.relative_to(root)
            except ValueError:
                # Entry lies outside the workspace root, so it cannot be protected.
                await self.rm(child, recursive=True)
                continue

            if child_rel in skip_rel_paths:
                # The child itself is a protected path: leave it untouched.
                continue
            if any(child_rel in skip_rel_path.parents for skip_rel_path in skip_rel_paths):
                # The child is an ancestor of a protected path: recurse to prune around
                # the protected subtree instead of deleting it wholesale.
                if entry.kind == EntryKind.DIRECTORY:
                    await self._clear_workspace_dir_on_resume_pruned(
                        current_dir=child,
                        skip_rel_paths=skip_rel_paths,
                    )
                else:
                    await self.rm(child, recursive=True)
                continue
            # `parse_ls_la` filters "." and ".." already; remove everything else recursively.
            await self.rm(child, recursive=True)

stop async

stop() -> None

Persist/snapshot the workspace.

Note: stop() is intentionally persistence-only. Sandboxes that need to tear down sandbox resources (Docker containers, remote sessions, etc.) should implement shutdown() instead.

Source code in src/agents/sandbox/session/base_sandbox_session.py
async def stop(self) -> None:
    """
    Persist/snapshot the workspace.

    Note: `stop()` is intentionally persistence-only. Sandboxes that need to tear down
    sandbox resources (Docker containers, remote sessions, etc.) should implement
    `shutdown()` instead.
    """
    try:
        try:
            await self._before_stop()
            await self._persist_snapshot()
        except Exception as e:
            wrapped = self._wrap_stop_error(e)
            if wrapped is e:
                raise
            raise wrapped from e
    finally:
        await self._after_stop()

supports_docker_volume_mounts

supports_docker_volume_mounts() -> bool

Return whether this backend attaches Docker volume mounts before manifest apply.

Source code in src/agents/sandbox/session/base_sandbox_session.py
def supports_docker_volume_mounts(self) -> bool:
    """Return whether this backend attaches Docker volume mounts before manifest apply."""

    return False

shutdown async

shutdown() -> None

Tear down sandbox resources (best-effort).

Default is a no-op. Sandbox-specific sessions (e.g. Docker) should override.

Source code in src/agents/sandbox/session/base_sandbox_session.py
async def shutdown(self) -> None:
    """
    Tear down sandbox resources (best-effort).

    Default is a no-op. Sandbox-specific sessions (e.g. Docker) should override.
    """
    await self._before_shutdown()
    await self._shutdown_backend()
    await self._after_shutdown()

aclose async

aclose() -> None

Run the session cleanup lifecycle outside of async with.

This performs the same session-owned cleanup as __aexit__(): persist/snapshot the workspace via stop(), tear down session resources via shutdown(), and close session-scoped dependencies. If the session came from a sandbox client, call the client's delete() separately for backend-specific deletion such as removing a Docker container or deleting a temporary host workspace.

Source code in src/agents/sandbox/session/base_sandbox_session.py
async def aclose(self) -> None:
    """Run the session cleanup lifecycle outside of ``async with``.

    This performs the same session-owned cleanup as ``__aexit__()``: persist/snapshot the
    workspace via ``stop()``, tear down session resources via ``shutdown()``, and close
    session-scoped dependencies. If the session came from a sandbox client, call the client's
    ``delete()`` separately for backend-specific deletion such as removing a Docker container
    or deleting a temporary host workspace.
    """
    try:
        await self.run_pre_stop_hooks()
        await self.stop()
        await self.shutdown()
    finally:
        await self._aclose_dependencies()

register_pre_stop_hook

register_pre_stop_hook(
    hook: Callable[[], Awaitable[None]],
) -> None

Register an async hook to run once before the session workspace is persisted.

Source code in src/agents/sandbox/session/base_sandbox_session.py
def register_pre_stop_hook(self, hook: Callable[[], Awaitable[None]]) -> None:
    """Register an async hook to run once before the session workspace is persisted."""

    hooks = self._pre_stop_hooks
    if hooks is None:
        hooks = []
        self._pre_stop_hooks = hooks
    hooks.append(hook)
    self._pre_stop_hooks_ran = False

run_pre_stop_hooks async

run_pre_stop_hooks() -> None

Run registered pre-stop hooks once before workspace persistence.

Source code in src/agents/sandbox/session/base_sandbox_session.py
async def run_pre_stop_hooks(self) -> None:
    """Run registered pre-stop hooks once before workspace persistence."""

    hooks = self._pre_stop_hooks
    if hooks is None or self._pre_stop_hooks_ran:
        return
    self._pre_stop_hooks_ran = True
    cleanup_error: BaseException | None = None
    for hook in hooks:
        try:
            await hook()
        except BaseException as exc:
            if cleanup_error is None:
                cleanup_error = exc
    if cleanup_error is not None:
        raise cleanup_error

register_persist_workspace_skip_path

register_persist_workspace_skip_path(
    path: Path | str,
) -> Path

Exclude a runtime-created workspace path from future workspace snapshots.

Use this for session side effects that are not part of durable workspace state, such as generated mount config or ephemeral sink output.

Source code in src/agents/sandbox/session/base_sandbox_session.py
def register_persist_workspace_skip_path(self, path: Path | str) -> Path:
    """Exclude a runtime-created workspace path from future workspace snapshots.

    Use this for session side effects that are not part of durable workspace state, such as
    generated mount config or ephemeral sink output.
    """

    rel_path = Manifest._coerce_rel_path(path)
    Manifest._validate_rel_path(rel_path)
    if rel_path in (Path(""), Path(".")):
        raise ValueError("Persist workspace skip paths must target a concrete relative path.")
    overlapping_mounts = self._overlapping_mount_relpaths(rel_path)
    if overlapping_mounts:
        overlapping_mount = min(overlapping_mounts, key=lambda p: (len(p.parts), p.as_posix()))
        raise MountConfigError(
            message="persist workspace skip path must not overlap mount path",
            context={
                "skip_path": rel_path.as_posix(),
                "mount_path": overlapping_mount.as_posix(),
            },
        )

    if self._runtime_persist_workspace_skip_relpaths is None:
        self._runtime_persist_workspace_skip_relpaths = set()
    self._runtime_persist_workspace_skip_relpaths.add(rel_path)
    return rel_path

exec async

exec(
    *command: str | Path,
    timeout: float | None = None,
    shell: bool | list[str] = True,
    user: str | User | None = None,
) -> ExecResult

Execute a command inside the session.

:param command: Command and args (will be stringified).
:param timeout: Optional wall-clock timeout in seconds.
:param shell: Whether to run this command in a shell. If True is provided, the command will be run prefixed by sh -lc. A custom shell prefix may be used by providing a list.

:returns: An ExecResult containing stdout/stderr and exit code.

:raises TimeoutError: If the sandbox cannot complete within timeout.

Source code in src/agents/sandbox/session/base_sandbox_session.py
async def exec(
    self,
    *command: str | Path,
    timeout: float | None = None,
    shell: bool | list[str] = True,
    user: str | User | None = None,
) -> ExecResult:
    """Execute a command inside the session.

    :param command: Command and args (will be stringified).
    :param timeout: Optional wall-clock timeout in seconds.
    :param shell: Whether to run this command in a shell. If ``True`` is provided,
        the command will be run prefixed by ``sh -lc``. A custom shell prefix may be used
        by providing a list.

    :returns: An ``ExecResult`` containing stdout/stderr and exit code.

    :raises TimeoutError: If the sandbox cannot complete within `timeout`.
    """

    sanitized_command = self._prepare_exec_command(*command, shell=shell, user=user)
    return await self._exec_internal(*sanitized_command, timeout=timeout)

read abstractmethod async

read(
    path: Path, *, user: str | User | None = None
) -> IOBase

Read a file from the session's workspace.

:param path: Absolute path in the container or path relative to the workspace root.
:param user: Optional sandbox user to perform the read as.
:returns: A readable file-like object.
:raises FileNotFoundError: If the path does not exist.

Source code in src/agents/sandbox/session/base_sandbox_session.py
@abc.abstractmethod
async def read(self, path: Path, *, user: str | User | None = None) -> io.IOBase:
    """Read a file from the session's workspace.

    :param path: Absolute path in the container or path relative to the
            workspace root.
    :param user: Optional sandbox user to perform the read as.
    :returns: A readable file-like object.
    :raises: FileNotFoundError: If the path does not exist.
    """

write abstractmethod async

write(
    path: Path,
    data: IOBase,
    *,
    user: str | User | None = None,
) -> None

Write a file into the session's workspace.

:param path: Absolute path in the container or path relative to the workspace root. :param data: A file-like object positioned at the start of the payload. :param user: Optional sandbox user to perform the write as.

Source code in src/agents/sandbox/session/base_sandbox_session.py
@abc.abstractmethod
async def write(
    self,
    path: Path,
    data: io.IOBase,
    *,
    user: str | User | None = None,
) -> None:
    """Write a file into the session's workspace.

    :param path: Absolute path in the container or path relative to the
            workspace root.
    :param data: A file-like object positioned at the start of the payload.
    :param user: Optional sandbox user to perform the write as.
    """

running abstractmethod async

running() -> bool

:returns: whether the underlying sandbox is currently running.

Source code in src/agents/sandbox/session/base_sandbox_session.py
@abc.abstractmethod
async def running(self) -> bool:
    """
    :returns: whether the underlying sandbox is currently running.
    """

persist_workspace abstractmethod async

persist_workspace() -> IOBase

Serialize the session's workspace into a byte stream.

:returns: A readable byte stream representing the workspace contents. Portable tar streams must use workspace-relative member paths rather than embedding the source backend's workspace root directory.

Source code in src/agents/sandbox/session/base_sandbox_session.py
@abc.abstractmethod
async def persist_workspace(self) -> io.IOBase:
    """Serialize the session's workspace into a byte stream.

    :returns: A readable byte stream representing the workspace contents.
        Portable tar streams must use workspace-relative member paths rather than
        embedding the source backend's workspace root directory.
    """

hydrate_workspace abstractmethod async

hydrate_workspace(data: IOBase) -> None

Populate the session's workspace from a serialized byte stream.

:param data: A readable byte stream as produced by persist_workspace. Portable tar streams are extracted underneath this session's workspace root.

Source code in src/agents/sandbox/session/base_sandbox_session.py
@abc.abstractmethod
async def hydrate_workspace(self, data: io.IOBase) -> None:
    """Populate the session's workspace from a serialized byte stream.

    :param data: A readable byte stream as produced by `persist_workspace`.
        Portable tar streams are extracted underneath this session's workspace root.
    """

ls async

ls(
    path: Path | str, *, user: str | User | None = None
) -> list[FileEntry]

List directory contents.

:param path: Path to list.
:param user: Optional sandbox user to list as.
:returns: A list of FileEntry objects.

Source code in src/agents/sandbox/session/base_sandbox_session.py
async def ls(
    self,
    path: Path | str,
    *,
    user: str | User | None = None,
) -> list[FileEntry]:
    """Enumerate the contents of a directory on the sandbox.

    :param path: Path to list.
    :param user: Optional sandbox user to run the listing as.
    :returns: A list of `FileEntry` objects parsed from the listing.
    """
    target = await self._normalize_path_for_io(path)

    command = ("ls", "-la", "--", str(target))
    outcome = await self.exec(*command, shell=False, user=user)
    if not outcome.ok():
        raise ExecNonZeroError(outcome, command=command)

    listing = outcome.stdout.decode("utf-8", errors="replace")
    return parse_ls_la(listing, base=str(target))

rm async

rm(
    path: Path | str,
    *,
    recursive: bool = False,
    user: str | User | None = None,
) -> None

Remove a file or directory.

:param path: Path to remove.
:param recursive: If true, remove directories recursively.
:param user: Optional sandbox user to remove as.

Source code in src/agents/sandbox/session/base_sandbox_session.py
async def rm(
    self,
    path: Path | str,
    *,
    recursive: bool = False,
    user: str | User | None = None,
) -> None:
    """Delete a file or directory on the sandbox.

    :param path: Path to remove.
    :param recursive: If true, remove directories recursively.
    :param user: Optional sandbox user to run the removal as.
    """
    target = await self._normalize_path_for_io(path)

    # `-rf` only when recursive; `--` guards against option-like paths.
    flags = ["-rf"] if recursive else []
    command = ["rm", *flags, "--", str(target)]

    outcome = await self.exec(*command, shell=False, user=user)
    if not outcome.ok():
        raise ExecNonZeroError(outcome, command=command)

mkdir async

mkdir(
    path: Path | str,
    *,
    parents: bool = False,
    user: str | User | None = None,
) -> None

Create a directory.

:param path: Directory to create on the remote.
:param parents: If true, create missing parents.
:param user: Optional sandbox user to create the directory as.

Source code in src/agents/sandbox/session/base_sandbox_session.py
async def mkdir(
    self,
    path: Path | str,
    *,
    parents: bool = False,
    user: str | User | None = None,
) -> None:
    """Create a directory.

    :param path: Directory to create on the remote.
    :param parents: If true, create missing parents.
    :param user: Optional sandbox user to create the directory as.
    :raises ExecNonZeroError: If the remote ``mkdir`` command fails.
    """
    path = await self._normalize_path_for_io(path)

    cmd: list[str] = ["mkdir"]
    if parents:
        cmd.append("-p")
    # Use `--` so a path beginning with `-` cannot be parsed as an option,
    # matching the convention already used by `ls` and `rm`.
    cmd.extend(["--", str(path)])

    result = await self.exec(*cmd, shell=False, user=user)
    if not result.ok():
        raise ExecNonZeroError(result, command=cmd)

extract async

extract(
    path: Path | str,
    data: IOBase,
    *,
    compression_scheme: Literal["tar", "zip"] | None = None,
) -> None

Write a compressed archive to a destination on the remote and extract it there once written.

:param path: Destination path on the remote; the archive is extracted into this path's parent directory.
:param data: a file-like io stream.
:param compression_scheme: either "tar" or "zip". If not provided, it will try to infer from the path.

Source code in src/agents/sandbox/session/base_sandbox_session.py
async def extract(
    self,
    path: Path | str,
    data: io.IOBase,
    *,
    compression_scheme: Literal["tar", "zip"] | None = None,
) -> None:
    """
    Write a compressed archive to a destination on the remote and
    extract it there once written.

    :param path: Destination path on the remote for the archive; the
        archive is extracted into this path's parent directory.
    :param data: a file-like io stream containing the archive bytes.
    :param compression_scheme: either "tar" or "zip". If not provided,
        it will try to infer from the path's suffix.
    :raises InvalidCompressionSchemeError: if the scheme is neither
        "tar" nor "zip" (including when suffix inference fails).
    """
    if isinstance(path, str):
        path = Path(path)

    if compression_scheme is None:
        suffix = path.suffix.removeprefix(".")
        compression_scheme = cast(Literal["tar", "zip"], suffix) if suffix else None

    # `None` fails this membership test too, so no separate check is needed.
    if compression_scheme not in ("tar", "zip"):
        raise InvalidCompressionSchemeError(path=path, scheme=compression_scheme)

    normalized_path = await self._normalize_path_for_io(path)
    destination_root = normalized_path.parent

    # Materialize the archive into a local spool once because both `write()` and the
    # extraction step consume the stream, and zip extraction may require seeking.
    with tempfile.SpooledTemporaryFile(max_size=16 * 1024 * 1024, mode="w+b") as spool:
        shutil.copyfileobj(data, spool)
        spool.seek(0)
        await self.write(normalized_path, spool)
        spool.seek(0)

        if compression_scheme == "tar":
            await self._extract_tar_archive(
                archive_path=normalized_path,
                destination_root=destination_root,
                data=spool,
            )
        else:
            await self._extract_zip_archive(
                archive_path=normalized_path,
                destination_root=destination_root,
                data=spool,
            )

should_provision_manifest_accounts_on_resume

should_provision_manifest_accounts_on_resume() -> bool

Return whether resume should reprovision manifest-managed users and groups.

Source code in src/agents/sandbox/session/base_sandbox_session.py
def should_provision_manifest_accounts_on_resume(self) -> bool:
    """Return whether resume should reprovision manifest-managed users and groups.

    Reprovisioning is needed exactly when the backend does not report
    system state as preserved across a start.
    """
    preserved = self._system_state_preserved_on_start()
    return not preserved