BIGDATA/ν•˜λ‘‘μ—μ½”μ‹œμŠ€ν…œ

[ν•˜λ‘‘μ—μ½”μ‹œμŠ€ν…œ] Apache Avro / μ•„νŒŒμΉ˜ μ—μ΄λΈŒλ‘œ

🐰히히 2021. 3. 28. 21:32
λ°˜μ‘ν˜•

μ•„νŒŒμΉ˜ μ—μ΄λΈŒλ‘œλž€ ? 

https://dennyglee.com/2013/03/12/using-avro-with-hdinsight-on-azure-at-343-industries/

- νŠΉμ • 언어에 μ’…μ†λ˜μ§€ μ•ŠλŠ” μ–Έμ–΄ 쀑립적 데이터 직렬화 μ‹œμŠ€ν…œ

- ν•˜λ‘‘ Writable의 μ£Όμš” 단점인 μ–Έμ–΄ 이식성 ν•΄κ²° μœ„ν•΄ 생겨남

 

μ•„νŒŒμΉ˜ μ“°λ¦¬ν”„νŠΈ, ꡬ글 ν”„λ‘œν† μ½œ 버퍼와 λ‹€λ₯Έ μ°¨λ³„ν™”λœ νŠΉμ„±κ°€μ§€κ³  있음

 

λ°μ΄ν„°λŠ” λ‹€λ₯Έ μ‹œμŠ€ν…œκ³Ό λΉ„μŠ·ν•˜κ²Œ μ–Έμ–΄ 독립 μŠ€ν‚€λ§ˆλ‘œ 기술됨

μ—μ΄λΈŒλ‘œμ—μ„œ μ½”λ“œ 생성은 μ„ νƒμ‚¬ν•­μž„

데이터λ₯Ό 읽고 μ“°λŠ” μ‹œμ μ— μŠ€ν‚€λ§ˆλŠ” 항상 μ‘΄μž¬ν•œλ‹€ 가정함 - 맀우 κ°„κ²°ν•œ 코딩이 κ°€λŠ₯

 

μŠ€ν‚€λ§ˆμ˜ μž‘μ„±

JSON

λ°μ΄ν„°λŠ” λ°”μ΄λ„ˆλ¦¬ 포맷으둜 인코딩

 

μ—μ΄λΈŒλ‘œ λͺ…μ„Έ - λͺ¨λ“  κ΅¬ν˜„μ²΄κ°€ 지원해야 ν•˜λŠ” λ²„μ΄λ„ˆλ¦¬ 포맷에 λŒ€ν•œ μžμ„Έν•œ λ‚΄μš©

API - μ—μ΄λΈŒλ‘œ λͺ…μ„Έμ—μ„œ λΉ μ ΈμžˆλŠ” λ‚΄μš©μž„. 각 νŠΉμ •μ–Έμ–΄μ— 따라 λ‹€λ₯΄κ²Œ μž‘μ„±λ¨. μ–Έμ–΄μ˜ 바인딩 νŽΈμ˜μ„± 높이고 μƒν˜Έμš΄μ˜μ„± μ €ν•˜ 문제 해결됨

 

μŠ€ν‚€λ§ˆν•΄μ„ - μ‹ μ€‘ν•˜κ²Œ μ •μ˜λœ μ–΄λ– ν•œ μ œμ•½μ‘°κ±΄μ—μ„œλ„ 데이터λ₯Ό μ½λŠ” 데 μ‚¬μš©λ˜λŠ” μŠ€ν‚€λ§ˆμ™€ 데이터λ₯Ό μ“°λŠ” 데 μ‚¬μš©λ˜λŠ” μŠ€ν‚€λ§ˆκ°€ 같지 μ•Šμ•„λ„ λœλ‹€.(μŠ€ν‚€λ§ˆ λ³€ν˜• λ©”μ»€λ‹ˆμ¦˜)

 

