본문 바로가기
Data Science/분산처리기술

하둡개념 - 정리전

by En.Lee 2014. 11. 23.

③ HDFS 명령어

HDFS는 일반 Unix/Linux의 파일시스템과는 전혀 별개이다. DataNode 데몬을 수행하는 기기에서 ls 명령을 수행하면 일반 Linux 파일시스템의 내용은 보이지만 HDFS의 파일은 보이지 않는다. 마찬가지로 fopen() 또는 fread()같은 표준의 읽기/쓰기 작업도 불가능하다. 요컨대 파일시스템으로서의 각종 작업에 대해서 HDFS는 HDFS 나름의 독자적인 명령어와 shell 구조를 가지고 있다는 말이다. 이러한 현상은 HDFS가 별도의 독립된 namespace,를 가지기 때문이다. HDFS (정확히는 HDFS를 구성하는 블록) 내의 파일은 DataNode 서비스가 관리하는 별도의 디렉토리에 저장된다. 그리고 이들 파일은 block id로만 표시된다. HDFS에 저장된 파일에 Linux의 일반 파일수정방식을 (예: ls, cp, mv, etc) 적용할 수도 없다. 대신 HDFS는 자체의 파일관리 방식을 가지는데 그 모습은 기존의 방식과 매우 유사하다.

한편 HDFS 파일시스템을 이용하려면 처음 한 차례에 한해 다음 HDFS 명령어를 통해 포맷팅이 되어야 한다.

 

user@namenode:hadoop$ bin/hadoop namenode -format

 

이하에서는 이러한 선행작업이 되었다고 전제한다.

한편 Hadoop의 작업에서는 일반적으로 데이터 파일이 Hadoop 이외의 곳에서 생성되는 것을 전제로 하는 경우가 많다. 즉, 별도의 텍스트 파일이 있다거나, 또는 각종의 log 파일이 외부에서 이미 생성되었다고 보고 이를 HDFS로 복사하는 방식으로 읽어 들이는 것이다. 그리고 파일을 HDFS로 읽어 들인 후에도 MapReduce 프로그램이 이를 처리할 때는 MapReduce 응용프로그램에서 직접 읽는 대신 Hadoop의 MapReduce 프레임워크를 통해 HDFS 파일을 (key/value pair형태의) 개별 레코드 형식으로 파싱(parsing)하여 사용하게 된다.

기본적인 파일 shell 명령어

Hadoop의 파일 명령어의 표준형은 다음과 같다.

 

Hadoop fs –cmd <args>

 

여기서 cmd는 구체적인 명령어를 그리고 <args>는 매개변수(argument)를 나타낸다.

다만 명령어 cmd는 일반 Unix/Linux의 명령어와 그 형태가 매우 유사하다. 예를 들어서 특정 디렉토리 내에서의 파일의 목록을 보려면:

 

Hadoop fs –ls

 

라고 하면 된다.

HDFS의 디폴트 작업 디렉토리는 /usr/$USER 이지만 (단, $USER는 login 한 사용자 이름을 뜻함) 그렇다고 그 디렉토리가 자동으로 만들어지지는 않으므로 다음의 명령어를 통해 이를 직접 만들어 주어야 한다.

 

Hadoop fs –mkdir /user/hkyoon

 

Linux/Unix 상에 존재하는 로컬파일을 HDFS로 복사해 오는 명령어는 다음과 같다.

 

Hadoop fs –put example.txt

 

앞서 ls 명령을 재귀적으로 사용하여 하위 디렉토리 내의 목록까지 함께 보려면 lsr 명령어를 이용한다.

 

$ hadoop fs -lsr /

drwxr-xr-x – hkyoon supergroup 0 2013-01-14 10:23 /user

drwxr-xr-x – hkyoon supergroup 0 2013-01-14 11:02 /user/hkyoon

-rw-r–r– 1 hkyoon supergroup 264 2013-01-14 11:02 /user/hkyoon/example.txt

 

위 목록에서 마지막 줄의 소유자 이름 (hkyoon) 앞의 1이란 숫자가 복제본 개수(replication factor)를 나타낸다.

앞서의 put과 반대로 HDFS로부터 로컬시스템의 파일로 가져오는 명령으로 get이 있다. 한편 파일의 내용을 보려면 cat이라는 명령어를 사용한다. Unix/Linux의 catalog에 해당한다. 다음에서는 여기에 head라는 Linux 명령을 pipeline으로 연결하여 적용했다.

 

hadoop fs -cat example.txt | head

 

이외에도 다음과 같은 다양한 HDFS의 파일명령어가 제공된다.

catchgrpchmodchown
copyFromLocalcopyToLocalcpdu
dusexpungegetgetmerge
lslsrmkdirmovefromLocal
mvputrmRmr
setrepStattailtest
texttouchz  

이들의 용법에 대해서는 http://hadoop.apache.org/docs/r0.18.3/hdfs_shell.html 참조.

프로그램을 이용한 HDFS 파일의 이용

