Привет, я Наоки Комото (河本 直起), работаю инженером по машинному обучению в AnyMind.

В AnyMind мы разрабатываем среду MLOps, и в предыдущей статье мы представили инфраструктуру данных для машинного обучения с использованием BigQuery View Table.

В нашей предыдущей статье об инфраструктуре пакетного прогнозирования Vertex AI мы представили архитектуру, использующую Vertex AI во время ее введения. С тех пор мы внесли некоторые улучшения в ходе работы, поэтому мы хотели бы представить эти улучшения в этой статье.

Текущий процесс машинного обучения

Во-первых, как предварительное условие, мы вводим текущий поток от генерации данных до обслуживания результатов прогнозирования.

В настоящее время мы создаем входные данные, обучающие модели и пакетное прогнозирование, а также обслуживаем результаты прогнозирования следующим образом:

Сначала входные данные подготавливаются в виде таблицы просмотра в BigQuery, а конвейер обучения модели обучает модель на основе данных. Последующий конвейер пакетного прогнозирования выполняет прогнозирование с использованием обученных моделей и сохраняет результаты прогнозирования в BigQuery и Firestore (режим хранилища данных). Firestore (режим хранилища данных) используется для получения результатов прогнозирования.

Приложение продукта запрашивает ключи (например, идентификатор пользователя для прогнозирования для каждого пользователя) и метаданные (например, максимальное количество результатов, необходимых для рекомендации) у API, обслуживающего результаты прогнозирования. Затем API обслуживания извлекает результаты прогнозирования из Firestore (режим хранилища данных) на основе запроса и возвращает их. Обслуживающий API развернут в Cloud Run.

Дополнительные сведения о процессе генерации входных данных см. в следующей статье:



Дополнительные сведения об обучении модели, пакетном выводе и процессе обработки результатов вывода см. в следующих статьях:



Улучшения

В настоящее время мы используем монорепозиторий для создания входных данных, как описано в этой статье.

А для обучения модели и конвейера пакетного прогнозирования, а также Serving API для результатов прогнозирования каждый проект имеет собственный репозиторий. Причина этого заключалась в том, что обработки, общей для всех проектов, было не так много, поэтому от стандартизации было мало пользы, а лучшие практики для разработки еще не были установлены, поэтому риск стандартизации был высок.

Последующие разработки в нескольких проектах в некоторой степени показали, какие части должны быть стандартизированы, а какие должны быть реализованы на индивидуальной основе. Кроме того, поскольку увеличение затрат на техническое обслуживание и стоимость новых разработок в результате разработки по проектам стали слишком высокими, чтобы их можно было игнорировать, мы решили продолжить частичную стандартизацию.

В частности, для конвейера обучения модели и пакетного прогнозирования мы стандартизировали развертывание и мониторинг. Для процессов, общих для всех проектов, мы создали общие компоненты, чтобы их стандартизировать. Что касается API, обслуживающего результаты прогнозирования, мы разделили общие процессы на модули. Мы также создали репозитории шаблонов для конвейеров обучения модели и пакетного прогнозирования, а также для API, обслуживающего результаты прогнозирования, чтобы уменьшить объем работы, необходимой для новых разработок, и уменьшить вариативность методов разработки среди разработчиков.

В результате для разработки на основе проектов нам нужно только разработать специфичные для проекта компоненты и конвейеры для конвейеров обучения модели и пакетного прогнозирования, а также интерфейсы для API, обслуживающего результаты прогнозирования.

Далее мы надеемся представить детали этих улучшений.

Стандартизация процесса развертывания Vertex Pipelines

Вот что мы сейчас делаем для развертывания Vertex Pipelines:

  1. Отправка образов компонентов в Google Container Registry (GCR)
  2. Добавить информацию о конфигурации в файлы определения компонентов
  3. Загружать общие файлы определения компонентов, общие для проектов
  4. Компиляция конвейера и отправка в Google Cloud Storage (GCS)
  5. Создать задание конвейера (для одноразового запуска)
  6. Создать планировщик конвейера (для периодического запуска)

Когда задание конвейера выполняется один раз, общая структура процесса развертывания Vertex Pipelines выглядит следующим образом:

Каждый репозиторий развертывается путем отправки файлов конфигурации в Cloud Build через CircleCI.

