Source code for datumaro.plugins.kitti_raw_format.extractor

# Copyright (C) 2021 Intel Corporation
#
# SPDX-License-Identifier: MIT

import os
import os.path as osp

from defusedxml import ElementTree as ET

from datumaro.components.annotation import AnnotationType, Cuboid3d, LabelCategories
from datumaro.components.extractor import DatasetItem, Importer, SourceExtractor
from datumaro.components.format_detection import FormatDetectionContext
from datumaro.components.media import Image, PointCloud
from datumaro.util import cast
from datumaro.util.image import find_images
from datumaro.util.meta_file_util import has_meta_file, parse_meta_file

from .format import KittiRawPath, OcclusionStates, TruncationStates


[docs]class KittiRawExtractor(SourceExtractor): # http://www.cvlibs.net/datasets/kitti/raw_data.php # https://s3.eu-central-1.amazonaws.com/avg-kitti/devkit_raw_data.zip # Check cpp header implementation for field meaning
[docs] def __init__(self, path, subset=None): assert osp.isfile(path), path self._rootdir = osp.dirname(path) super().__init__(subset=subset, media_type=PointCloud) items, categories = self._parse(path) self._items = list(self._load_items(items).values()) self._categories = categories
@classmethod def _parse(cls, path): tracks = [] track = None shape = None attr = None labels = {} point_tags = {"tx", "ty", "tz", "rx", "ry", "rz"} # Can fail with "XML declaration not well-formed" on documents with # <?xml ... standalone="true"?> # ^^^^ # (like the original Kitti dataset), while # <?xml ... standalone="yes"?> # ^^^ # works. tree = ET.iterparse(path, events=("start", "end")) for ev, elem in tree: if ev == "start": if elem.tag == "item": if track is None: track = { "shapes": [], "scale": {}, "label": None, "attributes": {}, "start_frame": None, "length": None, } else: shape = { "points": {}, "attributes": {}, "occluded": None, "occluded_kf": False, "truncated": None, } elif elem.tag == "attribute": attr = {} elif ev == "end": if elem.tag == "item": assert track is not None if shape: track["shapes"].append(shape) shape = None else: assert track["length"] == len(track["shapes"]) if track["label"]: labels.setdefault(track["label"], set()) for a in track["attributes"]: labels[track["label"]].add(a) for s in track["shapes"]: for a in s["attributes"]: labels[track["label"]].add(a) tracks.append(track) track = None # track tags elif track and elem.tag == "objectType": track["label"] = elem.text elif track and elem.tag in {"h", "w", "l"}: track["scale"][elem.tag] = float(elem.text) elif track and elem.tag == "first_frame": track["start_frame"] = int(elem.text) elif track and elem.tag == "count" and track: track["length"] = int(elem.text) # pose tags elif shape and elem.tag in point_tags: shape["points"][elem.tag] = float(elem.text) elif shape and elem.tag == "occlusion": shape["occluded"] = OcclusionStates(int(elem.text)) elif shape and elem.tag == "occlusion_kf": shape["occluded_kf"] = elem.text == "1" elif shape and elem.tag == "truncation": shape["truncated"] = TruncationStates(int(elem.text)) # common tags elif attr is not None and elem.tag == "name": if not elem.text: raise ValueError("Attribute name can't be empty") attr["name"] = elem.text elif attr is not None and elem.tag == "value": attr["value"] = elem.text or "" elif attr is not None and elem.tag == "attribute": if shape: shape["attributes"][attr["name"]] = attr["value"] else: track["attributes"][attr["name"]] = attr["value"] attr = None if track is not None or shape is not None or attr is not None: raise Exception("Failed to parse annotations from '%s'" % path) special_attrs = KittiRawPath.SPECIAL_ATTRS common_attrs = ["occluded"] if has_meta_file(path): categories = { AnnotationType.label: LabelCategories.from_iterable(parse_meta_file(path).keys()) } else: label_cat = LabelCategories(attributes=common_attrs) for label, attrs in sorted(labels.items(), key=lambda e: e[0]): label_cat.add(label, attributes=set(attrs) - special_attrs) categories = {AnnotationType.label: label_cat} items = {} for idx, track in enumerate(tracks): track_id = idx + 1 for i, ann in enumerate(cls._parse_track(track_id, track, categories)): frame_desc = items.setdefault(track["start_frame"] + i, {"annotations": []}) frame_desc["annotations"].append(ann) return items, categories @classmethod def _parse_attr(cls, value): if value == "true": return True elif value == "false": return False elif str(cast(value, int, 0)) == value: return int(value) elif str(cast(value, float, 0)) == value: return float(value) else: return value @classmethod def _parse_track(cls, track_id, track, categories): common_attrs = {k: cls._parse_attr(v) for k, v in track["attributes"].items()} scale = [track["scale"][k] for k in ["w", "h", "l"]] label = categories[AnnotationType.label].find(track["label"])[0] kf_occluded = False for shape in track["shapes"]: occluded = shape["occluded"] in {OcclusionStates.FULLY, OcclusionStates.PARTLY} if shape["occluded_kf"]: kf_occluded = occluded elif shape["occluded"] == OcclusionStates.OCCLUSION_UNSET: occluded = kf_occluded if shape["truncated"] in {TruncationStates.OUT_IMAGE, TruncationStates.BEHIND_IMAGE}: # skip these frames continue local_attrs = {k: cls._parse_attr(v) for k, v in shape["attributes"].items()} local_attrs["occluded"] = occluded local_attrs["track_id"] = track_id attrs = dict(common_attrs) attrs.update(local_attrs) position = [shape["points"][k] for k in ["tx", "ty", "tz"]] rotation = [shape["points"][k] for k in ["rx", "ry", "rz"]] yield Cuboid3d(position, rotation, scale, label=label, attributes=attrs) @staticmethod def _parse_name_mapping(path): rootdir = osp.dirname(path) name_mapping = {} if osp.isfile(path): with open(path, encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue idx, path = line.split(maxsplit=1) path = osp.abspath(osp.join(rootdir, path)) assert path.startswith(rootdir), path path = osp.relpath(path, rootdir) name_mapping[int(idx)] = path return name_mapping def _load_items(self, parsed): images = {} for d in os.listdir(self._rootdir): image_dir = osp.join(self._rootdir, d, "data") if not (d.lower().startswith(KittiRawPath.IMG_DIR_PREFIX) and osp.isdir(image_dir)): continue for p in find_images(image_dir, recursive=True): image_name = osp.splitext(osp.relpath(p, image_dir))[0] images.setdefault(image_name, []).append(p) name_mapping = self._parse_name_mapping( osp.join(self._rootdir, KittiRawPath.NAME_MAPPING_FILE) ) items = {} for frame_id, item_desc in parsed.items(): name = name_mapping.get(frame_id, "%010d" % int(frame_id)) items[frame_id] = DatasetItem( id=name, subset=self._subset, media=PointCloud( osp.join(self._rootdir, KittiRawPath.PCD_DIR, name + ".pcd"), extra_images=[Image(path=image) for image in sorted(images.get(name, []))], ), annotations=item_desc.get("annotations"), attributes={"frame": int(frame_id)}, ) for frame_id, name in name_mapping.items(): if frame_id in items: continue items[frame_id] = DatasetItem( id=name, subset=self._subset, media=PointCloud( osp.join(self._rootdir, KittiRawPath.PCD_DIR, name + ".pcd"), extra_images=[Image(path=image) for image in sorted(images.get(name, []))], ), attributes={"frame": int(frame_id)}, ) return items
[docs]class KittiRawImporter(Importer):
[docs] @classmethod def detect(cls, context: FormatDetectionContext) -> None: annot_file = context.require_file("*.xml") with context.probe_text_file( annot_file, "must be a KITTI-like annotation file", ) as f: parser = ET.iterparse(f, events=("start",)) _, elem = next(parser) if elem.tag != "boost_serialization": raise Exception _, elem = next(parser) if elem.tag != "tracklets": raise Exception
[docs] @classmethod def find_sources(cls, path): return cls._find_sources_recursive(path, ".xml", "kitti_raw")