ex ) 과거의 데이터λ₯Ό 읽을 λ•Œ μ‚¬μš©ν•œ μŠ€ν‚€λ§ˆμ— μƒˆλ‘œμš΄ ν•„λ“œλ₯Ό μΆ”κ°€ν•  수 μžˆλ‹€. μƒˆλ‘œμš΄ μ‚¬μš©μžμ™€ κΈ°μ‘΄ μ‚¬μš©μžλŠ” λͺ¨λ‘ 과거의 데이터λ₯Ό λ¬Έμ œμ—†μ΄ 읽을 수 μžˆμœΌλ©°μƒˆλ‘œμš΄ μ‚¬μš©μžλŠ” μƒˆλ‘œμš΄ ν•„λ“œκ°€ μΆ”κ°€λœ 데이터λ₯Ό μ“Έμˆ˜ μžˆλ‹€. κΈ°μ‘΄ μ‚¬μš©μžλŠ” μƒˆλ‘œμš΄ 데이터λ₯Ό λ³΄κ²Œλ˜λŠ”λ° μƒˆλ‘œμš΄ ν•„λ“œλŠ” λ¬΄μ‹œν•˜κ³  κΈ°μ‘΄ 데이터 μž‘μ—…μ²˜λŸΌ μ²˜λ¦¬ν•  수 μžˆλ‹€.

 

객체 μ»¨ν…Œμ΄λ„ˆ 포맷 제곡(ν•˜λ‘‘ μ‹œν€€μŠ€ 파일과 μœ μ‚¬ν•¨)

μ—μ΄λΈŒλ‘œ 데이터 νŒŒμΌμ€ μŠ€ν‚€λ§ˆκ°€ μ €μž₯된 메타데이터 μ„Ήμ…˜μ„ ν¬ν•¨ν•˜κ³  μžˆμ–΄ μžμ‹ μ„ μ„€λͺ…ν•˜λŠ” νŒŒμΌμž„

μ—μ΄λΈŒλ‘œ 데이터 νŒŒμΌμ€ μ••μΆ•κ³Ό λΆ„ν•  κΈ°λŠ₯ 제곡

 

μ—μ΄λΈŒλ‘œ μžλ£Œν˜•κ³Ό μŠ€ν‚€λ§ˆ

μ—μ΄λΈŒλ‘œμ˜ κΈ°λ³Έ μžλ£Œν˜• ν‘œμ‹œ

1
2
3
4
5
6
7
8
{"type":"null"}
{"type":"boolean"}
{"type":"int"}
{"type":"long"}
{"type":"float"}
{"type":"double"}
{"type":"bytes"}
{"type":"string"}
cs

μ—μ΄λΈŒλ‘œ 볡합 μžλ£Œν˜•

array μˆœμ„œμžˆλŠ” 객체 집합, 동일 νƒ€μž…

1
2
3
4
5
{
 "type""array",
 "items""long"
}
 
cs

map μˆœμ„œ μ—†λŠ” ν‚€-κ°’, 동일 νƒ€μž…

1
2
3
4
{
 "type""map",
 "values""string"
}
cs

record μž„μ˜μ˜ μžλ£Œν˜•

1
2
3
4
5
6
7
8
9
10
11
{
 "type""record",
 "name""WeatherRecord",
 "doc""A weather reading.",
 "fields": [
 {"name""year""type""int"},
 {"name""temperature""type""int"},
 {"name""stationId""type""string"}
 ]
}
 
cs

enum λͺ…λͺ…λœ κ°’μ˜ 집합

1
2
3
4
5
6
7
{
 "type""enum",
 "name""Cutlery",
 "doc""An eating utensil.",
 "symbols": ["KNIFE""FORK""SPOON"]
}
 
cs

fixed 고정길이 8λΉ„νŠΈ λΆ€ν˜Έ μ—†λŠ” λ°”μ΄νŠΈ

1
2
3
4
5
6
{
 "type""fixed",
 "name""Md5Hash",
 "size"16
}
 
cs

union μŠ€ν‚€λ§ˆμ˜ μœ λ‹ˆμ˜¨, λ°°μ—΄μ˜ κ°μš”μ†ŒλŠ” μŠ€ν‚€λ§ˆμž„

