В недавно опубликованной статье Реактивные потоки и многопоточность. Если вы разработчик JavaScript, вы знаете, что JavaScript является однопоточным, и можете подумать, что это не имеет отношения к вам. Не так быстро! Принципы, обсуждаемые в этой статье, также применимы к RxJS, и вы даже можете добиться параллелизма в JavaScript. Итак, позвольте мне поделиться некоторыми экспертными знаниями.

Примечание: в примерах используется RxJS ≥ 6

Планировщики

В вышеупомянутой статье я показал, что вы можете позволить потокам работать в разных потоках, используя операторы publishOn и subscribeOn. Эти операторы также существуют в RxJS (publishOn называется ObservationOn). Как они работают, если вы не можете использовать их для выбора темы? Они влияют на то, когда потоки выполняются. Они делают это с помощью планировщиков.

Проще говоря, планировщик контролирует выполнение единицы работы. В RxJx встроено пять планировщиков, три из которых относятся к асинхронному выполнению:

  • Как можно скорее: планирует выполнение в очереди для микрозадач
  • Асинхронный: планирует выполнение в очереди для макрозадач.
  • Кадр анимации: планирование выполнения следующего кадра анимации.

Если вы хотите узнать больше о микрозадачах, макрозадачах и кадрах анимации, я рекомендую видео YouTube «Дальнейшие приключения цикла событий».

А пока позвольте мне сказать, что использование одного из этих планировщиков делает поток асинхронным. Чтобы использовать их в текущей версии RxJS, вы можете импортировать их следующим образом:

import {animationFrameScheduler, asapScheduler, asyncScheduler} from 'rxjs';

В старых версиях вроде этого:

import { async } from 'rxjs/scheduler/async';
import { asap } from 'rxjs/scheduler/asap';
import { animationFrame } from 'rxjs/scheduler/animationFrame';

Помимо операторов, упомянутых ранее, вы также можете передать планировщик в другие функции, например of:

of(1,2, asyncScheduler)
  .subscribe(console.log);
console.log("Before or after?")
//Prints
//Before or after?
//1
//2

Без планировщика "До или после?" напечатали бы наконец.

Разветвляйся и присоединяйся

Распространенной задачей является получение данных с сервера, и часто нам требуется более одного запроса. Прежде чем мы получим ответ, требуется некоторое время, и, чтобы быть быстрее, рекомендуется иметь независимые запросы параллельно.

Хотя отправка запроса и обработка ответа не могут выполняться параллельно из-за ограничения одного потока в JavaScript (или может?), Нам не нужно ждать одного ответа, прежде чем мы сможем отправить следующий запрос.

Но что, если нам нужны ответы на все запросы, прежде чем мы сможем продолжить? Дамы и господа, я представляю forkJoin:

import { forkJoin, from } from 'rxjs';
forkJoin(
  from(fetch(someUrl)),
  from(fetch(someOtherUrl)),
)
.subscribe(console.log);
//Prints
//[Response, Response]

Функция fetch делает запрос (AJAX). Функция from создает наблюдаемый объект, который генерирует ответ. forkJoin создает наблюдаемый объект, который ожидает получения обоих ответов и генерирует массив, содержащий оба ответа.

Если вы знакомы с обещаниями, это похоже на Promise.all ():

Promise.all(
  fetch(someUrl),
  fetch(someOtherUrl)
)
.then(console.log)

Теперь мы включили в наш поток некоторую степень параллелизма, и я думаю, что мы готовы к реальному параллельному выполнению.

Веб-воркеры

Были введены веб-воркеры, позволяющие выполнять код JavaScript в фоновом режиме в другом потоке. Наконец-то многопоточность!

Однако вы не можете просто выполнить какой-либо код с помощью веб-воркеров. Чтобы избежать опасностей многопоточности и сохранить однопоточность, веб-воркеры JavaScript отделены от другого кода и могут взаимодействовать только с сообщениями.

Для тех, кто никогда не использовал воркеры, вот как выглядит рабочий код:

onmessage = function(e) {
  //do something
  const result = ...
  postMessage(result);
}

И основной сценарий:

const myWorker = new Worker(‘worker.js’);
myWorker.onmessage = result => {
  //do something with the result
}
myWorker.postMessage("foo");

Основной сценарий отправляет сообщение «foo», рабочий что-то делает и возвращает результат основному сценарию с помощью postMessage, который основной сценарий получает через обработчик событий onmessage.

Как мы можем использовать это с наблюдаемыми? Самый простой способ - использовать flatMap / mergeMap.

function processWithWorker(value) {
  return new Observable(subscriber => {
    const myWorker = new Worker('worker.js');
    myWorker.onmessage = result => {
      subscriber.next(result);
      subscriber.complete();
      myWorker.terminate();
    }
    myWorker.postMessage(value);
  });
}
of(1,2)
.pipe(
  flatMap(processWithWorker)
)
subscribe(console.log);
//prints whatever the workers return

Для простоты мы создаем воркер для каждого значения, отправляемого источником.

Значение испускается, затем создается рабочий. Мы отправляем ему сообщение и возвращаем новый наблюдаемый. Этот наблюдаемый излучает значение, как только рабочий возвращает его. Обратите внимание, что мы испускаем также сигнал завершения и завершаем рабочий процесс.

Рабочий код выполняется параллельно с основным сценарием, но из-за природы потоков значения по-прежнему обрабатываются одно за другим. Что, если мы хотим, чтобы они обрабатывались параллельно? Что ж, это невозможно без затемнения смысла кода. Но если у вас есть только несколько фиксированных значений, вы можете использовать forkJoin:

forkJoin(
 processWithWorker(1),
 processWithWorker(2),
}
.subscribe(console.log);

Оба значения будут обрабатываться параллельно и независимо от основного скрипта.

Заключение

Хотя JavaScript является однопоточным, RxJS следует тем же принципам, что и другие библиотеки для реактивных потоков. Мы можем создавать асинхронные потоки, иметь некоторую степень параллелизма, а веб-воркеры даже допускают параллелизм.