๋ฐ˜์‘ํ˜•

์•„ํŒŒ์น˜ ์—์ด๋ธŒ๋กœ๋ž€ ? 

https://dennyglee.com/2013/03/12/using-avro-with-hdinsight-on-azure-at-343-industries/

- ํŠน์ • ์–ธ์–ด์— ์ข…์†๋˜์ง€ ์•Š๋Š” ์–ธ์–ด ์ค‘๋ฆฝ์  ๋ฐ์ดํ„ฐ ์ง๋ ฌํ™” ์‹œ์Šคํ…œ

- ํ•˜๋‘ก Writable์˜ ์ฃผ์š” ๋‹จ์ ์ธ ์–ธ์–ด ์ด์‹์„ฑ ํ•ด๊ฒฐ ์œ„ํ•ด ์ƒ๊ฒจ๋‚จ

 

์•„ํŒŒ์น˜ ์“ฐ๋ฆฌํ”„ํŠธ, ๊ตฌ๊ธ€ ํ”„๋กœํ† ์ฝœ ๋ฒ„ํผ์™€ ๋‹ค๋ฅธ ์ฐจ๋ณ„ํ™”๋œ ํŠน์„ฑ๊ฐ€์ง€๊ณ  ์žˆ์Œ

 

๋ฐ์ดํ„ฐ๋Š” ๋‹ค๋ฅธ ์‹œ์Šคํ…œ๊ณผ ๋น„์Šทํ•˜๊ฒŒ ์–ธ์–ด ๋…๋ฆฝ ์Šคํ‚ค๋งˆ๋กœ ๊ธฐ์ˆ ๋จ

์—์ด๋ธŒ๋กœ์—์„œ ์ฝ”๋“œ ์ƒ์„ฑ์€ ์„ ํƒ์‚ฌํ•ญ์ž„

๋ฐ์ดํ„ฐ๋ฅผ ์ฝ๊ณ  ์“ฐ๋Š” ์‹œ์ ์— ์Šคํ‚ค๋งˆ๋Š” ํ•ญ์ƒ ์กด์žฌํ•œ๋‹ค ๊ฐ€์ •ํ•จ - ๋งค์šฐ ๊ฐ„๊ฒฐํ•œ ์ฝ”๋”ฉ์ด ๊ฐ€๋Šฅ

 

์Šคํ‚ค๋งˆ์˜ ์ž‘์„ฑ

JSON

๋ฐ์ดํ„ฐ๋Š” ๋ฐ”์ด๋„ˆ๋ฆฌ ํฌ๋งท์œผ๋กœ ์ธ์ฝ”๋”ฉ

 

์—์ด๋ธŒ๋กœ ๋ช…์„ธ - ๋ชจ๋“  ๊ตฌํ˜„์ฒด๊ฐ€ ์ง€์›ํ•ด์•ผ ํ•˜๋Š” ๋ฒ„์ด๋„ˆ๋ฆฌ ํฌ๋งท์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ

API - ์—์ด๋ธŒ๋กœ ๋ช…์„ธ์—์„œ ๋น ์ ธ์žˆ๋Š” ๋‚ด์šฉ์ž„. ๊ฐ ํŠน์ •์–ธ์–ด์— ๋”ฐ๋ผ ๋‹ค๋ฅด๊ฒŒ ์ž‘์„ฑ๋จ. ์–ธ์–ด์˜ ๋ฐ”์ธ๋”ฉ ํŽธ์˜์„ฑ ๋†’์ด๊ณ  ์ƒํ˜ธ์šด์˜์„ฑ ์ €ํ•˜ ๋ฌธ์ œ ํ•ด๊ฒฐ๋จ

 

