Django — рабочие процессы для реальных финансовых приложений
Здравствуйте, этот пост будет посвящен некоторым вопросам инфраструктуры, таким как какую базу данных использовать, как автоматизировать получение данных и т. д. Вы можете прочитать его как введение в финансовые приложения с django.
Прежде всего, вы должны определить план проекта. Спроси себя:
- Какие данные должны быть статическими?
- Какие данные следует часто обновлять? Как часто мне нужны новые данные?
Для большинства веб-приложений, связанных с финтехом, нам нужно ОЧЕНЬ сосредоточиться на том, как обрабатываются данные, когда и как.
В качестве примера; Сравните веб-сайт, такой как TradingView (который не был создан с помощью django, но в любом случае является приложением, критически важным для данных), с простым блогом CRUD. CRUD ссылается на операции «Создание, чтение, обновление, удаление» и состоит из большей части веб-приложения.
Они различаются по нескольким параметрам:
- Исследовательское приложение - › Может использовать передовые технологии для представления данных в реальном времени, например. по цене живой акции
- Исследовательское приложение -> Некоторые данные необходимо периодически извлекать, и они не являются живыми, например. скажем, вы хотите получать 5-минутный ohlcv каждые 5 минут
- Исследовательское приложение -> Данные, о которых говорилось выше, должны быть получены откуда-то
- Исследовательское приложение -> Меньше кода biolerplait для использования. Я видел много руководств по созданию блогов, но почти ничего по финтех/финансам.
- Простой блог -> Контент создается/управляется пользователями
- …
Учитывая это, давайте рассмотрим некоторые возможности нашей обработки данных. Я предполагаю, что вы выбрали источник данных, такой как сторонний API.
В любом случае; Прежде чем сделать это, давайте быстро подумаем об управлении базой данных. Я использую postgresql для производства, и я всегда использую postgresql + django. Postgresql имеет множество утилит, связанных с django. Просто быстро изучите документы django о дополнительных утилитах для postgresql. Вы можете использовать другие поля, индексы и запросы, которые оказались весьма полезными, по крайней мере, для моей веб-разработки в прошлом.
Для данных, которые должны быть запрошены (например, через библиотеку запросов), нам каким-то образом нужно определить функциональные возможности, которые работают периодически.
Ошибки, которых следует избегать при работе с API в django:
- Выполнение работы в представлении (Недостаток: 1 ответ представления = 1 (дорогой) запрос API/взаимодействие ORM)
Вместо того, чтобы делать это, вы должны выбрать любое и надежное решение. Для небольших приложений вы можете взглянуть на django-q, для средних и больших приложений я бы определенно выбрал celery и celery beat.
Я думаю, что использование сельдерея является промежуточным. Я постараюсь рассказать о некоторых основных примерах + принципах работы с сельдереем.
Итак, прежде всего, давайте рассмотрим простую функцию запроса API, которую я запрограммировал в другом посте:
def simple_request(url: str) -> Any: assert isinstance(url, str), 'url has to be a str' # Start the request workflow response = requests.get(url) if response.status_code != 200: return None response = response.json() # Check the results (Sometimes its 200, but empty content) if not response: print('Result is empty') return None # Response was successfull return response
Так что это не обязательно должно быть внутри проекта django. Это довольно полезно для любой работы с API, но давайте посмотрим, как преобразовать его для работы с celery + celery beat.
Предварительно требования:
- Установите celery (pip) и celery beat и настройте его
- ВНИМАНИЕ: Это займет некоторое время! Пожалуйста, ознакомьтесь с официальной документацией по сельдерею, чтобы начать с настройки, и вернитесь после того, как поймете следующее.
# Terminal window celery -A my_app worker -l info -c 12 -p "eventlet" celery -A my_app beat -l info
Теперь давайте создадим новый tasks.py, в котором вы определите следующее:
from celery import shared_task @shared_task( bind=True, max_retries=5, autoretry_for=(requests.RequestException), ignore_results=False, ) def call_api(self, url: str) -> Any: assert isinstance(url, str), 'url has to be a str' # Start the request workflow response = requests.get(url) if response.status_code != 200: raise self.retry(countdown=60*1) response = response.json() # Check the results (Sometimes its 200, but empty content) if not response: print('Result is empty') return None # Response was successfull return response
Об этом коде:
- @shared_task -› декоратор, который можно использовать для ВСЕХ задач, которые вы определяете. Для определения настройки задачи требуются аргументы ключевых слов.
- bind=True, это говорит о том, что вы должны использовать self в качестве первого параметра
- max_retries, autoretry_for и поднять self.retry(countdown=60*1) -> Это просто говорит сельдерею: а) автоматически повторить/повторно запустить задачу после обратного отсчета (60*1 секунд), б) обернуть всю задачу в попытку, кроме блок, который проверяет RequestException и c) останавливает процесс повторной попытки на max_retries
- ignore_results автоматически имеет значение False, но я хочу упомянуть об этом, потому что важно сохранить результаты API для дальнейшей работы с ними.
- Вы можете настроить эти параметры, но это хороший первый шаблон для вас, если вы новичок в celery.
Итак, после объяснения давайте воспользуемся этим кодом. Поэтому я создам пример и объясню его. Внимательно прочитайте комментарии, чтобы заставить его работать/обновите его для вашего варианта использования.
from celery import shared_task, group MY_API_KEY = "" # -> Replace the empty string with your api key @shared_task( ignore_results=True, ) def update_stocks(url=None) -> Any: # We assume that we’ve programmed a Stock model with a pric DecimalField # And also with a ticker CharField and a volume BigIntegerField stocks = Stock.objects.all().only("ticker") # We also assume that there is an url where you get the data from You have to define it here f.ex. I will use FMP to get the volume and price # Make sure to define the base_url without hard coded ticker! if not url: base_url = "https://financialmodelingprep.com/api/v3/quote-short/{}?apikey={}" arguments = [ base_url.format( stock, MY_API_KEY, ) for stock.ticker in stocks.iterator() ] grouped_api_tasks = group( [call_api.s(url) for url in arguments] ) chord( grouped_api_tasks )(process_api_data.s()) @shared_task( ignore_results=True ) def process_api_data(data): # Do some custom manipulation # You can overwrite this with your own logic/data handler try: refactored = [ d[0] for d in data if isinstance(d, list) ] except Exception as e: print(e) return None if not refactored: print("Refactoring was empty") return None updated_instances = 0 for data_dict in refactored: stock = Stock.objects.get(symbol=data_dict["symbol"]) stock.volume = data_dict["volume"] stock.price = decimal.Decimal.from_float(data_dict["price"]) stock.save( update_fields=["volume", "price"] ) updated_instances += 1 continue print(f"Finsished with: {update_stocks} stocks updated")
Некоторые важные примечания:
- update_stocks -> корневая задача, которая обрабатывает выполнение дочерних и обратных вызовов
- arguments -> Это определяет список всех URL-адресов с отформатированным base_url. Так что мы можем динамически добавлять контекст тикера к каждой итерации URL.
- group() -> Используется для параллельного выполнения. Поэтому задачи API не должны ждать друг друга и выполняются независимо. Это большое преимущество холста сельдерея, поскольку вы НИКОГДА не хотите блокировать рабочий процесс.
- chord() –> запускает group() и объединяет ее в список вместе. Поэтому все результаты API будут переданы в process_api_data.s(). Не требуется передавать данные arg. Он автоматически передается в дизайне chord(). process_api_data — это наша задача обратного вызова, которая выполняется, когда выполняются ВСЕ задачи.
- process_api_data.s() -> ярлык s() — это просто короткая версия Signature(), которую необходимо использовать в рабочем процессе холста. Требуется указать программе, что мы передаем задачу сельдерея, а не обычную функцию, которая выдаст ошибку.
- задача process_api_data повторяет данные API и ищет связанный объект Stock. Затем он обновляет запрошенный объект новыми значениями данных.
- Вы можете использовать ignore_results здесь, так как здесь все достигается без необходимости в другой дочерней задаче, поэтому нам не нужно сохранять результаты, поскольку они возвращают None
Как видите, есть шаблон проектирования:
-› Корневая задача (управление задачами)
-> Чайлдс (параллельный)
-› Обратный звонок (управление базой данных)
В завершение давайте настроим задачу для работы с celery beat в change_this_with_your_app/beat.py.
from celery import Celery from celery.schedules import crontab # We assume that youve create an app instance (read the celery docs) app.conf.beat_schedule = { ‘stocks run-every-hour’: { # Change my_app with the app_label django name ‘task’: ‘my_app.tasks.update_stocks’, # Decide for your schedule ‘schedule’: crontab(minute=1, hour=’*’ days_of_week=’1-5’ ) }
О коде:
- «задачи» — это путь к вашей определенной задаче
- «расписание» определяет используемый crontab. Этот устанавливает расписание на каждую минуту каждый час с понедельника по пятницу.
Я попытаюсь описать оптимизированную версию приведенного выше кода (tasks.py), потому что она не будет хорошо масштабироваться при БОЛЬШОМ количестве акций. Если вы хотите, чтобы я рассказал об этом в следующем посте, обязательно выразите мне свой интерес.
Мой фон: https://palmy-investing.com/
Еще несколько постов, о которых я хотел бы написать (если есть интерес):
Некоторые из тем, которые пришли бы мне в голову:
- Как создать диаграммы OHLCV? Какие библиотеки использовать? Опять же, я мог бы написать об обоих; приложениеa& локально
- Как работать с большими объемами данных? Статистические методы с пандами и numpy
- Подробный обзор API. Какой API объективно лучший для вас?
Как провести анализ настроений? Где я могу получить новостные статьи, связанные с акциями и криптовалютой?