Skip to content

Improve count_cycle()#1066

Merged
bbayles merged 2 commits intomore-itertools:masterfrom
rhettinger:count_cyle
Aug 29, 2025
Merged

Improve count_cycle()#1066
bbayles merged 2 commits intomore-itertools:masterfrom
rhettinger:count_cyle

Conversation

@rhettinger
Copy link
Copy Markdown
Contributor

Prefer an iterator stack to nested generator expressions.

Baseline:

% python3.14 -m timeit -s 'from more_itertools import consume, count_cycle' 'consume(count_cycle(range(1000), 3))'
5000 loops, best of 5: 93 usec per loop

Patched:

% python3.14 -m timeit -s 'from more_itertools import consume, count_cycle' 'consume(count_cycle(range(1000), 3))'
10000 loops, best of 5: 36.4 usec per loop

@pochmann3
Copy link
Copy Markdown
Contributor

Unless you dislike the dependency again, you could shorten your

    return zip(chain.from_iterable(map(repeat, counter, repeat(len(seq)))), cycle(seq))

to:

    return zip(repeat_each(counter, len(seq)), cycle(seq))

(Another one I wrote was return chain.from_iterable(map(zip, map(repeat, counter), repeat(seq))), but I think yours is better.)

@pochmann3
Copy link
Copy Markdown
Contributor

pochmann3 commented Aug 29, 2025

Not a suggestion, just curiosity: I also tried zip_longest with filling in the counter values:

    return chain.from_iterable(
        zip_longest((), seq, fillvalue=i)
        for i in counter
    )

Times with your test case:

args: range(1000), 3
180.4 ± 1.4 μs  original
 77.7 ± 0.4 μs  zip_chain (yours without repeat_each)
 67.3 ± 0.4 μs  chain_zip (mine I mentioned earlier)
 65.5 ± 0.5 μs  chain_ziplongest (mine with zip_longest)

Python: 3.13.0 (main, Nov  9 2024, 10:04:25) [GCC 14.2.1 20240910]

But as expected, it's slow for a short iterable:

args: range(3), 1000
231.8 ± 1.2 μs  original
223.1 ± 1.2 μs  zip_chain
396.0 ± 4.3 μs  chain_zip
442.1 ± 5.5 μs  chain_ziplongest
benchmark script
def original(iterable, n=None):
    iterable = tuple(iterable)
    if not iterable:
        return iter(())
    counter = count() if n is None else range(n)
    return ((i, item) for i in counter for item in iterable)


def zip_chain(iterable, n=None):
    seq = tuple(iterable)
    if not seq:
        return iter(())
    counter = count() if n is None else range(n)
    return zip(
        chain.from_iterable(map(repeat, counter, repeat(len(seq)))),
        cycle(seq)
    )


def chain_zip(iterable, n=None):
    seq = tuple(iterable)
    if not seq:
        return iter(())
    counter = count() if n is None else range(n)
    return chain.from_iterable(
        map(zip, map(repeat, counter), repeat(seq))
    )


def chain_ziplongest(iterable, n=None):
    seq = tuple(iterable)
    if not seq:
        return iter(())
    counter = count() if n is None else range(n)
    return chain.from_iterable(
        zip_longest((), seq, fillvalue=i)
        for i in counter
    )


funcs = [
    original, 
    zip_chain,
    chain_zip,
    chain_ziplongest,
]


from itertools import *
from collections import deque
from timeit import timeit
from statistics import mean, stdev
import sys
import random

# Correctness
for f in funcs:
    print(*f('AB', 3))

# Speed
consume = deque(maxlen=0).extend
args = 'range(1000), 3'
args = 'range(3), 1000'
times = {f: [] for f in funcs}
def stats(f):
    ts = [t * 1e6 for t in sorted(times[f])[:5]]
    return f'{mean(ts):5.1f} ± {stdev(ts):3.1f} μs '
for _ in range(100):
    for f in random.sample(funcs, len(funcs)):
        t = timeit(f'consume(count_cycle({args}))', 'from __main__ import f as count_cycle, consume', number=10) / 10
        times[f].append(t)
print(args)
for f in funcs:
    print(stats(f), f.__name__)

print('\nPython:', sys.version)

Attempt This Online!

@bbayles bbayles merged commit 0b34b64 into more-itertools:master Aug 29, 2025
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants