Hadoop HDFS에서 디렉토리별 사용량 체크를 하는 방법

HDFS를 사용하다보면 directory 별로 디스크 사용량 체크가 필요하다. 아래와 같이 Permission denied 에러가 발생할것이다. 그 이유는 ROOT의 사용량 조회를 일반 유저 아이디로는 조회가 불가능하기 때문이다. 루트를 포함해 디스크 용량을 확인하기 위해서는 모든 키를 갖고 있는 HDFS 유저로 조회를 하면 된다.

du: Permission denied: user=icecream, access=READ_EXECUTE, inode="/user/hue/.Trash":hue:hue:drwx------

``` sudo -u hdfs hdfs dfs -du -s | sort -r -k 1 -g | awk '{ suffix="KMGT"; for(i=0; $1>1024 && i < length(suffix); i++) $1/=1024; print int($1) substr(suffix, i, 1), $3; }'

example

sudo -u hdfs hdfs dfs -du -s /data/* | sort -r -k 1 -g | awk '{ suffix="KMGT"; for(i=0; $1>1024 && i < length(suffix); i++) $1/=1024; print int($1) substr(suffix, i, 1), $3; }' ```

Shell Script에서 외부 하이브 테이블 가져오는 방법

외부에서 하이브 테이블 가져오기

  1. (외부) 하이브 테이블 데이터 복사
  2. (내부) 하이브 테이블 생성
  3. (내부) 데이터 하이브 테이블로 로드(Load)

쉘(shell)에서 데이터를 덤프, 테이블 생성, 로드 까지 하는 방법

스크립트

  • 스크립트에서 첫번째로 hadoop distcp를 이용해서 데이터를 덤프해 오자.
  • cat을 이용해서 하이브 쿼리를 특정 디렉토리의 .hql을 생성
  • .hql의 파일을 hive -f를 통해서 실행

