Optimizing Transformations and Actions
아래 내용을 중심으로 학습을 진행했습니다.
- Use advanced RDD operations
- Identify what operations cause shuffling
- Understand how to avoid shuffling when possible
- Group, combine, reduce key-value pairs
Advanced RDD Operations
advanced RDD의 연산에 대해서 알아보도록 하겠습니다.
- Numeric RDDs는 statistical 연산을 할 수 있습니다. 해당 연산을 통해 standard deviation, sum, mean, max, min, 등의 통계적이 연산이 가능합니다.
- mapPartitions - partition level에서의 기능을 제거하고 map과 유사합니다. 이 연산은 map function이 record당 높은 overhead의 cost를 갖고 있을때 특히 유용합니다. 예를 들어서 DB에 접근을 한다고 할때, 각 record를 각각 접근하는게 아니라 파티션 당 하나씩 접근이 가능하기 때문에 높은 성능을 보입니다. mapPartitionsWithIndex는 파라미터로 파티션의 index를 추가해주는 역할을 합니다. 각 파티션을 기준으로 map을 돌리느냐 아니면 각 record별로 map을 수행하느냐의 차이로 이해하면 쉽습니다.
- foreachPartition - foreach를 파티션을 기준으로 수행한다고 생각하시면 됩니다. 만약 web service또는 외부 저장소로 batch 연산을 하고 싶을때 사용하면 record를 기준으로하는 foreach와 다르게 파티션을 기준으로 수행하기 때문에 효율적으로 처리가 가능합니다.
- fold - reduce의 special case입니다. initial zero accumulator와 function을 넘겨주면, 그 function은 RDD의 요소들과 accumulator 사이에 연산을 수행하고, 최종 accumulator를 리턴합니다.
- aggregate - fold와 유사하고, 2가지 기능을 수행합니다. 각 파티션에서 수행, 각 파티션의 accumulator로부터 결과를 통합(combine)
- countByValue - 각 값이 발생하는 개수를 카운팅할때 사용하면 좋습니다. 내부적으로 pair RDD로 mapping이 된 이후에 countByKey가 호출됩니다.
Operations on Key/Value RDDs
key/value RDDs에만 사용이 가능 operations에 대해서 알아보도록 하겠습니다.
- reduceByKey - 모든 key의 값들에 대해서 reduce를 수행합니다. 각 파티션에서 각 key의 결과를 통합합니다.
- countByKey - reduceByKey와 유사한 역할을 하지만 map에 있는 결과를 driver로 전송해주는게 차이가 있습니다. driver로 전송을 하기 때문에 테스트를 하는 경우에만 사용하는게 좋습니다. 너무 많으 결과가 driver로 전송이되면 그 만큼 network I/O가 발생하고, driver의 memory도 고려해야 하기 때문
- aggregateByKey, foldByKey - key를 기준으로 동작합니다.
- groupByKey - 모든 파티션에으로 부터 key를 기준으로 모든 값들을 그룹핑합니다. 이 작업은 shuffling이 발생하기 때문에 spark성능에 안좋을 수 있으니 주의해서 사용을 해야합니다. 너무 큰 데이터에서 사용할 경우에는 OOM이 발생할 수 있습니다.
- Lookup - 특정 키를 갖는 모든 값을 반환해주는 역할을 합니다.
- mapValues - 키를 변경시키지 않고 각 value에 map function을 적용하는 것을 말합니다. map 대신에 mapValues를 사용하면 spark는 이전 파티셔닝이 유효한지 알고 있기 때문에 재 파티셔닝을 할 필요가 없습니다.
- repartition과 sort를 파티션내에서 하게 되면 repartition과 key를 기준으로 sort가 하나의 스텝에서 모두 동작을 합니다. 이 방법은 repartition을 한 뒤에 sortByKey를 호출하는 것보다 더 효율적입니다.
Example: Average Per Key
aggregate functions을 어떻게 사용하는지에 대해서 알아보도록 하겠습니다. 또한 왜 groupByKey를 피해야만 하는지에 대한 설명도 하겠습니다.
예를 들어 RDD에 있는 각 key의 값들의 평균을 계산하고 싶습니다. 이 과정은 groupByKey를 호출한 뒤에 각 키의 합과, 개수를 나누어 평균의 값을 계산하면 됩니다. 하지만 이 방법은 효율적인 계산을 하는 방법이 아닙니다.
위 방법보다 좀더 효율적인 방법은 key를 기반으로 aggregate를 하는 방법입니다. 일단 accumulator를 0으로 초기화를 합니다. 첫번째 function은 주어진 key의 모든 값들을 iterate를 하며, key의 값에 대해서 accumulator의 값을 하나씩 증가를 합니다. 두번째 function에서는 각 partition으로 부터 주어진 key별로 accumulator를 통합(combine)을 합니다. 마지막으로 mapValues를 호출하면 각 key별로 average를 계산할 수 있습니다.
위 두 방법중에 groupByKey는 records가 이미 같은 파티션에 co-located가 되어있다 하더라도, 모든 값들이 network를 통해 shuffle이 되고, 키 별로 평균을 계산하기 때문에 추가적으로 network I/O가 발생할 뿐만아니라, 더 많은 메모리를 사용하게 됩니다.
그와 반대로, aggregateByKey를 두가지 단계로 나누어 계산을 하게 되면, 오직 하나의 key의 쌍이 하나의 파티션에 shuffle되기 때문에 상당한 I/O와 memory 사용량을 줄일 수 있습니다.
Another Example
각 key의 첫번째 값을 얻어오기 위한 방법입니다. 만약에 groupByKey를 호출한 뒤에 map/mapValues를 수행하고 각 그룹의 첫번째 값을 가져올 수 있습니다. 하지만 이 문제는 memory의 사용량이 늘어난다는 것입니다. 더 좋은 해결 방법은 reduceByKey를 사용하는 것 입니다. 각 pair의 값들이 전달됨으로써 비교가 가능하고 record를 가져올 수 있습니다. 이 방법은 memory overhead를 발생하지 않고, 각 파티션의 모든 키에 대해서 효율적으로 수행이 가능합니다.
Asynchronous Actions
Actions은 blocking operation입니다. 즉, transformations이 lazily하게 실행이 되는 동안, count, collect, foreach와 같은 action을 수행합니다. driver는 해당 작업이 끝날때 까지 대기를 합니다. 그렇기 때문에 끝나는 시점을 알기 위해 callback함수를 구현할 수 있습니다. 만약 여러개의 async 연산이 필요하다면 FIFO job 스케줄러는 순차적으로 연산을 처리하기 때문에 FIFO job 스케줄러를 사용하면 안됩니다. 그렇기 때문에 actions 연산을 parallel하게 하기 위해서는 Fair 스케줄러를 사용하면 됩니다.
Batching Output
Spark job으로 데이터를 뽑기 위해서는 스탠다드 save APIs를 사용하면 됩니다. 이때 foreach와 foreachPartition에 대해서 정확하게 이해하는게 중요합니다. foreach는 record를 하나하나 수행하는 것을 말합니다. 이 처리는 큰 양의 데이터를 처리하는 batch작업에는 foreach를 사용하는 것은 비효율적입니다. 이때는 foreachPartition이 더 적절합니다. foreachPartition은 각 파티션내에 있는 모든 records의 iterator를 parallel하게 수행하기 때문에 효율적입니다. ODBC, JDBC에 데이터를 저장하거나, message, REST endpoints로 custom ouput을 보내기 위해서도 foreachPartition을 사용하면 됩니다.
Broadcast Variables
Broadcast variables은 read-only값으로 driver program에서 모든 executors로 보내집니다. broadcast variables은 RDD 연산을 통해 검색이 가능합니다. 이 변수는 모든 executors에서 공유하는 변수이기 때문에 shuffling이 발생하지 않습니다. 즉 보내놓고 그냥 서로 공유해서 사용하기 때문에 효율적으로 사용이 가능합니다.
[참고] Big Data University - Spark Fundamentals II
'빅데이터 > Spark' 카테고리의 다른 글
[Spark] RDD 영속화(캐싱) - 이론 및 예제 (0) | 2016.07.25 |
---|---|
[Spark] Caching and Serialization (0) | 2016.07.15 |
[Spark] RDD Architecture (0) | 2016.07.15 |
Spark RDD - Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing (0) | 2016.07.01 |
Spark Shell을 이용한 간단한 예제 및 앱 배포 방법 (0) | 2016.06.16 |