
Conversation


@lou-roboflow (Collaborator) commented Dec 24, 2025

Description

Adds a "detections list rollup" block, a much-needed block that takes the detections from inference run on dynamic crops and maps them back into the coordinate space of the parent detection they came from. This can require merging detections: masks and bounding boxes are merged based on an IoU threshold, and the default threshold of 0 merges any overlapping detections. Keypoints can be merged within a specified, optional radius (defaults to 10px). This works with any combination of object detection, segmentation, and keypoint predictions (but it only ever pulls out the parent's coordinate system).
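For illustration, a minimal sketch of the coordinate translation at the heart of the block (hypothetical helper name; assumes each crop carries the top-left offset of where it was cut from the parent image):

```python
import numpy as np

def roll_up_bboxes(child_bboxes: np.ndarray, crop_offset: tuple) -> np.ndarray:
    """Translate child [x_min, y_min, x_max, y_max] boxes (crop-relative)
    into the parent image's coordinate space by adding the crop origin."""
    ox, oy = crop_offset  # top-left corner of the crop within the parent image
    return child_bboxes + np.array([ox, oy, ox, oy])

# A box detected at (10, 20)-(50, 60) inside a crop whose top-left corner
# sits at (100, 200) in the parent image:
print(roll_up_bboxes(np.array([[10, 20, 50, 60]]), (100, 200)))
# → [[110 220 150 260]]
```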

An example (we have a number of these) would be dynamically cropping text, running OCR on each crop, and then placing the detected text back on the original image the dynamic crops came from.

While I wanted the parent input to simply be the detection from the dynamic crop, the workflow engine doesn't allow mixing parameters of different dimensionality, so a dimensionality reduction block is required before this block (if there's a way around this, I'd be happy to implement it to keep the block simpler to use).

Most of this was vibe coded. It took quite a bit of iteration to get the options and output format correct, but I've left all of the generated code intact.

IMPORTANT: This assumes that the detections used for the crops are in the same order as the list resulting from dimensionality reduction. As far as I can tell that's true, but if there's a better way to make the association, such as a GUID, I'm open to it; I couldn't find one, though.
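As a sketch of that positional assumption (hypothetical data, not the engine's actual structures): crop i in the batch of dynamic crops is assumed to correspond to element i of the dimensionality-reduced detections list, so a plain zip pairs parents with their child results.

```python
# Hypothetical shapes illustrating the association the block relies on.
crop_detections = ["person_0", "person_1", "person_2"]  # parent detections used for cropping
child_results = [["hat"], ["hat", "glove"], []]         # per-crop child predictions

# Positional pairing: index i of one list is matched to index i of the other.
pairs = list(zip(crop_detections, child_results))
print(pairs)
# → [('person_0', ['hat']), ('person_1', ['hat', 'glove']), ('person_2', [])]
```

If the orders ever diverged (e.g. a filter between the two steps), this pairing would silently mis-assign children, which is why a stable identifier would be safer if one existed.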

Test workflow (also used for integration tests)
test_workflow.json

Given the following dynamic crops for person detection:

Parent: Object Detection, Child: Segmentation, IoU Threshold: 0

Parent: Object Detection, Child: Object Detection, IoU Threshold: 0 (bounding boxes merged)

Parent: Object Detection, Child: Object Detection, IoU Threshold: 1 (bounding boxes not merged)

The keypoint tests below add 300px padding around the crops to test keypoint merging.

Parent: Object Detection, Child: Keypoint Detection, IoU Threshold: 0, Keypoint Merge Radius: 10 (default)

Parent: Object Detection, Child: Keypoint Detection, IoU Threshold: 0, Keypoint Merge Radius: 0

Type of change


  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • This change requires a documentation update

How has this change been tested? Please provide a test case or example of how you tested the change.

Included integration tests and test workflow. All combinations of object detection, segmentation, and keypoint detection have been tested.

Any specific deployment considerations

N/A

Docs

  • Docs updated? What were the changes: Inline

Copilot AI (Contributor) left a comment


Pull request overview

This PR introduces a new "Dimension Roll Up" block that merges detections from secondary models run on dynamic crops back into parent image coordinates. This is a highly needed feature for workflows involving nested detection pipelines (e.g., cropping and running inference on crops, then consolidating results).

Key changes:

  • Implements merging of bounding boxes and instance segmentation masks from child detections back to parent coordinates
  • Supports configurable confidence strategies (max, mean, min) and overlap thresholds for merging overlapping detections
  • Uses IoU-based merging with union-find algorithm to group overlapping detections per class
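As an illustration of that grouping step, a generic union-find sketch (not the PR's exact code): detections are nodes, each overlapping pair is a union, and connected components become merge clusters.

```python
def make_union_find(n: int):
    """Minimal union-find with path halving, as commonly used to group
    overlapping detections into merge clusters."""
    parent = list(range(n))

    def find(i: int) -> int:
        while parent[i] != i:
            parent[i] = parent[parent[i]]  # path halving keeps trees shallow
            i = parent[i]
        return i

    def union(i: int, j: int) -> None:
        ri, rj = find(i), find(j)
        if ri != rj:
            parent[rj] = ri

    return find, union

# Detections 0-1 overlap and 1-2 overlap, so all three end up in one cluster,
# while detection 3 stays separate:
find, union = make_union_find(4)
union(0, 1)
union(1, 2)
print(find(2) == find(0), find(3) == find(0))
# → True False
```

The transitive behavior matters: chained overlaps (A overlaps B, B overlaps C) merge into a single detection even if A and C never touch.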

Reviewed changes

Copilot reviewed 2 out of 3 changed files in this pull request and generated 18 comments.

File descriptions:
  • inference/core/workflows/core_steps/loader.py: Registers the new DimensionRollUpBlockV1 block by adding an import and including it in the load_blocks() list
  • inference/core/workflows/core_steps/fusion/dimension_rollup/v1.py: Complete implementation of the dimension rollup block with manifest, transformation logic, mask/bbox merging algorithms, and helper functions for polygon conversion
  • inference/core/workflows/core_steps/fusion/dimension_rollup/__init__.py: Empty init file for the new dimension_rollup module


@lou-roboflow lou-roboflow marked this pull request as draft December 24, 2025 21:12

Copilot AI commented Dec 26, 2025

@lou-roboflow I've opened a new pull request, #1855, to work on those changes. Once the pull request is ready, I'll request review from you.

@lou-roboflow lou-roboflow marked this pull request as ready for review December 26, 2025 22:25
@grzegorz-roboflow grzegorz-roboflow merged commit 450dede into main Jan 5, 2026
51 checks passed
@grzegorz-roboflow grzegorz-roboflow deleted the RollUpDetectionsBlock branch January 5, 2026 19:35
Comment on lines +964 to +998
# Create boxes as Polygons for spatial indexing
boxes = []
for pred in predictions:
    x_min, y_min, x_max, y_max = pred["bbox"]
    # Create box polygon (coordinates: [bottom-left, bottom-right, top-right, top-left])
    box = Polygon([(x_min, y_min), (x_max, y_min), (x_max, y_max), (x_min, y_max)])
    boxes.append(box)

tree = STRtree(boxes)

# Check candidate pairs identified by spatial index
checked_pairs = set()
for i in range(n):
    box1 = boxes[i]
    # Query for boxes that intersect the bounding box
    candidates = tree.query(box1, predicate="intersects")

    for j in candidates:
        if i >= j or (i, j) in checked_pairs or (j, i) in checked_pairs:
            continue
        checked_pairs.add((i, j))

        bbox1 = predictions[i]["bbox"]
        bbox2 = predictions[j]["bbox"]

        iou = bbox_iou(bbox1, bbox2)

        if overlap_threshold <= 0.0:
            # Merge if they overlap at all
            if iou > 0:
                union(i, j)
        else:
            # Merge only if IoU exceeds threshold
            if iou >= overlap_threshold:
                union(i, j)


⚡️Codeflash found 136% (1.36x) speedup for _merge_overlapping_bboxes in inference/core/workflows/core_steps/fusion/detections_list_rollup/v1.py

⏱️ Runtime: 825 milliseconds → 349 milliseconds (best of 20 runs)

📝 Explanation and details

The optimized code achieves a 136% speedup by replacing Shapely's spatial indexing (STRtree with Polygon objects) with a sweep-line algorithm that operates directly on bounding box coordinates.

Key Optimization

Removed Shapely dependency overhead: The original code created Polygon objects for each bounding box and built an STRtree spatial index. While STRtree helps avoid O(n²) comparisons, the overhead of creating Polygons and building/querying the tree dominates for typical detection counts.

Implemented sweep-line with early termination: The optimized version:

  1. Sorts bounding boxes by x_min coordinate once (O(n log n))
  2. For each box, only checks boxes that could overlap in the x-dimension
  3. Uses early termination: when x2_min > x1_max, all remaining sorted boxes can't overlap
  4. Adds a quick y-dimension rejection test before computing IoU
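The four steps above can be sketched as a standalone candidate-pair generator (hypothetical function, assuming `[x_min, y_min, x_max, y_max]` boxes; IoU filtering would follow on the yielded pairs):

```python
def sweep_line_pairs(bboxes):
    """Yield index pairs whose boxes overlap on both axes, using the
    sort-by-x_min sweep with early termination described above."""
    order = sorted(range(len(bboxes)), key=lambda i: bboxes[i][0])
    for a, i in enumerate(order):
        x1_min, y1_min, x1_max, y1_max = bboxes[i]
        for j in order[a + 1:]:
            x2_min, y2_min, x2_max, y2_max = bboxes[j]
            if x2_min > x1_max:
                break  # later boxes start even further right: no x overlap possible
            if y2_min > y1_max or y2_max < y1_min:
                continue  # quick y-dimension rejection
            yield (i, j)

boxes = [(0, 0, 10, 10), (5, 5, 15, 15), (50, 50, 60, 60)]
print(list(sweep_line_pairs(boxes)))
# → [(0, 1)]
```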

Performance Analysis

From line profiler results, the original code spent:

  • 4% creating Polygon objects (244ms out of 6.17s)
  • 3% querying STRtree (182ms)
  • 30.9% computing IoU for 262,805 pairs
  • 35.6% in union operations

The optimized version:

  • 0.1% sorting indices (3.27ms)
  • 37% computing IoU for the same 262,805 pairs
  • 40.2% in union operations
  • Eliminates all Shapely overhead

The sweep-line approach maintains the same candidate pair count (262,805) as STRtree but with ~490ms less overhead from polygon creation and tree operations.

Impact on Workloads

Based on function_references, this function is called from merge_crop_predictions which processes detections from multiple image crops. The optimization is particularly valuable when:

  • Processing many crops with moderate detection counts (typical CV pipelines)
  • The function is in a hot path: called once per batch of crops being merged
  • Test results show 544-822% speedup for small-to-medium cases (2-100 detections), making per-crop processing nearly instantaneous
  • For large overlapping cases (500 detections), the speedup is 117-123%, as IoU computation still dominates

The sweep-line algorithm scales better than STRtree for the typical detection counts (tens to low hundreds) seen in object detection, while avoiding Python/C++ boundary crossing overhead from Shapely operations.
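The candidate pairs that survive the sweep are scored with a `bbox_iou` helper; a plain sketch of what such a helper typically computes (not the PR's exact implementation):

```python
def bbox_iou(a, b) -> float:
    """Intersection-over-union of two [x_min, y_min, x_max, y_max] boxes."""
    ix_min, iy_min = max(a[0], b[0]), max(a[1], b[1])
    ix_max, iy_max = min(a[2], b[2]), min(a[3], b[3])
    iw, ih = max(0.0, ix_max - ix_min), max(0.0, iy_max - iy_min)
    inter = iw * ih
    if inter == 0:
        return 0.0  # disjoint or merely touching boxes score 0
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    return inter / (area_a + area_b - inter)

# Two 10x10 boxes overlapping in a 5x5 corner: 25 / (100 + 100 - 25) = 1/7
print(bbox_iou([0, 0, 10, 10], [5, 5, 15, 15]))
# → 0.14285714285714285
```

Note this returns 0 for edge-touching boxes, which matches the test cases above where touching boxes are not merged even at threshold 0.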

Correctness verification report:

Test                            Status
⏪ Replay Tests                 🔘 None Found
⚙️ Existing Unit Tests          🔘 None Found
🔎 Concolic Coverage Tests      🔘 None Found
🌀 Generated Regression Tests   43 Passed
📊 Tests Coverage               96.6%
🌀 Click to see Generated Regression Tests
from typing import List

# function to test (copied from above)
import numpy as np

# imports
import pytest  # used for our unit tests
from inference.core.workflows.core_steps.fusion.detections_list_rollup.v1 import (
    _merge_overlapping_bboxes,
)
from shapely.geometry import Polygon
from shapely.strtree import STRtree

# unit tests

# ----------- BASIC TEST CASES ------------


def test_empty_input():
    # Should return empty list for no predictions
    codeflash_output = _merge_overlapping_bboxes([], "max")
    result = codeflash_output  # 831ns -> 842ns (1.31% slower)


def test_single_bbox():
    # Should return the same bbox unchanged
    bbox = [0, 0, 10, 10]
    pred = {"bbox": bbox, "confidence": 0.9, "class_id": 1}
    codeflash_output = _merge_overlapping_bboxes([pred], "max")
    result = codeflash_output  # 113μs -> 16.7μs (580% faster)


def test_non_overlapping_bboxes():
    # Should not merge bboxes that do not overlap
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.9, "class_id": 1},
        {"bbox": [20, 20, 30, 30], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 122μs -> 19.0μs (544% faster)
    # Each output bbox should match one input bbox
    for out in result:
        pass


def test_two_overlapping_bboxes():
    # Should merge two overlapping bboxes
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.9, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 126μs -> 18.6μs (584% faster)
    # The merged bbox should cover both
    expected_bbox = np.array([0, 0, 15, 15])


def test_three_chain_overlapping_bboxes():
    # Should merge all three if each overlaps with next
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.7, "class_id": 1},
        {"bbox": [9, 0, 19, 10], "confidence": 0.8, "class_id": 1},
        {"bbox": [18, 0, 28, 10], "confidence": 0.6, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "mean")
    result = codeflash_output  # 188μs -> 59.7μs (215% faster)
    # The merged bbox should cover all
    expected_bbox = np.array([0, 0, 28, 10])


def test_different_class_ids():
    # Should not merge bboxes of different classes
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.9, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.8, "class_id": 2},
    ]
    # Simulate merging by class: only bboxes of same class are grouped
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 124μs -> 17.9μs (595% faster)
    classes = set([r["class_id"] for r in result])


def test_confidence_strategy_min():
    # Should use min confidence when requested
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.9, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "min")
    result = codeflash_output  # 123μs -> 18.0μs (585% faster)


def test_confidence_strategy_unknown():
    # Should default to max confidence if strategy is unknown
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.7, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "foobar")
    result = codeflash_output  # 123μs -> 17.5μs (603% faster)


# ----------- EDGE TEST CASES ------------


def test_zero_area_bbox():
    # Bbox with zero area should not merge with others
    preds = [
        {"bbox": [0, 0, 0, 10], "confidence": 0.5, "class_id": 1},  # zero width
        {"bbox": [1, 1, 10, 10], "confidence": 0.9, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 123μs -> 17.8μs (596% faster)
    found_zero = any(np.all(r["bbox"] == np.array([0, 0, 0, 10])) for r in result)


def test_bbox_touching_edges_only():
    # Boxes that touch at edge but do not overlap should not be merged (IoU=0)
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.9, "class_id": 1},
        {"bbox": [10, 0, 20, 10], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 120μs -> 19.7μs (513% faster)


def test_bbox_fully_contained():
    # One box fully inside another should be merged
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [2, 2, 8, 8], "confidence": 0.9, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "mean")
    result = codeflash_output  # 143μs -> 43.8μs (229% faster)


def test_overlap_threshold_high():
    # With high overlap_threshold, boxes should not merge unless IoU >= threshold
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [8, 8, 18, 18], "confidence": 0.9, "class_id": 1},
    ]
    # IoU is low, so with threshold=0.5, should not merge
    codeflash_output = _merge_overlapping_bboxes(preds, "max", overlap_threshold=0.5)
    result = codeflash_output  # 123μs -> 20.2μs (511% faster)


def test_overlap_threshold_low():
    # With low overlap_threshold, boxes that overlap at all should merge
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [9, 9, 19, 19], "confidence": 0.9, "class_id": 1},
    ]
    # They overlap at the corner, so with threshold=0.0, should merge
    codeflash_output = _merge_overlapping_bboxes(preds, "max", overlap_threshold=0.0)
    result = codeflash_output  # 121μs -> 17.9μs (576% faster)


def test_multiple_classes_and_overlap():
    # Overlapping bboxes of different classes should not merge
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.9, "class_id": 2},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 121μs -> 17.8μs (583% faster)


def test_negative_coordinates():
    # Should handle negative coordinates correctly
    preds = [
        {"bbox": [-10, -10, 0, 0], "confidence": 0.7, "class_id": 1},
        {"bbox": [-5, -5, 5, 5], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 121μs -> 18.0μs (577% faster)
    expected_bbox = np.array([-10, -10, 5, 5])


def test_float_coordinates():
    # Should handle float coordinates
    preds = [
        {"bbox": [0.0, 0.0, 10.5, 10.5], "confidence": 0.9, "class_id": 1},
        {"bbox": [10.0, 10.0, 20.0, 20.0], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 121μs -> 18.6μs (557% faster)
    expected_bbox = np.array([0.0, 0.0, 20.0, 20.0])


def test_overlapping_bboxes_different_confidences():
    # Should merge and use correct confidence strategy
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.2, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.5, "class_id": 1},
        {"bbox": [8, 8, 18, 18], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 160μs -> 22.5μs (613% faster)


# ----------- LARGE SCALE TEST CASES ------------


def test_large_number_non_overlapping():
    # Should handle many non-overlapping bboxes efficiently
    preds = []
    for i in range(100):
        preds.append(
            {
                "bbox": [i * 10, i * 10, i * 10 + 5, i * 10 + 5],
                "confidence": 0.5,
                "class_id": 1,
            }
        )
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 2.64ms -> 306μs (762% faster)
    for i in range(100):
        found = any(
            np.all(r["bbox"] == np.array([i * 10, i * 10, i * 10 + 5, i * 10 + 5]))
            for r in result
        )


def test_large_number_chain_overlapping():
    # Should merge all into one group if each overlaps next
    preds = []
    for i in range(100):
        preds.append(
            {"bbox": [i, 0, i + 2, 2], "confidence": 0.1 + 0.001 * i, "class_id": 1}
        )
    codeflash_output = _merge_overlapping_bboxes(preds, "mean")
    result = codeflash_output  # 3.02ms -> 327μs (822% faster)
    expected_bbox = np.array([0, 0, 101, 2])
    # Confidence should be mean of all
    expected_conf = np.mean([0.1 + 0.001 * i for i in range(100)])


def test_large_number_multiple_classes():
    # Should separate by class, even if bboxes overlap
    preds = []
    for i in range(50):
        preds.append({"bbox": [i, 0, i + 10, 10], "confidence": 0.5, "class_id": 1})
        preds.append({"bbox": [i, 0, i + 10, 10], "confidence": 0.6, "class_id": 2})
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 6.73ms -> 2.27ms (196% faster)
    # Should produce two merged bboxes, one per class
    class_counts = {1: 0, 2: 0}
    for r in result:
        class_counts[r["class_id"]] += 1


def test_large_number_random_overlap():
    # Randomly overlapping boxes, some should merge, some not
    preds = []
    for i in range(50):
        # Overlap every 10th box with the previous
        if i % 10 == 0 and i > 0:
            bbox = [i - 1, 0, i + 9, 10]
        else:
            bbox = [i * 2, 0, i * 2 + 5, 5]
        preds.append({"bbox": bbox, "confidence": 0.5, "class_id": 1})
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 1.64ms -> 192μs (756% faster)


def test_large_number_overlap_threshold():
    # With high overlap_threshold, fewer merges
    preds = []
    for i in range(100):
        preds.append(
            {"bbox": [i, 0, i + 2, 2], "confidence": 0.1 + 0.001 * i, "class_id": 1}
        )
    codeflash_output = _merge_overlapping_bboxes(preds, "max", overlap_threshold=0.9)
    result = codeflash_output  # 3.06ms -> 471μs (549% faster)


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
import numpy as np

# imports
import pytest  # used for our unit tests
from inference.core.workflows.core_steps.fusion.detections_list_rollup.v1 import (
    _merge_overlapping_bboxes,
)

# function to test
# (function code pasted above, so not repeated here for brevity)

# ---------------------------
# Basic Test Cases
# ---------------------------


def test_empty_input_returns_empty():
    # Test that an empty input returns an empty list
    codeflash_output = _merge_overlapping_bboxes([], "max")
    result = codeflash_output  # 761ns -> 751ns (1.33% faster)


def test_single_bbox():
    # Test with a single bounding box
    preds = [{"bbox": [0, 0, 10, 10], "confidence": 0.8, "class_id": 1}]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 83.4μs -> 14.4μs (478% faster)


def test_no_overlap_returns_individual_boxes():
    # Two bboxes, no overlap, should not be merged
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.7, "class_id": 1},
        {"bbox": [20, 20, 30, 30], "confidence": 0.9, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 116μs -> 17.6μs (558% faster)
    bboxes = [r["bbox"].tolist() for r in result]


def test_overlap_merges_boxes():
    # Two overlapping bboxes should be merged
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.7, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.9, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 123μs -> 18.2μs (581% faster)
    merged_bbox = result[0]["bbox"]


def test_overlap_merges_boxes_mean_confidence():
    # Two overlapping bboxes, mean confidence
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.6, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "mean")
    result = codeflash_output  # 146μs -> 46.3μs (216% faster)


def test_overlap_merges_boxes_min_confidence():
    # Two overlapping bboxes, min confidence
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.6, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "min")
    result = codeflash_output  # 121μs -> 17.7μs (586% faster)


def test_different_classes_not_merged():
    # Overlapping boxes with different class_ids should not be merged
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.6, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.8, "class_id": 2},
    ]
    # The function merges only if class_id matches, so these should not merge
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 120μs -> 17.5μs (587% faster)
    class_ids = [r["class_id"] for r in result]


# ---------------------------
# Edge Test Cases
# ---------------------------


def test_overlap_threshold_zero_merges_any_overlap():
    # Two boxes with partial overlap, threshold 0.0, should merge
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [9, 9, 20, 20], "confidence": 0.9, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max", overlap_threshold=0.0)
    result = codeflash_output  # 119μs -> 17.5μs (583% faster)


def test_overlap_threshold_high_does_not_merge():
    # Two boxes with small overlap, high threshold, should not merge
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [9, 9, 20, 20], "confidence": 0.9, "class_id": 1},
    ]
    # Calculate IoU manually: intersection is 1x1=1, area1=100, area2=121, union=220
    # IoU = 1/220 ~ 0.0045, so threshold 0.01 should not merge
    codeflash_output = _merge_overlapping_bboxes(preds, "max", overlap_threshold=0.01)
    result = codeflash_output  # 120μs -> 19.7μs (508% faster)


def test_exactly_touching_boxes_not_merged():
    # Two boxes that touch at the edge, but do not overlap
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.6, "class_id": 1},
        {"bbox": [10, 0, 20, 10], "confidence": 0.7, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 118μs -> 19.1μs (521% faster)


def test_fully_contained_box_merges():
    # One box fully inside another, should merge
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.6, "class_id": 1},
        {"bbox": [2, 2, 8, 8], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 119μs -> 17.3μs (591% faster)


def test_chain_of_overlaps_merges_all():
    # Boxes that overlap in a chain (A overlaps B, B overlaps C, but A does not overlap C)
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [9, 0, 19, 10], "confidence": 0.6, "class_id": 1},
        {"bbox": [18, 0, 28, 10], "confidence": 0.7, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 152μs -> 20.5μs (642% faster)


def test_multiple_groups():
    # Multiple groups of overlapping boxes
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [8, 0, 18, 10], "confidence": 0.6, "class_id": 1},
        {"bbox": [30, 30, 40, 40], "confidence": 0.9, "class_id": 1},
        {"bbox": [38, 30, 48, 40], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 179μs -> 23.9μs (649% faster)
    bboxes = [r["bbox"].tolist() for r in result]


def test_boxes_with_negative_coordinates():
    # Boxes with negative coordinates should merge correctly
    preds = [
        {"bbox": [-10, -10, 0, 0], "confidence": 0.8, "class_id": 1},
        {"bbox": [-5, -5, 5, 5], "confidence": 0.9, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 120μs -> 17.9μs (576% faster)


def test_boxes_with_zero_area():
    # Boxes with zero area should not merge unless they overlap
    preds = [
        {"bbox": [0, 0, 0, 10], "confidence": 0.5, "class_id": 1},  # zero width
        {"bbox": [5, 5, 15, 5], "confidence": 0.6, "class_id": 1},  # zero height
        {"bbox": [0, 0, 10, 10], "confidence": 0.7, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 174μs -> 23.7μs (637% faster)


def test_invalid_confidence_strategy_defaults_to_max():
    # Unknown confidence strategy should default to max
    preds = [
        {"bbox": [0, 0, 10, 10], "confidence": 0.5, "class_id": 1},
        {"bbox": [5, 5, 15, 15], "confidence": 0.8, "class_id": 1},
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "unknown_strategy")
    result = codeflash_output  # 120μs -> 17.5μs (590% faster)


# ---------------------------
# Large Scale Test Cases
# ---------------------------


def test_large_number_of_non_overlapping_boxes():
    # 500 non-overlapping boxes should each remain separate
    preds = [
        {
            "bbox": [i * 10, i * 10, i * 10 + 5, i * 10 + 5],
            "confidence": 0.5 + 0.001 * i,
            "class_id": 1,
        }
        for i in range(500)
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 12.8ms -> 1.54ms (731% faster)
    # Check a few random boxes
    for idx in [0, 100, 499]:
        found = any(
            np.allclose(r["bbox"], [idx * 10, idx * 10, idx * 10 + 5, idx * 10 + 5])
            for r in result
        )


def test_large_number_of_overlapping_boxes():
    # 500 overlapping boxes (all overlap), should merge into one
    preds = [
        {"bbox": [0, 0, 10 + i, 10 + i], "confidence": 0.5 + 0.001 * i, "class_id": 1}
        for i in range(500)
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 358ms -> 165ms (117% faster)


def test_large_number_of_groups():
    # 100 groups of 5 overlapping boxes each, should merge into 100 boxes
    preds = []
    for group in range(100):
        base = group * 100
        for i in range(5):
            preds.append(
                {
                    "bbox": [base + i, base + i, base + i + 10, base + i + 10],
                    "confidence": 0.5 + 0.01 * i,
                    "class_id": 1,
                }
            )
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 15.5ms -> 1.93ms (707% faster)
    # Each group should be merged into one box
    for group in range(100):
        base = group * 100
        found = any(
            np.allclose(r["bbox"], [base, base, base + 14, base + 14]) for r in result
        )


def test_large_scale_mixed_classes():
    # 500 boxes, alternating class_ids, all overlap
    preds = []
    for i in range(500):
        preds.append(
            {
                "bbox": [0, 0, 10 + i, 10 + i],
                "confidence": 0.5 + 0.001 * i,
                "class_id": i % 2,
            }
        )
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 363ms -> 162ms (123% faster)
    class_ids = set(r["class_id"] for r in result)
    # Each merged box should have the max bbox for its class
    for cid in [0, 1]:
        relevant = [p for p in preds if p["class_id"] == cid]
        max_bbox = [
            0,
            0,
            max(p["bbox"][2] for p in relevant),
            max(p["bbox"][3] for p in relevant),
        ]
        found = any(
            np.allclose(r["bbox"], max_bbox) and r["class_id"] == cid for r in result
        )


def test_large_scale_performance():
    # Not a strict performance test, but checks function completes on 999 boxes
    preds = [
        {"bbox": [i, i, i + 10, i + 10], "confidence": 0.5, "class_id": 1}
        for i in range(999)
    ]
    codeflash_output = _merge_overlapping_bboxes(preds, "max")
    result = codeflash_output  # 53.0ms -> 13.3ms (298% faster)


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To test or edit this optimization locally: git merge codeflash/optimize-pr1853-2026-01-05T20.00.36

Click to see suggested changes
Suggested change
Removed (original STRtree approach):

    # Create boxes as Polygons for spatial indexing
    boxes = []
    for pred in predictions:
        x_min, y_min, x_max, y_max = pred["bbox"]
        # Create box polygon (coordinates: [bottom-left, bottom-right, top-right, top-left])
        box = Polygon([(x_min, y_min), (x_max, y_min), (x_max, y_max), (x_min, y_max)])
        boxes.append(box)

    tree = STRtree(boxes)

    # Check candidate pairs identified by spatial index
    checked_pairs = set()
    for i in range(n):
        box1 = boxes[i]
        # Query for boxes that intersect the bounding box
        candidates = tree.query(box1, predicate="intersects")

        for j in candidates:
            if i >= j or (i, j) in checked_pairs or (j, i) in checked_pairs:
                continue
            checked_pairs.add((i, j))

            bbox1 = predictions[i]["bbox"]
            bbox2 = predictions[j]["bbox"]

            iou = bbox_iou(bbox1, bbox2)

            if overlap_threshold <= 0.0:
                # Merge if they overlap at all
                if iou > 0:
                    union(i, j)
            else:
                # Merge only if IoU exceeds threshold
                if iou >= overlap_threshold:
                    union(i, j)

Added (sweep-line approach):

    # Create sorted indices by x_min for sweep-line approach
    bboxes = [pred["bbox"] for pred in predictions]
    sorted_indices = sorted(range(n), key=lambda i: bboxes[i][0])

    # Use sweep-line to find overlapping candidates
    for i in range(n):
        idx_i = sorted_indices[i]
        bbox1 = bboxes[idx_i]
        x1_min, y1_min, x1_max, y1_max = bbox1

        # Only check boxes that could overlap in x dimension
        for j in range(i + 1, n):
            idx_j = sorted_indices[j]
            bbox2 = bboxes[idx_j]
            x2_min, y2_min, x2_max, y2_max = bbox2

            # Early termination: if x2_min > x1_max, no further boxes can overlap
            if x2_min > x1_max:
                break

            # Quick rejection test for y dimension
            if y2_min > y1_max or y2_max < y1_min:
                continue

            iou = bbox_iou(bbox1, bbox2)

            if overlap_threshold <= 0.0:
                # Merge if they overlap at all
                if iou > 0:
                    union(idx_i, idx_j)
            else:
                # Merge only if IoU exceeds threshold
                if iou >= overlap_threshold:
                    union(idx_i, idx_j)


Comment on lines +193 to +271
merged = []
used = set()

for i, pred1 in enumerate(preds_with_keypoints):
    if i in used:
        continue

    # Start a new merged group with this prediction
    group = [pred1]
    used.add(i)

    kp1 = np.array(pred1["keypoint_data"]["keypoints_xy"])

    # Find all predictions that should merge with this one
    for j, pred2 in enumerate(preds_with_keypoints[i + 1 :], start=i + 1):
        if j in used:
            continue

        kp2 = np.array(pred2["keypoint_data"]["keypoints_xy"])

        # Calculate average distance between corresponding keypoints
        if len(kp1) == len(kp2):
            distances = np.linalg.norm(kp1 - kp2, axis=1)
            avg_distance = np.mean(distances)

            if avg_distance < keypoint_threshold:
                group.append(pred2)
                used.add(j)

    # Merge the group
    if len(group) == 1:
        merged.append(group[0])
    else:
        # Merge multiple predictions
        if confidence_strategy == "max":
            best_idx = np.argmax([p["confidence"] for p in group])
            confidence = group[best_idx]["confidence"]
        elif confidence_strategy == "mean":
            confidence = np.mean([p["confidence"] for p in group])
        else:  # 'min'
            confidence = np.min([p["confidence"] for p in group])

        # Average keypoint coordinates
        all_kp_xy = [np.array(p["keypoint_data"]["keypoints_xy"]) for p in group]
        merged_kp_xy = np.mean(all_kp_xy, axis=0).tolist()

        # Average keypoint confidences if available
        merged_kp_data = {
            "keypoints_xy": merged_kp_xy,
            "keypoints_class_name": group[0]["keypoint_data"].get(
                "keypoints_class_name"
            ),
            "keypoints_class_id": group[0]["keypoint_data"].get(
                "keypoints_class_id"
            ),
        }

        if "keypoints_confidence" in group[0]["keypoint_data"]:
            all_kp_conf = [
                np.array(p["keypoint_data"]["keypoints_confidence"]) for p in group
            ]
            merged_kp_conf = np.mean(all_kp_conf, axis=0).tolist()
            merged_kp_data["keypoints_confidence"] = merged_kp_conf

        # Average bbox coordinates
        all_bboxes = np.array([p["bbox"] for p in group])
        merged_bbox = np.mean(all_bboxes, axis=0)

        merged.append(
            {
                "bbox": merged_bbox,
                "confidence": confidence,
                "class_id": group[0]["class_id"],
                "mask": None,
                "keypoint_data": merged_kp_data,
            }
        )

# Add back predictions without keypoints

⚡️Codeflash found 1,670% (16.70x) speedup for _merge_keypoint_detections in inference/core/workflows/core_steps/fusion/detections_list_rollup/v1.py

⏱️ Runtime: 1.61 seconds → 90.8 milliseconds (best of 78 runs)

📝 Explanation and details

The optimized code achieves a 1670% speedup by replacing the O(N²K) nested Python loop with vectorized NumPy operations that compute all pairwise keypoint distances at once.

Key Optimizations:

  1. Vectorized Distance Computation: Instead of comparing keypoints pair-by-pair in nested loops, the code pre-stacks all keypoints into a single NumPy array and uses broadcasting to compute all pairwise distances simultaneously:

    diffs = keypoints_arr[:, None, :, :] - keypoints_arr[None, :, :, :]
    dists = np.linalg.norm(diffs, axis=3)
    avg_dists = np.mean(dists, axis=2)

    This creates an [N×N] distance matrix in one operation, eliminating the need to call np.linalg.norm thousands of times in the inner loop.

  2. Boolean Mask for Membership Tracking: Replaces Python set lookups with NumPy boolean array indexing (used = np.zeros(n_preds, dtype=bool)), providing O(1) access without Python overhead.

  3. Batch Processing: All keypoint arrays are converted to NumPy upfront and validated for equal length, allowing the fast path to handle the common case where all detections have the same number of keypoints.

  4. Fallback Path Preserved: When keypoint counts vary across detections (rare case), the code falls back to the original logic, maintaining correctness while optimizing the common case.

Why It's Faster:

  • Profiler Evidence: The original code spent 92.6% of time in the nested loop (lines with np.linalg.norm and np.mean each taking 39-52% of runtime). The optimized version reduces this to ~27% total for the vectorized computation.
  • Python Loop Elimination: Removes 132,000+ iterations of Python loops and 264,000+ NumPy array allocations per execution, replaced by 3 vectorized operations.
  • Cache Efficiency: Contiguous memory access patterns in the stacked array improve CPU cache utilization.
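The broadcasting trick described in point 1 is easy to reproduce in isolation. A standalone sketch of the pairwise distance matrix (the example keypoint data here is made up for illustration):

```python
import numpy as np

# Stack keypoints for N detections, each with K (x, y) keypoints,
# into one [N, K, 2] array.
keypoints_arr = np.array([
    [[1.0, 2.0], [3.0, 4.0]],      # detection 0
    [[1.2, 2.1], [3.1, 4.2]],      # detection 1 (close to detection 0)
    [[50.0, 60.0], [70.0, 80.0]],  # detection 2 (far from both)
])

# [N, 1, K, 2] - [1, N, K, 2] broadcasts to an [N, N, K, 2] difference tensor.
diffs = keypoints_arr[:, None, :, :] - keypoints_arr[None, :, :, :]
dists = np.linalg.norm(diffs, axis=3)  # [N, N] x [K] per-keypoint distances
avg_dists = np.mean(dists, axis=2)     # [N, N] average-distance matrix

# One threshold comparison now answers "should i and j merge?" for all pairs.
merge_mask = avg_dists < 1.0
```

This replaces the per-pair `np.linalg.norm` calls of the original inner loop with three array operations, which is where the bulk of the reported speedup comes from.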

Impact on Workloads:

The function is called from merge_crop_predictions, which processes detections from tiled/cropped inference—a common pattern in object detection pipelines for handling large images. Based on the test results:

  • High-density scenarios (100-500 predictions): Shows 70-4282% speedup when most predictions don't merge, as the vectorized distance matrix avoids redundant pairwise comparisons.
  • Small inputs (2-10 predictions): Shows 11-89% slowdown due to NumPy setup overhead, but these cases take <100μs absolute time.
  • Real-world benefit: The function is in a hot path during crop merging. For inference on large images requiring 50-100 crops with ~50 detections each, this optimization reduces merge time from seconds to milliseconds, making it negligible compared to model inference time.

The optimization particularly benefits computer vision pipelines processing high-resolution images where many crops generate numerous detections that need to be consolidated.

Correctness verification report:

Test                           Status
⏪ Replay Tests                 🔘 None Found
⚙️ Existing Unit Tests          🔘 None Found
🔎 Concolic Coverage Tests      🔘 None Found
🌀 Generated Regression Tests   37 Passed
📊 Tests Coverage               97.7%
🌀 Click to see Generated Regression Tests
from typing import List

# function to test
import numpy as np

# imports
import pytest
from inference.core.workflows.core_steps.fusion.detections_list_rollup.v1 import (
    _merge_keypoint_detections,
)

# unit tests

# ----------- BASIC TEST CASES ------------


def test_empty_preds_returns_empty():
    # Should return empty list when input is empty
    codeflash_output = _merge_keypoint_detections(
        [], "max", 10.0
    )  # 681ns -> 631ns (7.92% faster)


def test_no_keypoint_data_returns_input():
    # Should return input unchanged if no keypoint_data present
    preds = [
        {"bbox": [0, 0, 1, 1], "confidence": 0.9, "class_id": 1, "mask": None},
        {"bbox": [1, 1, 2, 2], "confidence": 0.8, "class_id": 1, "mask": None},
    ]
    codeflash_output = _merge_keypoint_detections(preds, "max", 10.0)
    result = codeflash_output  # 1.63μs -> 1.61μs (1.24% faster)


def test_single_prediction_with_keypoints():
    # Should return the single prediction unchanged
    preds = [
        {
            "bbox": [0, 0, 1, 1],
            "confidence": 0.9,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1, 2], [3, 4]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        }
    ]
    codeflash_output = _merge_keypoint_detections(preds, "max", 10.0)
    result = codeflash_output  # 10.4μs -> 90.9μs (88.6% slower)


def test_two_predictions_far_apart_not_merged():
    # Should not merge if keypoints are far apart
    preds = [
        {
            "bbox": [0, 0, 1, 1],
            "confidence": 0.9,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1, 2], [3, 4]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
        {
            "bbox": [10, 10, 11, 11],
            "confidence": 0.8,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[11, 12], [13, 14]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
    ]
    codeflash_output = _merge_keypoint_detections(preds, "max", 5.0)
    result = codeflash_output  # 54.3μs -> 76.3μs (28.8% slower)


def test_two_predictions_close_merged_max_confidence():
    # Should merge if keypoints are close, use max confidence
    preds = [
        {
            "bbox": [0, 0, 1, 1],
            "confidence": 0.9,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1, 2], [3, 4]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
        {
            "bbox": [0.1, 0.1, 1.1, 1.1],
            "confidence": 0.8,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1.2, 2.1], [3.1, 4.2]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
    ]
    codeflash_output = _merge_keypoint_detections(preds, "max", 0.5)
    result = codeflash_output  # 95.0μs -> 107μs (11.2% slower)
    merged = result[0]


def test_two_predictions_close_merged_mean_confidence():
    # Should merge, use mean confidence
    preds = [
        {
            "bbox": [0, 0, 1, 1],
            "confidence": 0.7,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1, 2], [3, 4]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
        {
            "bbox": [0.2, 0.2, 1.2, 1.2],
            "confidence": 0.9,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1.2, 2.2], [3.2, 4.2]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
    ]
    codeflash_output = _merge_keypoint_detections(preds, "mean", 0.5)
    result = codeflash_output  # 90.4μs -> 112μs (19.4% slower)
    merged = result[0]


def test_two_predictions_close_merged_min_confidence():
    # Should merge, use min confidence
    preds = [
        {
            "bbox": [0, 0, 1, 1],
            "confidence": 0.7,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1, 2], [3, 4]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
        {
            "bbox": [0.2, 0.2, 1.2, 1.2],
            "confidence": 0.9,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1.2, 2.2], [3.2, 4.2]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
    ]
    codeflash_output = _merge_keypoint_detections(preds, "min", 0.5)
    result = codeflash_output  # 92.4μs -> 109μs (15.4% slower)
    merged = result[0]


def test_keypoints_confidence_merged_mean():
    # Should merge keypoints_confidence by mean
    preds = [
        {
            "bbox": [0, 0, 1, 1],
            "confidence": 0.7,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1, 2], [3, 4]],
                "keypoints_confidence": [0.5, 0.7],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
        {
            "bbox": [0.2, 0.2, 1.2, 1.2],
            "confidence": 0.9,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1.2, 2.2], [3.2, 4.2]],
                "keypoints_confidence": [0.7, 0.9],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
    ]
    codeflash_output = _merge_keypoint_detections(preds, "mean", 0.5)
    result = codeflash_output  # 101μs -> 114μs (11.6% slower)
    merged = result[0]


def test_mixed_preds_with_and_without_keypoints():
    # Should merge keypoint preds and append non-keypoint preds unchanged
    preds = [
        {
            "bbox": [0, 0, 1, 1],
            "confidence": 0.9,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1, 2], [3, 4]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
        {
            "bbox": [0.1, 0.1, 1.1, 1.1],
            "confidence": 0.8,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1.2, 2.1], [3.1, 4.2]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
        {
            "bbox": [10, 10, 11, 11],
            "confidence": 0.5,
            "class_id": 2,
            "mask": None,
        },
    ]
    codeflash_output = _merge_keypoint_detections(preds, "max", 0.5)
    result = codeflash_output  # 88.9μs -> 102μs (13.0% slower)


# ----------- EDGE TEST CASES ------------


def test_keypoints_different_lengths_not_merged():
    # Should not merge predictions with different number of keypoints
    preds = [
        {
            "bbox": [0, 0, 1, 1],
            "confidence": 0.9,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1, 2], [3, 4], [5, 6]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
        {
            "bbox": [0.1, 0.1, 1.1, 1.1],
            "confidence": 0.8,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1.2, 2.1], [3.1, 4.2]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
    ]
    codeflash_output = _merge_keypoint_detections(preds, "max", 1.0)
    result = codeflash_output  # 13.4μs -> 18.6μs (28.2% slower)


def test_missing_keypoints_xy_field():
    # Should treat missing keypoints_xy as a non-keypoint pred
    preds = [
        {
            "bbox": [0, 0, 1, 1],
            "confidence": 0.9,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                # missing keypoints_xy
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
        {
            "bbox": [0.1, 0.1, 1.1, 1.1],
            "confidence": 0.8,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1.2, 2.1], [3.1, 4.2]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
    ]
    codeflash_output = _merge_keypoint_detections(preds, "max", 1.0)
    result = codeflash_output  # 8.88μs -> 62.9μs (85.9% slower)


def test_keypoint_threshold_zero():
    # Should never merge if threshold is zero
    preds = [
        {
            "bbox": [0, 0, 1, 1],
            "confidence": 0.9,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1, 2], [3, 4]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
        {
            "bbox": [0.1, 0.1, 1.1, 1.1],
            "confidence": 0.8,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1.2, 2.1], [3.1, 4.2]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
    ]
    codeflash_output = _merge_keypoint_detections(preds, "max", 0.0)
    result = codeflash_output  # 49.2μs -> 69.7μs (29.4% slower)


def test_keypoint_threshold_large_merges_all():
    # Should merge all if threshold is very large
    preds = [
        {
            "bbox": [0, 0, 1, 1],
            "confidence": 0.9,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1, 2], [3, 4]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
        {
            "bbox": [100, 100, 101, 101],
            "confidence": 0.8,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[101, 102], [103, 104]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
    ]
    codeflash_output = _merge_keypoint_detections(preds, "mean", 1000.0)
    result = codeflash_output  # 93.1μs -> 111μs (16.8% slower)


def test_keypoints_confidence_missing_in_some():
    # Should handle missing keypoints_confidence gracefully
    preds = [
        {
            "bbox": [0, 0, 1, 1],
            "confidence": 0.9,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1, 2], [3, 4]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
                # missing keypoints_confidence
            },
        },
        {
            "bbox": [0.1, 0.1, 1.1, 1.1],
            "confidence": 0.8,
            "class_id": 1,
            "mask": None,
            "keypoint_data": {
                "keypoints_xy": [[1.2, 2.1], [3.1, 4.2]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
                "keypoints_confidence": [0.7, 0.9],
            },
        },
    ]
    codeflash_output = _merge_keypoint_detections(preds, "max", 0.5)
    result = codeflash_output  # 90.6μs -> 103μs (12.1% slower)
    merged = [r for r in result if "keypoint_data" in r][0]


def test_preds_with_extra_fields():
    # Should ignore extra fields and not break
    preds = [
        {
            "bbox": [0, 0, 1, 1],
            "confidence": 0.9,
            "class_id": 1,
            "mask": None,
            "extra_field": "foo",
            "keypoint_data": {
                "keypoints_xy": [[1, 2], [3, 4]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
        {
            "bbox": [0.1, 0.1, 1.1, 1.1],
            "confidence": 0.8,
            "class_id": 1,
            "mask": None,
            "extra_field": "bar",
            "keypoint_data": {
                "keypoints_xy": [[1.2, 2.1], [3.1, 4.2]],
                "keypoints_class_name": "test",
                "keypoints_class_id": 42,
            },
        },
    ]
    codeflash_output = _merge_keypoint_detections(preds, "max", 0.5)
    result = codeflash_output  # 89.3μs -> 100μs (11.2% slower)


# ----------- LARGE SCALE TEST CASES ------------


def test_large_number_of_predictions_merging():
    # Create 100 predictions in a tight cluster, should all merge
    N = 100
    preds = []
    for i in range(N):
        preds.append(
            {
                "bbox": [i, i, i + 1, i + 1],
                "confidence": 0.5 + i / (2 * N),
                "class_id": 1,
                "mask": None,
                "keypoint_data": {
                    "keypoints_xy": [
                        [1 + i / 100, 2 + i / 100],
                        [3 + i / 100, 4 + i / 100],
                    ],
                    "keypoints_class_name": "test",
                    "keypoints_class_id": 42,
                },
            }
        )
    # All keypoints within 1 pixel, so threshold 2 should merge all
    codeflash_output = _merge_keypoint_detections(preds, "mean", 2.0)
    result = codeflash_output  # 1.43ms -> 1.48ms (3.46% slower)
    merged = result[0]
    assert len(result) == 1
    # Check that merged confidence is mean of all
    expected_conf = sum(0.5 + i / (2 * N) for i in range(N)) / N
    assert abs(merged["confidence"] - expected_conf) < 1e-6


def test_large_number_of_predictions_no_merging():
    # Create 100 predictions far apart, none should merge
    N = 100
    preds = []
    for i in range(N):
        preds.append(
            {
                "bbox": [i * 10, i * 10, i * 10 + 1, i * 10 + 1],
                "confidence": 0.5 + i / (2 * N),
                "class_id": 1,
                "mask": None,
                "keypoint_data": {
                    "keypoints_xy": [
                        [100 + i * 10, 200 + i * 10],
                        [300 + i * 10, 400 + i * 10],
                    ],
                    "keypoints_class_name": "test",
                    "keypoints_class_id": 42,
                },
            }
        )
    # All keypoints >10 pixels apart, so threshold 1 should merge none
    codeflash_output = _merge_keypoint_detections(preds, "mean", 1.0)
    result = codeflash_output  # 60.2ms -> 1.67ms (3495% faster)


def test_large_mixed_preds_with_and_without_keypoints():
    # 50 keypoint preds in cluster, 50 without keypoints
    N = 50
    preds = []
    for i in range(N):
        preds.append(
            {
                "bbox": [i, i, i + 1, i + 1],
                "confidence": 0.5 + i / (2 * N),
                "class_id": 1,
                "mask": None,
                "keypoint_data": {
                    "keypoints_xy": [
                        [1 + i / 100, 2 + i / 100],
                        [3 + i / 100, 4 + i / 100],
                    ],
                    "keypoints_class_name": "test",
                    "keypoints_class_id": 42,
                },
            }
        )
    for i in range(N):
        preds.append(
            {
                "bbox": [100 + i, 100 + i, 101 + i, 101 + i],
                "confidence": 0.7,
                "class_id": 2,
                "mask": None,
            }
        )
    # Should merge all keypoint preds into one, and keep the rest
    codeflash_output = _merge_keypoint_detections(preds, "mean", 2.0)
    result = codeflash_output  # 768μs -> 451μs (70.1% faster)


def test_large_preds_with_varied_keypoints_lengths():
    # 10 with 2 keypoints, 10 with 3 keypoints, none should merge across groups
    preds = []
    for i in range(10):
        preds.append(
            {
                "bbox": [i, i, i + 1, i + 1],
                "confidence": 0.8,
                "class_id": 1,
                "mask": None,
                "keypoint_data": {
                    "keypoints_xy": [
                        [1 + i / 10, 2 + i / 10],
                        [3 + i / 10, 4 + i / 10],
                    ],
                    "keypoints_class_name": "test",
                    "keypoints_class_id": 42,
                },
            }
        )
    for i in range(10):
        preds.append(
            {
                "bbox": [i + 20, i + 20, i + 21, i + 21],
                "confidence": 0.9,
                "class_id": 1,
                "mask": None,
                "keypoint_data": {
                    "keypoints_xy": [
                        [1 + i / 10, 2 + i / 10],
                        [3 + i / 10, 4 + i / 10],
                        [5 + i / 10, 6 + i / 10],
                    ],
                    "keypoints_class_name": "test",
                    "keypoints_class_id": 42,
                },
            }
        )
    # Should merge within each group, not across
    codeflash_output = _merge_keypoint_detections(preds, "mean", 2.0)
    result = codeflash_output  # 398μs -> 406μs (2.16% slower)
    for r in result:
        pass


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
import numpy as np

# imports
import pytest  # used for our unit tests
from inference.core.workflows.core_steps.fusion.detections_list_rollup.v1 import (
    _merge_keypoint_detections,
)

# unit tests

# ----------- BASIC TEST CASES ------------


def test_empty_input_returns_empty():
    # Should return empty list if no detections
    codeflash_output = _merge_keypoint_detections(
        [], "max", 10
    )  # 672ns -> 641ns (4.84% faster)


def test_single_detection_no_merge():
    # Should return the same detection if only one present
    pred = {
        "bbox": [0, 0, 10, 10],
        "confidence": 0.9,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1, 1], [2, 2]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    codeflash_output = _merge_keypoint_detections([pred], "max", 5)
    result = codeflash_output  # 9.39μs -> 67.0μs (86.0% slower)


def test_two_detections_far_apart_no_merge():
    # Should NOT merge if keypoints are far apart
    pred1 = {
        "bbox": [0, 0, 10, 10],
        "confidence": 0.8,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1, 1], [2, 2]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    pred2 = {
        "bbox": [20, 20, 30, 30],
        "confidence": 0.7,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[21, 21], [22, 22]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    codeflash_output = _merge_keypoint_detections([pred1, pred2], "max", 5)
    result = codeflash_output  # 48.7μs -> 73.0μs (33.3% slower)


def test_two_detections_close_merge_max_confidence():
    # Should merge if keypoints are close, and use max confidence
    pred1 = {
        "bbox": [0, 0, 10, 10],
        "confidence": 0.8,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1, 1], [2, 2]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    pred2 = {
        "bbox": [1, 1, 11, 11],
        "confidence": 0.9,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1.1, 1], [2.1, 2]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    codeflash_output = _merge_keypoint_detections([pred1, pred2], "max", 1.5)
    result = codeflash_output  # 96.9μs -> 105μs (8.21% slower)
    merged = result[0]
    assert len(result) == 1
    assert merged["confidence"] == 0.9  # max strategy picks the higher confidence
    # Bbox should be mean of both
    expected_bbox = np.mean([pred1["bbox"], pred2["bbox"]], axis=0)
    assert np.allclose(merged["bbox"], expected_bbox)
    # Keypoints should be mean
    expected_kp = np.mean(
        [
            np.array(pred1["keypoint_data"]["keypoints_xy"]),
            np.array(pred2["keypoint_data"]["keypoints_xy"]),
        ],
        axis=0,
    )
    assert np.allclose(merged["keypoint_data"]["keypoints_xy"], expected_kp)


def test_confidence_strategy_mean_and_min():
    # Should use mean or min confidence as requested
    pred1 = {
        "bbox": [0, 0, 10, 10],
        "confidence": 0.6,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1, 1], [2, 2]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    pred2 = {
        "bbox": [1, 1, 11, 11],
        "confidence": 0.8,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1.1, 1], [2.1, 2]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    # Mean
    codeflash_output = _merge_keypoint_detections([pred1, pred2], "mean", 1.5)
    result_mean = codeflash_output  # 89.8μs -> 110μs (18.6% slower)
    # Min
    codeflash_output = _merge_keypoint_detections([pred1, pred2], "min", 1.5)
    result_min = codeflash_output  # 72.7μs -> 87.8μs (17.2% slower)


def test_keypoint_confidence_merging():
    # Should merge keypoint confidences if present
    pred1 = {
        "bbox": [0, 0, 10, 10],
        "confidence": 0.6,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1, 1], [2, 2]],
            "keypoints_confidence": [0.5, 0.7],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    pred2 = {
        "bbox": [1, 1, 11, 11],
        "confidence": 0.8,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1.1, 1], [2.1, 2]],
            "keypoints_confidence": [0.9, 0.3],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    codeflash_output = _merge_keypoint_detections([pred1, pred2], "mean", 1.5)
    result = codeflash_output  # 101μs -> 116μs (12.6% slower)
    merged = result[0]
    expected_conf = np.mean([[0.5, 0.7], [0.9, 0.3]], axis=0)
    assert np.allclose(merged["keypoint_data"]["keypoints_confidence"], expected_conf)


def test_detections_without_keypoints_are_not_merged():
    # Detections without keypoints should be returned as-is
    pred1 = {
        "bbox": [0, 0, 10, 10],
        "confidence": 0.6,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1, 1], [2, 2]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    pred2 = {
        "bbox": [20, 20, 30, 30],
        "confidence": 0.7,
        "class_id": 2,
        "mask": None,
        "keypoint_data": None,
    }
    codeflash_output = _merge_keypoint_detections([pred1, pred2], "max", 5)
    result = codeflash_output  # 9.27μs -> 63.4μs (85.4% slower)


def test_keypoint_data_missing_keypoints_xy():
    # Detections with keypoint_data but missing keypoints_xy should not be merged
    pred1 = {
        "bbox": [0, 0, 10, 10],
        "confidence": 0.6,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {"keypoints_class_name": "A", "keypoints_class_id": 1},
    }
    pred2 = {
        "bbox": [1, 1, 11, 11],
        "confidence": 0.8,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1.1, 1], [2.1, 2]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    codeflash_output = _merge_keypoint_detections([pred1, pred2], "max", 1.5)
    result = codeflash_output  # 9.62μs -> 60.8μs (84.2% slower)


# ----------- EDGE TEST CASES ------------


def test_keypoints_different_lengths_not_merged():
    # Detections with different number of keypoints should not be merged
    pred1 = {
        "bbox": [0, 0, 10, 10],
        "confidence": 0.8,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1, 1], [2, 2], [3, 3]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    pred2 = {
        "bbox": [1, 1, 11, 11],
        "confidence": 0.9,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1.1, 1], [2.1, 2]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    codeflash_output = _merge_keypoint_detections([pred1, pred2], "max", 2)
    result = codeflash_output  # 13.8μs -> 18.9μs (27.0% slower)


def test_keypoint_threshold_zero():
    # With threshold zero, only identical keypoints merge
    pred1 = {
        "bbox": [0, 0, 10, 10],
        "confidence": 0.8,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1, 1], [2, 2]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    pred2 = {
        "bbox": [0, 0, 10, 10],
        "confidence": 0.9,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1, 1], [2, 2]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    pred3 = {
        "bbox": [1, 1, 11, 11],
        "confidence": 0.7,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1.1, 1], [2.1, 2]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    codeflash_output = _merge_keypoint_detections([pred1, pred2, pred3], "max", 0)
    result = codeflash_output  # 92.5μs -> 80.3μs (15.1% faster)


def test_all_detections_merged():
    # All detections merged if threshold is very large
    preds = []
    for i in range(5):
        preds.append(
            {
                "bbox": [i, i, i + 10, i + 10],
                "confidence": 0.5 + i * 0.1,
                "class_id": 1,
                "mask": None,
                "keypoint_data": {
                    "keypoints_xy": [[i + 1, i + 1], [i + 2, i + 2]],
                    "keypoints_class_name": "A",
                    "keypoints_class_id": 1,
                },
            }
        )
    codeflash_output = _merge_keypoint_detections(preds, "max", 1000)
    result = codeflash_output  # 148μs -> 122μs (22.1% faster)
    assert len(result) == 1


def test_missing_keypoint_data_field():
    # If keypoint_data is missing, detection should not be merged
    pred1 = {
        "bbox": [0, 0, 10, 10],
        "confidence": 0.8,
        "class_id": 1,
        "mask": None,
        # No keypoint_data
    }
    pred2 = {
        "bbox": [1, 1, 11, 11],
        "confidence": 0.9,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1.1, 1], [2.1, 2]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    codeflash_output = _merge_keypoint_detections([pred1, pred2], "max", 1.5)
    result = codeflash_output  # 9.94μs -> 61.8μs (83.9% slower)
    # The prediction without keypoint_data stays separate
    assert len(result) == 2


def test_multiple_groups_merging():
    # Should correctly merge multiple groups separately
    pred1 = {
        "bbox": [0, 0, 10, 10],
        "confidence": 0.7,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1, 1], [2, 2]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    pred2 = {
        "bbox": [1, 1, 11, 11],
        "confidence": 0.8,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[1.2, 1.1], [2.1, 2.1]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    pred3 = {
        "bbox": [20, 20, 30, 30],
        "confidence": 0.9,
        "class_id": 1,
        "mask": None,
        "keypoint_data": {
            "keypoints_xy": [[21, 21], [22, 22]],
            "keypoints_class_name": "A",
            "keypoints_class_id": 1,
        },
    }
    codeflash_output = _merge_keypoint_detections([pred1, pred2, pred3], "max", 2)
    result = codeflash_output  # 115μs -> 114μs (0.595% faster)
    confidences = [p["confidence"] for p in result]
    # pred1 and pred2 merge (avg keypoint distance ~0.18 < 2); pred3 stays separate
    assert len(result) == 2
    assert sorted(confidences) == [0.8, 0.9]

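The tests above exercise the "max" confidence strategy passed to `_merge_keypoint_detections`. As a standalone sketch (the helper name and signature here are illustrative, not the block's actual API), the three strategies reduce a merged group's confidences like so:

```python
import numpy as np

def reduce_confidence(confidences, strategy):
    # Hypothetical helper mirroring the max/mean/min confidence
    # strategies used when merging a group of detections.
    if strategy == "max":
        return float(np.max(confidences))
    if strategy == "mean":
        return float(np.mean(confidences))
    return float(np.min(confidences))  # "min"

print(reduce_confidence([0.7, 0.8], "max"))  # 0.8
print(reduce_confidence([0.7, 0.8], "min"))  # 0.7
```

With "max", the merged detection keeps the strongest member's confidence, which is why the merged group in the test above reports 0.8 rather than 0.7.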

# ----------- LARGE SCALE TEST CASES ------------


def test_large_scale_merging():
    # Merge a large number of close detections
    N = 500
    preds = []
    for i in range(N):
        preds.append(
            {
                "bbox": [i, i, i + 10, i + 10],
                "confidence": 0.5 + (i % 10) * 0.01,
                "class_id": 1,
                "mask": None,
                "keypoint_data": {
                    "keypoints_xy": [[i + 1, i + 1], [i + 2, i + 2]],
                    "keypoints_class_name": "A",
                    "keypoints_class_id": 1,
                },
            }
        )
    # All keypoints are close enough to merge
    codeflash_output = _merge_keypoint_detections(preds, "max", 1000)
    result = codeflash_output  # 7.28ms -> 26.8ms (72.8% slower)
    # All detections collapse into a single merged prediction
    assert len(result) == 1
    # Confidence should be max of all
    expected_conf = max([p["confidence"] for p in preds])
    assert result[0]["confidence"] == expected_conf
    # Bbox should be mean of all
    expected_bbox = np.mean([p["bbox"] for p in preds], axis=0)
    assert np.allclose(result[0]["bbox"], expected_bbox)


def test_large_scale_no_merging():
    # Large number of detections, all far apart
    N = 500
    preds = []
    for i in range(N):
        preds.append(
            {
                "bbox": [i * 100, i * 100, i * 100 + 10, i * 100 + 10],
                "confidence": 0.5 + (i % 10) * 0.01,
                "class_id": 1,
                "mask": None,
                "keypoint_data": {
                    "keypoints_xy": [
                        [i * 100 + 1, i * 100 + 1],
                        [i * 100 + 2, i * 100 + 2],
                    ],
                    "keypoints_class_name": "A",
                    "keypoints_class_id": 1,
                },
            }
        )
    # No keypoints close enough to merge
    codeflash_output = _merge_keypoint_detections(preds, "max", 1)
    result = codeflash_output  # 1.51s -> 34.5ms (4282% faster)
    # All original detections should be present
    assert len(result) == len(preds)


def test_large_scale_mixed_keypoints_and_non_keypoints():
    # Mix of detections with and without keypoints
    N = 250
    preds = []
    for i in range(N):
        preds.append(
            {
                "bbox": [i, i, i + 10, i + 10],
                "confidence": 0.5 + (i % 10) * 0.01,
                "class_id": 1,
                "mask": None,
                "keypoint_data": {
                    "keypoints_xy": [[i + 1, i + 1], [i + 2, i + 2]],
                    "keypoints_class_name": "A",
                    "keypoints_class_id": 1,
                },
            }
        )
    for i in range(N):
        preds.append(
            {
                "bbox": [i + 1000, i + 1000, i + 1010, i + 1010],
                "confidence": 0.6,
                "class_id": 2,
                "mask": None,
                "keypoint_data": None,
            }
        )
    # Only the first N should merge, rest stay separate
    codeflash_output = _merge_keypoint_detections(preds, "max", 1000)
    result = codeflash_output  # 3.64ms -> 5.89ms (38.2% slower)
    # One merged keypoint detection plus the N untouched non-keypoint detections
    assert len(result) == 1 + N


def test_large_scale_multiple_groups():
    # Large number of detections forming two separate groups
    N = 200
    preds = []
    # First group: close keypoints
    for i in range(N):
        preds.append(
            {
                "bbox": [i, i, i + 10, i + 10],
                "confidence": 0.5 + (i % 10) * 0.01,
                "class_id": 1,
                "mask": None,
                "keypoint_data": {
                    "keypoints_xy": [[i + 1, i + 1], [i + 2, i + 2]],
                    "keypoints_class_name": "A",
                    "keypoints_class_id": 1,
                },
            }
        )
    # Second group: far keypoints
    for i in range(N):
        preds.append(
            {
                "bbox": [i + 1000, i + 1000, i + 1010, i + 1010],
                "confidence": 0.6,
                "class_id": 2,
                "mask": None,
                "keypoint_data": {
                    "keypoints_xy": [[i + 1001, i + 1001], [i + 1002, i + 1002]],
                    "keypoints_class_name": "B",
                    "keypoints_class_id": 2,
                },
            }
        )
    codeflash_output = _merge_keypoint_detections(preds, "max", 100)
    result = codeflash_output  # 18.1ms -> 17.3ms (4.69% faster)
    # Both classes should survive; the two groups are too far apart to cross-merge
    class_ids = set(p["class_id"] for p in result)
    assert class_ids == {1, 2}


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To test or edit this optimization locally, run: git merge codeflash/optimize-pr1853-2026-01-05T20.02.38

Suggested change

Removed (original greedy O(N^2) Python loop):

merged = []
used = set()
for i, pred1 in enumerate(preds_with_keypoints):
    if i in used:
        continue
    # Start a new merged group with this prediction
    group = [pred1]
    used.add(i)
    kp1 = np.array(pred1["keypoint_data"]["keypoints_xy"])
    # Find all predictions that should merge with this one
    for j, pred2 in enumerate(preds_with_keypoints[i + 1 :], start=i + 1):
        if j in used:
            continue
        kp2 = np.array(pred2["keypoint_data"]["keypoints_xy"])
        # Calculate average distance between corresponding keypoints
        if len(kp1) == len(kp2):
            distances = np.linalg.norm(kp1 - kp2, axis=1)
            avg_distance = np.mean(distances)
            if avg_distance < keypoint_threshold:
                group.append(pred2)
                used.add(j)
    # Merge the group
    if len(group) == 1:
        merged.append(group[0])
    else:
        # Merge multiple predictions
        if confidence_strategy == "max":
            best_idx = np.argmax([p["confidence"] for p in group])
            confidence = group[best_idx]["confidence"]
        elif confidence_strategy == "mean":
            confidence = np.mean([p["confidence"] for p in group])
        else:  # 'min'
            confidence = np.min([p["confidence"] for p in group])
        # Average keypoint coordinates
        all_kp_xy = [np.array(p["keypoint_data"]["keypoints_xy"]) for p in group]
        merged_kp_xy = np.mean(all_kp_xy, axis=0).tolist()
        # Average keypoint confidences if available
        merged_kp_data = {
            "keypoints_xy": merged_kp_xy,
            "keypoints_class_name": group[0]["keypoint_data"].get(
                "keypoints_class_name"
            ),
            "keypoints_class_id": group[0]["keypoint_data"].get(
                "keypoints_class_id"
            ),
        }
        if "keypoints_confidence" in group[0]["keypoint_data"]:
            all_kp_conf = [
                np.array(p["keypoint_data"]["keypoints_confidence"]) for p in group
            ]
            merged_kp_conf = np.mean(all_kp_conf, axis=0).tolist()
            merged_kp_data["keypoints_confidence"] = merged_kp_conf
        # Average bbox coordinates
        all_bboxes = np.array([p["bbox"] for p in group])
        merged_bbox = np.mean(all_bboxes, axis=0)
        merged.append(
            {
                "bbox": merged_bbox,
                "confidence": confidence,
                "class_id": group[0]["class_id"],
                "mask": None,
                "keypoint_data": merged_kp_data,
            }
        )
# Add back predictions without keypoints

Added (vectorized fast path, with a fallback for variable keypoint counts):

n_preds = len(preds_with_keypoints)
# Pre-convert all keypoints_xy's to a single numpy array for fast slicing
kp_list = []
kp_len = None
for p in preds_with_keypoints:
    arr = np.asarray(p["keypoint_data"]["keypoints_xy"])
    if kp_len is None:
        kp_len = len(arr)
    elif kp_len != len(arr):
        kp_len = None
        break
    kp_list.append(arr)
if kp_len is not None:
    # All have equal keypoint length, stack for fast vectorized comparison
    keypoints_arr = np.stack(kp_list, axis=0)
    same_size = True
else:
    # Fallback to original slow path if varying keypoint length
    keypoints_arr = None
    same_size = False
merged = []
used = np.zeros(n_preds, dtype=bool)  # bool mask instead of set for O(1) checks
if same_size:
    # Compute an upper triangular pairwise matrix of average distances
    diffs = keypoints_arr[:, None, :, :] - keypoints_arr[None, :, :, :]
    dists = np.linalg.norm(diffs, axis=3)  # [i, j, k] -- k is keypoint index
    avg_dists = np.mean(dists, axis=2)  # [i, j] mean over keypoints
    for i in range(n_preds):
        if used[i]:
            continue
        # Find all remaining j > i such that avg_distance < keypoint_threshold
        mask = (~used) & (avg_dists[i] < keypoint_threshold)
        # Only look at indices >= i, to preserve greedy merge order as before
        mask[: i + 1] = False
        group_indices = [i]
        for j in range(i + 1, n_preds):
            if mask[j]:
                group_indices.append(j)
                used[j] = True
        used[i] = True
        group = [preds_with_keypoints[idx] for idx in group_indices]
        if len(group) == 1:
            merged.append(group[0])
        else:
            if confidence_strategy == "max":
                best_idx = np.argmax([p["confidence"] for p in group])
                confidence = group[best_idx]["confidence"]
            elif confidence_strategy == "mean":
                confidence = np.mean([p["confidence"] for p in group])
            else:  # 'min'
                confidence = np.min([p["confidence"] for p in group])
            all_kp_xy = np.array(
                [p["keypoint_data"]["keypoints_xy"] for p in group]
            )
            merged_kp_xy = np.mean(all_kp_xy, axis=0).tolist()
            merged_kp_data = {
                "keypoints_xy": merged_kp_xy,
                "keypoints_class_name": group[0]["keypoint_data"].get(
                    "keypoints_class_name"
                ),
                "keypoints_class_id": group[0]["keypoint_data"].get(
                    "keypoints_class_id"
                ),
            }
            if "keypoints_confidence" in group[0]["keypoint_data"]:
                all_kp_conf = np.array(
                    [p["keypoint_data"]["keypoints_confidence"] for p in group]
                )
                merged_kp_conf = np.mean(all_kp_conf, axis=0).tolist()
                merged_kp_data["keypoints_confidence"] = merged_kp_conf
            all_bboxes = np.array([p["bbox"] for p in group])
            merged_bbox = np.mean(all_bboxes, axis=0)
            merged.append(
                {
                    "bbox": merged_bbox,
                    "confidence": confidence,
                    "class_id": group[0]["class_id"],
                    "mask": None,
                    "keypoint_data": merged_kp_data,
                }
            )
else:
    # Fallback: original slow python-path for variable keypoint lengths
    used_set = set()
    for i, pred1 in enumerate(preds_with_keypoints):
        if i in used_set:
            continue
        group = [pred1]
        used_set.add(i)
        kp1 = np.array(pred1["keypoint_data"]["keypoints_xy"])
        for j, pred2 in enumerate(preds_with_keypoints[i + 1 :], start=i + 1):
            if j in used_set:
                continue
            kp2 = np.array(pred2["keypoint_data"]["keypoints_xy"])
            if len(kp1) == len(kp2):
                distances = np.linalg.norm(kp1 - kp2, axis=1)
                avg_distance = np.mean(distances)
                if avg_distance < keypoint_threshold:
                    group.append(pred2)
                    used_set.add(j)
        if len(group) == 1:
            merged.append(group[0])
        else:
            if confidence_strategy == "max":
                best_idx = np.argmax([p["confidence"] for p in group])
                confidence = group[best_idx]["confidence"]
            elif confidence_strategy == "mean":
                confidence = np.mean([p["confidence"] for p in group])
            else:
                confidence = np.min([p["confidence"] for p in group])
            all_kp_xy = [
                np.array(p["keypoint_data"]["keypoints_xy"]) for p in group
            ]
            merged_kp_xy = np.mean(all_kp_xy, axis=0).tolist()
            merged_kp_data = {
                "keypoints_xy": merged_kp_xy,
                "keypoints_class_name": group[0]["keypoint_data"].get(
                    "keypoints_class_name"
                ),
                "keypoints_class_id": group[0]["keypoint_data"].get(
                    "keypoints_class_id"
                ),
            }
            if "keypoints_confidence" in group[0]["keypoint_data"]:
                all_kp_conf = [
                    np.array(p["keypoint_data"]["keypoints_confidence"])
                    for p in group
                ]
                merged_kp_conf = np.mean(all_kp_conf, axis=0).tolist()
                merged_kp_data["keypoints_confidence"] = merged_kp_conf
            all_bboxes = np.array([p["bbox"] for p in group])
            merged_bbox = np.mean(all_bboxes, axis=0)
            merged.append(
                {
                    "bbox": merged_bbox,
                    "confidence": confidence,
                    "class_id": group[0]["class_id"],
                    "mask": None,
                    "keypoint_data": merged_kp_data,
                }
            )
# Add back predictions without keypoints
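The core of the suggested optimization is replacing the per-pair Python loop with one broadcasted NumPy computation of all pairwise average keypoint distances. A minimal standalone sketch of that broadcasting idea (the array contents and threshold below are made up for illustration):

```python
import numpy as np

# Illustrative data: three detections with two keypoints each.
# Detections 0 and 1 are close together; detection 2 is far away.
kps = np.array([
    [[1.0, 1.0], [2.0, 2.0]],
    [[1.2, 1.1], [2.1, 2.1]],
    [[21.0, 21.0], [22.0, 22.0]],
])  # shape (n_preds, n_keypoints, 2)

# Broadcast to all pairwise keypoint differences: shape (n, n, k, 2)
diffs = kps[:, None, :, :] - kps[None, :, :, :]
# Per-keypoint Euclidean distances: shape (n, n, k)
dists = np.linalg.norm(diffs, axis=3)
# Mean distance over keypoints for each detection pair: shape (n, n)
avg_dists = dists.mean(axis=2)

keypoint_threshold = 2.0  # hypothetical merge radius
print(avg_dists < keypoint_threshold)
```

Pairs whose mean keypoint distance falls under the threshold are merge candidates, which is exactly what the optimized path reads off `avg_dists[i]` row by row instead of recomputing distances inside a nested loop.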


@lou-roboflow lou-roboflow restored the RollUpDetectionsBlock branch January 5, 2026 20:18
@lou-roboflow lou-roboflow deleted the RollUpDetectionsBlock branch January 5, 2026 20:30
