Продолжение «WebSockets and AsyncIO: Beyond 5-line Samples»

В этом продолжении предыдущей статьи я обсуждаю дизайн, библиотеки и обеспечение качества в области WebSockets, асинхронного программирования и параллелизма с Python.

В первой статье мы рассмотрели технологию WebSockets и ее соответствие Python. Я также предложил вымышленный вариант использования, чтобы добавить больше контекста в обсуждение.

Теперь пора переключить внимание на AsyncIO, встроенную библиотеку для написания параллельного кода с использованием синтаксиса async / await. (Я рекомендую взглянуть на это руководство, если вы не знакомы с ним.) Наряду с websockets, asyncio позволит нам выполнить требования предлагаемого варианта использования.

Полный пример есть на GitHub; document_inspector.py - это главный файл. Объяснять каждую из более чем 200 строк кода было бы скучно, поэтому давайте сосредоточимся на ключевых функциях.

Обработка параллелизма

Я закрыл предыдущую статью, сказав, что параллельные задачи могут использоваться для обработки сообщений и ответов в каналах связи WebSocket. Если перевести это на язык программирования, нам сначала понадобятся задачи отправителя и получателя сообщения, представленные в виде функций. Функции - это первоклассные объекты в Python, что означает, что они могут быть переданы в качестве аргументов другим функциям.

AsyncIO поставляется с ожидаемой функцией asyncio.gather(). Он используется для запуска параллельных функций в заданной последовательности, как показано в приведенном ниже фрагменте кода.

Ожидается, что список results будет состоять из двух элементов. Первый хранит результат отправителя сообщения и может быть проигнорирован. Второй хранит результат получателя, что означает, что объект будет возвращен при успешном выполнении.

В вымышленном варианте использования есть еще один сценарий, в котором мы можем использовать asyncio.gather(): после получения ответа GetDocument (шаг 1) нам нужно отправить n сообщений, чтобы получить подробную информацию о контейнере виджета, в зависимости от количества графических виджетов, имеющихся в данном документе (шаг 2). Такие сообщения можно отправлять одновременно, чтобы сократить время проверки документа, а именно:

Как мы видели, asyncio упрощает выполнение параллельных функций и беспрепятственное получение их результирующих значений. Также есть поддержка обработки исключений. (За подробностями обращайтесь к официальной документации.)

Проблема синхронизации задач

Теперь, когда мы знаем, как выполнять задачи отправителя и получателя одновременно, пришло время заняться другой проблемой: синхронизацией. В предлагаемом варианте использования начальный шаг 2 зависит от успеха на шаге 1, а начальный шаг 3 зависит как минимум от одного ответа с шага 2. Это происходит потому, что создание сообщений, которые будут отправлены на следующих шагах, зависит от данных, собранных из предыдущих ответов. . Взгляните на этот syncio.Event подход к решению этой проблемы.

«События AsyncIO могут использоваться для уведомления нескольких задач о том, что произошло какое-то событие. ”

- Документация по AsyncIO Event

Из-за характеристик рабочей нагрузки примера отправитель запускает рабочую нагрузку и может спать до тех пор, пока не появится какой-либо ответ для обработки. Затем он просыпается, обрабатывает ответ, который включает отправку последующих сообщений, если это необходимо, и снова засыпает, пока не станут доступны новые ответы. Но кто знает, что только что пришли новые ответы? Получатель!

Это означает, что обе задачи должны иметь доступ к общему объекту asyncio.Event. Класс SendReceiveSyncHelper был создан для поддержки элементов синхронизации задач. Пожалуйста, обратитесь к атрибуту __new_reply_event и связанным методам для asyncio.Event подробностей об использовании, которые, кстати, очень просты.

К сожалению, одной генерации события недостаточно. Нам также нужен буфер, который позволяет получателю делиться полученными объектами с отправителем. Это работа атрибута __unhandled_replies. В двух словах, получатель заполняет буфер, а отправитель очищает его после обработки входящих ответов.

Особого внимания заслуживает третий атрибут: __pending_reply_ids. Он состоит из массива, который содержит идентификаторы ожидающих ответов, что означает, что идентифицированные ими сообщения были отправлены, но еще не получили ответа.

На диаграммах ниже показано общее использование общих переменных. Начиная с шагов 1 и 2:

