mirror of
https://github.com/langgenius/dify.git
synced 2026-04-05 11:11:12 +08:00
fix(rate_limit): flush redis cache when __init__ is triggered by changing max_active_requests (#33830)
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
@@ -19,6 +19,7 @@ class RateLimit:
|
|||||||
_REQUEST_MAX_ALIVE_TIME = 10 * 60 # 10 minutes
|
_REQUEST_MAX_ALIVE_TIME = 10 * 60 # 10 minutes
|
||||||
_ACTIVE_REQUESTS_COUNT_FLUSH_INTERVAL = 5 * 60 # recalculate request_count from request_detail every 5 minutes
|
_ACTIVE_REQUESTS_COUNT_FLUSH_INTERVAL = 5 * 60 # recalculate request_count from request_detail every 5 minutes
|
||||||
_instance_dict: dict[str, "RateLimit"] = {}
|
_instance_dict: dict[str, "RateLimit"] = {}
|
||||||
|
max_active_requests: int
|
||||||
|
|
||||||
def __new__(cls, client_id: str, max_active_requests: int):
|
def __new__(cls, client_id: str, max_active_requests: int):
|
||||||
if client_id not in cls._instance_dict:
|
if client_id not in cls._instance_dict:
|
||||||
@@ -27,7 +28,13 @@ class RateLimit:
|
|||||||
return cls._instance_dict[client_id]
|
return cls._instance_dict[client_id]
|
||||||
|
|
||||||
def __init__(self, client_id: str, max_active_requests: int):
|
def __init__(self, client_id: str, max_active_requests: int):
|
||||||
|
flush_cache = hasattr(self, "max_active_requests") and self.max_active_requests != max_active_requests
|
||||||
self.max_active_requests = max_active_requests
|
self.max_active_requests = max_active_requests
|
||||||
|
# Only flush here if this instance has already been fully initialized,
|
||||||
|
# i.e. the Redis key attributes exist. Otherwise, rely on the flush at
|
||||||
|
# the end of initialization below.
|
||||||
|
if flush_cache and hasattr(self, "active_requests_key") and hasattr(self, "max_active_requests_key"):
|
||||||
|
self.flush_cache(use_local_value=True)
|
||||||
# must be called after max_active_requests is set
|
# must be called after max_active_requests is set
|
||||||
if self.disabled():
|
if self.disabled():
|
||||||
return
|
return
|
||||||
@@ -41,8 +48,6 @@ class RateLimit:
|
|||||||
self.flush_cache(use_local_value=True)
|
self.flush_cache(use_local_value=True)
|
||||||
|
|
||||||
def flush_cache(self, use_local_value=False):
|
def flush_cache(self, use_local_value=False):
|
||||||
if self.disabled():
|
|
||||||
return
|
|
||||||
self.last_recalculate_time = time.time()
|
self.last_recalculate_time = time.time()
|
||||||
# flush max active requests
|
# flush max active requests
|
||||||
if use_local_value or not redis_client.exists(self.max_active_requests_key):
|
if use_local_value or not redis_client.exists(self.max_active_requests_key):
|
||||||
@@ -50,7 +55,8 @@ class RateLimit:
|
|||||||
else:
|
else:
|
||||||
self.max_active_requests = int(redis_client.get(self.max_active_requests_key).decode("utf-8"))
|
self.max_active_requests = int(redis_client.get(self.max_active_requests_key).decode("utf-8"))
|
||||||
redis_client.expire(self.max_active_requests_key, timedelta(days=1))
|
redis_client.expire(self.max_active_requests_key, timedelta(days=1))
|
||||||
|
if self.disabled():
|
||||||
|
return
|
||||||
# flush max active requests (in-transit request list)
|
# flush max active requests (in-transit request list)
|
||||||
if not redis_client.exists(self.active_requests_key):
|
if not redis_client.exists(self.active_requests_key):
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -68,8 +68,8 @@ class TestRateLimit:
|
|||||||
assert rate_limit.disabled()
|
assert rate_limit.disabled()
|
||||||
assert not hasattr(rate_limit, "initialized")
|
assert not hasattr(rate_limit, "initialized")
|
||||||
|
|
||||||
def test_should_skip_reinitialization_of_existing_instance(self, redis_patch):
|
def test_should_flush_cache_when_reinitializing_existing_instance(self, redis_patch):
|
||||||
"""Test that existing instance doesn't reinitialize."""
|
"""Test existing instance refreshes Redis cache on reinitialization."""
|
||||||
redis_patch.configure_mock(
|
redis_patch.configure_mock(
|
||||||
**{
|
**{
|
||||||
"exists.return_value": False,
|
"exists.return_value": False,
|
||||||
@@ -82,7 +82,37 @@ class TestRateLimit:
|
|||||||
|
|
||||||
RateLimit("client1", 10)
|
RateLimit("client1", 10)
|
||||||
|
|
||||||
|
redis_patch.setex.assert_called_once_with(
|
||||||
|
"dify:rate_limit:client1:max_active_requests",
|
||||||
|
timedelta(days=1),
|
||||||
|
10,
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_should_reinitialize_after_being_disabled(self, redis_patch):
|
||||||
|
"""Test disabled instance can be reinitialized and writes max_active_requests to Redis."""
|
||||||
|
redis_patch.configure_mock(
|
||||||
|
**{
|
||||||
|
"exists.return_value": False,
|
||||||
|
"setex.return_value": True,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# First construct with max_active_requests = 0 (disabled), which should skip initialization.
|
||||||
|
RateLimit("client1", 0)
|
||||||
|
|
||||||
|
# Redis should not have been written to during disabled initialization.
|
||||||
redis_patch.setex.assert_not_called()
|
redis_patch.setex.assert_not_called()
|
||||||
|
redis_patch.reset_mock()
|
||||||
|
|
||||||
|
# Reinitialize with a positive max_active_requests value; this should not raise
|
||||||
|
# and must write the max_active_requests key to Redis.
|
||||||
|
RateLimit("client1", 10)
|
||||||
|
|
||||||
|
redis_patch.setex.assert_called_once_with(
|
||||||
|
"dify:rate_limit:client1:max_active_requests",
|
||||||
|
timedelta(days=1),
|
||||||
|
10,
|
||||||
|
)
|
||||||
|
|
||||||
def test_should_be_disabled_when_max_requests_is_zero_or_negative(self):
|
def test_should_be_disabled_when_max_requests_is_zero_or_negative(self):
|
||||||
"""Test disabled state for zero or negative limits."""
|
"""Test disabled state for zero or negative limits."""
|
||||||
|
|||||||
Reference in New Issue
Block a user