mirror of
https://github.com/fallenbagel/jellyseerr.git
synced 2026-01-05 14:18:35 -05:00
fix: rewrite the rate limit utility (#896)
This commit is contained in:
@@ -7,9 +7,10 @@ type RateLimiteState<T extends (...args: Parameters<T>) => Promise<U>, U> = {
|
|||||||
queue: {
|
queue: {
|
||||||
args: Parameters<T>;
|
args: Parameters<T>;
|
||||||
resolve: (value: U) => void;
|
resolve: (value: U) => void;
|
||||||
|
reject: (reason?: unknown) => void;
|
||||||
}[];
|
}[];
|
||||||
activeRequests: number;
|
lastTimestamps: number[];
|
||||||
timer: NodeJS.Timeout | null;
|
timeout: ReturnType<typeof setTimeout>;
|
||||||
};
|
};
|
||||||
|
|
||||||
const rateLimitById: Record<string, unknown> = {};
|
const rateLimitById: Record<string, unknown> = {};
|
||||||
@@ -27,46 +28,40 @@ export default function rateLimit<
|
|||||||
>(fn: T, options: RateLimitOptions): (...args: Parameters<T>) => Promise<U> {
|
>(fn: T, options: RateLimitOptions): (...args: Parameters<T>) => Promise<U> {
|
||||||
const state: RateLimiteState<T, U> = (rateLimitById[
|
const state: RateLimiteState<T, U> = (rateLimitById[
|
||||||
options.id || ''
|
options.id || ''
|
||||||
] as RateLimiteState<T, U>) || { queue: [], activeRequests: 0, timer: null };
|
] as RateLimiteState<T, U>) || { queue: [], lastTimestamps: [] };
|
||||||
if (options.id) {
|
if (options.id) {
|
||||||
rateLimitById[options.id] = state;
|
rateLimitById[options.id] = state;
|
||||||
}
|
}
|
||||||
|
|
||||||
const processQueue = () => {
|
const processQueue = () => {
|
||||||
if (state.queue.length === 0) {
|
// remove old timestamps
|
||||||
if (state.timer) {
|
state.lastTimestamps = state.lastTimestamps.filter(
|
||||||
clearInterval(state.timer);
|
(timestamp) => Date.now() - timestamp < 1000
|
||||||
state.timer = null;
|
);
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
while (state.activeRequests < options.maxRPS) {
|
if (state.lastTimestamps.length < options.maxRPS) {
|
||||||
state.activeRequests++;
|
// process requests if RPS not exceeded
|
||||||
const item = state.queue.shift();
|
const item = state.queue.shift();
|
||||||
if (!item) break;
|
if (!item) return;
|
||||||
const { args, resolve } = item;
|
state.lastTimestamps.push(Date.now());
|
||||||
|
const { args, resolve, reject } = item;
|
||||||
fn(...args)
|
fn(...args)
|
||||||
.then(resolve)
|
.then(resolve)
|
||||||
.finally(() => {
|
.catch(reject);
|
||||||
state.activeRequests--;
|
processQueue();
|
||||||
if (state.queue.length > 0) {
|
} else {
|
||||||
if (!state.timer) {
|
// rerun once the oldest item in queue is older than 1s
|
||||||
state.timer = setInterval(processQueue, 1000);
|
if (state.timeout) clearTimeout(state.timeout);
|
||||||
}
|
state.timeout = setTimeout(
|
||||||
} else {
|
processQueue,
|
||||||
if (state.timer) {
|
1000 - (Date.now() - state.lastTimestamps[0])
|
||||||
clearInterval(state.timer);
|
);
|
||||||
state.timer = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
return (...args: Parameters<T>): Promise<U> => {
|
return (...args: Parameters<T>): Promise<U> => {
|
||||||
return new Promise<U>((resolve) => {
|
return new Promise<U>((resolve, reject) => {
|
||||||
state.queue.push({ args, resolve });
|
state.queue.push({ args, resolve, reject });
|
||||||
processQueue();
|
processQueue();
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user