Source code for datumaro.plugins.cvat_format.extractor
# Copyright (C) 2019-2021 Intel Corporation
# SPDX-License-Identifier: MIT
from collections import OrderedDict
import os.path as osp
from defusedxml import ElementTree
from datumaro.components.annotation import (
AnnotationType, Bbox, Label, LabelCategories, Points, Polygon, PolyLine,
from datumaro.components.extractor import DatasetItem, Importer, SourceExtractor
from datumaro.components.format_detection import FormatDetectionContext
from import Image
from .format import CvatPath
class CvatExtractor(SourceExtractor):
    """Extractor for CVAT XML annotation files.

    The file is read as a stream: the leading ``<meta>`` section provides
    the label and attribute definitions, after which ``<image>``,
    ``<track>`` and ``<tag>`` elements are converted into annotations that
    are grouped by frame and finally wrapped into ``DatasetItem`` objects.
    """

    # XML tags that describe a geometric shape.
    _SUPPORTED_SHAPES = ('box', 'polygon', 'polyline', 'points')

    def __init__(self, path, subset=None):
        """Parse the annotation file at *path*.

        Args:
            path: Path to an existing CVAT XML annotation file.
            subset: Subset name for the produced items. When falsy, the
                file name without its extension is used.

        Raises:
            AssertionError: If *path* is not an existing file.
        """
        assert osp.isfile(path), path
        rootpath = osp.dirname(path)

        # Images are looked up in the dedicated images directory next to the
        # annotation file, if that directory exists.
        images_dir = ''
        if osp.isdir(osp.join(rootpath, CvatPath.IMAGES_DIR)):
            images_dir = osp.join(rootpath, CvatPath.IMAGES_DIR)
        self._images_dir = images_dir
        self._path = path

        if not subset:
            subset = osp.splitext(osp.basename(path))[0]
        # The base class stores the subset; _load_items() reads self._subset.
        super().__init__(subset=subset)

        items, categories = self._parse(path)
        self._items = list(self._load_items(items).values())
        self._categories = categories

    @classmethod
    def _parse(cls, path):
        """Stream-parse the annotation file.

        Args:
            path: Path to the CVAT XML annotation file.

        Returns:
            A tuple ``(items, categories)``. ``items`` is an ``OrderedDict``
            mapping a frame id (as found in the XML, a string) to a dict
            with an ``'annotations'`` list and, for ``<image>`` elements,
            the ``'name'``, ``'height'`` and ``'width'`` of the image.
            ``categories`` is the mapping produced by ``_parse_meta``.
        """
        context = ElementTree.iterparse(path, events=("start", "end"))
        context = iter(context)

        categories, frame_size, attribute_types = cls._parse_meta(context)

        items = OrderedDict()

        track = None
        shape = None
        tag = None
        attributes = None
        image = None
        for ev, el in context:
            if ev == 'start':
                if el.tag == 'track':
                    track = {
                        'id': el.attrib['id'],
                        'label': el.attrib.get('label'),
                        'group': int(el.attrib.get('group_id', 0)),
                        # The meta section may lack <original_size>.
                        'height': frame_size[0] if frame_size else None,
                        'width': frame_size[1] if frame_size else None,
                    }
                elif el.tag == 'image':
                    image = {
                        'name': el.attrib.get('name'),
                        'frame': el.attrib['id'],
                        'width': el.attrib.get('width'),
                        'height': el.attrib.get('height'),
                    }
                elif el.tag in cls._SUPPORTED_SHAPES and (track or image):
                    attributes = {}
                    shape = {
                        'type': None,
                        'attributes': attributes,
                    }
                    # Inherit the properties of the enclosing element: a
                    # track provides 'id', 'label' and 'group', an image
                    # provides the 'frame' the shape belongs to.
                    if track:
                        shape.update(track)
                        shape['track_id'] = int(track['id'])
                    if image:
                        shape.update(image)
                elif el.tag == 'tag' and image:
                    attributes = {}
                    tag = {
                        'frame': image['frame'],
                        'attributes': attributes,
                        'group': int(el.attrib.get('group_id', 0)),
                        'label': el.attrib['label'],
                    }
            elif ev == 'end':
                if el.tag == 'attribute' and attributes is not None:
                    attr_value = el.text or ''
                    attr_type = attribute_types.get(el.attrib['name'])
                    if el.text in ['true', 'false']:
                        attr_value = attr_value == 'true'
                    elif attr_type is not None and attr_type != 'text':
                        # Non-text attributes are converted to numbers when
                        # possible; otherwise the raw string is kept.
                        try:
                            attr_value = float(attr_value)
                        except ValueError:
                            pass
                    attributes[el.attrib['name']] = attr_value
                elif el.tag in cls._SUPPORTED_SHAPES and shape is not None:
                    if track is not None:
                        shape['frame'] = el.attrib['frame']
                        shape['outside'] = (el.attrib.get('outside') == '1')
                        shape['keyframe'] = (el.attrib.get('keyframe') == '1')
                    if image is not None:
                        shape['label'] = el.attrib.get('label')
                        shape['group'] = int(el.attrib.get('group_id', 0))

                    shape['type'] = el.tag
                    shape['occluded'] = (el.attrib.get('occluded') == '1')
                    shape['z_order'] = int(el.attrib.get('z_order', 0))

                    if el.tag == 'box':
                        shape['points'] = [
                            float(el.attrib[key])
                            for key in ('xtl', 'ytl', 'xbr', 'ybr')
                        ]
                    else:
                        # "x1,y1;x2,y2;..." -> flat [x1, y1, x2, y2, ...]
                        shape['points'] = []
                        for pair in el.attrib['points'].split(';'):
                            shape['points'].extend(
                                map(float, pair.split(',')))

                    frame_desc = items.setdefault(
                        shape['frame'], {'annotations': []})
                    frame_desc['annotations'].append(
                        cls._parse_shape_ann(shape, categories))
                    shape = None
                elif el.tag == 'tag' and tag is not None:
                    frame_desc = items.setdefault(
                        tag['frame'], {'annotations': []})
                    frame_desc['annotations'].append(
                        cls._parse_tag_ann(tag, categories))
                    tag = None
                elif el.tag == 'track':
                    track = None
                elif el.tag == 'image' and image is not None:
                    frame_desc = items.setdefault(
                        image['frame'], {'annotations': []})
                    frame_desc.update({
                        'name': image.get('name'),
                        'height': image.get('height'),
                        'width': image.get('width'),
                    })
                    image = None

                # The element is fully processed; drop its contents so the
                # in-memory tree does not grow with the size of the file.
                el.clear()

        return items, categories

    @staticmethod
    def _parse_meta(context):
        """Consume the ``<meta>`` section from the XML event stream.

        Args:
            context: Iterator of ``(event, element)`` pairs positioned at
                the very beginning of the document.

        Returns:
            A tuple ``(categories, frame_size, attribute_types)``:
            ``categories`` maps ``AnnotationType.label`` to the label
            categories, ``frame_size`` is ``[height, width]`` or ``None``
            when the task declares no ``<original_size>``, and
            ``attribute_types`` maps an attribute name to its declared
            input type.

        Raises:
            Exception: If the document does not start with an
                ``<annotations>`` element.
            AssertionError: If parsing stops anywhere but right after a
                complete ``<meta>`` section.
        """
        ev, el = next(context)
        if not (ev == 'start' and el.tag == 'annotations'):
            raise Exception("Unexpected token ")

        categories = {}

        frame_size = None
        mode = None
        labels = OrderedDict()
        label = None

        # Recursive descent parser
        # 'states' is the stack of currently open sections. Both helpers
        # read 'el' from the enclosing scope, i.e. the element of the event
        # currently being processed by the loop below.
        el = None
        states = ['annotations']

        def accepted(expected_state, tag, next_state=None):
            # Enter a section if the current element opens it.
            state = states[-1]
            if state == expected_state and el is not None and el.tag == tag:
                if not next_state:
                    next_state = tag
                states.append(next_state)
                return True
            return False

        def consumed(expected_state, tag):
            # Leave a section if the current element closes it.
            state = states[-1]
            if state == expected_state and el is not None and el.tag == tag:
                states.pop()
                return True
            return False

        for ev, el in context:
            if ev == 'start':
                if accepted('annotations', 'meta'): pass
                elif accepted('meta', 'task'): pass
                elif accepted('task', 'mode'): pass
                elif accepted('task', 'original_size'):
                    frame_size = [None, None]
                elif accepted('original_size', 'height', next_state='frame_height'): pass
                elif accepted('original_size', 'width', next_state='frame_width'): pass
                elif accepted('task', 'labels'): pass
                elif accepted('labels', 'label'):
                    label = { 'name': None, 'attributes': [] }
                elif accepted('label', 'name', next_state='label_name'): pass
                elif accepted('label', 'attributes'): pass
                elif accepted('attributes', 'attribute'): pass
                elif accepted('attribute', 'name', next_state='attr_name'): pass
                elif accepted('attribute', 'input_type', next_state='attr_type'): pass
                elif accepted('annotations', 'image') or \
                     accepted('annotations', 'track') or \
                     accepted('annotations', 'tag'):
                    # Annotation data started before any <meta> section was
                    # seen; the state stack is left unbalanced so that the
                    # assertion below reports the problem.
                    break
            elif ev == 'end':
                if consumed('meta', 'meta'):
                    # The whole meta section has been read.
                    break
                elif consumed('task', 'task'): pass
                elif consumed('mode', 'mode'):
                    mode = el.text
                elif consumed('original_size', 'original_size'): pass
                elif consumed('frame_height', 'height'):
                    frame_size[0] = int(el.text)
                elif consumed('frame_width', 'width'):
                    frame_size[1] = int(el.text)
                elif consumed('label_name', 'name'):
                    label['name'] = el.text
                elif consumed('attr_name', 'name'):
                    label['attributes'].append({'name': el.text})
                elif consumed('attr_type', 'input_type'):
                    label['attributes'][-1]['input_type'] = el.text
                elif consumed('attribute', 'attribute'): pass
                elif consumed('attributes', 'attributes'): pass
                elif consumed('label', 'label'):
                    labels[label['name']] = label['attributes']
                    label = None
                elif consumed('labels', 'labels'): pass

        assert len(states) == 1 and states[0] == 'annotations', \
            "Expected 'meta' section in the annotation file, path: %s" % states

        common_attrs = ['occluded']
        if mode == 'interpolation':
            # Track shapes additionally carry these attributes, which are
            # filled in by _parse_shape_ann().
            common_attrs.extend(['keyframe', 'outside', 'track_id'])

        label_cat = LabelCategories(attributes=common_attrs)
        attribute_types = {}
        for label_name, attrs in labels.items():
            attr_names = {v['name'] for v in attrs}
            label_cat.add(label_name, attributes=attr_names)
            for attr in attrs:
                # An attribute declared without <input_type> has no known
                # type; _parse() then keeps its values as plain text.
                attribute_types[attr['name']] = attr.get('input_type')

        categories[AnnotationType.label] = label_cat
        return categories, frame_size, attribute_types

    @classmethod
    def _parse_shape_ann(cls, ann, categories):
        """Convert a parsed shape description into an annotation object.

        Args:
            ann: Shape description built by ``_parse``. ``'type'`` is
                required; ``'id'``, ``'attributes'``, ``'occluded'``,
                ``'outside'``, ``'keyframe'``, ``'track_id'``, ``'group'``,
                ``'label'``, ``'z_order'`` and ``'points'`` are optional.
            categories: Mapping with the label categories stored under
                ``AnnotationType.label``.

        Returns:
            A ``PolyLine``, ``Polygon``, ``Points`` or ``Bbox`` instance.

        Raises:
            NotImplementedError: If the shape type is not supported.
        """
        ann_id = ann.get('id', 0)
        ann_type = ann['type']

        attributes = ann.get('attributes') or {}
        if 'occluded' in categories[AnnotationType.label].attributes:
            attributes['occluded'] = ann.get('occluded', False)
        for key in ('outside', 'keyframe', 'track_id'):
            if key in ann:
                attributes[key] = ann[key]

        group = ann.get('group')

        label = ann.get('label')
        label_id = categories[AnnotationType.label].find(label)[0]

        z_order = ann.get('z_order', 0)
        points = ann.get('points', [])

        common = dict(label=label_id, z_order=z_order,
            id=ann_id, attributes=attributes, group=group)

        if ann_type == 'polyline':
            return PolyLine(points, **common)
        elif ann_type == 'polygon':
            return Polygon(points, **common)
        elif ann_type == 'points':
            return Points(points, **common)
        elif ann_type == 'box':
            # Stored as two corners; Bbox takes the corner and the size.
            x, y = points[0], points[1]
            w, h = points[2] - x, points[3] - y
            return Bbox(x, y, w, h, **common)

        raise NotImplementedError("Unknown annotation type '%s'" % ann_type)

    @classmethod
    def _parse_tag_ann(cls, ann, categories):
        """Convert a parsed ``<tag>`` description into a ``Label``.

        Args:
            ann: Tag description with optional ``'label'``, ``'group'`` and
                ``'attributes'`` keys.
            categories: Mapping with the label categories stored under
                ``AnnotationType.label``.

        Returns:
            A ``Label`` annotation.
        """
        label = ann.get('label')
        label_id = categories[AnnotationType.label].find(label)[0]
        group = ann.get('group')
        attributes = ann.get('attributes')
        return Label(label_id, attributes=attributes, group=group)

    def _load_items(self, parsed):
        """Replace every parsed frame description with a ``DatasetItem``.

        Args:
            parsed: Mapping of frame id to frame description, as returned
                by ``_parse``. It is modified in place.

        Returns:
            The same mapping, now holding ``DatasetItem`` objects.
        """
        for frame_id, item_desc in parsed.items():
            # Frames without an explicit image name get a generated one.
            name = item_desc.get('name', 'frame_%06d.png' % int(frame_id))
            image = osp.join(self._images_dir, name)
            image_size = (item_desc.get('height'), item_desc.get('width'))
            if all(image_size):
                image = Image(path=image, size=tuple(map(int, image_size)))

            parsed[frame_id] = DatasetItem(id=osp.splitext(name)[0],
                subset=self._subset, image=image,
                annotations=item_desc.get('annotations'),
                attributes={'frame': int(frame_id)})
        return parsed
class CvatImporter(Importer):
    """Importer that detects and locates CVAT XML annotation files."""

    @classmethod
    def detect(cls, context: FormatDetectionContext) -> None:
        """Check that the dataset has an XML file rooted at ``annotations``.

        Args:
            context: Format detection context used to state the
                requirements of this format.
        """
        xml_path = context.require_file('*.xml')
        requirement = \
            "must be an XML file with an \"annotations\" root element"

        with context.probe_text_file(xml_path, requirement) as stream:
            # Only the first "start" event is needed: it carries the root
            # element of the document.
            events = ElementTree.iterparse(stream, events=('start',))
            _event, root = next(events)
            if root.tag != 'annotations':
                # NOTE(review): presumably the probe context reports any
                # exception as a failed requirement - confirm.
                raise Exception

    @classmethod
    def find_sources(cls, path):
        """Search *path* for ``.xml`` files to be loaded as 'cvat' sources."""
        sources = cls._find_sources_recursive(path, '.xml', 'cvat')
        return sources