1
2
3
4
5
[
 "null",
 "string",
 {"type""map""values""string"}
]
cs

 

 

μ—μ΄λΈŒλ‘œ μžλ£Œν˜•κ³Ό λ‹€λ₯Έ ν”„λ‘œκ·Έλž˜λ° μ–Έμ–΄ μžλ£Œν˜• 맀핑 ν•„μš”

 

μžλ°” - μ œλ„€λ¦­ 맀핑 : μŠ€ν‚€λ§ˆλ₯Ό κ²°μ •ν•  수 없을 λ•Œ

μžλ°”, C++ - ꡬ체적 맀핑 : μŠ€ν‚€λ§ˆ 데이터 ν‘œν˜„ μ½”λ“œ 생성

μžλ°” - λ¦¬ν”Œλ ‰νŠΈ 맀핑 : μ—μ΄λΈŒλ‘œ μžλ£Œν˜•μ„ κΈ°μ‘΄ μžλ°” μžλ£Œν˜•μœΌλ‘œ 맀핑 

 

 

인메λͺ¨λ¦¬ 직렬화와 역직렬화

μ—μ΄λΈŒλ‘œ μŠ€ν‚€λ§ˆ μ˜ˆμ‹œ - ν•΄λ‹Ή μ—μ΄λΈŒλ‘œ μŠ€ν‚€λ§ˆλŠ” StringPair.avsc μ— μ €μž₯됨 

1
2
3
4
5
6
7
8
9
{
 "type""record",
 "name""StringPair",
 "doc""A pair of strings.",
 "fields": [
 {"name""left""type""string"},
 {"name""right""type""string"}
 ]
}
cs

νŒŒμΌμ„ 클래슀 κ²½λ‘œμ— μ €μž₯ν•œ ν›„ λ‘œλ”©ν•¨

1
2
3
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(
getClass().getResourceAsStream("StringPair.avsc"));
cs

μ œλ„€λ¦­ API λ₯Ό μ‚¬μš©ν•˜μ—¬ μ—μ΄λΈŒλ‘œ λ ˆμ½”λ“œμ˜ μΈμŠ€ν„΄μŠ€λ₯Ό 생성함

1
2
3
4
GenericRecord datum = new GenericData.Record(schema);
datum.put("left""L");
datum.put("right""R");
cs

좜λ ₯ μŠ€νŠΈλ¦Όμ— λ ˆμ½”λ“œλ₯Ό 직렬화함

1
2
3
4
5
6
7
8
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer =
new GenericDatumWriter<GenericRecord>(schema);
Encoder encoder = EncoderFactory.get().binaryEncoder(outnull);
writer.write(datum, encoder);
encoder.flush();
out.close();
cs

DatumReader : 데이터 객체λ₯Ό 인코더가 이해할 수 μžˆλŠ” μžλ£Œν˜•μœΌλ‘œ λ°˜ν™˜

GenericDatumReader : GenericRecord의 ν•„λ“œλ₯Ό μΈμ½”λ”λ‘œ 전달

μ΄μ „μ˜ μƒμ„±λœ 인코더λ₯Ό μž¬μ‚¬μš©ν•˜μ§€ μ•ŠκΈ° λ•Œλ¬Έμ— 인코더 νŒ©ν† λ¦¬μ— null 전달

writer() 슀트림 λ‹«κΈ° 전에 ν•„μš”ν•˜λ©΄ 더 호좜 κ°€λŠ₯

encoder.flush(); writeλ©”μ„œλ“œ 호좜 ν›„ 인코더 ν”ŒλŸ¬μ‹œν•˜κ³  좜λ ₯ 슀트림 λ‹«μŒ

 

ν•΄λ‹Ή κ³Όμ • λ°˜λŒ€λ‘œ ν•˜λ©΄ λ°”μ΄νŠΈ λ²„νΌμ—μ„œ 객체 읽을 수 있음