์Šคํ‚ค๋งˆํ•ด์„ - ์‹ ์ค‘ํ•˜๊ฒŒ ์ •์˜๋œ ์–ด๋– ํ•œ ์ œ์•ฝ์กฐ๊ฑด์—์„œ๋„ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ๋Š” ๋ฐ ์‚ฌ์šฉ๋˜๋Š” ์Šคํ‚ค๋งˆ์™€ ๋ฐ์ดํ„ฐ๋ฅผ ์“ฐ๋Š” ๋ฐ ์‚ฌ์šฉ๋˜๋Š” ์Šคํ‚ค๋งˆ๊ฐ€ ๊ฐ™์ง€ ์•Š์•„๋„ ๋œ๋‹ค.(์Šคํ‚ค๋งˆ ๋ณ€ํ˜• ๋ฉ”์ปค๋‹ˆ์ฆ˜)

 

ex ) ๊ณผ๊ฑฐ์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์„ ๋•Œ ์‚ฌ์šฉํ•œ ์Šคํ‚ค๋งˆ์— ์ƒˆ๋กœ์šด ํ•„๋“œ๋ฅผ ์ถ”๊ฐ€ํ•  ์ˆ˜ ์žˆ๋‹ค. ์ƒˆ๋กœ์šด ์‚ฌ์šฉ์ž์™€ ๊ธฐ์กด ์‚ฌ์šฉ์ž๋Š” ๋ชจ๋‘ ๊ณผ๊ฑฐ์˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฌธ์ œ์—†์ด ์ฝ์„ ์ˆ˜ ์žˆ์œผ๋ฉฐ์ƒˆ๋กœ์šด ์‚ฌ์šฉ์ž๋Š” ์ƒˆ๋กœ์šด ํ•„๋“œ๊ฐ€ ์ถ”๊ฐ€๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์“ธ์ˆ˜ ์žˆ๋‹ค. ๊ธฐ์กด ์‚ฌ์šฉ์ž๋Š” ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ๋ฅผ ๋ณด๊ฒŒ๋˜๋Š”๋ฐ ์ƒˆ๋กœ์šด ํ•„๋“œ๋Š” ๋ฌด์‹œํ•˜๊ณ  ๊ธฐ์กด ๋ฐ์ดํ„ฐ ์ž‘์—…์ฒ˜๋Ÿผ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.

 

๊ฐ์ฒด ์ปจํ…Œ์ด๋„ˆ ํฌ๋งท ์ œ๊ณต(ํ•˜๋‘ก ์‹œํ€€์Šค ํŒŒ์ผ๊ณผ ์œ ์‚ฌํ•จ)

์—์ด๋ธŒ๋กœ ๋ฐ์ดํ„ฐ ํŒŒ์ผ์€ ์Šคํ‚ค๋งˆ๊ฐ€ ์ €์žฅ๋œ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ ์„น์…˜์„ ํฌํ•จํ•˜๊ณ  ์žˆ์–ด ์ž์‹ ์„ ์„ค๋ช…ํ•˜๋Š” ํŒŒ์ผ์ž„

์—์ด๋ธŒ๋กœ ๋ฐ์ดํ„ฐ ํŒŒ์ผ์€ ์••์ถ•๊ณผ ๋ถ„ํ•  ๊ธฐ๋Šฅ ์ œ๊ณต

 

์—์ด๋ธŒ๋กœ ์ž๋ฃŒํ˜•๊ณผ ์Šคํ‚ค๋งˆ

์—์ด๋ธŒ๋กœ์˜ ๊ธฐ๋ณธ ์ž๋ฃŒํ˜• ํ‘œ์‹œ

1
2
3
4
5
6
7
8
{"type":"null"}
{"type":"boolean"}
{"type":"int"}
{"type":"long"}
{"type":"float"}
{"type":"double"}
{"type":"bytes"}
{"type":"string"}
cs

์—์ด๋ธŒ๋กœ ๋ณตํ•ฉ ์ž๋ฃŒํ˜•

array ์ˆœ์„œ์žˆ๋Š” ๊ฐ์ฒด ์ง‘ํ•ฉ, ๋™์ผ ํƒ€์ž…

1
2
3
4
5
{
 "type""array",
 "items""long"
}
 
cs

map ์ˆœ์„œ ์—†๋Š” ํ‚ค-๊ฐ’, ๋™์ผ ํƒ€์ž…

1
2
3
4
{
 "type""map",
 "values""string"
}
cs

