class nccl_symm_mem_context:
def __init__(
self,
pynccl_comm: PyNcclCommunicator,
disabled: bool = False,
):
self.disabled = (disabled or not is_symmetric_memory_enabled()
or pynccl_comm.world_size == 1
or not current_platform.is_cuda()
or get_nccl_mem_pool() is None or version.parse(
torch.__version__) < version.parse("2.8.0.a0"))
if self.disabled:
self.pynccl_comm: Optional[PyNcclCommunicator] = None
self._mem_pool_ctx: contextlib.AbstractContextManager[
Any] = contextlib.nullcontext()
self.is_graph_capture = None
self.device = None
else:
self.pynccl_comm = pynccl_comm
self._mem_pool_ctx = torch.cuda.use_mem_pool(get_nccl_mem_pool())
self.is_graph_capture = torch.cuda.is_current_stream_capturing()
self.device = torch.cuda.current_device()
def __enter__(self):
if self.disabled:
return self
assert (
self.pynccl_comm
is not None), "Symmetric memory requires pynccl to be initalized"
assert (
self.pynccl_comm.nccl_version >= 22703
), "NCCL version 2.27.3 or higher is required for NCCL symmetric memory"
if self.is_graph_capture:
assert (
_graph_pool_id
is not None), "graph_pool_id is not set under graph capture"
# Pause graph memory pool to use symmetric memory with cuda graph
torch._C._cuda_endAllocateToPool(self.device, _graph_pool_id)
self._mem_pool_ctx.__enter__()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.disabled:
return
global _cached_pool_snapshot
global _registered_base_addrs
self._mem_pool_ctx.__exit__(exc_type, exc_val, exc_tb)
_pool = get_nccl_mem_pool()
assert _pool is not None
_cached_pool_snapshot = _pool.snapshot()
assert self.pynccl_comm is not None
for segment in _cached_pool_snapshot:
if segment["address"] not in _registered_base_addrs:
self.pynccl_comm.register_comm_window_raw(
segment["address"], segment["total_size"])
_registered_base_addrs.add(segment["address"])
if self.is_graph_capture:
torch._C._cuda_beginAllocateCurrentThreadToPool(
self.device, _graph_pool_id)