1
2
3
4
5
6
7
 DatumReader<GenericRecord> reader =
 new GenericDatumReader<GenericRecord>(schema);
 Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(),
 null);
 GenericRecord result = reader.read(null, decoder);
 assertThat(result.get("left").toString(), is("L"));
 assertThat(result.get("right").toString(), is("R"));
cs

 

 

ꡬ체적인 API

메이븐에 μΆ”κ°€ν•˜μ—¬ μžλ°”λ‘œλœ μŠ€ν‚€λ§ˆ μ½”λ“œ 생성할 수 μžˆλ‹€.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<project>
 ...
 <build>
 <plugins>
 <plugin>
 <groupId>org.apache.avro</groupId>
 <artifactId>avro-maven-plugin</artifactId>
 <version>${avro.version}</version>
 <executions>
 <execution>
 <id>schemas</id>
 <phase>generate-sources</phase>
 <goals>
 <goal>schema</goal>
 </goals>
 <configuration>
 <includes>
 <include>StringPair.avsc</include>
 </includes>
 <stringType>String</stringType>
 <sourceDirectory>src/main/resources</sourceDirectory>
 <outputDirectory>${project.build.directory}/generated-sources/java
 </outputDirectory>
 </configuration>
 </execution>
 </executions>
 </plugin>
 </plugins>
 </build>
 ...
</project>
 
cs

μ—μ΄λΈŒλ‘œ 데이터 파일

인메λͺ¨λ¦¬ μŠ€νŠΈλ¦Όμ—μ„œ 파일 읽기

데이터 파일 = μ—μ΄λΈŒλ‘œ μŠ€ν‚€λ§ˆ +  헀더 (메타데이터 (μ‹±ν¬λ§ˆμ»€ 포함)) +일련의 블둝 (μ§λ ¬ν™”λœ μ—μ΄λΈŒλ‘œ 객체가 μžˆλŠ”)

데이터 νŒŒμΌμ— 기둝된 κ°μ²΄λŠ” λ°˜λ“œμ‹œ 파일의 μŠ€ν‚€λ§ˆμ™€ μΌμΉ˜ν•΄μ•Όν•œλ‹€.

μΌμΉ˜ν•˜μ§€μ•ŠλŠ” κ²½μš°μ— appendμ‹œ μ˜ˆμ™Έ λ°œμƒν•  수 μžˆλ‹€.

1
2
3
4
5
6
7
8
9
10
File file = new File("data.avro");
DatumWriter<GenericRecord> writer =
 new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter =
new DataFileWriter<GenericRecord>(writer);
dataFileWriter.create(schema, file);
dataFileWriter.append(datum);
dataFileWriter.close();
 
cs

 

νŒŒμΌμ— ν¬ν•¨λœ 메타데이터 μ°Έμ‘°ν•˜μ—¬ 읽기 λ•Œλ¬Έμ— μŠ€ν‚€λ§ˆλ₯Ό λ”°λ‘œ μ •μ˜ν•˜μ§€μ•Šμ•„λ„λ¨

getSchema()λ₯Ό μ΄μš©ν•˜λ©΄ DataFileReader μΈμŠ€ν„΄μŠ€μ˜ μŠ€ν‚€λ§ˆ 정보 얻을 수 있고 원본 객체에 μ‚¬μš©ν•œ μŠ€ν‚€λ§ˆμ™€ 같은지 확인가λŠ₯ 

1
2
3
4
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
DataFileReader<GenericRecord> dataFileReader =
new DataFileReader<GenericRecord>(file, reader);
assertThat("Schema is the same", schema, is(dataFileReader.getSchema()));
cs

DataFileReaderλŠ” μ •κ·œ μžλ°” 반볡자둜 hasNext, nextλ©”μ„œλ“œλ₯Ό 반볡적으둜 ν˜ΈμΆœν•˜μ—¬

λͺ¨λ“  데이터 객체λ₯Ό μˆœν™˜ν•  수 μžˆλ‹€. λ ˆμ½”λ“œκ°€ ν•œκ°œλ§Œ μžˆλŠ”μ§€ ν™•μΈν•˜κ³  κΈ°λŒ€ν•œ ν•„λ“œκ°’ μžˆλŠ” 지 확인