record ์ž„์˜์˜ ์ž๋ฃŒํ˜•

1
2
3
4
5
6
7
8
9
10
11
{
 "type""record",
 "name""WeatherRecord",
 "doc""A weather reading.",
 "fields": [
 {"name""year""type""int"},
 {"name""temperature""type""int"},
 {"name""stationId""type""string"}
 ]
}
 
cs

enum ๋ช…๋ช…๋œ ๊ฐ’์˜ ์ง‘ํ•ฉ

1
2
3
4
5
6
7
{
 "type""enum",
 "name""Cutlery",
 "doc""An eating utensil.",
 "symbols": ["KNIFE""FORK""SPOON"]
}
 
cs

fixed ๊ณ ์ •๊ธธ์ด 8๋น„ํŠธ ๋ถ€ํ˜ธ ์—†๋Š” ๋ฐ”์ดํŠธ

1
2
3
4
5
6
{
 "type""fixed",
 "name""Md5Hash",
 "size"16
}
 
cs

union ์Šคํ‚ค๋งˆ์˜ ์œ ๋‹ˆ์˜จ, ๋ฐฐ์—ด์˜ ๊ฐ์š”์†Œ๋Š” ์Šคํ‚ค๋งˆ์ž„

1
2
3
4
5
[
 "null",
 "string",
 {"type""map""values""string"}
]
cs

 

 

์—์ด๋ธŒ๋กœ ์ž๋ฃŒํ˜•๊ณผ ๋‹ค๋ฅธ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ์–ธ์–ด ์ž๋ฃŒํ˜• ๋งคํ•‘ ํ•„์š”

 

์ž๋ฐ” - ์ œ๋„ค๋ฆญ ๋งคํ•‘ : ์Šคํ‚ค๋งˆ๋ฅผ ๊ฒฐ์ •ํ•  ์ˆ˜ ์—†์„ ๋•Œ

์ž๋ฐ”, C++ - ๊ตฌ์ฒด์  ๋งคํ•‘ : ์Šคํ‚ค๋งˆ ๋ฐ์ดํ„ฐ ํ‘œํ˜„ ์ฝ”๋“œ ์ƒ์„ฑ

์ž๋ฐ” - ๋ฆฌํ”Œ๋ ‰ํŠธ ๋งคํ•‘ : ์—์ด๋ธŒ๋กœ ์ž๋ฃŒํ˜•์„ ๊ธฐ์กด ์ž๋ฐ” ์ž๋ฃŒํ˜•์œผ๋กœ ๋งคํ•‘ 

 

 

์ธ๋ฉ”๋ชจ๋ฆฌ ์ง๋ ฌํ™”์™€ ์—ญ์ง๋ ฌํ™”

์—์ด๋ธŒ๋กœ ์Šคํ‚ค๋งˆ ์˜ˆ์‹œ - ํ•ด๋‹น ์—์ด๋ธŒ๋กœ ์Šคํ‚ค๋งˆ๋Š” StringPair.avsc ์— ์ €์žฅ๋จ 

1
2
3
4
5
6
7
8
9
{
 "type""record",
 "name""StringPair",
 "doc""A pair of strings.",
 "fields": [
 {"name""left""type""string"},
 {"name""right""type""string"}
 ]
}
cs

ํŒŒ์ผ์„ ํด๋ž˜์Šค ๊ฒฝ๋กœ์— ์ €์žฅํ•œ ํ›„ ๋กœ๋”ฉํ•จ

1
2
3
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(
getClass().getResourceAsStream("StringPair.avsc"));
cs

์ œ๋„ค๋ฆญ API ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์—์ด๋ธŒ๋กœ ๋ ˆ์ฝ”๋“œ์˜ ์ธ์Šคํ„ด์Šค๋ฅผ ์ƒ์„ฑํ•จ

1
2
3
4
GenericRecord datum = new GenericData.Record(schema);
datum.put("left""L");
datum.put("right""R");
cs

์ถœ๋ ฅ ์ŠคํŠธ๋ฆผ์— ๋ ˆ์ฝ”๋“œ๋ฅผ ์ง๋ ฌํ™”ํ•จ

