Roll up detections block #1853
Conversation
Pull request overview
This PR introduces a new "Dimension Roll Up" block that merges detections from secondary models run on dynamic crops back into parent image coordinates. This is a highly needed feature for workflows involving nested detection pipelines (e.g., cropping and running inference on crops, then consolidating results).
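The core of such a roll-up is translating crop-local coordinates into parent-image coordinates. A minimal sketch of that step, assuming each crop carries its top-left offset in the parent image (the `offset_x`/`offset_y` parameters and the detection schema here are illustrative, not taken from the PR):

```python
def to_parent_coordinates(detections, offset_x, offset_y):
    """Shift child-detection bboxes from crop-local to parent-image coordinates.

    `detections` is assumed to be a list of dicts with a "bbox" key holding
    [x_min, y_min, x_max, y_max] in crop-local pixels (hypothetical schema).
    """
    rolled_up = []
    for det in detections:
        x_min, y_min, x_max, y_max = det["bbox"]
        shifted = dict(det)
        # Translate by the crop's top-left corner in the parent image
        shifted["bbox"] = [
            x_min + offset_x,
            y_min + offset_y,
            x_max + offset_x,
            y_max + offset_y,
        ]
        rolled_up.append(shifted)
    return rolled_up

crop_dets = [{"bbox": [10, 20, 30, 40], "confidence": 0.9, "class_id": 0}]
print(to_parent_coordinates(crop_dets, offset_x=100, offset_y=200))
# → [{'bbox': [110, 220, 130, 240], 'confidence': 0.9, 'class_id': 0}]
```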
Key changes:
- Implements merging of bounding boxes and instance segmentation masks from child detections back to parent coordinates
- Supports configurable confidence strategies (max, mean, min) and overlap thresholds for merging overlapping detections
- Uses IoU-based merging with union-find algorithm to group overlapping detections per class
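The confidence strategies named above reduce a merged group's scores to a single value. A minimal sketch with a hypothetical helper (not the PR's code; the fallback-to-max behavior for unknown strategies mirrors the generated tests further down):

```python
def merge_confidence(confidences, strategy):
    # Reduce the confidences of a merged detection group to one value;
    # unknown strategy names fall back to "max".
    if strategy == "mean":
        return sum(confidences) / len(confidences)
    if strategy == "min":
        return min(confidences)
    return max(confidences)  # "max" and any unknown strategy

print(merge_confidence([0.2, 0.5, 0.8], "max"))   # → 0.8
print(merge_confidence([0.2, 0.5, 0.8], "mean"))  # → 0.5
print(merge_confidence([0.2, 0.5, 0.8], "min"))   # → 0.2
```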
Reviewed changes
Copilot reviewed 2 out of 3 changed files in this pull request and generated 18 comments.
| File | Description |
|---|---|
| inference/core/workflows/core_steps/loader.py | Registers the new DimensionRollUpBlockV1 block by adding an import and including it in the load_blocks() list |
| inference/core/workflows/core_steps/fusion/dimension_rollup/v1.py | Complete implementation of the dimension rollup block with manifest, transformation logic, mask/bbox merging algorithms, and helper functions for polygon conversion |
| inference/core/workflows/core_steps/fusion/dimension_rollup/__init__.py | Empty init file for the new dimension_rollup module |
@lou-roboflow I've opened a new pull request, #1855, to work on those changes. Once the pull request is ready, I'll request review from you.
Run black and isort on changes
…nference into RollUpDetectionsBlock
```python
# Create boxes as Polygons for spatial indexing
boxes = []
for pred in predictions:
    x_min, y_min, x_max, y_max = pred["bbox"]
    # Create box polygon (coordinates: [bottom-left, bottom-right, top-right, top-left])
    box = Polygon([(x_min, y_min), (x_max, y_min), (x_max, y_max), (x_min, y_max)])
    boxes.append(box)

tree = STRtree(boxes)

# Check candidate pairs identified by spatial index
checked_pairs = set()
for i in range(n):
    box1 = boxes[i]
    # Query for boxes that intersect the bounding box
    candidates = tree.query(box1, predicate="intersects")

    for j in candidates:
        if i >= j or (i, j) in checked_pairs or (j, i) in checked_pairs:
            continue
        checked_pairs.add((i, j))

        bbox1 = predictions[i]["bbox"]
        bbox2 = predictions[j]["bbox"]

        iou = bbox_iou(bbox1, bbox2)

        if overlap_threshold <= 0.0:
            # Merge if they overlap at all
            if iou > 0:
                union(i, j)
        else:
            # Merge only if IoU exceeds threshold
            if iou >= overlap_threshold:
                union(i, j)
```
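The excerpt above relies on `bbox_iou` and `union` helpers defined elsewhere in the file. A plausible self-contained sketch of those two pieces (a reconstruction for illustration, not the PR's exact code):

```python
def bbox_iou(a, b):
    # Intersection-over-union of two [x_min, y_min, x_max, y_max] boxes
    iw = max(0.0, min(a[2], b[2]) - max(a[0], b[0]))
    ih = max(0.0, min(a[3], b[3]) - max(a[1], b[1]))
    inter = iw * ih
    union_area = (
        (a[2] - a[0]) * (a[3] - a[1]) + (b[2] - b[0]) * (b[3] - b[1]) - inter
    )
    return inter / union_area if union_area > 0 else 0.0

# Union-find over detection indices, with path compression
parent = list(range(4))  # e.g. n = 4 detections

def find(i):
    while parent[i] != i:
        parent[i] = parent[parent[i]]  # path compression
        i = parent[i]
    return i

def union(i, j):
    ri, rj = find(i), find(j)
    if ri != rj:
        parent[ri] = rj

union(0, 1)
union(1, 2)
print(find(0) == find(2))  # → True: 0, 1, 2 form one merge group
print(find(3) == find(0))  # → False: 3 stays separate
```

Detections whose pairwise IoU exceeds the threshold end up in the same root, so chains of overlaps merge transitively, which matches the chain-overlap behavior exercised by the regression tests below.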
⚡️Codeflash found 136% (1.36x) speedup for _merge_overlapping_bboxes in inference/core/workflows/core_steps/fusion/detections_list_rollup/v1.py
⏱️ Runtime : 825 milliseconds → 349 milliseconds (best of 20 runs)
📝 Explanation and details
The optimized code achieves a 136% speedup by replacing Shapely's spatial indexing (STRtree with Polygon objects) with a sweep-line algorithm that operates directly on bounding box coordinates.
Key Optimization
Removed Shapely dependency overhead: The original code created Polygon objects for each bounding box and built an STRtree spatial index. While STRtree helps avoid O(n²) comparisons, the overhead of creating Polygons and building/querying the tree dominates for typical detection counts.
Implemented sweep-line with early termination: The optimized version:
- Sorts bounding boxes by x_min coordinate once (O(n log n))
- For each box, only checks boxes that could overlap in the x-dimension
- Uses early termination: when `x2_min > x1_max`, all remaining sorted boxes can't overlap
- Adds a quick y-dimension rejection test before computing IoU
Performance Analysis
From line profiler results, the original code spent:
- 4% creating Polygon objects (244ms out of 6.17s)
- 3% querying STRtree (182ms)
- 30.9% computing IoU for 262,805 pairs
- 35.6% in union operations
The optimized version:
- 0.1% sorting indices (3.27ms)
- 37% computing IoU for the same 262,805 pairs
- 40.2% in union operations
- Eliminates all Shapely overhead
The sweep-line approach maintains the same candidate pair count (262,805) as STRtree but with ~490ms less overhead from polygon creation and tree operations.
Impact on Workloads
Based on function_references, this function is called from merge_crop_predictions which processes detections from multiple image crops. The optimization is particularly valuable when:
- Processing many crops with moderate detection counts (typical CV pipelines)
- The function is in a hot path: called once per batch of crops being merged
- Test results show 544-822% speedup for small-to-medium cases (2-100 detections), making per-crop processing nearly instantaneous
- For large overlapping cases (500 detections), the speedup is 117-123%, as IoU computation still dominates
The sweep-line algorithm scales better than STRtree for the typical detection counts (tens to low hundreds) seen in object detection, while avoiding Python/C++ boundary crossing overhead from Shapely operations.
✅ Correctness verification report:
| Test | Status |
|---|---|
| ⏪ Replay Tests | 🔘 None Found |
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | ✅ 43 Passed |
| 📊 Tests Coverage | 96.6% |
🌀 Click to see Generated Regression Tests
```python
from typing import List

# function to test (copied from above)
import numpy as np

# imports
import pytest  # used for our unit tests
from inference.core.workflows.core_steps.fusion.detections_list_rollup.v1 import (
    _merge_overlapping_bboxes,
)
from shapely.geometry import Polygon
from shapely.strtree import STRtree

# unit tests

# ----------- BASIC TEST CASES ------------


def test_empty_input():
    # Should return empty list for no predictions
    codeflash_output = _merge_overlapping_bboxes([], "max")
    result = codeflash_output  # 831ns -> 842ns (1.31% slower)


def test_single_bbox():
    # Should return the same bbox unchanged
    bbox = [0, 0, 10, 10]
    pred = {"bbox": bbox, "confidence": 0.9, "class_id": 1}
    codeflash_output = _merge_overlapping_bboxes([pred], "max")
    result = codeflash_output  # 113μs -> 16.7μs (580% faster)


def test_non_overlapping_bboxes():
    # Should not merge bboxes that do not overlap
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.9, "class_id": 1},
        {"bbox": [20, 20, 30, 30], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 122μs -> 19.0μs (544% faster)
    # Each output bbox should match one input bbox
    for out in result:
        pass


def test_two_overlapping_bboxes():
    # Should merge two overlapping bboxes
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.9, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 126μs -> 18.6μs (584% faster)
    # The merged bbox should cover both
    expected_bbox = np.array([0, 0, 15, 15])


def test_three_chain_overlapping_bboxes():
    # Should merge all three if each overlaps with next
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.7, "class_id": 1},
        {"bbox": [9, 0, 19, 10], "confidence": 0.8, "class_id": 1},
        {"bbox": [18, 0, 28, 10], "confidence": 0.6, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "mean")
    result = codeflash_output  # 188μs -> 59.7μs (215% faster)
    # The merged bbox should cover all
    expected_bbox = np.array([0, 0, 28, 10])


def test_different_class_ids():
    # Should not merge bboxes of different classes
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.9, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.8, "class_id": 2},
    ]
    # Simulate merging by class: only bboxes of same class are grouped
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 124μs -> 17.9μs (595% faster)
    classes = set([r["class_id"] for r in result])


def test_confidence_strategy_min():
    # Should use min confidence when requested
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.9, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "min")
    result = codeflash_output  # 123μs -> 18.0μs (585% faster)


def test_confidence_strategy_unknown():
    # Should default to max confidence if strategy is unknown
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.7, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "foobar")
    result = codeflash_output  # 123μs -> 17.5μs (603% faster)


# ----------- EDGE TEST CASES ------------


def test_zero_area_bbox():
    # Bbox with zero area should not merge with others
    preds = [
        {"bbox": [0, 0, 0, 10], "confidence": 0.5, "class_id": 1},  # zero width
        {"bbox": [1, 1, 10, 10], "confidence": 0.9, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 123μs -> 17.8μs (596% faster)
    found_zero = any(np.all(r["bbox"] == np.array([0, 0, 0, 10])) for r in result)


def test_bbox_touching_edges_only():
    # Boxes that touch at edge but do not overlap should not be merged (IoU=0)
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.9, "class_id": 1},
        {"bbox": [10, 0, 20, 10], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 120μs -> 19.7μs (513% faster)


def test_bbox_fully_contained():
    # One box fully inside another should be merged
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [2, 2, 8, 8], "confidence": 0.9, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "mean")
    result = codeflash_output  # 143μs -> 43.8μs (229% faster)


def test_overlap_threshold_high():
    # With high overlap_threshold, boxes should not merge unless IoU >= threshold
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [8, 8, 18, 18], "confidence": 0.9, "class_id": 1},
    ]
    # IoU is low, so with threshold=0.5, should not merge
    codeflash_output = _merge_overlapping_bboxes(preds, "max", overlap_threshold=0.5)
    result = codeflash_output  # 123μs -> 20.2μs (511% faster)


def test_overlap_threshold_low():
    # With low overlap_threshold, boxes that overlap at all should merge
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [9, 9, 19, 19], "confidence": 0.9, "class_id": 1},
    ]
    # They overlap at the corner, so with threshold=0.0, should merge
    codeflash_output = _merge_overlapping_bboxes(preds, "max", overlap_threshold=0.0)
    result = codeflash_output  # 121μs -> 17.9μs (576% faster)


def test_multiple_classes_and_overlap():
    # Overlapping bboxes of different classes should not merge
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.9, "class_id": 2},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 121μs -> 17.8μs (583% faster)


def test_negative_coordinates():
    # Should handle negative coordinates correctly
    preds = [
        {"bbox": [-10, -10, 0, 0], "confidence": 0.7, "class_id": 1},
        {"bbox": [-5, -5, 5, 5], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 121μs -> 18.0μs (577% faster)
    expected_bbox = np.array([-10, -10, 5, 5])


def test_float_coordinates():
    # Should handle float coordinates
    preds = [
        {"bbox": [0.0, 0.0, 10.5, 10.5], "confidence": 0.9, "class_id": 1},
        {"bbox": [10.0, 10.0, 20.0, 20.0], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 121μs -> 18.6μs (557% faster)
    expected_bbox = np.array([0.0, 0.0, 20.0, 20.0])


def test_overlapping_bboxes_different_confidences():
    # Should merge and use correct confidence strategy
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.2, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.5, "class_id": 1},
        {"bbox": [8, 8, 18, 18], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 160μs -> 22.5μs (613% faster)


# ----------- LARGE SCALE TEST CASES ------------


def test_large_number_non_overlapping():
    # Should handle many non-overlapping bboxes efficiently
    preds = []
    for i in range(100):
        preds.append(
            {
                "bbox": [i * 10, i * 10, i * 10 + 5, i * 10 + 5],
                "confidence": 0.5,
                "class_id": 1,
            }
        )
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 2.64ms -> 306μs (762% faster)
    for i in range(100):
        found = any(
            np.all(r["bbox"] == np.array([i * 10, i * 10, i * 10 + 5, i * 10 + 5]))
            for r in result
        )


def test_large_number_chain_overlapping():
    # Should merge all into one group if each overlaps next
    preds = []
    for i in range(100):
        preds.append(
            {"bbox": [i, 0, i + 2, 2], "confidence": 0.1 + 0.001 * i, "class_id": 1}
        )
    codeflash_output = _merge_overlapping_bboxes(preds, "mean")
    result = codeflash_output  # 3.02ms -> 327μs (822% faster)
    expected_bbox = np.array([0, 0, 101, 2])
    # Confidence should be mean of all
    expected_conf = np.mean([0.1 + 0.001 * i for i in range(100)])


def test_large_number_multiple_classes():
    # Should separate by class, even if bboxes overlap
    preds = []
    for i in range(50):
        preds.append({"bbox": [i, 0, i + 10, 10], "confidence": 0.5, "class_id": 1})
        preds.append({"bbox": [i, 0, i + 10, 10], "confidence": 0.6, "class_id": 2})
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 6.73ms -> 2.27ms (196% faster)
    # Should produce two merged bboxes, one per class
    class_counts = {1: 0, 2: 0}
    for r in result:
        class_counts[r["class_id"]] += 1


def test_large_number_random_overlap():
    # Randomly overlapping boxes, some should merge, some not
    preds = []
    for i in range(50):
        # Overlap every 10th box with the previous
        if i % 10 == 0 and i > 0:
            bbox = [i - 1, 0, i + 9, 10]
        else:
            bbox = [i * 2, 0, i * 2 + 5, 5]
        preds.append({"bbox": bbox, "confidence": 0.5, "class_id": 1})
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 1.64ms -> 192μs (756% faster)


def test_large_number_overlap_threshold():
    # With high overlap_threshold, fewer merges
    preds = []
    for i in range(100):
        preds.append(
            {"bbox": [i, 0, i + 2, 2], "confidence": 0.1 + 0.001 * i, "class_id": 1}
        )
    codeflash_output = _merge_overlapping_bboxes(preds, "max", overlap_threshold=0.9)
    result = codeflash_output  # 3.06ms -> 471μs (549% faster)


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
```
```python
import numpy as np

# imports
import pytest  # used for our unit tests
from inference.core.workflows.core_steps.fusion.detections_list_rollup.v1 import (
    _merge_overlapping_bboxes,
)

# function to test
# (function code pasted above, so not repeated here for brevity)

# ---------------------------
# Basic Test Cases
# ---------------------------


def test_empty_input_returns_empty():
    # Test that an empty input returns an empty list
    codeflash_output = _merge_overlapping_bboxes([], "max")
    result = codeflash_output  # 761ns -> 751ns (1.33% faster)


def test_single_bbox():
    # Test with a single bounding box
    preds = [{"bbox": [0, 0, 10, 10], "confidence": 0.8, "class_id": 1}]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 83.4μs -> 14.4μs (478% faster)


def test_no_overlap_returns_individual_boxes():
    # Two bboxes, no overlap, should not be merged
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.7, "class_id": 1},
        {"bbox": [20, 20, 30, 30], "confidence": 0.9, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 116μs -> 17.6μs (558% faster)
    bboxes = [r["bbox"].tolist() for r in result]


def test_overlap_merges_boxes():
    # Two overlapping bboxes should be merged
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.7, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.9, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 123μs -> 18.2μs (581% faster)
    merged_bbox = result[0]["bbox"]


def test_overlap_merges_boxes_mean_confidence():
    # Two overlapping bboxes, mean confidence
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.6, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "mean")
    result = codeflash_output  # 146μs -> 46.3μs (216% faster)


def test_overlap_merges_boxes_min_confidence():
    # Two overlapping bboxes, min confidence
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.6, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "min")
    result = codeflash_output  # 121μs -> 17.7μs (586% faster)


def test_different_classes_not_merged():
    # Overlapping boxes with different class_ids should not be merged
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.6, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.8, "class_id": 2},
    ]
    # The function merges only if class_id matches, so these should not merge
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 120μs -> 17.5μs (587% faster)
    class_ids = [r["class_id"] for r in result]


# ---------------------------
# Edge Test Cases
# ---------------------------


def test_overlap_threshold_zero_merges_any_overlap():
    # Two boxes with partial overlap, threshold 0.0, should merge
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [9, 9, 20, 20], "confidence": 0.9, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max", overlap_threshold=0.0)
    result = codeflash_output  # 119μs -> 17.5μs (583% faster)


def test_overlap_threshold_high_does_not_merge():
    # Two boxes with small overlap, high threshold, should not merge
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [9, 9, 20, 20], "confidence": 0.9, "class_id": 1},
    ]
    # Calculate IoU manually: intersection is 1x1=1, area1=100, area2=121, union=220
    # IoU = 1/220 ~ 0.0045, so threshold 0.01 should not merge
    codeflash_output = _merge_overlapping_bboxes(preds, "max", overlap_threshold=0.01)
    result = codeflash_output  # 120μs -> 19.7μs (508% faster)


def test_exactly_touching_boxes_not_merged():
    # Two boxes that touch at the edge, but do not overlap
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.6, "class_id": 1},
        {"bbox": [10, 0, 20, 10], "confidence": 0.7, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 118μs -> 19.1μs (521% faster)


def test_fully_contained_box_merges():
    # One box fully inside another, should merge
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.6, "class_id": 1},
        {"bbox": [2, 2, 8, 8], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 119μs -> 17.3μs (591% faster)


def test_chain_of_overlaps_merges_all():
    # Boxes that overlap in a chain (A overlaps B, B overlaps C, but A does not overlap C)
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [9, 0, 19, 10], "confidence": 0.6, "class_id": 1},
        {"bbox": [18, 0, 28, 10], "confidence": 0.7, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 152μs -> 20.5μs (642% faster)


def test_multiple_groups():
    # Multiple groups of overlapping boxes
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [8, 0, 18, 10], "confidence": 0.6, "class_id": 1},
        {"bbox": [30, 30, 40, 40], "confidence": 0.9, "class_id": 1},
        {"bbox": [38, 30, 48, 40], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 179μs -> 23.9μs (649% faster)
    bboxes = [r["bbox"].tolist() for r in result]


def test_boxes_with_negative_coordinates():
    # Boxes with negative coordinates should merge correctly
    preds = [
        {"bbox": [-10, -10, 0, 0], "confidence": 0.8, "class_id": 1},
        {"bbox": [-5, -5, 5, 5], "confidence": 0.9, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 120μs -> 17.9μs (576% faster)


def test_boxes_with_zero_area():
    # Boxes with zero area should not merge unless they overlap
    preds = [
        {"bbox": [0, 0, 0, 10], "confidence": 0.5, "class_id": 1},  # zero width
        {"bbox": [5, 5, 15, 5], "confidence": 0.6, "class_id": 1},  # zero height
        {"bbox": [0, 0, 10, 10], "confidence": 0.7, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 174μs -> 23.7μs (637% faster)


def test_invalid_confidence_strategy_defaults_to_max():
    # Unknown confidence strategy should default to max
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "unknown_strategy")
    result = codeflash_output  # 120μs -> 17.5μs (590% faster)


# ---------------------------
# Large Scale Test Cases
# ---------------------------


def test_large_number_of_non_overlapping_boxes():
    # 500 non-overlapping boxes should each remain separate
    preds = [
        {
            "bbox": [i * 10, i * 10, i * 10 + 5, i * 10 + 5],
            "confidence": 0.5 + 0.001 * i,
            "class_id": 1,
        }
        for i in range(500)
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 12.8ms -> 1.54ms (731% faster)
    # Check a few random boxes
    for idx in [0, 100, 499]:
        found = any(
            np.allclose(r["bbox"], [idx * 10, idx * 10, idx * 10 + 5, idx * 10 + 5])
            for r in result
        )


def test_large_number_of_overlapping_boxes():
    # 500 overlapping boxes (all overlap), should merge into one
    preds = [
        {"bbox": [0, 0, 10 + i, 10 + i], "confidence": 0.5 + 0.001 * i, "class_id": 1}
        for i in range(500)
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 358ms -> 165ms (117% faster)


def test_large_number_of_groups():
    # 100 groups of 5 overlapping boxes each, should merge into 100 boxes
    preds = []
    for group in range(100):
        base = group * 100
        for i in range(5):
            preds.append(
                {
                    "bbox": [base + i, base + i, base + i + 10, base + i + 10],
                    "confidence": 0.5 + 0.01 * i,
                    "class_id": 1,
                }
            )
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 15.5ms -> 1.93ms (707% faster)
    # Each group should be merged into one box
    for group in range(100):
        base = group * 100
        found = any(
            np.allclose(r["bbox"], [base, base, base + 14, base + 14]) for r in result
        )


def test_large_scale_mixed_classes():
    # 500 boxes, alternating class_ids, all overlap
    preds = []
    for i in range(500):
        preds.append(
            {
                "bbox": [0, 0, 10 + i, 10 + i],
                "confidence": 0.5 + 0.001 * i,
                "class_id": i % 2,
            }
        )
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 363ms -> 162ms (123% faster)
    class_ids = set(r["class_id"] for r in result)
    # Each merged box should have the max bbox for its class
    for cid in [0, 1]:
        relevant = [p for p in preds if p["class_id"] == cid]
        max_bbox = [
            0,
            0,
            max(p["bbox"][2] for p in relevant),
            max(p["bbox"][3] for p in relevant),
        ]
        found = any(
            np.allclose(r["bbox"], max_bbox) and r["class_id"] == cid for r in result
        )


def test_large_scale_performance():
    # Not a strict performance test, but checks function completes on 999 boxes
    preds = [
        {"bbox": [i, i, i + 10, i + 10], "confidence": 0.5, "class_id": 1}
        for i in range(999)
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 53.0ms -> 13.3ms (298% faster)


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
```

To test or edit this optimization locally, run `git merge codeflash/optimize-pr1853-2026-01-05T20.00.36`.
Click to see suggested changes

```diff
-# Create boxes as Polygons for spatial indexing
-boxes = []
-for pred in predictions:
-    x_min, y_min, x_max, y_max = pred["bbox"]
-    # Create box polygon (coordinates: [bottom-left, bottom-right, top-right, top-left])
-    box = Polygon([(x_min, y_min), (x_max, y_min), (x_max, y_max), (x_min, y_max)])
-    boxes.append(box)
-
-tree = STRtree(boxes)
-
-# Check candidate pairs identified by spatial index
-checked_pairs = set()
-for i in range(n):
-    box1 = boxes[i]
-    # Query for boxes that intersect the bounding box
-    candidates = tree.query(box1, predicate="intersects")
-    for j in candidates:
-        if i >= j or (i, j) in checked_pairs or (j, i) in checked_pairs:
-            continue
-        checked_pairs.add((i, j))
-        bbox1 = predictions[i]["bbox"]
-        bbox2 = predictions[j]["bbox"]
-        iou = bbox_iou(bbox1, bbox2)
-        if overlap_threshold <= 0.0:
-            # Merge if they overlap at all
-            if iou > 0:
-                union(i, j)
-        else:
-            # Merge only if IoU exceeds threshold
-            if iou >= overlap_threshold:
-                union(i, j)
+# Create sorted indices by x_min for sweep-line approach
+bboxes = [pred["bbox"] for pred in predictions]
+sorted_indices = sorted(range(n), key=lambda i: bboxes[i][0])
+
+# Use sweep-line to find overlapping candidates
+for i in range(n):
+    idx_i = sorted_indices[i]
+    bbox1 = bboxes[idx_i]
+    x1_min, y1_min, x1_max, y1_max = bbox1
+    # Only check boxes that could overlap in x dimension
+    for j in range(i + 1, n):
+        idx_j = sorted_indices[j]
+        bbox2 = bboxes[idx_j]
+        x2_min, y2_min, x2_max, y2_max = bbox2
+        # Early termination: if x2_min > x1_max, no further boxes can overlap
+        if x2_min > x1_max:
+            break
+        # Quick rejection test for y dimension
+        if y2_min > y1_max or y2_max < y1_min:
+            continue
+        iou = bbox_iou(bbox1, bbox2)
+        if overlap_threshold <= 0.0:
+            # Merge if they overlap at all
+            if iou > 0:
+                union(idx_i, idx_j)
+        else:
+            # Merge only if IoU exceeds threshold
+            if iou >= overlap_threshold:
+                union(idx_i, idx_j)
```
```python
merged = []
used = set()

for i, pred1 in enumerate(preds_with_keypoints):
    if i in used:
        continue

    # Start a new merged group with this prediction
    group = [pred1]
    used.add(i)

    kp1 = np.array(pred1["keypoint_data"]["keypoints_xy"])

    # Find all predictions that should merge with this one
    for j, pred2 in enumerate(preds_with_keypoints[i + 1 :], start=i + 1):
        if j in used:
            continue

        kp2 = np.array(pred2["keypoint_data"]["keypoints_xy"])

        # Calculate average distance between corresponding keypoints
        if len(kp1) == len(kp2):
            distances = np.linalg.norm(kp1 - kp2, axis=1)
            avg_distance = np.mean(distances)

            if avg_distance < keypoint_threshold:
                group.append(pred2)
                used.add(j)

    # Merge the group
    if len(group) == 1:
        merged.append(group[0])
    else:
        # Merge multiple predictions
        if confidence_strategy == "max":
            best_idx = np.argmax([p["confidence"] for p in group])
            confidence = group[best_idx]["confidence"]
        elif confidence_strategy == "mean":
            confidence = np.mean([p["confidence"] for p in group])
        else:  # 'min'
            confidence = np.min([p["confidence"] for p in group])

        # Average keypoint coordinates
        all_kp_xy = [np.array(p["keypoint_data"]["keypoints_xy"]) for p in group]
        merged_kp_xy = np.mean(all_kp_xy, axis=0).tolist()

        # Average keypoint confidences if available
        merged_kp_data = {
            "keypoints_xy": merged_kp_xy,
            "keypoints_class_name": group[0]["keypoint_data"].get(
                "keypoints_class_name"
            ),
            "keypoints_class_id": group[0]["keypoint_data"].get(
                "keypoints_class_id"
            ),
        }

        if "keypoints_confidence" in group[0]["keypoint_data"]:
            all_kp_conf = [
                np.array(p["keypoint_data"]["keypoints_confidence"]) for p in group
            ]
            merged_kp_conf = np.mean(all_kp_conf, axis=0).tolist()
            merged_kp_data["keypoints_confidence"] = merged_kp_conf

        # Average bbox coordinates
        all_bboxes = np.array([p["bbox"] for p in group])
        merged_bbox = np.mean(all_bboxes, axis=0)

        merged.append(
            {
                "bbox": merged_bbox,
                "confidence": confidence,
                "class_id": group[0]["class_id"],
                "mask": None,
                "keypoint_data": merged_kp_data,
            }
        )

# Add back predictions without keypoints
```
⚡️Codeflash found 1,670% (16.70x) speedup for _merge_keypoint_detections in inference/core/workflows/core_steps/fusion/detections_list_rollup/v1.py
⏱️ Runtime : 1.61 seconds → 90.8 milliseconds (best of 78 runs)
📝 Explanation and details
The optimized code achieves a 1670% speedup by replacing the O(N²K) nested Python loop with vectorized NumPy operations that compute all pairwise keypoint distances at once.
Key Optimizations:
- Vectorized Distance Computation: Instead of comparing keypoints pair-by-pair in nested loops, the code pre-stacks all keypoints into a single NumPy array and uses broadcasting to compute all pairwise distances simultaneously:

  ```python
  diffs = keypoints_arr[:, None, :, :] - keypoints_arr[None, :, :, :]
  dists = np.linalg.norm(diffs, axis=3)
  avg_dists = np.mean(dists, axis=2)
  ```

  This creates an N×N distance matrix in one operation, eliminating the need to call `np.linalg.norm` thousands of times in the inner loop.
- Boolean Mask for Membership Tracking: Replaces Python `set` lookups with NumPy boolean array indexing (`used = np.zeros(n_preds, dtype=bool)`), providing O(1) access without Python overhead.
- Batch Processing: All keypoint arrays are converted to NumPy upfront and validated for equal length, allowing the fast path to handle the common case where all detections have the same number of keypoints.
- Fallback Path Preserved: When keypoint counts vary across detections (rare case), the code falls back to the original logic, maintaining correctness while optimizing the common case.
Why It's Faster:
- Profiler Evidence: The original code spent 92.6% of its time in the nested loop (lines with `np.linalg.norm` and `np.mean` each taking 39-52% of runtime). The optimized version reduces this to ~27% total for the vectorized computation.
- Python Loop Elimination: Removes 132,000+ iterations of Python loops and 264,000+ NumPy array allocations per execution, replaced by 3 vectorized operations.
- Cache Efficiency: Contiguous memory access patterns in the stacked array improve CPU cache utilization.
Impact on Workloads:
The function is called from merge_crop_predictions, which processes detections from tiled/cropped inference—a common pattern in object detection pipelines for handling large images. Based on the test results:
- High-density scenarios (100-500 predictions): Shows 70-4282% speedup when most predictions don't merge, as the vectorized distance matrix avoids redundant pairwise comparisons.
- Small inputs (2-10 predictions): Shows 11-89% slowdown due to NumPy setup overhead, but these cases take <100μs absolute time.
- Real-world benefit: The function is in a hot path during crop merging. For inference on large images requiring 50-100 crops with ~50 detections each, this optimization reduces merge time from seconds to milliseconds, making it negligible compared to model inference time.
The optimization particularly benefits computer vision pipelines processing high-resolution images where many crops generate numerous detections that need to be consolidated.
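The broadcasting trick at the heart of the optimization can be sketched in isolation (a minimal standalone example, not the library code; the array contents and the `threshold` value here are made up for illustration):

```python
import numpy as np

# keypoints_arr has shape [N, K, 2]: N detections, K keypoints each
keypoints_arr = np.array(
    [
        [[0.0, 0.0], [10.0, 0.0]],    # detection 0
        [[1.0, 0.0], [11.0, 0.0]],    # detection 1 (close to detection 0)
        [[100.0, 0.0], [110.0, 0.0]], # detection 2 (far from both)
    ]
)

# Broadcasting computes every pairwise keypoint difference at once: [N, N, K, 2]
diffs = keypoints_arr[:, None, :, :] - keypoints_arr[None, :, :, :]
# Euclidean distance per corresponding keypoint: [N, N, K]
dists = np.linalg.norm(diffs, axis=3)
# Average distance between corresponding keypoints for each detection pair: [N, N]
avg_dists = np.mean(dists, axis=2)

threshold = 5.0
mergeable = avg_dists < threshold
print(mergeable[0, 1])  # True  - detections 0 and 1 are within the threshold
print(mergeable[0, 2])  # False - detection 2 is too far away
```

One [N, N] matrix of mean distances replaces the N² inner-loop calls to `np.linalg.norm` that dominated the original profile.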
✅ Correctness verification report:
| Test | Status |
|---|---|
| ⏪ Replay Tests | 🔘 None Found |
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | ✅ 37 Passed |
| 📊 Tests Coverage | 97.7% |
🌀 Click to see Generated Regression Tests
from typing import List
# function to test
import numpy as np
# imports
import pytest
from inference.core.workflows.core_steps.fusion.detections_list_rollup.v1 import (
_merge_keypoint_detections,
)
# unit tests
# ----------- BASIC TEST CASES ------------
def test_empty_preds_returns_empty():
# Should return empty list when input is empty
codeflash_output = _merge_keypoint_detections(
[], "max", 10.0
) # 681ns -> 631ns (7.92% faster)
def test_no_keypoint_data_returns_input():
# Should return input unchanged if no keypoint_data present
preds = [
{"bbox": [0, 0, 1, 1], "confidence": 0.9, "class_id": 1, "mask": None},
{"bbox": [1, 1, 2, 2], "confidence": 0.8, "class_id": 1, "mask": None},
]
codeflash_output = _merge_keypoint_detections(preds, "max", 10.0)
result = codeflash_output # 1.63μs -> 1.61μs (1.24% faster)
def test_single_prediction_with_keypoints():
# Should return the single prediction unchanged
preds = [
{
"bbox": [0, 0, 1, 1],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 2], [3, 4]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
}
]
codeflash_output = _merge_keypoint_detections(preds, "max", 10.0)
result = codeflash_output # 10.4μs -> 90.9μs (88.6% slower)
def test_two_predictions_far_apart_not_merged():
# Should not merge if keypoints are far apart
preds = [
{
"bbox": [0, 0, 1, 1],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 2], [3, 4]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
{
"bbox": [10, 10, 11, 11],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[11, 12], [13, 14]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
]
codeflash_output = _merge_keypoint_detections(preds, "max", 5.0)
result = codeflash_output # 54.3μs -> 76.3μs (28.8% slower)
def test_two_predictions_close_merged_max_confidence():
# Should merge if keypoints are close, use max confidence
preds = [
{
"bbox": [0, 0, 1, 1],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 2], [3, 4]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
{
"bbox": [0.1, 0.1, 1.1, 1.1],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1.2, 2.1], [3.1, 4.2]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
]
codeflash_output = _merge_keypoint_detections(preds, "max", 0.5)
result = codeflash_output # 95.0μs -> 107μs (11.2% slower)
merged = result[0]
def test_two_predictions_close_merged_mean_confidence():
# Should merge, use mean confidence
preds = [
{
"bbox": [0, 0, 1, 1],
"confidence": 0.7,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 2], [3, 4]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
{
"bbox": [0.2, 0.2, 1.2, 1.2],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1.2, 2.2], [3.2, 4.2]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
]
codeflash_output = _merge_keypoint_detections(preds, "mean", 0.5)
result = codeflash_output # 90.4μs -> 112μs (19.4% slower)
merged = result[0]
def test_two_predictions_close_merged_min_confidence():
# Should merge, use min confidence
preds = [
{
"bbox": [0, 0, 1, 1],
"confidence": 0.7,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 2], [3, 4]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
{
"bbox": [0.2, 0.2, 1.2, 1.2],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1.2, 2.2], [3.2, 4.2]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
]
codeflash_output = _merge_keypoint_detections(preds, "min", 0.5)
result = codeflash_output # 92.4μs -> 109μs (15.4% slower)
merged = result[0]
def test_keypoints_confidence_merged_mean():
# Should merge keypoints_confidence by mean
preds = [
{
"bbox": [0, 0, 1, 1],
"confidence": 0.7,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 2], [3, 4]],
"keypoints_confidence": [0.5, 0.7],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
{
"bbox": [0.2, 0.2, 1.2, 1.2],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1.2, 2.2], [3.2, 4.2]],
"keypoints_confidence": [0.7, 0.9],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
]
codeflash_output = _merge_keypoint_detections(preds, "mean", 0.5)
result = codeflash_output # 101μs -> 114μs (11.6% slower)
merged = result[0]
def test_mixed_preds_with_and_without_keypoints():
# Should merge keypoint preds and append non-keypoint preds unchanged
preds = [
{
"bbox": [0, 0, 1, 1],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 2], [3, 4]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
{
"bbox": [0.1, 0.1, 1.1, 1.1],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1.2, 2.1], [3.1, 4.2]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
{
"bbox": [10, 10, 11, 11],
"confidence": 0.5,
"class_id": 2,
"mask": None,
},
]
codeflash_output = _merge_keypoint_detections(preds, "max", 0.5)
result = codeflash_output # 88.9μs -> 102μs (13.0% slower)
# ----------- EDGE TEST CASES ------------
def test_keypoints_different_lengths_not_merged():
# Should not merge predictions with different number of keypoints
preds = [
{
"bbox": [0, 0, 1, 1],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 2], [3, 4], [5, 6]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
{
"bbox": [0.1, 0.1, 1.1, 1.1],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1.2, 2.1], [3.1, 4.2]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
]
codeflash_output = _merge_keypoint_detections(preds, "max", 1.0)
result = codeflash_output # 13.4μs -> 18.6μs (28.2% slower)
def test_missing_keypoints_xy_field():
# Should treat missing keypoints_xy as a non-keypoint pred
preds = [
{
"bbox": [0, 0, 1, 1],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
# missing keypoints_xy
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
{
"bbox": [0.1, 0.1, 1.1, 1.1],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1.2, 2.1], [3.1, 4.2]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
]
codeflash_output = _merge_keypoint_detections(preds, "max", 1.0)
result = codeflash_output # 8.88μs -> 62.9μs (85.9% slower)
def test_keypoint_threshold_zero():
# Should never merge if threshold is zero
preds = [
{
"bbox": [0, 0, 1, 1],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 2], [3, 4]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
{
"bbox": [0.1, 0.1, 1.1, 1.1],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1.2, 2.1], [3.1, 4.2]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
]
codeflash_output = _merge_keypoint_detections(preds, "max", 0.0)
result = codeflash_output # 49.2μs -> 69.7μs (29.4% slower)
def test_keypoint_threshold_large_merges_all():
# Should merge all if threshold is very large
preds = [
{
"bbox": [0, 0, 1, 1],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 2], [3, 4]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
{
"bbox": [100, 100, 101, 101],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[101, 102], [103, 104]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
]
codeflash_output = _merge_keypoint_detections(preds, "mean", 1000.0)
result = codeflash_output # 93.1μs -> 111μs (16.8% slower)
def test_keypoints_confidence_missing_in_some():
# Should handle missing keypoints_confidence gracefully
preds = [
{
"bbox": [0, 0, 1, 1],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 2], [3, 4]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
# missing keypoints_confidence
},
},
{
"bbox": [0.1, 0.1, 1.1, 1.1],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1.2, 2.1], [3.1, 4.2]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
"keypoints_confidence": [0.7, 0.9],
},
},
]
codeflash_output = _merge_keypoint_detections(preds, "max", 0.5)
result = codeflash_output # 90.6μs -> 103μs (12.1% slower)
merged = [r for r in result if "keypoint_data" in r][0]
def test_preds_with_extra_fields():
# Should ignore extra fields and not break
preds = [
{
"bbox": [0, 0, 1, 1],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"extra_field": "foo",
"keypoint_data": {
"keypoints_xy": [[1, 2], [3, 4]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
{
"bbox": [0.1, 0.1, 1.1, 1.1],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"extra_field": "bar",
"keypoint_data": {
"keypoints_xy": [[1.2, 2.1], [3.1, 4.2]],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
},
]
codeflash_output = _merge_keypoint_detections(preds, "max", 0.5)
result = codeflash_output # 89.3μs -> 100μs (11.2% slower)
# ----------- LARGE SCALE TEST CASES ------------
def test_large_number_of_predictions_merging():
# Create 100 predictions in a tight cluster, should all merge
N = 100
preds = []
for i in range(N):
preds.append(
{
"bbox": [i, i, i + 1, i + 1],
"confidence": 0.5 + i / (2 * N),
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [
[1 + i / 100, 2 + i / 100],
[3 + i / 100, 4 + i / 100],
],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
}
)
# All keypoints within 1 pixel, so threshold 2 should merge all
codeflash_output = _merge_keypoint_detections(preds, "mean", 2.0)
result = codeflash_output # 1.43ms -> 1.48ms (3.46% slower)
merged = result[0]
# Check that merged confidence is mean of all
expected_conf = sum(0.5 + i / (2 * N) for i in range(N)) / N
def test_large_number_of_predictions_no_merging():
# Create 100 predictions far apart, none should merge
N = 100
preds = []
for i in range(N):
preds.append(
{
"bbox": [i * 10, i * 10, i * 10 + 1, i * 10 + 1],
"confidence": 0.5 + i / (2 * N),
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [
[100 + i * 10, 200 + i * 10],
[300 + i * 10, 400 + i * 10],
],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
}
)
# All keypoints >10 pixels apart, so threshold 1 should merge none
codeflash_output = _merge_keypoint_detections(preds, "mean", 1.0)
result = codeflash_output # 60.2ms -> 1.67ms (3495% faster)
def test_large_mixed_preds_with_and_without_keypoints():
# 50 keypoint preds in cluster, 50 without keypoints
N = 50
preds = []
for i in range(N):
preds.append(
{
"bbox": [i, i, i + 1, i + 1],
"confidence": 0.5 + i / (2 * N),
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [
[1 + i / 100, 2 + i / 100],
[3 + i / 100, 4 + i / 100],
],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
}
)
for i in range(N):
preds.append(
{
"bbox": [100 + i, 100 + i, 101 + i, 101 + i],
"confidence": 0.7,
"class_id": 2,
"mask": None,
}
)
# Should merge all keypoint preds into one, and keep the rest
codeflash_output = _merge_keypoint_detections(preds, "mean", 2.0)
result = codeflash_output # 768μs -> 451μs (70.1% faster)
def test_large_preds_with_varied_keypoints_lengths():
# 10 with 2 keypoints, 10 with 3 keypoints, none should merge across groups
preds = []
for i in range(10):
preds.append(
{
"bbox": [i, i, i + 1, i + 1],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [
[1 + i / 10, 2 + i / 10],
[3 + i / 10, 4 + i / 10],
],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
}
)
for i in range(10):
preds.append(
{
"bbox": [i + 20, i + 20, i + 21, i + 21],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [
[1 + i / 10, 2 + i / 10],
[3 + i / 10, 4 + i / 10],
[5 + i / 10, 6 + i / 10],
],
"keypoints_class_name": "test",
"keypoints_class_id": 42,
},
}
)
# Should merge within each group, not across
codeflash_output = _merge_keypoint_detections(preds, "mean", 2.0)
result = codeflash_output # 398μs -> 406μs (2.16% slower)
for r in result:
pass
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

import numpy as np
# imports
import pytest # used for our unit tests
from inference.core.workflows.core_steps.fusion.detections_list_rollup.v1 import (
_merge_keypoint_detections,
)
# unit tests
# ----------- BASIC TEST CASES ------------
def test_empty_input_returns_empty():
# Should return empty list if no detections
codeflash_output = _merge_keypoint_detections(
[], "max", 10
) # 672ns -> 641ns (4.84% faster)
def test_single_detection_no_merge():
# Should return the same detection if only one present
pred = {
"bbox": [0, 0, 10, 10],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 1], [2, 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
codeflash_output = _merge_keypoint_detections([pred], "max", 5)
result = codeflash_output # 9.39μs -> 67.0μs (86.0% slower)
def test_two_detections_far_apart_no_merge():
# Should NOT merge if keypoints are far apart
pred1 = {
"bbox": [0, 0, 10, 10],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 1], [2, 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
pred2 = {
"bbox": [20, 20, 30, 30],
"confidence": 0.7,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[21, 21], [22, 22]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
codeflash_output = _merge_keypoint_detections([pred1, pred2], "max", 5)
result = codeflash_output # 48.7μs -> 73.0μs (33.3% slower)
def test_two_detections_close_merge_max_confidence():
# Should merge if keypoints are close, and use max confidence
pred1 = {
"bbox": [0, 0, 10, 10],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 1], [2, 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
pred2 = {
"bbox": [1, 1, 11, 11],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1.1, 1], [2.1, 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
codeflash_output = _merge_keypoint_detections([pred1, pred2], "max", 1.5)
result = codeflash_output # 96.9μs -> 105μs (8.21% slower)
merged = result[0]
# Bbox should be mean of both
expected_bbox = np.mean([pred1["bbox"], pred2["bbox"]], axis=0)
# Keypoints should be mean
expected_kp = np.mean(
[
np.array(pred1["keypoint_data"]["keypoints_xy"]),
np.array(pred2["keypoint_data"]["keypoints_xy"]),
],
axis=0,
)
def test_confidence_strategy_mean_and_min():
# Should use mean or min confidence as requested
pred1 = {
"bbox": [0, 0, 10, 10],
"confidence": 0.6,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 1], [2, 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
pred2 = {
"bbox": [1, 1, 11, 11],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1.1, 1], [2.1, 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
# Mean
codeflash_output = _merge_keypoint_detections([pred1, pred2], "mean", 1.5)
result_mean = codeflash_output # 89.8μs -> 110μs (18.6% slower)
# Min
codeflash_output = _merge_keypoint_detections([pred1, pred2], "min", 1.5)
result_min = codeflash_output # 72.7μs -> 87.8μs (17.2% slower)
def test_keypoint_confidence_merging():
# Should merge keypoint confidences if present
pred1 = {
"bbox": [0, 0, 10, 10],
"confidence": 0.6,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 1], [2, 2]],
"keypoints_confidence": [0.5, 0.7],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
pred2 = {
"bbox": [1, 1, 11, 11],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1.1, 1], [2.1, 2]],
"keypoints_confidence": [0.9, 0.3],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
codeflash_output = _merge_keypoint_detections([pred1, pred2], "mean", 1.5)
result = codeflash_output # 101μs -> 116μs (12.6% slower)
merged = result[0]
expected_conf = np.mean([[0.5, 0.7], [0.9, 0.3]], axis=0)
def test_detections_without_keypoints_are_not_merged():
# Detections without keypoints should be returned as-is
pred1 = {
"bbox": [0, 0, 10, 10],
"confidence": 0.6,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 1], [2, 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
pred2 = {
"bbox": [20, 20, 30, 30],
"confidence": 0.7,
"class_id": 2,
"mask": None,
"keypoint_data": None,
}
codeflash_output = _merge_keypoint_detections([pred1, pred2], "max", 5)
result = codeflash_output # 9.27μs -> 63.4μs (85.4% slower)
def test_keypoint_data_missing_keypoints_xy():
# Detections with keypoint_data but missing keypoints_xy should not be merged
pred1 = {
"bbox": [0, 0, 10, 10],
"confidence": 0.6,
"class_id": 1,
"mask": None,
"keypoint_data": {"keypoints_class_name": "A", "keypoints_class_id": 1},
}
pred2 = {
"bbox": [1, 1, 11, 11],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1.1, 1], [2.1, 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
codeflash_output = _merge_keypoint_detections([pred1, pred2], "max", 1.5)
result = codeflash_output # 9.62μs -> 60.8μs (84.2% slower)
# ----------- EDGE TEST CASES ------------
def test_keypoints_different_lengths_not_merged():
# Detections with different number of keypoints should not be merged
pred1 = {
"bbox": [0, 0, 10, 10],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 1], [2, 2], [3, 3]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
pred2 = {
"bbox": [1, 1, 11, 11],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1.1, 1], [2.1, 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
codeflash_output = _merge_keypoint_detections([pred1, pred2], "max", 2)
result = codeflash_output # 13.8μs -> 18.9μs (27.0% slower)
def test_keypoint_threshold_zero():
# With threshold zero, only identical keypoints merge
pred1 = {
"bbox": [0, 0, 10, 10],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 1], [2, 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
pred2 = {
"bbox": [0, 0, 10, 10],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 1], [2, 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
pred3 = {
"bbox": [1, 1, 11, 11],
"confidence": 0.7,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1.1, 1], [2.1, 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
codeflash_output = _merge_keypoint_detections([pred1, pred2, pred3], "max", 0)
result = codeflash_output # 92.5μs -> 80.3μs (15.1% faster)
def test_all_detections_merged():
# All detections merged if threshold is very large
preds = []
for i in range(5):
preds.append(
{
"bbox": [i, i, i + 10, i + 10],
"confidence": 0.5 + i * 0.1,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[i + 1, i + 1], [i + 2, i + 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
)
codeflash_output = _merge_keypoint_detections(preds, "max", 1000)
result = codeflash_output # 148μs -> 122μs (22.1% faster)
def test_missing_keypoint_data_field():
# If keypoint_data is missing, detection should not be merged
pred1 = {
"bbox": [0, 0, 10, 10],
"confidence": 0.8,
"class_id": 1,
"mask": None,
# No keypoint_data
}
pred2 = {
"bbox": [1, 1, 11, 11],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1.1, 1], [2.1, 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
codeflash_output = _merge_keypoint_detections([pred1, pred2], "max", 1.5)
result = codeflash_output # 9.94μs -> 61.8μs (83.9% slower)
def test_multiple_groups_merging():
# Should correctly merge multiple groups separately
pred1 = {
"bbox": [0, 0, 10, 10],
"confidence": 0.7,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1, 1], [2, 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
pred2 = {
"bbox": [1, 1, 11, 11],
"confidence": 0.8,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[1.2, 1.1], [2.1, 2.1]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
pred3 = {
"bbox": [20, 20, 30, 30],
"confidence": 0.9,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[21, 21], [22, 22]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
codeflash_output = _merge_keypoint_detections([pred1, pred2, pred3], "max", 2)
result = codeflash_output # 115μs -> 114μs (0.595% faster)
confidences = [p["confidence"] for p in result]
# ----------- LARGE SCALE TEST CASES ------------
def test_large_scale_merging():
# Merge a large number of close detections
N = 500
preds = []
for i in range(N):
preds.append(
{
"bbox": [i, i, i + 10, i + 10],
"confidence": 0.5 + (i % 10) * 0.01,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[i + 1, i + 1], [i + 2, i + 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
)
# All keypoints are close enough to merge
codeflash_output = _merge_keypoint_detections(preds, "max", 1000)
result = codeflash_output # 7.28ms -> 26.8ms (72.8% slower)
# Confidence should be max of all
expected_conf = max([p["confidence"] for p in preds])
# Bbox should be mean of all
expected_bbox = np.mean([p["bbox"] for p in preds], axis=0)
def test_large_scale_no_merging():
# Large number of detections, all far apart
N = 500
preds = []
for i in range(N):
preds.append(
{
"bbox": [i * 100, i * 100, i * 100 + 10, i * 100 + 10],
"confidence": 0.5 + (i % 10) * 0.01,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [
[i * 100 + 1, i * 100 + 1],
[i * 100 + 2, i * 100 + 2],
],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
)
# No keypoints close enough to merge
codeflash_output = _merge_keypoint_detections(preds, "max", 1)
result = codeflash_output # 1.51s -> 34.5ms (4282% faster)
# All original detections should be present
for p in preds:
pass
def test_large_scale_mixed_keypoints_and_non_keypoints():
# Mix of detections with and without keypoints
N = 250
preds = []
for i in range(N):
preds.append(
{
"bbox": [i, i, i + 10, i + 10],
"confidence": 0.5 + (i % 10) * 0.01,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[i + 1, i + 1], [i + 2, i + 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
)
for i in range(N):
preds.append(
{
"bbox": [i + 1000, i + 1000, i + 1010, i + 1010],
"confidence": 0.6,
"class_id": 2,
"mask": None,
"keypoint_data": None,
}
)
# Only the first N should merge, rest stay separate
codeflash_output = _merge_keypoint_detections(preds, "max", 1000)
result = codeflash_output # 3.64ms -> 5.89ms (38.2% slower)
def test_large_scale_multiple_groups():
# Large number of detections forming two separate groups
N = 200
preds = []
# First group: close keypoints
for i in range(N):
preds.append(
{
"bbox": [i, i, i + 10, i + 10],
"confidence": 0.5 + (i % 10) * 0.01,
"class_id": 1,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[i + 1, i + 1], [i + 2, i + 2]],
"keypoints_class_name": "A",
"keypoints_class_id": 1,
},
}
)
# Second group: far keypoints
for i in range(N):
preds.append(
{
"bbox": [i + 1000, i + 1000, i + 1010, i + 1010],
"confidence": 0.6,
"class_id": 2,
"mask": None,
"keypoint_data": {
"keypoints_xy": [[i + 1001, i + 1001], [i + 1002, i + 1002]],
"keypoints_class_name": "B",
"keypoints_class_id": 2,
},
}
)
codeflash_output = _merge_keypoint_detections(preds, "max", 100)
result = codeflash_output # 18.1ms -> 17.3ms (4.69% faster)
# One for class_id 1, one for class_id 2
class_ids = set(p["class_id"] for p in result)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To test or edit this optimization locally: git merge codeflash/optimize-pr1853-2026-01-05T20.02.38
Click to see suggested changes
| merged = [] | |
| used = set() | |
| for i, pred1 in enumerate(preds_with_keypoints): | |
| if i in used: | |
| continue | |
| # Start a new merged group with this prediction | |
| group = [pred1] | |
| used.add(i) | |
| kp1 = np.array(pred1["keypoint_data"]["keypoints_xy"]) | |
| # Find all predictions that should merge with this one | |
| for j, pred2 in enumerate(preds_with_keypoints[i + 1 :], start=i + 1): | |
| if j in used: | |
| continue | |
| kp2 = np.array(pred2["keypoint_data"]["keypoints_xy"]) | |
| # Calculate average distance between corresponding keypoints | |
| if len(kp1) == len(kp2): | |
| distances = np.linalg.norm(kp1 - kp2, axis=1) | |
| avg_distance = np.mean(distances) | |
| if avg_distance < keypoint_threshold: | |
| group.append(pred2) | |
| used.add(j) | |
| # Merge the group | |
| if len(group) == 1: | |
| merged.append(group[0]) | |
| else: | |
| # Merge multiple predictions | |
| if confidence_strategy == "max": | |
| best_idx = np.argmax([p["confidence"] for p in group]) | |
| confidence = group[best_idx]["confidence"] | |
| elif confidence_strategy == "mean": | |
| confidence = np.mean([p["confidence"] for p in group]) | |
| else: # 'min' | |
| confidence = np.min([p["confidence"] for p in group]) | |
| # Average keypoint coordinates | |
| all_kp_xy = [np.array(p["keypoint_data"]["keypoints_xy"]) for p in group] | |
| merged_kp_xy = np.mean(all_kp_xy, axis=0).tolist() | |
| # Average keypoint confidences if available | |
| merged_kp_data = { | |
| "keypoints_xy": merged_kp_xy, | |
| "keypoints_class_name": group[0]["keypoint_data"].get( | |
| "keypoints_class_name" | |
| ), | |
| "keypoints_class_id": group[0]["keypoint_data"].get( | |
| "keypoints_class_id" | |
| ), | |
| } | |
| if "keypoints_confidence" in group[0]["keypoint_data"]: | |
| all_kp_conf = [ | |
| np.array(p["keypoint_data"]["keypoints_confidence"]) for p in group | |
| ] | |
| merged_kp_conf = np.mean(all_kp_conf, axis=0).tolist() | |
| merged_kp_data["keypoints_confidence"] = merged_kp_conf | |
| # Average bbox coordinates | |
| all_bboxes = np.array([p["bbox"] for p in group]) | |
| merged_bbox = np.mean(all_bboxes, axis=0) | |
| merged.append( | |
| { | |
| "bbox": merged_bbox, | |
| "confidence": confidence, | |
| "class_id": group[0]["class_id"], | |
| "mask": None, | |
| "keypoint_data": merged_kp_data, | |
| } | |
| ) | |
| # Add back predictions without keypoints | |
| n_preds = len(preds_with_keypoints) | |
| # Pre-convert all keypoints_xy's to a single numpy array for fast slicing | |
| kp_list = [] | |
| kp_len = None | |
| for p in preds_with_keypoints: | |
| arr = np.asarray(p["keypoint_data"]["keypoints_xy"]) | |
| if kp_len is None: | |
| kp_len = len(arr) | |
| elif kp_len != len(arr): | |
| kp_len = None | |
| break | |
| kp_list.append(arr) | |
| if kp_len is not None: | |
| # All have equal keypoint length, stack for fast vectorized comparison | |
| keypoints_arr = np.stack(kp_list, axis=0) | |
| same_size = True | |
| else: | |
| # Fallback to original slow path if varying keypoint length | |
| keypoints_arr = None | |
| same_size = False | |
| merged = [] | |
| used = np.zeros(n_preds, dtype=bool) # bool mask instead of set for O(1) checks | |
| if same_size: | |
| # Compute an upper triangular pairwise matrix of average distances | |
| diffs = keypoints_arr[:, None, :, :] - keypoints_arr[None, :, :, :] | |
| dists = np.linalg.norm(diffs, axis=3) # [i, j, k] -- k is keypoint index | |
| avg_dists = np.mean(dists, axis=2) # [i, j] mean over keypoints | |
| for i in range(n_preds): | |
| if used[i]: | |
| continue | |
| # Find all remaining j > i such that avg_distance < keypoint_threshold | |
| mask = (~used) & (avg_dists[i] < keypoint_threshold) | |
| # Only look at indices >= i, to preserve greedy merge order as before | |
| mask[: i + 1] = False | |
| group_indices = [i] | |
```python
        for j in range(i + 1, n_preds):
            if mask[j]:
                group_indices.append(j)
                used[j] = True
        used[i] = True
        group = [preds_with_keypoints[idx] for idx in group_indices]
        if len(group) == 1:
            merged.append(group[0])
        else:
            if confidence_strategy == "max":
                best_idx = np.argmax([p["confidence"] for p in group])
                confidence = group[best_idx]["confidence"]
            elif confidence_strategy == "mean":
                confidence = np.mean([p["confidence"] for p in group])
            else:  # 'min'
                confidence = np.min([p["confidence"] for p in group])
            all_kp_xy = np.array(
                [p["keypoint_data"]["keypoints_xy"] for p in group]
            )
            merged_kp_xy = np.mean(all_kp_xy, axis=0).tolist()
            merged_kp_data = {
                "keypoints_xy": merged_kp_xy,
                "keypoints_class_name": group[0]["keypoint_data"].get(
                    "keypoints_class_name"
                ),
                "keypoints_class_id": group[0]["keypoint_data"].get(
                    "keypoints_class_id"
                ),
            }
            if "keypoints_confidence" in group[0]["keypoint_data"]:
                all_kp_conf = np.array(
                    [p["keypoint_data"]["keypoints_confidence"] for p in group]
                )
                merged_kp_conf = np.mean(all_kp_conf, axis=0).tolist()
                merged_kp_data["keypoints_confidence"] = merged_kp_conf
            all_bboxes = np.array([p["bbox"] for p in group])
            merged_bbox = np.mean(all_bboxes, axis=0)
            merged.append(
                {
                    "bbox": merged_bbox,
                    "confidence": confidence,
                    "class_id": group[0]["class_id"],
                    "mask": None,
                    "keypoint_data": merged_kp_data,
                }
            )
else:
    # Fallback: original slow python-path for variable keypoint lengths
    used_set = set()
    for i, pred1 in enumerate(preds_with_keypoints):
        if i in used_set:
            continue
        group = [pred1]
        used_set.add(i)
        kp1 = np.array(pred1["keypoint_data"]["keypoints_xy"])
        for j, pred2 in enumerate(preds_with_keypoints[i + 1:], start=i + 1):
            if j in used_set:
                continue
            kp2 = np.array(pred2["keypoint_data"]["keypoints_xy"])
            if len(kp1) == len(kp2):
                distances = np.linalg.norm(kp1 - kp2, axis=1)
                avg_distance = np.mean(distances)
                if avg_distance < keypoint_threshold:
                    group.append(pred2)
                    used_set.add(j)
        if len(group) == 1:
            merged.append(group[0])
        else:
            if confidence_strategy == "max":
                best_idx = np.argmax([p["confidence"] for p in group])
                confidence = group[best_idx]["confidence"]
            elif confidence_strategy == "mean":
                confidence = np.mean([p["confidence"] for p in group])
            else:  # 'min'
                confidence = np.min([p["confidence"] for p in group])
            all_kp_xy = [
                np.array(p["keypoint_data"]["keypoints_xy"]) for p in group
            ]
            merged_kp_xy = np.mean(all_kp_xy, axis=0).tolist()
            merged_kp_data = {
                "keypoints_xy": merged_kp_xy,
                "keypoints_class_name": group[0]["keypoint_data"].get(
                    "keypoints_class_name"
                ),
                "keypoints_class_id": group[0]["keypoint_data"].get(
                    "keypoints_class_id"
                ),
            }
            if "keypoints_confidence" in group[0]["keypoint_data"]:
                all_kp_conf = [
                    np.array(p["keypoint_data"]["keypoints_confidence"])
                    for p in group
                ]
                merged_kp_conf = np.mean(all_kp_conf, axis=0).tolist()
                merged_kp_data["keypoints_confidence"] = merged_kp_conf
            all_bboxes = np.array([p["bbox"] for p in group])
            merged_bbox = np.mean(all_bboxes, axis=0)
            merged.append(
                {
                    "bbox": merged_bbox,
                    "confidence": confidence,
                    "class_id": group[0]["class_id"],
                    "mask": None,
                    "keypoint_data": merged_kp_data,
                }
            )
# Add back predictions without keypoints
```
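The grouping test in the fallback path above boils down to an average per-keypoint distance check: two predictions merge only if they have the same number of keypoints and the mean distance between corresponding keypoints falls under the merge radius. A standalone sketch of just that check (the helper name is hypothetical, not part of the block's API):

```python
import numpy as np

def should_merge(kp1, kp2, keypoint_threshold=10.0):
    # Hypothetical helper mirroring the fallback path's grouping test:
    # same keypoint count, and the mean per-keypoint distance must be
    # under the merge radius (default 10px, as in the block).
    kp1, kp2 = np.asarray(kp1, dtype=float), np.asarray(kp2, dtype=float)
    if kp1.shape != kp2.shape:
        return False
    return float(np.mean(np.linalg.norm(kp1 - kp2, axis=1))) < keypoint_threshold

a = [[100, 100], [150, 120]]
b = [[104, 103], [153, 124]]  # each keypoint a few pixels off (distances 5 and 5)
c = [[400, 400], [450, 420]]  # far away

print(should_merge(a, b))  # True: mean distance 5.0 < 10px
print(should_merge(a, c))  # False
```

Predictions that pass this test get their keypoint coordinates (and, when present, keypoint confidences) averaged elementwise, which is why the equal-length check matters.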
Description
Adds "detections list rollup" - a highly needed block that takes detections from inference run on dynamic crops and bumps them back up into the coordinate space of whatever parent detection the crops came from. This can require merging detections: masks and bounding boxes are merged based on an IoU threshold (with fully merging overlapping detections being the default). Keypoints can be merged within a specified, optional radius (defaults to 10px). This works with any combination of object detection, segmentation, and keypoint predictions (but it only ever pulls out the coordinate system of the parent).
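As a rough illustration of the two core operations described above - the names and data layout here are hypothetical, not the block's actual API - a crop-space box is translated by the crop's offset in the parent image, and IoU against a threshold decides whether two translated boxes are candidates for merging:

```python
def roll_up(crop_offset, crop_boxes):
    # Translate crop-space [x1, y1, x2, y2] boxes into parent-image
    # coordinates by adding the crop's top-left offset.
    ox, oy = crop_offset
    return [[x1 + ox, y1 + oy, x2 + ox, y2 + oy] for x1, y1, x2, y2 in crop_boxes]

def iou(a, b):
    # Intersection-over-union of two [x1, y1, x2, y2] boxes
    # in the same coordinate space.
    x1, y1 = max(a[0], b[0]), max(a[1], b[1])
    x2, y2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0.0, x2 - x1) * max(0.0, y2 - y1)
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    return inter / (area_a + area_b - inter) if inter else 0.0

# Two detections from a crop whose top-left corner sits at (100, 50)
# in the parent image:
parent_boxes = roll_up((100, 50), [[10, 10, 30, 30], [12, 11, 32, 31]])
print(parent_boxes)  # [[110, 60, 130, 80], [112, 61, 132, 81]]
print(iou(parent_boxes[0], parent_boxes[1]) > 0.0)  # True: merge candidates
```

An IoU threshold of 0 (the default) merges any overlapping same-class pair; a threshold of 1 effectively disables merging, matching the test cases below.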
An example (we have a number of these) would be dynamic cropping of text, running OCR on that text, and then placing the detected text on the original image that the dynamic crops came from.
While I wanted to have the parent input simply be the detection from the dynamic crop, the workflow engine doesn't allow mixing parameters with different dimensionality. So it's necessary to use a dimensionality reduction block before this block (if there's any way around this, I'd be happy to implement it to keep use of the block simpler).
Most of this was vibe coded. It took quite a bit of iteration to get the options and output format correct, but I've left all of the generated code intact.
IMPORTANT: This assumes that the detections used for the crops are in the same order as the list resulting from dimensionality reduction. As far as I can tell that's true, but if there's a better way to make the association, such as a GUID, I'm open to it. I couldn't find one, though.
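A sketch of what that index-based association amounts to, with toy strings standing in for the real detection objects (names are illustrative only):

```python
# Hypothetical illustration of the ordering assumption: the i-th per-crop
# result list is taken to belong to the i-th parent detection that
# produced the crop. There is no GUID tying the two together.
parent_detections = ["person_0", "person_1"]        # detections used for cropping
child_results = [["face_a"], ["face_b", "face_c"]]  # per-crop inference results

associated = {
    parent: children
    for parent, children in zip(parent_detections, child_results)
}
print(associated)  # {'person_0': ['face_a'], 'person_1': ['face_b', 'face_c']}
```

If the two lists ever fell out of order (or out of sync in length), child detections would be rolled up into the wrong parent's coordinate space, which is why a stable identifier would be the more robust association if the engine exposed one.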
Test workflow (also used for integration tests)

test_workflow.json
Given the following dynamic crops for person detection:

Parent: Object Detection, Child: Segmentation, IoU Threshold: 0

Parent: Object Detection, Child: Object Detection, IoU Threshold: 0 (bounding boxes merged)

Parent: Object Detection, Child: Object Detection, IoU Threshold: 1 (bounding boxes not merged)

The keypoint tests below add 300px padding around the crops to test keypoint merging.
Parent: Object Detection, Child: Keypoint Detection, IoU Threshold: 0, Keypoint Merge Radius: 10 (default)

Parent: Object Detection, Child: Keypoint Detection, IoU Threshold: 0, Keypoint Merge Radius: 0

Type of change
How has this change been tested, please provide a testcase or example of how you tested the change?
Included integration tests and test workflow. All combinations of object detection, segmentation, and keypoint detection have been tested.
Any specific deployment considerations
N/A
Docs