# Copyright (C) 2019-2022 Intel Corporation
#
# SPDX-License-Identifier: MIT
# pylint: disable=no-self-use
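"""Exporter for the native Datumaro dataset format: per-subset JSON annotation files plus optional media."""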
import os
import os.path as osp
import shutil
import numpy as np
import pycocotools.mask as mask_utils
from datumaro.components.annotation import (
Annotation,
Bbox,
Caption,
Cuboid3d,
Label,
LabelCategories,
Mask,
MaskCategories,
Points,
PointsCategories,
Polygon,
PolyLine,
RleMask,
_Shape,
)
from datumaro.components.converter import Converter
from datumaro.components.dataset import ItemStatus
from datumaro.components.extractor import DEFAULT_SUBSET_NAME, DatasetItem
from datumaro.components.media import Image, MediaElement, PointCloud
from datumaro.util import cast, dump_json_file
from .format import DatumaroPath
class _SubsetWriter:
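    """Collects one subset's categories and converted item descriptors
    and dumps them to a single JSON annotation file.
    """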
def __init__(self, context):
self._context = context
self._data = {
"info": {},
"categories": {},
"items": [],
}
@property
def categories(self):
return self._data["categories"]
@property
def items(self):
return self._data["items"]
def is_empty(self):
return not self.items
def add_item(self, item: DatasetItem):
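        """Converts a dataset item into its JSON descriptor and queues it for writing.

        Media (image, point cloud, related images) is written under the export
        directory when the converter was created with media saving enabled.
        """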
annotations = []
item_desc = {
"id": item.id,
"annotations": annotations,
}
if item.attributes:
item_desc["attr"] = item.attributes
if isinstance(item.media, Image):
image = item.media_as(Image)
path = image.path
if self._context._save_media:
path = self._context._make_image_filename(item)
self._context._save_image(
item, osp.join(self._context._images_dir, item.subset, path)
)
item_desc["image"] = {
"path": path,
}
if item.media.has_size: # avoid occasional loading
item_desc["image"]["size"] = item.media.size
elif isinstance(item.media, PointCloud):
pcd = item.media_as(PointCloud)
path = pcd.path
if self._context._save_media:
path = self._context._make_pcd_filename(item)
self._context._save_point_cloud(
item, osp.join(self._context._pcd_dir, item.subset, path)
)
item_desc["point_cloud"] = {"path": path}
images = sorted(pcd.extra_images, key=lambda v: v.path)
if self._context._save_media:
related_images = []
for i, img in enumerate(images):
ri_desc = {}
                    # Images can have identical names or no names at all,
                    # so we just rename them
ri_desc["path"] = f"image_{i}{self._context._find_image_ext(img)}"
if img.has_data:
img.save(
osp.join(
self._context._related_images_dir,
item.subset,
item.id,
ri_desc["path"],
)
)
if img.has_size:
ri_desc["size"] = img.size
related_images.append(ri_desc)
else:
related_images = [{"path": img.path} for img in images]
if related_images:
item_desc["related_images"] = related_images
        elif isinstance(item.media, MediaElement):
item_desc["media"] = {"path": item.media.path}
self.items.append(item_desc)
for ann in item.annotations:
if isinstance(ann, Label):
converted_ann = self._convert_label_object(ann)
elif isinstance(ann, Mask):
converted_ann = self._convert_mask_object(ann)
elif isinstance(ann, Points):
converted_ann = self._convert_points_object(ann)
elif isinstance(ann, PolyLine):
converted_ann = self._convert_polyline_object(ann)
elif isinstance(ann, Polygon):
converted_ann = self._convert_polygon_object(ann)
elif isinstance(ann, Bbox):
converted_ann = self._convert_bbox_object(ann)
elif isinstance(ann, Caption):
converted_ann = self._convert_caption_object(ann)
elif isinstance(ann, Cuboid3d):
converted_ann = self._convert_cuboid_3d_object(ann)
else:
raise NotImplementedError()
annotations.append(converted_ann)
def add_categories(self, categories):
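        """Converts label, mask and points categories into their JSON representations."""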
for ann_type, desc in categories.items():
if isinstance(desc, LabelCategories):
converted_desc = self._convert_label_categories(desc)
elif isinstance(desc, MaskCategories):
converted_desc = self._convert_mask_categories(desc)
elif isinstance(desc, PointsCategories):
converted_desc = self._convert_points_categories(desc)
else:
raise NotImplementedError()
self.categories[ann_type.name] = converted_desc
def write(self, ann_file):
dump_json_file(ann_file, self._data)
def _convert_annotation(self, obj):
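        """Serializes the fields shared by all annotation types: id, type, attributes and group."""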
assert isinstance(obj, Annotation)
ann_json = {
"id": cast(obj.id, int),
"type": cast(obj.type.name, str),
"attributes": obj.attributes,
"group": cast(obj.group, int, 0),
}
return ann_json
def _convert_label_object(self, obj):
converted = self._convert_annotation(obj)
converted.update(
{
"label_id": cast(obj.label, int),
}
)
return converted
def _convert_mask_object(self, obj):
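        """Serializes a mask as a COCO-style run-length encoding (RLE).

        RleMask instances already carry an RLE; dense masks are encoded
        with pycocotools first.
        """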
converted = self._convert_annotation(obj)
if isinstance(obj, RleMask):
rle = obj.rle
else:
rle = mask_utils.encode(np.require(obj.image, dtype=np.uint8, requirements="F"))
if isinstance(rle["counts"], str):
counts = rle["counts"]
else:
counts = rle["counts"].decode("ascii")
converted.update(
{
"label_id": cast(obj.label, int),
"rle": {
# serialize as compressed COCO mask
"counts": counts,
"size": list(int(c) for c in rle["size"]),
},
"z_order": obj.z_order,
}
)
return converted
def _convert_shape_object(self, obj):
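        """Common serialization for point-based shapes: label, flat point list and z-order."""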
assert isinstance(obj, _Shape)
converted = self._convert_annotation(obj)
converted.update(
{
"label_id": cast(obj.label, int),
"points": [float(p) for p in obj.points],
"z_order": obj.z_order,
}
)
return converted
def _convert_polyline_object(self, obj):
return self._convert_shape_object(obj)
def _convert_polygon_object(self, obj):
return self._convert_shape_object(obj)
def _convert_bbox_object(self, obj):
converted = self._convert_shape_object(obj)
converted.pop("points", None)
converted["bbox"] = [float(p) for p in obj.get_bbox()]
return converted
def _convert_points_object(self, obj):
converted = self._convert_shape_object(obj)
converted.update(
{
"visibility": [int(v.value) for v in obj.visibility],
}
)
return converted
def _convert_caption_object(self, obj):
converted = self._convert_annotation(obj)
converted.update(
{
"caption": cast(obj.caption, str),
}
)
return converted
def _convert_cuboid_3d_object(self, obj):
converted = self._convert_annotation(obj)
converted.update(
{
"label_id": cast(obj.label, int),
"position": [float(p) for p in obj.position],
"rotation": [float(p) for p in obj.rotation],
"scale": [float(p) for p in obj.scale],
}
)
return converted
def _convert_attribute_categories(self, attributes):
return sorted(attributes)
def _convert_label_categories(self, obj):
converted = {
"labels": [],
"attributes": self._convert_attribute_categories(obj.attributes),
}
for label in obj.items:
converted["labels"].append(
{
"name": cast(label.name, str),
"parent": cast(label.parent, str),
"attributes": self._convert_attribute_categories(label.attributes),
}
)
return converted
def _convert_mask_categories(self, obj):
converted = {
"colormap": [],
}
for label_id, color in obj.colormap.items():
converted["colormap"].append(
{
"label_id": int(label_id),
"r": int(color[0]),
"g": int(color[1]),
"b": int(color[2]),
}
)
return converted
def _convert_points_categories(self, obj):
converted = {
"items": [],
}
for label_id, item in obj.items.items():
converted["items"].append(
{
"label_id": int(label_id),
"labels": [cast(label, str) for label in item.labels],
"joints": [list(map(int, j)) for j in item.joints],
}
)
return converted
class DatumaroConverter(Converter):
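    """Exports a dataset in the native Datumaro format."""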
DEFAULT_IMAGE_EXT = DatumaroPath.IMAGE_EXT
    def apply(self):
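        """Creates the output directory layout and writes one
        annotations/<subset>.json file per subset; during a patch, annotation
        files of subsets that became empty are removed.
        """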
os.makedirs(self._save_dir, exist_ok=True)
images_dir = osp.join(self._save_dir, DatumaroPath.IMAGES_DIR)
os.makedirs(images_dir, exist_ok=True)
self._images_dir = images_dir
annotations_dir = osp.join(self._save_dir, DatumaroPath.ANNOTATIONS_DIR)
os.makedirs(annotations_dir, exist_ok=True)
self._annotations_dir = annotations_dir
self._pcd_dir = osp.join(self._save_dir, DatumaroPath.PCD_DIR)
self._related_images_dir = osp.join(self._save_dir, DatumaroPath.RELATED_IMAGES_DIR)
writers = {s: _SubsetWriter(self) for s in self._extractor.subsets()}
for writer in writers.values():
writer.add_categories(self._extractor.categories())
for item in self._extractor:
subset = item.subset or DEFAULT_SUBSET_NAME
writers[subset].add_item(item)
for subset, writer in writers.items():
ann_file = osp.join(self._annotations_dir, "%s.json" % subset)
if self._patch and subset in self._patch.updated_subsets and writer.is_empty():
if osp.isfile(ann_file):
# Remove subsets that became empty
os.remove(ann_file)
continue
writer.write(ann_file)
    @classmethod
def patch(cls, dataset, patch, save_dir, **kwargs):
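        """Re-exports the updated subsets and removes media files of items
        that were deleted or no longer have media.
        """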
for subset in patch.updated_subsets:
conv = cls(dataset.get_subset(subset), save_dir=save_dir, **kwargs)
conv._patch = patch
conv.apply()
conv = cls(dataset, save_dir=save_dir, **kwargs)
for (item_id, subset), status in patch.updated_items.items():
if status != ItemStatus.removed:
item = patch.data.get(item_id, subset)
else:
item = DatasetItem(item_id, subset=subset)
if not (status == ItemStatus.removed or not item.media):
continue
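            # The item was removed or no longer has media: drop any media
            # files previously exported for it.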
image_path = osp.join(
save_dir, DatumaroPath.IMAGES_DIR, item.subset, conv._make_image_filename(item)
)
if osp.isfile(image_path):
os.unlink(image_path)
pcd_path = osp.join(
save_dir, DatumaroPath.PCD_DIR, item.subset, conv._make_pcd_filename(item)
)
if osp.isfile(pcd_path):
os.unlink(pcd_path)
related_images_path = osp.join(
save_dir, DatumaroPath.RELATED_IMAGES_DIR, item.subset, item.id
)
if osp.isdir(related_images_path):
shutil.rmtree(related_images_path)
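# Usage sketch (not part of the original module; paths are placeholders): the
# converter is normally reached through the Dataset API by its format name,
# or via the Converter.convert() helper:
#
#     from datumaro.components.dataset import Dataset
#
#     dataset = Dataset.import_from("path/to/source_dataset", "voc")
#     dataset.export("path/to/output", "datumaro", save_media=True)
#
#     # or, calling this converter directly:
#     DatumaroConverter.convert(dataset, save_dir="path/to/output", save_media=True)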