/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *  Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *
 *   *  Redistributions in binary form must reproduce the above copyright notice,
 *      this list of conditions and the following disclaimer in the documentation
 *      and/or other materials provided with the distribution.
 *
 *   *  Neither the name of the University of Southampton nor the names of its
 *      contributors may be used to endorse or promote products derived from this
 *      software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.text.nlp.TweetTokeniser;
import org.openimaj.text.nlp.TweetTokeniserException;
import org.openimaj.twitter.GeneralJSONTwitter;
import org.openimaj.twitter.USMFStatus;

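/**
 * A simple test {@link Tool} that reads LZO-compressed tweets, tokenises each
 * tweet in the map phase and writes the comma-joined tokens out in the reduce
 * phase. The LZO input format is loaded reflectively so the tool can fail
 * gracefully when the hadoop-lzo library is not installed.
 */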
public class HadoopLZOTest extends Configured implements Tool {
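	// Dummy counters used to check that values set in the map phase are visible in the reduce phase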
	enum CounterEnum {
		CHEESE, FLEES;
	}

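	/**
	 * Parses each input line as a {@link USMFStatus}, increments the test
	 * counters and emits the comma-joined tokens of the tweet text.
	 */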
	public static class CounterMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context)
				throws IOException, InterruptedException
		{
			final USMFStatus status = new USMFStatus(GeneralJSONTwitter.class);
			status.fillFromString(value.toString());

			context.getCounter(CounterEnum.CHEESE).increment(10);
			context.getCounter(CounterEnum.FLEES).increment(20);
			if (status.isInvalid())
				return;
			try {
				final TweetTokeniser tok = new TweetTokeniser(status.text);
				context.write(key, new Text(StringUtils.join(tok.getTokens(), ",")));
			} catch (final TweetTokeniserException e) {
				// Skip tweets that cannot be tokenised
			}
		}
	}

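	/**
	 * Prints the values of the test counters and writes the tokenised tweets
	 * straight through to the output.
	 */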
	public static class CounterReducer extends Reducer<LongWritable, Text, NullWritable, Text> {
		@Override
		protected void reduce(LongWritable key, Iterable<Text> values,
				Reducer<LongWritable, Text, NullWritable, Text>.Context context)
				throws IOException, InterruptedException
		{
			final Counter cheeseCounter = context.getCounter(CounterEnum.CHEESE);
			final Counter fleesCounter = context.getCounter(CounterEnum.FLEES);
			System.out.println(cheeseCounter.getName() + ": " + cheeseCounter.getValue());
			System.out.println(fleesCounter.getName() + ": " + fleesCounter.getValue());
			for (final Text text : values) {
				context.write(NullWritable.get(), text);
			}
		}
	}

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Override
	public int run(String[] args) throws Exception {
		if (args.length != 2) {
			System.err.println("Usage: HadoopLZOTest <input> <output>");
			return -1;
		}

		Class<? extends InputFormat> lzoClass = null;
		try {
			lzoClass = (Class<? extends InputFormat>) Class.forName("com.hadoop.mapreduce.LzoTextInputFormat");
		} catch (final ClassNotFoundException nfe) {
			System.err.println("LZO not installed; skipping");
			return -1;
		}

		final Path[] paths = new Path[] { new Path(args[0]) };
		final Path out = new Path(args[1]);
		HadoopToolsUtil.validateOutput(args[1], true);
		final Job job = new Job(this.getConf());

		job.setInputFormatClass(lzoClass);
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(Text.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.setJarByClass(this.getClass());

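		// setInputPaths is a static method inherited from FileInputFormat; it is
		// invoked reflectively to avoid a compile-time dependency on the LZO classes.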
		lzoClass.getMethod("setInputPaths", Job.class, Path[].class).invoke(null, job, paths);
		TextOutputFormat.setOutputPath(job, out);
		job.setMapperClass(CounterMapper.class);
		job.setReducerClass(CounterReducer.class);

		final long start = System.currentTimeMillis();
		job.waitForCompletion(true);
		final long end = System.currentTimeMillis();
		System.out.println("Took: " + (end - start) + "ms");
		return 0;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new HadoopLZOTest(), args));
	}
}