1
2
3
4
5
assertThat(dataFileReader.hasNext(), is(true));
GenericRecord result = dataFileReader.next();
assertThat(result.get("left").toString(), is("L"));
assertThat(result.get("right").toString(), is("R"));
assertThat(dataFileReader.hasNext(), is(false));
cs

μƒν˜Έ μš΄μ˜μ„±

파이썬 API

1
2
3
4
5
6
import os
import string
import sys
from avro import schema
from avro import io
from avro import datafile
cs

 

μ—μ΄λΈŒλ‘œ 도ꡬ

1
% java -jar $AVRO_HOME/avro-tools-*.jar tojson pairs.avro
cs

 

μŠ€ν‚€λ§ˆ 해석

기둝할 λ•Œ μ‚¬μš©ν•œ writer μŠ€ν‚€λ§ˆμ™€ λ‹€λ₯Έ reader의 μŠ€ν‚€λ§ˆλ₯Ό μ‚¬μš©ν•˜μ—¬ 데이터λ₯Ό λ‹€μ‹œ 읽을 수 μžˆλ‹€,.

μΆ”κ°€λœ ν•„λ“œ - reader μ‹ κ·œμΌ λ•Œ, readerλŠ” μ‹ κ·œ ν•„λ“œμ˜ κΈ°λ³Έκ°’μ΄μš©

μΆ”κ°€λœ ν•„λ“œ - writer μ‹ κ·œμΌ λ•Œ, readerλŠ” μ‹ κ·œ ν•„λ“œ λͺ¨λ₯΄κΈ° λ•Œλ¬Έμ— λ¬΄μ‹œν•¨

제거된 ν•„λ“œ - reader μ‹ κ·œμΌ λ•Œ, μ‚­μ œλœ ν•„λ“œ λ¬΄μ‹œ

제거된 ν•„λ“œ - writer μ‹ κ·œμΌ λ•Œ, 제거된 ν•„λ“œ κΈ°λ‘ν•˜μ§€ μ•ŠμŒ. reader 의 μŠ€ν‚€λ§ˆλ₯Ό writer μŠ€ν‚€λ§ˆμ™€ κ°™κ²Œ λ§žμΆ”κ±°λ‚˜ μ΄μ „μœΌλ‘œ 갱신함

μ •λ ¬ μˆœμ„œ

recordλ₯Ό μ œμ™Έμ•ˆ λͺ¨λ“  μžλ£Œν˜•μ—λŠ” μˆœμ„œκ°€ μ •ν•΄μ ΈμžˆμŒ

recordλŠ” order 속성 λͺ…μ‹œν•˜μ—¬ μ •λ ¬ μ œμ–΄ν•  수 μžˆλ‹€.

μ˜€λ¦„μ°¨μˆœ (κΈ°λ³Έ)

λ‚΄λ¦Όμ°¨μˆœ : descending

λ¬΄μ‹œ : ignore

 

μ—μ΄λΈŒλ‘œ λ§΅λ¦¬λ“€μŠ€

날씨 λ ˆμ½”λ“œ 

1
2
3
4
5
6
7
8
9
10
{
 "type""record",
 "name""WeatherRecord",
 "doc""A weather reading.",
 "fields": [
 {"name""year""type""int"},
 {"name""temperature""type""int"},
 {"name""stationId""type""string"}
 ]
}
cs

