Promise ReactiveX

ReactiveX

필자는 nodejs 서버를 개발할때 주로 Promise 패턴을 사용한다.
비동기 패턴중 상당히 유명한 async.js 도 널리사용하는듯하나, bluebird Promise 에서 async.js에서 지원하는 모든 함수들을 커버가가능하고, 가장 마음에드는 부분은 chain 형식으로 문법을 제공하여 Promise 패턴이 async 보다 코드 가독성이 좋다고 판단되서다.
bluebird official api
다른방식의 비동기 패턴을 써보고싶은마음에 우연히 발견한 ReacticeX 에대해서 소개해보고자 한다.


Promise와 ReactiveX

우선 ReactiveX 를 줄여서 rx 또는 observer 패턴이라고 표현을 많이들한다.
rx 는 Microsoft에서 개발되어 angular 2.0 에서 기본으로 탑재 되기도하였고 현재 많은 개발언어에서 third party library 로 반응형 프로그래밍(observable)을 사용할수있도록 자리를 잡은것 같다.
Promise든, ReactiveX 를 학습하게되면 필자가 생각하는 이점은 다음과 같다.

  • 안드로이드 진영에서 Promise 또는 ReactiveX(RxJava)로 개발할수있다.
  • IOS 진영에서 Promise 또는 ReactiveX(RxSwift)로 개발할수있다.
  • javascript 에서 promise.js 또는 rx.js 를 지원하므로 back-end(nodejs) 나front-end 에서 개발할수있다.

결론은 Promise 패턴이나 ReactiveX 를통한 반응형 프로그래밍을 습득하게되면, 모든 플랫폼에 패턴을 통일시킬수 있다.
nodejs 로 개발한 서버개발자가 ios 코드를 보게되더라도 패턴의 통일화로 어느정도 로직은 알아볼수있단 뜻이다. (물론 개발언어는 직접습득해야겠죠)
한가지 플랫폼에 종속받지않고 다양한 플랫폼을 넘나들면서 개발하는 개발자들은 각자가 추구하는 패턴을 사용하여서 통일시키면 알아보기도 쉽고 개발생산성도 좋아지지않을까 ?
본론으로 돌아가 필자는 ReactiveX 반응형 프로그래밍을 사용하기위해 Promise로 작성된 nodejs 서버의 일부 코드를 ReactiveX 코드로 수정하여 사용한 느낌을 적어보도록하겠다.


ReacticeX 설치와 사용법

일단 nodejs에서는 npm을통해 한줄의 명령어로 아주 쉽게 설치가능하다.
필자는 ReactiveX 를 rx 라고 지칭하도록하겠다.


1
$ npm install rx --save



필자는 nodejs 에서 query를 좀더 쉽게 수행할수있도록 (transaction 등) queryHelper라는 모듈을 만들어 사용중이다.
이 Promise로 작성된 queryHelper 모듈을 rx 로 변경해보았다.
먼저 기존에 개발된 queryHelper 모듈은 다음과 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
function getConnection(){
return new Promise((resolve, reject) => {
mysql.createPool(conf.db).getConnection((err, connection) => {
if(err){reject(err);return false;}
resolve(connection);
});
});
};
function doQuery(connection, resource){
return new Promise((resolve, reject) => {
connection.query(
(typeof(resource.query) === 'function') ? resource.query(resource.data) : resource.query
, (err, data) =>{
if(err){reject(err);return false;}
if((resource.expect||'many') === 'single'){
if(Array.isArray(data)) resolve(data[0])
else resolve(data)
}else{
if(data.length === 0) resolve(undefined)
else resolve(data)
}
});
})
}
function execute(resource){
return new Promise((resolve, reject) => {
getConnection().then((connection) => {
return doQuery(connection, resource).then((data) => {
connection.release();
resolve(data);
}).catch((err) => {
connection.release();
reject(err)
});
}).catch((err) => {
reject(err);
});
});
}
function transaction(resources){
return new Promise((resolve, reject) => {
getConnection().then((connection) => {
connection.beginTransaction(err => {if(err){reject(err);return false;}});
return Promise.mapSeries(resources, (resource ,index ,length) => {
return doQuery(connection,resource);
}).then((data) =>{
connection.commit((err)=>{
connection.release();
if(err){reject(err);return false;}
resolve(data);
})
}).catch((err) => {
connection.rollback(() =>{
connection.release();
reject(err);
})
})
}).catch((err) => {
reject(err);
});
});
}



