Skip to content

Flaky consensus test_shard_snapshot_transfer_throttled_updates #7335

@agourlay

Description

@agourlay
 =================================== FAILURES ===================================
________________ test_shard_snapshot_transfer_throttled_updates ________________

tmp_path = PosixPath('/tmp/pytest-of-runner/pytest-1/test_shard_snapshot_transfer_t0')

    def test_shard_snapshot_transfer_throttled_updates(tmp_path: pathlib.Path):
        assert_project_root()
    
        # seed port to reuse the same port for the restarted nodes
        peer_api_uris, peer_dirs, bootstrap_uri = start_cluster(tmp_path, N_PEERS, 20000)
    
        create_collection(peer_api_uris[0], shard_number=N_SHARDS, replication_factor=N_REPLICA)
        wait_collection_exists_and_active_on_all_peers(
            collection_name=COLLECTION_NAME,
            peer_api_uris=peer_api_uris
        )
    
        # Insert some initial number of points
        upsert_random_points(peer_api_uris[0], 10000)
    
        # Start pushing points to the cluster
        upload_process_1 = run_update_points_in_background(peer_api_uris[0], COLLECTION_NAME, init_offset=100, throttle=True)
        upload_process_2 = run_update_points_in_background(peer_api_uris[1], COLLECTION_NAME, init_offset=10000, throttle=True)
        upload_process_3 = run_update_points_in_background(peer_api_uris[2], COLLECTION_NAME, init_offset=20000, throttle=True)
    
        transfer_collection_cluster_info = get_collection_cluster_info(peer_api_uris[0], COLLECTION_NAME)
        receiver_collection_cluster_info = get_collection_cluster_info(peer_api_uris[2], COLLECTION_NAME)
    
        from_peer_id = transfer_collection_cluster_info['peer_id']
        to_peer_id = receiver_collection_cluster_info['peer_id']
    
        shard_id = transfer_collection_cluster_info['local_shards'][0]['shard_id']
    
        # Transfer shard from one node to another
    
        # Move shard `shard_id` to peer `target_peer_id`
        r = requests.post(
            f"{peer_api_uris[0]}/collections/{COLLECTION_NAME}/cluster", json={
                "replicate_shard": {
                    "shard_id": shard_id,
                    "from_peer_id": from_peer_id,
                    "to_peer_id": to_peer_id,
                    "method": "snapshot",
                }
            })
        assert_http_ok(r)
    
        # Wait for end of shard transfer
        wait_for_collection_shard_transfers_count(peer_api_uris[0], COLLECTION_NAME, 0)
    
        upload_process_1.kill()
        upload_process_2.kill()
        upload_process_3.kill()
        sleep(1)
    
        receiver_collection_cluster_info = get_collection_cluster_info(peer_api_uris[2], COLLECTION_NAME)
        number_local_shards = len(receiver_collection_cluster_info['local_shards'])
        assert number_local_shards == 2
    
        # Point counts must be consistent across nodes
        counts = []
        for uri in peer_api_uris:
            r = requests.post(
                f"{uri}/collections/{COLLECTION_NAME}/points/count", json={
                    "exact": True
                }
            )
            assert_http_ok(r)
            counts.append(r.json()["result"]['count'])
>       assert counts[0] == counts[1] == counts[2]
E       assert 10060 == 9804

This test has already failed twice on CI.

Run with:
pytest tests/consensus_tests/test_shard_snapshot_transfer.py::test_shard_snapshot_transfer_throttled_updates

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug — Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions