001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.twitter.token.mode.dfidf;
031
032import gnu.trove.procedure.TObjectIntProcedure;
033
034import java.io.ByteArrayInputStream;
035import java.io.ByteArrayOutputStream;
036import java.io.DataOutput;
037import java.io.IOException;
038import java.util.ArrayList;
039import java.util.HashMap;
040import java.util.List;
041import java.util.TreeSet;
042
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.io.BytesWritable;
046import org.apache.hadoop.io.LongWritable;
047import org.apache.hadoop.io.Text;
048import org.apache.hadoop.mapreduce.Job;
049import org.apache.hadoop.mapreduce.Mapper;
050import org.apache.hadoop.mapreduce.Reducer;
051import org.kohsuke.args4j.CmdLineException;
052import org.openimaj.hadoop.mapreduce.stage.StageProvider;
053import org.openimaj.hadoop.mapreduce.stage.helper.SimpleSequenceFileStage;
054import org.openimaj.hadoop.tools.HadoopToolsUtil;
055import org.openimaj.hadoop.tools.twitter.HadoopTwitterTokenToolOptions;
056import org.openimaj.hadoop.tools.twitter.token.mode.TextEntryType;
057import org.openimaj.hadoop.tools.twitter.token.mode.WritableEnumCounter;
058import org.openimaj.hadoop.tools.twitter.token.mode.dfidf.TimeFrequencyHolder.TimeFrequency;
059import org.openimaj.hadoop.tools.twitter.utils.TimeperiodTweetCountWordCount;
060import org.openimaj.hadoop.tools.twitter.utils.TweetCountWordMap;
061import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF;
062import org.openimaj.io.IOUtils;
063import org.openimaj.io.wrappers.WriteableListBinary;
064
065/**
066 * A Stage provider wrapping the functionality of
067 * {@link CountWordsAcrossTimeperiod.Map} and
068 * {@link CountWordsAcrossTimeperiod.Reduce}.
069 * 
070 * The Map expects times as keys and maps of word counts as input. The mapper
071 * emits words as keys with pairs of the timeperiods and counts within that time
072 * period
073 * 
074 * This along with the TimeIndex generated by the
075 * {@link CountTweetsInTimeperiod} is used to construct a DFIDF per word per
076 * time period.
077 * 
078 * One interpretation of the DFIDF score is a weighting based on counts up to a
079 * particular time period. This functionality is encoded in the
080 * {@link NonCombinedTimesReducer}
081 * 
 * Another interpretation is that the DFIDF score weights words based on word
 * occurrences at the end of some period of time, i.e. the count at some END
 * 
085 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
086 * 
087 */
public class CountWordsAcrossTimeperiod extends StageProvider {
	// tool arguments (not hadoop's own) serialised into the job configuration under ARGS_KEY
	private String[] nonHadoopArgs;
	// whether per-timeperiod counts were combined upstream; selects Reduce vs NonCombinedTimesReducer
	private boolean combinedTimes = false;
091
092        /**
093         * 
094         * @param outpath
095         *            the root of the output
096         * @param nonHadoopArgs
097         */
098        public CountWordsAcrossTimeperiod(String[] nonHadoopArgs) {
099                this.nonHadoopArgs = nonHadoopArgs;
100        }
101
102        /**
103         * @param nonHadoopArgs
104         * @param combinedTimes
105         *            whether the mapper expects times entries with values for each
106         *            word. i.e. combined times
107         */
108        public CountWordsAcrossTimeperiod(String[] nonHadoopArgs, boolean combinedTimes) {
109                this.nonHadoopArgs = nonHadoopArgs;
110                this.combinedTimes = combinedTimes;
111        }
112
113        /**
114         * arg key
115         */
116        public static final String ARGS_KEY = "TOKEN_ARGS";
117        private static final LongWritable END_TIME = new LongWritable(-1);
118        /**
119         * where the intermediat word counts should be stored
120         */
121        public final static String WORDCOUNT_DIR = "wordtimeperiodDFIDF";
122
123        /**
124         * function(timePeriodLength) map input: <timePeriod:
125         * <<tweet:#freq>,<word:#freq>,<word:#freq>,...> map output: [ word:
126         * <timeperiod, tweet:#freq, word:#freq>, word: <timeperiod, tweet:#freq,
127         * word:#freq>, ... ]
128         * 
129         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
130         * 
131         */
132        public static class Map extends Mapper<LongWritable, BytesWritable, Text, BytesWritable> {
133
134                /**
135                 * Mapper constructor doesn't do anything (Mapper constructor doesn't
136                 * give a fuck)
137                 */
138                public Map() {
139
140                }
141
142                private static HadoopTwitterTokenToolOptions options;
143
144                protected static synchronized void loadOptions(
145                                Mapper<LongWritable, BytesWritable, Text, BytesWritable>.Context context) throws IOException
146                {
147                        if (options == null) {
148                                try {
149                                        options = new HadoopTwitterTokenToolOptions(context.getConfiguration().getStrings(ARGS_KEY));
150                                        options.prepare();
151                                } catch (final CmdLineException e) {
152                                        throw new IOException(e);
153                                } catch (final Exception e) {
154                                        throw new IOException(e);
155                                }
156                        }
157                }
158
159                @Override
160                protected void setup(Mapper<LongWritable, BytesWritable, Text, BytesWritable>.Context context)
161                                throws IOException, InterruptedException
162                {
163                        loadOptions(context);
164                }
165
166                @Override
167                protected void map(final LongWritable key, BytesWritable value,
168                                final Mapper<LongWritable, BytesWritable, Text, BytesWritable>.Context context)
169                                throws java.io.IOException, InterruptedException
170                {
171
172                        final TweetCountWordMap periodCountWordCount = IOUtils.read(new ByteArrayInputStream(value.getBytes()),
173                                        TweetCountWordMap.class);
174                        final boolean written = periodCountWordCount.getTweetWordMap().forEachEntry(new TObjectIntProcedure<String>()
175                        {
176
177                                @Override
178                                public boolean execute(String word, int wordCount) {
179                                        final TimeperiodTweetCountWordCount timeMap = new TimeperiodTweetCountWordCount(key.get(), wordCount,
180                                                        periodCountWordCount.getNTweets());
181                                        final ByteArrayOutputStream os = new ByteArrayOutputStream();
182                                        try {
183                                                IOUtils.writeBinary(os, timeMap);
184                                                final BytesWritable writeable = new BytesWritable(os.toByteArray());
185                                                context.write(new Text(word), writeable);
186                                        } catch (final IOException e) {
187                                                return false;
188                                        } catch (final InterruptedException e) {
189                                                return false;
190                                        }
191
192                                        return true;
193                                }
194                        });
195                        if (!written) {
196                                throw new IOException("Couldn't write the TimeperiodTweetCountWordCount object");
197                        }
198                }
199        }
200
201        /**
202         * reduce input: <word: [ <timeperiod, tweet:#freq, word:#freq>,
203         * <timeperiod, tweet:#freq, word:#freq>,... ] reduce output: # <word:
204         * <timePeriod, DFIDF>,...>
205         * 
206         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
207         * 
208         */
209        public static class Reduce extends Reducer<Text, BytesWritable, Text, BytesWritable> {
210
211                /**
212                 * default construct does nothing
213                 */
214                public Reduce() {
215
216                }
217
218                @Override
219                protected void reduce(Text word, Iterable<BytesWritable> values,
220                                Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException,
221                                InterruptedException
222                {
223                        // read all timeperiods to objects, find the END_TIME instance, hold
224                        // the rest
225                        /*
226                         * # read total tweet frequency from timeperiod -1 Ttf # read total
227                         * word tweet frequency from timeperiod -1 Twf # read time period
228                         * tweet frequency from entry tf # read time period word frequency
229                         * from entry wf
230                         */
231                        TimeperiodTweetCountWordCount endTime = null;
232                        final List<TimeperiodTweetCountWordCount> otherTimes = new ArrayList<TimeperiodTweetCountWordCount>();
233                        for (final BytesWritable inputArr : values) {
234                                final ByteArrayInputStream stream = new ByteArrayInputStream(inputArr.getBytes());
235                                final TimeperiodTweetCountWordCount instance = IOUtils.read(stream, TimeperiodTweetCountWordCount.class);
236                                if (instance.timeperiod == END_TIME.get())
237                                        endTime = instance;
238                                else
239                                        otherTimes.add(instance);
240                        }
241                        /*
242                         * # for entry in input: # DF = wf/tf # IDF = Ttf/Twf
243                         */
244                        // Total number of tweets in all timeperiods
245                        final long Ttf = endTime.tweetcount;
246                        // Number of tweets containing this word in all timeperiods
247                        final long Twf = endTime.wordcount;
248                        final TreeSet<WordDFIDF> allDFIDF = new TreeSet<WordDFIDF>();
249                        for (final TimeperiodTweetCountWordCount tcwc : otherTimes) {
250                                // Number of tweets in this timeperiod
251                                final long tf = tcwc.tweetcount;
252                                // Number of tweets containing this word in this time period
253                                final long wf = tcwc.wordcount;
254
255                                final WordDFIDF dfidf = new WordDFIDF(tcwc.timeperiod, wf, tf, Twf, Ttf);
256                                allDFIDF.add(dfidf);
257                        }
258                        final List<WordDFIDF> listVersion = new ArrayList<WordDFIDF>();
259                        listVersion.addAll(allDFIDF);
260                        final WriteableListBinary<WordDFIDF> writeableCollection = new WriteableListBinary<WordDFIDF>(listVersion) {
261                                @Override
262                                protected void writeValue(WordDFIDF v, DataOutput out) throws IOException {
263                                        v.writeBinary(out);
264                                }
265
266                        };
267                        context.write(word, new BytesWritable(IOUtils.serialize(writeableCollection)));
268                }
269        }
270
271        /**
272         * reduce input: <word: [ <timeperiod, tweet:#freq, word:#freq>,
273         * <timeperiod, tweet:#freq, word:#freq>,... ] but unlike {@link Reduce}
274         * expects that each timeperiod may appear multiple times (i.e. each
275         * timeperiod was not combined!) reduce output: # <word: <timePeriod,
276         * DFIDF>,...>
277         * 
278         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
279         */
280        public static class NonCombinedTimesReducer extends
281                        Reducer<Text, BytesWritable, Text, BytesWritable>
282        {
283
284                private HadoopTwitterTokenToolOptions options;
285                private WritableEnumCounter<TextEntryType> tgs;
286                private TimeFrequencyHolder timeIndex;
287
288                /**
289                 * default construct does nothing
290                 */
291                public NonCombinedTimesReducer() {
292
293                }
294
295                protected synchronized void loadOptions(Reducer<Text, BytesWritable, Text, BytesWritable>.Context context)
296                                throws IOException
297                {
298                        try {
299                                options = new HadoopTwitterTokenToolOptions(context.getConfiguration().getStrings(ARGS_KEY));
300                                options.prepare();
301                                final Path outpath = HadoopToolsUtil.getOutputPath(options);
302
303                                timeIndex = CountTweetsInTimeperiod.readTimeIndex(CountTweetsInTimeperiod.constructIndexPath(outpath));
304                                final Path timecountOut = new Path(outpath, CountTweetsInTimeperiod.TIMECOUNT_DIR);
305                                final Path statsout = new Path(timecountOut, CountTweetsInTimeperiod.GLOBAL_STATS_FILE);
306                                final FileSystem fs = HadoopToolsUtil.getFileSystem(statsout);
307                                final WritableEnumCounter<TextEntryType> et = new WritableEnumCounter<TextEntryType>() {
308                                        @Override
309                                        public TextEntryType valueOf(String str) {
310                                                return TextEntryType.valueOf(str);
311                                        }
312
313                                };
314                                tgs = IOUtils.read(fs.open(statsout), et);
315                        } catch (final CmdLineException e) {
316                                e.printStackTrace();
317                                throw new IOException(e);
318                        } catch (final Exception e) {
319                                e.printStackTrace();
320                                throw new IOException(e);
321                        }
322                }
323
324                @Override
325                protected void setup(Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException,
326                                InterruptedException
327                {
328                        loadOptions(context);
329                }
330
331                @Override
332                protected void reduce(Text word, Iterable<BytesWritable> values,
333                                Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException,
334                                InterruptedException
335                {
336                        // read all timeperiods to objects, find the END_TIME instance, hold
337                        // the rest
338                        /*
339                         * read time period tweet frequency from entry tf read time period
340                         * word frequency from entry wf construct a single count for all
341                         * time periods of the above
342                         * 
343                         * Go through compiled timeperiod counts in order, keep a count of
344                         * total tweets up to a time period and total words up to a time
345                         * period use the current total to construct a WordDFIDF object per
346                         * time period
347                         */
348                        TimeperiodTweetCountWordCount endTime = null;
349                        final TreeSet<Long> times = new TreeSet<Long>();
350                        final HashMap<Long, TimeperiodTweetCountWordCount> otherTimes = new HashMap<Long, TimeperiodTweetCountWordCount>();
351                        System.out.println("STARTING WORD: " + word);
352                        for (final BytesWritable inputArr : values) {
353                                final ByteArrayInputStream stream = new ByteArrayInputStream(inputArr.getBytes());
354                                final TimeperiodTweetCountWordCount instance = IOUtils.read(stream, TimeperiodTweetCountWordCount.class);
355                                System.out.println("... FOUND TIME INSTANCE:" + instance.timeperiod);
356                                if (instance.timeperiod == END_TIME.get())
357                                {
358                                        if (endTime == null)
359                                        {
360                                                // System.out.println("... end time CREATED");
361                                                endTime = instance;
362                                                endTime.tweetcount = tgs.getValue(TextEntryType.VALID);
363                                        }
364                                        else
365                                        {
366                                                // System.out.println("... end time INCREMENTED");
367                                                endTime.wordcount += instance.wordcount;
368                                        }
369                                        // Skip, not important!
370
371                                }
372                                else
373                                {
374                                        times.add(instance.timeperiod);
375                                        final TimeperiodTweetCountWordCount currentTimeCounter = otherTimes.get(instance.timeperiod);
376                                        System.out.println("Instance tweet count: " + instance.tweetcount);
377                                        System.out.println("Instance word count: " + instance.wordcount);
378                                        if (currentTimeCounter == null) {
379                                                System.out.println("... time CREATED");
380                                                otherTimes.put(instance.timeperiod, instance);
381                                        }
382                                        else {
383                                                System.out.println("... incremented time CREATED");
384                                                currentTimeCounter.tweetcount += instance.tweetcount;
385                                                currentTimeCounter.wordcount += instance.wordcount;
386                                        }
387                                }
388                        }
389                        // System.out.println("... TOTAL tweets = " + endTime.tweetcount);
390                        // System.out.println("... TOTAL tweets with THIS word = " +
391                        // endTime.wordcount);
392                        /*
393                         * # for entry in input: # DF = wf/tf # IDF = Ttf/Twf
394                         */
395                        // Total number of tweets seen so far
396                        // Number of tweets containing this word seen so far
397                        long Twf = 0;
398                        final TreeSet<WordDFIDF> allDFIDF = new TreeSet<WordDFIDF>();
399                        for (final Long time : times) {
400                                final TimeperiodTweetCountWordCount tcwc = otherTimes.get(time);
401                                final TimeFrequency indexEntry = timeIndex.get(time);
402
403                                final long Ttf = indexEntry.cumulativeFrequency;
404                                final long tf = indexEntry.periodFrequency;
405                                // Number of tweets containing this word in this time period
406                                final long wf = tcwc.wordcount;
407                                Twf += wf;
408                                final WordDFIDF dfidf = new WordDFIDF(tcwc.timeperiod, wf, tf, Twf, Ttf);
409                                allDFIDF.add(dfidf);
410                        }
411                        final List<WordDFIDF> listVersion = new ArrayList<WordDFIDF>();
412                        listVersion.addAll(allDFIDF);
413                        final WriteableListBinary<WordDFIDF> writeableCollection = new WriteableListBinary<WordDFIDF>(listVersion) {
414                                @Override
415                                protected void writeValue(WordDFIDF v, DataOutput out) throws IOException {
416                                        v.writeBinary(out);
417                                }
418
419                        };
420                        context.write(word, new BytesWritable(IOUtils.serialize(writeableCollection)));
421                }
422        }
423
	@Override
	public SimpleSequenceFileStage<LongWritable, BytesWritable, Text, BytesWritable> stage() {
		// Builds the sequence-file stage: Map emits per-word timeperiod counts;
		// the reducer choice depends on whether times were combined upstream.
		return new SimpleSequenceFileStage<LongWritable, BytesWritable, Text, BytesWritable>() {
			@Override
			public void setup(Job job) {
				job.getConfiguration().setStrings(CountWordsAcrossTimeperiod.ARGS_KEY, nonHadoopArgs);

				// If times are not combined, each reducer has to do a bit more
				// work than usual, so spread the load over more reduce tasks.
				// NOTE(review): 26 is a hard-coded task count — consider making
				// it configurable.
				if (!CountWordsAcrossTimeperiod.this.combinedTimes)
					job.setNumReduceTasks(26);
			}

			@Override
			public Class<? extends Mapper<LongWritable, BytesWritable, Text, BytesWritable>> mapper() {
				return CountWordsAcrossTimeperiod.Map.class;
			}

			@Override
			public Class<? extends Reducer<Text, BytesWritable, Text, BytesWritable>> reducer() {
				// combined times: totals arrive once per word (Reduce);
				// otherwise the reducer must merge repeated timeperiods itself
				if (CountWordsAcrossTimeperiod.this.combinedTimes)
					return CountWordsAcrossTimeperiod.Reduce.class;
				else
					return CountWordsAcrossTimeperiod.NonCombinedTimesReducer.class;

			}

			@Override
			public String outname() {
				return WORDCOUNT_DIR;
			}
		};
	}
457
458}