RxJS — Operators and Usage

Rambabu Padimi
3 min read · Nov 6, 2020
Observables & Observer:
// Observable
const testObservable = new Observable(subscriber => {
subscriber.next("RED");
subscriber.next("GREEN");
subscriber.next("BLUE");
setTimeout(() => {
subscriber.next("WHITE");
subscriber.next("YELLOW");
subscriber.complete();
}, 1000);
});
// observer
const testObserver = {
next: function(value: any) {
console.log(value);
},
error : function(error: any) {
console.log(error);
},
complete: function() {
console.log('completed');
}
};
testObservable.subscribe(testObserver);
Subject
// subject
const listSubject = new Subject();
listSubject.subscribe((result) => {
console.log('subject result one: ',result);
});
listSubject.subscribe((result) => {
console.log('subject result two: ',result);
});
listSubject.next(['red', 'green', 'blue']);
Filter
// filter
const filterObservable = new Observable(sub => {
sub.next(23);
sub.next(34);
sub.next(36);
});
filterObservable.pipe(filter((item: any) => {
return (item % 2) === 0;
})).subscribe(result => {
console.log('filter result is---',result);
});
debounceTime, distinctUntilChanged
// debounceTime, distinctUntilChanged
const input = document.querySelector('input');
const searchObservable = fromEvent(input,'input');
searchObservable.pipe(
map((event: any) => event.target.value),
debounceTime(2000),
distinctUntilChanged()).subscribe(value => {
console.log('search result: ' + value);
});
reduce
// reduce
of(33, 44, 56, 67, 78).pipe(reduce((total, val) => {
return total + val;
})).subscribe(result => {
console.log('reduce result: ',result);
});
scan
// scan
of(33, 44, 56, 67, 78).pipe(scan((total, val,index) => {
return total + val;
})).subscribe(result => {
console.log('scan result ',result);
});
map
// map
const listObservable = new Observable(subscriber => {
subscriber.next(23);
subscriber.next(26);
subscriber.next(28);
subscriber.next(29);
});
listObservable.pipe(map((item) => {
return (+item) + 10;
})).subscribe(value => {
console.log('map values: ',value);
});
mergeMap
// mergeMap
const input1 = document.querySelector('#input1');
const input2 = document.querySelector('#input2');
let observable1 = fromEvent(input1, 'input');
let observable2 = fromEvent(input2, 'input');
observable1.pipe(
mergeMap((event1: any) => {
return observable2.pipe(
map((event2: any) => {
return event1.target.value + ' '+ event2.target.value;
}))
})).subscribe(result => {
console.log('merge map result: ',result)
});
switchMap
// switchMap
const switchMapButton = document.querySelector('#switch-map');
const switchOb1 = fromEvent(switchMapButton, 'click');
const switchOb2 = interval(1000);
switchOb1.pipe(
switchMap(event => {return switchOb2})
).subscribe(result => {
console.log('switch map result: ',result);
});
startWith, endWith
// startWith
const source = of('one','two','three');
const result = source.pipe(startWith('list is:'));
result.subscribe(value => {
console.log('startwith operator result: ',value);
});
// endWith
source.pipe(endWith('list end;')).subscribe(result => { console.log('endwith opeartor result: ',result)});
pairwise
// pairwise
const pairwiseObservable = new Observable(subscriber => {
subscriber.next(23);
subscriber.next(25);
subscriber.next(28);
subscriber.next(28);
subscriber.next(29);
});
pairwiseObservable
.pipe(pairwise())
.subscribe(result => {
console.log(result);
});
retryWhen
// retryWhen
interval(1000).pipe(map(value => {
if(value > 5) {
throw value;
}
return value;
}),
retryWhen(errors => {
return errors.pipe(
tap(value => console.log(`value ${value} is too high`)),
delayWhen(value => timer(value * 1000)))
})).subscribe(result => {
console.log('retry when result ',result);
});
zip
// zip
const source1 = of('one');
const source2 = of('two');
const source3 = of('three');
const source4 = of('four');
const zipResult = zip(source1,
source2.pipe(delay(1000)),
source3.pipe(delay(5000)),
source4.pipe(delay(3000)));
zipResult.subscribe(result => {
console.log('zip result ',result);
});
groupBy
// groupBy
const people = [{name : 'John', grade: 'A', age: 25},
{name : 'Krish', grade: 'B', age: 28},
{name : 'Mike', grade: 'B', age: 29},
{name : 'Jack', grade: 'C', age: 31},
{name : 'Tom', grade: 'D', age:43},
{name : 'Jim', grade: 'A', age: 29}];
from(people).pipe(groupBy(person => {
return person.grade
}),
mergeMap(group => {
return group.pipe(toArray());
})).subscribe(result => {
console.log('group by result: ',result);
});
skip, skipUntil, skipWhile
// skip
interval(1000).pipe(skip(5)).subscribe(result => {
console.log('skip result: ',result);
});
// skipUntil
interval(1000).pipe(skipUntil(timer(5000))).subscribe(result => {
console.log('skip until result: ',result);
});
// skipWhile
// Skips the interval's values while the predicate is true (0 through 4),
// then logs 5, 6, 7, ... once per second.
// The comparison must stay on the same line as the arrow/`return`: a line
// break directly after `return` triggers automatic semicolon insertion, which
// makes the predicate return undefined (falsy) so nothing is ever skipped.
interval(1000).pipe(skipWhile(value => value < 5)).subscribe(result => {
  console.log('skip while result: ',result);
});
takeWhile
// takeWhile
const takeWhileObservable =  of(3, 3, 3, 9, 1, 4, 5, 8, 96, 3, 66, 3, 3, 3);
takeWhileObservable.pipe(takeWhile(item => {
return item <= 5;
})).subscribe(result => {
console.log('take while result: ',result);
});
min, max
// min
of({name : 'John', grade: 'A', age: 25},
{name : 'Krish', grade: 'B', age: 28},
{name : 'Mike', grade: 'B', age: 29},
{name : 'Jack', grade: 'C', age: 31},
{name : 'Tom', grade: 'D', age:43},
{name : 'Jim', grade: 'A', age: 29})
.pipe(min<any>((a: any, b: any) =>
a.age < b.age ? -1 :1))
.subscribe(result => {
console.log('min result ', result.name);
});
// max
of({name : 'John', grade: 'A', age: 25},
{name : 'Krish', grade: 'B', age: 28},
{name : 'Mike', grade: 'B', age: 29},
{name : 'Jack', grade: 'C', age: 31},
{name : 'Tom', grade: 'D', age:43},
{name : 'Jim', grade: 'A', age: 29})
.pipe(max<any>((a: any, b: any) =>
a.age < b.age ? -1 : 1))
.subscribe(result => {
console.log('max result ', result.name);
});

Thanks for reading… :)

--

--