Счетчик скользящего окна используется для отслеживания частоты повторяющихся действий, происходящих в течение определенного периода. Одним из распространенных применений счетчика скользящего окна являются алгоритмы ограничения скорости, где он используется для отслеживания количества вызовов API, сделанных клиентом. Сравнивая это количество с заданным порогом, можно ограничить или отрегулировать избыточные вызовы от одного и того же клиента.

Пакет Node counter-sliding-window обеспечивает реализацию как счетчика скользящего окна, так и счетчика распределенного скользящего окна.

Базовый счетчик скользящего окна функционирует, сохраняя в памяти запись о количестве вызовов API для каждого клиента. Эта запись хранится в списке, упорядоченном по метке времени каждого вызова в порядке убывания. Всякий раз, когда клиент делает новый вызов, новая запись добавляется в начало их соответствующего списка.

Чтобы определить количество вызовов API, сделанных клиентом в течение периода скользящего окна, метод get просматривает список вызовов и суммирует объекты, попадающие в текущий период скользящего окна. Чтобы избежать проблем с использованием памяти, счетчик автоматически очищает сущности старше периода скользящего окна при каждом вызове методов get или add.

import { SlidingWindowCounterLocal } from "counter-sliding-window"

const sliding = new SlidingWindowCounterLocal(5, "seconds")

// [Time 0]
sliding.add(1)

// After 3 seconds [Time 3]
sliding.add(2)

// After 1 more second [Time 4]
console.log(sliding.get()) // prints 3

// After 3 more seconds [Time 7]
console.log(sliding.get()) // prints 2

// After 2 more seconds [Time 9]
console.log(sliding.get()) // prints 0

Реализация Redis

Счетчик в памяти может эффективно справляться с задачей подсчета действий на одном сервере. Однако он не подходит для распределенной системы, где несколько серверов обрабатывают входящие запросы. В этом сценарии необходимо решение с общим хранилищем, и Redis — отличный выбор для управления этими счетчиками из-за его общего характера и высокой скорости.

Реализация отсортированного набора

Чтобы реализовать счетчик скользящего окна в Redis, мы можем использовать сортированные наборы, которые обычно используются в Redis для представления данных временных рядов. Sorted Sets хранить уникальные элементы с оценкой для каждого члена, что позволяет легко подсчитывать и извлекать элементы из набора в пределах указанного диапазона оценок. Поскольку временные метки упорядочены естественным образом, мы можем использовать часть оценки Sorted Set для хранения временной метки каждого вызова API, что упрощает подсчет и получение вызовов, выполненных в диапазоне временных меток, равном периоду скользящего окна.

Чтобы добавить новый счетчик вызовов API к нашему ключу счетчика Redis, мы можем использовать команду ZADD, как показано ниже:

-- ZADD counter [timestamp] [value]
ZADD counter 1657961915 1

Чтобы подсчитать количество вызовов API в последнем периоде скользящего окна, мы можем использовать команду ZCOUNT с оценкой max now в миллисекундах и оценкой min now - window_size, где now — текущая метка времени, а window_size — продолжительность скользящего окна. период окна в миллисекундах, как показано ниже:

-- ZCOUNT counter [now timestamp] [now timestamp minus window period size of 1 minute]
ZCOUNT counter 1672499174 1672499114

Ограничения отсортированного набора

Одно из ограничений использования отсортированного набора заключается в том, что это прежде всего набор, а это означает, что он может хранить значение только один раз. Например, если мы подсчитываем вызовы API в одноминутном скользящем окне с отметкой времени в качестве оценки и числом счетчиков в качестве значения, мы можем пропустить вызовы API. Если мы добавим два вызова в течение минуты, причем каждый вызов представляет собой один вызов API, мы ожидаем, что получим два отдельных элемента, по одному для каждой метки времени, со значением 1. Однако, поскольку это набор, значение 1 может быть сохранено только один раз, в результате чего второй элемент переопределяет первый и приводит к подсчету пропущенных запросов.

ZADD counter 1657961915 1
ZADD counter 1657961937 1

Чтобы решить эту проблему, мы можем добавить метку времени в качестве префикса к значению, что позволит нам сохранять одно и то же значение более одного раза. Тем не менее, он все еще может столкнуться с проблемами при получении более одного запроса с одной и той же меткой времени, что возможно в распределенной среде. Полное решение требует смешивания значения с некоторым случайным префиксом.

