Действие продолжается….

В предыдущей статье мы рассмотрели основы RxJS, исследуя ключевые понятия, такие как Observables и Subjects. Теперь, во второй части нашей серии, мы поднимем наше понимание RxJS на новый уровень.

Мы углубимся в более сложные темы, такие как операторы, планировщики и методы тестирования. Эти концепции позволят вам справляться со сложными преобразованиями данных, управлять параллелизмом и обеспечивать надежность ваших приложений на основе RxJS. Оставайтесь с нами в части 3, где мы рассмотрим еще более интересные аспекты RxJS.



Давайте продолжим наш путь к освоению реактивного программирования с помощью RxJS и Angular!

Операторы

Операторы — неотъемлемая часть RxJS, предоставляющая мощные инструменты для манипулирования и преобразования потоков данных, испускаемых Observables. Они позволяют выполнять широкий спектр операций с выдаваемыми значениями, например преобразование, фильтрацию, комбинирование, обработку ошибок и служебные задачи. Категоризация операторов может помочь нам лучше понять их назначение и выбрать подходящие для наших конкретных нужд.

Операторы преобразования

Трансформационные операторы позволяют изменять или преобразовывать испускаемые значения из Observable. Мы обсудим некоторые из них

  • map:оператор map используется для преобразования каждого испускаемого значения из исходного Observable путем применения к нему функции сопоставления. Он возвращает новый Observable с преобразованными значениями. Образец:
constructor() {
    const source = from([1, 2, 3, 4, 5]);

    source
      .pipe(map((value: number) => value * 10))
      .subscribe((value: number) => {
        this.transformedNumbers.push(value);
      });
  }

  • pluck:оператор pluck используется для извлечения определенного значения свойства из каждого испускаемого объекта в исходном Observable. Он возвращает новый Observable только с извлеченными значениями свойств.
constructor() {
    const source = from([{ v: 1 }, { v: 2 }, { v: 3 }]);

    source
      .pipe(pluck('v'))
      .subscribe((num: number) => {
        this.extractedNames.push(num);
      });
  }

  • сканирование:оператор сканирования используется для применения функции накопления к каждому выдаваемому значению и накопления промежуточных результатов. Он возвращает новый Observable, который выдает накопленные значения.
constructor() {
    const source = from([1, 2, 3, 4, 5]);

    source
      .pipe(scan((accumulator: number, value: number) => accumulator + value, 0))
      .subscribe((sum: number) => {
        this.accumulatedSums.push(sum);
      });
  }

Операторы фильтрации

  • фильтр: этот операторпозволяет выбирать определенные значения из наблюдаемого на основе условия. Он принимает функцию предиката, которая оценивает каждое испускаемое значение из наблюдаемого источника. Если функция предиката возвращает true для значения, это значение включается в результирующий наблюдаемый объект. Если он возвращает false, значение исключается или отфильтровывается.
export class EvenNumbersComponent {
  numbers$: Observable<number>;
  evenNumbers$: Observable<number>;

  constructor() {
    this.numbers$ = new Observable<number>(observer => {
      let count = 1;
      setInterval(() => {
        observer.next(count++);
      }, 1000);
    });

    this.evenNumbers$ = this.numbers$.pipe(
      filter(number => number % 2 === 0)
    );
  }
}

  • take: этот оператор позволяет нам получить определенное количество значений из наблюдаемого, а затем автоматически завершить наблюдаемое. Это позволяет нам контролировать, сколько значений мы хотим получить до того, как наблюдаемое закончится.
constructor() {
    this.numbers$ = new Observable<number>(observer => {
      let count = 1;
      setInterval(() => {
        observer.next(count++);
      }, 1000);
    });

    this.firstThree$ = this.numbers$.pipe(
      take(2)
    );
  }

  • skip:этотоператор позволяет игнорировать указанное количество значений, выдаваемых наблюдаемым объектом, и начинать выдавать значения после этого. Он эффективно пропускает начальное указанное количество значений и выдает оставшиеся значения.
export class ExampleComponent implements OnInit {
  skippedValues: number[];

