Before data enters the reduce phase, it is sorted by key. The reduce function receives a key together with the values associated with it, aggregates those values, and writes the result to disk (or to HDFS with Hadoop) in a format specified by the user. In the output here, the “key” is a word and the “value” is the number of times that word appeared across all tweets in the file.
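To make the data flow concrete, here is a minimal sketch of the map, sort, and reduce phases in plain R, with no Hadoop involved. The sample tweets and the helper names map_fn and reduce_fn are made up for illustration and are not part of any package.

```r
# Hypothetical sample input: one tweet per element.
tweets <- c("the cat sat", "the dog sat down")

# Map: emit one (word, 1) pair per word in a tweet.
map_fn <- function(tweet) {
  words <- unlist(strsplit(tweet, "[[:punct:][:space:]]+"))
  words <- words[words != ""]
  data.frame(Word = words, Count = rep(1, length(words)),
             stringsAsFactors = FALSE)
}
pairs <- do.call(rbind, lapply(tweets, map_fn))

# Sort: order the pairs by key so that equal keys are adjacent.
pairs <- pairs[order(pairs$Word), ]

# Reduce: sum the counts that share a key.
reduce_fn <- function(counts) sum(counts)
result <- tapply(pairs$Count, pairs$Word, reduce_fn)

print(result)
```

In a real job the sort happens between the two phases on the cluster; here order() simply stands in for it.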
HadoopStreaming: Anatomy of a HadoopStreaming Job
First, we create a file that will contain the map and reduce functions.
#!/usr/bin/env Rscript
# allows script to be EXECUTABLE.

library(HadoopStreaming)  # need the package.

# user can create own command line arguments.
# By default, certain arguments are already parsed for you.
opts <- c()
# Gets arguments from the environment
op <- hsCmdLineArgs(opts, openConnections = TRUE)
When running an R script, we can pass command line arguments. For example:
./mapReduce.R -m
Name     Short Character  Argument  Type       Description
mapper   m                None      logical    Runs the mapper.
reducer  r                None      logical    Runs the reducer.
infile   i                Required  character  Input file, otherwise STDIN.
outfile  o                Required  character  Output file, otherwise STDOUT.
op is populated with the parsed command line arguments. If -m is passed to the script, op$mapper is TRUE and the mapper is run.
if (op$mapper) {
  mapper <- function(x) {
    # tokenize each tweet
    words <- unlist(strsplit(x, "[[:punct:][:space:]]+"))
    words <- words[!(words == '')]
    # Create a data frame with 1 column: the words.
    df <- data.frame(Word = words)
    # Add a column called count, initialized to 1.
    df[, 'Count'] = 1
    # Send this out to the console for the reducer.
    hsWriteTable(df[, c('Word', 'Count')], file = op$outcon, sep = ',')
  }
  # Read a line from IN.
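The tokenizing step can be tried on its own in an R session. The sketch below uses a made-up tweet to show why the mapper filters out empty strings: when the text begins with a delimiter, strsplit returns a leading empty token.

```r
# Hypothetical tweet text, used only to illustrate the tokenizer.
tweet <- "@user: Ah, again..."

# Split on runs of punctuation and/or whitespace.
raw <- unlist(strsplit(tweet, "[[:punct:][:space:]]+"))

# The leading "@" produces an empty first token, so drop empty strings.
words <- raw[raw != ""]

print(raw)
print(words)
```

Note that the tokens keep their case, which is why "ah" and "Ah" are counted as separate words.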
Likewise, if -r is passed to the script, op$reducer is TRUE and the reducer is run.
} else if (op$reducer) {
  # Define the reducer function.
  # It just prints the word and the sum of the counts,
  # separated by a comma.
  reducer <- function(d) {
    # sep = '' avoids a stray comma before the newline.
    cat(d[1, 'Word'], ',', sum(d$Count), '\n', sep = '')
  }
  # Define the column names and types of the reducer's input.
  cols = list(Word = '', Count = 0)
  hsTableReader(op$incon, cols, chunkSize = op$chunkSize, skip = 0,
                sep = ',', keyCol = 'Word', singleKey = TRUE, ignoreKey = FALSE,
                FUN = reducer)
}
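The reducer logic can also be checked without Hadoop. The following is a rough sketch in base R, assuming the reducer is called once per key with all rows for that key, as the singleKey setting suggests. The sample lines are invented, and split() stands in for the grouping that the table reader performs.

```r
# Hypothetical sorted mapper output, as it would arrive on STDIN.
mapper_output <- c("a,1", "a,1", "about,1", "again,1", "again,1")

# Parse the lines into a data frame with the same two columns.
d <- read.table(text = mapper_output, sep = ",",
                col.names = c("Word", "Count"),
                colClasses = c("character", "numeric"))

# Same idea as the reducer in the slides: one output line per key.
reducer <- function(chunk) {
  paste(chunk[1, "Word"], sum(chunk$Count), sep = ",")
}

# Call the reducer once per key; split() groups the rows by Word.
reduced <- vapply(split(d, d$Word), reducer, character(1))
cat(reduced, sep = "\n")
```

Because the input is sorted, all rows for a key are adjacent, which is what lets a streaming reader hand them to the reducer as a single chunk.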
Finally, we clean up after ourselves and close the input and output connections.
if (!is.na(op$infile)) {
  close(op$incon)
}

if (!is.na(op$outfile)) {
  close(op$outcon)
}
HadoopStreaming: Running a HadoopStreaming Job
HadoopStreaming allows the user to pass data to map/reduce from the command line using a pipe, or using a file. To input data using a pipe:
cat twitter.tsv | ./count.R -m | sort | ./count.R -r
To input data using a file, pass its path with the -i option (and, if desired, an output path with -o).
Or we can run the script using, you know, Hadoop Streaming!
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar \
    -input /home/ryan/hdfs/in \
    -output ~/hdfs/out \
    -mapper "count.R -m" \
    -reducer "count.R -r" \
    -file ./count.R
Hadoop produces a lot of status and progress output and provides a web interface that you can explore when using it.
Output looks as follows.
a,13
about,2
action,1
acutely,1
adapting,1
affairs,1
after,1
again,2
ah,2
Ah,1
HadoopStreaming vs. Rhipe
Rhipe is another R interface for Hadoop, and it has a more “native” feel. Rhipe:
1 incorporates an rhlapply function similar to the standard apply variants.
2 uses Google Protocol Buffers.
3 seems to have great flexibility in modifying Hadoop parameters.
4 has more use cases and flexibility in how to run jobs and how to transmit and receive data.
5 but... is more complicated to use and requires a more