Hadoop HDFS에서 디렉토리별 사용량 체크를 하는 방법

HDFS를 사용하다보면 directory 별로 디스크 사용량 체크가 필요하다. 아래와 같이 Permission denied 에러가 발생할것이다. 그 이유는 ROOT의 사용량 조회를 일반 유저 아이디로는 조회가 불가능하기 때문이다. 루트를 포함해 디스크 용량을 확인하기 위해서는 모든 키를 갖고 있는 HDFS 유저로 조회를 하면 된다.

du: Permission denied: user=icecream, access=READ_EXECUTE, inode="/user/hue/.Trash":hue:hue:drwx------

``` sudo -u hdfs hdfs dfs -du -s | sort -r -k 1 -g | awk '{ suffix="KMGT"; for(i=0; $1>1024 && i < length(suffix); i++) $1/=1024; print int($1) substr(suffix, i, 1), $3; }'

example

sudo -u hdfs hdfs dfs -du -s /data/* | sort -r -k 1 -g | awk '{ suffix="KMGT"; for(i=0; $1>1024 && i < length(suffix); i++) $1/=1024; print int($1) substr(suffix, i, 1), $3; }' ```

하둡(Hadoop)에서 데이터 복사하는 방법

  • HDFS에서 데이터를 복사하는 방법은 아주 다양하다.
  • 효율적으로 데이터를 복사하는게 중요하다

데이터를 복사하는 다양한 방법

  • 일단 데이터를 복사하는 방법은 다양하다

로컬 <-> 클러스터

  • 아래 속성을 통해서 로컬에 있는 데이터를, 클러스터에 있는 데이터를 옮길 수 있다.
    • copyToLocal
    • put
    • get

클러스터 <-> 클러스터

  • 클러스터간에 데이터를 옮기기 위해서는 아래와 같이 하면 된다.
  • 클러스터 -> 로컬 -> 클러스터
  • 과연 이렇게 하는 사람이 있을까 싶지만, 이렇게 하는 사람을 내가 보았음

클러스터 -> 클러스터 로 데이터를 옮기는 방법은 한가지가 아니다.

  • cp
  • distcp

두 가지 방법의 차이를 알지 못한다면, 작업에 엄청난 문제가 생길 것이다.

만약 distcp를 이용하면 아래와 같은 에러가 발생할 수 있다.
아래 에러는 distcp는 MR을 사용하기 때문에 block-size가 맞지 않아서 생기는 문제,
checksum-checks를 skip을 할 수 있으나, 문제가 생길 수 있기 때문에
-bp 옵션을 사용해 block-sizes를 보존(preserver)하는게 좋다.

Caused by: java.io.IOException: Check-sum mismatch between Source and target differ in block-size. Use -pb to preserve block-sizes during copy. Alternatively, skip checksum-checks altogether, using -skipCrc. (NOTE: By skipping checksums, one runs the risk of masking data-corruption during file-transfer.)

cp는 하나의 pipeline으로 복사하기 때문에 상당히 속도가 느리다.
반면, distcp는 mapreduce를 이용하기 때문에 속도가 빠르다.
위에서 block-size를 고려해야 하는 이유도 mr을 이용하기 때문이다.

bash hadoop distcp hdfs://A/user/jslee/data hdfs://B/user/jslee/data

shell hadoop fs -cp hdfs://A/user/jslee/data hdfs://B/user/jslee/data

하둡(Hadoop)에서 데이터 복사하는 방법

  • HDFS에서 데이터를 복사하는 방법은 아주 다양하다.
  • 하지만 효율적으로 데이터를 복사하는게 중요하다

데이터를 복사하는 다양한 방법

  • 일단 데이터를 복사하는 방법은 다양하다

로컬 <-> 클러스터

  • 아래 속성을 통해서 로컬에 있는 데이터를, 클러스터에 있는 데이터를 옮길 수 있다.
    • copyToLocal
    • put
    • get

클러스터 <-> 클러스터

  • 클러스터간에 데이터를 옮기기 위해서는 아래와 같이 하면 된다.
  • 클러스터 -> 로컬 -> 클러스터

과연 이렇게 하는 사람이 있을까 싶지만, 이렇게 하는 사람을 내가 보았음 클러스터 -> 클러스터 로 데이터를 옮기는 방법은 한가지가 아니다.

  • cp
  • distcp

두 가지 방법의 차이를 알지 못한다면, 작업에 엄청난 문제가 생길 것이다.

  • 만약 distcp를 이용하면 아래와 같은 에러가 발생할 수 있다.
  • 아래 에러는 distcp는 MR을 사용하기 때문에 block-size가 맞지 않아서 생기는 문제,
  • checksum-checks를 skip을 할 수 있으나, 문제가 생길 수 있기 때문에
  • -bp 옵션을 사용해 block-sizes를 보존(preserver)하는게 좋다.

Caused by: java.io.IOException: Check-sum mismatch between Source and target differ in block-size. Use -pb to preserve block-sizes during copy. Alternatively, skip checksum-checks altogether, using -skipCrc. (NOTE: By skipping checksums, one runs the risk of masking data-corruption during file-transfer.)

  • cp는 하나의 pipeline으로 복사하기 때문에 상당히 속도가 느리다.
  • 반면, distcp는 mapreduce를 이용하기 때문에 속도가 빠르다.
  • 위에서 block-size를 고려해야 하는 이유도 mr을 이용하기 때문이다.

bash hadoop distcp hdfs://A/user/jslee/data hdfs://B/user/jslee/data

shell hadoop fs -cp hdfs://A/user/jslee/data hdfs://B/user/jslee/data

Hadoop Cluster?

  노드(node)는 하나의 컴퓨터를 말합니다. 이처럼 30~40개의 노드가 모여 하나의 rack을 구성합니다. rack은 물리적으로 같은 network의 switch에 모두 연결이 되어 있습니다. 그렇기 때문에 두 노드의 badnwidth는 다른 rack에 있는 노드보다 크게 됩니다. 즉, 데이터의 이동을 할 수 있는 폭이 크기 때문에 데이터의 속도가 빠른것을 알 수 있습니다. rack이 모여서 하나의 Hadoop Cluster를 구축하게 됩니다. 이처럼 network의 다른 switch에 연결되어 있는 rack으로 인해 어떤 성능 저하가 나타는지 추후에 알아보도록 하겠습니다.

Hadoop의 주요한 컴포넌트는 HDFS와 MapReduce가 있습니다. 