다음은 ReactiveX로 코드를 변경해보았다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
function doQuery(connection, resource){
return new Promise((resolve, reject) => {
connection.query(
(typeof(resource.query) === 'function') ? resource.query(resource.data) : resource.query
, (err, data) =>{
if(err){reject(err);return false;}
if((resource.expect||'many') === 'single'){
if(Array.isArray(data)) resolve(data[0])
else resolve(data)
}else{
if(data.length === 0) resolve(undefined)
else resolve(data)
}
});
})
}
function getConnection(){
return Rx.Observable.create(observer => {
mysql.createPool(mysqlConfig).getConnection((err, connection) => {
if(err){observer.onError(err); return false;};
observer.onNext(connection);
});
});
};
function execute(resource){
return Rx.Observable.create(observer => {
getConnection().subscribe(
connection => {
return Rx.Observable.fromPromise(doQuery(connection, resource)).subscribe(
data => { observer.onNext(data)},
err => {observer.onError(err); }
)
},
err => {
observer.onError(err);
}
,
() => {}
)
});
}
function transaction(resources){
return Rx.Observable.create(observer => {
getConnection().subscribe(
connection => {
return Rx.Observable.of(resources)
.mergeMap(q => Rx.Observable.forkJoin(q.map(r => {
return doQuery(connection,r);
})))
.subscribe(
data => {
connection.commit((err)=>{
connection.release();
if(err){observer.onError(err);return false;}
observer.onNext(data);
})
},
err => {
connection.rollback(() =>{
connection.release();
observer.onError(err);
})
}
)
},
err => {
observer.onError(err);
},
() => {
}
)
});
}



필자가 생소하기도하고, 어려웠던부분은 다음과같다.
두개의 로직을 보면 rx로 작성된 queryHepler에서도 doQuery() 함수부분은 Promise로 작성된 로직과 같은걸 볼수있다.
원래 rx의 doQuery()함수는


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
function doQuery(connection, resource){
return Rx.Observable.create(observer => {
connection.query(
(typeof(resource.query) === 'function') ? resource.query(resource.data) : resource.query
, (err, data) =>{
if(err){observer.onError(err); return false;}
if((resource.expect||'many') === 'single'){
if(Array.isArray(data)) observer.onNext(data[0])
else observer.onNext(data);
}else{
if(data.length === 0) resolve(undefined)
else observer.onNext(data);
}
});
});
}



이렇게 작성하였으나, 이렇게 작성할경우 execute() 함수는 제대로 실행이되나, transaction() 함수는 제대로 실행되지않았다. 이유는 즉슨 transaction()함수에서


1
2
3
4
5
6
7
8
function transaction(resources){
...
return Rx.Observable.of(resources)
.mergeMap(q => Rx.Observable.forkJoin(q.map(r => {
return doQuery(connection,r);
})))
...
}



Promise.all() 과 같은 역활을 하는 Rx.Obervable.forkjoin() 인자값으로는 스트림(stream)들의 위치하게된다.
Promise.all 또는 Promise.map 와 Rx.Observable.forkjoin()의 다른점은, Rx.Observable.forkjoin()의 인자값에는 Rx.Observable.of(‘data1’),Rx.Observable.of(‘data2’) … 와같은 순수 스트림 데이터들이 위치하게되는 반면, Promise.all() 또는 Promise.map() 은 인자값으로 [ Promise.resolve(‘data1’), Promise.resolve(‘data2’) ] 와같이 Promise들이 위치되는것은 같으나, 좀더 나아가 각 Promise들 마다 로직을 수행할수있다는 점이다.
아래의 코드를 보면 이해가될것이다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Promise.all
Promise.all(
[
Promise.resolve('fakeData1').then(data => 'realData1'),
Promise.resolve('fakeData2').then(data => 'realData2'),
])
.then(result => {
console.log('Promise result => ' + result);
}).catch((err) => {
next(new Error(err));
})
// Promise style을 rx style로 변경.
// Promise style처럼 forkJoin()에서는 각각의 스트림에 세부적인 로직을 정의할수없음 .
// 에러뜸
Rx.Observable.forkJoin(
Rx.Observable.of('fakeData1').subscibe(data => 'realData1'),
Rx.Observable.of('fakeData2').subscibe(data => 'realData2')
)
.subscribe(
result => console.log(result),
err => console.log(err)
);
// Rx.Observable.forkJoin() 이렇게 사용해야함.
Rx.Observable.forkJoin(
Rx.Observable.of('realData1'),
Rx.Observable.of('realData2')
)
.subscribe(
result => console.log('rx result => ' + result),
err => console.log(err)
);



예제에서 보시다시피 Rx.Observable.forkJoin() 함수는 인자값내에선 적당한 로직이있는 Rx.Observable을 사용할수없으며 로직수행후 처리할 로직이있다면, 로직수행후 callback(onNext()) 내부에서 또다른 Rx.Observable을 구현해야할것이다 .( callback hell 이 생각나지 왜 ..)
rxjs forkjoin() 의 sample 코드도 promise를 사용하는것을 확인할수있다.
그래서 필자는 실제 쿼리를 수행하는 doQuery()를 Promise로 기존의 코드를 그대로 두고, execute() 함수에서 doQuery()함수 호출시 Promise를 Rx.Observable 로 변환해주는 Rx.Observable.fromPromise() 함수를 사용하였다.


견해

이렇게 기존에 Promise로 작성된 queryHelper모듈을 rx style로 변경을 해보았다.
필자도 rx를 언젠간 써봐야지 하고 미뤄두다가 써보니 좋은점도 있고 그렇지않은점도 있는것 같다.
다음은 극단적인예로, 로직을 처리하기위해 사전에 먼저 수행되어야할 로직이있다면 callback을 받아 새로운 로직을 구성하는 Promise와 rx로 작성해보았다. (의존관계의 로직처리)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Promise version
Promise.resolve({'key1':'value1'}).then((data)=>{
data.key1 = 'newValue1'; //데이터가공
return Promise.resolve(Object.assign(data,{'key2':'value2'})) //가공된 데이터가 새로운 Promise에 필요
}).then((data) => {
data.key2 = 'newValue1'; //데이터가공
return Promise.resolve(Object.assign(data,{'key3':'value3'})) //가공된 데이터가 새로운 Promise에 필요
}).then((data) => {
console.log(data);
})
.catch((err) => {
next(new Error(err));
});
// rx version
Rx.Observable.of({'key1':'value1'}).subscribe(data => {
data.key1 = 'newValue1'; //데이터가공
Rx.Observable.of(Object.assign(data,{'key2':'value2'})).subscribe(data => { //가공된 데이터가 새로운 Rx.Observable에 필요
data.key2 = 'newValue2'; //데이터가공
Rx.Observable.of(Object.assign(data,{'key3':'value3'})).subscribe(data => { //가공된 데이터가 새로운 Rx.Observable에 필요
console.log(data);
},
err => new Error(err),
() => {}
)
},
err => new Error(err),
() => {}
)
},
err => new Error(err),
() => {}
);



Promise는 chain 형식으로 문법을 사용할수있어 callback hell을 피할수있으며, 한눈에 보기에도 rx보다 가독성이 좋다
반면 rx 의 경우에는 불가피하게 중첩 callback이 생겨 Promise보다는 가독성이 떨어진다.
대신 rx 의 장점은 우선 제공되는 함수가 엄청나게 많아 제대로 사용하게되면 불필요한 로직을 대폭 줄일수있을 것 같고, 그에따른 학습의 진입장벽이 조금 있을것 같다.
필자는 learn rxjs 를 참고하였다.
간단하게나마 rx 를 사용해보았지만, 반응형 프로그래밍이 요즘 뜨고있으니 무작정 사용해야지(필자는 이런생각을 했다.) 라는 생각보다는 상황에 맞게 Promise와 rx를 적절하게 같이 사용하는것이 가장 좋은 방법이라 생각이든다.
rx에 대해서 의견이 있으시면 언제든지 댓글을 남겨주시면 저도 열심히 배우도록하겠습니다 ….