-
Notifications
You must be signed in to change notification settings - Fork 47
Expand file tree
/
Copy pathhttpx_ratelimiter.py
More file actions
115 lines (75 loc) · 3.02 KB
/
httpx_ratelimiter.py
File metadata and controls
115 lines (75 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# ruff: noqa: T201
"""
Example of using pyrate_limiter with httpx.
"""
import logging
from pyrate_limiter import limiter_factory
from pyrate_limiter.extras.httpx_limiter import AsyncRateLimiterTransport, RateLimiterTransport
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s.%(msecs)03d [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
logger.setLevel(logging.DEBUG)
# Example below
def fetch(start_time: int):
import httpx
url = "https://httpbin.org/get"
assert limiter_factory.LIMITER is not None
with httpx.Client(transport=RateLimiterTransport(limiter=limiter_factory.LIMITER)) as client:
client.get(url)
def singleprocess_example():
import os
import time
import httpx
from pyrate_limiter import Duration, limiter_factory
start_time = time.time()
url = "https://httpbin.org/get"
limiter = limiter_factory.create_inmemory_limiter(rate_per_duration=1, duration=Duration.SECOND)
transport = RateLimiterTransport(limiter=limiter)
with httpx.Client(transport=transport) as client:
for _ in range(10):
response = client.get(url)
print(f"{round(time.time() - start_time, 2)}s-{os.getpid()}: {response.json()}")
def asyncio_example():
import asyncio
import time
import httpx
from pyrate_limiter import Duration, limiter_factory
url = "https://httpbin.org/get"
async def ticker():
"""loops and prints time, showing the eventloop isn't blocked"""
while True:
print(f"[TICK] {time.time()}")
await asyncio.sleep(1)
async def afetch(client: httpx.AsyncClient, start_time: int):
await client.get(url)
async def example():
limiter = limiter_factory.create_inmemory_limiter(rate_per_duration=1, duration=Duration.SECOND)
transport = AsyncRateLimiterTransport(limiter=limiter)
client = httpx.AsyncClient(transport=transport)
tasks = [afetch(client, url) for _ in range(10)]
asyncio.create_task(ticker())
results = await asyncio.gather(*tasks)
await client.aclose()
return results
asyncio.run(example())
def multiprocess_example():
import time
from concurrent.futures import ProcessPoolExecutor, wait
from pyrate_limiter import Duration, MultiprocessBucket, Rate
rate = Rate(1, Duration.SECOND)
bucket = MultiprocessBucket.init([rate])
start_time = time.time()
with ProcessPoolExecutor(initializer=limiter_factory.init_global_limiter, initargs=(bucket,)) as executor:
futures = [executor.submit(fetch, start_time) for _ in range(10)]
wait(futures)
for f in futures:
try:
f.result()
except Exception:
logger.exception("Task raised")
if __name__ == "__main__":
print("Single Process example: 10 requests")
singleprocess_example()
print("Multiprocessing example: 10 requests")
multiprocess_example()
print("Asyncio example: 10 requests")
asyncio_example()