HDFS(Hadoop Distributed File System)

  HDFS는 각 노드의 파일시스템의 위에서 동작을 하게 됩니다. 분산시스템에서 문제로 다루어지고 있는 fault tolerant를 해결하기 위해 HDFS에서는 데이터를 복제(replication)을 하게 됩니다. Hadoop은 데이터가 큰 파일을 처리하도록 디자인 되었기 때문에, 데이터의 파일이 크면 클수록 다음 읽어야하는 데이터의 위치를 찾는 시간이 줄어들게 됩니다. 그렇기 때문에 큰 파일을 읽어서 처리하는게 효율적입니다. 데이터의의 서브셋을 분석할때도 데이터의 위치를 찾는 연산의 코스트가 크기때문에 피하는게 좋습니다. Hadoop은 streaming, sequential 데이터를 접근하는게 임의의 접근보다 더 효율적으로 디자인이 되어 있습니다. 잘 생각해보면 데이터의 위치를 찾는 연산의 코스트가 크다고 했는데, sequenctial 데이터나, streaming 데이터는 데이터를 시작하는 위치만 찾으면 되기 때문에 찾는 작업의 코스트가 크지 않습니다. 

  HDFS에서는 데이터를 블록(block)의 사이즈로 저장을 하고 있습니다. 기본으로는 64MB가 하나의 블록 사이즈인데, 거의 대부분은 128MB의 이상의 블록사이즈로 데이터를 저장을 하고 있습니다. 그냥 데이터를 통째로 저장하면 되는데, 왜 블록으로 나누어서 저장을 하게 될까요? 그 이유는 첫번째로 데이터가 얼마나 디스크에 맞는지, 저장이 가능한지 계산하기가 쉽습니다. 두번째로 HDFS에서는 fault tolerant를 방지하기 위해서 데이터를 복제합니다. 그렇다면 데이터를 부분적으로 복제해 여러 분산된 노드에 저장이 가능합니다. 세번째로 블록은 데이터 공간을 낭비하지 않습니다. 그 예로 450MB의 데이터를 128MB의 블록으로 나누어 져장할때, 4개의 블록이 생기게 됩니다. 하지만 마지막 4번째 블록은 128MB의 공간을 전부 소비하지 않고, 데이터 사이즈에 맞도록 소비하게 됩니다. 

  아래 예제는 블록이 각 여러 노드에 복제된 것을 나타낸 그림입니다. 블록1은 노드1, 노드2에 저장이 되어있고, 블록2는 노드1과 노드3에 저장이 되어 있습니다. 그리고 블록3은 노드2와 노드3에 저장이 되어 있습니다. 이처럼 데이터 복제는 한개의 노드가 fail이 나도 데이터 손실없이 데이터를 처리 할 수 있습니다. 예를 들어 만약 노드1이 충돌이 발생하면 노드2는 여저니 블록1을 갖고 있기 때문에 데이터를 계속 처리가 가능합니다.

  HDFS는 NameNode와 DataNodes를 가지고 있습니다. NameNode는 실제 데이터를 저장하고 있는 노드가 아닌, 어느 노드에 어떤 데이터가 저장되어 있는지에 대한 filesystem의 metadata를 가지고 있습니다. filesystem의 모든 metadata를 메모리에 저장을 하고 있어야 하기 때문에 NameNode는 가능한 큰 메모리를 갖고 있는게 좋습니다. HDFS는 많은 DataNodes를 가지고 있습니다. DataNodes는 데이터의 블록을 저장하고 있습니다. 

  만약 데이터를 찾는 클라이언트의 요청이 오면, NameNode로 부터 데이터의 위치를 찾습니다. 해당 DataNode의 데이터 위치를 확인한 클라이언트는 직접적으로 DataNode의 데이터에 접근을 해서 연산을 하게 됩니다. 그렇기 때문에 각각 DataNode에서 생성되거나 바뀐 데이터의 정보가 발생하는데, 변경된 정보는 NameNode로 주기적으로 리포트를 보냅니다.

  만약에 NameNode가 fail이 났을 경우를 생각해서 JournalNodes가 있습니다. JournalNodes는 최소 3개, 홀수개로 존재하게 됩니다. NameNode는 무조건 한개가 active인데, JournalNodes는 함께 동작하면서 NameNode가 죽게 되면 다른 NameNode를 active시키는 역할을 합니다. 

  Hadoop은 topology network를 갖고 있기 때문에 데이터를 보내고, 처리하는 과정을 최적화를 해야합니다. 최적활를 하는 방법은 간단합니다. 데이터가 있는 곳에서 처리를 하거나, 데이터를 이동해서 처리를 해야한다면 bandwidth가 최대가 되는 노드 사이에서 데이터를 이동하는게 효율적입니다. 예를들어서 블록 B1을 처리를 해야할때 rack1의 노드 n1에서 처리하는게 가장 좋습니다. 하지만 만약 같은 노드에 데이터가 없다면, 같은 rack에 있는 노드에서 처리하는게 좋습니다. 가장 최악은 다른 rack2에 있는 노드에서 처리하는게 최악입니다. 

  HDFS에 데이터를 생성하는 방법은, NameNode에 "create"라는 request를 보내게 됩니다. 그럼 NameNode에서는 해당 파일이 있는지, 이 클라이언트가 권한이 있는지 확인을 하고, DataNode에 데이터를 생성합니다. 만약 클라이언트가 DataNode에서 동작을 하고 있으면 해당 노드에 데이터를 생성합니다. 만약 그게 아니라면 랜덤하게 DataNode를 선택하고 데이터를 생성합니다. HDFS는 기본적으로 2개의 복제(replication)를 하게 됩니다. DataNodes사이에 pipeline을 생성을 하고, 복제 데이터를 생성합니다. 파일을 다 생성하게 되면 DataNodes에서는 데이터가 다 생성되었다는 메시지를 순차적으로 전송을 합니다. 모든 노드에 데이터가 복제가 되면 NameNode는 acknowlegement를 받게 됩니다.