1
2
3
4
5
6
7
8
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer =
new GenericDatumWriter<GenericRecord>(schema);
Encoder encoder = EncoderFactory.get().binaryEncoder(outnull);
writer.write(datum, encoder);
encoder.flush();
out.close();
cs

DatumReader : ๋ฐ์ดํ„ฐ ๊ฐ์ฒด๋ฅผ ์ธ์ฝ”๋”๊ฐ€ ์ดํ•ดํ•  ์ˆ˜ ์žˆ๋Š” ์ž๋ฃŒํ˜•์œผ๋กœ ๋ฐ˜ํ™˜

GenericDatumReader : GenericRecord์˜ ํ•„๋“œ๋ฅผ ์ธ์ฝ”๋”๋กœ ์ „๋‹ฌ

์ด์ „์˜ ์ƒ์„ฑ๋œ ์ธ์ฝ”๋”๋ฅผ ์žฌ์‚ฌ์šฉํ•˜์ง€ ์•Š๊ธฐ ๋•Œ๋ฌธ์— ์ธ์ฝ”๋” ํŒฉํ† ๋ฆฌ์— null ์ „๋‹ฌ

writer() ์ŠคํŠธ๋ฆผ ๋‹ซ๊ธฐ ์ „์— ํ•„์š”ํ•˜๋ฉด ๋” ํ˜ธ์ถœ ๊ฐ€๋Šฅ

encoder.flush(); write๋ฉ”์„œ๋“œ ํ˜ธ์ถœ ํ›„ ์ธ์ฝ”๋” ํ”Œ๋Ÿฌ์‹œํ•˜๊ณ  ์ถœ๋ ฅ ์ŠคํŠธ๋ฆผ ๋‹ซ์Œ

 

ํ•ด๋‹น ๊ณผ์ • ๋ฐ˜๋Œ€๋กœ ํ•˜๋ฉด ๋ฐ”์ดํŠธ ๋ฒ„ํผ์—์„œ ๊ฐ์ฒด ์ฝ์„ ์ˆ˜ ์žˆ์Œ

1
2
3
4
5
6
7
 DatumReader<GenericRecord> reader =
 new GenericDatumReader<GenericRecord>(schema);
 Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(),
 null);
 GenericRecord result = reader.read(null, decoder);
 assertThat(result.get("left").toString(), is("L"));
 assertThat(result.get("right").toString(), is("R"));
cs

 

 

๊ตฌ์ฒด์ ์ธ API

๋ฉ”์ด๋ธ์— ์ถ”๊ฐ€ํ•˜์—ฌ ์ž๋ฐ”๋กœ๋œ ์Šคํ‚ค๋งˆ ์ฝ”๋“œ ์ƒ์„ฑํ•  ์ˆ˜ ์žˆ๋‹ค.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<project>
 ...
 <build>
 <plugins>
 <plugin>
 <groupId>org.apache.avro</groupId>
 <artifactId>avro-maven-plugin</artifactId>
 <version>${avro.version}</version>
 <executions>
 <execution>
 <id>schemas</id>
 <phase>generate-sources</phase>
 <goals>
 <goal>schema</goal>
 </goals>
 <configuration>
 <includes>
 <include>StringPair.avsc</include>
 </includes>
 <stringType>String</stringType>
 <sourceDirectory>src/main/resources</sourceDirectory>
 <outputDirectory>${project.build.directory}/generated-sources/java
 </outputDirectory>
 </configuration>
 </execution>
 </executions>
 </plugin>
 </plugins>
 </build>
 ...
</project>
 
cs

์—์ด๋ธŒ๋กœ ๋ฐ์ดํ„ฐ ํŒŒ์ผ

์ธ๋ฉ”๋ชจ๋ฆฌ ์ŠคํŠธ๋ฆผ์—์„œ ํŒŒ์ผ ์ฝ๊ธฐ