ZADD counter 1657961915 1657961915:1
ZADD counter 1657961937 1657961937:1

Очистка элементов, которые старше текущего скользящего окна, также является проблемой с реализацией Sorted Set. Для всесторонней и подробной реализации счетчика скользящего окна с использованием отсортированных наборов вы можете обратиться к Как реализовать приложение с ограничением скорости скользящего окна с помощью ASP.NET Core и Redis.

Использование потоков Redis

Потоки Redis были представлены в версии 5 и представляют собой структуру данных только для добавления, предназначенную для данных временных рядов, таких как журналы или события. Они предлагают параллельную модель потребления данных, аналогичную темам Kafka, и обеспечивают более надежную реализацию pub-sub, чем альтернатива SUBSCRIBE/PUBLISH.

Потоки Redis также подходят для реализации счетчиков скользящих окон, поскольку они поддерживают добавление записей, отсортированных по времени, и запрос записей по временному диапазону.

Чтобы добавить запись в поток, мы можем использовать следующую команду:

XADD couter_key * count "1"

Это добавляет запись в поток counter_key со структурой, аналогичной count=1. * говорит Redis создать автоматический идентификатор для записи со структурой, подобной [redis current Unix timestamp]-[sequence] (например, 1657985420-0). Вызывая эту команду для каждого входящего запроса, мы можем регистрировать count=1 в отметке времени Unix сервера Redis.

Потоки Redis позволяют нам получать записи по временному диапазону. Для временного окна в 5 секунд (5000 мс) мы можем запросить диапазон записей между now и now - 5000 с помощью следующей команды:

XRANGE couter_key 1657987420 +

1657987420 + указывает диапазон идентификаторов записей, начиная с 1657987420 и заканчивая максимальным идентификатором в потоке. Поскольку мы позволяем Redis автоматически генерировать идентификаторы с временными метками Unix, это запрос в диапазоне времени [1657987420, now].

Команда XRANGE возвращает список записей, который выглядит следующим образом:

1) 1) 1657985420-0
   2) 1) "count"
      2) "1"
2) 1) 1657985820-0
   2) 1) "count"
      2) "1"

Суммируя count полей записей в диапазоне временного окна, мы можем определить количество искомых запросов. Мы обсудим реальную реализацию этого позже. Для начала поговорим об очистке потока от старых записей.

Очистка старых записей

Когда сильно загруженная система получает много запросов, Redis может быстро заполниться записями счетчика. Очень важно, чтобы поток был сосредоточен на соответствующих записях временного диапазона.

Чтобы удалить все записи с идентификаторами до временной метки Unix 1657987420 (MINID), мы используем команду XTRIM.

XTRIM counter_key MINID ~ 1657987420

Операнд ~ позволяет Redis оптимизировать фактическое количество удаляемых объектов, не удаляя каждую старую запись. Мы не заинтересованы в удалении всех старых записей, так как XRANGE все равно возвращает только релевантные.

Мы можем объединить добавление новых записей и очистку старых записей в одной команде XADD:

XADD MINID ~ 1657987420 couter_key * count "1"

Другая важная очистка включает в себя удаление старых потоков счетчиков, в которых хранятся ключи Redis старше диапазона временного окна. Мы используем команду Redis EXPIRE, чтобы установить тайм-аут для ключа Redis, удерживающего поток. Эта команда указывает Redis удалить поток по истечении времени ожидания.

Чтобы срок действия ключа оставался текущим, мы устанавливаем EXPIRE для ключа потока со значением времени ожидания, равным периоду скользящего окна. Когда мы публикуем в течение периода скользящего окна, новое значение тайм-аута EXPIRE заменяет предыдущее, предписывая Redis сохранять ключ по крайней мере в течение следующего периода скользящего окна. Если мы не опубликуем в потоке в течение следующего периода скользящего окна, Redis автоматически удалит ключ.

Мы можем комбинировать XADD с EXPIRE, чтобы неиспользуемые потоки автоматически исчезали в течение периода временного окна 5 секунд (5000 мс):

XADD MINID ~ 1657987420 couter_key * count "1"
EXPIRE couter_key 5000

Lua-скрипт

