• No se han encontrado resultados

Adaptación del contenido de las pruebas para personas con discapacidad

Projection and predicate pushdowns involve an execution engine pushing the projec- tion and predicates down to the storage format to optimize the operations at the lowest level possible. This yields space and time advantages, as columns that aren’t required for the query don’t need to be fetched and supplied to the execution engine.

This is especially useful for columnar stores, as pushdowns allow the storage for- mat to skip over entire column groups that aren’t required for the query, and colum- nar formats can perform this operation very efficiently.

In this technique you’ll look at the steps required to use these pushdowns in your Hadoop pipelines.

■ Problem

You want to use pushdowns in Hadoop to optimize your jobs.

■ Solution

Using Hive and Pig in conjunction with Parquet provides out-of-the-box projection pushdowns. With MapReduce there are some manual steps that you need to take in the driver code to enable pushdowns.

■ Discussion

Once again our focus with this technique is Avro. The AvroParquetInputFormat has two methods that you can use for predicate and projection pushdowns. In the following example, only two fields of the Stock object are projected, and a predicate is added so that only Google stocks are selected:53

public static class GoogleStockFilter implements UnboundRecordFilter {

private final UnboundRecordFilter filter; public GoogleStockFilter() {

filter = ColumnRecordFilter.column("symbol", ColumnPredicates.equalTo("GOOG")); }

@Override

public RecordFilter bind(Iterable<ColumnReader> readers) { return filter.bind(readers);

} }

public void run(Path inputPath, Path outputPath) { Configuration conf = super.getConf();

Job job = new Job(conf);

job.setJarByClass(AvroProjectionParquetMapReduce.class);

53GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/parquet/ AvroProjectionParquetMapReduce.java.

Create a class that implements the predicate.

Define the predicate as a filter on Google stocks.

127

TECHNIQUE 24 Pushdown predicates and projection with Parquet

job.setInputFormatClass(AvroParquetInputFormat.class); AvroParquetInputFormat.setInputPaths(job, inputPath); // predicate pushdown AvroParquetInputFormat.setUnboundRecordFilter( job, GoogleStockFilter.class); // projection pushdown

Schema projection = Schema.createRecord(Stock.SCHEMA$.getName(), Stock.SCHEMA$.getDoc(), Stock.SCHEMA$.getNamespace(), false); List<Schema.Field> fields = Lists.newArrayList();

for (Schema.Field field : Stock.SCHEMA$.getFields()) { if ("symbol".equals(field.name()) ||

"open".equals(field.name())) {

fields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order())); } } projection.setFields(fields); AvroParquetInputFormat.setRequestedProjection(job, projection); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(DoubleWritable.class); job.setOutputFormatClass(AvroParquetOutputFormat.class); FileOutputFormat.setOutputPath(job, outputPath); AvroParquetOutputFormat.setSchema(job, StockAvg.SCHEMA$); return job.waitForCompletion(true) ? 0 : 1; }

public static class Map extends

Mapper<Void, Stock, Text, DoubleWritable> { @Override

public void map(Void key, Stock value, Context context) { if (value != null) { context.write(new Text(value.getSymbol().toString()), new DoubleWritable(value.getOpen())); } } }

public static class Reduce extends Reducer<Text, DoubleWritable, Void, StockAvg> { @Override

protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) {

Mean mean = new Mean();

for (DoubleWritable val : values) {

Set the predicate pushdown for the job.

Define a new schema for the projection, based on the original Stock schema.

Project just the stock symbol and opening price.

Set the projection for the job.

The original Stock object is still supplied to the mapper.

Check for null in the case that a record is filtered out due to the predicate.

Only use the stock and open fields—all other fields are null due to the projection.

mean.increment(val.get()); }

StockAvg avg = new StockAvg(); avg.setSymbol(key.toString()); avg.setAvg(mean.getResult()); context.write(null, avg); }

}

Predicate filter null values When the predicate that you supply filters out a record, a null value is supplied to your mapper. That’s why you have to check for null before working with the mapper input.

If you run the job and examine the output, you’ll only find the average for the Google stock, demonstrating that the predicate worked:

$ hip hip.ch3.parquet.AvroProjectionParquetMapReduce \ --input stocks.parquet \

--output output

$ hip --nolib parquet.tools.Main cat output/part-r-00000.parquet symbol = GOOG

avg = 417.47799999999995

■ Summary

This technique doesn’t include any Hive or Pig pushdown details, as both tools auto- matically perform these pushdowns as part of their execution. Pushdowns are an important part of your job-optimization work, and if you’re using a third-party library or tool that doesn’t expose pushdowns when working with Parquet, you can help the community by opening a feature request.

3.4.4 Parquet limitations

There are a number of points that you should be aware of when working with Parquet:

■ Parquet requires a lot of memory when writing files because it buffers writes in memory to optimize the encoding and compressing of the data. Either increase the heap size (2 GB is recommended), or decrease the parquet.block.size con- figurable if you encounter memory issues when writing Parquet files.

Using a heavily nested data structure with Parquet will likely limit some of the

optimizations that Parquet makes for pushdowns. If possible, try to flatten your schema.

Hive doesn’t yet support decimal and timestamp data types when working with Par- quet because Parquet doesn’t support them as native types. Work is being tracked in a JIRA ticket titled “Implement all Hive data types in Parquet” (https:// issues.apache.org/jira/browse/HIVE-6384).

■ Impala doesn’t support nested data in Parquet or complex data types such as maps, structs, or arrays. This should be fixed in the Impala 2.x release.

129

Documento similar