В области анализа данных обнаружение аномалий играет жизненно важную роль в выявлении отклонений от того, что считается стандартным, нормальным или ожидаемым. Аномалии могут дать важную информацию о потенциальных проблемах, исключительных событиях или выбросах в показателях. В этой статье мы сосредоточимся на обнаружении аномалий одномерных временных рядов, которое включает анализ отдельных показателей во времени. Мы рассмотрим различные методы одномерного обнаружения аномалий, начиная от подходов, основанных на правилах и на основе дисперсии, и заканчивая сравнением двух окон данных. Кроме того, мы обсудим математическую концепцию сезонности и то, как можно использовать преобразование Фурье для определения циклического поведения данных временных рядов.

Что такое аномалия

Словарь определяет anomaly как:

человек или вещь, которые отличаются от обычных или не согласуются с чем-то другим и поэтому неудовлетворительны

источник: Кембриджский словарь

Различие и согласие являются математическими и статистическими понятиями соответственно, в которых различие относится к абсолютной разнице или разнице между точками, а согласие относится к предшествующим скрытым источникам (распределениям, сигналам и т.п.), которые имеют или не разделяют разные точки данных. С математической точки зрения, аномалии — это сравнительный анализ, и поэтому всегда нужно сравнивать как минимум 2 группы. Во временных рядах создание этих групп будет следовать последовательности самого времени. Более того, эта группировка может быть для отдельных точек данных или для наборов точек данных, которые мы будем называть окном. Поскольку это окно перемещается с течением времени, эти окна часто называют движущимися или скользящими окнами.

Окна и агрегаты

Алгоритмы обнаружения аномалий в основном работают с использованием движущихся или скользящих окон, которые фиксируют подмножество точек данных с течением времени. Эти окна служат основой для вычисления различных агрегаций, таких как дисперсия, дисперсия дисперсии, среднее значение и медиана. Оценивая статистические свойства данных в этих окнах, алгоритмы могут выявлять отклонения или неожиданные закономерности в анализируемых показателях. Этот оконный подход позволяет обнаруживать аномалии, рассматривая контекстуальную информацию, предоставляемую окружающими точками данных, что позволяет всесторонне понять поведение данных во времени.

Интермеццо: равноудаленный против неравноудалённого

Данные временных рядов можно разделить на два типа: равноудаленные и неравноудаленные. Данные эквидистантных временных рядов состоят из измерений, записанных через равные промежутки времени, например ежечасно, ежедневно или ежемесячно. С другой стороны, неэквидистантные данные временных рядов имеют нерегулярные временные интервалы между измерениями. Неэквидистантные данные временных рядов могут создавать проблемы при анализе из-за неравномерного интервала между наблюдениями и необходимости использования соответствующих методов обработки. Простым примером может быть SUM по определенному временному окну. При равноудаленных данных эти n точек данных будут одинаковыми для каждого окна. Для неэквидистантных данных SUM будет иметь различные n точек данных, что дает совокупные значения, которые больше несопоставимы.

Неравноудаленные точки данных могут быть сложной темой сами по себе и выходят за рамки этой статьи. Если вы имеете дело с неэквидистантными данными и хотите следовать указаниям этой статьи, лучшим подходом является повторная выборка ваших данных в равноудаленные данные. Для этого подхода можно использовать библиотеку Pandas. Простой метод повторной выборки pandas возвращает равноудаленный массив точек данных временного ряда. Пример приведен ниже:

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt 
# Get some time series data
df = pd.read_csv("https://raw.githubusercontent.com/numenta/NAB/master/data/artificialWithAnomaly/art_daily_jumpsup.csv")
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)
print(f"The dataset has the following amount of values in 1 hours buckets {df.resample('1h').count().value_counts()}")

# drop some values at random
drop_indices = np.random.choice(df.index, 2000, replace=False)
df_subset = df.drop(drop_indices)
print(f"The subset has the following amount of values in 1 hours buckets {df_subset.resample('1h').count().value_counts()}")

