|
1 | 1 | # -*- coding: utf-8 -*- |
2 | 2 |
|
| 3 | +import math |
3 | 4 | import unittest |
4 | 5 | from includes import * |
5 | 6 | from common import * |
@@ -589,6 +590,119 @@ def testTimedOutWarningCoordResp3(): |
589 | 590 | def testTimedOutWarningCoordResp2(): |
590 | 591 | TimedOutWarningtestCoord(Env(protocol=2)) |
591 | 592 |
|
| 593 | +def get_shards_profile(env, res): |
| 594 | + """Extract shard profiles from FT.PROFILE AGGREGATE response.""" |
| 595 | + if env.protocol == 3: |
| 596 | + return res['Profile']['Shards'] |
| 597 | + else: |
| 598 | + return [to_dict(p) for p in res[-1][1]] |
| 599 | + |
| 600 | +def InternalCursorReadsInProfile(protocol): |
| 601 | + """Tests that 'Internal cursor reads' appears in shard profiles for AGGREGATE.""" |
| 602 | + # Limit number of shards to avoid creating too many docs |
| 603 | + env = Env(shardsCount=2, protocol=protocol) |
| 604 | + conn = getConnectionByEnv(env) |
| 605 | + env.cmd(config_cmd(), 'SET', '_PRINT_PROFILE_CLOCK', 'false') |
| 606 | + |
| 607 | + env.expect('FT.CREATE', 'idx', 'SCHEMA', 't', 'TEXT').ok() |
| 608 | + |
| 609 | + # Insert docs - with default cursorReadSize=1000, each shard needs more than 1000 to require 2 reads |
| 610 | + num_docs = int(1000 * 1.1 * env.shardsCount) |
| 611 | + for i in range(num_docs): |
| 612 | + conn.execute_command('HSET', f'doc{i}', 't', f'hello{i}') |
| 613 | + |
| 614 | + # Run FT.PROFILE AGGREGATE - coordinator uses internal cursors to shards |
| 615 | + res = env.cmd('FT.PROFILE', 'idx', 'AGGREGATE', 'QUERY', '*') |
| 616 | + |
| 617 | + shards_profile = get_shards_profile(env, res) |
| 618 | + env.assertEqual(len(shards_profile), env.shardsCount, message=f"unexpected number of shards. full reply output: {res}") |
| 619 | + |
| 620 | + # Each shard should have exactly 2 cursor reads (1000+ docs per shard, default cursorReadSize=1000) |
| 621 | + for shard_profile in shards_profile: |
| 622 | + env.assertContains('Internal cursor reads', shard_profile) |
| 623 | + env.assertEqual(shard_profile['Internal cursor reads'], 2) |
| 624 | + |
| 625 | +@skip(cluster=False) |
| 626 | +def testInternalCursorReadsInProfileResp3(): |
| 627 | + InternalCursorReadsInProfile(protocol=3) |
| 628 | + |
| 629 | +@skip(cluster=False) |
| 630 | +def testInternalCursorReadsInProfileResp2(): |
| 631 | + InternalCursorReadsInProfile(protocol=2) |
| 632 | + |
| 633 | +@skip(cluster=False) |
| 634 | +def testInternalCursorReadsWithTimeoutResp3(): |
| 635 | + """Tests 'Internal cursor reads' with timeout - RESP3 coordinator detects timeout and stops early.""" |
| 636 | + env = Env(protocol=3) |
| 637 | + conn = getConnectionByEnv(env) |
| 638 | + run_command_on_all_shards(env, config_cmd(), 'SET', '_PRINT_PROFILE_CLOCK', 'false') |
| 639 | + |
| 640 | + env.expect('FT.CREATE', 'idx', 'SCHEMA', 't', 'TEXT').ok() |
| 641 | + |
| 642 | + num_docs = 100 |
| 643 | + for i in range(num_docs): |
| 644 | + conn.execute_command('HSET', f'doc{i}', 't', f'hello{i}') |
| 645 | + |
| 646 | + # Run FT.PROFILE AGGREGATE with simulated timeout on shards only |
| 647 | + query = ['FT.PROFILE', 'idx', 'AGGREGATE', 'QUERY', '*'] |
| 648 | + timeout_after_n = 5 |
| 649 | + res = runDebugQueryCommandTimeoutAfterN(env, query, timeout_after_n, internal_only=True) |
| 650 | + |
| 651 | + # RESP3: coordinator detects shard timeout and stops early after reading first shard's reply |
| 652 | + # Results count equals first shard's reply length (timeout_after_n) |
| 653 | + env.assertEqual(len(res['Results']['results']), timeout_after_n) |
| 654 | + |
| 655 | + shards_profile = get_shards_profile(env, res) |
| 656 | + for shard_profile in shards_profile: |
| 657 | + env.assertContains('Internal cursor reads', shard_profile, message=f"full reply output: {res}") |
| 658 | + # Coordinator stops after first timeout, so only 1 cursor read per shard |
| 659 | + env.assertEqual(shard_profile['Internal cursor reads'], 1, message=f"full reply output: {res}") |
| 660 | + env.assertEqual(shard_profile['Warning'], 'Timeout limit was reached', message=f"full reply output: {res}") |
| 661 | + |
| 662 | +@skip(cluster=False) |
| 663 | +def testInternalCursorReadsWithTimeoutResp2(): |
| 664 | + """Tests 'Internal cursor reads' with timeout - RESP2 coordinator doesn't detect timeout, reads until EOF.""" |
| 665 | + env = Env(shardsCount=2, protocol=2) |
| 666 | + conn = getConnectionByEnv(env) |
| 667 | + run_command_on_all_shards(env, config_cmd(), 'SET', '_PRINT_PROFILE_CLOCK', 'false') |
| 668 | + |
| 669 | + env.expect('FT.CREATE', 'idx', 'SCHEMA', 't', 'TEXT').ok() |
| 670 | + |
| 671 | + num_docs = 100 |
| 672 | + for i in range(num_docs): |
| 673 | + conn.execute_command('HSET', f'doc{i}', 't', f'hello{i}') |
| 674 | + |
| 675 | + # Run FT.PROFILE AGGREGATE with simulated timeout on shards only |
| 676 | + query = ['FT.PROFILE', 'idx', 'AGGREGATE', 'QUERY', '*'] |
| 677 | + timeout_after_n = 5 |
| 678 | + res = runDebugQueryCommandTimeoutAfterN(env, query, timeout_after_n, internal_only=True) |
| 679 | + |
| 680 | + # RESP2: coordinator doesn't check shard timeout, reads until EOF |
| 681 | + # All docs are returned |
| 682 | + env.assertEqual(len(res[0]) - 1, num_docs) |
| 683 | + |
| 684 | + shards_profile = get_shards_profile(env, res) |
| 685 | + env.assertEqual(len(shards_profile), env.shardsCount, message=f"unexpected number of shards. full reply output: {res}") |
| 686 | + |
| 687 | + # Verify total cursor reads matches expected (order of shards may differ) |
| 688 | + total_expected_reads = 0 |
| 689 | + for shard_conn in env.getOSSMasterNodesConnectionList(): |
| 690 | + docs_on_shard = shard_conn.execute_command('DBSIZE') |
| 691 | + total_expected_reads += math.ceil(docs_on_shard / timeout_after_n) |
| 692 | + |
| 693 | + # The order of shards in the profile response may differ, so we can't check per-shard |
| 694 | + total_actual_reads = sum(sp['Internal cursor reads'] for sp in shards_profile) |
| 695 | + env.assertEqual(total_actual_reads, total_expected_reads, message=f"full reply output: {res}") |
| 696 | + |
| 697 | + # Verify each shard has warning |
| 698 | + for shard_profile in shards_profile: |
| 699 | + env.assertContains('Internal cursor reads', shard_profile, message=f"full reply output: {res}") |
| 700 | + env.assertEqual(shard_profile['Warning'], 'Timeout limit was reached', message=f"full reply output: {res}") |
| 701 | + |
| 702 | + # Coordinator should NOT have timeout warning (it doesn't detect it in RESP2) |
| 703 | + coord_profile = to_dict(res[-1][-1]) |
| 704 | + env.assertEqual(coord_profile['Warning'], 'None', message=f"full reply output: {res}") |
| 705 | + |
592 | 706 | # This test is currently skipped due to flaky behavior of some of the machines' |
593 | 707 | # timers. MOD-6436 |
594 | 708 | @skip() |
|
0 commit comments