Затем шаги 2 и 3:

Переменные __pending_reply_ids и __unhandled_replies помогают принять решение, следует ли держать WebSocket открытым или нет. Если нет ожидающих или необработанных ответов, рабочая нагрузка проверки считается выполненной для данного документа, и соединение может быть закрыто.

Вы можете увидеть код, использованный для достижения этих результатов, в методах __send_get_widgets_messages() и __receive_get_widgets_messages() полного примера.

Прежде чем закрыть этот раздел, стоит сказать, что это решение можно адаптировать / повторно использовать для решения проблем синхронизации задач в различных сценариях использования, а не только в задачах WebSockets, с которыми столкнулась моя команда.

Обработка ошибок

Параллельные задачи будут корректно завершены при успешном выполнении. Но мы знаем, что сбои случаются всегда, и лучше быть готовым к правильным действиям, когда они случаются, чтобы не нарушить работу приложения.

Тайм-аут

Как я уже упоминал, рабочая нагрузка по проверке считается выполненной для данного документа, если нет ожидающих или необработанных ответов. Это может занять три или 20 секунд, или намного больше, если ожидаемый ответ так и не получен, что блокирует всю рабочую нагрузку.

С помощью asyncio.wait_for() можно указать, как долго они хотят ждать завершения задач. Отменяет задачу и вызывает asyncio.TimeoutError ошибку при истечении времени ожидания. Вы можете видеть, что __hold_websocket_communication() заключен в asyncio.wait_for() в приведенном примере:

Отмена задачи

Вызов __get_widgets(), в свою очередь, заключен в asyncio.AbstractEventLoop.run_until_complete(). Мы увидим больше о циклах событий в разделе Синхронный и асинхронный ниже. А пока помните, что вызов asyncio.AbstractEventLoop.run_until_complete() заключен в блок try/except, и это позволяет нам отменить все задачи при любом исключении, возникшем во время выполнения рабочей нагрузки.

Отменяя задачи, мы обеспечиваем высвобождение их ресурсов.

Как мы видели выше, asyncio.wait_for() отменяет задачу при истечении времени ожидания, но могут возникать исключения разных типов. Так что наличие кода для явной отмены задач по-прежнему того стоит. В приведенном ниже фрагменте кода показано, как это сделать:

Войти и вернуться

Пора поговорить о дизайнерских решениях. В классе DocumentInspector есть только один открытый метод: get_widgets(). Он скрывает всю сложность, которую мы видели до сих пор, предоставляя пользователям упрощенный интерфейс.

Что касается обработки исключений, это единственное место, где регистрируется техническая информация, поэтому вывод программы остается чистым и лаконичным. Отслеживание отказов может быть легко выполнено на основе таких журналов.

Наконец, пустой список возвращается, если возникает исключение при получении данных виджетов данного документа. В случае фиктивного варианта использования предположим, что это решение было принято для предотвращения поломки всего приложения из-за ошибок в нескольких различных рабочих нагрузках, которые выполнялись при каждом выполнении. При наличии подробных журналов случайные ошибки могут быть допустимы. Если это не соответствует вашим потребностям, подумайте о том, чтобы повторно вызвать исключение и обработать его в восходящем направлении.

Синхронный против асинхронного

Вы, наверное, заметили, что зарезервированные слова async и await чаще использовались в первых фрагментах кода, чем в последних. Это произошло потому, что я использовал восходящий подход для объяснения asyncio функций. Под этим я подразумеваю, что начал с DocumentInspector внутреннего / параллельного материала, который является более сложным, и закончил его общедоступным get_widgets() методом, который проще. Кстати, метод синхронный, а не сопрограмма.

Связующим звеном между синхронным и асинхронным кодом является метод asyncio.AbstractEventLoop.run_until_complete(). Он возвращает результат данной сопрограммы или вызывает исключение.

«Цикл событий - это ядро ​​каждого приложения AsyncIO».

- Документация по AsyncIO EventLoop

В полном примере я назвал его внутри __run_until_complete(), последнего метода, который мы обсудим в этой статье.

Обратите внимание, что объект цикла событий хранится в переменной. Это позволяет нам передать его __cancel_all_tasks() - отменяя параллельные задачи в случае ошибок, как объяснялось ранее, - и должным образом закрыть цикл обработки событий после завершения рабочей нагрузки проверки документов.