졜고 κΈ°μ˜¨μ„ μ°ΎλŠ” λ§΅λ¦¬λ“€μŠ€ ν”„λ‘œκ·Έλž¨, μ—μ΄λΈŒλ‘œ 좜λ ₯ λ§Œλ“¦

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class AvroGenericMaxTemperature extends Configured implements Tool {
 
 private static final Schema SCHEMA = new Schema.Parser().parse(
 "{" +
 " \"type\": \"record\"," +
 " \"name\": \"WeatherRecord\"," +
 " \"doc\": \"A weather reading.\"," +
 " \"fields\": [" +
 " {\"name\": \"year\", \"type\": \"int\"}," +
 " {\"name\": \"temperature\", \"type\": \"int\"}," +
 " {\"name\": \"stationId\", \"type\": \"string\"}" +
 " ]" +
 "}"
 );
 public static class MaxTemperatureMapper
 extends Mapper<LongWritable, Text, AvroKey<Integer>,
 AvroValue<GenericRecord>> {
 private NcdcRecordParser parser = new NcdcRecordParser();
 private GenericRecord record = new GenericData.Record(SCHEMA);
 @Override
 protected void map(LongWritable key, Text value, Context context)
 throws IOException, InterruptedException {
 parser.parse(value.toString());
 if (parser.isValidTemperature()) {
 record.put("year", parser.getYearInt());
 record.put("temperature", parser.getAirTemperature());
 record.put("stationId", parser.getStationId());
 context.write(new AvroKey<Integer>(parser.getYearInt()),
 new AvroValue<GenericRecord>(record));
 }
 }
 }
 
 public static class MaxTemperatureReducer
 extends Reducer<AvroKey<Integer>, AvroValue<GenericRecord>,
 AvroKey<GenericRecord>, NullWritable> {
 @Override
 protected void reduce(AvroKey<Integer> key, Iterable<AvroValue<GenericRecord>>
 values, Context context) throws IOException, InterruptedException {
 GenericRecord max = null;
 for (AvroValue<GenericRecord> value : values) {
 GenericRecord record = value.datum();
 if (max == null ||
360 | Chapter 12: Avro (Integer) record.get("temperature"> (Integer) max.get("temperature")) {
 max = newWeatherRecord(record);
 }
 }
 context.write(new AvroKey(max), NullWritable.get());
 }
 private GenericRecord newWeatherRecord(GenericRecord value) {
 GenericRecord record = new GenericData.Record(SCHEMA);
 record.put("year", value.get("year"));
 record.put("temperature", value.get("temperature"));
 record.put("stationId", value.get("stationId"));
 return record;
 }
 }
 @Override
 public int run(String[] args) throws Exception {
 if (args.length != 2) {
 System.err.printf("Usage: %s [generic options] <input> <output>\n",
 getClass().getSimpleName());
 ToolRunner.printGenericCommandUsage(System.err);
 return -1;
 }
 Job job = new Job(getConf(), "Max temperature");
 job.setJarByClass(getClass());
 job.getConfiguration().setBoolean(
 Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
 FileInputFormat.addInputPath(job, new Path(args[0]));
 FileOutputFormat.setOutputPath(job, new Path(args[1]));
 AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.INT));
 AvroJob.setMapOutputValueSchema(job, SCHEMA);
 AvroJob.setOutputKeySchema(job, SCHEMA);
 job.setInputFormatClass(TextInputFormat.class);
 job.setOutputFormatClass(AvroKeyOutputFormat.class);
 job.setMapperClass(MaxTemperatureMapper.class);
 job.setReducerClass(MaxTemperatureReducer.class);
 return job.waitForCompletion(true) ? 0 : 1;
 }
 
 public static void main(String[] args) throws Exception {
 int exitCode = ToolRunner.run(new AvroGenericMaxTemperature(), args);
 System.exit(exitCode);
 }
}
cs

ν”„λ‘œκ·Έλž¨ μ‹€ν–‰

1
2
3
4
export HADOOP_CLASSPATH=avro-examples.jar
export HADOOP_USER_CLASSPATH_FIRST=true # override version of Avro in Hadoop
% hadoop jar avro-examples.jar AvroGenericMaxTemperature \
 input/ncdc/sample.txt output

결과물좜λ ₯

1
2
3
% java -jar $AVRO_HOME/avro-tools-*.jar tojson output/part-r-00000.avro
{"year":1949,"temperature":111,"stationId":"012650-99999"}
{"year":1950,"temperature":22,"stationId":"011990-99999"}
cs

 

μ—μ΄λΈŒλ‘œ λ§΅λ¦¬λ“€μŠ€ μ΄μš©ν•΄ μ •λ ¬