# In order to correct such as issue you can clean it up with resample, mean and bfill of ffil
cleaned_subset = df_subset.resample("5min").mean().ffill()
print(f"The cleaned subset has the following amount of values in 1 hours buckets {cleaned_subset.resample('1h').count().value_counts()}")

который дает:

The dataset has the following amount of values in 1 hours buckets value
12       336
dtype: int64
The subset has the following amount of values in 1 hours buckets value
6        88
7        70
5        51
4        42
8        33
9        23
3        15
2         7
10        6
1         1
dtype: int64
The cleaned subset has the following amount of values in 1 hours buckets value
12       335
11         1
dtype: int64

В этом примере DataFrame df имеет индекс даты и времени, представляющий данные временного ряда. Чтобы ввести случайные промежутки с выпадением 2000 предметов случайным образом. DataFrame имеет два столбца: «Value», содержащее наблюдаемые значения, и «timestamp», содержащее соответствующие метки времени. Используя функцию resample и указав желаемую частоту повторной выборки, в данном случае '5Min'для 5-минутного интервала, мы можем выполнить повторную выборку DataFrame. Здесь мы используем функцию агрегирования среднего значения mean() для вычисления среднего значения в пределах каждого 5-минутного интервала. Будут корзины, которые не содержат никаких значений, поэтому мы заполняем их, чтобы обеспечить согласованность

Одномерное обнаружение аномалий временных рядов

Одномерное обнаружение аномалий фокусируется на выявлении аномалий в отдельных показателях. Для одномерного обнаружения аномалий можно использовать несколько методов.

Обнаружение аномалий на основе правил/порогов

Обнаружение аномалий на основе правил или порогов включает установку предопределенных порогов или правил для выявления аномалий. Например, если значение метрики превышает указанный порог, это может рассматриваться как аномалия. Библиотека ADTK (Anomaly Detection Toolkit) на Python предоставляет удобный функционал для реализации обнаружения на основе правил. Рассмотрим пример с использованием детектора ThresholdAD из библиотеки ADTK:

from adtk.detector import ThresholdAD
from adtk.visualization import plot
import pandas as pd

# Create the ThresholdAD detector
detector = ThresholdAD(high=100)

# Get some time series data
df = pd.read_csv("https://raw.githubusercontent.com/numenta/NAB/master/data/artificialWithAnomaly/art_daily_jumpsup.csv")
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)

# Detect anomalies
anomalies = detector.predict(df)
plot(df, anomaly=anomalies, anomaly_color="red", anomaly_tag="marker")

Обнаружение на основе дисперсии

Методы обнаружения на основе дисперсии используют статистические меры для выявления аномалий. Обычно используются три подхода: обнаружение на основе квантилей, обнаружение межквантильного диапазона (IQR) и тест обобщенного экстремального студенческого отклонения (ESD). Эти методы принципиально оценивают следующее значение по сравнению с окном значений, которое было раньше. Давайте кратко объясним каждый метод:

  1. Обнаружение на основе квантилей: этот подход вычисляет квантили значений метрик и идентифицирует наблюдения, выходящие за пределы указанного диапазона квантилей, как аномалии.
  2. Обнаружение межквантильного диапазона (IQR): этот метод использует диапазон между верхним и нижним квантилями (IQR) для выявления аномалий. Наблюдения за пределами диапазона IQR считаются аномальными.
  3. Обобщенный тест экстремального студенческого отклонения (ESD): тест ESD выявляет аномалии путем сравнения максимального отклонения наблюдения с ожидаемым максимальным отклонением. Он итеративно удаляет самые крайние аномалии до тех пор, пока больше не будет найдено.

Ниже приведен пример, демонстрирующий реализацию QuantileAD и GeneralizedESDTestAD с использованием библиотеки ADTK:

from adtk.detector import QuantileAD
from adtk.visualization import plot
import pandas as pd

# Create the QuantileAD detector
detector = QuantileAD(high=0.95, low=0.05)

