빅데이터/Spark

Spark RDD - Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing

쌍쌍바나나 2016. 7. 1. 22:00
반응형

Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing (2012)에 나온 논문을 읽어보았다. 

  데이터 중간 결과를 재사용하는 iterative한 연산이 필요한 알고리즘들 machine learning and graph algorithms, PageRank, K-means clustering, logstic regression, data mining이 있다. 위 알고리즘을 MapReduce에서 처리를 하게 되면 Map과 Reduce의 사이에서 data replication, disk I/O, serialization의 overhead가 발생한다. 즉, MapReduce가 iteration에서 수행시간이 길게 나오는 이유는 각 iteration을 수행할때마다 stage간에 자료 공유를 HDFS를 통해서 한다. 결과적으로 HDFS를 통하면 File I/O가 빈번하게 발생하고, 수행시간이 느려질 수 밖에없다. 그렇다면 File I/O를 빈번하게 안하면 되는거 아닌가? 그래서 RAM에서 수행하자! 위 같이 RAM에서 수행하면, 쿼리를 할 때마다 처음부터 읽어오기 보다는, 한번 램에 데이터를 로딩하고 쿼리를 수행하기 때문에 속도가 빠르다.

  하지만 데이터를 한번 램에 올린 이후에 fault-tolerant가 발생하면 어떻게 하느냐가 문제, 다시 메모리에 올리는 작업을 해야할까? 어떻게 효율적으로 처리를 할 수 있을까.. replicating을 하거나 check point를 disk에 써놓으면 되나? 라고 생각할 수 있지만 replication을 하는 동안은 연산이 수행되지 않고, disk에 check point를 써놓으면 I/O가 발생하기 때문에 느리다. 

그럼 RAM의 내용을 update하지 말고, read-only로 사용하면 된다. HDFS에서도 수정이 안되는 파일 시스템을 사용한다. 단순화가 여기서는 포인트가  될것 같다. RAM을 read-only로 써보자는 내용을 근간해 등장한게 바로 Resilient Distributed Datasets이다. 

  Resilient Distributed Datasets의 특성은 immutable, partitioned collections of records의 특징이 있다. 즉 records가 모여있는 수정이 불가능한 객체를 말한다. 생성하는 방법은 storage에서 데이터를 불러와 RDD로 변환하거나 RDD에서 RDD로만 가능하다.  immutable이기 때문에 수정이 불가능해, read-only로만 사용이 되고, 생성되는 과정을 기록해 놓은 lineage를 통해  RDD객체를 다시 생성을 할 수 있다. 이렇게 하면 fault-tolerant의 문제를 해결할 수 있다. 

  위에서 언급한 lineage는 계보라는 뜻을 갖고 있고, DAG(directed acyclic graph)로 디자인이 되어 있다.  쉽게 말해서 데이터를 로딩하고, 일련의 과정을 기록하고 있다고 생각하면 된다. 이렇게 기록된 과정은 추후 fault-tolerant의 문제가 발생할때 내가 생성해 놓은 RDD를 이전 lineage를 보고 생성하면 되기 때문에 빠른 복구가 가능하다. 데이터의 형태를 저장한게 아닌 DAG를 저장하기 때문에 cost 또한 크지 않다. 여기서 만약 DAG로 생성된 일련의 과정을 한번씩 수행하면서 데이터를 transformation을 하는게 아니고, 마지막에 모든 과정을 요약해 한번에 action을 한다. 예를 들어서 A->B->C->D 의 과정이 있을때, A->B를 한번 수행하고 결과를 ->C로 보내는게 아닌 B->C->D의 transformationn 과정들을 한번에 수행한다.

  


  위에서 언급했듯이 RDD의 Operation은 transformation, action 두개가 있다. transformation은 map, filter, filterMap, join 등의 연산, action은 save, count 등의 함수를 지원한다. 잘 살펴보면 데이터의 값을 변경시키는 작업을 transformation, 값을 가져오는 작업을 action으로 생각하면 된다. 영화 찍을때도 액션~~ 나한테 줘 결과

  앞에서 언급하듯이 transformation, action의 과정을 lazy-execution이라고 한다. 우리가 방을 더럽히고 한번에 치우는 사람을 게으른 사람이라고 한다. RDD의 연산도 게을러 터지게 연산을 수행한다. 하나씩 처리하는게 아니라, 한방에 연산을 처리한다. 아무리 transformation의 연산을 수행해도 아무일도 일어나지 않고 DAG를 생성한다. 인터프리터에서 transformation의 연산을 수행하고 나서 "응? 벌써 계산했어?  왜이리 빨라 대박 역시 spark"라고 생각이 든다. 하지만 이 작업은 lineage를 생성한 것이다. "역시 spark"만 맞는 말인셈이다. 이제 action의 연산을 수행하면 그제서야 쌓여있었던 tranasformation 연산들을 한번에 수행한다. 이처럼 수행하는 것을 lazy-execution이라고 말한다.

  근데 왜 lazy-execution이 좋은거지?... 한방에 할일을 몰아놓고 처리하면, 게으르다며 그럼 안좋은거잖아.. 물론 그렇지 하지만 게으르게 하는 동안 이놈은 DAG, 즉 lineage를 생성하지 않는가... 준비를 하고 있는 것이다. 내가 얼마나 자원을 사용할 것인가 자원을 배치하고, 이 앞선 모든 transformation의 연산의 코스트를 미리 계산해 최적으로 계산한다. 일을 할때는 큰 일을 먼저 그려보고 그 안에 일을 차근차근 하는게 효율적일 수 있다. 그럼 내가 우선 순위로 해야할 일 부터 처리가 가능하니까

  Spark에서는 두가지 type의 dependency가 존재한다. 하나는 narrow dependencies이고, 다른 하나는 wide dependencies 입니다. 간단하게 말하면 narrow는 한 노드에서 처리 할 수 있는 일은 모아서 하는게 좋다. wide는 모든 노드에 있는 자료를 가져와서 처리하는 것을 말한다. 무엇이 좋겠는가? 당연 narrow dependency가 좋다. 하나의 노드에서 작업으르 할 수 있다는 말이고, 그 말은 노드간에 network I/O가 없으니 결과적으로 엄청 빠르다. 그렇다면 wide는 모든 노드에서의 작업을 모아서 연산을 수행하기 때문에 network I/O가 발생하고, 결과적으로 느리다. 또 하나의 특징은 만약 fault-tolerant가 발생했을때, 한 노드에서만 연산을 했던 narrow의 경우에는 복구의 비용이 저렴하지만, 여러 노드의 dependency가 있는 wide의 경우에는 복구 비용이 크다. 

  Spark에서 job stage를 어떻게 수행하는지에 대한 내용입니다. 작은 네모는 partition을 말하고, partions의 집합은 RDD를 나타냅니다. partition의 색이 검정색은 이미 memory에 있는 partition을 의미합니다. 각 stage의 narrow dependency와 stage간의 wide dependecy를 수행합니다. 여기서 볼 수 있듯이, 이미 정해진 DAG에 따라서 계산을 하고 있습니다. 이미 계산된 파티션은 수행하지 않습니다. 파티션이 수행될 노드는 data-locality를 고려해 결정이 됩니다.

  최초에 데이터가 들어오면 데이터는 파티션으로 나누어져, 각 머신에 분배가 됩니다. 우리가 프로그램을 실제로 실행하는 driver에서는 operation을 worker노드에서 수행됩니다. 필요에 따라 파티 내용이 다른 머신으로 shuffle 되기도 합니다.

  각 worker node에서 작업을 수행하는데 메모리가 부족하면 어떻게 되느냐, 일단 spark에서는 기본적으로 LRU(Least Recently Used)알고리즘을 통해서 오랜 기간 동안 사용되지 않았던 파티셔 부터 정리를 합니다. 기본적으로 원본을 다시 가져올 수 있기 때문에 cache랑 유사하게 동작한다고 생각하시면 됩니다.

  연산을 하는 도중에 fault가 나면 앞서 언급한 lineage를 통해 복구하면 됩니다. lineage는 용량이 작기 때문에 cost역시 크지 않습니다. 특정 파티션에 문제가 생기면 다른 노드에서 가져와서 실행을 합니다. 하지만 wide dependency의 경우에는 모든 파티션을 훑어야 하므로 복구하는 비용이 큽니다. 그럼 checkpoint를 디스크에 써 놓는게 더 유리할 수 있습니다. fault가 났을때 한가지더! 궁금한게 생길 수 있습니다. narrow의 경우에는 하나의 노드를 복구하기 때문에 해당 노드의 파티션만 복구하면 되지만, 만약 wide의 경우에는 파티션을 복구 하는 과정에서 다른 파티션에 영향을 주지 않을까...? 아니면 한 노드에서 복구하는 과정에서 다른 노드의 연산이 멈추면? 이런 문제는 신경을 쓸 필요가 없습니다. 처음에 언급한 RDD의 특징인 read-only이기 때문에 복구를 하는 도중에 다른 partition의 값은 변경을 할래야 할 수가 없습니다. 그렇기 때문에 background에서 aysnc로 수행을 하면 됩니다. 


[참고] http://www-bcf.usc.edu/~minlanyu/teach/csci599-fall12/papers/nsdi_spark.pdf



반응형

'빅데이터 > Spark' 카테고리의 다른 글

[Spark] Optimizing Transformations and Actions  (0) 2016.07.15
[Spark] RDD Architecture  (0) 2016.07.15
Spark Shell을 이용한 간단한 예제 및 앱 배포 방법  (0) 2016.06.16
Spark - RDD  (0) 2016.06.16
Spark 클러스터 구조  (0) 2016.06.13