  ngOnInit() {
    // Creating an observable that emits numbers from 1 to 5
    const source$: Observable<number> = Observable.of(1, 2, 3, 4, 5);

    // Applying the skip operator to skip the first two values
    const skipped$: Observable<number> = source$.pipe(skip(2));

    // Subscribing to the resulting observable
    skipped$.subscribe((value: number) => {
      // Storing the emitted values in an array
      this.skippedValues.push(value);
    });
  }
}

Комбинированные операторы

  • объединение:онобъединяет несколько наблюдаемых объектов в один наблюдаемый объект и объединяет испускаемые значения из всех наблюдаемых объектов, независимо от порядка, в единый поток значений. Он позволяет обрабатывать несколько потоков одновременно. Используя оператор слияния, вы можете объединять независимые источники данных и получать их значения в едином потоке. Это похоже на объединение различных потоков данных в один поток, обеспечивающий параллельную обработку и эффективную работу с несколькими источниками данных.
import { merge, of } from 'rxjs';

const source1$ = of('a', 'b', 'c');
const source2$ = of('d', 'e', 'f');

const merged$ = merge(source1$, source2$);
merged$.subscribe(value => console.log(value));

  • concat:оператор concat объединяет несколько наблюдаемых объектов в один наблюдаемый путем последовательного объединения их выбросов. Он подписывается на каждую наблюдаемую одну за другой и подписывается на следующую наблюдаемую только после завершения предыдущей. Он сохраняет порядок выбросов.
import { concat, of } from 'rxjs';

const source1$ = of('a','b');
const source2$ = of('x','y');

const concatenated$ = concat(source1$, source2$);
concatenated$.subscribe(value => console.log(value));

  • switchMap: этот оператор используется для выполнения динамического внутреннего наблюдаемого переключения для каждого испускаемого значения из исходного наблюдаемого. Это особенно полезно при работе со сценариями, когда вам нужно переключиться на новую внутреннюю наблюдаемую на основе последнего значения, выданного исходной наблюдаемой.

SwitchMap работает в следующие этапы:

  1. Для каждого значения, испускаемого наблюдаемым источником, применяется проекционная функция.
  2. Функция проекции генерирует внутреннюю наблюдаемую на основе испускаемого значения.
  3. Когда новое значение выдается исходным наблюдаемым, switchMap отписывается от ранее подписанного внутреннего наблюдаемого.
  4. Затем он подписывается на новую внутреннюю наблюдаемую, эффективно переключаясь на последнюю внутреннюю наблюдаемую.
  5. Испускаемые значения из новой внутренней наблюдаемой распространяются на выходную наблюдаемую, заменяя любые ранее испускаемые значения из предыдущей внутренней наблюдаемой.
