What is Apache Avro?
https://dennyglee.com/2013/03/12/using-avro-with-hdinsight-on-azure-at-343-industries/
- A language-neutral data serialization system, not tied to any particular language
- Created to address the main drawback of Hadoop Writables: lack of language portability
Avro has characteristics that differentiate it from Apache Thrift and Google Protocol Buffers
Like those systems, data is described with a language-independent schema
In Avro, code generation is optional
Avro assumes the schema is always present at both read and write time - this makes the encoding very compact
Schemas are written in JSON
Data is encoded in a binary format
Avro specification - details the binary format that all implementations must support
API - deliberately left out of the Avro specification; each language provides its own. Bindings stay convenient for each language, while the shared binary format keeps interoperability intact
Schema resolution - under carefully defined constraints, the schema used to read data does not have to be the same as the schema used to write it (the schema resolution mechanism)
ex) A new field can be added to the schema used to read old data. New and old clients alike can read the old data without problems; new clients see the new field on newly written data, while old clients simply ignore it and treat new data just like old data.
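When a field is added, giving it a default value is what lets old data still be read with the new schema. A minimal sketch, assuming a hypothetical record that gains a description field:

{
  "type": "record",
  "name": "StringPair",
  "doc": "A pair of strings with an added field.",
  "fields": [
    {"name": "left", "type": "string"},
    {"name": "right", "type": "string"},
    {"name": "description", "type": "string", "default": ""}
  ]
}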
Provides an object container file format (similar to Hadoop sequence files)
An Avro data file contains a metadata section in which the schema is stored, making the file self-describing
Avro data files support compression and are splittable
Avro data types and schemas
Avro primitive types
{"type":"null"}
{"type":"boolean"}
{"type":"int"}
{"type":"long"}
{"type":"float"}
{"type":"double"}
{"type":"bytes"}
{"type":"string"}
Avro complex types
array - an ordered collection of objects; every object must have the same type
{
  "type": "array",
  "items": "long"
}
map - an unordered collection of key-value pairs; keys are strings and all values must have the same type
{
  "type": "map",
  "values": "string"
}
record - a collection of named fields of any type
{
  "type": "record",
  "name": "WeatherRecord",
  "doc": "A weather reading.",
  "fields": [
    {"name": "year", "type": "int"},
    {"name": "temperature", "type": "int"},
    {"name": "stationId", "type": "string"}
  ]
}
enum - a set of named values
{
  "type": "enum",
  "name": "Cutlery",
  "doc": "An eating utensil.",
  "symbols": ["KNIFE", "FORK", "SPOON"]
}
fixed - a fixed number of 8-bit unsigned bytes
{
  "type": "fixed",
  "name": "Md5Hash",
  "size": 16
}
union - a union of schemas, represented as a JSON array; each element of the array is a schema
[
  "null",
  "string",
  {"type": "map", "values": "string"}
]
Mapping Avro types to the types of other programming languages
Java - generic mapping : a dynamic mapping that works even when the schema is not known ahead of time
Java, C++ - specific mapping : code is generated to represent the data of a schema
Java - reflect mapping : maps Avro types onto preexisting Java types (see the sketch below)
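A minimal sketch of the reflect mapping, assuming a hypothetical plain Pair class (no generated code involved); ReflectData infers the Avro schema from the Java fields:

import org.apache.avro.Schema;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;

public class Pair {        // hypothetical POJO
    String left;
    String right;
}

// Infer an Avro schema from the class and build a matching writer
Schema schema = ReflectData.get().getSchema(Pair.class);
DatumWriter<Pair> writer = new ReflectDatumWriter<Pair>(Pair.class);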
In-memory serialization and deserialization
An example Avro schema - saved in the file StringPair.avsc:
{
  "type": "record",
  "name": "StringPair",
  "doc": "A pair of strings.",
  "fields": [
    {"name": "left", "type": "string"},
    {"name": "right", "type": "string"}
  ]
}
Put the file on the classpath, then load it:
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(
    getClass().getResourceAsStream("StringPair.avsc"));
Create an instance of an Avro record with the generic API:
GenericRecord datum = new GenericData.Record(schema);
datum.put("left", "L");
datum.put("right", "R");
Serialize the record to an output stream:
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer =
    new GenericDatumWriter<GenericRecord>(schema);
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(datum, encoder);
encoder.flush();
out.close();
DatumWriter : translates data objects into the types an Encoder can understand
GenericDatumWriter : passes the fields of the GenericRecord to the Encoder
null is passed to the encoder factory because no previously constructed encoder is being reused
write() can be called again to add more records before the stream is closed
After the write() call, encoder.flush() flushes the encoder, and then the output stream is closed
Reversing the process reads the object back from the byte buffer:
DatumReader<GenericRecord> reader =
    new GenericDatumReader<GenericRecord>(schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
GenericRecord result = reader.read(null, decoder);
assertThat(result.get("left").toString(), is("L"));
assertThat(result.get("right").toString(), is("R"));
Specific API
Adding the Avro Maven plugin generates Java code from a schema (a usage sketch follows the pom below):
<project>
  ...
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>${avro.version}</version>
        <executions>
          <execution>
            <id>schemas</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>schema</goal>
            </goals>
            <configuration>
              <includes>
                <include>StringPair.avsc</include>
              </includes>
              <stringType>String</stringType>
              <sourceDirectory>src/main/resources</sourceDirectory>
              <outputDirectory>${project.build.directory}/generated-sources/java</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  ...
</project>
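With the class generated from StringPair.avsc on the classpath, the specific API replaces GenericRecord with type-safe setters; a minimal sketch (the setter names assume the generated StringPair class):

StringPair datum = new StringPair();
datum.setLeft("L");
datum.setRight("R");

ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<StringPair> writer =
    new SpecificDatumWriter<StringPair>(StringPair.class); // specific, not generic
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(datum, encoder);
encoder.flush();
out.close();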
Avro data files
Reading and writing files, analogous to the in-memory streams above
Data file = a header containing metadata (the Avro schema and a sync marker) followed by a series of blocks containing the serialized Avro objects
Objects written to a data file must conform to the file's schema
If they do not match, an exception can be raised on append()
File file = new File("data.avro");
DatumWriter<GenericRecord> writer =
    new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter =
    new DataFileWriter<GenericRecord>(writer);
dataFileWriter.create(schema, file);
dataFileWriter.append(datum);
dataFileWriter.close();
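Data file compression is enabled on the writer before create() is called; a minimal sketch, reusing schema and datum from above and assuming the deflate codec:

DatumWriter<GenericRecord> writer =
    new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter =
    new DataFileWriter<GenericRecord>(writer);
dataFileWriter.setCodec(CodecFactory.deflateCodec(6)); // must precede create()
dataFileWriter.create(schema, new File("data-compressed.avro"));
dataFileWriter.append(datum);
dataFileWriter.close();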
No schema needs to be specified when reading the file back, because the reader consults the metadata embedded in the file
getSchema() returns the schema of the DataFileReader instance, which can be checked against the schema the original object was written with:
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
DataFileReader<GenericRecord> dataFileReader =
    new DataFileReader<GenericRecord>(file, reader);
assertThat("Schema is the same", schema, is(dataFileReader.getSchema()));
DataFileReader is a regular Java iterator, so all the data objects can be iterated over by repeatedly calling its hasNext() and next() methods
Below we check that there is exactly one record and that it has the expected field values:
assertThat(dataFileReader.hasNext(), is(true));
GenericRecord result = dataFileReader.next();
assertThat(result.get("left").toString(), is("L"));
assertThat(result.get("right").toString(), is("R"));
assertThat(dataFileReader.hasNext(), is(false));
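For files with many records, the idiomatic pattern reuses a record object through next() to cut allocations; a minimal sketch, continuing with the dataFileReader above:

GenericRecord record = null;
while (dataFileReader.hasNext()) {
    record = dataFileReader.next(record); // reuses the record instance where possible
    System.out.println(record);
}
dataFileReader.close();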
Interoperability
Python API:
import os
import string
import sys
from avro import schema     # schema parsing
from avro import io         # datum reader/writer and encoders
from avro import datafile   # the object container file format
Avro tools - dump an Avro data file as JSON:
% java -jar $AVRO_HOME/avro-tools-*.jar tojson pairs.avro
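avro-tools has other subcommands as well; for example, getschema prints the schema embedded in a data file (same invocation pattern as above):

% java -jar $AVRO_HOME/avro-tools-*.jar getschema pairs.avro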
Schema resolution
Data can be read back with a reader's schema that differs from the writer's schema it was recorded with (see the sketch below)
Added field - when the reader's schema is the newer one: the reader uses the default value of the new field
Added field - when the writer's schema is the newer one: the reader does not know about the new field, so it is ignored
Removed field - when the reader's schema is the newer one: the removed field is simply ignored
Removed field - when the writer's schema is the newer one: the removed field is no longer written; the reader's schema should be brought in line with the writer's, at the same time as or before the writer is updated
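In code, resolution happens when both schemas are handed to the DatumReader; a minimal sketch, assuming a hypothetical NewStringPair.avsc that adds a defaulted field to StringPair.avsc, and reusing the out buffer from the in-memory example:

Schema writerSchema = new Schema.Parser().parse(
    getClass().getResourceAsStream("StringPair.avsc"));
Schema readerSchema = new Schema.Parser().parse(
    getClass().getResourceAsStream("NewStringPair.avsc"));

// The reader resolves data written with writerSchema against readerSchema
DatumReader<GenericRecord> reader =
    new GenericDatumReader<GenericRecord>(writerSchema, readerSchema);
Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
GenericRecord result = reader.read(null, decoder);
// A field added in NewStringPair.avsc comes back filled with its default value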
Sort order
Every type except record has a preordained sort order
For record, the order attribute can be specified on a field to control the sort order (see the SortedStringPair.avsc sketch below)
ascending (the default)
descending : "descending"
ignored in comparisons : "ignore"
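For example, the SortedStringPair.avsc passed to the sort program later in these notes could look like this (a sketch: sort by the right field descending, ignoring left in comparisons):

{
  "type": "record",
  "name": "StringPair",
  "doc": "A pair of strings, sorted by right field descending.",
  "fields": [
    {"name": "left", "type": "string", "order": "ignore"},
    {"name": "right", "type": "string", "order": "descending"}
  ]
}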
Avro MapReduce
The weather record:
{
  "type": "record",
  "name": "WeatherRecord",
  "doc": "A weather reading.",
  "fields": [
    {"name": "year", "type": "int"},
    {"name": "temperature", "type": "int"},
    {"name": "stationId", "type": "string"}
  ]
}
A MapReduce program to find the maximum temperature, producing Avro output:
public class AvroGenericMaxTemperature extends Configured implements Tool {

    private static final Schema SCHEMA = new Schema.Parser().parse(
        "{" +
        "  \"type\": \"record\"," +
        "  \"name\": \"WeatherRecord\"," +
        "  \"doc\": \"A weather reading.\"," +
        "  \"fields\": [" +
        "    {\"name\": \"year\", \"type\": \"int\"}," +
        "    {\"name\": \"temperature\", \"type\": \"int\"}," +
        "    {\"name\": \"stationId\", \"type\": \"string\"}" +
        "  ]" +
        "}"
    );

    public static class MaxTemperatureMapper
            extends Mapper<LongWritable, Text, AvroKey<Integer>,
                AvroValue<GenericRecord>> {
        private NcdcRecordParser parser = new NcdcRecordParser();
        private GenericRecord record = new GenericData.Record(SCHEMA);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            parser.parse(value.toString());
            if (parser.isValidTemperature()) {
                record.put("year", parser.getYearInt());
                record.put("temperature", parser.getAirTemperature());
                record.put("stationId", parser.getStationId());
                context.write(new AvroKey<Integer>(parser.getYearInt()),
                    new AvroValue<GenericRecord>(record));
            }
        }
    }

    public static class MaxTemperatureReducer
            extends Reducer<AvroKey<Integer>, AvroValue<GenericRecord>,
                AvroKey<GenericRecord>, NullWritable> {

        @Override
        protected void reduce(AvroKey<Integer> key,
                Iterable<AvroValue<GenericRecord>> values, Context context)
                throws IOException, InterruptedException {
            GenericRecord max = null;
            for (AvroValue<GenericRecord> value : values) {
                GenericRecord record = value.datum();
                if (max == null || (Integer) record.get("temperature") >
                        (Integer) max.get("temperature")) {
                    max = newWeatherRecord(record);
                }
            }
            context.write(new AvroKey(max), NullWritable.get());
        }

        private GenericRecord newWeatherRecord(GenericRecord value) {
            GenericRecord record = new GenericData.Record(SCHEMA);
            record.put("year", value.get("year"));
            record.put("temperature", value.get("temperature"));
            record.put("stationId", value.get("stationId"));
            return record;
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
                getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        Job job = new Job(getConf(), "Max temperature");
        job.setJarByClass(getClass());
        job.getConfiguration().setBoolean(
            Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.INT));
        AvroJob.setMapOutputValueSchema(job, SCHEMA);
        AvroJob.setOutputKeySchema(job, SCHEMA);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvroGenericMaxTemperature(), args);
        System.exit(exitCode);
    }
}
Running the program:
% export HADOOP_CLASSPATH=avro-examples.jar
% export HADOOP_USER_CLASSPATH_FIRST=true # override version of Avro in Hadoop
% hadoop jar avro-examples.jar AvroGenericMaxTemperature \
    input/ncdc/sample.txt output
Printing the result:
% java -jar $AVRO_HOME/avro-tools-*.jar tojson output/part-r-00000.avro
{"year":1949,"temperature":111,"stationId":"012650-99999"}
{"year":1950,"temperature":22,"stationId":"011990-99999"}
Sorting with Avro MapReduce
A MapReduce program to sort an Avro data file:
public class AvroSort extends Configured implements Tool {

    static class SortMapper<K> extends Mapper<AvroKey<K>, NullWritable,
            AvroKey<K>, AvroValue<K>> {
        @Override
        protected void map(AvroKey<K> key, NullWritable value,
                Context context) throws IOException, InterruptedException {
            context.write(key, new AvroValue<K>(key.datum()));
        }
    }

    static class SortReducer<K> extends Reducer<AvroKey<K>, AvroValue<K>,
            AvroKey<K>, NullWritable> {
        @Override
        protected void reduce(AvroKey<K> key, Iterable<AvroValue<K>> values,
                Context context) throws IOException, InterruptedException {
            for (AvroValue<K> value : values) {
                context.write(new AvroKey(value.datum()), NullWritable.get());
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.printf(
                "Usage: %s [generic options] <input> <output> <schema-file>\n",
                getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        String input = args[0];
        String output = args[1];
        String schemaFile = args[2];

        Job job = new Job(getConf(), "Avro sort");
        job.setJarByClass(getClass());
        job.getConfiguration().setBoolean(
            Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        AvroJob.setDataModelClass(job, GenericData.class);
        Schema schema = new Schema.Parser().parse(new File(schemaFile));
        AvroJob.setInputKeySchema(job, schema);
        AvroJob.setMapOutputKeySchema(job, schema);
        AvroJob.setMapOutputValueSchema(job, schema);
        AvroJob.setOutputKeySchema(job, schema);

        job.setInputFormatClass(AvroKeyInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        job.setOutputKeyClass(AvroKey.class);
        job.setOutputValueClass(NullWritable.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvroSort(), args);
        System.exit(exitCode);
    }
}
Sorting takes place during the MapReduce shuffle, and the sort order is determined by the Avro schema
Check the input data:
% java -jar $AVRO_HOME/avro-tools-*.jar tojson input/avro/pairs.avro
{"left":"a","right":"1"}
{"left":"c","right":"2"}
{"left":"b","right":"3"}
{"left":"b","right":"2"}
Sort it with the program:
% hadoop jar avro-examples.jar AvroSort input/avro/pairs.avro output \
    ch12-avro/src/main/resources/SortedStringPair.avsc
Print the file saved after the sort:
% java -jar $AVRO_HOME/avro-tools-*.jar tojson output/part-r-00000.avro
{"left":"b","right":"3"}
{"left":"b","right":"2"}
{"left":"c","right":"2"}
{"left":"a","right":"1"}