# Copyright (C) 2019-2020 Intel Corporation
#
# SPDX-License-Identifier: MIT
# pylint: disable=unused-variable
from math import ceil
import numpy as np
from datumaro.components.annotation import AnnotationType
from datumaro.util.annotation_util import nms


def flatmatvec(mat):
    # Flatten each stacked entry into a row vector: (N, ...) -> (N, prod(...)).
    return np.reshape(mat, (len(mat), -1))


def expand(array, axis=None):
    # Insert a new axis (a trailing one by default) to simplify broadcasting.
    if axis is None:
        axis = len(array.shape)
    return np.expand_dims(array, axis=axis)


class RISE:
"""
Implements RISE: Randomized Input Sampling for
Explanation of Black-box Models algorithm.
See explanations at: https://arxiv.org/pdf/1806.07421.pdf
"""

    def __init__(
self,
model,
max_samples=None,
mask_width=7,
mask_height=7,
prob=0.5,
iou_thresh=0.9,
nms_thresh=0.0,
det_conf_thresh=0.0,
batch_size=1,
):
self.model = model
self.max_samples = max_samples
self.mask_height = mask_height
self.mask_width = mask_width
self.prob = prob
self.iou_thresh = iou_thresh
self.nms_thresh = nms_thresh
self.det_conf_thresh = det_conf_thresh
self.batch_size = batch_size

    @staticmethod
    def split_outputs(annotations):
        # Split the model's annotations into label and bbox predictions.
labels = []
bboxes = []
for r in annotations:
if r.type is AnnotationType.label:
labels.append(r)
elif r.type is AnnotationType.bbox:
bboxes.append(r)
return labels, bboxes

    def normalize_hmaps(self, heatmaps, counts):
        # Scale each accumulated heatmap by its hit count and the mask
        # keep-probability, then min-max normalize it to [0, 1];
        # eps guards against division by zero.
        eps = np.finfo(heatmaps.dtype).eps
        mhmaps = flatmatvec(heatmaps)
        mhmaps /= expand(counts * self.prob + eps)
        mhmaps -= expand(np.min(mhmaps, axis=1))
        mhmaps /= expand(np.max(mhmaps, axis=1) + eps)
        return np.reshape(mhmaps, heatmaps.shape)

    def apply(self, image, progressive=False):
        import cv2

        assert len(image.shape) in [2, 3], "Expected an input image in (H, W, C) format"
        if len(image.shape) == 3:
            assert image.shape[2] in [3, 4], "Expected BGR or BGRA input"
            image = image[:, :, :3]  # drop the alpha channel, if present
        image = image.astype(np.float32)
model = self.model
iou_thresh = self.iou_thresh
        image_size = np.array(image.shape[:2])
mask_size = np.array((self.mask_height, self.mask_width))
cell_size = np.ceil(image_size / mask_size)
upsampled_size = np.ceil((mask_size + 1) * cell_size)
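        # RISE masking scheme: a coarse (mask_height x mask_width) binary grid is
        # upsampled to slightly larger than the image, and a random sub-cell offset
        # then crops an image-sized window, yielding smooth, randomly shifted masks.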
        rng = lambda shape=(): np.random.rand(*shape)
samples = np.prod(image_size)
if self.max_samples is not None:
samples = min(self.max_samples, samples)
batch_size = self.batch_size
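        # Run the model once on the unmasked image to obtain the reference
        # predictions (labels and boxes) whose saliency will be explained.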
result = next(iter(model.launch(expand(image, 0))))
result_labels, result_bboxes = self.split_outputs(result)
if 0 < self.det_conf_thresh:
result_bboxes = [
b for b in result_bboxes if self.det_conf_thresh <= b.attributes["score"]
]
if 0 < self.nms_thresh:
result_bboxes = nms(result_bboxes, self.nms_thresh)
predicted_labels = set()
if len(result_labels) != 0:
predicted_label = max(result_labels, key=lambda r: r.attributes["score"]).label
predicted_labels.add(predicted_label)
if len(result_bboxes) != 0:
for bbox in result_bboxes:
predicted_labels.add(bbox.label)
predicted_labels = {label: idx for idx, label in enumerate(predicted_labels)}
predicted_bboxes = result_bboxes
heatmaps_count = len(predicted_labels) + len(predicted_bboxes)
heatmaps = np.zeros((heatmaps_count, *image_size), dtype=np.float32)
total_counts = np.zeros(heatmaps_count, dtype=np.int32)
confs = np.zeros(heatmaps_count, dtype=np.float32)
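        # One heatmap per predicted label, followed by one per predicted bbox;
        # the label_*/bbox_* arrays below are views into these shared buffers.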
heatmap_id = 0
label_heatmaps = None
label_total_counts = None
label_confs = None
if len(predicted_labels) != 0:
step = len(predicted_labels)
label_heatmaps = heatmaps[heatmap_id : heatmap_id + step]
label_total_counts = total_counts[heatmap_id : heatmap_id + step]
label_confs = confs[heatmap_id : heatmap_id + step]
heatmap_id += step
bbox_heatmaps = None
bbox_total_counts = None
bbox_confs = None
if len(predicted_bboxes) != 0:
step = len(predicted_bboxes)
bbox_heatmaps = heatmaps[heatmap_id : heatmap_id + step]
bbox_total_counts = total_counts[heatmap_id : heatmap_id + step]
bbox_confs = confs[heatmap_id : heatmap_id + step]
heatmap_id += step
ups_mask = np.empty(upsampled_size.astype(int), dtype=np.float32)
masks = np.empty((batch_size, *image_size), dtype=np.float32)
full_batch_inputs = np.empty((batch_size, *image.shape), dtype=np.float32)
current_heatmaps = np.empty_like(heatmaps)
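        # Monte-Carlo sampling loop: draw random masks, run the model on the masked
        # images, and add each mask into the heatmaps weighted by the confidence the
        # masked prediction assigns to the corresponding original prediction.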
for b in range(ceil(samples / batch_size)):
batch_pos = b * batch_size
current_batch_size = min(samples - batch_pos, batch_size)
batch_masks = masks[:current_batch_size]
for i in range(current_batch_size):
mask = (rng(mask_size) < self.prob).astype(np.float32)
cv2.resize(mask, (int(upsampled_size[1]), int(upsampled_size[0])), ups_mask)
offsets = np.round(rng((2,)) * cell_size)
mask = ups_mask[
int(offsets[0]) : int(image_size[0] + offsets[0]),
int(offsets[1]) : int(image_size[1] + offsets[1]),
]
batch_masks[i] = mask
batch_inputs = full_batch_inputs[:current_batch_size]
np.multiply(expand(batch_masks), expand(image, 0), out=batch_inputs)
results = model.launch(batch_inputs)
for mask, result in zip(batch_masks, results):
result_labels, result_bboxes = self.split_outputs(result)
confs.fill(0)
if len(predicted_labels) != 0:
for r in result_labels:
idx = predicted_labels.get(r.label, None)
if idx is not None:
label_total_counts[idx] += 1
label_confs[idx] += r.attributes["score"]
for r in result_bboxes:
idx = predicted_labels.get(r.label, None)
if idx is not None:
label_total_counts[idx] += 1
label_confs[idx] += r.attributes["score"]
if len(predicted_bboxes) != 0 and len(result_bboxes) != 0:
if 0 < self.det_conf_thresh:
result_bboxes = [
b
for b in result_bboxes
if self.det_conf_thresh <= b.attributes["score"]
]
if 0 < self.nms_thresh:
result_bboxes = nms(result_bboxes, self.nms_thresh)
for detection in result_bboxes:
for pred_idx, pred in enumerate(predicted_bboxes):
if pred.label != detection.label:
continue
iou = pred.iou(detection)
                            assert iou == -1 or 0 <= iou <= 1
if iou < iou_thresh:
continue
bbox_total_counts[pred_idx] += 1
conf = detection.attributes["score"]
bbox_confs[pred_idx] += conf
np.multiply.outer(confs, mask, out=current_heatmaps)
heatmaps += current_heatmaps
if progressive:
yield self.normalize_hmaps(heatmaps.copy(), total_counts)
yield self.normalize_hmaps(heatmaps, total_counts)