๋ฐ์ดํ„ฐ ํŒŒ์ผ = ์—์ด๋ธŒ๋กœ ์Šคํ‚ค๋งˆ +  ํ—ค๋” (๋ฉ”ํƒ€๋ฐ์ดํ„ฐ (์‹ฑํฌ๋งˆ์ปค ํฌํ•จ)) +์ผ๋ จ์˜ ๋ธ”๋ก (์ง๋ ฌํ™”๋œ ์—์ด๋ธŒ๋กœ ๊ฐ์ฒด๊ฐ€ ์žˆ๋Š”)

๋ฐ์ดํ„ฐ ํŒŒ์ผ์— ๊ธฐ๋ก๋œ ๊ฐ์ฒด๋Š” ๋ฐ˜๋“œ์‹œ ํŒŒ์ผ์˜ ์Šคํ‚ค๋งˆ์™€ ์ผ์น˜ํ•ด์•ผํ•œ๋‹ค.

์ผ์น˜ํ•˜์ง€์•Š๋Š” ๊ฒฝ์šฐ์— append์‹œ ์˜ˆ์™ธ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ๋‹ค.

1
2
3
4
5
6
7
8
9
10
File file = new File("data.avro");
DatumWriter<GenericRecord> writer =
 new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter =
new DataFileWriter<GenericRecord>(writer);
dataFileWriter.create(schema, file);
dataFileWriter.append(datum);
dataFileWriter.close();
 
cs

 

ํŒŒ์ผ์— ํฌํ•จ๋œ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ ์ฐธ์กฐํ•˜์—ฌ ์ฝ๊ธฐ ๋•Œ๋ฌธ์— ์Šคํ‚ค๋งˆ๋ฅผ ๋”ฐ๋กœ ์ •์˜ํ•˜์ง€์•Š์•„๋„๋จ

getSchema()๋ฅผ ์ด์šฉํ•˜๋ฉด DataFileReader ์ธ์Šคํ„ด์Šค์˜ ์Šคํ‚ค๋งˆ ์ •๋ณด ์–ป์„ ์ˆ˜ ์žˆ๊ณ  ์›๋ณธ ๊ฐ์ฒด์— ์‚ฌ์šฉํ•œ ์Šคํ‚ค๋งˆ์™€ ๊ฐ™์€์ง€ ํ™•์ธ๊ฐ€๋Šฅ 

1
2
3
4
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
DataFileReader<GenericRecord> dataFileReader =
new DataFileReader<GenericRecord>(file, reader);
assertThat("Schema is the same", schema, is(dataFileReader.getSchema()));
cs

DataFileReader๋Š” ์ •๊ทœ ์ž๋ฐ” ๋ฐ˜๋ณต์ž๋กœ hasNext, next๋ฉ”์„œ๋“œ๋ฅผ ๋ฐ˜๋ณต์ ์œผ๋กœ ํ˜ธ์ถœํ•˜์—ฌ

๋ชจ๋“  ๋ฐ์ดํ„ฐ ๊ฐ์ฒด๋ฅผ ์ˆœํ™˜ํ•  ์ˆ˜ ์žˆ๋‹ค. ๋ ˆ์ฝ”๋“œ๊ฐ€ ํ•œ๊ฐœ๋งŒ ์žˆ๋Š”์ง€ ํ™•์ธํ•˜๊ณ  ๊ธฐ๋Œ€ํ•œ ํ•„๋“œ๊ฐ’ ์žˆ๋Š” ์ง€ ํ™•์ธ

1
2
3
4
5
assertThat(dataFileReader.hasNext(), is(true));
GenericRecord result = dataFileReader.next();
assertThat(result.get("left").toString(), is("L"));
assertThat(result.get("right").toString(), is("R"));
assertThat(dataFileReader.hasNext(), is(false));
cs

์ƒํ˜ธ ์šด์˜์„ฑ

ํŒŒ์ด์ฌ API

1
2
3
4
5
6
import os
import string
import sys
from avro import schema
from avro import io
from avro import datafile
cs

 

์—์ด๋ธŒ๋กœ ๋„๊ตฌ

1
% java -jar $AVRO_HOME/avro-tools-*.jar tojson pairs.avro
cs

 