# Get some time series data
df = pd.read_csv("https://raw.githubusercontent.com/numenta/NAB/master/data/artificialWithAnomaly/art_daily_jumpsup.csv")
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)

# Detect anomalies
anomalies = detector.fit_predict(df)
plot(df, anomaly=anomalies, anomaly_color="red", anomaly_tag="marker")

from adtk.detector import GeneralizedESDTestAD
from adtk.visualization import plot
import pandas as pd

# Create the GeneralizedESDTestAD detector
detector = GeneralizedESDTestAD(alpha=300)

# Get some time series data
df = pd.read_csv("https://raw.githubusercontent.com/numenta/NAB/master/data/artificialWithAnomaly/art_daily_jumpsup.csv")
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)

# Detect anomalies
anomalies = detector.fit_predict(df)
plot(df, anomaly=anomalies, anomaly_color="red", anomaly_tag="marker")

Сравнение двух окон данных

Другой подход к измерению аномалий включает сравнение двух окон данных. Библиотека ADTK предоставляет объект DoubleRollingAggregate для облегчения этого сравнения. Используя этот подход, можно применять различные методы, такие как LevelshiftAD, PersistAD и VolatilityShiftAD. Эти методы также оценивают следующее значение по сравнению с окном значений, которые были ранее. Давайте кратко объясним каждый метод:

  1. LevelshiftAD: этот метод обнаруживает аномалии на основе значительных сдвигов уровня значений метрик. Он сравнивает текущее значение с медианой или средним значением предыдущего окна.
  2. PersistAD: метод PersistAD выявляет постоянные аномалии, которые сохраняются в последовательных окнах. Он оценивает отношение текущего значения к медиане или среднему значению предыдущего окна.
  3. VolatilityShiftAD: этот метод обнаруживает аномалии на основе значительных изменений волатильности или изменчивости значений метрик. Он сравнивает волатильность текущего значения с волатильностью предыдущего окна.

Ниже приведен пример, демонстрирующий реализацию LevelshiftAD, PersistAD и VolatilityShiftAD с использованием библиотеки ADTK:

from adtk.detector import LevelShiftAD
from adtk.visualization import plot
import pandas as pd

# Create the LevelShiftAD detector
window=30
detector = LevelShiftAD(window=window, c=5)

# Get some time series data
df = pd.read_csv("https://raw.githubusercontent.com/numenta/NAB/master/data/artificialWithAnomaly/art_daily_jumpsup.csv")
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)

# Detect anomalies
anomalies = detector.fit_predict(df)
plot(df, anomaly=anomalies, anomaly_color="red", anomaly_tag="marker")

from adtk.detector import PersistAD
from adtk.visualization import plot
import pandas as pd

# Create the PersistAD detector
window=100
detector = PersistAD(window=window, c=1)

# Get some time series data
df = pd.read_csv("https://raw.githubusercontent.com/numenta/NAB/master/data/artificialWithAnomaly/art_daily_jumpsup.csv")
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)

# Detect anomalies
anomalies = detector.fit_predict(df)
plot(df, anomaly=anomalies, anomaly_color="red", anomaly_tag="marker")

from adtk.detector import VolatilityShiftAD
from adtk.visualization import plot
import pandas as pd

# Create the VolatilityShiftAD detector
window=200
detector = VolatilityShiftAD(window=window, c=2)

# Get some time series data
df = pd.read_csv("https://raw.githubusercontent.com/numenta/NAB/master/data/artificialWithAnomaly/art_daily_jumpsup.csv")
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)

# Detect anomalies
anomalies = detector.fit_predict(df)
plot(df, anomaly=anomalies, anomaly_color="red", anomaly_tag="marker")

Сезонность и преобразование Фурье

