В анализе данных есть несколько направлений для поиска аномалий: детектирование выбросов (Outlier Detection) и «новизны» (Novelty Detection).
Но перед тем, как рассматривать поиск аномалий, необходимо рассмотреть независимое исключение весов. Переход к групповому случаю не представляет существенных сложностей. Следуя стандартной схеме ARD, пусть априорное распределение на веса будет нормальным с центром в нуле p(w) =N(0,α^2), апостериорное — нормальным с произвольными параметрами qφ(w)=N(μ,σ^2), где μ ∈ Rm, α, σ∈Rm×mиα,σ — диагональные матрицы. Далее для упрощения выкладок считается, что σi=σii. KL-дивергенция для отдельного веса wi будет выглядеть так:

Можно исключить веса α, выразив их максимальные значения через μ и σ:

Тогда первую формулу можно упростить:

и итоговая формула примет вид … и тут вы такие «Чиго?! Слышь, Масян, воу, воу, палехче, мы тут пост пришли читать, чо началось-то?». ;)
Чуть меньше года назад когнитивные сервисы Microsoft пополнились новым сервисом — Anomaly Detector. Если простыми словами, то это обычный REST API сервис, который позволяет отслеживать и обнаруживать отклонения в данных временных рядов с помощью машинного обучения. Вы отправляете ему JSON обект, а он скажет, что из этих данных выбивается из общих значений. Данные можно отправлять, как в реалтайме (real-time) или в потоковой передаче, так и просто на проверку целой пачкой (batch).
Сразу, наше любимое, 20 000 запросов в месяц — бесплатно. Мы же любим, когда что-то бесплатно. ;)
Так, это всё классно, но какие данные мы можем туда отправить? Первое, что приходит в голову и каких данных точно в избытке, то это данные системы мониторинга. Почему бы и нет? У нас есть постоянно наполняемая база счетчиков производительности, есть метрики изменений CPU, RAM, объема диска и т.д. К сожалению, у меня нет никаких IoT-устройств, где бы можно было отследить температуру, влажность или другие показания. Как мне кажется, то это один из наглядных примеров для использования Anomaly Detector.
Anomaly Detector можно найти в Marketplace на портале Azure, там ничего сложного. Нашли, создали, развернули.

Данные будем брать из системы мониторинга SCOM (System Center Operations Manager) за последний день обычным SQL-запросом.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
select TimeSampled as timestamp, SampleValue as value from PerformanceDataAllView pdv with (NOLOCK) inner join PerformanceCounterView pcv on pdv.performancesourceinternalid = pcv.performancesourceinternalid inner join BaseManagedEntity bme on pcv.ManagedEntityId = bme.BaseManagedEntityId where path = 'fqdn вашего сервера' AND objectname = 'Processor Information' AND countername = '% Processor Time' and TimeSampled >= DATEADD(day, -1, GETDATE()) group by path, TimeSampled, SampleValue order by timesampled asc |

Ок, что-то есть, какие-то метрики мы видим, но т.к. нам необходимо немного подготовить данные для Anomaly Detector, надо их чуть-чуть преобразовать. Мне удобней это делать через python. Для этого подключимся к базе сразу через python и соберем все в словаре, который представим в виде JSON. pymssql даёт возможность выполнять sql запрос, а точнее метод fetchall() и получать на выходе словарь (dict).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
import configparser import pymssql import json config = configparser.ConfigParser() config.read("config.ini") scom_server = config["scom"]["server"] scom_user = config["scom"]["user"] scom_password = config["scom"]["password"] conn = pymssql.connect(scom_server, scom_user, scom_password, 'OperationsManager') cursor = conn.cursor(as_dict=True) cursor.execute( """ select TimeSampled as timestamp, SampleValue as value from PerformanceDataAllView pdv with (NOLOCK) inner join PerformanceCounterView pcv on pdv.performancesourceinternalid = pcv.performancesourceinternalid inner join BaseManagedEntity bme on pcv.ManagedEntityId = bme.BaseManagedEntityId where path = 'srv-scom-01.rambler.ramblermedia.com' AND objectname = 'Processor Information' AND countername = '% Processor Time' and TimeSampled >= DATEADD(day, -1, GETDATE()) group by path, TimeSampled, SampleValue order by timesampled asc """) row = cursor.fetchall() for item in row: item["timestamp"] = item["timestamp"].strftime("%Y-%m-%dT%H:%MZ") results = {"series": row, "granularity": "minutely", "customInterval": 5} with open('testdata.json', 'w') as file: json.dump(results, fp=file) |