MapReduce

MapReduce 프로그램은 map tasks와 reduce tasks로 나누어져서 각 노드에서 parallel하게 동작하는 것을 말합니다.

 

Yarn

  Hadoop위에서 동작하는 여러 어플리케이션이 등장하기 시작하면서 고려를 해야하는게 생겼습니다. 바로 리소스와 스케줄입니다. 각 어플리케이션은 각자의 리소스매니저와 스케줄러를 사용했습니다. 그렇게 되면 다른 어플리케이션에서 공유되는 자원을 사용할때 문제가 발생하게 됩니다. 그래서 등장한게 Yarn입니다. Yarn은 Hadoop Cluster의 리소스를 관리해주는 관리자입니다. 각자의 어플리케이션에서 관리하던 리소스와 스케줄러를 통합해서 관리하게 됩니다. 

  Yarn은 각 노드에서 동작하고 있는 NodeManager를 통해서 각 노드에 사용가능한 자원에 대해서 인지를 하고 있습니다. 동작순서는 어플리케이션이 실행이 되면 Application Master가 시작되고, ResourceManager로 부터 어떤 리소스를 사용할 수 있는지에 대해서 알게 됩니다. 그렇게 되면 Application Manager는 각 노드에 있는 Containers에게 자원을 할당합니다. 자원이 할당되면 각 tasks는 Containers에서 작업을 수행하게 됩니다. 



[참고] Big Data University - Hadoop Fundamental I 


들어가며

  우리에게 주어진 데이터가 1GB라고 생각해보자, 1GB의 크기를 갖는 데이터를 관계형 데이터베이스에 저장을 하고, 데이터를 처리하는데는 큰 어려움이 없을 것입니다. 하지만 데이터의 크기가 10GB, 100GB로 증가한다고 생각하면, 우리가 갖고 있는 컴퓨터의 성능을 업데이트를 해야 합니다. 하지만 만약 1TB, 10TB, 100TB로 데이터의 크기가 커진다면 어떻게 해야할까요? 그렇게 된다면 우리는 여러 컴퓨터를 이용해 분산으로 처리하는 방법을 사용해야 합니다. 하루에 처리할 내용을 몇시간에 처리하는게 일하는데 더 효율적이니 분산으로 처리하는게 좋을것 같습니다. 

  그렇다면 데이터가 어떻게 갑자기 늘어났을까요? 최근 스마트폰이 등장하면서 Facebook, Twitter, RFID readers, sensor 등의 데이터가 기하급수적으로 늘어나고 있습니다. 이러한 데이터들은 unstructed 데이터의 형태로 수집이 되고, 수집된 데이터에서 유용한 정보를 추출하는 데이터의 처리의 양 또한 상당히 커지고 있습니다. 그렇다면 어떻게 처리를 할 수 있을까요? 아마도 Hadoop이 답이 될 수 있습니다.

Big Data란 무엇일까요?

  인터넷이 널리 보급됨에 따라 32억명이 인터넷을 사용한다고 합니다. 전세계 인구가 75억정도면 거의 절반은 인터넷을 하고 있습니다. 이처럼 인터넷이 보급되고, 스마트폰이 등장함에 따라 생산되는 데이터의 양은 엄청납니다. 하루에 Twitter는 7TB의 데이터를 생산하고, Facebook에서는 600TB의 데이터가 생산이 되고 있습니다. 하지만 여기서 주목해야할 점은 이렇게 수집되는 모든 데이터의 약 80%정도는 unstructed 데이터의 형태를 나타내고 있습니다. 이말은 즉, 우리가 보유하고 있는 데이터는 많지만, 이 많은 데이터에서 유용한 정보를 추출하기 위해서는 어마어마한 데이터 처리가 필요합니다.