μ—μ΄λΈŒλ‘œ 데이터 νŒŒμΌμ„ μ •λ ¬ν•˜λŠ” λ§΅λ¦¬λ“€μŠ€ ν”„λ‘œκ·Έλž¨

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class AvroSort extends Configured implements Tool {
 static class SortMapper<K> extends Mapper<AvroKey<K>, NullWritable,
 AvroKey<K>, AvroValue<K>> {
 @Override
 protected void map(AvroKey<K> key, NullWritable value,
 Context context) throws IOException, InterruptedException {
 context.write(key, new AvroValue<K>(key.datum()));
 }
 }
 static class SortReducer<K> extends Reducer<AvroKey<K>, AvroValue<K>,
 AvroKey<K>, NullWritable> {
 @Override
 protected void reduce(AvroKey<K> key, Iterable<AvroValue<K>> values,
 Context context) throws IOException, InterruptedException {
 for (AvroValue<K> value : values) {
 context.write(new AvroKey(value.datum()), NullWritable.get());
 }
 }
 }
 @Override
 public int run(String[] args) throws Exception {
 
 if (args.length != 3) {
 System.err.printf(
 "Usage: %s [generic options] <input> <output> <schema-file>\n",
 getClass().getSimpleName());
 ToolRunner.printGenericCommandUsage(System.err);
 return -1;
 }
 
 String input = args[0];
 String output = args[1];
 String schemaFile = args[2];
 Job job = new Job(getConf(), "Avro sort");
 job.setJarByClass(getClass());
 job.getConfiguration().setBoolean(Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
 FileInputFormat.addInputPath(job, new Path(input));
 FileOutputFormat.setOutputPath(job, new Path(output));
 AvroJob.setDataModelClass(job, GenericData.class);
 Schema schema = new Schema.Parser().parse(new File(schemaFile));
 AvroJob.setInputKeySchema(job, schema);
 AvroJob.setMapOutputKeySchema(job, schema);
 AvroJob.setMapOutputValueSchema(job, schema);
 AvroJob.setOutputKeySchema(job, schema);
 job.setInputFormatClass(AvroKeyInputFormat.class);
 job.setOutputFormatClass(AvroKeyOutputFormat.class);
 job.setOutputKeyClass(AvroKey.class);
 job.setOutputValueClass(NullWritable.class);
 job.setMapperClass(SortMapper.class);
 job.setReducerClass(SortReducer.class);
 return job.waitForCompletion(true) ? 0 : 1;
 }
 
 public static void main(String[] args) throws Exception {
 int exitCode = ToolRunner.run(new AvroSort(), args);
 System.exit(exitCode);
 }
}
cs

정렬은 λ§΅λ¦¬λ“€μŠ€ μ…”ν”Œ κ³Όμ •μ—μ„œ μΌμ–΄λ‚˜λ©° μ •λ ¬κΈ°λŠ₯은 μ—μ΄λΈŒλ‘œμ˜ μŠ€ν‚€λ§ˆμ— μ˜ν•΄ 정해짐

μž…λ ₯데이터 점검

1
2
3
4
5
% java -jar $AVRO_HOME/avro-tools-*.jar tojson input/avro/pairs.avro
{"left":"a","right":"1"}
{"left":"c","right":"2"}
{"left":"b","right":"3"}
{"left":"b","right":"2"}
cs

ν”„λ‘œκ·Έλž¨μ‚¬μš©ν•˜μ—¬ μ •λ ¬

1
2
% hadoop jar avro-examples.jar AvroSort input/avro/pairs.avro output \
 ch12-avro/src/main/resources/SortedStringPair.avsc
cs

μ •λ ¬ ν›„ μ €μž₯된 파일 좜λ ₯

1
2
3
4
5
% java -jar $AVRO_HOME/avro-tools-*.jar tojson output/part-r-00000.avro
{"left":"b","right":"3"}
{"left":"b","right":"2"}
{"left":"c","right":"2"}
{"left":"a","right":"1"}
cs
728x90
λ°˜μ‘ν˜•