위 과정에서 Hive 쿼리를 직접 입력하지 않고, hql을 생성한 이유는 이후에 스크립트에서가 아닌 스크립트 내에 있는 여러 쿼리들 중에서 하나만 실행하고 싶을때 사용하면 좋다. 또한 cat을 이용하는 이유는 하이브 쿼리를 순수하게 그대로 사용이 가능하기 때문이다. 예를 들어 cat을 사용하지 않고 """로 여러 행을 입력하게 하기 위해서는 \를 개행하는 곳마다 입력을 해야 한다. 그렇게 되면 이후에 쿼리 날릴때 \를 또 수정해야 하는 문제가 있기 때문에 쉘에서 hive 쿼리를 실행할때는 cat을 이용해서 hql 파일을 생성하고 hql파일을 실행하는게 좋은것 같다.

Pyspark NoneType, Null, Blank, Empty String 필터링 하는 방법

pyspark에서 NoneType, NULL, Blank, Empty String 등 사용지 않는 값을 필털이 하는 방법에 대해서 설명을 한다.

  • SQL로 진행하면 컬럼 하나 하나에 대해서 WHERE 절을 이용해서 필터링을 해야한다.
  • 하지만 spark에서 Condition을 생성하고, 해당 Condition을 filter() 함수의 인자로 넘겨주면, 동일한 조건으로 모든 컬럼에 적용이 가능하다.
  • 코드에서 살펴볼 내용
    • None, Null Empty String이 포함된 Row만 추출이 가능
    • None, Null, Empty String을 제외한 모든 Row만 추출도 가능
  • 코드 설명
    • 간단하게 각각 컬럼에 공통적으로 적용할 condition을 입력을 하고, map함수를 이용해 모든 컬럼에 적용을 한다.
    • 마지막에는 reduce함수를 이용해 condition을 모두 "&"(and) 또는 "|"(or) 로 연결하면 condition은 정의 완성
    • 정의된 condition을 filter함수에 넘기면 내가 원하는 Row만 추출이 가능하다.
    • Condition 예)Column<(((CODE IS NOT NULL) AND (NOT (CODE = ))) AND ((TYPE IS NOT NULL) AND (NOT (TYPE = ))))>

소스코드

PySpark 에서 NoneType을 Filtering 하는 방법 (any, all, subset)

pyspark에서 drop method는 NULL을 가진 행을 제거하는데 가장 간단한 함수다. 기본적으로 NULL 값을 가진 행을 모두 제거를 한다. 모든 컬럼이 NULL인 경우 제거를 하고, 하나의 컬럼이 NULL인 경우 제거를 하고 싶은 경우가 있을것이다. 이런 경우에 어떻게 다르게 진행하는지 "any", "all"을 통해 설명

위 작업을 SQL에서 진행한다면 WHERE 절에 해당 컬림이 NULL인지 체크하는 구문을 넣어야 한다. 만약 모든 컬럼에 대해서 해야 하면? 모든 컬럼을 명시해야하는 단점이 있다. (SELECT * FROM TABLE WHERE COL1 IS NOT NULL)

  • drop 메소드에 인수
    • any: 모든 행의 컬럼값 중 하나라도 NULL의 값을 가지면 해당 행을 제거
    • all: 모든 컬럼 값이 NULL이거나 NaN인 경우에만 해당 행을 제거

위에서 보면 항상 "모든"이 조건이 된다. 하지만 drop 인자중에 subset을 이용하면 특정 컬럼에 대해서만 drop을 진행할 수 있다. 아래 소스코드 참고

소스코드

하둡(Hadoop)에서 데이터 복사하는 방법

  • HDFS에서 데이터를 복사하는 방법은 아주 다양하다.
  • 효율적으로 데이터를 복사하는게 중요하다

데이터를 복사하는 다양한 방법

  • 일단 데이터를 복사하는 방법은 다양하다

로컬 <-> 클러스터

  • 아래 속성을 통해서 로컬에 있는 데이터를, 클러스터에 있는 데이터를 옮길 수 있다.
    • copyToLocal
    • put
    • get

클러스터 <-> 클러스터

  • 클러스터간에 데이터를 옮기기 위해서는 아래와 같이 하면 된다.
  • 클러스터 -> 로컬 -> 클러스터
  • 과연 이렇게 하는 사람이 있을까 싶지만, 이렇게 하는 사람을 내가 보았음

클러스터 -> 클러스터 로 데이터를 옮기는 방법은 한가지가 아니다.

  • cp
  • distcp

두 가지 방법의 차이를 알지 못한다면, 작업에 엄청난 문제가 생길 것이다.

만약 distcp를 이용하면 아래와 같은 에러가 발생할 수 있다.
아래 에러는 distcp는 MR을 사용하기 때문에 block-size가 맞지 않아서 생기는 문제,
checksum-checks를 skip을 할 수 있으나, 문제가 생길 수 있기 때문에
-bp 옵션을 사용해 block-sizes를 보존(preserver)하는게 좋다.

Caused by: java.io.IOException: Check-sum mismatch between Source and target differ in block-size. Use -pb to preserve block-sizes during copy. Alternatively, skip checksum-checks altogether, using -skipCrc. (NOTE: By skipping checksums, one runs the risk of masking data-corruption during file-transfer.)

cp는 하나의 pipeline으로 복사하기 때문에 상당히 속도가 느리다.
반면, distcp는 mapreduce를 이용하기 때문에 속도가 빠르다.
위에서 block-size를 고려해야 하는 이유도 mr을 이용하기 때문이다.

bash hadoop distcp hdfs://A/user/jslee/data hdfs://B/user/jslee/data

shell hadoop fs -cp hdfs://A/user/jslee/data hdfs://B/user/jslee/data

하이브에서 랜덤 샘플링 하는 방법 (셔플링)

하이브에서 수백만 수천만의 행이 있다고 생각을 해보자,
효율적으로 고정된 숫자를 임의로 샘플링을 하고 싶다면, 어떻게 효율적으로 할 수 있을까?

샘플링(Sampling)

sql select * from my_table limit 1000;

아래와 같이 샘플링을 하면, 고정된 1000개의 rows를 위에서 부터 순서대로 데이터를 가져올 수 있다.
하지만, 동일한 방법으로 1000개를 가져오는 쿼리를 수행하면 같은 데이터가 샘플링이 된다.

order by

sql select * from my_table order by rand() limit 1000;

그렇다면, 임의로 정렬을 한뒤에 샘플링을 하면 어떨까?
아래와 같이 샘플링을 하면, 임의로 데이터를 정렬을 한 뒤에 1000개를 추출하기 때문에 매번 다른 결과를 준다.

sort by

sql select * from my_table sort by rand() limit 1000;

하지만 하이브에서는 데이터의 양이 많기 때문에 성능면에서 좋지 않다.
order by 는 하나의 reducer를 사용해서 전체 데이터를 정렬하기 때문에 성능이 상당히 좋지 않다.
order by 대신에 sort by 를 사용하면, 각각의 reducer에서 정렬을 하기 때문에
전체 데이터를 순서는 보장하지 않지만, 각각의 reducer내의 데이터의 순서는 보장하게 된다.

하지만, 하이브에서 여러개의 reducers로 데이터를 splitting하는게 정의가 되어 있지 않기 때문에
파일의 순서에 따라서 랜덤하게 파일이 나뉘어 진다.
limit 절을 사용할 경우에는 reducres에 대해서 정의되어 있지 않기 때문에, round-robins을 통해서
각각의 reducers에 있는 데이터를 mixes해서 가져오게 될 것이다.

데이터의 colummn기반으로 reduce key를 사용하면 limit 절은 reducers의 순서가 된다.
그렇게 되면 샘플들은 extremely skewed 될것이다.

distribute by

sql select * from my_table distribute by rand() sort by rand() limit 10000;

위 문제를 해결하기 위해 "distribute by"를 사용하면 된다.
query의 구조에 따라 reduce key가 결정이 되지 않기 때문에, 정확하게 reduce key를 내가 원하는 값으로 specify 가능하다.
더이상 이제 limit을 생각할 필요가없이 reducer내에 정렬이 가능하다.

filtering map-side

sql select * from my_table where rand() <= 0.0001 distribute by rand() sort by rand() limit 10000;

만약 테이블의 전체 사이즈를 안다면, 쉽게 data에서 랜덤하게 proportion을 가져올 수 있다.
total size가 10,000,000,000이라고 하고, sample을 1,000개 한다고 할때,
전체 데이터에서 0.000001을 가져오면 된다. 만약 where에서 "rand() < 0.000001"을 하면,
10000개의 rows를 ouput으로 출력하게 된다.
결국 병목 현상이 작업을 시작하기위한 간단한 전체 테이블 스캔이되기 때문에 중요하지 않으며,
reducer로 보내지는 볼륨을 기반으로하는 것이 아니기 때문에 더 효율적으로 sampling이 가능하다.

  • [참고]
    • http://www.joefkelley.com/736/

하이브(Hive)에서 특정 컬럼 제외하고 추출

테이블에서 컬럼이 엄청 많은데, 특정 컬럼을 제외하고 추추출하고 싶을때가 있는데
아래와 같이 쿼리를 하면 ds, hr을 제외하고 나머지 컬럼을 가져올 수 있다.

쿼리

sql SELECT `(ds|hr)?+.+` FROM sales

하이브(Hive) 테이블 join 성능 올리기, 최적화, 튜닝하는 방법

들어가며

하이브에서 테이블 조인의 성능을 올리기 위해서는 다양한 방법이 있지만, 크게 두가지 방법에 대해서 설명을 한다. 하이브에서 큰 두개의 테이블을 만약 그냥 조인을 한다면 쿼리를 날리고 다음 날 아침에 와도, 쿼리는 동작하고 있을 것이다.

두개의 테이블을 조인을 해보자

두개의 테이블을 조인을 하기 전에, 각 테이블의 성격을 알아야 한다. 한개의 테이블의 metadata의 성격이 있는 테이블, 즉 작은 테이블의 사이즈라면, 해당 테이블을 메모리에 올린 이후에, 조인을 하면 성능이 올라갈 것이고, 두 테이블이 너무 커서 메모리에 올리기 어렵다면, 조인할 키를 뭉탱이로 뭉쳐서 뭉탱이 끼리 조인을 하면 빠르게 할 수 있다.

  • Map-Side Joins
    • join(큰 테이블, 작은테이블)
    • 여기서 말하는 작은 테이블을, 지역 정보, 유저 정보 등등
    • 여기서 말하는 큰 테이블은, 노출된 로그, 클릭 로그 등등
    • 제목 그대로 Map, Reduce에서 Map side에서 조인을 하는 것을 말한다.
    • 작은 테이블(ex,지역정보)를 메모리에 올리고 큰테이블에 붙이면 끝
  • SMB (Sort Merge Bucket)
    • join(큰 테이블, 큰테이블)
    • 여기서 말하는 큰 테이블은, 노출된 로그, 클릭 로그 등등
    • 여기서 말하는 큰 테이블은, 노출된 로그, 클릭 로그 등등
    • 제목 그대로 cluster에 내가 조인할 키를 뭉탱이로 뭉쳐놓고 bucket을 통째로 조인 (느릴 수가 없다)
    • 이 방법에서 단점은, 순서가 유지되지 않는다는 사실, 만약 내가 timestamp로 row의 order를 갖고 있고 싶은데, 그 전에 조인할 대상으로 bucket을 생성했기 때문에 순서가 망가진다.

References

  • https://cwiki.apache.org/confluence/display/Hive/LanguageManual+JoinOptimization
  • https://acadgild.com/blog/map-side-joins-in-hive/
  • https://grisha.org/blog/2013/04/19/mapjoin-a-simple-way-to-speed-up-your-hive-queries/

하이브(Hive) 테이블 생성

하이브 테이블이 만약 textfile로 되어 있다면,
테이블을 load하거나, hue에서 테이블을 로딩할때 에러가 발생하는 경우가 있다.
malformed ORC 에러가 뜬다면, 하이브 테이블의 타입을 확인할 필요가 있다.
이럴 경우에는 아래와 같이 기존 textfile을 orc테이블의 형태로 생성한 뒤에 export/import를 하면 에러 없이 진행이 된다.

코드

하이브(Hive) 테이블 클러스터간 복사하기

두개의 클러스터(Cluster) A, B가 있다고 가정하고,
A에서 B로 하이브 테이블을 복사하는 방법에 대해서 설명한다.

순서

  • [A]에서 [A]의 HDFS로 테이블 export
  • [A]에서 [B]의 HDFS로 데이터 copy
  • [B]의 HDFS에서 Hive로 데이터 import

코드

+ Recent posts