Hadoop이란 무엇일까요?

  Hadoop은 Apache Foundation의 오픈소스 프로젝트로써, Java로 개발된 framework를 말합니다. Hadoop은 commodity hardware를 사용해 대용량의 unstructed, semi-structed 데이터를 처리하는데 최적화 되어 있습니다. (여기서 언급하는 commodity hardware는 범용적으로 우리가 사용하는 컴퓨터를 말합니다.) 하지만 batch처리로 많은 데이터를 저장하지만, 요청한 처리에 대해서는 즉각적으로 답을 받을 받는 framework는 아닙니다. 

Hadoop과 연관되어 있는 오픈소스 프로젝트



Lucene, Hbase Hive, Pig, Spark, ZooKeeper, Ambari, Avro, UIMA, YARN, MapReduce

  • Lucene - Java로 구현된 텍스트 검색 엔진
  • Hbase - Hadoop의 데이터베이스
  • Hive - Hadoop files에 저장되어 있는 데이터를 쿼리할때 사용
  • Pig - 대용량의 데이터를 처리하기 위해 필요한 MapReduce Code를 생산해 주는 high level language
  • Spark - 인메모리 클러스터 컴퓨팅 framework
  • Zookeeper - 분산시스템에서 naming, configuration service를 중앙에서 관리
  • Ambari - Web UI를 통해서 hadoop clusters를 관리하고 머니터링
  • Avro - 데이터 Serialization system
  • UIMA - unstructed 데이터의 분석을 위한 architecture
  • YARN - 빅데이터 applications을 위한 large-scale operating system
  • MapReduce - 대용량의 데이터를 처리하기위한 소프트웨어 프레임워크

[참고] Big Data University - Hadoop Fundamentals I

Yarn?

  Hadoop 1.0에서는 JobTracker가 클러스터의 자원 배분과 Job관리를 함께 수행했기 때문에 JobTracker에서 병목현상이 일어났다. JobTracker가 하던 두 가지 역할-자원관리를 Resource Manager와 Application Master로 분리해서 JobTracker에 몰리던 병목을 제거했습니다. Resource Management, Scheduling/Monitoring을 구분하는 daemon의 기능을 분할하였습니다. global하게 ResourceManager, per-application ApplicationManager를 두자라는 아이디어에서 부터 시작했습니다. 범용 컴퓨팅 클러스터가 가능 (MapReduce)외에도 다양한 어플리케이션을 실행할 수 있으며, 어플리케이션 마다 자원(CPU, 메모리)을 할당 받는다. Hadoop 2.0의 Cluster Resource Management 플랫폼인 Yarn은 하둡 클러스터의 각 어플리케이션에 필요한 리소스를 할당하고, 모니터링하는 업무에 집중함으로써 다양한 어플리케이션이 하둡 클러스터들의 리소스를 공유할 수 있도록 하는 핵심 요소이다.

