Вдохновение
Серверная часть платформы Thinkdata Works состоит из примерно дюжины микросервисов Golang. Эти сервисы используют protobufs поверх RPC для связи с нашим уровнем API, а также друг с другом. За последние несколько лет эти услуги стали разнообразными и непоследовательными по условностям, функциям и стилю.
В течение последних 12 месяцев мы ставили перед собой цель «модернизировать» эти сервисы — создать единый стиль, гарантирующий использование лучших практик по всем направлениям. Создание комплексного опыта разработки позволило улучшить наши шаблоны обработки ресурсов, навыки управления базами данных, видимость ошибок и методы тестирования. Это начинание позволило нам выявить ряд шаблонов, которые мы использовали для решения различных технических задач на нашей платформе. Мы извлекли их в собственные репозитории, чтобы сделать их бесплатными для сообщества Golang. Мы надеемся, что они могут быть полезны всем, кто сталкивается с аналогичными проблемами.
гопайплайн
просмотреть на github
Большая часть прошлой работы включала задачи, связанные с последовательной обработкой или перемещением ресурсов между местами хранения. Дальнейшие итерации показали, что этот шаблон хорошо масштабируется для одновременной работы с несколькими входными данными. Инвестиции в постоянное развитие этих процессов приносили дивиденды с точки зрения производительности.
Этот шаблон был извлечен как gopipeline — пакет с минимальными зависимостями и поддержкой дженериков go 1.18+. Это позволяет разработчику легко формировать конвейер обработки, не беспокоясь об управлении изменениями параллелизма. Этот пакет лучше всего использовать, когда выполняемые шаги выполнения могут быть следующими:
- долго выполняющиеся задачи
- работа с удаленными ресурсами или запрос на них
- выполнение операций ввода-вывода
Этот пакет включает в себя останавливаемую и непрерывную обработку ошибок, отчеты о ходе выполнения и обработку групп ожидания, чтобы один или несколько конвейеров могли работать с другими асинхронными процессами.
Пример:
package main import ( "context" "time" "github.com/thinkdata-works/gopipeline/pkg/gopipeline" ) type resource struct { id string signature string external_url string } func process(ctx context.Context, resources []resource) []error { errs := []error{} // Define the new pipeline with concurrency count and size pipeline := gopipeline.NewPipeline[*resource](5, 100) // Register our function that will feed values to the top of the pipeline pipeline.RegisterInputProvider(func(ctx context.Context, c chan *resource) { defer close(c) for _, r := range resources { c <- &r } }) // Register our error handler pipeline.RegisterErrorHandler(func(err error) bool { errs = append(errs, err) return true // return false for non-halting }) // Compose our steps pipeline.RegisterSteps( getNewExternalUrl, reSignResource, applyChanges, notifyDownstream, ) pipeline.RegisterReporter( 5 * time.Second, func(r Report) { fmt.Printf( "\n\n=====Begin stats=====\nTotal items finished: %d\nTotal items in pipeline: %d\nAverage items per second: %.6f\n=====End stats=====\n\n", r.TotalFinished, r.TotalInPipeline, r.ItemsPerSecond, ) }, ) err := pipeline.Work(ctx) if err != nil { errs = append(errs, err) } return errs } func getNewExternalUrl(ctx context.Context, r *resource) (*resource, error) { // Dispatch external request url, err := external_services.GetNewUrl(r.id) if err != nil { return r, err } r.external_url = url return r, nil } func reSignResource(ctx context.Context, r *resource) (*resource, error) { // Dispatch request to create new signature signature, err := external_services.SignUrl(r.id, r.external_url) if err != nil { return r, err } r.signature = signature return r, nil } func applyChanges(ctx context.Context, r *resource) (*resource, error) { // Apply changes to some kind of storage err := storage.ApplyResourceChanges(r.id, r.signature, r.external_url) if err != nil { return r, err } return r, nil } func notifyDownstream(ctx context.Context, r *resource) (*resource, error) { external_services.NotifyListeners(r.id, r.signature, r.external_url) return r, nil }
гокеш
просмотреть на github
Появившиеся в результате нашей работы с gopipeline, конвейеры часто повторно запрашивали одни и те же удаленные ресурсы на своих этапах. Мы остановились на шаблоне кэширования, который позволял одновременно выполнять шаги конвейера для постановки в очередь запросов к одному и тому же ресурсу. На основе этого мы создали gocacheкэш элементов на основе обещаний с минимальной зависимостью и поддержкой дженериков go 1.18+.
Пример:
package main import ( "io/ioutil" "net/http" "github.com/thinkdata-works/gopipeline/pkg/gocache" ) type User struct { id uint64 `json:"id"` login string `json:"login"` } type UserService struct { userCache *gocache.Cache[string, *User] } func NewUserService() *UserService { return &UserService{ userCache: gocache.NewCache[string, *User] } } func (u *UserService) Get(id string) (*User, error) { user, err := u.userCache.Get(id, func() (*User, error) { r, err := http.Get(fmt.Sprintf("http://users:80/%s", id)) if err != nil { return err } var u User err := json.NewDecoder(r.Body).Decode(&u) if err != nil { return nil, err } return &u, nil }) if err != nil { return nil, err } return user, nil }
Особая благодарность Мэтту Поллаку и Кевину Бирку за их вклад в разработку и авторство этих проектов.
Оба этих репозитория проверены в боевых условиях и готовы к производству. Пожалуйста, посетите репозитории для получения полной документации, реализации и примеров. Мы с радостью приветствуем отзывы и сотрудничество в этой работе.