์Šคํ‚ค๋งˆ ํ•ด์„

๊ธฐ๋กํ•  ๋•Œ ์‚ฌ์šฉํ•œ writer ์Šคํ‚ค๋งˆ์™€ ๋‹ค๋ฅธ reader์˜ ์Šคํ‚ค๋งˆ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค์‹œ ์ฝ์„ ์ˆ˜ ์žˆ๋‹ค,.

์ถ”๊ฐ€๋œ ํ•„๋“œ - reader ์‹ ๊ทœ์ผ ๋•Œ, reader๋Š” ์‹ ๊ทœ ํ•„๋“œ์˜ ๊ธฐ๋ณธ๊ฐ’์ด์šฉ

์ถ”๊ฐ€๋œ ํ•„๋“œ - writer ์‹ ๊ทœ์ผ ๋•Œ, reader๋Š” ์‹ ๊ทœ ํ•„๋“œ ๋ชจ๋ฅด๊ธฐ ๋•Œ๋ฌธ์— ๋ฌด์‹œํ•จ

์ œ๊ฑฐ๋œ ํ•„๋“œ - reader ์‹ ๊ทœ์ผ ๋•Œ, ์‚ญ์ œ๋œ ํ•„๋“œ ๋ฌด์‹œ

์ œ๊ฑฐ๋œ ํ•„๋“œ - writer ์‹ ๊ทœ์ผ ๋•Œ, ์ œ๊ฑฐ๋œ ํ•„๋“œ ๊ธฐ๋กํ•˜์ง€ ์•Š์Œ. reader ์˜ ์Šคํ‚ค๋งˆ๋ฅผ writer ์Šคํ‚ค๋งˆ์™€ ๊ฐ™๊ฒŒ ๋งž์ถ”๊ฑฐ๋‚˜ ์ด์ „์œผ๋กœ ๊ฐฑ์‹ ํ•จ

์ •๋ ฌ ์ˆœ์„œ

record๋ฅผ ์ œ์™ธ์•ˆ ๋ชจ๋“  ์ž๋ฃŒํ˜•์—๋Š” ์ˆœ์„œ๊ฐ€ ์ •ํ•ด์ ธ์žˆ์Œ

record๋Š” order ์†์„ฑ ๋ช…์‹œํ•˜์—ฌ ์ •๋ ฌ ์ œ์–ดํ•  ์ˆ˜ ์žˆ๋‹ค.

์˜ค๋ฆ„์ฐจ์ˆœ (๊ธฐ๋ณธ)

๋‚ด๋ฆผ์ฐจ์ˆœ : descending

๋ฌด์‹œ : ignore

 

์—์ด๋ธŒ๋กœ ๋งต๋ฆฌ๋“€์Šค

๋‚ ์”จ ๋ ˆ์ฝ”๋“œ 

1
2
3
4
5
6
7
8
9
10
{
 "type""record",
 "name""WeatherRecord",
 "doc""A weather reading.",
 "fields": [
 {"name""year""type""int"},
 {"name""temperature""type""int"},
 {"name""stationId""type""string"}
 ]
}
cs