Перед развертыванием конвейера функция создания заданий конвейера была развернута в Cloud Functions, а пакет с процессом развертывания конвейера (Пакет развертывания конвейера) был выпущен в реестр артефактов. Образы компонентов Common Components хранятся в GCR, а файлы определений хранятся в GCS.

Если задание конвейера является одноразовым, при обновлении репозитория конвейера для конкретного проекта будет создан образ Cloud Builder для развертывания конвейера, а также установлен и выполнен пакет с процессом развертывания конвейера. Затем, после загрузки файлов определения компонентов, общих для всех проектов, компоненты, специфичные для проекта, загружаются, а конвейер компилируется и передается. После этого вызывается функция создания задания конвейера с настройками для создания задания конвейера.

Если задание конвейера является запланированным запуском, структура выглядит следующим образом:

Когда репозиторий конвейера для конкретного проекта обновляется, установленный пакет для процесса развертывания создает планировщик, который периодически вызывает функцию для создания заданий конвейера. Это гарантирует, что задания конвейера создаются на регулярной основе.

Для получения подробной информации об этом потоке развертывания следующих процессов:

  • Отправка образов компонентов в Google Container Registry (GCR)
  • Добавить информацию о конфигурации в файлы определения компонентов
  • Компиляция конвейера и отправка в Google Cloud Storage (GCS)

Пожалуйста, обратитесь к следующей статье для получения дополнительной информации.



Здесь мы надеемся представить детали следующих процессов и способов их стандартизации.

  • Загружать общие файлы определения компонентов, общие для проектов
  • Создать задание конвейера (для одноразового запуска)
  • Создать планировщик конвейера (для периодического запуска)

Создание общих компонентов между проектами

Во-первых, мы объясним создание компонентов, общих для проектов, описанных ниже. Общие компоненты развертываются из репозитория, отдельного от каждого конвейера, и конвейеры загружают развернутые общие компоненты при развертывании.

Как упоминалось выше, для конвейеров обучения модели и пакетного прогнозирования для каждого проекта создается отдельный репозиторий. По этой причине компоненты Kubeflow, которые обычно используются между проектами, создаются и развертываются в отдельном репозитории, и конвейер каждого проекта обращается к этим компонентам при развертывании.

Следующие два необходимы для использования компонента:

  • Изображение компонента
  • Файл определения компонента (component.yaml)

Поскольку путь к образу компонента описан в файле определения компонента, из конвейера проекта необходимо загрузить только файл определения компонента. Таким образом, общий репозиторий компонентов создает образы компонентов, передает их в GCR и сохраняет файлы определений компонентов в GCS.

Файлы определения компонентов автоматически загружаются локально из GCS в процессе развертывания каждого конвейера в модуле обработки развертывания Vertex Pipelines, описанном ниже.

Процесс загрузки развернутых общих компонентов выглядит следующим образом:

import os
from pathlib import Path
from gcsfs import GCSFileSystem


def _load_common_components(
    common_component_download_dir: str,
    common_component_dir: str
    ) -> None:
    fs = GCSFileSystem()
    common_component_dirs = fs.ls(common_component_dir)
    for component_dir in common_component_dirs:
        component_file = os.path.join(component_dir, "component.yaml")
        component_name = component_dir.split("/")[-1]
        local_path = os.path.join(
            common_component_download_dir, component_name, "component.yaml")
        Path(local_path).parent.mkdir(parents=True, exist_ok=True)
        fs.get(component_file, local_path)


_load_common_components(
    common_component_download_dir='local_path_load_component_file_to',
    common_component_dir="gcs_path_component_is_uploaded")
```

Since the component definition file is automatically loaded when the pipeline is deployed, the pipeline can be refered in same way as other project-specific components by specifying the directory as follows:

```Python
component_store = kfp.components.ComponentStore(
    local_search_paths=[
        "component/",  # local directory project specific component definition files locate
        "component/common/"  # local directory common component definition files locate
    ]
)

project_specific_component_op = component_store.load_component('project_specific_component')
common_component_op = component_store.load_component('common_component')

Обычно используемые компоненты включают следующие процессы, например:

  • Зарегистрируйте результаты прогнозирования в BigQuery
  • Регистрация результатов прогнозирования в Firestore (режим хранилища данных)
  • Получить время начала выполнения

Создание конвейерного задания

Далее мы объясним описанный ниже процесс создания задания конвейера. Функция создания конвейерных заданий развертывается в Cloud Functions, а конвейерные задания создаются путем вызова этой функции.

Функции для создания конвейерных заданий следующие:

import os
import json
import dataclasses
from google.cloud import aiplatform


# set project id as environment variable in docker image from cloudbuild.yaml
_PROJECT_ID = os.getenv('PROJECT_ID')
_PIPELINE_SERVICE_ACCOUNT = f"service-account-for-pipeline-job-creation@{_PROJECT_ID}.iam.gserviceaccount.com"


class TypeValidationError(Exception):
    pass


@dataclasses.dataclass
class Arguments:
    display_name: str
    pipeline_spec_uri: str
    pipeline_root: str
    location: str = "asia-east1"
    enable_caching: bool = False

    def __post_init__(self):
        # validate types
        fields = dataclasses.fields(self)
        for field in fields:
            value = getattr(self, field.name)
            if not isinstance(value, field.type):
                raise TypeValidationError(
                    f"{field.name} must be {field.type}")


def main(request):
   # decode http request payload and translate into JSON object
    request = request.get_data()
    try: 
        request = json.loads(request.decode())
    except ValueError as e:
        print(f"Error decoding JSON: {e}")
        return "JSON Error", 400
    print(f'request: {request}')

    try:
        # validate and set default value
        arguments = Arguments(**request)
    except TypeError as e:
        print(f"Error getting arguments: {e}")
        return "Invalid Argument", 400
    except TypeValidationError as e:
        print(f"Error validating argument type: {e}")
        return "Invalid Argument Schema", 400

    print("initialize aiplatform client")
    aiplatform.init(
        project=_PROJECT_ID,
        location=arguments.location,
    )

    print("create pipeline job")
    try:
        job = aiplatform.PipelineJob(
            display_name=arguments.display_name,
            template_path=arguments.pipeline_spec_uri,
            pipeline_root=arguments.pipeline_root,
            enable_caching=arguments.enable_caching
        )
        job.submit(
            service_account=_PIPELINE_SERVICE_ACCOUNT
        )
        print("job submitted")

        # send success message to slack

    except Exception as e:
        
        # send error message to slack

        return "Internal Error", 500
    return "Job Completed", 200 

В предыдущей статье реализован вызов через PubSub. Для упрощения обработки он был изменен для вызова с запросами Http. Кроме того, ранее проверка схемы выполнялась схемой сообщения PubSub, но теперь проверка типа выполняется внутри функции.

Хотя в приведенном выше коде это опущено, когда процесс создания конвейерного задания завершается или завершается сбоем, в Slack отправляется сообщение, уведомляющее об этом.

Для развертывания конвейера разработаны следующие общие клиентские модули для облачных функций:

import json
import requests
import google.oauth2.id_token
import google.auth.transport.requests


class FunctionClient:
    def __init__(
        self,
        project_id: str,
        location: str,
        function_name: str
        ) -> None:  
        request = google.auth.transport.requests.Request()
        self.url = f'https://{location}-{project_id}.cloudfunctions.net/{function_name}'
        self.token = google.oauth2.id_token.fetch_id_token(request, self.url)
    
    def request(
        self,
        request_parameter: dict
        ):
        response = requests.post(
            self.url, 
            headers={
                'Authorization': f"Bearer {self.token}", 
                "Content-Type": "application/json"},
            data=json.dumps(request_parameter)
        )
        print(f"status code: {response.status_code}")
        print(f"reason: {response.reason}")
```

To create a pipeline job, this module is called with the pipeline job configuration information as follows:

```Python
import dataclasses


_CREATE_PIPELINE_JOB_FUNCTION_NAME = "pipeline-job-creation-function-name"
_CREATE_PIPELINE_JOB_LOCATION = "pipeline-job-creation-function-location"


@dataclasses.dataclass
class _PipelineJobMessage:
    display_name: str
    location: str
    pipeline_spec_uri: str
    pipeline_root: str
    enable_caching: bool


function_client = FunctionClient(
    project_id="project_id",
    location=_CREATE_PIPELINE_JOB_LOCATION,
    function_name=_CREATE_PIPELINE_JOB_FUNCTION_NAME
)
message = dataclasses.asdict(
    _PipelineJobMessage(
        display_name="pipeline_job_display_name",
        location="pipeline_job_location",
        pipeline_spec_uri="pipeline_spec_uri",
        pipeline_root="pipeline_root",
        enable_caching="pipeline_job_enable_caching"
    )
)
function_client.request(
    request_parameter=message
)

Запланированный запуск конвейерного задания