import { of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const letters$ = of('A', 'B', 'C');
const numbers$ = of(1, 2, 3);

letters$.pipe(
  switchMap((letter) => numbers$.pipe(
    switchMap((number) => of(`${letter}${number}`))
  ))
).subscribe((result) => {
  console.log(result);
});

Оператор switchMap особенно полезен в ситуациях, связанных с асинхронными операциями, которые необходимо выполнять для каждого передаваемого значения. Это позволяет вам получать только самый последний результат, игнорируя любые предыдущие результаты.

Чтобы проиллюстрировать его практичность, давайте возьмем пример функции поиска в приложении.

Всякий раз, когда пользователь вводит поисковый запрос, выполняется HTTP-запрос для получения соответствующих результатов поиска. Используя switchMap, мы можем эффективно обрабатывать сценарии, в которых пользователь изменяет свой поисковый запрос до завершения предыдущего запроса. В таких случаях switchMap отменяет текущий HTTP-запрос и переключается на новый запрос, гарантируя, что пользователю будут представлены только самые последние результаты поиска. Этот подход улучшает взаимодействие с пользователем, отбрасывая все устаревшие или неполные результаты поиска.

Операторы обработки ошибок

Охватывает только пару операторов обработки ошибок.

  • catchError: этот оператор используется для перехвата ошибок, возникающих в наблюдаемом потоке, и корректной их обработки. Это позволяет вам предоставить альтернативное наблюдаемое значение или значение по умолчанию, которое будет испускаться вместо ошибки.
source$.pipe(
  catchError((error: any) => {
    console.log('An error occurred:', error);
    // Return an alternative observable or a default value
    return of('Fallback Value');
  })
);
  • retry: используется для автоматической повторной подписки на наблюдаемый источник при возникновении ошибки, что позволяет нам повторить операцию заданное количество раз. Он принимает необязательный параметр numberOfRetries, указывающий максимальное количество повторных попыток выполнения операции. В случае возникновения ошибки исходная наблюдаемая будет повторно подписана, и операция будет повторяться до тех пор, пока не будет достигнуто максимальное количество повторных попыток.
import { retry } from 'rxjs/operators';

source$.pipe(
  retry(3)
);

Коммунальные операторы

Есть также некоторые вспомогательные операторы, такие как:

  • касание: это дает возможность выполнять побочные эффекты или действия над каждым испускаемым значением наблюдаемого потока без изменения самих значений. Он часто используется для таких целей, как отладка, ведение журнала или выполнение других немодифицирующих операций. Предоставляя функцию обратного вызова, оператор tap получает каждое испускаемое значение из наблюдаемого источника. В рамках этого обратного вызова мы можем выполнять любые желаемые побочные эффекты или действия без внесения изменений в само значение.
source$.pipe(
  tap((value: any) => {
    console.log('Received value:', value);
    // Perform other side effects or actions
  })
);
  • delay.Оператор delay используется для введения задержки в выпуске значений из наблюдаемого потока. Это особенно полезно для имитации задержек или управления синхронизацией выбросов. Предоставляя параметр delayDuration, указывающий продолжительность задержки в миллисекундах, этот оператор откладывает выпуск значений из наблюдаемого источника после подписки. Это позволяет точно контролировать время выбросов в наблюдаемом потоке.
import { delay } from 'rxjs/operators';

source$.pipe(
  delay(2000)
);
  • таймер.Оператор таймер используется для создания наблюдаемого объекта, который выдает значения после указанной задержки. Он обычно используется для создания таймеров или введения задержек в наблюдаемые потоки. Оператор timer принимает два параметра: delayDuration и intervalDuration . Параметр delayDuration определяет начальную задержку перед отправкой первого значения, а параметр intervalDuration устанавливает временной интервал между последующими отправками значений. Настраивая эти параметры, вы можете контролировать время и частоту эмиссии значений в созданной наблюдаемой.
import { timer } from 'rxjs';

timer(2000, 1000);

Это были лишь некоторые из операторов, которые обычно используются. Операторы RxJS предоставляют широкий спектр функций для управления данными и управления ими в реактивном программировании. Освоение этих операторов позволит вам писать эффективный и отзывчивый код в Angular и других средах JavaScript. Исследуйте, экспериментируйте и раскрывайте потенциал операторов RxJS для создания надежных приложений.

Планировщики

Введение в планировщики

Планировщики в RxJS играют жизненно важную роль в управлении синхронизацией и выполнением наблюдаемых потоков. Они предлагают гибкость для определения того, когда и как должны выполняться операции с наблюдаемыми. Используя планировщики, мы получаем контроль над параллелизмом, вводим задержки, определяем контексты выполнения и многое другое.

Планировщики позволяют нам определять, как и когда планируются наблюдаемые выбросы. Мы можем выбрать немедленное выполнение, использовать таймеры для асинхронного выполнения или даже ориентироваться на определенные контексты, такие как веб-воркеры или кадры анимации. Эта возможность оказывается бесценной для управления параллелизмом, обработки асинхронных операций и точной настройки времени наблюдаемых выбросов.

Использование планировщиков для параллелизма

  • queueScheduler в RxJS гарантирует, что отправленные значения обрабатываются в том порядке, в котором они были получены, в соответствии с подходом «первым поступил — первым обслужен» (FIFO). Это гарантирует, что каждое значение выдается только после того, как предыдущее будет полностью обработано. Этот планировщик полезен, когда нам нужно обрабатывать значения последовательно, без параллельного выполнения или параллельных операций.
import { queueScheduler, from } from 'rxjs';

const observable = from([1, 2, 3, 4], queueScheduler);

observable.subscribe(value => {
  console.log(value);
});

Мы также можем комбинировать queueScheduler с другими операторами и планировщиками для достижения более сложного поведения. Например, мы можем ввести задержки между отправками с помощью оператора delay и queueScheduler, гарантируя, что каждая передача будет обработана до того, как следующая будет поставлена ​​в очередь.

  • asyncScheduler обычно используется для планирования асинхронных операций в приложениях Angular. Внутри используется setTimeOut, чтобы ввести задержку перед выполнением указанной задачи. Например, вы можете использовать asyncScheduler, чтобы отложить выполнение наблюдаемой эмиссии на определенное время.
import { asyncScheduler } from 'rxjs';

const delayedObservable = of('Hello').pipe(delay(1000, asyncScheduler));

// Subscribing to the delayed observable
delayedObservable.subscribe(value => {
  console.log(value); // Output: Hello (after a delay of 1 second)
});
  • animationFrameScheduler предназначен для задач, требующих синхронизации с кадром анимации браузера. Он планирует выполнение задач непосредственно перед следующим кадром анимации, обеспечивая плавную анимацию и сводя к минимуму ненужные вычисления. Например, мы можем использовать animationFrameScheduler для обновления пользовательского интерфейса на основе кадров анимации, создавая визуально приятный и отзывчивый пользовательский интерфейс.
import { animationFrameScheduler, interval } from 'rxjs';

interval(0, animationFrameScheduler).subscribe(() => {
  // Perform UI updates or animation calculations here
});

Давайте возьмем более крупный пример для приведенного выше,

Здесь мы увидим использование animationFrameScheduler для создания эффекта плавной прокрутки:

import { animationFrameScheduler, fromEvent, Observable } from 'rxjs';
import { map, observeOn } from 'rxjs/operators';

// Get the scroll position from the scroll event
const scrollPosition$: Observable<number> = fromEvent(window, 'scroll').pipe(
  map(() => window.pageYOffset),
  observeOn(animationFrameScheduler)
);

// Smooth scroll to the top when the user clicks a button
const scrollToTopButton = document.getElementById('scroll-to-top');

fromEvent(scrollToTopButton, 'click').subscribe(() => {
  scrollPosition$.subscribe((position) => {
    if (position > 0) {
      window.scrollTo({ top: position - 10, behavior: 'smooth' });
    }
  });
});

Наблюдаемый объект scrollPosition$ фиксирует позицию прокрутки окна при каждом событии прокрутки. animationFrameScheduler используется для обеспечения обновления положения прокрутки и эффекта плавной прокрутки непосредственно перед каждым кадром анимации, обеспечивая визуально плавную прокрутку. Когда пользователь нажимает кнопку «прокрутить вверх», подписывается наблюдаемый объект scrollPosition$, и окно плавно прокручивается вверх с шагом 10 пикселей, пока позиция прокрутки не достигнет 0.

Пользовательские планировщики

RxJS также позволяет нам создавать собственные планировщики для удовлетворения конкретных требований. Это может быть полезно, когда нам нужно запланировать задачи в пользовательском контексте или определить собственную логику планирования.

import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

// Custom scheduler that delays emissions by a specific duration
const customScheduler = (delay: number) =>
  (source$: Observable<any>) =>
    new Observable<any>(subscriber => {
      const delayHandler = () => {
        subscriber.next();
        subscriber.complete();
      };

      asyncScheduler.schedule(delayHandler, delay);
    });

// Emit a value after a delay using the custom scheduler
const source$ = new Observable(subscriber => {
  subscriber.next('Hello');
});

source$.pipe(
  observeOn(customScheduler(2000))
).subscribe(value => {
  console.log(value); // Output: Hello (after a delay of 2 seconds)
});

Вторая часть нашей серии статей по RxJS посвящена операторам, планировщикам и методам тестирования. Мы изучили различные категории операторов и узнали об управлении выполнением с помощью планировщиков.

Оставайтесь с нами в части 3, где мы углубимся в тестирование, расширенные темы, такие как наблюдаемые объекты высокого порядка, некоторые пользовательские операторы и специфичные для Angular операции RxJS.

Если вам понравилась эта статья, пожалуйста, хлопайте в ладоши, делитесь ею с друзьями и подписывайтесь на меня, чтобы получать больше подобного контента.

Удачного обучения