Сезонность относится к повторяющимся моделям или циклическому поведению, наблюдаемому в данных временных рядов. Преобразование Фурье — это математический инструмент, используемый для анализа и вывода циклических сигналов из данных временных рядов. Применяя преобразование Фурье, можно определить доминирующие частоты или циклические компоненты, что дает представление о сезонном поведении. Затем на основе этих циклических закономерностей могут быть обнаружены аномалии. Пример кода можно найти ниже:

from adtk.detector import SeasonalAD
from adtk.visualization import plot
import pandas as pd

# Create the InterquantileAD detector
detector = SeasonalAD( c=6)

# Get some time series data
df = pd.read_csv("https://raw.githubusercontent.com/numenta/NAB/master/data/artificialWithAnomaly/art_daily_jumpsup.csv")
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)

# Detect anomalies
anomalies = detector.fit_predict(df)
plot(df, anomaly=anomalies, anomaly_color="red", anomaly_tag="marker")

Пространственные подходы

Хотя основное внимание в этой статье уделяется одномерному обнаружению аномалий в данных временных рядов, стоит упомянуть пространственные подходы к обнаружению аномалий. Пространственные подходы оценивают вероятность точки данных на основе ее присутствия в пространстве параметров. Эти методы учитывают распространенность данных в этом пространстве, чтобы определить вероятность аномалии. Подходы гистограммы и оценки плотности ядра (KDE) обычно используются для обнаружения пространственных аномалий. Разделив диапазон данных на сегменты и создав гистограмму, можно количественно определить относительную распространенность данных. Метод KDE дополнительно сглаживает гистограмму. Применяя масштабатор MinMax и scipy.interpolate.interp1d, мы можем оценить вероятность того, что значение является аномальным, где 0 указывает на аномалию, а 1 указывает на отсутствие аномалии. В примере ниже используется библиотека KDEpy. Метод быстрого преобразования Фурье подобен тому, что мы видели в примере с сезонностью, только на этот раз он используется для более быстрой аппроксимации плотности путем обнаружения путем обратного преобразования бинов в сигнал.

from KDEpy import FFTKDE
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt

# Get some time series data
df = pd.read_csv("https://raw.githubusercontent.com/numenta/NAB/master/data/artificialWithAnomaly/art_daily_jumpsup.csv")
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)
df

estimator = FFTKDE(kernel='epa')
# Data=df and 1000 Bins to calculate the density over
density_grid = estimator.fit(df.values).evaluate(1000)
# To make density between parameters comparable, standardization with MinMax scaler can be used
standardized_density = MinMaxScaler().fit_transform(density_grid[1].reshape(-1,1))
grid_range = density_grid[0]

plt.figure(figsize=(14,10))
plt.title("Standardized Density over Range of data using the EPA kernel")
plt.scatter(x=grid_range, y=standardized_density)

Приведенный выше пример кода дает оценку плотности, которую он визуализирует ниже. Обратите внимание, как диапазон определяется диапазоном набора данных, и что плотность была стандартизирована, чтобы упростить интерпретацию относительной плотности. Выбор для epa ядра Епанечникова сделан как быстрый в вычислительном отношении. Это напоминает естественную форму кривой Гаусса без бесконечных ветвей, которые имеет это распределение. Для получения дополнительной информации о ядрах ознакомьтесь с документацией KDEpy.

Следующим шагом будет включение плотности в алгоритм интерполяции, который также может работать со значениями за пределами предоставленных данных. Мы используем scipy.interpolate.interp1d для этого. На данном этапе интерполяция является методом моделирования и будет упоминаться в коде как таковая.

from KDEpy import FFTKDE
import pandas as pd
from scipy.interpolate import interp1d
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt

# Get some time series data
df = pd.read_csv("https://raw.githubusercontent.com/numenta/NAB/master/data/artificialWithAnomaly/art_daily_jumpsup.csv")
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)
df

estimator = FFTKDE(kernel='epa')
# Data=df and 1000 Bins to calculate the density over
density_grid = estimator.fit(df.values).evaluate(1000)
# To make density between parameters comparable, standardization with MinMax scaler can be used
standardized_density = MinMaxScaler().fit_transform(density_grid[1].reshape(-1,1))
grid_range = density_grid[0]