์ตœ๊ณ  ๊ธฐ์˜จ์„ ์ฐพ๋Š” ๋งต๋ฆฌ๋“€์Šค ํ”„๋กœ๊ทธ๋žจ, ์—์ด๋ธŒ๋กœ ์ถœ๋ ฅ ๋งŒ๋“ฆ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class AvroGenericMaxTemperature extends Configured implements Tool {
 
 private static final Schema SCHEMA = new Schema.Parser().parse(
 "{" +
 " \"type\": \"record\"," +
 " \"name\": \"WeatherRecord\"," +
 " \"doc\": \"A weather reading.\"," +
 " \"fields\": [" +
 " {\"name\": \"year\", \"type\": \"int\"}," +
 " {\"name\": \"temperature\", \"type\": \"int\"}," +
 " {\"name\": \"stationId\", \"type\": \"string\"}" +
 " ]" +
 "}"
 );
 public static class MaxTemperatureMapper
 extends Mapper<LongWritable, Text, AvroKey<Integer>,
 AvroValue<GenericRecord>> {
 private NcdcRecordParser parser = new NcdcRecordParser();
 private GenericRecord record = new GenericData.Record(SCHEMA);
 @Override
 protected void map(LongWritable key, Text value, Context context)
 throws IOException, InterruptedException {
 parser.parse(value.toString());
 if (parser.isValidTemperature()) {
 record.put("year", parser.getYearInt());
 record.put("temperature", parser.getAirTemperature());
 record.put("stationId", parser.getStationId());
 context.write(new AvroKey<Integer>(parser.getYearInt()),
 new AvroValue<GenericRecord>(record));
 }
 }
 }
 
 public static class MaxTemperatureReducer
 extends Reducer<AvroKey<Integer>, AvroValue<GenericRecord>,
 AvroKey<GenericRecord>, NullWritable> {
 @Override
 protected void reduce(AvroKey<Integer> key, Iterable<AvroValue<GenericRecord>>
 values, Context context) throws IOException, InterruptedException {
 GenericRecord max = null;
 for (AvroValue<GenericRecord> value : values) {
 GenericRecord record = value.datum();
 if (max == null ||
360 | Chapter 12: Avro (Integer) record.get("temperature"> (Integer) max.get("temperature")) {
 max = newWeatherRecord(record);
 }
 }
 context.write(new AvroKey(max), NullWritable.get());
 }
 private GenericRecord newWeatherRecord(GenericRecord value) {
 GenericRecord record = new GenericData.Record(SCHEMA);
 record.put("year", value.get("year"));
 record.put("temperature", value.get("temperature"));
 record.put("stationId", value.get("stationId"));
 return record;
 }
 }
 @Override
 public int run(String[] args) throws Exception {
 if (args.length != 2) {
 System.err.printf("Usage: %s [generic options] <input> <output>\n",
 getClass().getSimpleName());
 ToolRunner.printGenericCommandUsage(System.err);
 return -1;
 }
 Job job = new Job(getConf(), "Max temperature");
 job.setJarByClass(getClass());
 job.getConfiguration().setBoolean(
 Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
 FileInputFormat.addInputPath(job, new Path(args[0]));
 FileOutputFormat.setOutputPath(job, new Path(args[1]));
 AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.INT));
 AvroJob.setMapOutputValueSchema(job, SCHEMA);
 AvroJob.setOutputKeySchema(job, SCHEMA);
 job.setInputFormatClass(TextInputFormat.class);
 job.setOutputFormatClass(AvroKeyOutputFormat.class);
 job.setMapperClass(MaxTemperatureMapper.class);
 job.setReducerClass(MaxTemperatureReducer.class);
 return job.waitForCompletion(true) ? 0 : 1;
 }
 
 public static void main(String[] args) throws Exception {
 int exitCode = ToolRunner.run(new AvroGenericMaxTemperature(), args);
 System.exit(exitCode);
 }
}
cs

ํ”„๋กœ๊ทธ๋žจ ์‹คํ–‰

1
2
3
4
export HADOOP_CLASSPATH=avro-examples.jar
export HADOOP_USER_CLASSPATH_FIRST=true # override version of Avro in Hadoop
% hadoop jar avro-examples.jar AvroGenericMaxTemperature \
 input/ncdc/sample.txt output

๊ฒฐ๊ณผ๋ฌผ์ถœ๋ ฅ

1
2
3
% java -jar $AVRO_HOME/avro-tools-*.jar tojson output/part-r-00000.avro
{"year":1949,"temperature":111,"stationId":"012650-99999"}
{"year":1950,"temperature":22,"stationId":"011990-99999"}
cs

 

์—์ด๋ธŒ๋กœ ๋งต๋ฆฌ๋“€์Šค ์ด์šฉํ•ด ์ •๋ ฌ

