빅데이터/Spark

Spark Shell을 이용한 간단한 예제 및 앱 배포 방법

쌍쌍바나나 2016. 6. 16. 07:36
반응형

Interactive Analysis with the Spark Shell

  API를 쉽게 습득하기 위해서는 spark's shell만큼 좋은게 없습니다. python과 scala를 통해서 할 수 있습니다. 이번에는 RDD를 생성하는 방법, MapReduce를 Spark에서 구현하는 방법과, caching하는 방법에 대해서 설명하려고 합니다. 마지막에는 내가 구현한 앱을 사용자들에게 배포하기 위해 sbt build tool을 이용해 배포하는 방법에 대해서 설명합니다. 

Resilient Distributed Dataset(RDD)

  생성하기 위해서는 hadoop hdfs file을 읽거나, 기존에 있는 rdds를 transforming을 통해서 변환이 가능합니다. RDD에서 제공하는 method는 action, transformation이 있습니다. action [참고] https://spark.apache.org/docs/latest/programming-guide.html#actions transformation [참고] https://spark.apache.org/docs/latest/programming-guide.html#transformations

$SPARK_HOME/bin/spark-shell
scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[3] at textFile at :27

scala> textFile.count() // Number of items in this RDD
res1: Long = 95

scala> textFile.first() // First item in this RDD
res2: String = # Apache Spark

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contian "Spark"
res3: Long = 17

map과 reduce를 사용하는 예제 가장 많은 단어를 포함한 줄의 단어 개수를 구하는 예제입니다.

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 14

  각 line 별로 띄어쓰기를 기반으로 split을 진행하고, reduce할때 가장 많은 단어를 포함한 줄의 단어 개수를 갱신하고 결과를 리턴 여기서 map은 transformation이고, reduce는 action이기 때문에, map을 했을때는 결과가 나오지 않고, reduce할때 최종 결과를 리턴합니다. 즉, map을 할때는 어떤 처리를 하지 않고, reduce할때 rdd의 데이터를 처리합니다. 여기서 코드를 좀 정리하면,

scala> import java.lang.Math
if (a > b) a else b  => Math.max(a,b)로 변환이 가능합니다.
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a,b))
res4: Int = 14

  Hadoop MapReduce의 가장 유명한 예제인 단어별 개수 세기를 spark에서 scala를 이용하면 아래와 같습니다. 여기서도 flatMap, map, 그리고 reduceByKey의 transformation operation을 하고, 마지막에 collect()라는 action을 통해서 결과를 얻을 수 있습니다. flatMap은 map과 유사하지만 0또는 그 이상의 outputitems에 mapping이 될 수 있습니다. 위 예제에서 line.split(" ").size는 하나의 값으로 매핑이 되지만 flatMap은 여러개의 아이템으로 mapping이 가능하기 때문에 여러 단어를 보유하고, 그 각 단어들을 기반으로 map을 통해 각 하나의 단어별로 key, value pair로 값을 생성하고, key를 기반으로 reduceByKey를 하면 단어별 개수 세기가 가능합니다.

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at reduceByKey at :29

scala> wordCounts.collect()
res8: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (YARN,,1), (locally,2), (changed,1), (locally.,1), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (first,1), (graph,1), (Hive,2), (["Specifying,1), ("yarn",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), ([project,2), (prefer,1), (SparkPi,2), (,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (are,1), (systems.,1), (params,1), (scala>,1), (DataFrames,,1), (provides,1), (refer,2)...

Caching

캐시를 사용하면 반복적으로 데이터에 접근을 할 때 유용합니다.

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at filter at :29

scala> linesWithSpark.cache()
res0: linesWithSpark.type = MapPartitionsRDD[2] at filter at :29

scala> linesWithSpark.count()
res1: Long = 17

scala> linesWithSpark.count()
res2: Long = 17

Self-Contained Applications

Application으로 작성하는 방법이다. build하기 위해서는 dependency를 고려하기 위해서 package manager를 사용하는데, scala의 경우에는 sbt를 사용하고, java는 maven을 사용합니다.

sbt 설치 방법

[참고] http://www.scala-sbt.org/0.13/docs/Installing-sbt-on-Linux.html

echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 642AC823
sudo apt-get update
sudo apt-get install sbt
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

simple.sbt 파일 작성

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"

  파일 경로 확인 및, sbt를 이용해 build를 하는 과정입니다. sbt package를 하면 관련된 dependency의 파일을 다운로드 받기 때문에 시간이 좀 걸립니다. build를 하고 나면, .jar가 나오고, jar파일을 spark-submit을 통해 실행하면 됩니다.

# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 58, Lines with b: 26


반응형