Время и данные необходимо поместить в timestamp и value в значение ключа series. На выходе получаем файл testdata.json примерно с таким набором {«series»: [{«timestamp»: «2019-10-20T22:02Z», «value»: 6.00938940048218}, {«timestamp»: «2019-10-20T22:07Z», «value»: 5.63296794891357}, {«timestamp»: «2019-10-20T22:37Z», «value»: 5.63296794891357}, {«timestamp»: «2019-10-20T22:42Z», «value»: 11.3306369781494} ….
Теперь надо написать обращение к REST API Anomaly Detector.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
import requests import json batch_detection_url = "/anomalydetector/v1.0/timeseries/entire/detect" latest_point_detection_url = "/anomalydetector/v1.0/timeseries/last/detect" # регион в котором вы создали Anomaly Detector endpoint = 'https://westeurope.api.cognitive.microsoft.com' # ключ subscription_key = '########################' #data_location = "sample.json" data_location = "testdata.json" def send_request(endpoint, url, subscription_key, request_data): headers = {'Content-Type': 'application/json', 'Ocp-Apim-Subscription-Key': subscription_key} response = requests.post(endpoint + url, data=json.dumps(request_data), headers=headers) return json.loads(response.content.decode("utf-8")) def detect_batch(request_data): print("Detecting anomalies as a batch") result = send_request(endpoint, batch_detection_url, subscription_key, request_data) print(json.dumps(result, indent=4)) if result.get('code') is not None: print("Detection failed. ErrorCode:{}, ErrorMessage:{}".format(result['code'], result['message'])) else: anomalies = result["isAnomaly"] print("Anomalies detected in the following data positions:") for x in range(len(anomalies)): if anomalies[x]: print(x, request_data['series'][x]['value']) def detect_latest(request_data): print("Determining if latest data point is an anomaly") result = send_request(endpoint, latest_point_detection_url, subscription_key, request_data) print(json.dumps(result, indent=4)) file_handler = open(data_location) json_data = json.load(file_handler) detect_batch(json_data) detect_latest(json_data) |
subscription_key необходимо подставить из Keys на портале Azure в сервисе Anomaly Detector.

Можно сказать, что цель достигнута…. почти. ;) У меня было два типа данных: с явно выраженной аномалией или, говоря другими словами, скачком по загрузке CPU и «ровные» данные без каких-либо скачков.

С левой стороны аномалий нет, а вот с правой стороны Anomaly Detector посчитал, что индексы 106, 234 и 292 со значениями 20, 19 и 31 явно выбиваются из общих показателей. Чтобы данные сделать более наглядными, надо расчехлить pyplot из python и порисовать графики. ;)
Для этого у Microsoft есть Microsoft Azure Notebooks, где можно запусть Jupyter. Кстати, тоже бесплатный. Наша задача не только построить график полученных данных, но и выделить точки, которые Anomaly Detector считает аномальными.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
subscription_key = '##################################################' endpoint = 'https://westeurope.api.cognitive.microsoft.com/anomalydetector/v1.0/timeseries/entire/detect' import requests import json import pandas as pd import numpy as np from __future__ import print_function import warnings warnings.filterwarnings('ignore') # Import library to display results import matplotlib.pyplot as plt %matplotlib inline from bokeh.plotting import figure,output_notebook, show from bokeh.palettes import Blues4 from bokeh.models import ColumnDataSource,Slider import datetime from bokeh.io import push_notebook from dateutil import parser from ipywidgets import interact, widgets, fixed output_notebook() def detect(endpoint, subscription_key, request_data): headers = {'Content-Type': 'application/json', 'Ocp-Apim-Subscription-Key': subscription_key} response = requests.post(endpoint, data=json.dumps(request_data), headers=headers) if response.status_code == 200: return json.loads(response.content.decode("utf-8")) else: print(response.status_code) raise Exception(response.text) def build_figure(sample_data, sensitivity): sample_data['sensitivity'] = sensitivity result = detect(endpoint, subscription_key, sample_data) columns = {'expectedValues': result['expectedValues'], 'isAnomaly': result['isAnomaly'], 'isNegativeAnomaly': result['isNegativeAnomaly'], 'isPositiveAnomaly': result['isPositiveAnomaly'], 'upperMargins': result['upperMargins'], 'lowerMargins': result['lowerMargins'], 'timestamp': [parser.parse(x['timestamp']) for x in sample_data['series']], 'value': [x['value'] for x in sample_data['series']]} response = pd.DataFrame(data=columns) values = response['value'] label = response['timestamp'] anomalies = [] anomaly_labels = [] index = 0 anomaly_indexes = [] p = figure(x_axis_type='datetime', title="Batch Anomaly Detection ({0} Sensitvity)".format(sensitivity), width=800, height=600) for anom in response['isAnomaly']: if anom == True and (values[index] > response.iloc[index]['expectedValues'] + response.iloc[index]['upperMargins'] or values[index] < response.iloc[index]['expectedValues'] - response.iloc[index]['lowerMargins']): anomalies.append(values[index]) anomaly_labels.append(label[index]) anomaly_indexes.append(index) index = index+1 upperband = response['expectedValues'] + response['upperMargins'] lowerband = response['expectedValues'] -response['lowerMargins'] band_x = np.append(label, label[::-1]) band_y = np.append(lowerband, upperband[::-1]) boundary = p.patch(band_x, band_y, color=Blues4[2], fill_alpha=0.5, line_width=1, legend='Boundary') p.line(label, values, legend='Value', color="#2222aa", line_width=1) p.line(label, response['expectedValues'], legend='ExpectedValue', line_width=1, line_dash="dotdash", line_color='olivedrab') anom_source = ColumnDataSource(dict(x=anomaly_labels, y=anomalies)) anoms = p.circle('x', 'y', size=5, color='tomato', source=anom_source) p.legend.border_line_width = 1 p.legend.background_fill_alpha = 0.1 show(p, notebook_handle=True) # Hourly Sample sample_data = json.load(open('sample_hourly.json')) sample_data['granularity'] = 'minutely' sample_data['period'] = 1 # 95 sensitivity build_figure(sample_data, 95) |