Теперь взгляните на future arg: он преобразуется в сопрограмму, созданную asyncio.wait_for(). Во время выполнения это оболочка для задач отправителя и получателя. Таким образом, цикл событий содержит все элементы, необходимые для выполнения рабочей нагрузки, и предоставляет get_widgets() соответствующий объект, который должен быть возвращен в конце.

Замечание: Python 3.7 временно представил asyncio.run() как высокоуровневый подход к asyncio.AbstractEventLoop.run_until_complete(). Он выполняет все, что я описал в этом разделе, а также Отмена задачи. Хотя он объявлен как предварительный API, его подпись и поведение с тех пор не изменились (см. Исходный код в 3.7, 3.8 и 3.9 для справки). Стоит рассмотреть asyncio.run(), если ваш проект не требует Python ‹3.7.

Итак, основные инструменты, необходимые для реализации вымышленного варианта использования, представленного в предыдущей статье, уже готовы. На этом мы заканчиваем «оперативные» обсуждения.

Единичные тесты

В этом последнем разделе я кратко рассмотрю вопросы, связанные с модульными тестами. Это больше об извлеченных уроках и будущих улучшениях, чем о деталях написания таких тестов.

Причуды версии Python

AsyncIO включен в стандартную библиотеку начиная с Python 3.4. Сопрограммы с синтаксисом async и await были добавлены в 3.5. Но встроенный модуль unittest стал asyncio-функциональным только в 3.8.

Для запуска кода, над которым моя команда работала, требуется поддержка Python 3.6+, а это значит, что для тестирования асинхронного кода в версиях 3.6 и 3.7 потребовались обходные пути. Мы использовали поучительную публикацию в блоге Стратегии тестирования асинхронного кода в Python в качестве справочного материала для выполнения работы.

Мы ожидаем, что это будет временно, и обновляем модульные тесты, когда есть договоренность о том, что для запуска кода требуется Python ≥ 3.8 - обходные пути несовместимы с 3.8. Таким образом, вместо того, чтобы повторять эталонный пост в оставшейся части этого раздела, я собираюсь выделить два недавних встроенных улучшения, которые значительно упростили модульное тестирование асинхронного кода с версии 3.8.

AsyncMock

В Python 3.8 добавлен AsyncMock для поддержки асинхронной версии Mock. Объект AsyncMock будет вести себя так, чтобы объект распознавался как асинхронная функция, а результат вызова был ожидаемым. Также были добавлены соответствующие новые функции утверждения для тестирования, например, assert_awaited(), assert_awaited_with() и await_count.

Тем не менее, написание модульных тестов для асинхронных функций теперь очень похоже на то, что разработчики Python всегда делали при тестировании синхронного кода. Например:

Менеджер контекста и методы итерации

MagicMock (вы правильно прочитали, я больше не говорю о AsyncMock) был улучшен. Магические методы теперь поддерживают __aenter__, __aexit__, __aiter__ и __anext__; следовательно, асинхронные диспетчеры контекста и итерации можно легко имитировать:

Вышеупомянутый фиктивный объект затем можно использовать для тестирования асинхронных циклов, таких как async for message in websocket:, который выполняет итерацию по сообщениям, полученным клиентом WebSocket в примере кода.

Если вы хотите увидеть больше примеров модульных тестов, включая предложения по исправлению асинхронных функций, обратитесь к сопутствующему репозиторию на GitHub. Я оставил там несколько тестов.

Последние мысли

Я уже рассказывал о WebSockets, и об AsyncIO хватило. Я считаю, что еще многое предстоит узнать о них, но я надеюсь, что у меня было достаточно ясности, чтобы поднять то, что я узнал до сих пор, с помощью моих товарищей по команде.

Дизайн и стратегии, обсуждаемые в данной статье, в настоящее время используются в производственном коде. Они помогли нам создать простое в обслуживании модульное решение, которое способствует повторному использованию и скрывает сложность от клиентов.

Написание - это обмен, поэтому я надеюсь, что представленные здесь знания помогут другим командам преуспеть в решении проблем, связанных с WebSockets и AsyncIO. Не стесняйтесь комментировать, отзывы всегда приветствуются.

Всего наилучшего!

использованная литература