This tutorial uses Lettuce, which is an unsupported Redis library. For production applications, we recommend using Jedis.
RedisGears is a dynamic server-side data processing engine, where the "server" part is Redis itself. RedisGears is distributed as a Redis module. You can start a Redis instance preconfigured with Gears using the official Docker image:
docker run -p 6379:6379 redislabs/redisgears:latest

Or, as I do most of the time, using the "redismod" image, which includes Gears and all the other Redis, Inc. supported modules:

docker run -p 6379:6379 redislabs/redismod

RedisGears was built with the purpose of providing a data processing engine inside of Redis, with more formal semantics than the simpler Lua server-side scripting. Think of it as a more flexible map-reduce engine for Redis. It supports transaction, batch, and event-driven processing of Redis data. Gears allows you to localize computation and data, and provides a built-in coordinator to facilitate processing distributed data in a clustered environment.
In RedisGears, the main unit of processing is the RedisGears function, which can be (currently) written in Python (more languages are being worked on). These functions run on their own thread, separate from Redis' main thread and can be executed in response to keyspace events or imperatively as a result of external commands. The functions are "registered" (deployed) to the Gears engine, and have an associated name and a registration Id.
During registration we pick a specific reader for our function which defines how the function gets its initial data:
- KeysReader: Redis keys and values.
- KeysOnlyReader: Redis keys.
- StreamReader: Redis Stream messages.
- PythonReader: Arbitrary Python generator.
- ShardsIDReader: Shard ID.
- CommandReader: Command arguments from application client.

Depending on the reader type, Gears functions can either be run immediately, on demand, as batch jobs, or in an event-driven manner by registering them to trigger automatically on various types of events.
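To make the difference between a batch job and an event-driven registration more concrete, here is a toy model in plain Python. It is only an illustration: ToyPipeline, its methods, the sample keys and the 'Count' trigger are all hypothetical names invented for this sketch, and none of it is the RedisGears API.

```python
class ToyPipeline:
    """Toy model of a reader -> map pipeline. This is NOT the RedisGears API."""

    def __init__(self, reader):
        self.reader = reader  # callable that returns the initial records
        self.steps = []       # transformations applied to each record, in order

    def map(self, fn):
        self.steps.append(fn)
        return self

    def _process(self, record):
        for step in self.steps:
            record = step(record)
        return record

    def run(self):
        """Batch mode: pull every record from the reader right now."""
        return [self._process(r) for r in self.reader()]

    def register(self, handlers, trigger):
        """Event mode: store the pipeline; it runs later, once per event."""
        handlers[trigger] = self._process


# Batch: process a fixed set of (hypothetical) keys immediately.
batch = ToyPipeline(lambda: ['user:1', 'user:2']).map(str.upper).run()
print(batch)

# Event-driven: nothing runs until a matching event arrives.
handlers = {}
ToyPipeline(lambda: []).map(lambda args: len(args)).register(handlers, 'Count')
print(handlers['Count'](['Count', 'a', 'b']))
```

In the batch case the work happens inside run(), while in the event-driven case register() only stores the pipeline and the work happens each time the stored handler is called.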
The Python function rate_limit takes 3 parameters:

- key: The Redis key backing the counter for a given user.
- max_requests: The request quota for the user.
- expiry: The number of seconds in the future to set the counter TTL.

def rate_limit(key, max_requests, expiry):
    # Read the current counter; a missing key is treated as -1
    requests = execute('GET', key)
    requests = int(requests) if requests else -1
    max_requests = int(max_requests)
    expiry = int(expiry)

    if (requests == -1) or (requests < max_requests):
        # Under quota: bump the counter and set its TTL atomically
        with atomic():
            execute('INCR', key)
            execute('EXPIRE', key, expiry)
        return False
    else:
        # Quota reached: signal that the request must be denied
        return True

# Function registration
gb = GB('CommandReader')
gb.map(lambda x: rate_limit(x[1], x[2], x[3]))
gb.register(trigger='RateLimiter')
Place the script under src/main/resources/scripts. Now, let's break it down:

rate_limit function

Similarly to what we did in the previous implementation, we:

- Retrieve the current number of requests for the passed key by execute-ing the GET command.
- Cast the result to an int and, if not found, default to -1.
- Cast max_requests and expiry to int.
- If the quota hasn't been exceeded, perform the INCR/EXPIRE commands in a transaction (with atomic():) and return False (no rate limiting - request is allowed).
- Otherwise, return True (deny the request).
- In the # Function registration section, we instantiate the GearsBuilder (GB) using the CommandReader reader. The GearsBuilder "builds" the context of the function, in terms of parameters, transformations, triggers, etc.
- We use the map method to perform a one-to-one mapping of records to the params of the rate_limit function via a mapper function callback.
- We call the register action to register the function as an event handler. The event in our case is the trigger 'RateLimiter'.
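To see the behavior described above without a running Redis instance, the function can be exercised locally with stand-ins for the Gears runtime. This is a minimal sketch under stated assumptions: the in-memory store, the fake execute and the no-op atomic below are hypothetical replacements written for this example (the real ones are provided by Gears), TTLs are ignored, and the record layout (trigger name first, then the arguments) is inferred from the x[1], x[2], x[3] indexing in the mapper.

```python
from contextlib import contextmanager

# In-memory stand-ins for the Gears runtime (assumptions for local testing only).
store = {}


def execute(command, key, *args):
    """Tiny fake of execute() covering only GET, INCR and EXPIRE."""
    if command == 'GET':
        return store.get(key)
    if command == 'INCR':
        store[key] = str(int(store.get(key, '0')) + 1)
        return int(store[key])
    if command == 'EXPIRE':
        return 1  # TTL handling is ignored in this sketch
    raise ValueError(f'unsupported command: {command}')


@contextmanager
def atomic():
    """No-op replacement for the atomic() block."""
    yield


def rate_limit(key, max_requests, expiry):
    requests = execute('GET', key)
    requests = int(requests) if requests else -1
    max_requests = int(max_requests)
    expiry = int(expiry)
    if (requests == -1) or (requests < max_requests):
        with atomic():
            execute('INCR', key)
            execute('EXPIRE', key, expiry)
        return False
    else:
        return True


# Same mapper as in the registration: record[0] is assumed to be the trigger name.
mapper = lambda x: rate_limit(x[1], x[2], x[3])
record = ['RateLimiter', 'rl_localhost:47', '3', '59']

# Five calls against a quota of 3: the first three are allowed (False).
decisions = [mapper(record) for _ in range(5)]
print(decisions)  # [False, False, False, True, True]
```

Note that the arguments arrive as strings, which is why the function casts max_requests and expiry to int before comparing.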
In order to use our RedisGears function from our Spring Boot application, we need to do a few things:
LettuceMod is a Java client for Redis Modules based on Lettuce, created by Julien Ruaux. It supports several Redis modules, RedisGears among them, in standalone or cluster configurations.
To use LettuceMod we'll add the dependency to the Maven POM as shown:

<dependency>
    <groupId>com.redis</groupId>
    <artifactId>spring-lettucemod</artifactId>
    <version>1.7.0</version>
</dependency>

To access any of the LettuceMod supported modules we will inject a StatefulRedisModulesConnection in our FixedWindowRateLimiterApplication class as follows:

@Autowired
StatefulRedisModulesConnection<String, String> connection;

Add the matching import statement:

import com.redis.lettucemod.api.StatefulRedisModulesConnection;

We'll start by writing a function to determine whether the function with the trigger RateLimiter has been registered. It takes a List of Registrations and digs deep to extract the value of the trigger argument using the Java Streams API:
private Optional<String> getGearsRegistrationIdForTrigger(List<Registration> registrations, String trigger) {
    return registrations.stream()
            // trigger.equals(...) avoids a NullPointerException for registrations without a trigger argument
            .filter(r -> trigger.equals(r.getData().getArgs().get("trigger")))
            .findFirst()
            .map(Registration::getId);
}

In the @PostConstruct annotated method loadGearsScript, we:

- Retrieve an instance of RedisGearsCommands from the previously injected StatefulRedisModulesConnection
- Get the currently registered Gears functions via the dumpregistrations method
- Pass the list of registrations to our getGearsRegistrationIdForTrigger
- If the registration is not found, register the function: the script, loaded from the classpath into a String named py, is sent using the pyexecute method, passing the py script payload

@PostConstruct
public void loadGearsScript() throws IOException {
    // Load the Gears script from the classpath
    String py = StreamUtils.copyToString(new ClassPathResource("scripts/rateLimiter.py").getInputStream(),
            Charset.defaultCharset());
    RedisGearsCommands<String, String> gears = connection.sync();
    List<Registration> registrations = gears.dumpregistrations();

    Optional<String> maybeRegistrationId = getGearsRegistrationIdForTrigger(registrations, "RateLimiter");
    if (maybeRegistrationId.isEmpty()) {
        // Not registered yet: deploy the script to the Gears engine
        try {
            ExecutionResults er = gears.pyexecute(py);
            if (er.isOk()) {
                logger.info("RateLimiter.py has been registered");
            } else if (er.isError()) {
                logger.error(String.format("Could not register RateLimiter.py -> %s", Arrays.toString(er.getErrors().toArray())));
            }
        } catch (RedisCommandExecutionException rcee) {
            logger.error(String.format("Could not register RateLimiter.py -> %s", rcee.getMessage()));
        }
    } else {
        logger.info("RateLimiter.py has already been registered");
    }
}

Next, we'll modify the filter to include the StatefulRedisModulesConnection as well as the quota, the value that we need to pass to the function:
class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {

    private StatefulRedisModulesConnection<String, String> connection;
    private Long maxRequestPerMinute;

    public RateLimiterHandlerFilterFunction(StatefulRedisModulesConnection<String, String> connection,
            Long maxRequestPerMinute) {
        this.connection = connection;
        this.maxRequestPerMinute = maxRequestPerMinute;
    }

Now we can modify the filter method to use the function. Gears functions are invoked by triggering the correct event, RateLimiter, and passing the parameters required by the function: the key, the quota and the TTL in seconds.

As we've done previously, if the function returns false we let the request through; otherwise we return an HTTP 429:
@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
    // One counter per client address and per minute of the hour
    int currentMinute = LocalTime.now().getMinute();
    String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);

    RedisGearsCommands<String, String> gears = connection.sync();

    // Invoke the Gears function: key, quota and TTL in seconds
    List<Object> results = gears.trigger("RateLimiter", key, Long.toString(maxRequestPerMinute), "59");
    if (!results.isEmpty() && !Boolean.parseBoolean((String) results.get(0))) {
        return next.handle(request);
    } else {
        return ServerResponse.status(TOO_MANY_REQUESTS).build();
    }
}

Once again, we use a curl loop to test the limiter:

for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done

You should see the 21st request being rejected:
➜ for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done
PONG :: HTTP 200, 4 bytes, 0.064786 s
PONG :: HTTP 200, 4 bytes, 0.009926 s
PONG :: HTTP 200, 4 bytes, 0.009546 s
PONG :: HTTP 200, 4 bytes, 0.010189 s
PONG :: HTTP 200, 4 bytes, 0.009399 s
PONG :: HTTP 200, 4 bytes, 0.009210 s
PONG :: HTTP 200, 4 bytes, 0.008333 s
PONG :: HTTP 200, 4 bytes, 0.008009 s
PONG :: HTTP 200, 4 bytes, 0.008919 s
PONG :: HTTP 200, 4 bytes, 0.009271 s
PONG :: HTTP 200, 4 bytes, 0.007515 s
PONG :: HTTP 200, 4 bytes, 0.007057 s
PONG :: HTTP 200, 4 bytes, 0.008373 s
PONG :: HTTP 200, 4 bytes, 0.007573 s
PONG :: HTTP 200, 4 bytes, 0.008209 s
PONG :: HTTP 200, 4 bytes, 0.009080 s
PONG :: HTTP 200, 4 bytes, 0.007595 s
PONG :: HTTP 200, 4 bytes, 0.007955 s
PONG :: HTTP 200, 4 bytes, 0.007693 s
PONG :: HTTP 200, 4 bytes, 0.008743 s
:: HTTP 429, 0 bytes, 0.007226 s
:: HTTP 429, 0 bytes, 0.007388 s

If we run Redis in monitor mode, we should see the calls to RG.TRIGGER and, under those, the calls to GET, INCR and EXPIRE for allowed requests:
1631249244.006212 [0 172.17.0.1:56036] "RG.TRIGGER" "RateLimiter" "rl_localhost:47" "20" "59"
1631249244.006995 [0 ?:0] "GET" "rl_localhost:47"
1631249244.007182 [0 ?:0] "INCR" "rl_localhost:47"
1631249244.007269 [0 ?:0] "EXPIRE" "rl_localhost:47" "59"

And for rate-limited requests you should see only the call to GET:
1631249244.538478 [0 172.17.0.1:56036] "RG.TRIGGER" "RateLimiter" "rl_localhost:47" "20" "59"
1631249244.538809 [0 ?:0] "GET" "rl_localhost:47"

The complete code for this implementation is under the branch with_gears.
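As a recap of the client-side logic in the filter, the two decisions it makes (which key to use, and how to read the reply from RG.TRIGGER) can be sketched in plain Python. This is an illustration only: window_key and is_allowed are hypothetical helper names, and the reply is assumed to be a list whose first element is the string form of the function's return value, mirroring the Boolean.parseBoolean check in the Java code.

```python
from datetime import datetime


def window_key(address: str, now: datetime) -> str:
    """Build the per-client, per-minute counter key, e.g. rl_localhost:47."""
    return f"rl_{address}:{now.minute}"


def is_allowed(results: list) -> bool:
    """Allow the request only if a reply exists and it is not 'true'.

    Mirrors the Java condition: a missing reply or a reply of 'True'
    (any letter case) means the request is rejected with HTTP 429.
    """
    return bool(results) and str(results[0]).lower() != "true"


# A request arriving at minute 47 of the hour from localhost
key = window_key("localhost", datetime(2021, 9, 10, 4, 47, 24))
print(key)

print(is_allowed(["False"]))  # under quota: let the request through
print(is_allowed(["True"]))   # quota reached: reject
print(is_allowed([]))         # no reply: reject
```

Because the minute of the hour is part of the key, each client gets a fresh counter at the start of every minute, and the 59-second TTL cleans up the old one.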