# Copyright (c) Facebook, Inc. and its affiliates.
# Modified by Bowen Cheng from https://github.com/sukjunhwang/IFC
import copy
import logging
import random
import numpy as np
from typing import List, Union

import torch

from detectron2.config import configurable
from detectron2.structures import (
    BitMasks,
    Boxes,
    BoxMode,
    Instances,
)

from detectron2.data import detection_utils as utils
from detectron2.data import transforms as T

from .augmentation import build_augmentation

__all__ = ["YTVISDatasetMapper", "CocoClipDatasetMapper"]


def filter_empty_instances(instances, by_box=True, by_mask=True, box_threshold=1e-5):
    """
    Mark empty instances in an `Instances` object.

    Unlike `detectron2.data.detection_utils.filter_empty_instances`, empty instances
    are not removed; their `gt_ids` are set to -1 instead.

    Args:
        instances (Instances):
        by_box (bool): whether to mark instances with empty boxes
        by_mask (bool): whether to mark instances with empty masks
        box_threshold (float): minimum width and height to be considered non-empty

    Returns:
        Instances: the same `Instances` object, with empty instances marked.
    """
    assert by_box or by_mask
    r = []
    if by_box:
        r.append(instances.gt_boxes.nonempty(threshold=box_threshold))
    if instances.has("gt_masks") and by_mask:
        r.append(instances.gt_masks.nonempty())

    if not r:
        return instances
    m = r[0]
    for x in r[1:]:
        m = m & x

    instances.gt_ids[~m] = -1
    return instances
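
# Illustrative sketch of the marking behaviour above (hypothetical values):
# degenerate instances are kept, only their ids change.
#
#   inst = Instances((10, 10))
#   inst.gt_boxes = Boxes(torch.tensor([[0., 0., 0., 0.], [1., 1., 5., 5.]]))
#   inst.gt_ids = torch.tensor([7, 8])
#   inst = filter_empty_instances(inst)
#   # inst.gt_ids is now tensor([-1, 8]); both instances are still present.
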
def _get_dummy_anno(num_classes):
    return {
        "iscrowd": 0,
        "category_id": num_classes,
        "id": -1,
        "bbox": np.array([0, 0, 0, 0]),
        "bbox_mode": BoxMode.XYXY_ABS,
        "segmentation": [np.array([0.0] * 6)],
    }
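
# The dummy annotation above pads frames in which a tracked object does not appear:
# "category_id" is set to `num_classes` (outside the real label range), "id" to -1,
# and the degenerate box/segmentation is later marked empty by `filter_empty_instances`.
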
def ytvis_annotations_to_instances(annos, image_size):
    """
    Create an :class:`Instances` object used by the models,
    from instance annotations in the dataset dict.

    Args:
        annos (list[dict]): a list of instance annotations in one image, each
            element for one instance.
        image_size (tuple): height, width

    Returns:
        Instances:
            It will contain fields "gt_boxes", "gt_classes", "gt_ids",
            "gt_masks", if they can be obtained from `annos`.
            This is the format that builtin models expect.
    """
    boxes = [BoxMode.convert(obj["bbox"], obj["bbox_mode"], BoxMode.XYXY_ABS) for obj in annos]
    target = Instances(image_size)
    target.gt_boxes = Boxes(boxes)

    classes = [int(obj["category_id"]) for obj in annos]
    classes = torch.tensor(classes, dtype=torch.int64)
    target.gt_classes = classes

    ids = [int(obj["id"]) for obj in annos]
    ids = torch.tensor(ids, dtype=torch.int64)
    target.gt_ids = ids

    if len(annos) and "segmentation" in annos[0]:
        segms = [obj["segmentation"] for obj in annos]
        masks = []
        for segm in segms:
            assert segm.ndim == 2, "Expect segmentation of 2 dimensions, got {}.".format(
                segm.ndim
            )
            # mask array
            masks.append(segm)
        # torch.from_numpy does not support array with negative stride.
        masks = BitMasks(
            torch.stack([torch.from_numpy(np.ascontiguousarray(x)) for x in masks])
        )
        target.gt_masks = masks

    return target
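
# Illustrative sketch of the annotation format `ytvis_annotations_to_instances` expects
# (hypothetical values): an absolute box, a category id, a per-video instance id, and a
# full-image binary mask as a 2-D numpy array.
#
#   anno = {
#       "bbox": [10.0, 20.0, 50.0, 80.0],
#       "bbox_mode": BoxMode.XYWH_ABS,
#       "category_id": 3,
#       "id": 17,
#       "segmentation": np.zeros((480, 640), dtype=np.uint8),
#   }
#   target = ytvis_annotations_to_instances([anno], image_size=(480, 640))
#   # target has fields: gt_boxes, gt_classes, gt_ids, gt_masks
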
class YTVISDatasetMapper:
    """
    A callable which takes a dataset dict in YouTube-VIS Dataset format,
    and maps it into a format used by the model.
    """

    @configurable
    def __init__(
        self,
        is_train: bool,
        *,
        augmentations: List[Union[T.Augmentation, T.Transform]],
        image_format: str,
        use_instance_mask: bool = False,
        sampling_frame_num: int = 2,
        sampling_frame_range: int = 5,
        sampling_frame_shuffle: bool = False,
        num_classes: int = 40,
    ):
        """
        NOTE: this interface is experimental.

        Args:
            is_train: whether it's used in training or inference
            augmentations: a list of augmentations or deterministic transforms to apply
            image_format: an image format supported by :func:`detection_utils.read_image`.
            use_instance_mask: whether to process instance segmentation annotations, if available
        """
        # fmt: off
        self.is_train               = is_train
        self.augmentations          = T.AugmentationList(augmentations)
        self.image_format           = image_format
        self.use_instance_mask      = use_instance_mask
        self.sampling_frame_num     = sampling_frame_num
        self.sampling_frame_range   = sampling_frame_range
        self.sampling_frame_shuffle = sampling_frame_shuffle
        self.num_classes            = num_classes
        # fmt: on

        logger = logging.getLogger(__name__)
        mode = "training" if is_train else "inference"
        logger.info(f"[DatasetMapper] Augmentations used in {mode}: {augmentations}")

    @classmethod
    def from_config(cls, cfg, is_train: bool = True):
        augs = build_augmentation(cfg, is_train)

        sampling_frame_num = cfg.INPUT.SAMPLING_FRAME_NUM
        sampling_frame_range = cfg.INPUT.SAMPLING_FRAME_RANGE
        sampling_frame_shuffle = cfg.INPUT.SAMPLING_FRAME_SHUFFLE

        ret = {
            "is_train": is_train,
            "augmentations": augs,
            "image_format": cfg.INPUT.FORMAT,
            "use_instance_mask": cfg.MODEL.MASK_ON,
            "sampling_frame_num": sampling_frame_num,
            "sampling_frame_range": sampling_frame_range,
            "sampling_frame_shuffle": sampling_frame_shuffle,
            "num_classes": cfg.MODEL.SEM_SEG_HEAD.NUM_CLASSES,
        }

        return ret

    def __call__(self, dataset_dict):
        """
        Args:
            dataset_dict (dict): Metadata of one video, in YTVIS Dataset format.

        Returns:
            dict: a format that builtin models in detectron2 accept
        """
        # TODO: consider removing the deepcopy below, as it is computationally expensive.
        dataset_dict = copy.deepcopy(dataset_dict)  # it will be modified by code below

        video_length = dataset_dict["length"]
        if self.is_train:
            ref_frame = random.randrange(video_length)

            start_idx = max(0, ref_frame - self.sampling_frame_range)
            end_idx = min(video_length, ref_frame + self.sampling_frame_range + 1)

            selected_idx = np.random.choice(
                np.array(list(range(start_idx, ref_frame)) + list(range(ref_frame + 1, end_idx))),
                self.sampling_frame_num - 1,
            )
            selected_idx = selected_idx.tolist() + [ref_frame]
            selected_idx = sorted(selected_idx)
            if self.sampling_frame_shuffle:
                random.shuffle(selected_idx)
        else:
            selected_idx = range(video_length)

        video_annos = dataset_dict.pop("annotations", None)
        file_names = dataset_dict.pop("file_names", None)

        if self.is_train:
            _ids = set()
            for frame_idx in selected_idx:
                _ids.update([anno["id"] for anno in video_annos[frame_idx]])
            ids = dict()
            for i, _id in enumerate(_ids):
                ids[_id] = i

        dataset_dict["image"] = []
        dataset_dict["instances"] = []
        dataset_dict["file_names"] = []
        for frame_idx in selected_idx:
            dataset_dict["file_names"].append(file_names[frame_idx])

            # Read image
            image = utils.read_image(file_names[frame_idx], format=self.image_format)
            utils.check_image_size(dataset_dict, image)

            aug_input = T.AugInput(image)
            transforms = self.augmentations(aug_input)
            image = aug_input.image

            image_shape = image.shape[:2]  # h, w
            # Pytorch's dataloader is efficient on torch.Tensor due to shared-memory,
            # but not efficient on large generic data structures due to the use of pickle & mp.Queue.
            # Therefore it's important to use torch.Tensor.
            dataset_dict["image"].append(torch.as_tensor(np.ascontiguousarray(image.transpose(2, 0, 1))))

            if (video_annos is None) or (not self.is_train):
                continue

            # NOTE: copy() prevents the annotations from being modified when augmentations are applied.
            _frame_annos = []
            for anno in video_annos[frame_idx]:
                _anno = {}
                for k, v in anno.items():
                    _anno[k] = copy.deepcopy(v)
                _frame_annos.append(_anno)

            # USER: Implement additional transformations if you have other types of data
            annos = [
                utils.transform_instance_annotations(obj, transforms, image_shape)
                for obj in _frame_annos
                if obj.get("iscrowd", 0) == 0
            ]
            sorted_annos = [_get_dummy_anno(self.num_classes) for _ in range(len(ids))]

            for _anno in annos:
                idx = ids[_anno["id"]]
                sorted_annos[idx] = _anno
            _gt_ids = [_anno["id"] for _anno in sorted_annos]

            instances = utils.annotations_to_instances(sorted_annos, image_shape, mask_format="bitmask")
            instances.gt_ids = torch.tensor(_gt_ids)
            if instances.has("gt_masks"):
                instances.gt_boxes = instances.gt_masks.get_bounding_boxes()
                instances = filter_empty_instances(instances)
            else:
                instances.gt_masks = BitMasks(torch.empty((0, *image_shape)))
            dataset_dict["instances"].append(instances)

        return dataset_dict
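
# Usage sketch (hypothetical; assumes a detectron2-style `cfg` providing the INPUT.SAMPLING_*
# and MODEL keys read in `from_config`, and a registered YTVIS training dataset):
#
#   from detectron2.data import build_detection_train_loader
#
#   mapper = YTVISDatasetMapper(cfg, is_train=True)        # dispatched via @configurable
#   train_loader = build_detection_train_loader(cfg, mapper=mapper)
#   # Each yielded dict holds, per sampled clip:
#   #   "image":      a list of `sampling_frame_num` CHW image tensors
#   #   "instances":  per-frame Instances whose gt_ids are aligned across frames
#   #   "file_names": the sampled frame paths
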
class CocoClipDatasetMapper:
    """
    A callable which takes a COCO image, converts it into multiple frames,
    and maps them into a format used by the model.
    """

    @configurable
    def __init__(
        self,
        is_train: bool,
        *,
        augmentations: List[Union[T.Augmentation, T.Transform]],
        image_format: str,
        use_instance_mask: bool = False,
        sampling_frame_num: int = 2,
    ):
        """
        NOTE: this interface is experimental.

        Args:
            is_train: whether it's used in training or inference
            augmentations: a list of augmentations or deterministic transforms to apply
            image_format: an image format supported by :func:`detection_utils.read_image`.
            use_instance_mask: whether to process instance segmentation annotations, if available
        """
        # fmt: off
        self.is_train           = is_train
        self.augmentations      = T.AugmentationList(augmentations)
        self.image_format       = image_format
        self.use_instance_mask  = use_instance_mask
        self.sampling_frame_num = sampling_frame_num
        # fmt: on

        logger = logging.getLogger(__name__)
        mode = "training" if is_train else "inference"
        logger.info(f"[DatasetMapper] Augmentations used in {mode}: {augmentations}")

    @classmethod
    def from_config(cls, cfg, is_train: bool = True):
        augs = build_augmentation(cfg, is_train)

        sampling_frame_num = cfg.INPUT.SAMPLING_FRAME_NUM

        ret = {
            "is_train": is_train,
            "augmentations": augs,
            "image_format": cfg.INPUT.FORMAT,
            "use_instance_mask": cfg.MODEL.MASK_ON,
            "sampling_frame_num": sampling_frame_num,
        }

        return ret

    def __call__(self, dataset_dict):
        """
        Args:
            dataset_dict (dict): Metadata of one image, in Detectron2 Dataset format.

        Returns:
            dict: a format that builtin models in detectron2 accept
        """
        dataset_dict = copy.deepcopy(dataset_dict)  # it will be modified by code below

        img_annos = dataset_dict.pop("annotations", None)
        file_name = dataset_dict.pop("file_name", None)
        original_image = utils.read_image(file_name, format=self.image_format)

        dataset_dict["image"] = []
        dataset_dict["instances"] = []
        dataset_dict["file_names"] = [file_name] * self.sampling_frame_num
        for _ in range(self.sampling_frame_num):
            utils.check_image_size(dataset_dict, original_image)

            aug_input = T.AugInput(original_image)
            transforms = self.augmentations(aug_input)
            image = aug_input.image

            image_shape = image.shape[:2]  # h, w
            # Pytorch's dataloader is efficient on torch.Tensor due to shared-memory,
            # but not efficient on large generic data structures due to the use of pickle & mp.Queue.
            # Therefore it's important to use torch.Tensor.
            dataset_dict["image"].append(torch.as_tensor(np.ascontiguousarray(image.transpose(2, 0, 1))))

            if (img_annos is None) or (not self.is_train):
                continue

            # NOTE: copy() prevents the annotations from being modified when augmentations are applied.
            _img_annos = []
            for anno in img_annos:
                _anno = {}
                for k, v in anno.items():
                    _anno[k] = copy.deepcopy(v)
                _img_annos.append(_anno)

            # USER: Implement additional transformations if you have other types of data
            annos = [
                utils.transform_instance_annotations(obj, transforms, image_shape)
                for obj in _img_annos
                if obj.get("iscrowd", 0) == 0
            ]
            _gt_ids = list(range(len(annos)))
            for idx in range(len(annos)):
                if len(annos[idx]["segmentation"]) == 0:
                    annos[idx]["segmentation"] = [np.array([0.0] * 6)]

            instances = utils.annotations_to_instances(annos, image_shape, mask_format="bitmask")
            instances.gt_ids = torch.tensor(_gt_ids)
            if instances.has("gt_masks"):
                instances.gt_boxes = instances.gt_masks.get_bounding_boxes()
                instances = filter_empty_instances(instances)
            else:
                instances.gt_masks = BitMasks(torch.empty((0, *image_shape)))
            dataset_dict["instances"].append(instances)

        return dataset_dict
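
# Usage sketch (hypothetical): CocoClipDatasetMapper turns a single COCO image into a
# pseudo-video clip by re-running the (stochastic) augmentations once per frame.
#
#   mapper = CocoClipDatasetMapper(cfg, is_train=True)
#   out = mapper(coco_dataset_dict)  # one dict in Detectron2 Dataset format
#   # out["image"]      -> `sampling_frame_num` independently augmented views of the image
#   # out["file_names"] -> the same file name repeated `sampling_frame_num` times
#   # out["instances"]  -> per-frame Instances; gt_ids are simply 0..N-1 in each frame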