Django — рабочие процессы для реальных финансовых приложений

Здравствуйте, этот пост будет посвящен некоторым вопросам инфраструктуры, таким как какую базу данных использовать, как автоматизировать получение данных и т. д. Вы можете прочитать его как введение в финансовые приложения с django.

Прежде всего, вы должны определить план проекта. Спроси себя:

  • Какие данные должны быть статическими?
  • Какие данные следует часто обновлять? Как часто мне нужны новые данные?

Для большинства веб-приложений, связанных с финтехом, нам нужно ОЧЕНЬ сосредоточиться на том, как обрабатываются данные, когда и как.

В качестве примера; Сравните веб-сайт, такой как TradingView (который не был создан с помощью django, но в любом случае является приложением, критически важным для данных), с простым блогом CRUD. CRUD ссылается на операции «Создание, чтение, обновление, удаление» и состоит из большей части веб-приложения.

Они различаются по нескольким параметрам:

  • Исследовательское приложение - › Может использовать передовые технологии для представления данных в реальном времени, например. по цене живой акции
  • Исследовательское приложение -> Некоторые данные необходимо периодически извлекать, и они не являются живыми, например. скажем, вы хотите получать 5-минутный ohlcv каждые 5 минут
  • Исследовательское приложение -> Данные, о которых говорилось выше, должны быть получены откуда-то
  • Исследовательское приложение -> Меньше кода biolerplait для использования. Я видел много руководств по созданию блогов, но почти ничего по финтех/финансам.
  • Простой блог -> Контент создается/управляется пользователями

Учитывая это, давайте рассмотрим некоторые возможности нашей обработки данных. Я предполагаю, что вы выбрали источник данных, такой как сторонний API.

В любом случае; Прежде чем сделать это, давайте быстро подумаем об управлении базой данных. Я использую postgresql для производства, и я всегда использую postgresql + django. Postgresql имеет множество утилит, связанных с django. Просто быстро изучите документы django о дополнительных утилитах для postgresql. Вы можете использовать другие поля, индексы и запросы, которые оказались весьма полезными, по крайней мере, для моей веб-разработки в прошлом.

Для данных, которые должны быть запрошены (например, через библиотеку запросов), нам каким-то образом нужно определить функциональные возможности, которые работают периодически.

Ошибки, которых следует избегать при работе с API в django:

  • Выполнение работы в представлении (Недостаток: 1 ответ представления = 1 (дорогой) запрос API/взаимодействие ORM)

Вместо того, чтобы делать это, вы должны выбрать любое и надежное решение. Для небольших приложений вы можете взглянуть на django-q, для средних и больших приложений я бы определенно выбрал celery и celery beat.

Я думаю, что использование сельдерея является промежуточным. Я постараюсь рассказать о некоторых основных примерах + принципах работы с сельдереем.

Итак, прежде всего, давайте рассмотрим простую функцию запроса API, которую я запрограммировал в другом посте:

def simple_request(url: str) -> Any:
    assert isinstance(url, str), 'url has to be a str'
    
    # Start the request workflow
    response = requests.get(url)
    
    if response.status_code != 200:
        return None
    
    response = response.json()

    # Check the results (Sometimes its 200, but empty content)
    if not response:
        print('Result is empty')
        return None
    
    # Response was successfull
    return response

Так что это не обязательно должно быть внутри проекта django. Это довольно полезно для любой работы с API, но давайте посмотрим, как преобразовать его для работы с celery + celery beat.

Предварительно требования:

  • Установите celery (pip) и celery beat и настройте его
  • ВНИМАНИЕ: Это займет некоторое время! Пожалуйста, ознакомьтесь с официальной документацией по сельдерею, чтобы начать с настройки, и вернитесь после того, как поймете следующее.
# Terminal window

celery -A my_app worker -l info -c 12 -p "eventlet"
celery -A my_app beat -l info

Теперь давайте создадим новый tasks.py, в котором вы определите следующее:

from celery import shared_task

@shared_task(
    bind=True,
    max_retries=5,
    autoretry_for=(requests.RequestException),
    ignore_results=False,
)
def call_api(self, url: str) -> Any:
    assert isinstance(url, str), 'url has to be a str'
    
    # Start the request workflow
    response = requests.get(url)
    
    if response.status_code != 200:
        raise self.retry(countdown=60*1)
    
    response = response.json()

    # Check the results (Sometimes its 200, but empty content)
    if not response:
        print('Result is empty')
        return None
    
    # Response was successfull
    return response

Об этом коде:

  • @shared_task -› декоратор, который можно использовать для ВСЕХ задач, которые вы определяете. Для определения настройки задачи требуются аргументы ключевых слов.
  • bind=True, это говорит о том, что вы должны использовать self в качестве первого параметра
  • max_retries, autoretry_for и поднять self.retry(countdown=60*1) -> Это просто говорит сельдерею: а) автоматически повторить/повторно запустить задачу после обратного отсчета (60*1 секунд), б) обернуть всю задачу в попытку, кроме блок, который проверяет RequestException и c) останавливает процесс повторной попытки на max_retries
  • ignore_results автоматически имеет значение False, но я хочу упомянуть об этом, потому что важно сохранить результаты API для дальнейшей работы с ними.
  • Вы можете настроить эти параметры, но это хороший первый шаблон для вас, если вы новичок в celery.