YARN의 Components



  • Resource Manager
  •   클러스터에 1개 존재하며, 클러스터의 전반적인 자원 관리와 태스크트들의 스케줄링을 담당한다 .클라이언트로부터 어플리케이션 실행요청을 받으면 그 어플리케이션의 실행을 책임질 Application Master를 실행한다. 또한 클러스터 내에 설치된 모든 Node Manager와 통신을 통해서 각 서버마다 할당된 자원과 사용중인 자원의 상황을 알 수 있으며, Application Master들과의 통신을 통해 필요한 자원이 무엇인지 알아내어 관리하게 된다. Resource Manager내부에는 여러개의 컴포넌트들이 존재하며, Scheduler, Application Manager, Resource Tracker 세개의 메인 컴포넌트가 있다.

    • Scheduler
    •   Node Manager들의 자원 상태를 관리하며 부족한 리소스들을 배정한다. 노드 당 1개가 있고, Scheduler는 프로그램의 상태를 검사하거나 모니터링하지 않으며, 순수하게 스케줄링 작업만 담당한다. 스케줄링이란 자원 상태에 따라서 태스크들의 실행 여부를 허가해주는 역할만 담당하며, 그 이상의 책임은 지지 않는다. 즉, 프로그램 오류나 하드웨어의 오류로 문제가 발생한 프로그램을 재 시작시켜주지 않으며, 프로그램에서 요구하는 리소스(CPU, Disk, 네트워크 등)에 관련된 기능만 처리한다. 노드 컨테이너의 자원 상태를 Resource Manager에게 통지한다.

    • Application Manager
    •   Node Manager에서 특정 작업을 위해서 Application Master를 실행하고, Application의 실행 상태를 관리하고 상태를 Resource Manager에게 통지한다. 여기서 Application Master라는 용어가 나오는데, YARN에서 실행되는 하나의 태스크를 관리하는 마스터 서버를 말한다. 어플리케이션 당 1개가 있다.

    • Resource Tracker
    •   Container가 아직 살아있는지 확인하기 위해서 Application Master재 시도 최대 횟수, 그리고 Node Manager가 죽은 것으로 간주 될 때까지 얼마나 기다려야 하는지 등과 같은 설정 정보를 가지고 있다.

  • Node Manager
  •   노드 당 한개씩 존재합니다. 해당 Container의 리소스 사용량을 모니터링하고, 관련 정보를 Resource Manager에게 알리는 역할을 담당합니다. Application Master와 Container로 구성되어 있습니다.

    • Application Master
    •   하나의 프로그램에 대한 마스터 역할을 수행하며, Scheduler로 부터 적절한 Container를 할당 받고, 프로그램 실행 상태를 모니터링하고 관리합니다.

    • Container
    •   CPU, 디스크(Disk), 메모리(Memory) 등과 같은 속성으로 정의된다. 이 속성은 그래프 처리(Graph processing)와 MPI와 같은 여러 응용 프로그램을 지원하는데 도움이 된다. 모든 작업은 결국 여러 개의 태스크로 세분화되며, 각 테스크는 하나의 Container 안에서 실행이 됩니다. 필요한 자원의 요청은 Application Master가 담당하며, 승인 여부는 Resource Manager가 담당합니다. Container안에서 실행할 수 있는 프로그램은 자바프로그램뿐만 아니라, 커맨드 라인에서 실행할 수 있는 프로그램이라면 모두 가능합니다.


Hadoop 2.0 동작 방식

  1. 클라이언트는 Application Master 자체를 실행하는 필요한 데이터를 포함하는 응용프로그램을 Resource Manager에게 제출
  2. Resource Manager는 Node Manager를 통해 Container 할당을 책임지는 Application Master를 시작
  3. Application Master가 Resource Manager에 등록되고, 클라이언트가 Resource Manager와 통신이 가능.
  4. Application Master는 resource-request 프로토콜을 통해 Resource Manager를 통해 적절한 리소스의 Container할당을 요청
  5. Container가 성공적으로 할당되면, Application Master는 실행 스펙을 Node Manager에게 제공하여, Container를 실행시킨다. 실행 스펙을 일반적으로 Container가 Application Master 그 자체와 통신하기 위해 필요한 정보를 포함
  6. 응용프로그램 코드는 Container에서 실행되고, 진행률, 상태 등의 정보를 응용프로그램-스펙 프로토콜을 통해 응용프로그램의 Application Master에 전송
  7. 응용프로그램 실행 중 클라이언트는 상태, 진행률 등을 얻기 위해 Application Master와 응용프로그램-스펙 프로토콜을 통해 직접 통신
  8. 일단 응용프로그램이 완료되고, 모든 필요한 작업이 종료되면, Application Master는 Resource Manager에서 등록된 자신을 제거하고, 자신의 컨테이너를 다른 용도로 사용이 가능하도록 종료


[참고] http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html
[참고] http://www.edureka.co/blog/introduction-to-hadoop-2-0-and-advantages-of-hadoop-2-0/

+ Recent posts