Далее мы объясним запланированное выполнение заданий конвейера, описанных ниже. При развертывании конвейера, требующего периодического выполнения, создается облачный планировщик, который отправляет сообщения, содержащие параметры задания конвейера, в облачные функции для создания конвейера выше по указанному расписанию.

В частности, разработан следующий универсальный клиентский модуль для создания Cloud Scheduler.

import json
import dataclasses
from google.cloud import scheduler_v1


@dataclasses.dataclass
class SchedulerSetting:
    name: str
    schedule: str
    location: str = "asia-east1"
    description: str = ""
    time_zone: str = "UTC"
    is_pause: str = False


class SchedulerClient:
    def __init__(
        self,
        project_id: str
        ) -> None:
        self.project_id = project_id
        self.client = scheduler_v1.CloudSchedulerClient()

    def _if_scheduler_exists(
        self,
        scheduler_name: str,
        location: str
        ) -> bool:
        # get existing job
        list_job_request = scheduler_v1.ListJobsRequest(
            parent=f"projects/{self.project_id}/locations/{location}",
        )
        page_result = self.client.list_jobs(request=list_job_request)
        exist_scheduker_names = [i.name for i in page_result]
        return True if scheduler_name in exist_scheduker_names else False
    
    def _delete_scheduler(
        self,
        scheduler_name: str
        ) -> None:
        # delete existing job
        delete_job_request = scheduler_v1.DeleteJobRequest(
            name=scheduler_name
        )
        response = self.client.delete_job(request=delete_job_request)
        print(f"deleted: {response}")

    def _create_scheduler(
        self,
        job: scheduler_v1.Job,
        location: str
        ) -> None:
        # create scheduler if not exists
        create_job_request = scheduler_v1.CreateJobRequest(
            parent=f"projects/{self.project_id}/locations/{location}",
            job=job
        )
        response = self.client.create_job(request=create_job_request)
        print(f"created: {response}")

    def _pause_scheduler(
        self,
        scheduler_name: str
        ) -> None:
        pause_job_request = scheduler_v1.PauseJobRequest(
            name=scheduler_name
        )
        response = self.client.pause_job(request=pause_job_request)
        print(f"paused: {response}")

    def _delete_scheduler_if_exists(
        self,
        scheduler_name: str,
        location: str
        ) -> None:
        # delete previous scheduler if exists
        if self._if_scheduler_exists(
            scheduler_name=scheduler_name,
            location=location):
            print(f"delete previous scheduler: {scheduler_name}")
            self._delete_scheduler(scheduler_name)

    def _pause_scheduler_if_is_pause(
        self,
        scheduler_name: str,
        is_pause: bool
        ) -> None:
        # pause scheduler if needed
        if is_pause:
            print(f"pause scheduler: {scheduler_name}")
            self._pause_scheduler(
                scheduler_name=scheduler_name)

    def create_pubsub_trigger(
        self,
        scheduler_setting: SchedulerSetting,
        pubsub_topic_id: str,
        message: dict = {}
        ):
        self._delete_scheduler_if_exists(
            scheduler_name=scheduler_setting.name,
            location=scheduler_setting.location
        )

        string_message = json.dumps(message) 
        pubsub_target = scheduler_v1.PubsubTarget(
            topic_name=f"projects/{self.project_id}/topics/{pubsub_topic_id}",
            data=string_message.encode("utf-8")
        )
        print(f"create scheduler triggering pubsub: {scheduler_setting.name}")

        # create scheduler
        create_job = scheduler_v1.Job(
            name=scheduler_setting.name,
            description=scheduler_setting.description,
            schedule=scheduler_setting.schedule,
            pubsub_target=pubsub_target,
            time_zone=scheduler_setting.time_zone
        )
        self._create_scheduler(
            job=create_job,
            location=scheduler_setting.location)
        print(f"created scheduler: {scheduler_setting.name}")

        self._pause_scheduler_if_is_pause(
            scheduler_name=scheduler_setting.name,
            is_pause=scheduler_setting.is_pause
        )

    def create_function_trigger(
        self,
        scheduler_setting: SchedulerSetting,
        function_location: str,
        function_name: str,
        service_account: str,
        message: dict = {}
        ):
        self._delete_scheduler_if_exists(
            scheduler_name=scheduler_setting.name,
            location=scheduler_setting.location
        )
        url = f'https://{function_location}-{self.project_id}.cloudfunctions.net/{function_name}'
        print(f"http target uri: {url}")
        print(f"http target sa: {service_account}")
        token = scheduler_v1.OidcToken(
            service_account_email=service_account)

        string_message = json.dumps(message) 
        http_target = scheduler_v1.HttpTarget(
            uri=url,
            http_method=scheduler_v1.HttpMethod.POST,
            oidc_token=token,
            body=string_message.encode("utf-8")
        )
        print(f"create scheduler triggering function: : {scheduler_setting.name}")

        # create scheduler
        create_job = scheduler_v1.Job(
            name=scheduler_setting.name,
            description=scheduler_setting.description,
            schedule=scheduler_setting.schedule,
            http_target=http_target,
            time_zone=scheduler_setting.time_zone
        )
        self._create_scheduler(
            job=create_job,
            location=scheduler_setting.location)
        print(f"created scheduler: {scheduler_setting.name}")

        self._pause_scheduler_if_is_pause(
            scheduler_name=scheduler_setting.name,
            is_pause=scheduler_setting.is_pause
        )