Итак, после объяснения давайте воспользуемся этим кодом. Поэтому я создам пример и объясню его. Внимательно прочитайте комментарии, чтобы заставить его работать/обновите его для вашего варианта использования.

from celery import shared_task, group

MY_API_KEY = "" # -> Replace the empty string with your api key

@shared_task(
    ignore_results=True,
)
def update_stocks(url=None) -> Any:
    
    # We assume that we’ve programmed a Stock model with a pric DecimalField
    # And also with a ticker CharField and a volume BigIntegerField
    
    stocks = Stock.objects.all().only("ticker")
    # We also assume that there is an url where you get the data from You have to define it here f.ex. I will use FMP to get the volume and price 
    # Make sure to define the base_url without hard coded ticker! 
    if not url:
        base_url = "https://financialmodelingprep.com/api/v3/quote-short/{}?apikey={}"
    
    arguments = [
        base_url.format(
            stock, MY_API_KEY,
        ) for stock.ticker in stocks.iterator()
    ]
    
    grouped_api_tasks = group(
        [call_api.s(url) for url in arguments]
    )

    chord(
        grouped_api_tasks
    )(process_api_data.s())

@shared_task(
    ignore_results=True
)
def process_api_data(data):
    # Do some custom manipulation
    # You can overwrite this with your own logic/data handler
    
    try:
        refactored = [
            d[0] for d in data if isinstance(d, list)
        ]
        
    except Exception as e:
        print(e)
        return None
    
    if not refactored:
        print("Refactoring was empty")
        return None
    
    updated_instances = 0
    
    for data_dict in refactored:
        
        stock = Stock.objects.get(symbol=data_dict["symbol"])
        stock.volume = data_dict["volume"]
        stock.price = decimal.Decimal.from_float(data_dict["price"])
        stock.save(
            update_fields=["volume", "price"]
        )
        updated_instances += 1
        continue
        
    print(f"Finsished with: {update_stocks} stocks updated")
    

Некоторые важные примечания:

  • update_stocks -> корневая задача, которая обрабатывает выполнение дочерних и обратных вызовов
  • arguments -> Это определяет список всех URL-адресов с отформатированным base_url. Так что мы можем динамически добавлять контекст тикера к каждой итерации URL.
  • group() -> Используется для параллельного выполнения. Поэтому задачи API не должны ждать друг друга и выполняются независимо. Это большое преимущество холста сельдерея, поскольку вы НИКОГДА не хотите блокировать рабочий процесс.
  • chord() –> запускает group() и объединяет ее в список вместе. Поэтому все результаты API будут переданы в process_api_data.s(). Не требуется передавать данные arg. Он автоматически передается в дизайне chord(). process_api_data — это наша задача обратного вызова, которая выполняется, когда выполняются ВСЕ задачи.
  • process_api_data.s() -> ярлык s() — это просто короткая версия Signature(), которую необходимо использовать в рабочем процессе холста. Требуется указать программе, что мы передаем задачу сельдерея, а не обычную функцию, которая выдаст ошибку.
  • задача process_api_data повторяет данные API и ищет связанный объект Stock. Затем он обновляет запрошенный объект новыми значениями данных.
  • Вы можете использовать ignore_results здесь, так как здесь все достигается без необходимости в другой дочерней задаче, поэтому нам не нужно сохранять результаты, поскольку они возвращают None

Как видите, есть шаблон проектирования:

-› Корневая задача (управление задачами)

-> Чайлдс (параллельный)

-› Обратный звонок (управление базой данных)

В завершение давайте настроим задачу для работы с celery beat в change_this_with_your_app/beat.py.

from celery import Celery
from celery.schedules import crontab

# We assume that youve create an app instance (read the celery docs)

app.conf.beat_schedule = {
    ‘stocks run-every-hour’: {
        # Change my_app with the app_label django name
        ‘task’: ‘my_app.tasks.update_stocks’,
        # Decide for your schedule
        ‘schedule’: crontab(minute=1, hour=’*’ days_of_week=’1-5’ )   
    }

О коде:

  • «задачи» — это путь к вашей определенной задаче
  • «расписание» определяет используемый crontab. Этот устанавливает расписание на каждую минуту каждый час с понедельника по пятницу.

Я попытаюсь описать оптимизированную версию приведенного выше кода (tasks.py), потому что она не будет хорошо масштабироваться при БОЛЬШОМ количестве акций. Если вы хотите, чтобы я рассказал об этом в следующем посте, обязательно выразите мне свой интерес.

Мой фон: https://palmy-investing.com/

Еще несколько постов, о которых я хотел бы написать (если есть интерес):

Некоторые из тем, которые пришли бы мне в голову:

- Как создать диаграммы OHLCV? Какие библиотеки использовать? Опять же, я мог бы написать об обоих; приложениеa& локально

- Как работать с большими объемами данных? Статистические методы с пандами и numpy

- Подробный обзор API. Какой API объективно лучший для вас?

Как провести анализ настроений? Где я могу получить новостные статьи, связанные с акциями и криптовалютой?