Integration Patterns: Репликация и очередь сообщений

27 декабря 2013 г.

Хочу поделиться примером интеграции нескольких систем в одном из приложений, которое мы недавно разрабатывали. Я покажу проблемы и подходы, которые мы применяли. Думаю отдельные приёмы могут быть полезны в ваших проектах.

Суть системы в предоставлении аналитики пользователям из большого объема структурированной информации. Работать с аналитикой пользователи могут с помощью набора фильтров. Меняя фильтры, пользователь видит разлиные графики, таблицы и т.п. Данные нужны для анализа и принятия важных решений, поэтому должны быть точными и поступать своевременно.

Сложность была в том, чтобы достаточно быстро обрабатывать большие объемы данных. База данных проекта размером порядка 700 ГБ, среднее количество записей в основных таблицах около 100 млн.

Я намернно уберу многие детали, чтобы показать только суть проблем и решений.

Уточнять, что это за проект и кто заказчик я не могу по NDA.

Версия 1.0 - Репликация и предрасчеты

Первое решение, которое было реализовано:

Основные этапы работы версии 1.0:

  1. Из сторонней системы идет репликация определенных данных
  2. Триггеры перехватывают вставку, удаление и обновление данных, делают расчеты и заполняют денормализованные таблицы в БД проекта. Важно, что они ставят флаг измененным записям, это нужно для следующего шага. (Статья на эту тему Intergration Patterns: Shared DB, state machine и очередь сообщений)
  3. Сервисы предрасчетов раз в период времени делают выборку данных, которые отмечены, как измененные. Сервисы насчитают все возможные комбинации, которые может запросить пользователь с помощью фильтров. Насчитанные данные попадают в БД для выборок. На последнем этапе сервис сбрасывает флаг изменения у записей в проекте, т.к. они теперь обработаны.
  4. Веб-приложение реализует бизнес-логику и осуществляет работу пользователя с системой
  5. На последнем этапе пользовтаель взаимодействует с веб-приложением через браузер, где ему отображается набор возможных фильтров, графики, таблицы и другие данные. При изменении фильтров все данные перестраиваются

Проблемы версии 1.0:

  • Долго выполняется денормализация в триггерах. Расчет и вставка в денормализованные таблицы не успевали за поступающими через репликацию данными.
  • Выставление флага об изменении данных работало очень долго. Плюс сервисы, которые обрабатывали данные с этим флагом, должны были флаг сбрасывать, возникали взаимоблокировки.
  • С такой цепочкой обработки показать новые данные пользователям получается только со значительной задержкой.
  • Подготовка данных для новых пользователей занимала несколько суток вместо нескольких минут, как требовалось.
  • Невозможно было масштабировать на многих пользователей, т.к. узким местом были обработка триггерами и пересчет данных.

В таком варианте реализация заняла около года командой из 6 программистов. С этого момента проект передали нам.

Версия 2 - Очередь и масштабирование

Нужно было заменять архитектуру и учесть ошибки прошлой реализации. Главными требованиями были быстрая доставка новых данных каждому пользователю и легкое масштабирование системы.

Новая версия делалась короткими итерациями и небольшой командой. За первые 3 месяца было сделано примерно 10 различных прототипов, перепробованы различные идеи по ускорению и масштабированию. Все их перечислять будет слишком затратно, если будут предложения в комментариях и мы такое уже пробовали, то я объясню почему решение может не сработать. Наконец была найдена архитектура, которая привела к нужным характерискам.