И …. получаем такой график. Как видим, что вот наши 3 значения. Но я решил пойти дальше и посмотреть, что покажет Anomaly Detector, если мы немного нагрузим CPU … и получился уже вот такой график.

Прямоугольная секция — это данные с предыдущего графика, это наши предыдущие 3 точки, Anomaly Detector их уже не учитывал, а вот резкий скачок, как видим, отметил в 3-х значениях: начало подъёма, пик и среднее значение, которое всё еще выбивается из среднего набора данных.
Я думаю, что это хороший пример, когда для критического сервиса можно подключить такую аналитику. Почему я вспомнил про Anomaly Detector? На прошедшем Microsoft Azure AI Hackathon один из победителей сделал IoT устройство, которое собирало данные с термодатчика внутри жизненно важной среды, отправляло их в Azure IoT-Hub, раз в минуту запускался код в Azure Functions, который собирал данные в Anomaly Detector и если температура начинала расти, то отправлял уведомление через Twilio API в Whatsapp или по SMS. До ужаса простое решение, но работает как часы.
Можно написать бота, которые будет генерировать вот такие картинки с графиками, где уже будет наглядно видно, что что-то начинает идти не так и прикреплять вам в сообщение с данными от мониторинга, разных датчиков и т.д.
Документация по Anomaly Detector — https://docs.microsoft.com/en-us/azure/cognitive-services/anomaly-detector/overview
Ну и наш любимый раздел — «слышь, а чо по деньгам?!» ж)
