@@ -43,49 +43,32 @@ export const toPromiseAbortable = <T>(
4343 }
4444 } ) ;
4545
46- export function createLimiter ( ratelimitIntervalMs : number , ratelimitRequestPerInterval : number ) {
47- function createCurrentInterval ( ) {
48- return {
49- startedAt : Rx . asyncScheduler . now ( ) ,
50- numRequests : 0 ,
51- } ;
52- }
53-
54- let currentInterval : { startedAt : number ; numRequests : number } = createCurrentInterval ( ) ;
46+ export function createSubscriberConcurrencyLimiter ( maxConcurrency : number ) {
5547 let observers : Array < [ Rx . Subscriber < any > , any ] > = [ ] ;
56- let timerSubscription : Rx . Subscription | undefined ;
48+ let activeObservers : Array < Rx . Subscriber < any > > = [ ] ;
5749
58- function createTimeout ( ) {
59- if ( timerSubscription ) {
50+ function processNext ( ) {
51+ if ( activeObservers . length >= maxConcurrency ) {
6052 return ;
6153 }
62- timerSubscription = Rx . asyncScheduler . schedule ( ( ) => {
63- timerSubscription = undefined ;
64- currentInterval = createCurrentInterval ( ) ;
65- for ( const [ waitingObserver , value ] of observers ) {
66- if ( currentInterval . numRequests >= ratelimitRequestPerInterval ) {
67- createTimeout ( ) ;
68- continue ;
69- }
70- currentInterval . numRequests ++ ;
71- waitingObserver . next ( value ) ;
72- }
73- } , ratelimitIntervalMs ) ;
54+ const observerValuePair = observers . shift ( ) ;
55+
56+ if ( ! observerValuePair ) {
57+ return ;
58+ }
59+
60+ const [ observer , value ] = observerValuePair ;
61+ activeObservers . push ( observer ) ;
62+ observer . next ( value ) ;
7463 }
7564
7665 return function limit < T > ( ) : Rx . MonoTypeOperatorFunction < T > {
7766 return ( observable ) =>
7867 new Rx . Observable < T > ( ( observer ) => {
7968 const subscription = observable . subscribe ( {
8069 next ( value ) {
81- if ( currentInterval . numRequests < ratelimitRequestPerInterval ) {
82- currentInterval . numRequests ++ ;
83- observer . next ( value ) ;
84- return ;
85- }
86-
8770 observers = [ ...observers , [ observer , value ] ] ;
88- createTimeout ( ) ;
71+ processNext ( ) ;
8972 } ,
9073 error ( err ) {
9174 observer . error ( err ) ;
@@ -96,8 +79,10 @@ export function createLimiter(ratelimitIntervalMs: number, ratelimitRequestPerIn
9679 } ) ;
9780
9881 return ( ) => {
82+ activeObservers = activeObservers . filter ( ( o ) => o !== observer ) ;
9983 observers = observers . filter ( ( o ) => o [ 0 ] !== observer ) ;
10084 subscription . unsubscribe ( ) ;
85+ processNext ( ) ;
10186 } ;
10287 } ) ;
10388 } ;
0 commit comments