Hadoop에서는 파일열기(open), 읽기(read), 기록하기 (write), 파일 닫기(close) 등을 비롯해 각종의 작업을 할 수 있는 HDFS API가 제공되므로 이를 이용할 수 있다. 이를 위해서는 우선 org.apache.hadoop.fs 의 package와 함께 필요한 API를 프로그램 내에서 이용하는데 여기서는 세부 내용을 생략한다.

hadoop이 제공하는 다양한 api는 apache 내 hadoop 사이트의 관련 문건을 보면 된다. (http://hadoop.apache.org/docs/current/api/ )

아래에서는 hadoop.txt라는 이름의 파일을 만든 후 “My First Hadoop API call!\n”라는 문자열을 기록하고 다시 이를 읽어서 화면에 출력하는 프로그램이다. (전체적 흐름에 대한 이해를 돕기 위해 주석 및 package import 부분은 생략하였음)

 

public class HDFSExample {

public static final String FileName = “hadoop.txt”;

public static final String message = “My First Hadoop API call!\n”;

 

public static void main (String [] args) throws IOException {

Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(conf);

Path filenamePath = new Path(theFilename);

 

try {

if (fs.exists(filenamePath)) {

fs.delete(filenamePath);

}

 

FSDataOutputStream out = fs.create(filenamePath);

out.writeUTF(message);

out.close();

 

//Open Config file to read

FSDataInputStream in = fs.open(filenamePath);

String messageIn = in.readUTF();

System.out.print(messageIn);

in.close();

} catch (IOException ioe) {

System.err.println(“IOException during operation: ”
+ ioe.toString());

System.exit(1);

}

}

}

(2) MapReduce

①MapReduce 기본개념

MapReduce는 Hadoop의 프로그래밍 패러다임으로서 입력되는 데이터 리스트(list)를 출력 리스트(list)로 변형(transform)시킴에 있어 과정 전체를 한번에 처리하지 않고 크게 두 덩어리로 나눈 후 한 단계의 작업이 완료되면 그 다음의 단계로 이어지도록 하는 것을 말한다. 마치 Unix/Linux에서의 pipeline이 여러 개의 명령어를 연결시켜서 차례로 수행하도록 하는 것처럼 MapReduce 프로그램은 데이터 처리작업을 Map과reduce라는 두 번에 걸쳐 실시한다. 그리고 이 과정에서 partitioning과 shuffling 작업이 보조적으로 수행된다.

 

그림: 입력 list의 각 항목에 Mapping 함수를 적용되어 새로운 출력 list가 만들어진다.

프로그래머는 mapper와 reducer 에 자신의 필요한 작업내용을 프로그램 형태로 지정한 후 Hadoop 에서 수행시키면 이들 mapper와 reducer가 partitioning 및 shuffling의 지원을 받아가면서 분산 처리되는 것이다. 구체적으로는:

mapper 단계에서 먼저 입력 파일을 분할하여 여러 node에 (중복) 배분한 후 각각 할당된 데이터를 처리한다.

 

Reducer는 mapper의 처리결과를 넘겨받은 후 이들을 종합하여 최종결과물을 만들어 내다.

 

그림: 색깔 별로 각각의 key를 나타낸다고 가정하였다. 결국 같은 key를 가진 value는 하나로 통합된다.

다음의 예에서는 몇 가지 단어가 수록된 입력파일을 분석해서 단어별 빈도수를 조사하는 모습을 그림으로 표현하였다.

 

Haddop의 데이터 타입

MapReduce에서는 사용자가 임의로 타입을 지정하거나 또는 Java의 기본 데이터 타입을 그대로 이용하는 것을 허용하지 않으므로 반드시 MapReduce 프레임워크가 지원하는 데이터 타입을 이용해야 한다. Hadoop에서 key 와 value에 대해 적용할 수 있는 데이터 타입의 규칙은 다음과 같다.

n   Writable interface를 실행하는 class는 value가 될 수 있다.

n   WritableComparable<T> interface를 실행하는 class는 key 또는 value 모두가 될 수 있다.

n   (참고로 WritableComparable<T> interface는

n   Writable과 java.lang.Comparable<T> interface를 합한 것이다.

n   이처럼 key를 정의할 때 비교가능성(comparability)가 필요한 것은 value와는 달리 이들이 reduce 단계에서 정렬되기 때문이다.

다음은 Hadoop이 기본으로 제공하는 데이터 타입인데 이들은 Java 표준 데이터타입에 대한 wrapper로서 WritableComparable interface를 구현하며 key/value pair로 많이 이용된다.

BooleanWritableByteWritableDoubleWritable
FloatWritableIntWritableLongWritable
TextNullWritable 

 

물론 이 밖에 필요에 따라 위의 interface를 구현하여 자신만의 데이터타입을 만들어 사용할 수 있다.

Mapper 함수와 Reducer 함수

Mapper로 사용하기 위해서 해당 class를 작성할 때는 Mapper interface를 구현하면서 MapReduce base class를 확장해야 한다. MapReduce class는 mapper 및 reducer 모두의 base class로서 다음의 2개 method를 가진다.

n   void configure(JobConf job)

n   void close( )

Mapper interface는 map() 이라는 단 하나의 method를 가지는데 그 signature는 다음과 같다.

 

void map(K1 key,

V1 value,

OutputCollector<K2,V2> output,

Reporter reporter

) throws IOException

 

다음은 Hadoop에서 미리 제공하는 Mapper의 실행 class이다. (각각의 의미는http://hadoop.apache.org/docs 참조.)

 

IdentityMapper<K,V>InverseMapper<K,V>
RegexMapper<K>TokenCounterMapper<K>

 

Reducer 역시 MapReduce base classs를 확장하면서 Reducer interface를 구현해야 한다. Reducer interface 역시 reduce() method를 가지며 그 signature는 다음과 같다.

void reduce(K2 key,

Iterator<V2> values,

OutputCollector<K3,V3> output,

Reporter reporter

) throws IOException

다음은 Hadoop에서 미리 제공하는 Reducer의 실행 class이다.

(http://hadoop.apache.org/docs 참조.)

 

IdentityReducer<K,V>LongSumReducer<K>
Word Count의 예

이제 조금 구체적인 예제 프로그램을 살펴 본다. 파일의 내용을 분석하여 출현하는 단어 별 빈도를 출력하는 것으로서 Hadoop을 설치하면 함께 오며 위치는 src/examples/org/apache/hadoop/examples/WordCount.java 이다. 다만, 여기서는 한글 데이터 파일을 대상으로 하는 것으로 가정하였고 프로그램은 대폭 단순화 하였다.

우선 다음 시(詩)를 내용으로 하는 파일이 있다고 가정한다.

 

때로 마추친 책과 사람들에게서..

주눅이 들게 하는 글을 만납니다..

주눅이 들게 하는 사람을 만납니다..

 

이 시에 나오는 단어 별 빈도는 다음과 같다.[5]

 

때로1
마주친1
책과1
사람들에게서1
주눅이2
들게2
글을1
만납니다2
사람을1

 

이러한 단어의 빈도분석 프로그램의 로직은 다음과 같이 표현할 수 있다.

 

define WordFrequency as FrequencyTable;

for each document in documentSet {

T = tokenize(document);

for each token in T {

WordFrequency [Word]++;

}

}

display(WordFrequency);

 

위의 분배단계에서 빈도를 저장할 WordFrequency라는 테이블을 지정한 후 각각의 문서 (여기는 하나의 문서)에 대해 분석을 하였다. Tokenize는 문장을 단어별로 분절하는 함수로서 각각의 단어가 발견될 때마다 WordFrequency[Word]를 1씩 증가시켰다.

이제 이러한 작업을 500페이지의 책 1,000권에 대해 수행한다고 하자. 이런 단순한 작업만도 웬만한 컴퓨터로 며칠씩 걸릴 것이다. 심지어 데이터가 수시로 추가/변경되고 또는 더 복잡하고 세련된 분석을 한다면 뭔가 다른 방법을 강구하지 않으면 안 된다. 따라서 위 로직을 프로그램을 여러 대의 컴퓨터에서 동시에 수행시킨 후 결과를 취합하려면 다음과 같이 수정한다.

 

(분배처리 단계)

define WordFrequency as FrequencyTable;

for each document in documentSubset {

T = tokenize(document);

for each Word in T {

WordFrequency[Word]++;

}

}

partition_and_sort(WordFrequency);

sendToSecondPhase(WordFrequency);

 

(결과합산 단계)

 

define totalWordFrequency as FrequencyTable;

for each WordFrequency received from firstPhase {

FrequencyTableAdd (totalWordFrequency, WordFrequency);

}

 

여기서는 Partition_and_sort()을 통해 앞의 처리결과 WordFrequency[Word] 를 가/나/다/… 순서로 나눈 후 정렬하고 (여기서는 중간합산하는 등의 추가작업은 생략하였다.) 그 결과를 다음 단계에 전달하는 것으로 하였다. 한편 합산단계에서는 이들을 전체목록 (totalWordFrequency)에 추가하였다.

위 “분배처리 단계”가 MapReduce 에서의 mapping 단계에 해당하고, “결과합산 단계”는 reducing 단계에 해당한다. 결국 mapping 단계는 filter 및 변환의 단계이고 reducing 단계는 종합(합산)의 단계라고 할 수 있다.

한편 mapper와 reducer 그리고 partitioning과 shuffling의 제반 작업이 매끄럽게 이루어지려면 이들 각각이 처리하고 다음 단계로 넘겨주는 데이터가 일정한 구조를 가지는 것이 필요하다. 실제로 이들 mapping 단계와 reducing 단계에 각각 그리고 이들 단계의 연결과정에서 모든 데이터는 (key/value)의 list로 처리된다. 대상 데이터 내용이 어떠하든, mapper, reducer에 지정된 업무로직이 어떠하든 효율적으로 수행되기 위해서 MapReduce에서는 주된 데이터 포맷(data primitive)으로 list와 (key/value) pair를 이용하는 것이다. 다시 말하면 MapReduce에서는 그 어떤 값도 독립해서 존재하지 않으며 모든 값에는 자신에 적용되는 key 가 존재한다. 또한 모든 Key는 관련된 값을 식별해준다. 다음 그림에 단계별로 전달되는 데이터의 (key/value) pair의 형태와 그 설명과 함께 나타나 있다.

 

 

얼핏 보면 매우 복잡해 보이지만 우리는 여기서 기본 규칙을 발견할 수 있다. 즉, 데이터 처리 작업을 map과 reduce의 단계로 나눈 후 각각의 처리 로직을 Hadoop이 지정한 API를 이용하여 프로그램 작성한다. 이때 데이터에 대해서는 포맷을 (kevy/value) pair로 하되 이들이 단계별로 이동할 때 미리 정한 규칙을 따르게 하면 이후의 전반적인 처리는 Hadoop 의 MapReduce에서 관리하고 조정한다. 즉, 개발자가 이들 규칙에 충실히 프로그램을 작성하면 하나의 작업을 수백, 수천 대 컴퓨터에 동시병렬적으로 처리할 수도 있고 결과적으로 전체적인 처리성능은 무한히 커질 수 있는 것이다.

이제 WordCount 프로그램에 대해 이전에 설계했던 작업 로직을 좀 더 MapReduce 규칙에 부합하는 용어로 (여전히 pseudo 코드로) 작성한 것이 다음에 나와 있다.

 

map(String filename, String document) {

for each document in documentSets{

line = readLine(document)

List<String> T = tokenize(document);

for each token in T {

emit ((String)token, (Integer) 1);

}

}

}

reduce(String token, List<Integer> values) {

Integer sum = 0;

for each value in values {

sum = sum + value;

}

emit ((String)token, (Integer) sum);

}

map()과 reduce()함수 모두 출력물은 리스트(list) 형태를 가진다는 점만 유념한다면 이 코드는 앞서의 것과 매우 유사함을 알 수 있다. 즉, 이제 분산환경에서 여러 컴퓨터에서 이들 작업을 나누어 할 수 있는 기초가 마련된 것이다. 이제 pseudo 코드가 아닌 실제 구동되는 Hadoop에서의 Java 코드를 보기로 한다.

이제 위의 로직을 구현한 최종적인 프로그램의 모습은 다음과 같다. (단, 이해 편의를 위해 package import, exception, compiler annotation 및 주석 부분을 생략하였다.)

 

public class WordCount extends Configured implements Tool {

public static class WordCountMap

extends Mapper<LongWritable, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

public void map(LongWritable key, Text value, Context context) {

String line = value.toString();

StringTokenizer tokenizer = new StringTokenizer(line);

while(tokenizer.hasMoreTokens()){

word.set(tokenizer.nextToken());

context.write(word, one);

}

}

}

public static class WordCountReducer

extends Reducer<Text, IntWritable, Text, IntWritable>{

public void reduce(Text key,

Iterable<IntWritable> values,

Context context){

int sum = 0;

for(IntWritable value: values){

sum += value.get();

}

context.write(key, new IntWritable(sum));

}

}

public int run(String[] args) {

Job job = new Job(getConf());

job.setJarByClass(WordCount.class);

job.setJobName(“wordcount”);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

job.setMapperClass(WordCountMap.class);

job.setCombinerClass(WordCountReducer.class);

job.setReducerClass(WordCountReducer.class);

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.setInputPaths(job,

new Path(args[0]));

FileOutputFormat.setOutputPath(job,

new Path(args[1]));

boolean success = job.waitForCompletion(true);

return success ? 0: 1;

}

 

public static void main(String[] args) {

int result = ToolRunner.run(new WordCount(), args);

System.exit(result);

}

}

Hadoop의 모든 MapReduce 프로그래밍은 실제로 위와 같은 모습을 가진다. 따라서 일일이 새로 작성하지 않고 앞에서와 같은 프로그램 template을 변형하여 이용하는 것이 보통이다.

 

또 다른 예로서 여러 대 자동차에서 측정된 각각의 시간별 속도계 정보가 log 파일에 담겼다고 하자. 아마도 번호판을 key로 하고 관련된 정보가 기록되어 있을 것이다..

AAA-123   65mph, 12:00pm

ZZZ-789   50mph, 12:02pm

AAA-123   40mph, 12:05pm

CCC-456   25mph, 12:15pm

Hadoop에서의 mapping 및 reducing 함수는 값(values)만을 받을 수는 없고 언제나 (key, value) 의 pair를 받는다. 이들 각각의 함수의 출력 역시 동일한 원칙이 적용되며 이들 모두 key 와 value 가 다음 단계의 데이터 흐름에서의 list 형태로 산출(emitted)된다.

다만 MapReduce에서는 Mapper과 Reducer의 동작방식이 다른 언어보다는 덜 엄격하다고 할 수 있다. 엄격한 의미의 functional mapping과 reducing에서는 하나의 mapper가 반드시 각각의 입력 항목에 대해 하나의 출력 항목을 산출하고 reducer 역시 하나의 입력 list에 대해 하나의 출력항목만을 만들어 내야한다. 그러나 MapReduce에서는 각각의 단계에서 임의의 수의 (0,1, 또는 심지어 수 백개의) 값들이 산출될 수 있다. reducer 도 마찬가지로 임의의 수의 값을 출력할 수 있다. 단계별 처리과정에서 Key를 기준으로 reduce space가 분할되며 reducing 함수는 많은 값들의 list를 하나의 값으로 변환시킨다.

② MapReduce의 주요 데몬

JobTracker

실제 컴퓨팅 작업을 하는 MapReduce 데몬들도 HDFS와 마찬가지로 master-slave의 구조를 가지며 JobTracker는 master의 역할을, 그리고 TaskTracker는 slave node의 역할을 담당한다.

JobTracker는 응용프로그램과 Hadoop사이의 중간연락을 하는 daemon으로서 파일처리를 위한 실행계획, node할당, task monitoring 등을 담당한다. 즉, 프로그램을 수행시키면 JobTracker가 대상 파일을 찾아낸 후 각각의 node에게 작업할 내용을 할당한 후 이것이 제대로 수행되고 있는지 감독한다. 그리고 이 과정에서 작업이 실패하면 다른 node에게 재 작업을 지시한다. JobTracker는 master node가 이를 수행하는데 Hadoop 클러스터 내에는 단 1개의 JobTracker 데몬이 존재한다.

TaskTracker

(JobTracker가 MapReduce의 master기능을 수행하는 것에 대응하여) TaskTracker는 JobTracker가 지시한 사항을 성실하게 집행한다. 그리고 (HDFS에서의 NameNode와 DataNode와 마찬가지로) 이들 TaskTracker는 JobTracker와 지속적으로 통신하면서 정상적인 동작 유무를 확인할 수 있게 하는데 이상 발생 시 JobTracker는 해당 작업을 클러스터 내의 다른 node에게 중복하여 수행할 것을 지시하게 된다. 특기할 것은 각각의 slave node에는 한 개의 TaskTracker가 존재하지만 이 하나의 TaskTracker는 여러 개의 JVM을 생성시켜서 각각의 slave node 내에서 여러 개의 map task와 reduce task를 병렬적으로 수행시키게 된다.

다음 그림은 1대의 master (NameNode와 JobTracker가 수행됨)와 4대의 slave node (DataNode와 TaskTracker가 수행됨)가 이용되는 모습으로서 백업용의 secondary NameNode가 존재함을 알 수 있다. 한편 이들 master와 slave node들 상호간 통신에서는 SSH 채널이 이용된다.

 

그동안의 Map/Reduce 전체를 정리하여 도식화 한 그림은 다음과 같다.

n   HDFS 클러스터에 파일이 적재됨으로써 MapReduce 입력이 시작된다. 이들 파일은 전체 node에 균등하게 배분되는데 이에 대해 MapReduce 프로그램이 수행되면서 node에서는 mapping task가 시작된다.

n   Mapping작업에서 각각의 task는 동등한 것으로서 서로 구별되지 않으며 각 mapper는 그 어떤 입력파일도 처리 가능하다. 각각의 mapper는 자신 컴퓨터 근처에 존재하는 파일을 적재한 후 곧바로 처리에 들어간다.

n   mapping 이 끝나면 중간산출물로서의 intermediate (key, value) pair가 각 컴퓨터 사이에서 교환되고 같은 key를 가지는 모든 value들은 하나의 reducer에게 보내진다.

n   Reducer에서는 서로 다른 여러 개의 mapper로부터의 산출물이 하나로 병합된다.[6]

 

 

 

끝으로 2004년 Google에서 발표한 MapReduce 논문(http://research.google.com/archive/mapreduce-osdi04.pdf ) 속의 그림을 소개한다. 앞서의 설명과 그림 안에 기재되어 있는 각 번호의 설명을 통해 그 내용을 이해할 수 있다.

 

④ MapReduce의 데이터 흐름

Hadoop에서의 MapReduce의 중요성을 감안하여 작업 프로세스가 아닌 데이터의 흐름을 중심으로 정리하여 본다. 아래 그림에서는 각 단계에서의 데이터 흐름을 pipeline으로 보여주고 있다.

 

 

Figure 4.5: Hadoop MapReduce의 데이터 흐름에 대한 상세한 모습

입력 파일:

MapReduce task를 위한 데이터는 보통 HDFS상에 존재한다. 파일 포맷은 그때 그때 다른데 line기반의 log 파일, binary 포맷, multiline의 입력 레코드 등 그 어떤 것도 가능하다. 다만, 용량은 매우 클 것이다.

InputFormat:

입력파일이 분할(split)되는 방식이나 읽어 들이는 방식을 정의하는 class이다.

n   입력으로 사용될 파일 또는 기타의 object를 선택한다.

n   파일을 task로 분해할 InputSplits 을 정의한다.

n   파일을 읽어 들이는 RecordReader 를 생성하는 factory를 제공한다.

Hadoop은 여러 개의 InputFormat 관련 class을 제공한다. FileInputFormat 라는 이름의abstract type이 존재하며; 파일을 대상으로 하는 모든 InputFormat은 이 class로부터 파생된다. Hadoop의 job을 수행하기 시작할 때 FileInputFormat에 읽으려는 파일의 경로를 제시하면 FileInputFormat이 이 디렉토리에 있는 모든 파일을 읽어 들인다. 그런 후 각각의 파일을 여러 개의 InputSplit으로 분해한다. 개발자는 입력파일에 어떤 InputFormat을 적용할지를 JobConf object의 setInputFormat() method를 호출하는 방식으로 정의한다. 표준 InputFormat이 다음 표에 나와 있다.

 

InputFormat

설명

Key

Value

TextInputFormatDefault 포맷이며 텍스트 파일의 각 line을 읽어들인다.각 line의 byte offset각 line의 내용
KeyValueInputFormat각 line을 key, val pair로 parse한다.첫째 tab 문자까지의 모든 내용line의 나머지 내용
SequenceFileInputFormatHadoop고유의 고성능 바이너리 포맷사용자 정의사용자 정의

n   TextInputFormat은 입력 파일의 각 line을 별개의 레코드로 취급하지만 별도의 parsing작업은 하지 않는다. 이것은 특히 unformatted 데이터 또는 로그파일과 같은 line기반의 레코드 등에 유용하다.

n   KeyValueInputFormat 역시 입력파일의 각각의 line을 별개의 레코드로 취급한다. 다만 TextInputFormat이 line 전체를 하나의 값(value)으로 여기는 반면 KeyValueInputFormat은 각 line을 tab 문자를 기준으로 key와 value로 분해한다는 점이 다르다.

n   SequenceFileInputFormat 은 Hadoop에 특수한 바이너리 파일을 읽어 들이는데 Hadoop mapper에 고속으로 읽어 들일 수 있는 몇 가지 부가기능이 제공된다.

InputSplits:

InputSplit은 MapReduce 프로그램에서 map task의 작업단위가 된다. Data set에 적용되는 MapReduce 프로그램은 이를 총체적으로 Job이라고 부르는데 이 job은 수 백 개의 task로 구성된다. Map task는 파일 전체 또는 일부분만도 읽을 수 있다. 디폴트 상태에서 FileInputFormat과 하위 class는 파일을 64 MB 단위의 chunk (HDFS에서의 블록 크기와 동일)로 분할한다. 이 값은 hadoop-site.xml에서 mapred.min.split.size 파라미터를 이용하거나 특정 JobConf object에서 파라미터를 override함으로써 변경할 수 있다. 파일을 chunk 단위로 처리하면 하나의 파일에 대해 여러 개의 map task를 병렬 수행할 수 있어서 성능을 획기적으로 높일 수 있다. 또한 각각의 블록을 한 node에서 다른 node로 옮길 필요 없이 파일을 구성하는 다양한 블록을 클러스터 내의 여러 node에 분배하여 해당 node에서 직접 (locally) 처리할 수 있다. 파일포맷이 chunk단위로 쪼개기 어려울 경우에는 custom의 InputFormat을 작성해서 파일분할의 조건과 방법을 지정할 수도 있다.

RecordReader:

InputSplit을 통해 일의 단위가 지정되지만 그 액세스하는 방법은 정의되지 않는다. 데이터를 source에서 실제로 적재한 후 이를 Mapper가 읽기 수월한 (key, value) pair로 변환하는 일은 RecordReader class가 담당한다. RecordReader instance는 InputFormat에 의해 정의된다. 디폴트의 InputFormat인 TextInputFormat 은 LineRecordReader를 제공하는데 여기서는 입력파일의 각각의 line을 새로운 값(value)로 취급한다. 각 line에 대한 key는 파일에서의 byte offset이다. 입력 시 InputSplit 전체가 완료될 때까지 RecordReader는 계속 호출(invoke)된다. RecordReader가 호출되면 Mapper의 map() method 역시 호출된다.

Mapper

Key와 value가 주어지면 map() method는 (key, value) pair(s)를 Reducer에게 전달한다. 새로운 Mapper instance는 각각의 map task (InputSplit)에 대한 별도의 Java 프로세스 속에서 만들어진다. 이러한 map task는 전체적으로 한의 job input을 구성한다. 각각의 mapper는 다른 mapper와 어떤 통신도 하지 않는다. 이를 통해 각 map task의 신뢰성이 로컬 기기의 신뢰성에 전적으로 좌우된이다. map() method는 key와 value 이외에 2개의 파라미터를 전달받는다.

n   OutputCollector object는 collect() 를 통해 (key, value) pair를 job의 reduce 단계로 전달해 준다.

n   Reporter object 는 현재 task에 대한 정보를 제공한다. Reporter의 getInputSplit() 는 현재의 InputSplit을 설명하는 object를 반환한다. 또한 map task로 하여금 진행상태에 대한 정보를 시스템 내 다른 요소에게 제공할 수도 있다. setStatus() method 를 통해 사용자에게 상태메시지를 제공할 수도 있다. incrCounter()를 통해서는 shared performance counter를 증가시킬 수도 있다.

Partition & Shuffle:

첫 번째 map task가 종료했지만 각 node들은 여러 개의 다른 map task들을 수행하는 경우에도 map task로부터의 중간산출물을 이를 필요로 하는 reducer에게로 전달하기 시작한다. 이처럼 map의 산출물을 reducer에게로 옮기는 것을 shuffling한다고 한다. 중간단계 key space의 일부가 각각의 reduce node에 할당된다. 이들 subset (이를 “partition”이라고 함)들은 reduce task에게 입력된다. 각 map task는 그 어떤 partition에도 (key, value) pair를 전달할 수 있다. 하나의 key에 대한 모든 값은 항상 그 origin이 어떤 mapper였든 상관없이 병합(reduced together)된다. 따라서 중간산출 데이터의 각 항목을 어디로 보낼지에 대해 map node는 의견 일치를 보아야 한다. Partitioner class는 주어진 (key, value) pair가 어떤 partition으로 갈지를 결정하는데 디폴트의 partitioner는 key에 대한 hash 값을 계산한 후 그 결과에 따라 partition을 할당한다.

Combiner:

Combiner를 이용하면 MapReduce job이 사용하는 대역폭을 절감할 수 있다. Combiner 는 Mapper와 Reducer사이에서 진행되는 것으로서 선택적 적용이 가능한데 이 경우 map task가 수행되는 모든 node에 대해 Combiner class의 instance가 적용된다. 각각의 node에서 Mapper instance가 산출한 데이터를 입력 받고 Combiner가 지정된 작업을 하면 그 결과물을 Reducer에게 보낸다, Combiner는 일종의 “mini-reduce” 프로세스로서 하나의 단위 컴퓨터에서 생성된 데이터만을 대상으로 한다. 이 밖에 Fault Tolerant기능과 Checkpoint 기능이 있다.

앞서의 Word count 프로그램을 예로 들면 각 단어가 발견될 때마다 (word, 1) pair를 산출했는데 예컨대 “주눅이”라는 단어가 2번 발견되면 (“주눅이”, 1) pair가 2번 출력되고 따라서 Reducer에게도 2번 전달된다. 그러나, Combiner를 통해 이들이 단위 컴퓨터에서 합산되어서 (“주눅이”, 2) pair가 단 한번만 Reducer에게 전달된다. 이처럼 모든 node 에서 여러 번 반복해서 전달되던 항목을 중간합산 하여 각 단어 당 한번씩만 전달됨으로써 shuffle 프로세스에서 요구되는 대역폭을 획기적으로 줄일 수 있게 되고 결과적으로 job 처리속도가 개선된다. 이것 역시 별도의 프로그래밍 작업 없이 driver 프로그램에 다음의 한 줄만 삽입하면 MapReduce 프레임워크가 자동 진행해 준다.

conf.setCombinerClass(Reduce.class);

 

 

Combiner step이 MapReduce의 데이터 흐름에 포함되었다.

Sort:

각각의 reduce task는 여러 개의 중간 key에 관련된 value를 합산(reduce)한다. 개별 node에서의 일련의 중간 key는 Hadoop이 이를 자동으로 정렬한 후 Reducer에게 보내진다.

Reduce:

각각의 reduce task에 대해 Reducer instance가 만들어진다. Reducer instance는 사용자가 제공하는 instance로서 job별로 중요한 2번째 단계가 된다. Reducer에게 할당된 partition에서의 각각의 key에 대해 Reducer의 reduce() method 는 단 한번 호출되는데 이를 통해 key에 연결된 모든 value에 대한 iterator와 key를 받는다. iterator에 의해 하나의 key와 관련된 value들이 반환될 때 그 순서는 무작위이다. Reducer는 또한 OutputCollector 와 Reporter object를 파라미터의 형식으로 받게 되는데 이들은 map() method에서와 같은 방식으로 이용된다.

OutputFormat:

OutputCollector에게 제공되는 (key, value) pair는 출력파일에 기록된다. 실제 기록되는 방식은 OutputFormat에 의해 결정된다. OutputFormat 은 앞서의 InputFormat class 와 같은 방식으로 동작한다. Hadoop이 제공하는 OutputFormat의 instance는 로컬디스크 또는 HDFS상의 파일에 기록된다. 이들 모두 일반적인 FileOutputFormat에서 상속된 것이다. 각각의 Reducer는 각각의 파일을 일반적인 출력 디렉토리에 기록한다. 이들 파일은 통상 part-nnnnn 라는 이름을 가진다. (nnnnn 는 reduce task와 관련된 partition id이다.) 출력 디렉토리는 FileOutputFormat.setOutputPath() method에 의해 결정된다. 특별한 OutputFormat을 사용하려는 경우에는 MapReduce job을 정의하는 JobConf object 의 setOutputFormat() method를 통해 지정한다. 제공되는 OutputFormat은 다음과 같다.

 

OutputFormat설명
TextOutputFormatDefault; line을 “key \t value” 형태로 기록한다
SequenceFileOutputFormat뒤에 오는 MapReduce job으로 읽어 들이기에 적당한 형태의 바이너리 파일로 기록한다.
NullOutputFormat입력을 무시한다

Hadoop은 파일에 기록하기 위한 몇 가지 OutputFormat을 제공한다. 디폴트 상태의 instance는 TextOutputFormat으로서 텍스트파일의 각 line에 (key, value) pair를 기록한다. 이를 나중에 MapReduce task를 통해 KeyValueInputFormat class로 다시 읽어들일 수 있는데 이는 사람도 읽을 수도 있다. MapReduce job들 상호간에 이용할 수 있는 더 좋은 중간 포맷이 SequenceFileOutputFormat 인데 이는 임의의 데이터 타입을 파일로 신속하게 serialize해 준다; 이에 대응되는 SequenceFileInputFormat 은 파일을 같은 타입으로 deserialize 하고 앞서의 Reducer가 산출했던 것과 같은 방식으로 다음 Mapper에게 전달한다. NullOutputFormat 은 아무런 출력파일을 만들지 않으며 OutputCollector에 의해 전달받은 (key, value) pair를 무시한다. 이는 reduce() method에서 독자의 출력파일에 기록하고 Hadoop 프레임워크에 의해 추가의 빈 출력파일이 만들지 않으려는 경우 유용하다.

RecordWriter:

InputFormat이 실제로 개별 레코드를 RecordReader 실행을 통해 읽는 것과 마찬가지로 OutputFormat class도 RecordWriter object에 대한 factory역할을 한다. 이들은 OutputFormat에 지정된 대로.개별 레코드를 파일에 기록하는데 이용된다. Reducer에 의해 작성된 출력파일은 HDFS에 남아있으므로 다른 MapReduce job 또는 별도의 프로그램 또는 사용자의 직접개입을 통해 이용할 수 있다.



[1] HDFS에서의 데이터 분배 방식; HDFS는 입력 데이터가 발생하면 즉시 그 파일을 쪼개서 각각의 node에 분배한다. 이때 하나의 파일로부터 만들어진 여러 개 조각을 각각 chunk라고 부르는데 특기할 것은 이들 chunk를 각각의 node에 분배함에 있어 여러 개로 복제하여 분배한다는 점이다. 즉, A라는 이름의 파일을 3조각 내었다면 (이들 각각을 A0, A1, A2라면) 이들 각각의 chunk는 여러 개 (통상 3개 이상)로 복제하여 즉, A0’/A0’’/A0’’’, A1’/A1’’/A1’’’, A2’/A2’’/A2’’’의 여러 복제본을 node에 분배하는 것이다. 그리고 한 걸음 나아가 이들 각각의 복제된 조각에 대해서 이를 분배받은 node는 아무런 차별 없이 동일하게 지시 받은 작업을 수행한다. 따라서 특정 node에 장애가 발생하였을 경우 동일한 내용의 파일조각 (즉, chunk)이 다른 node에 존재할 뿐만 아니라 이미 동시에 수행되고 있는 상태여서 얼마든지 장애node의 내용을 다른 node에서의 동일내용 chunk로 대체할 수 있는 것이다. 물론 이처럼 장애에 대처하기 위해서 전체적인 상황을 감독하는 컴퓨터 (monitoring system)는 데이터를 re-replicate하는 등 끊임없이 관리작업을 수행한다.

[2] 원래 dataset는 특히 행(row)과 열(column) 형식으로 체계화된 데이터를 말하지만 여기서는 (즉, Hadoop에 관한 논의 전체에서) Hadoop의 응용프로그램의 작업대상이 되는 데이터를 뜻한다고 보면 된다.

[3] 단, secondary NameNode의 작업은 실제 HDFS에 대해 이루어지는 것이 아니고 NameNode와의 연락만을 유지한다. 단지, 주기적으로 HDFS의 메타데이터를 유지관리함으로써 유사시 NameNode가 제 역할을 하지 못할 경우를 대비한다. 실제로 이상 시에는 관리자가 직접 secondary NameNode를 구동시켜 주어야 한다.

[4] 사용자가 직접 제어하지 않고, 백그라운드에서 돌면서 여러 작업을 하는 프로그램을 말한다.

[5] 문장에서 단어를 추출하는 것을 tokenize한다고 하는데 이는 검색을 위한 색인추출 (indexing) 의 핵심과정이다. 다만 실제에서는 어근과 어미의 분리, 단수형과 복수형의 처리 및 현재형/과거형/미래형의 처리 등 형태소 분석 문제가 발생할 수 있지만 여기서는 이를 생략하였다. 논의의 주제를 해칠 수 있기 때문이다.

[6] 사실 Hadoop에서 node들 상호간에 통신을 전혀 하지 않는 것은 아니다. 단지 기존 분산시스템에서는 node들 사이의 byte스트림에 대해 프로그래머가 socket을 이용하거나 MPI 버퍼를 통해 명시적으로 marshaling 해야 했던데 반해서 Hadoop에서의 node간 통신은 묵시적으로 이루어진다는 점에 차이가 있을 뿐이다. 데이터는 key 이름 별로 tagging되어서 Hadoop으로 하여금 관련 정보를 목표 node에 어떻게 전달할 것인가를 알려준다. 다른 한편으로 Hadoop은 내부적으로 모든 데이터 전송 및 클러스터의 topology 문제를 관리한다. Node 들 사이의 통신을 억제함으로써 Hadoop은 보다 안정적이 된다. 개별 node의 장애 발생 시 Hadoop은 다른 기기에서 task를 통째로 재 수행시키는 방식으로 문제를 우회한다. 사용자 차원의 task가 다른 task와 명시적 통신을 하지 않으므로 응용프로그램 상호간 메시지 교환도 없고 checkpoint로 rollback 후 재수행 할 필요도 없다.