Spaces:
Running
on
Zero
Running
on
Zero
| from typing import * | |
| import torch | |
| import torch.nn as nn | |
| from .. import SparseTensor | |
| __all__ = [ | |
| 'SparseDownsample', | |
| 'SparseUpsample', | |
| ] | |
| class SparseDownsample(nn.Module): | |
| """ | |
| Downsample a sparse tensor by a factor of `factor`. | |
| Implemented as average pooling. | |
| """ | |
| def __init__(self, factor: int, mode: Literal['mean', 'max'] = 'mean'): | |
| super(SparseDownsample, self).__init__() | |
| self.factor = factor | |
| self.mode = mode | |
| assert self.mode in ['mean', 'max'], f'Invalid mode: {self.mode}' | |
| def forward(self, x: SparseTensor) -> SparseTensor: | |
| cache = x.get_spatial_cache(f'downsample_{self.factor}') | |
| if cache is None: | |
| DIM = x.coords.shape[-1] - 1 | |
| coord = list(x.coords.unbind(dim=-1)) | |
| for i in range(DIM): | |
| coord[i+1] = coord[i+1] // self.factor | |
| MAX = [(s + self.factor - 1) // self.factor for s in x.spatial_shape] | |
| OFFSET = torch.cumprod(torch.tensor(MAX[::-1]), 0).tolist()[::-1] + [1] | |
| code = sum([c * o for c, o in zip(coord, OFFSET)]) | |
| code, idx = code.unique(return_inverse=True) | |
| new_coords = torch.stack( | |
| [code // OFFSET[0]] + | |
| [(code // OFFSET[i+1]) % MAX[i] for i in range(DIM)], | |
| dim=-1 | |
| ) | |
| else: | |
| new_coords, idx = cache | |
| new_feats = torch.scatter_reduce( | |
| torch.zeros(new_coords.shape[0], x.feats.shape[1], device=x.feats.device, dtype=x.feats.dtype), | |
| dim=0, | |
| index=idx.unsqueeze(1).expand(-1, x.feats.shape[1]), | |
| src=x.feats, | |
| reduce=self.mode, | |
| include_self=False, | |
| ) | |
| out = SparseTensor(new_feats, new_coords, x._shape) | |
| out._scale = tuple([s * self.factor for s in x._scale]) | |
| out._spatial_cache = x._spatial_cache | |
| if cache is None: | |
| x.register_spatial_cache(f'downsample_{self.factor}', (new_coords, idx)) | |
| out.register_spatial_cache(f'upsample_{self.factor}', (x.coords, idx)) | |
| out.register_spatial_cache(f'shape', torch.Size(MAX)) | |
| if self.training: | |
| subidx = x.coords[:, 1:] % self.factor | |
| subidx = sum([subidx[..., i] * self.factor ** i for i in range(DIM)]) | |
| subdivision = torch.zeros((new_coords.shape[0], self.factor ** DIM), device=x.device, dtype=torch.bool) | |
| subdivision[idx, subidx] = True | |
| out.register_spatial_cache(f'subdivision', subdivision) | |
| return out | |
| class SparseUpsample(nn.Module): | |
| """ | |
| Upsample a sparse tensor by a factor of `factor`. | |
| Implemented as nearest neighbor interpolation. | |
| """ | |
| def __init__( | |
| self, factor: int | |
| ): | |
| super(SparseUpsample, self).__init__() | |
| self.factor = factor | |
| def forward(self, x: SparseTensor, subdivision: Optional[SparseTensor] = None) -> SparseTensor: | |
| DIM = x.coords.shape[-1] - 1 | |
| cache = x.get_spatial_cache(f'upsample_{self.factor}') | |
| if cache is None: | |
| if subdivision is None: | |
| raise ValueError('Cache not found. Provide subdivision tensor or pair SparseUpsample with SparseDownsample.') | |
| else: | |
| sub = subdivision.feats | |
| N_leaf = sub.sum(dim=-1) | |
| subidx = sub.nonzero()[:, -1] | |
| new_coords = x.coords.clone().detach() | |
| new_coords[:, 1:] *= self.factor | |
| new_coords = torch.repeat_interleave(new_coords, N_leaf, dim=0, output_size=subidx.shape[0]) | |
| for i in range(DIM): | |
| new_coords[:, i+1] += subidx // self.factor ** i % self.factor | |
| idx = torch.repeat_interleave(torch.arange(x.coords.shape[0], device=x.device), N_leaf, dim=0, output_size=subidx.shape[0]) | |
| else: | |
| new_coords, idx = cache | |
| new_feats = x.feats[idx] | |
| out = SparseTensor(new_feats, new_coords, x._shape) | |
| out._scale = tuple([s / self.factor for s in x._scale]) | |
| if cache is not None: # only keep cache when subdiv following it | |
| out._spatial_cache = x._spatial_cache | |
| return out | |