์—์ด๋ธŒ๋กœ ๋ฐ์ดํ„ฐ ํŒŒ์ผ์„ ์ •๋ ฌํ•˜๋Š” ๋งต๋ฆฌ๋“€์Šค ํ”„๋กœ๊ทธ๋žจ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class AvroSort extends Configured implements Tool {
 static class SortMapper<K> extends Mapper<AvroKey<K>, NullWritable,
 AvroKey<K>, AvroValue<K>> {
 @Override
 protected void map(AvroKey<K> key, NullWritable value,
 Context context) throws IOException, InterruptedException {
 context.write(key, new AvroValue<K>(key.datum()));
 }
 }
 static class SortReducer<K> extends Reducer<AvroKey<K>, AvroValue<K>,
 AvroKey<K>, NullWritable> {
 @Override
 protected void reduce(AvroKey<K> key, Iterable<AvroValue<K>> values,
 Context context) throws IOException, InterruptedException {
 for (AvroValue<K> value : values) {
 context.write(new AvroKey(value.datum()), NullWritable.get());
 }
 }
 }
 @Override
 public int run(String[] args) throws Exception {
 
 if (args.length != 3) {
 System.err.printf(
 "Usage: %s [generic options] <input> <output> <schema-file>\n",
 getClass().getSimpleName());
 ToolRunner.printGenericCommandUsage(System.err);
 return -1;
 }
 
 String input = args[0];
 String output = args[1];
 String schemaFile = args[2];
 Job job = new Job(getConf(), "Avro sort");
 job.setJarByClass(getClass());
 job.getConfiguration().setBoolean(Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
 FileInputFormat.addInputPath(job, new Path(input));
 FileOutputFormat.setOutputPath(job, new Path(output));
 AvroJob.setDataModelClass(job, GenericData.class);
 Schema schema = new Schema.Parser().parse(new File(schemaFile));
 AvroJob.setInputKeySchema(job, schema);
 AvroJob.setMapOutputKeySchema(job, schema);
 AvroJob.setMapOutputValueSchema(job, schema);
 AvroJob.setOutputKeySchema(job, schema);
 job.setInputFormatClass(AvroKeyInputFormat.class);
 job.setOutputFormatClass(AvroKeyOutputFormat.class);
 job.setOutputKeyClass(AvroKey.class);
 job.setOutputValueClass(NullWritable.class);
 job.setMapperClass(SortMapper.class);
 job.setReducerClass(SortReducer.class);
 return job.waitForCompletion(true) ? 0 : 1;
 }
 
 public static void main(String[] args) throws Exception {
 int exitCode = ToolRunner.run(new AvroSort(), args);
 System.exit(exitCode);
 }
}
cs

์ •๋ ฌ์€ ๋งต๋ฆฌ๋“€์Šค ์…”ํ”Œ ๊ณผ์ •์—์„œ ์ผ์–ด๋‚˜๋ฉฐ ์ •๋ ฌ๊ธฐ๋Šฅ์€ ์—์ด๋ธŒ๋กœ์˜ ์Šคํ‚ค๋งˆ์— ์˜ํ•ด ์ •ํ•ด์ง

์ž…๋ ฅ๋ฐ์ดํ„ฐ ์ ๊ฒ€

1
2
3
4
5
% java -jar $AVRO_HOME/avro-tools-*.jar tojson input/avro/pairs.avro
{"left":"a","right":"1"}
{"left":"c","right":"2"}
{"left":"b","right":"3"}
{"left":"b","right":"2"}
cs

ํ”„๋กœ๊ทธ๋žจ์‚ฌ์šฉํ•˜์—ฌ ์ •๋ ฌ

1
2
% hadoop jar avro-examples.jar AvroSort input/avro/pairs.avro output \
 ch12-avro/src/main/resources/SortedStringPair.avsc
cs

์ •๋ ฌ ํ›„ ์ €์žฅ๋œ ํŒŒ์ผ ์ถœ๋ ฅ

1
2
3
4
5
% java -jar $AVRO_HOME/avro-tools-*.jar tojson output/part-r-00000.avro
{"left":"b","right":"3"}
{"left":"b","right":"2"}
{"left":"c","right":"2"}
{"left":"a","right":"1"}
cs
728x90
๋ฐ˜์‘ํ˜•

+ Recent posts