Чтобы обеспечить точные результаты, расчет диапазона временного окна, проверка ограничения скорости и регистрация нового вызова API должны выполняться атомарно. Этого можно добиться с помощью сценария Lua, который по своей природе работает в рамках транзакции Redis. Предоставленный Lua-скрипт извлекает диапазон потока от идентификатора начального окна до самого последнего идентификатора, суммирует счетчики диапазона, проверяет, превышает ли счетчик ограничение скорости, добавляет счетчик, сохраняя поток свободным от старых записей, и устанавливает срок действия потока.

-- `now` is an array with first item as the linux timestamp
local now = redis.call('TIME')
-- ARGV[1] holds the time window size in milliseconds
local start_window = tonumber(now[1]) - tonumber(ARGV[1])

-- Fetching stream range from start window id till latest id
local range = redis.call('XRANGE', KEYS[1], start_window, '+')

-- Summing the range counters
local count = 0;
for _, item in ipairs(range) do
    count = count + tonumber(item[2][2])
end

-- ARGV[3] holds the rate limitation
if tonumber(ARGV[3]) > 0 and count >= ARGV[3] then
    return -1
end

-- Adding the counter and keeping the stream cleaned of old entries
redis.call('XADD', KEYS[1], 'MINID', '~', start_window, '*', 'count', ARGV[3])
-- Setting stream expiration
redis.call('EXPIRE', KEYS[1], ARGV[1])
return 1

Применение

Для реализации NodeJS с библиотекой ioredis определена новая команда streamCounterAdd для выполнения сценария Lua.

import Redis, { Callback, RedisOptions, Result } from "ioredis"

declare module "ioredis" {
  interface RedisCommander<Context> {
    streamCounterAdd(
      key: string,
      windowMs: string,
      count: string,
      limit: string,
      callback?: Callback<string>
    ): Result<string, Context>
  }
}

const redis = new Redis({
  scripts: {
    streamCounterAdd: {
      numberOfKeys: 1,
      lua: `
                local now = redis.call('TIME')
                local start_window = tonumber(now[1]) - tonumber(ARGV[1])
                local range = redis.call('XRANGE', KEYS[1], start_window, '+')
                local count = 0;
                for _, item in ipairs(range) do
                    count = count + tonumber(item[2][2])
                end
                if tonumber(ARGV[3]) > 0 and count >= ARGV[3] then
                    return -1
                end
                redis.call('XADD', KEYS[1], 'MINID', '~', start_window, '*', 'count', ARGV[3])
                redis.call('EXPIRE', KEYS[1], ARGV[1])
                return 1
            `,
    },
  },
})

Функция throttleRequest использует команду streamCounterAdd для применения ограничения скорости для каждого клиента и возвращает значение true, если запрос должен регулироваться.

async function throttleRequest(
  counterName: string,
  windowPeriondMS: number,
  limit: number
): Promise<bool> {
  const res = await this.redis.streamCounterAdd(
    counterName,
    windowPeriondMS.toString(),
    "1",
    limit.toString()
  )
  return parseInt(res) < 0
}

Ограничитель скорости

Пример реализации ограничения скорости API для Express с использованием библиотеки counter-sliding-window использует токен клиента в качестве имени счетчика:

import { SlidingWindowCounterRedis } from "counter-sliding-window"

app.get("/do-something", async (req, res) => {
  const authToken = req.headers["authorization"]
  if (await throttleRequest(authToken, 5000, 10)) {
    // HTTP 429 Too Many Requests
    return res.sendStatus(429)
  }
  // Do the actual stuff ...
})

Промежуточное ПО rateLimiter может быть определено для применения ограничения скорости ко всем обработчикам. Промежуточное ПО вызывает функцию throttleRequest, чтобы проверить, должен ли запрос регулироваться, и отвечает HTTP 429 Too Many Requests, если запрос регулируется.

function rateLimiter(windowPeriondMS: number, limit: number) {
  return async function (
    request: Request,
    response: Response,
    next: NextFunction
  ): Promise<void> {
    const authToken = req.headers["authorization"]
    if (await throttleRequest(authToken, windowPeriondMS, limit)) {
      // HTTP 429 Too Many Requests
      return res.sendStatus(429)
    }
    next()
  }
}


app.use(rateLimiter(5000, 10))

Первоначально опубликовано на https://trycatch22.net.