планировщик создается путем вызова этого клиентского модуля Cloud Scheduler с сообщением, включающим параметры задания конвейера, как показано ниже:

import dataclasses


_CREATE_PIPELINE_JOB_FUNCTION_NAME = "pipeline-job-creation-function-name"
_CREATE_PIPELINE_JOB_LOCATION = "pipeline-job-creation-function-location"
_DEMPLOYMENT_SERVICE_ACCOUNT = "service-account-for-pipeline-deployment@{project_id}.iam.gserviceaccount.com"


@dataclasses.dataclass
class _PipelineJobSetting:
    display_name: str
    location: str
    pipeline_spec_uri: str
    pipeline_root: str
    enable_caching: bool


scheduler_client = SchedulerClient(
    project_id="project_id"
)
scheduler_setting = SchedulerSetting(
    name="scheduler_name",
    schedule="cron_schedule",
    location="scheduler_location",
    description="scheduler_description",
    time_zone="scheduler_time_zone",
    is_pause="if_scheduler_is_paused"
)
message = dataclasses.asdict(
    _PipelineJobSetting(
        display_name="pipeline_job_display_name",
        location="pipeline_job_location",
        pipeline_spec_uri="pipeline_spec_uri",
        pipeline_root="pipeline_root",
        enable_caching="pipeline_job_enable_caching"
    )
)
scheduler_client.create_function_trigger(
    scheduler_setting=scheduler_setting,
    function_location=_CREATE_PIPELINE_JOB_LOCATION,
    function_name=_CREATE_PIPELINE_JOB_FUNCTION_NAME,
    service_account=_DEMPLOYMENT_SERVICE_ACCOUNT.format(
        project_id="project_id"),
    message=message
)

Модулизация развертывания Vertex Pipelines

Представленные до сих пор процессы развертывания конвейера, за исключением функции создания заданий конвейера, развернутой в Cloud Functions, имеют модульную структуру для общего использования в проектах. Этот модуль обработки развертывания Vertex Pipelines выпускается в виде пакета Python в реестре артефактов, как показано ниже, и устанавливается и выполняется при развертывании каждого конвейера.

Этот модуль обработки развертывания выполняет следующие процессы:

  1. Добавьте информацию о конфигурации в файлы определения компонентов проекта.
  2. Загрузить общие файлы определения компонентов
  3. Компиляция конвейера и отправка в Google Cloud Storage (GCS)
  4. Вызовите функцию для создания задания на разовое выполнение или создайте планировщик для периодического выполнения
  5. Уведомление Slack о выполненных/неудачных заданиях

Кроме того, модуль содержит значения, которые должны быть общими для проектов, такие как часовой пояс планировщика и формат путей, по которым сохраняются различные файлы. Поэтому при разработке в каждом проекте могут быть переданы только те значения, которые необходимо задать для каждого проекта.

Развертывание конвейеров вершин

Развертывание конвейера выполняется путем подготовки образа Cloud Builder с установленными выше пакетами и выполнения файла определения конвейера.

Шнур для файла пайплайна просто следующий. Он включает в себя определение конвейера и процесс вызова модуля развертывания с информацией, необходимой для развертывания (настройки планировщика и задания конвейера) в качестве аргументов.

import os
from pathlib import Path
import kfp
from vertex_pipeline_deployer import (
    PipelineConfig,
    PipelineDeployer
)


# set as environment variable from cloudbuild.yaml
PROJECT_ID = os.environ['PROJECT_ID']
MODEL_VERSION = os.environ['MODEL_VERSION']
PROJECT_NAME = os.environ['PROJECT_NAME']

_IS_SCHEDULED = True  # if pipeline is scheduled or not

# get pipeline name from file name
PIPELINE_NAME = Path(__file__).name.split(".")[0]

# [fixed] set up pipeline specific configs
pipeline_conf = PipelineConfig(
    name=PIPELINE_NAME,
    project_id=PROJECT_ID,
    project_name=PROJECT_NAME,
    model_version=MODEL_VERSION,
    description="pipeline_description",
    enable_caching="pipeline_enable_caching"
)
if _IS_SCHEDULED:
    pipeline_conf.set_scheduler(
        schedule="scheduler_schedule",
        description="scheduler_description",
        is_pause="scheduler_is_pause"
    )


# define pipeline parameter
pipeline_parameter = {
    "param_1": "aaa",
    "param_2": "bbb"
}


# global variable for pipeline compile
PIPELINE_LOCATION = pipeline_conf.location
PIPELINE_ID = pipeline_conf.id


@kfp.dsl.pipeline(
    name=PIPELINE_ID, 
    description=pipeline_conf.description, 
    pipeline_root=pipeline_conf.root
    )
def pipeline(
    param_1: str,
    param_2: str
    ):
    
    # load components
    component_store = kfp.components.ComponentStore(
        local_search_paths=[
            "component/",  # local directory project specific component definition files locate
            "component/common/"  # local directory common component definition files locate
        ]
    )

    componentcomponent.yamlop = component_store.load_component('component_1')
    componenttask.pyop = component_store.load_component('component_2')
    
    componentcomponent.yamltask = componentcomponent.yamlop(
        param_1=param_1
    )
    componenttask.pytask = componenttask.pyop(
        param_2=param_2
    ).after(componentcomponent.yamltask)
    
    
# deploy pipeline and its parameter
pipeline_deployer = PipelineDeployer(
    pipeline_config=pipeline_conf
)
pipeline_deployer.deploy(
    pipeline_func=pipeline,
    pipeline_parameter=pipeline_parameter
)

Здесь PipelineDeployer выполняет процесс развертывания, а PipelineConfig представляет собой класс данных, содержащий информацию, необходимую для развертывания. Эти два интерфейса являются интерфейсами модуля обработки развертывания с точки зрения конвейера.

`PipelineConfig`, как упоминалось выше, содержит значения по умолчанию, правила именования и т. д. Например, правила именования для конвейера `display_name` лучше, чтобы они были одинаковыми между проектами, поэтому они генерируются внутри `PipelineConfig` из переданной версии модели, имя проекта и имя конвейера.

В реальном коде значения конфигурации задаются в файлах конфигурации для каждой среды и загружаются из файла конвейера, поэтому значения можно установить в соответствии со средой.

Этот файл выполняется в Cloud Build с использованием вышеупомянутого образа Cloud Builder следующим образом:

steps:
  - id: Deploy Pipeline
    name: "gcr_path_of_deployment_cloud_builder:latest"
    entrypoint: 'python'
    args: ['path_of_pipeline.py']

Таким образом, процессы в файле конвейера при развертывании следующие:

  1. Отправка образов компонентов в Google Container Registry (GCR)
  2. Вызов модуля обработки развертывания

Затем вызываемый модуль обработки развертывания выполняет следующую обработку:

  1. Добавьте информацию о конфигурации в файлы определения компонентов проекта.
  2. Загрузить общие файлы определения компонентов
  3. Компиляция конвейера и отправка в Google Cloud Storage (GCS)
  4. Вызовите функцию для создания задания на разовое выполнение или создайте планировщик для периодического выполнения
  5. Уведомление Slack о выполненных/неудачных заданиях

В случае периодического запуска созданный планировщик вызывает функцию создания конвейерного задания по заданному расписанию, и конвейерное задание создается.

Мониторинг выполнения заданий конвейера

Как упоминалось выше, функция создания задания конвейера и пакет развертывания конвейера отправят уведомление в следующих случаях:

  • Создание планировщика завершилось неудачно или завершено
  • Создание конвейерного задания завершилось неудачно или завершено.

В дополнение к этому подготовлена ​​функция Cloud Functions для отслеживания состояния выполнения конвейерных заданий. Эта функция отправляет уведомления в Slack в следующих случаях:

  • Когда работа конвейера завершена
  • Когда задание конвейера не выполнено
  • Когда задание конвейера продолжает выполняться дольше указанного времени

Я использовал следующую статью ZOZO Technologies в качестве ссылки для разработки этого мониторинга.



Репозиторий шаблонов

Как описано выше, конвейеры обучения модели и пакетного прогнозирования можно развернуть, вызвав модуль развертывания. А общие компоненты можно использовать для общих процессов в проектах. Здесь загрузка и вызов модулей обработки развертывания и использование общих компонентов должны быть прописаны в коде проекта при каждой разработке. По этой причине мы подготовили репозитории шаблонов, где разработчикам нужно только разработать обработку конвейера и компонентов и изменить настройки.

В Kubeflow есть функция Lightweight Python Components, которая позволяет определять компоненты как функции Python. Это очень полезная функция для простой пробной версии, но ее использование в рабочем коде дает слишком большую свободу описания кода. Чтобы предотвратить это, ограничивая использование этой функции, репозиторий шаблонов заставляет использовать только обычные компоненты.

Кроме того, определено следующее разделение, чтобы доменная логика не записывалась в конвейеры или компоненты (task.py):

  • Конкретные процессы, такие как загрузка данных, обучение модели и предварительная обработка, реализованы в виде модулей.
  • Компоненты (task.py) просто вызывают модули
  • Конвейеры описывают рабочий процесс этих компонентов.

Поэтому структура каталога, содержащего пайплайны, компоненты и модули, упрощается следующим образом:

.
├── module
├── component
│   ├── component_1
│   │   ├── task.py
│   │   └── component.yaml
│   └── component_2
│       ├── task.py
│       └── component.yaml
└── pipeline
    ├── train.py
    └── batch_predict.py

Разделение конвейеров обучения модели и пакетного прогнозирования

Как показано в приведенном выше каталоге, конвейер обучения модели и конвейер пакетного прогнозирования определены в отдельных файлах. Поэтому конвейерные задания также создаются отдельно для каждого.

После завершения обучения конвейер обучения модели сохраняет информацию об обученных моделях (например, страны, в которых модель обучалась, версию модели и время начала обучения) и флаг завершения. Конвейер пакетного прогнозирования ожидает создания флага завершения, а затем продолжает прогнозирование на основе информации о модели после завершения обучающего конвейера. Это соединяет конвейер обучения модели и конвейер пакетного прогнозирования.

Стандартизация API обслуживания результатов прогнозирования

До этого момента мы сосредоточились на конвейерах обучения моделей и пакетного прогнозирования и предприняли для них усилия по стандартизации. Наконец, мы хотели бы рассказать о наших усилиях по стандартизации API, обслуживающего результаты прогнозирования.

API, обслуживающий результаты прогнозирования, просто извлекает и возвращает результаты прогнозирования, хранящиеся в Firestore (режим хранилища данных), из конвейера обучения модели в ответ на запрос от приложения продукта, как показано ниже.

API обслуживания результатов прогнозирования стандартизирован путем упаковки процесса получения результатов прогнозирования из Firestore (режим хранилища данных).

Процесс получения результатов прогнозирования принимает одно или несколько ключевых значений результата прогнозирования, преобразует их в тип Firestore (режим хранилища данных) и возвращает преобразованное содержимое. Это скрывает спецификацию Firestore (режим хранилища данных) от вызывающей стороны. Пакет также содержит правила именования для типа и другие правила, установленные командой инженеров по машинному обучению.

Это позволяет нам просто писать интерфейс и, при необходимости, дополнительные процессы форматирования при разработке отдельных проектов.

Для API обслуживания результатов прогнозирования мы также создали репозиторий шаблонов, который содержит базовую конфигурацию API, обработку и описания интерфейсов. Пакет устанавливается во время развертывания.

Сводка

Мы разрабатываем новые конфигурации от проекта к проекту, когда они необходимы, а затем стандартизируем их, если в этом есть смысл.

В настоящее время нам удалось стандартизировать проекты машинного обучения, в которых используется пакетное прогнозирование, как описано выше, но мы еще не смогли сделать это, например, для проектов, требующих онлайн-прогнозирования. Мы надеемся представить новую систему в будущем, когда сможем ее доработать.

Другие статьи о MLOps AnyMind

Вы можете узнать больше о наших усилиях по MLOps в AnyMind в следующей статье: