Вдохновение

Серверная часть платформы Thinkdata Works состоит из примерно дюжины микросервисов Golang. Эти сервисы используют protobufs поверх RPC для связи с нашим уровнем API, а также друг с другом. За последние несколько лет эти услуги стали разнообразными и непоследовательными по условностям, функциям и стилю.

В течение последних 12 месяцев мы ставили перед собой цель «модернизировать» эти сервисы — создать единый стиль, гарантирующий использование лучших практик по всем направлениям. Создание комплексного опыта разработки позволило улучшить наши шаблоны обработки ресурсов, навыки управления базами данных, видимость ошибок и методы тестирования. Это начинание позволило нам выявить ряд шаблонов, которые мы использовали для решения различных технических задач на нашей платформе. Мы извлекли их в собственные репозитории, чтобы сделать их бесплатными для сообщества Golang. Мы надеемся, что они могут быть полезны всем, кто сталкивается с аналогичными проблемами.

гопайплайн

просмотреть на github

Большая часть прошлой работы включала задачи, связанные с последовательной обработкой или перемещением ресурсов между местами хранения. Дальнейшие итерации показали, что этот шаблон хорошо масштабируется для одновременной работы с несколькими входными данными. Инвестиции в постоянное развитие этих процессов приносили дивиденды с точки зрения производительности.

Этот шаблон был извлечен как gopipeline — пакет с минимальными зависимостями и поддержкой дженериков go 1.18+. Это позволяет разработчику легко формировать конвейер обработки, не беспокоясь об управлении изменениями параллелизма. Этот пакет лучше всего использовать, когда выполняемые шаги выполнения могут быть следующими:

  • долго выполняющиеся задачи
  • работа с удаленными ресурсами или запрос на них
  • выполнение операций ввода-вывода

Этот пакет включает в себя останавливаемую и непрерывную обработку ошибок, отчеты о ходе выполнения и обработку групп ожидания, чтобы один или несколько конвейеров могли работать с другими асинхронными процессами.

Пример:

package main

import (
 "context"
 "time"
 "github.com/thinkdata-works/gopipeline/pkg/gopipeline"
)

type resource struct {
 id           string
 signature    string
 external_url string
}

func process(ctx context.Context, resources []resource) []error {
 errs := []error{}

 // Define the new pipeline with concurrency count and size
 pipeline := gopipeline.NewPipeline[*resource](5, 100)

 // Register our function that will feed values to the top of the pipeline
 pipeline.RegisterInputProvider(func(ctx context.Context, c chan *resource) {
  defer close(c)
  for _, r := range resources {
   c <- &r
  }
 })

 // Register our error handler
 pipeline.RegisterErrorHandler(func(err error) bool {
  errs = append(errs, err)
  return true // return false for non-halting
 })

 // Compose our steps
 pipeline.RegisterSteps(
  getNewExternalUrl, reSignResource, applyChanges, notifyDownstream,
 )

 pipeline.RegisterReporter(
  5 * time.Second, func(r Report) {
   fmt.Printf(
   "\n\n=====Begin stats=====\nTotal items finished: %d\nTotal items in pipeline: %d\nAverage items per second: %.6f\n=====End stats=====\n\n",
   r.TotalFinished, r.TotalInPipeline, r.ItemsPerSecond,
   )
  },
 )

 err := pipeline.Work(ctx)
 if err != nil {
  errs = append(errs, err)
 }

 return errs
}

func getNewExternalUrl(ctx context.Context, r *resource) (*resource, error) {
 // Dispatch external request
 url, err := external_services.GetNewUrl(r.id)
 if err != nil {
  return r, err
 }
 r.external_url = url
 return r, nil
}

func reSignResource(ctx context.Context, r *resource) (*resource, error) {
 // Dispatch request to create new signature
 signature, err := external_services.SignUrl(r.id, r.external_url)
 if err != nil {
  return r, err
 }
 r.signature = signature
 return r, nil
}

func applyChanges(ctx context.Context, r *resource) (*resource, error) {
 // Apply changes to some kind of storage
 err := storage.ApplyResourceChanges(r.id, r.signature, r.external_url)
 if err != nil {
  return r, err
 }
 return r, nil
}

func notifyDownstream(ctx context.Context, r *resource) (*resource, error) {
 external_services.NotifyListeners(r.id, r.signature, r.external_url)
 return r, nil
}

гокеш

просмотреть на github

Появившиеся в результате нашей работы с gopipeline, конвейеры часто повторно запрашивали одни и те же удаленные ресурсы на своих этапах. Мы остановились на шаблоне кэширования, который позволял одновременно выполнять шаги конвейера для постановки в очередь запросов к одному и тому же ресурсу. На основе этого мы создали gocacheкэш элементов на основе обещаний с минимальной зависимостью и поддержкой дженериков go 1.18+.

Пример:

package main

import (
 "io/ioutil"
 "net/http"
 "github.com/thinkdata-works/gopipeline/pkg/gocache"
)

type User struct {
 id uint64 `json:"id"`
 login string `json:"login"`
}

type UserService struct {
 userCache *gocache.Cache[string, *User]
}

func NewUserService() *UserService {
 return &UserService{
  userCache: gocache.NewCache[string, *User]
 }
}

func (u *UserService) Get(id string) (*User, error) {
 user, err := u.userCache.Get(id, func() (*User, error) {
  r, err := http.Get(fmt.Sprintf("http://users:80/%s", id))
  if err != nil {
   return err
  }

  var u User
  err := json.NewDecoder(r.Body).Decode(&u)
  if err != nil {
   return nil, err
  }
  return &u, nil
 })
 if err != nil {
  return nil, err
 }
 return user, nil
}

Особая благодарность Мэтту Поллаку и Кевину Бирку за их вклад в разработку и авторство этих проектов.

Оба этих репозитория проверены в боевых условиях и готовы к производству. Пожалуйста, посетите репозитории для получения полной документации, реализации и примеров. Мы с радостью приветствуем отзывы и сотрудничество в этой работе.