Основные этапы работы версии 2.0:

  1. Из сторонней системы идет репликация определенных данных. В идеале нам нужно было заменить репликацию из сторонней БД на очередь, но продавить это решение в сторонней команде оказалось невозможно по политическим причинам.
  2. Изменение данных по-прежнему обрабатывается триггерами, но уже иначе. Триггеры отправляют изменения сразу в очередь через CLR-функции. Для очередей мы выбрали RabbitMQ, нас устроила его скорость работы (до 2500 сообщений в секунду) и надежность.
  3. Обработку новых данных осуществляли сервисы, которые были подписаны на очереди. В этом месте мы могли довольно просто масштабироваться. Плюс, обработка новых данных в управляемом коде сделала возможным извещать пользователей о новых данных в их отчетах через SignalR.
  4. Данные, обработанные сервисами, не считались заранее, а записывались в отдельные БД для каждого пользователя. В этих БД было всего несколько денормализованных таблиц, заточенных под быстрые выборки. Таким образом, пользователям не нужно было ждать пересчета, они могли видеть новые данные сразу при их поступлении. При работе с базами данных, их оптимизации и профилировании помог прекрасный блог SomewhereSomehow's SQL Server notes, рекомендую к прочтению. БД пользователей могли быть размещены на разных серверах, что позволяло безболезненно наращивать клиентскую базу.
  5. Дальше в веб-приложении формировались запросы и отсылался JSON на клиента. Также были проанализированы различные способы работы с данными на стороне веб-приложения. В частности мы перешли с NHibernate на Dapper.
  6. Часть данных на сторону клиента стали передаваться через SignalR, было уменьшено общее кол-во ajax-запросов из-за ограничения браузеров.

Такая реализация оказалась очень удачной для решаемой задачи, но в результате пришлось внести несколько существенных изменений.

Первоначальное заполнение БД

Отдельно обращу внимание на первоначальное заполнение БД пользователя. При создании нового пользователя для него нужно обработать много данных. Использовать для этой цели очередь нецелесообразно и долго, поэтому первоначальное заполнение было реализовано отдельным сервисом, который напрямую делал вставку в новую БД.

Версия 3 - Service Broker и оптимизация системы

При тестировании системы на pre-production серверах обнаружилась проблема, когда репликация переставала работать. Дело было в том, что CLR-функции довольно медленно отправляли данные из MSSQL в RabbitMQ. Частично это было из-за скорости работы CLR-функции, но в основом из-за того, что RabbitMQ был на отдельном сервере. Это уменьшало скорость отправки сообщений через сеть. В моменты, когда сеть между серверами отваливалась, у нас падала и репликация.

Решение было найдено с помощью вставки в архитектуру системы еще одного брокера очереди, на этот раз встроенного в MSSQL - Service Broker. Это позволило ускорить обработку входящих изменений, т.к. работа с Service Broker с того же СУБД была очень быстрой и надежной. Когда мы не могли читать данные, то очередь просто копиласть на сервере БД и работа всей системы не останавливалась.

Небольшие изменения были сделаны для оптимизации трафика между сервисами. Триггеры стали отправлять не только идентификаторы, но и измененные данные. Таким образом, последующие сервисы имели всю необходимую информацию и не делали дополнительные запросы обратно в БД проекта. В триггеры была добавлена проверка, что во время репликации изменились только значимые колонки, чтобы не было лишних срабатываний.

Выводы

Проект вышел в релиз через 8 месяцев с командой из 3 программистов и 3-5 человек в отделе тестирования.

Как мы решили в ретроспективе, успех второй версии был обусловлен большим количеством прототипов и анализом первого неудачного решения. Основные возможности по масштабированию нам дали очереди сообщений и разделение баз данных.