density_model = interp1d(
    x=grid_range, 
    y=standardized_density.ravel(), 
    bounds_error=False,
    fill_value=0)

print(f"range of the dataset is {df.min().value} - {df.max().value}")
print(f"now we can interpret values outside of the range like -10: {density_model(-10)}")

Что возвращает

range of the dataset is 18.001009818 - 164.947480513
now we can interpret values outside of the range like -10: 0.0

Но давайте по-настоящему! Давайте обнаружим некоторые аномалии на основе пространственной плотности данных. Обратите внимание, что здесь также используется объект визуализации adtk. Объект аномалий немного больше, чем DataFrame с одинаковым индексом и логическим индикатором для всех строк в наборе данных, указывающим, является ли он аномальным или нет.

from KDEpy import FFTKDE
import pandas as pd
from scipy.interpolate import interp1d
from sklearn.preprocessing import MinMaxScaler
from adtk.visualization import plot

# Get some time series data
df = pd.read_csv("https://raw.githubusercontent.com/numenta/NAB/master/data/artificialWithAnomaly/art_daily_jumpsup.csv")
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)
df

estimator = FFTKDE(kernel='epa')
# Data=df and 1000 Bins to calculate the density over
density_grid = estimator.fit(df.values).evaluate(1000)
# To make density between parameters comparable, standardization with MinMax scaler can be used
standardized_density = 1 - MinMaxScaler().fit_transform(density_grid[1].reshape(-1,1))
grid_range = density_grid[0]

density_model = interp1d(
    x=grid_range, 
    y=standardized_density.ravel(), 
    bounds_error=False,
    fill_value=0)

anomalies = pd.DataFrame(index=df.index, data=density_model(df) > 0.99)
plot(df, anomaly=anomalies, anomaly_color="red", anomaly_tag="marker")

Реальное преимущество этого подхода, помимо того, что он очень хорошо фиксирует аномалии, заключается в том, что при обертывании его scipy.interpolate.interp1d эта концепция плотности может быть легко применена к потоковым данным и имеет простую интерпретацию. В целях мониторинга это также можно вывести для каждого значения, чтобы отслеживать аномалии потока данных:

from KDEpy import FFTKDE
import pandas as pd
from scipy.interpolate import interp1d
from sklearn.preprocessing import MinMaxScaler
from adtk.visualization import plot

# Get some time series data
df = pd.read_csv("https://raw.githubusercontent.com/numenta/NAB/master/data/artificialWithAnomaly/art_daily_jumpsup.csv")
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)
df

estimator = FFTKDE(kernel='epa')
# Data=df and 1000 Bins to calculate the density over
density_grid = estimator.fit(df.values).evaluate(1000)
# To make density between parameters comparable, standardization with MinMax scaler can be used
standardized_density = 1 - MinMaxScaler().fit_transform(density_grid[1].reshape(-1,1))
grid_range = density_grid[0]

density_model = interp1d(
    x=grid_range, 
    y=standardized_density.ravel(), 
    bounds_error=False,
    fill_value=0)

df["anomality"] = density_model(df)
df.plot(subplots=True, figsize=(16,10))

Заключение

Временные ряды Одномерное обнаружение аномалий является важным аспектом анализа отдельных показателей во времени. Методы сравнения на основе правил, дисперсии и окна обеспечивают эффективные средства выявления аномалий в одномерных данных временных рядов. Библиотека ADTK в Python предлагает удобные функции для реализации этих методов. Понимание сезонности и использование таких методов, как преобразование Фурье, может дать ценную информацию о циклическом поведении. Также пространственные подходы, такие как использование оценки плотности с KDE, могут использоваться для понимания аномальности точки данных. Хотя эта статья посвящена одномерному обнаружению аномалий во временных рядах, в будущих статьях можно будет изучить многомерные методы обнаружения аномалий, которые учитывают взаимодействие между несколькими метриками.

Источник: