PySpark 에서 NoneType을 Filtering 하는 방법 (any, all, subset)

pyspark에서 drop method는 NULL을 가진 행을 제거하는데 가장 간단한 함수다. 기본적으로 NULL 값을 가진 행을 모두 제거를 한다. 모든 컬럼이 NULL인 경우 제거를 하고, 하나의 컬럼이 NULL인 경우 제거를 하고 싶은 경우가 있을것이다. 이런 경우에 어떻게 다르게 진행하는지 "any", "all"을 통해 설명

위 작업을 SQL에서 진행한다면 WHERE 절에 해당 컬림이 NULL인지 체크하는 구문을 넣어야 한다. 만약 모든 컬럼에 대해서 해야 하면? 모든 컬럼을 명시해야하는 단점이 있다. (SELECT * FROM TABLE WHERE COL1 IS NOT NULL)

  • drop 메소드에 인수
    • any: 모든 행의 컬럼값 중 하나라도 NULL의 값을 가지면 해당 행을 제거
    • all: 모든 컬럼 값이 NULL이거나 NaN인 경우에만 해당 행을 제거

위에서 보면 항상 "모든"이 조건이 된다. 하지만 drop 인자중에 subset을 이용하면 특정 컬럼에 대해서만 drop을 진행할 수 있다. 아래 소스코드 참고

소스코드

Pyspark NoneType, Null, Blank, Empty String 필터링 하는 방법

pyspark에서 NoneType, NULL, Blank, Empty String 등 사용지 않는 값을 필털이 하는 방법에 대해서 설명을 한다.

  • SQL로 진행하면 컬럼 하나 하나에 대해서 WHERE 절을 이용해서 필터링을 해야한다.
  • 하지만 spark에서 Condition을 생성하고, 해당 Condition을 filter() 함수의 인자로 넘겨주면, 동일한 조건으로 모든 컬럼에 적용이 가능하다.
  • 코드에서 살펴볼 내용
    • None, Null Empty String이 포함된 Row만 추출이 가능
    • None, Null, Empty String을 제외한 모든 Row만 추출도 가능
  • 코드 설명
    • 간단하게 각각 컬럼에 공통적으로 적용할 condition을 입력을 하고, map함수를 이용해 모든 컬럼에 적용을 한다.
    • 마지막에는 reduce함수를 이용해 condition을 모두 "&"(and) 또는 "|"(or) 로 연결하면 condition은 정의 완성
    • 정의된 condition을 filter함수에 넘기면 내가 원하는 Row만 추출이 가능하다.
    • Condition 예)Column<(((CODE IS NOT NULL) AND (NOT (CODE = ))) AND ((TYPE IS NOT NULL) AND (NOT (TYPE = ))))>

소스코드

  Spark에서 제공하는 라이브러리 외에도 형태소 분석기나, 기존에 우리가 사용하던 라이브러리를 사용하는 방법에 대해서 설명한다. 클러스터의 모든 슬레이브에 /var/lib/의 이하 폴더에 사용할 라이브러리를 추가해 놓은 상태입니다. spark-shell이나 spark-submit을 할때 아래와 같이 --driver-class-path를 통해 라이브러리를 포함시키면 됩니다. 하지만 하나씩 하는건 무리가 있으니 아래와 같이 실행하면 모든 library를 한번에 import 할 수 있습니다. 

$ spark-shell --driver-class-path $(echo /var/lib/spark/*.jar | tr ' ' ',')

  하지만? 위처럼 하면 아래와 같은 Exception이 발생을 합니다. class의 path는 submit할때 jar내에 포함된 main 클래스가 포함된 *.class의 파일을 가리킬때 사용을 합니다. (저도 --driver-class-path인줄알고 계속 하고 있었는데, 동작하길래 맞는건줄 알았는데, 알고보니까 분산작업이 아닌 코드에서만 동작을 했던거였고, 각 worker-node에서 라이브러리를 로딩하기 위해서는 아래와 같이 실행.)

ClassNotFoundException이 발생했다. Lost task 0.0 in stage 20.0 (TID 69, slave02): java.lang.NoClassDefFoundError

$ spark-shell —jars $(echo /var/lib/spark/*.jar | tr ' ' ',')



들어가며

  데이터 분석에서 가장 많이, 그리고 자주 사용하는 field가 timestamp의 값이 아닐까 싶다. 그 중에서도 timestamp의 raw데이터를 통해서 새로운 feature를 생성하는 방법이 있다. 예를 들어 날짜별 사용자 방문 수 라고 할때, timestamp의 값에서 우리는 month와 day를 추출해서 새로운 feature의 값으로 사용을 해야 한다. 

소스코드

  DataFrame의 데이터 타입인 df에서 select의 함수를 통해서 원하는 Column을 추출을 하는 과정이다. timestamp_milli의 값은 ms의 단위의 값이기 때문에 1000을 나눈뒤에 spark에서 제공하는 functions의 라이브러리를 import한 뒤에 사용하면 된다. from_unixtime을 통해 date형태의 String값으로 변환을 한 뒤에 month, dayofmonth, hour을 통해서 월, 일, 시간을 추출을 할 수 있다. 

org.apache.spark.sql.functions함수 확인하기

들어가며

  스파크에서 구현을 하다 보면 각 객체간의 변환(?)이 자유로워야 하는것 같다. 예를 들면 RDD에서 추출한 데이터를 DataFrame으로 생성한다든지, DataFrame에서 여러개의 Row를 추출해서 새로운 RDD를 생성한다는 식의 경우를 말한다. 이번에 당면한 문제는 Json Object의 요소를 저장하고 있는 Seq의 Collection을 갖고 있었고, 이 Collection을 바탕으로 DataFrame의 생성이 필요했다. 이번에는 Seq[org.apache.spark.sql.Row]의 데이터를 RDD와 DataFrame으로 변환하는 방법에 대해서 정리를 해보려고 한다. 

소스코드

  logData는 org.apache.spark.sql.Row의 값을 갖고 있다. logData에서 field명이 object인 값을 추출을 하려고 한다. 이때 field의 값 형태가 Json Object의 List로 이루어져 있다. 앞서 글에서 언급한바와 같이 getSeq[Row]를 통해서 Json Object의 리스트의 값을 가지고 왔다. (자세한 내용은 아래 포스팅을 참고하시면 됩니다.) 

[Spark] Json 포맷 처리하기 - Json Object List 

  이때 getSeq[Row]를 통해 얻어낸 객체는 Seq[org.apache.spark.sql.Row]의 형태를 갖고 있습니다. 그럼 이 데이터를 어떻게 RDD로 생성하고, DataFrame의 객체로 변환하기 위해서 sc.parallelize로 위에 얻어낸 Row의 Seq를 넘겨 rdd를 생성하고, 생성된 rdd와 함께, Row의 schema를 함께 넘겨주시면 DataFrame의 객체를 얻을 수 있습니다. 이때 data.apply(0)을 넘긴 이유는 Seq에서 첫번째 Row의 정보를 가져왔고, 나머지와 동일하기 때문에 첫번째 Row의 StructType를 이용했습니다. 마지막으로 DataFrame의 show()를 호출하면 테이블을 확인하실 수 있습니다.



들어가며

  스파크에서는 CSV, Json, Protocol Buffer, Hadoop에서 지원하는 데이터 포맷 등 다양한 포맷을 지원을 한다. 이번에는 Json파일을 읽어서 스키마를 확인을 하고, 스키마에 있는 필요한 데이터를 추출하는 방법에 대해서 알아보려고 한다. 데이터는 공개되어 있는 tweet 데이터를 사용하였고, tweet데이터에서 내가 필요한 데이터를 추출하기 위한 과정을 소스코드로 작성해 보았습니다.

설명

  소스 코드는 tweet의 데이터를 읽어들인 이후에 schema를 확인을 하여 데이터의 포맷이 어떻게 이루어져 있는지 확인을 합니다. (아래 스키마 그림 첨부 했습니다.)  스파크는 lazy execution을 하기 때문에 결과를 확인하기 위해 .take(N)을 사용했습니다. 처음에 tweets의 DataFrame의 객체를 생성을 한 뒤에, map의 연산을 통해서 DataFrame의 각 Row의 객체를 얻어옵니다. 여기서 tweet은 하나의 트윗을 나타내고, Row의 객체 형태를 갖고 있습니다. 

  하나의 Row에서 내가 원하는 field를 가져오기 위해서는 field의 인덱스를 알아야 합니다. 확인하는 방법은 tweet.fieldIndex("columnName")을 통해 확인이 가능합니다. 하나의 tweet의 id를 가져오기 위해서는 아래 스키마에서도 확인이 가능하듯이 long의 데이터 타입을 갖고 있기 때문에 tweet.getLong(tweet.fieldIndex("id"))를 통해서 가져올 수 있습니다.

  그렇다면 primitive의 형태의 데이터외의 데이터는 어떻게 가져와야 할까요. getList를 통해서 List객체를 반환을 받을 수 있습니다. 여기서 contributorsIDs를 가져오려고 한다면, array의 타입이기 때문에 tweet.getList(tweet.fieldIndex("contributorsIDs"))를 통해 가지고 올 수 있습니다. 

  구조체로 표현되어있는 필드의 값을 가져오기 위해서는 getStruct를 통해 가져올 수 있습니다. tweet.getStruct(tweet.fieldIndex("user"))를 통해 user의 구조체를 가져올 수 있고, 각각의 필드의 값은 다시 user의 필드인덱스를 통해서 접근이 가늫압니다. 예를 들어 이름을 가져오고 싶으면 tweet.getStruct(tweet.fieldIndex("user"))의 값을 val user로 할당을 한 뒤에, user.getString(user.fieldIndex("name"))을 통해 가져올 수 있습니다.

  항상 모든 예제는 내가 원하는 범위만큼은 제공하지 않지요. 그래서 제가 삽질을 한 내용을 말씀드리면, 아래와 같이 user가 만약에 Array형태의 Struct면 어떻게 가져와야 할까요? 만약 getList()를 통해 가지고 오게 되면, List[Nothing]의 에러가 발생합니다. 그래서 List에서 각 항목 예를 들면 user의 name을 가져와줘 라고 user.getString(user.fieldIndex("name"))을 하게 되면 'error: value getString is not a member of Nothing' 의 에러가 발생하게 됩니다. 만약 getSeq()를 쓰면 어떻게 될까요. 같은 문제가 발생합니다. 이 문제는 getSeq()를 하게 되면 List 내부에 있는 데이터의 타입을 명시적으로 작성을 해줘야 합니다. getSeq[Row]()로 Row의 DataType을 함께 넘겨주면 문제는 해결이 됩니다. Row...라고 알려줘야 필드에 들어있는 리스트의 구조체의 내부에 있는 값을 가져올 수 있습니다. 

  진짜 위 문제를 해결하기 위해서 반나절은 소비한것 같네요. 이 문제 때문에 해결하는 절차를 보면, 각 데이터를 추출 할 때마다 schema를 확인했습니다. 예를 들면 tweet에서도 user의 구조체를 추출을 하고 user.schema를 하면 schema의 값을 확인을 할 수 있었습니다. 하지만 만약에 getSeq()를 통해 추출한 List는 Nothing이라는 말만 내보내더군요. 아 그러니까 shcema가 없다는 것은 지금 이 List에 담겨있는 객체들이 어떤 객체를 나타내는지 모르는구나... 그래서 명시적으로 Row를 해주자 해서 해결을 했습니다.

소스코드

 

Tweet Schema



파티셔닝 예제 - 페이지랭크(PageRank) 알고리즘

  RDD 파티셔닝에 의한 효과를 볼 수 있는 좀 더 복잡한 알고리즘 예제로 페이지 랭크를 생각 할 수 있다. 페이지 랭크 알고리즘은 구글의 공동 창업자 Larry Page의 이름을 인용한 것으로 얼마나 많은 문서들이 해당 문서를 링크하고 있는지 기초하여 각 문서에 대해서 중요도를 매기는 알고리즘을 말한다. 페이지 랭크 알고리즘은 웹페이지의 중요도를 측정하는 척도로도 사용이 되지만, 과학 논문에서 어떤 논문이 중요한지 평가하거나, SNS의 영향력에 있는 허브유저를 찾아내는 데에도 사용이 되고 있다. 

  페이지 랭크의 단점은 많은 조인을 수행하는 반복알고리즘이다. 그렇기 때문에 많은 조인을 반복적을 효율적으로 처리가 가능한 RDD 파티셔닝을 이용하면 좋은 결과를 얻을 수 있다.(빠르게 중요도를 평가 할 수 있겠지요.)

  페이지 랭크 알고리즘은 (pageID, linklist)의 형태로 각 페이지와 그 이웃 페이지들을 가지고 있으며, 하나는 (pageID, rank)의 형태로 각 페이지의 현재 랭크를 관리합니다. 알고리즘은 아래와 같이 동작을 합니다.

  1. 각 페이지의 랭크를 1.0으로 초기화 한다.
  2. 매번의 반복 주기마다 페이지 p는 공헌치 "랭크(p)/이웃숫자(p)"를 이웃들(자신이 링크하고 있는 페이지)에게 보낸다.
  3. 각 페이지의 랭크를 0.15+ 0.85*(공헌치)로 갱신한다.

PageRank 알고리즘 소스코드

  • 여기서 links와 ranks가 계속 반복적으로 join되는 것을 볼 수 있다. 하지만 links RDD는 고정된 데이터셋이므로 시작점에서 바로 partitionBy()를 적용하였고, 결과적으로는 join을 할때 links는 셔플링을 할 필요가 없어졌다. 또한 persist()를 통해 메모리에 로딩을 하고 사용하기 때문에, 호출을 할때마다 links RDD를 다시 생성할 필요가 없다. 
  • ranks를 생성할때 links RDD의 파티셔닝을 유지하기 위해서 map()이 아닌 mapValues를 사용을 했다.
  • forloop에서 reduceByKey()다음에 mapValues()를 호출했다. reduceByKey()의 결과는 이미 해시 파티셔닝이되어 있으므로 다음 차례의 반복 주기에서 매핑된 결과와 다음 loop에서의 ranks와 links의 조인을 더 효율적으로 만든다. 

(파티셔닝 관련 최적화의 효과를 극대화하려면, 데이터의 키가 변경되어야 하는 경우가 아닐 때 반드시 mapValues()나 flatMapValues()를 사용해야 한다.)


사용자 파티셔너 지정

  파티셔닝은 스파크에서 성능에 상당한 영향을 주는 요소이다. 셔플링을 줄이기 위해서는 파티셔닝을 효율적으로 잘 해야 한다. 그럼 잘한다는게 어떤 말일지 풀어말하면 스파크에서는 HashPartitioner와 RangePartitioner를 이용해 다양한 상황에 적합하도록 구현이 되어있습니다. 하지만 위에 페이지 랭크와 같은 경우에는 어떻게 파티셔닝을 하는게 효율적일지 생각해보면 HashPartitioner와 RangePartitioner를 이요하기 보다는 Partitioner 객체를 만들어서 튜닝을 하는게 더 효율적입니다. 물론 스파크에서도 이처럼 사용자가 Partitioner를 생성을 할 수 있습니다. 

  그렇다면 왜 HashPartitioner와 RangePartitioner를 사용하는게 효율적이지 않을까요. 예를 들어서 페이지랭크에서는 key로 사용해야 하는 값이 각 페이지의 ID(URL)입니다. 기존 파티션을 사용한다면 유사한 URL을 갖고 있는 사이트가 다른 노드에 해싱이 되어 버릴 가능성이 있습니다. 하지만 페이지는 동일한 도메인 안에 있으면 당연히 서로 링크가 많이 되어 있을 것을 알고 있습니다. 그렇기 때문에 결과적으로는 같은 도메인을 갖고 있는 페이지는 동일한 노드에 파티셔닝이 되는것이 유리합니다. 이처럼 도메인 네임에 대해서 해싱을 하는 Partitioner를 구현하면 더 효율적으로 파티셔닝을 할 수 있습니다. 

  사용자 지정 파티셔너를 구현하기 위해서는 org.apache.spark.Partitioner 클래스를 상속받고, 세 개의 메소드를 오버라이딩 하면 됩니다. 

  • numParititions: Int, 생성할 파티션의 개수
  • getPartition(key: Any) 주어진 키에 대한 파티션ID를 반환
  • equals(): 동일한 객체인지 확인

소스코드

[참고] 서적 - 러닝 스파크

RDD데이터 파티셔닝 - 이론 및 예제 

  이번에 설명한 내용은 스파크에서 노드 간 데이터세트의 파티셔닝을 어떻게 제어할 것인가 하는 것이다. 분산 프로그램에서 통신은 비용이 매우 크므로 네트워크 부하를 최소화할 수 있는 데이터 배치는 프로그램 성능을 비약적으로 향상시킬 수 있습니다. 비분산프로그램이 수많은 데이터 레코드 처리를 위해 올바른 자료 구조를 선택할 피ㄹ요가 있는 것처럼, 스파크의 애플리케이션도 네트워크 비용을 줄이기 위해서는 RDD의 파티셔닝을 제어해야 합니다. 파티셔닝은 조인 같이 키 중심의 연산에서 데이터세트가 여러번 재활용 될 때만 의미가 있습니다. 

  스파크의 파티셔닝은 모든 RDD의 키/값 쌍에 대해 가능하며, 시스템이 각 키에 제공된 함수에 따라 값들을 그룹화하도록 합니다. 스파크에서는 각 키가 어떤 노드로 전달되는지 같은 명시적인 제어를 제공하지는 않지만, 어떤 키의 모음들이 임의의 노드에 함께 모여 있게 되는 것은 보장해 줍니다. 예를 들면 Hash-Partition과 Range-Partition이 있습니다. HashPartition은 100으로 나눈 나머지에 대해 동일한 해시 값을 갖는 키들은 동일한 노드에 오게 됩니다. RangePartition은 같은 범위의 키들이 같은 노드에 모이도록 RDD를 범위별로 파티셔닝을 수행한 것을 말합니다.

  예를들면 메모리에 커다란 사용자 정보 테이블을 가지는 애플리케이션을 생각해보면이 프로그램은 (UserID, UserInfo)의 쌍의 RDD를 쓰며 UserInfo는 사용자가 구독하는 주제 리스트를 갖고 있습니다. 애필리케이션은 주기적으로 이 테이블을 최근 5분간 일어난 이벤트 정보를 갖는 작은 파일과 연동 처리를 하게 되는데, 이 파일은 (UserID, LinkInfo) 쌍의 테이블이며 5분간 각 사용자가 어떤 웹 사이트를 클릭했는지의 정보를 갖고 있습니다. 이제 이 정보로 얼마나 많은 사용자가 직접 구독하는 주제와 상관없는 링크에 방문하는지 알 수 있습니다. 이 작업을 위해서는 join() 연산을 적용해 봅시다.

  이 코드는 기대한 대로 동작은하겠지만 효율적이지 않습니다. 이는 processNewLogs()가 매번 호출될 때마다 불리는 join()이 데이터세트에서 키가 어떻게 파티션되어 있는지에 대해 알지 못하기 때문입니다. 기본적으로 이 연산은 양쪽 데이터세트를 모두 해싱하고 동일 해싱키의 데이터끼리 네트워크로 보내 동일 머신에 모이도록 한 후 해당 머신에서 동일한 키의 데이터끼리 조인을 수행하는게 좋습니다. userData 테이블은 5분마다 갱신되는 정도의 events 테이블의 로그양보다는 매우 클 것이 예상되므로 이는 심각한 리소스의 낭비입니다. 심지어 userData에 아무 변경이 없어도 함수가 호출될 때마다 userData는 해싱 작업을 거쳐 네트워크로 셔플링이 됩니다. 

  이를 개선하기 위한 방법은 간단합니다. 프로그램 시작 때 한번만 해시 파티션을 하도록 userData에 partitionBy()를 사용하는 것입니다. 이렇게 되면 processNewLogs()는 변경을 할 필요가 없습니다. processNewLogs에 있는 eventsRDD는 지역변수의 형태이므로 이 메소드에서 한 번 쓰이고 버려집니다. 그래서 events를 위한 파티셔너(partitioner) 지정은 이득 볼 것이 없습니다. 이제 userData를 만들면서 partitionBy()를 호출 했으므로 스파크는 이미 userData가 해시 파티션되어 있음을 알고 이 정보를 최대한 활용할 것입니다. 좀 더 자세히 보면, userData.join(events)를 호출할 때 스파크는 오직 eventsRDD만 셔플해서 이벤트 데이터를 각각의 UserId와 맞는 userData해시 파티션이 있는 머신으로 전송할 것입니다. 결과적으로 매우 적은 네트워크비용을 쓰지만 속도는 매우 빨라지게 됩니다. (스칼라와 자바에서는 HashPartitioner를 사용이 가능하지만 python에서는 partitionBy()에서 개수만 전달이 가능합니다.

RDD 파티셔너 정하기

  스칼라와 자바에서는 RDD가 어떻게 파티션이 될지 partitioner 속성을 사용해 결정을 할 수 있다. 최초에 pairs를 생성한 뒤에 pairs.partitioner를 통해 파티셔너가 어떻게 설정이 되어있는지 확인을 할 수 있다. None으로 존재하지 않는 것을 볼 수 있다. HashPartitioner를 추가한 뒤에 pairs를 partitionBy와 함께 HashPartitioner의 객체를 파티션의 개수와 함께 전달한다. partitioned의 파티셔너를 확인하면 HashPartitioner로 설정이 되어 있는 것을 알 수 있다. 만약 partitioned를 반복해서 사용한다면 partitionBy의 라인에 persist()를 사용해주면 된다. 


[참고] 서적 - 러닝스파크



RDD 영속화(캐싱) - 이론 및 예제 

동일한 RDD를 여러 번 사용하고 싶을 때도 있을 것이다. 생각없이 이를 시도한다면 스파크는 RDD와 RDD에서 호출하는 액션들에 대한 모든 의존성을 재연산하게 된다. 이는 데이터를 여러 번 스캔하는 반복 알고리즘들에 대해서는 매우 무거운 작업일 수 있다. 

RDD를 여러 번 반복 연산하는 것을 피하려면 스파크에 데이터 영속화(persist/persistence)를 요청을 할 수 있다. RDD 영속화에 대한 요청을 하면 RDD를 계산한 노드들은 그 파트션들을 저장하고 있게 된다. 영속화된 데이터를 갖고 있는 노드에 장애가 생기면 스파크는 필요 시 유실되ㄴ 데이터 파티션을 재연산한다. 만약 지연 없이 노드 장애에 대응하고 싶다면 데이터를 복제하는 정책을 선택할 수도 있다. 

스파크는 목적에 맞는 여러 수준의 영속화를 제공한다. 스칼라와 자바에서는 기본적으로 persist()가 데이터를 JVM 힙(heap)에 직렬화되지 않는 객체 형태로 저장한다. 파이썬에서는 영속화된 데이터는 늘 직렬화를 하므로 기본적으로 JVM 힙에 피클(파이썬 직렬화)된 객체가 저장된다. 데이터를 디스크나 오프힙(off-heap_ 저장 공간에 쓸 때는 데이터가 늘 직렬화된다.

MEMORY_ONLY는 공간 사용량 높고, cpu사용시간 낮고, 메모리에 저장하고, 디스크에 저장하지 않는다.

MEMORY_ONLY_SER 공간 사용이 낮고, cpu사용이 높고, 메모리에 저장하고, 디스크에 저장하지 않는다.

MEMORY_AND_DISK 공간 사용량이 높고, cpu사용이 중간, 메모리에 일부 저장, 디스크에 일부 저장 (메모리에 넣기에 데이터가 너무 많으면 디스크에 나눠 저장)

MEMORY_AND_DISK_SER 공간 사용량이 낮고, cpu사용이 높고, 메모리에 일부 저장, 디스크에 일부 저장 (메모리에 넣기에 데이터가 너무 많으면 디스크에 나눠 저장, 메모리에 직렬화된 형태로 저장)

DISK_ONLY 공간사용량이 낮고, cpu사용이 높고, 메모리에 저장하지 않고, 디스크에 저장



RDD의 persist()와 unpersist()를 통해 캐시에 데이터를 저장하고, 삭제를 한다. 파티션의 개수를 알고 싶으면 자바와 스칼라에서는 rdd.partitions.size, 파이썬에서는 rdd.getNumPartitions를 하면 된다.


[참고] 서적 - 러닝스파크

Caching and Serialization

아래 내용을 기준으로 학습을 진행했습니다.

  • 어떻게 언제 RDD를 cache를 해야 하는지?
  • Storage level과 사용은 어떻게 하는지?
  • memory 사용을 최적화 하기 위해서 어떻게 하는지? 
  • RDDs를 공유하려면 어떻게 해야 하는지?

Persistence

  스파크는 MapReduce와 가장 다른점이 disk I/O가 최소화 되기 때문에 interative한 연산에 적합하다고 말을 합니다. 그러나 추가적으로 성능향상을 얻기 위해서는 RDD를 다루는 방법에 대해서 정확하게 이해를 해야합니다. Spark에서는 memory chaching을 사용하는 것으 알고 있습니다. 예를 들어서 RDD데이터를 생성하고 계속 반복적으로 사용을 한다면, 사용한다는 말은 action의 연산을 반복적으로 한다면 spark에서는 반복적으로 RDD객체를 생성을 합니다. 한마디로 작은 데이터를 읽는 작업을 반복하는것도 비효율적인데, 큰 데이터를 반복적으로 action 연산을 한다면 이 보다 비효율적일수가 없습니다. 그렇기 때문에 spark에서는 persist를 지원합니다. 여기서도 persist를 어떻게 언제하는지가 굉장히 중요합니다. 사용하고자 하는 데이터의 크기가 메모리의 크기보다 더 클 경우에는 disk를 함께 사용이 가능합니다. disk를 사용하면 다시 메모리로 올릴때 reconstitute를 해야합니다. 

  persist를 하는 시기에 대해서 알아보면, 이상적으로 pruning, filtering 등의 downstream 처리를 필요로 하는 transformations을 한 뒤에 하는게 좋습니다. 예를 들어서 file을 로딩하고, parsing을 하고, key를 기반으로 파티셔닝을했을때, 만약 filter, map, 그리고 shuffle을 여러번 반복을하는 작업을 통해 RDD를 재 사용한다면, root RDD를 cache하는 것은 좋은 방법이 아닙니다. paritionBy를 한 뒤에 caching을 하는게 더 좋은 방법이라고 할 수 있습니다.

  더이상 persisted RDD를 사용하지 않을 경우에는, unpersist를 통해서 간단하게 삭제가 가능합니다. Spark에서는 unpersist를 하지 않아도 LRU(Least Recently Used)의 알고리즘을 통해서 새로운 RDDs의 공간을 생성을 합니다. 

Where to Persist

  RDD lineage에 하나의 RDD는 filter를 통해 pruning이 되었고, 다른 하나의 RDD와 join을 하고, 그 결과를 reduceByKey와 mapValues를 한다면 언제 어느 시점에 persist를 해야할지 생각을 해봐야 합니다. filter를 한 뒤의 RDD를? 아니면 join을 한뒤에? 아니면 reduceByKey를 한뒤에? 가장 좋은 위치는 RDD가 reduce와 map의 action을 하기 전인, join을 한 뒤에 입니다.

Storage Levels

  Storage Levels은 내가 어디의 저장소까지 사용을 할 것인가에 대한 정의를 할 수 있는 것을 의미합니다. Spark에서는 기본으로 MEMORY_ONLY로 동작을 합니다. 그렇기 때문에 cache()의 함수를 통해서 persist를 할 수 있습니다. 

  만약 RDD가 memory에 전부 로딩이 안될 경우에는 DISK옵션을 사용해 디스크와 메모리를 함께 사용을 할 수 있습니다. 하지만 위에서 언급했던것과 같이 디스크를 사용하게 되면 reconstitute를 해야합니다. 

Storage Cost

  RDD를 persist하게 되면 모든 객체를 메모리에 유지를 해야합니다. 그렇게 되면 RDD를 저장하는 메모리의양이 거의 대부분을 차지할 수도 있습니다. RDD의 사용 공간을 절약하기 위해 serialize를 하게 됩니다. RDD의 records는 large byte array의 형태로 저장이 되고, data를 deserialize하기 위해서는 CPU를 사용하게 됩니다. 그러나 각각의 record들의 작고 많은 데이터를 저장하는 것보다 하나의 객체(byte array)로 저장을 함으로써 garbage collection에 도움을 주게 됩니다.

  spark에서는 Java serializer를 기본 serializer로 사용을 하고 있고, python에서는 pickle을 사용하고 있습니다. 다른 옵션으로 Java와 Scala에서는 Kryo serializer를 사용할 수 있습니다. kryo는 기존 Java serializer보다 potential compatibility의 코스트 측면에서 더 효율적입니다. 

  또하나 Storage cost를 절약할 수 있는 방법은 java와 scala의 collections을 사용하기 보다는 primitive types을 사용하면 최적화가 가능합니다. 또한 작은 objects을 포함한 nested classes의 사용을 줄여야 합니다. 그렇지 않으면 garbage collection의 overhead가 발생하게 됩니다.

Storage Level Comparison

  Storage Cost가 Serialization의 수행 여부에 따라 달라지는 것을 확인하기 위해, 대략 36MB의 파일을 읽게 되면 파일을 Java objects의 사이즈로 변환이 되기 때문에 persisted dataset의 크기는 기존의 파일보다 더 큰 162.8MB가 되게 됩니다. 반대로 Java serializer를 사용해 Serialized data는 52.3MB로 기존 파일의 크기보다 약간 큰 것을 확인할 수 있습니다. Kryo Serializer를 사용하면, 기본 Java Serializer보다 더 작고, 기존 파일의 크기보다 작은 34.1MB의 크기를 보여주는 것을 확인할 수 있습니다.


Using Kryo

  Kryo Serializer를 사용하는 방법은 간단합니다. 첫번째로 SparkConf의 객체를 생성하고, spark.serializer의 파라미터에 KryoSerializer로 세팅을 하고, serialize를 할 class를 각각 등록을 해주면 됩니다. (여기서는 Trip과 Station의 class를 KryoClasses로 등록을 했습니다.) 마지막으로 SparkContext의 객체를 위에서 세팅하고, 등록한 SparkConf를 이용해 생성하면 됩니다. Kryo에 의해 Serialization을 하기 위해서는 storage level을 MEMORY_ONLY_SER로 사용을 하면 됩니다.

Sharing RDDs

  Spark에서 여러개의 Application을 제출(submit)을 하게 되면, RDD를 Appilcation사이에 공유가 필요할때가 생ㄱ비니다. 하지만 RDDs는 SparkContext에 속해있기 때문에 사실상 불가능합니다. 하지만 Tachyon Proeject를 사용하면 RDD를 효율적으로 공유가 가능합니다. Tachyon Project는 high-speed memory-based Distributed File System을 말합니다. 

Memory Configuration

  • Spark.rdd.compress는 serialized RDDs를 compress를 할지 안할지에 대한 설정값입니다.
  • spark.suffle.consolidateFiles는 shuffle 동안 생성된 중간 파일들을 통합하여, 기존 작고 많은 파일을 큰 파일의 형태로 생성을 합니다. 이렇게 하게 되면 disk I/O에 잠재적으로 성능 향상을 가져다 줄 수 있습니다. 문서를 보면 ex4또는 xfs filesystem에서는 true의 값으로, ext3에서는 false의 값을 설정하는게 좋다고 언급을 합니다. ext3은 8개의 코어를 갖은 machines에서는 성능 저하를 가져올 수 있기 때문입니다. 
  • spark.shuffle.spill은 disk로 data를 채우(spill)는 reduces의 작업을 하는동안 사용하는 memory의 양을 제한합니다. spill threshold는 spark.shuffle.memoryFraction에 의해 설정이 됩니다. 
  • spark.storage.memoryFraction은 in-memory persistence를 storage에 얼마나 사용을 할 것인가를 나타냅니다.

(여기는 사용해보지 않으니 정확히 어떤값을 의미하는지 잘모르겠다..... 한번 확인을 전체적으로 해볼 필요는 있을것 같구나.)

[참고] Big Data University - Spark Fundamentals II



+ Recent posts