26 комментариев:

  1. Александр, привет!
    Я пока напишу вопросы по 1-ой версии, т.к. трудно понять ее особенности, чтобы осознать плюсы перехода к след. версиям вашего проекта:
    1. Какая БД используется в стороннеТип репликации из сторонней системы? Стандартный механизм

    ОтветитьУдалить
  2. Сторонная БД была MSSQL и репликация самая обычная.

    ОтветитьУдалить
  3. Александр, привет!

    Я пока напишу вопросы по 1-ой версии, т.к. трудно понять ее особенности, чтобы осознать плюсы перехода к след. версиям вашего проекта:
    1. Какая БД используется в сторонней системе и в проекте?
    2. Тип репликации между БД сторонней системы и БД проекта? Стандартный механизм или если свое, то в чем преимущества, какую проблему решил "свой" механизм?
    3. Касательно НСИ, есть ли она в проекте и есть ли особенности по ее денормализации и обновлению из сторонней системы?
    4. Выделена ли работа с НСИ? Это, например, необходимо, если бы у вас появился дополнительный сайт с иной бизнес-логикой, осуществяющий иную обработку, но пользующуюся основными общими справочниками.
    5. Необходимы ли результаты обработки передавать обратно в сторонюю систему? Может ли понадобиться в будущем?
    6. У вас есть кол-во срезов аналитики равно числу пользовательских фильтров. Что если заказчик захотел бы посадить системного аналитика, который бы строил дополнительный срезы, но без привлечения разработчиков (намек на OLAP-кубы). Это возможно?
    7. В чем особенность бизнес-логики приложения? Просто отображение read-only результатов или модификация данных в том числе?
    8. Что значит "перестраиваются" во фразе "При изменении фильтров все данные перестраиваются"?

    ОтветитьУдалить
  4. Спасибо за вопросы, система действительно большая и я уверен, что уточнения нужны:
    1. MSSQL с идентичной схемой
    2. Типичный механимз. Замены репликации не было, вы можете об этом прочитать и увидеть на схемах с первой по третью версию
    3.
    5. Нет, связь односторонняя
    6. Дело в том, что не все фильтры детерменированы (например, может быть введен произвольный текст, в в системе есть еще Sphinx, но он убран со схем для простоты). Один из прототипов использовал OLAP-кубы, но оказалось невозможно покрыть все варианты фильтров. Есть ограничиться определенным набором фильтров, то вполне можно использовать OLAP-кубы
    7. Дело в том в приложении не так много бизнес-логики, это как раз одна из самых простых частей системы. SPA на backbone.js было в несколько раз сложнее, чем приложение на ASP.NET MVC.
    8. В первой версии предрасчеты представляли из себя самодельные OLAP-кубы. Перестраиваются значит наборы данных пересчитывались.

    ОтветитьУдалить
  5. 1. раз схема одинакова, то БД ~600Гб - это мини-копия сторонней БД? А зачем она нужна?
    2. тип merge или transaction?
    3. нормативно-справочная информация. термин из КИС, ERP. По-сути, справочники, контрагенты.
    6. Не понял в чем ограничение OLAP-кубов? Если фильтры не детерминированы и строятся на основе запросов пользователей, то есть AMO http://msdn.microsoft.com/en-us/library/ms345091.aspx.
    7. т.е. MVC проект отражает аналитику для пользователя в зависимости от фильтров?

    Ну вот если бы этот проект подымал IBM, то в своей архитектуре для сторонней системы они бы писали вэб-сервис, который регистрировался бы в общей шине предприятия (без репликации). Соответственно инициация забора данных осуществлялась бы неким workflow, т.е. исключило бы сразу две проблемы, т.к. этим бы занимался разработанный адаптер в сторонней системе.

    "Долго выполняется денормализация в триггерах. Расчет и вставка в денормализованные таблицы не успевали за поступающими через репликацию данными.
    Выставление флага об изменении данных работало очень долго. Плюс сервисы, которые обрабатывали данные с этим флагом, должны были флаг сбрасывать, возникали взаимоблокировки".



    По идее денормализация - отдельная операция, которая может осуществлена как отдельный модуль в общей шине предприятия. Пример workflow общей шины: сторонняя ERP-адаптер -> шина -> модуль денормализации-адаптер > шина -> olap-куб-адаптер. В этой схеме появление и обновление НСИ - это ответственность модуля денормализации.

    ОтветитьУдалить
  6. 1. Это часть необходимых для проекта данных
    2. Точно не скажу, не мы настривали
    3. Ясно, я так и подумал, решил уточнить. Справоников почти нет.
    6. Точнее скажу, что невозможно выделить необходимые изменения для куба, чтобы могла насчитаться аналитика. Плюс перестроение куба при изменении данных занимает время, которое не соизмеримо с требованиями.
    7. Да


    Надо понимать, что невозможно было и сейчас невозможно заменить репликацию из сторонней БД на что-либо. Вы всё прочитали? Здесь суть в ограничении, изначально есть сторонняя БД и репликация, с этим ничего нельзя сделать. Отсюда часть решений с триггерами.

    ОтветитьУдалить
  7. прочитал про Версию 2.0. 1-ый вопрос снимается. :)

    ОтветитьУдалить
  8. Со сторонней БД работает еще множество других проектов и там много чего "исторически сложилось". Отчасти в этом проекте была борьба с legacy инфраструктурой и подгонка необходимой архитектуры для реализации нашего проекта.

    ОтветитьУдалить
  9. Да, я понял :)
    У меня осталась еще пара вопросов
    1. Если предрасчет и денормализация разные процессы, то на поздних версиях проекта куда делся предрасчет, т.е. собственная реализация кубов?
    2. Почему БД пользователя несколько и почему несколько сервисов денормализации?

    ОтветитьУдалить
  10. 6. Я все же не понимаю как была решена проблема перестроения куба. Вы по идее программным путем делаете то же самое, т.е. я хочу сказать, что в общем смысле массив данных как-то нужно "ворочить" в обоих случаях. Так чем же ваша реализация лучше?

    ОтветитьУдалить
  11. Очень хорошие вопросы :) То, что я упустил в расскаже сейчас всплывет здесь.
    1. Предрасчет был фактически OLAP-кубом. В последних версиях не было никаких предрасчетов, мы научились писать запросы и стоить индексы так, чтобы все запросы укладывались в 1-5 секунд. Поэтому при постоянном потоке новых данных пользователь видел самую актуальную картику в аналитике. Денормализация в пользовательских БД как раз строила такую структуру, чтобы из нее можно было быстро выбрано нужные ID, посчитать агрегатные функции и т.п.
    2. У каждого пользователя свои потребности, поэтому разбивая БД на пользователей мы уменьшаем общее кол-во записей для выборок. Во-вторых, премиум пользователи могут иметь свои мощности на сервере и не зависеть на нагрузок других пользователей.
    Сервисы денормализации - это я их так назвал для упрощения :) На самом деле они подписывались каждый на свое событие. Когда происходили опредленные изменения, сервис-обработчик должен был решить что с этим делать, подготовить данные для вставки/обновления данных в пользовательских БД, известить пользователей, которые сейчас подключены к системе о том, что часть данных обновлена (через SignalR).

    ОтветитьУдалить
  12. 6. Она не лучше, она просто другая. Мы ничего не предрасчитывали, а считали всё налету. С момента, когда данные изменились, до момента, когда пользователь им может пользоваться, могло пройти всего 5-10 секунд при нормальной загрузке.
    На схеме еще нет сторонних сервисов, включающих полнотектстовый поиск, я не хочу сейчас вдавать в подробности, но результаты поиска по сути нельзя было полностью предрасчитать.

    ОтветитьУдалить
  13. Политика заказчика состояла в покупке одного-двух мощных серверов и размещении их в дата-центрах. Поэтому мы ориентировались не на распределенную облачную инфраструктуру, а на конкретные ограничнные ресурсы. 1-е, 2-е и 3-е решения были запущены на идентичных серверах.
    Увеличение производительность произошло только за счет изменения архитектуры, без какого-либо наращивания можности железа.

    ОтветитьУдалить
  14. Можно подробнее про отдельную БД для каждого пользователя? Новая бд создается автоматически или вручную? Сервис публичный или закрытый?

    ОтветитьУдалить
  15. Это БД, которая состоит из нескольких денормализованных таблиц. Эти таблицы созданы с одной целью, самая быстрая выборка данных с помощью SQL-запросов.


    Пользователи появляются не часто, сервис не публичный, для доступа к нему нужно изначально заплатить. Это делают только те, кто знают продукт и компанию. Называть его не могу по NDA.


    Для БД пользователя есть скрипт, который ее создает. Ручное создание заключается в запуске скрипта, дальше всё идет автоматом. Нет задачи сделать полностью автоматическое создание, иначе это было бы реализовано, сложностей нет.

    ОтветитьУдалить
  16. Спасибо, просто интересно - насколько накладно будут отдельные базы при тысячах или десяти тысячах пользователей. И как происходит обновление схемы данных на всех базах?

    ОтветитьУдалить
  17. Александр, расскажи, пожалуйста, чуть подробнее про создание новой пользовательской БД.

    Как надёжно подключить эту новую базу к работе, чтобы сообщения для неё:
    а) не потерялись
    б) не обрабатывались повторно

    По всей видимости, первоначальная заливка выполняется долго.
    Можно (наверное) использовать Snaphot изоляцию для чтения данных из основной БД проекта, но что в это время происходит с текущими событиями?
    Сервис денормализации копирует событие в какую-то специальную очередь для данной БД, и когда БД наконец поднимется, то сервис достанет события из этой очереди и обработает?

    Вопрос 2. Требуется приостановить пользовательскую БД. Решается этим же подходом с выделенной очередью для БД?

    Вопрос 3. Сервисы денормализации были приостановлены или перегружены и в очереди (шине данных) скопилось много старых событий. Эти события не должны применяться к новой БД, поскольку в неё залита более свежая информация. Как это решается? Какими-то временнЫми метками?

    Вопрос 4. Что такое "Сервис обработки событий изменений" на диаграмме версии 3.0? Его задача перекладывать события из формата Service Broker в шину сообщений RabbitMQ?

    Не подскажешь, что почитать на тему очередей? Не обязательно про конкретную реализацию (MSMQ, RabbitMQ и т.п.), можно на базовом уровне - какие-то общие принципы, подходы, шаблоны.

    Спасибо.

    ОтветитьУдалить
  18. Продукт на рынке B2B. Его заказчиком может стать только большая корпорация или гос. структура. Ориентировочная база клиентов 300-400 компаний/учреждений. Исходя из этого и делалось такое решение.


    Обновление происходит через FluentMigrator, который дописали для работы с несколькими одинаковыми БД. Все БД названы по конвенции, зная UserId, который лежит в основной БД, можно легко найти БД пользователя.

    ОтветитьУдалить
  19. > Как надёжно подключить эту новую базу к работе

    БД создается под пользователя с нужной ему аналитикой. Процесс вставки данных для начальной работы и поток сообщений через очередь никак не могут друг другу помешать, т.к. пишут данные они в одни и те же таблицы. Новая БД создается и заполняется примерно 5-10 минут. Сообщения, которые сформируются за это время могут ее только дополнить.

    > Сервис денормализации копирует событие в какую-то специальную очередь для данной БД, и когда БД наконец поднимется, то сервис достанет события из этой очереди и обработает?

    Несовсем так. Создается пустая БД со схемой. В БД проекта пользователю в соответствие ставит аналитика по определенным темам, в это время запускается заполнение БД пользователя через сервис (минуя очередь) и события в очереди могут идти паралелльно.

    > Требуется приостановить пользовательскую БД. Решается этим же подходом с выделенной очередью для БД?

    Можно подробнее про этот сценарий? Зачем ее приостанавливать?

    > Вопрос 3. Сервисы денормализации были приостановлены или перегружены и в очереди...

    На самом деле в шине находятся Event'ы, которые должны последовательно накатиться на пользовательскую БД. Бывало, что очереди забивались по каким-то причинам, тогда мы просто ждали 1-2 часа, пока сервисы разгребут эти очереди.

    Я думаю в критичных ситуация можно просто очищать очереди, для каждого пользователя перезапускать процедуру инициализации, которая приведет пользовательскую БД к актуальному состоянию.

    > Что такое "Сервис обработки событий изменений" на диаграмме версии 3.0?

    Фактически он читает из шины СУБД и перераспределяет события по очередям. Например, при обработке определенных сущностей они должны попасть в более приоритетные очереди RabbitMQ, это решает как раз этот сервис.

    > Не подскажешь, что почитать на тему очередей?

    Мне очень понравилась книга Enterprise Integration Patterns и сайт http://www.eaipatterns.com. Сама книга была довольно давно выпущена (по меркам развития IT), но общие подходы и шаблоны там актуальны. В том же RabbitMQ реализовано множество шаблонов, описанных в этой книге.

    ОтветитьУдалить
  20. >Процесс вставки данных для начальной работы и поток сообщений через очередь никак не могут друг другу помешать, т.к. пишут данные они в одни и те же таблицы.

    Я полагал как раз наоборот - два различных процесса не могли бы друг другу помешать, если бы писали в разные, никак не связанные между собой таблицы. А так надо предпринимать какие-то меры, иначе могут быть проблемы - от непонятной актуальности данных и вплоть до взаимоблокировок. Разве нет?

    >Можно подробнее про этот сценарий? Зачем ее приостанавливать?

    Какое-нибудь обслуживание, обновления просят перезагрузиться, аппаратные проблемы... Неважно, главное что база временно недоступна.

    >На самом деле в шине находятся Event'ы, которые должны последовательно накатиться на пользовательскую БД

    Если работают несколько сервисов, то как гарантировать, что события будут накатываться последовательно и в нужном порядке? Одну базу может обслуживать только один сервис?

    >Мне очень понравилась книга Enterprise Integration Patterns

    Спасибо!

    ОтветитьУдалить
  21. > Я полагал как раз наоборот - два различных процесса не могли бы друг другу помешать

    Нет, представьте себе два потока событий с отметкой о версии. Если версия события устарела, то она просто будет проигнорирована.

    > Неважно, главное что база временно недоступна

    Сообщения для нее будут копиться в очереди

    > Если работают несколько сервисов, то как гарантировать, что события будут накатываться последовательно и в нужном порядке?



    Да, это хороший вопрос. Дело в том, что мы знаем про версию события, когда оно произошло. Единсвенная возможная ситуация, если что-то сломается в репликации, потому что ориентация идет только на время изменения записи в БД проекта.


    В идеале у нас должна быть шина из сторонней системы, где бы события были уже упорядочены. В нашем случае мы упорядочиваем их сами.

    ОтветитьУдалить
  22. А по какой причине в варианте 3 сохранили шину RabbitMQ? Почему не хватило очереди на ServiceBroker-е?
    Теоретически полностью sql-ными (точнее sb-шными) механизмами можно было переносить данные на конечные БД пользователей, раз и источник события, и приемник - это БД.

    ОтветитьУдалить
  23. Дарья, уже где-то было в комментариях, напишу еще раз :) На схеме есть квадратик "Сервис обработки событий изменений". Там на самом деле ряд сервисов с логикой. Раньше вся эта логика была в треггерах, теперь она управляемом коде. БД шлет в шину всё подряд, но не все данные нужны и не все данные должны попасть в сервисы в "сыром" виде. Плюс этот квадратик распределяет поток данных по приоритетным очередям в зависимости от их значимости.

    ОтветитьУдалить
  24. Понятно.
    Приоритетность очередей действительно на SB реализовать довольно проблематично (хотя реализуемо), а вот "управляемый код" и разбор данных мог бы быть и на обработчике сообщения SB, разве что количество SB-очередей могло бы вырасти в результате.
    В нашей системе широко используется и SB, и RabbitMQ (только при "внешних" источниках событий), поэтому было очень интересно прочитать про ваш опыт.

    ОтветитьУдалить
  25. Это интересно, вы где-то описывали ваш подход?

    ОтветитьУдалить
  26. Боюсь, что только во внутренней документации :)

    ОтветитьУдалить

Моя книга «Антихрупкость в IT»

Как достигать результатов в IT-проектах в условиях неопределённости. Подробнее...