/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *  Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *
 *   *  Redistributions in binary form must reproduce the above copyright notice,
 *      this list of conditions and the following disclaimer in the documentation
 *      and/or other materials provided with the distribution.
 *
 *   *  Neither the name of the University of Southampton nor the names of its
 *      contributors may be used to endorse or promote products derived from this
 *      software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.mode.dfidf;

import gnu.trove.map.hash.TObjectIntHashMap;
import gnu.trove.procedure.TLongObjectProcedure;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.joda.time.DateTime;
import org.kohsuke.args4j.CmdLineException;
import org.openimaj.hadoop.mapreduce.stage.IdentityReducer;
import org.openimaj.hadoop.mapreduce.stage.StageProvider;
import org.openimaj.hadoop.mapreduce.stage.helper.TextLongByteStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.HadoopTwitterTokenToolOptions;
import org.openimaj.hadoop.tools.twitter.JsonPathFilterSet;
import org.openimaj.hadoop.tools.twitter.token.mode.TextEntryType;
import org.openimaj.hadoop.tools.twitter.token.mode.WritableEnumCounter;
import org.openimaj.hadoop.tools.twitter.token.mode.dfidf.TimeFrequencyHolder.TimeFrequency;
import org.openimaj.hadoop.tools.twitter.utils.TweetCountWordMap;
import org.openimaj.io.IOUtils;
import org.openimaj.twitter.USMFStatus;

import com.jayway.jsonpath.JsonPath;
/**
 * A mapper/reducer pair which counts the words in tweets within quantised time
 * periods of length timePeriodLength, i.e. a word in a tweet is counted
 * against the time period between t - 1 and t.
 *
 * First task: map input: tweetstatus # json twitter status with JSONPath to
 * words; map output: <timePeriod: <word:#freq,tweets:#freq>,
 * -1:<word:#freq,tweets:#freq> >; reduce input: <timePeriod:
 * [<word:#freq,tweets:#freq>,...,<word:#freq,tweets:#freq>]>; reduce output:
 * <timePeriod: <<tweet:#freq>,<word:#freq>,<word:#freq>,...> >
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
public class CountTweetsInTimeperiod extends StageProvider {

	private String[] nonHadoopArgs;
	private boolean inmemoryCombine;
	private boolean buildTimeIndex = true;
	private long timedelta;
	/**
	 * The option for the timecount output directory location
	 */
	public final static String TIMECOUNT_DIR = "timeperiodTweet";

	/**
	 * A time index holding tweet totals and cumulative totals for each time
	 * period
	 */
	public final static String TIMEINDEX_FILE = "timeindex";

	/**
	 * where to find the global stats file
	 */
	public final static String GLOBAL_STATS_FILE = "globalstats";
	private static final String TIMEDELTA = "org.openimaj.hadoop.tools.twitter.token.mode.dfidf.timedelta";
	/**
	 * The config property in which the location of the time index is stored
	 */
	public final static String TIMEINDEX_LOCATION_PROP = "org.openimaj.hadoop.tools.twitter.token.mode.dfidf.timeindex";

	/**
	 * @param nonHadoopArgs
	 *            to be sent to the stage
	 * @param timedelta
	 *            the time delta by which to quantise time periods
	 */
	public CountTweetsInTimeperiod(String[] nonHadoopArgs, long timedelta) {
		this.nonHadoopArgs = nonHadoopArgs;
		this.inmemoryCombine = false;
		this.timedelta = timedelta;
	}

	/**
	 * @param nonHadoopArgs
	 *            to be sent to the stage
	 * @param inMemoryCombine
	 *            whether an in-memory combination of word counts should be
	 *            performed
	 * @param timedelta
	 *            the time delta by which to quantise time periods
	 */
	public CountTweetsInTimeperiod(String[] nonHadoopArgs, boolean inMemoryCombine,
			long timedelta)
	{
		this.nonHadoopArgs = nonHadoopArgs;
		this.inmemoryCombine = inMemoryCombine;
		this.timedelta = timedelta;
	}
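
	// A minimal usage sketch (the argument values are hypothetical): count
	// words in hour-long periods, combining counts in memory in the reducers.
	//
	//   CountTweetsInTimeperiod counter =
	//           new CountTweetsInTimeperiod(nonHadoopArgs, true, 60);
	//   TextLongByteStage stage = counter.stage();
	//
	// The time delta is given in minutes; the mapper scales it to milliseconds
	// when quantising tweet times (see Map.loadOptions).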

	/**
	 * map input: tweetstatus # json twitter status with JSONPath to words; map
	 * output: <timePeriod: <word:#freq,tweets:#freq>,
	 * -1:<word:#freq,tweets:#freq> >
	 *
	 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk), Sina Samangooei
	 *         (ss@ecs.soton.ac.uk)
	 *
	 */
	public static class Map extends Mapper<LongWritable, Text, LongWritable, BytesWritable> {

		/**
		 * Default constructor; does nothing
		 */
		public Map() {

		}

		/**
		 * The time used to signify the end, used to count total numbers of
		 * times a given word appears
		 */
		public static final LongWritable END_TIME = new LongWritable(-1);
		/**
		 * A key under which the total number of tweets is stored; must be
		 * ignored!
		 */
		public static final LongWritable TOTAL_TIME = new LongWritable(-2);
		private HadoopTwitterTokenToolOptions options;
		private long timeDeltaMillis;
		private JsonPath jsonPath;
		private JsonPathFilterSet filters;

		protected synchronized void loadOptions(Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context)
				throws IOException
		{
			if (options == null) {
				try {
					options = new HadoopTwitterTokenToolOptions(context.getConfiguration().getStrings(
							HadoopTwitterTokenToolOptions.ARGS_KEY));
					options.prepare();
					filters = options.getFilters();
					// The configured delta is in minutes; convert to milliseconds
					timeDeltaMillis = context.getConfiguration().getLong(CountTweetsInTimeperiod.TIMEDELTA, 60) * 60 * 1000;
					jsonPath = JsonPath.compile(options.getJsonPath());
				} catch (final CmdLineException e) {
					throw new IOException(e);
				} catch (final Exception e) {
					throw new IOException(e);
				}
			}
		}

		private HashMap<Long, TweetCountWordMap> tweetWordMap;

		@Override
		protected void setup(Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context) throws IOException,
				InterruptedException
		{
			loadOptions(context);
			this.tweetWordMap = new HashMap<Long, TweetCountWordMap>();
		}

		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context) throws java.io.IOException,
				InterruptedException
		{
			List<String> tokens = null;
			USMFStatus status = null;
			DateTime time = null;
			try {
				final String svalue = value.toString();
				status = new USMFStatus(options.getStatusType().type());
				status.fillFromString(svalue);
				if (status.isInvalid())
					return;
				if (!filters.filter(svalue))
					return;
				tokens = jsonPath.read(svalue);
				if (tokens == null) {
					context.getCounter(TextEntryType.INVALID_JSON).increment(1);
					// System.err.println("Couldn't read the tokens from the tweet");
					return;
				}
				if (tokens.size() == 0) {
					context.getCounter(TextEntryType.INVALID_ZEROLENGTH).increment(1);
					return; // Quietly quit, value exists but was empty
				}
				time = status.createdAt();
				if (time == null) {
					context.getCounter(TextEntryType.INVALID_TIME).increment(1);
					// System.err.println("Time was null, this usually means the original tweet had no time. Skip this tweet.");
					return;
				}
			} catch (final Exception e) {
				// System.out.println("Couldn't get tokens from:\n" + value +
				// "\nwith jsonpath:\n" + jsonPath);
				return;
			}
			// Quantise the time to a specific index
			final long timeIndex = (time.getMillis() / timeDeltaMillis) * timeDeltaMillis;
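			// e.g. with the default 60 minute delta, timeDeltaMillis is
			// 3600000, and the integer division floors each timestamp to the
			// start of its hour: every tweet posted between 12:00 and 13:00
			// shares the timeIndex of 12:00.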
			TweetCountWordMap timeWordMap = this.tweetWordMap.get(timeIndex);
			// System.out.println("Tweet time: " + time.getMillis());
			// System.out.println("Tweet timeindex: " + timeIndex);
			if (timeWordMap == null) {
				this.tweetWordMap.put(timeIndex, timeWordMap = new TweetCountWordMap());
			}
			final TObjectIntHashMap<String> tpMap = timeWordMap.getTweetWordMap();
			timeWordMap.incrementTweetCount(1);
			// Count each distinct token once per tweet
			final HashSet<String> seen = new HashSet<String>();
			for (final String token : tokens) {
				// Apply stop words?
				// Apply junk words?
				if (!seen.add(token))
					continue;
				tpMap.adjustOrPutValue(token, 1, 1);
			}
			context.getCounter(TextEntryType.VALID).increment(1);
		}

		@Override
		protected void cleanup(Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context)
				throws IOException, InterruptedException
		{
			System.out.println("Cleaning up mapper, seen " + this.tweetWordMap.size() + " time slots");
			for (final Entry<Long, TweetCountWordMap> tpMapEntry : this.tweetWordMap.entrySet()) {
				final Long time = tpMapEntry.getKey();
				final TweetCountWordMap map = tpMapEntry.getValue();
				System.out.println("... time( " + time + ") seen " + map.getTweetWordMap().size() + " words");
				final ByteArrayOutputStream outarr = new ByteArrayOutputStream();
				IOUtils.writeBinary(outarr, map);
				final byte[] arr = outarr.toByteArray();
				final BytesWritable toWrite = new BytesWritable(arr);
				context.write(END_TIME, toWrite);
				context.write(new LongWritable(time), toWrite);
				context.getCounter(TextEntryType.ACUAL_EMITS).increment(1);
			}
		}
	}
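
	// Note that cleanup() emits each period's counts twice: once under the
	// period's own key and once under END_TIME, so the reduce side can derive
	// both per-period and whole-corpus totals from the same map output.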

	/**
	 * Identical to the {@link IdentityReducer} but also constructs a time
	 * index, written to {@link #TIMEINDEX_FILE}
	 *
	 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
	 *
	 */
	public static class TimeIndexReducer extends
			Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>
	{
		private TimeFrequencyHolder timeMap;

		/**
		 * Construct with an empty time index
		 */
		public TimeIndexReducer() {
			timeMap = new TimeFrequencyHolder();
		}

		@Override
		protected void reduce(LongWritable time, Iterable<BytesWritable> values, Context context) throws IOException,
				InterruptedException
		{
			if (time.get() == Map.END_TIME.get()) {
				// The end time can be ignored entirely in terms of the time
				// index, but the values must still be passed on!
				for (final BytesWritable tweetwordmapbytes : values) {
					context.write(time, tweetwordmapbytes);
				}
			}
			else {
				final TweetCountWordMap accum = new TweetCountWordMap();
				for (final BytesWritable tweetwordmapbytes : values) {
					TweetCountWordMap tweetwordmap = null;
					tweetwordmap = IOUtils.read(new ByteArrayInputStream(tweetwordmapbytes.getBytes()),
							TweetCountWordMap.class);
					accum.combine(tweetwordmap);
					context.write(time, tweetwordmapbytes);
				}
				final TimeFrequency tf = new TimeFrequency(time.get(), accum.getNTweets());
				timeMap.put(tf.time, tf);
			}
		}

		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			final String output = context.getConfiguration().getStrings(TIMEINDEX_LOCATION_PROP)[0];
			final Path indexOut = new Path(output + "/" + context.getTaskAttemptID());
			System.out.println("Writing time index to: " + indexOut);
			System.out.println("Timemap contains: " + this.timeMap.size());
			CountTweetsInTimeperiod.writeTimeIndex(this.timeMap, indexOut);
		}
	}
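
	// Each reducer writes its own part of the index to a file named after its
	// task attempt, all under the TIMEINDEX_LOCATION_PROP directory; the parts
	// are merged back into a single index by readTimeIndex below.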

	/**
	 * reduce input: <timePeriod:
	 * [<word:#freq,tweets:#freq>,...,<word:#freq,tweets:#freq>]>; reduce
	 * output: <timePeriod: <<tweet:#freq>,<word:#freq>,<word:#freq>,...> >
	 *
	 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
	 *
	 */
	public static class InMemoryCombiningReducer extends
			Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>
	{

		/**
		 * Default constructor; does nothing
		 */
		public InMemoryCombiningReducer() {

		}

		@Override
		protected void reduce(LongWritable key, Iterable<BytesWritable> values,
				Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>.Context context) throws IOException,
				InterruptedException
		{
			final TweetCountWordMap accum = new TweetCountWordMap();
			for (final BytesWritable tweetwordmapbytes : values) {
				TweetCountWordMap tweetwordmap = null;
				tweetwordmap = IOUtils.read(new ByteArrayInputStream(tweetwordmapbytes.getBytes()),
						TweetCountWordMap.class);
				accum.combine(tweetwordmap);
			}
			final ByteArrayOutputStream outstream = new ByteArrayOutputStream();
			IOUtils.writeBinary(outstream, accum);
			context.write(key, new BytesWritable(outstream.toByteArray()));
		}
	}
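
	// The trade-off: combining in memory emits a single record per time
	// period, but the accumulated TweetCountWordMap for each period must fit
	// in a reducer's heap; the identity/time-index path avoids this by
	// passing the per-mapper maps through unmerged.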

	@Override
	public TextLongByteStage stage() {
		final TextLongByteStage s = new TextLongByteStage() {
			private Path actualOutputLocation;

			@Override
			public void setup(Job job) {
				job.getConfiguration().setStrings(HadoopTwitterTokenToolOptions.ARGS_KEY, nonHadoopArgs);
				job.getConfiguration().setLong(TIMEDELTA, timedelta);
				job.getConfiguration().setStrings(TIMEINDEX_LOCATION_PROP,
						new Path(actualOutputLocation, TIMEINDEX_FILE).toString());
				if (!inmemoryCombine) {
					if (!buildTimeIndex) {
						job.setNumReduceTasks(0);
					}
					else {
						job.setNumReduceTasks(10);
					}
				}
			}
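
			// When not combining in memory, the reduce phase exists purely to
			// build the time index: it is skipped outright (0 reduce tasks)
			// if no index is wanted, and otherwise capped at 10 tasks.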

			@Override
			public Class<? extends Mapper<LongWritable, Text, LongWritable, BytesWritable>> mapper() {
				return CountTweetsInTimeperiod.Map.class;
			}

			@Override
			public Class<? extends Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>> reducer() {
				if (inmemoryCombine)
					return CountTweetsInTimeperiod.InMemoryCombiningReducer.class;
				else if (buildTimeIndex)
					return CountTweetsInTimeperiod.TimeIndexReducer.class;
				else
					return super.reducer();
			}

			@Override
			public Job stage(Path[] inputs, Path output, Configuration conf) throws Exception {
				this.actualOutputLocation = output;
				return super.stage(inputs, output, conf);
			}

			@Override
			public String outname() {
				return TIMECOUNT_DIR;
			}

			@Override
			public void finished(Job job) {
				Counters counters;
				try {
					counters = job.getCounters();
				} catch (final IOException e) {
					// System.out.println("Counters not found!");
					return;
				}
				// Prepare a writer to the actual output location
				final Path out = new Path(actualOutputLocation, GLOBAL_STATS_FILE);

				FileSystem fs;
				try {
					fs = HadoopToolsUtil.getFileSystem(out);
					final FSDataOutputStream os = fs.create(out);
					IOUtils.writeASCII(os, new WritableEnumCounter<TextEntryType>(counters, TextEntryType.values()) {
						@Override
						public TextEntryType valueOf(String str) {
							return TextEntryType.valueOf(str);
						}
					});
				} catch (final IOException e) {
					// Swallowed: failing to write the global stats should not
					// fail the job
				}
			}
		};
		return s;
	}

	/**
	 * Write a time index to a {@link Path}
	 *
	 * @param timeMap
	 *            the time index to write
	 * @param indexOut
	 *            the path to write the index to
	 * @throws IOException
	 */
	public static void writeTimeIndex(TimeFrequencyHolder timeMap, Path indexOut) throws IOException {
		FSDataOutputStream os = null;
		try {
			final FileSystem fs = HadoopToolsUtil.getFileSystem(indexOut);
			os = fs.create(indexOut, true);
			IOUtils.writeBinary(os, timeMap);
			os.flush();
		} finally {
			// Guard against fs.create having failed before os was assigned
			if (os != null)
				os.close();
		}
	}
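
	// Example (the path is hypothetical): persist one part of the index.
	//
	//   writeTimeIndex(timeMap, new Path("hdfs:///out/timeperiodTweet/timeindex/attempt_0"));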

	/**
	 * Read a {@link TimeFrequencyHolder} from a {@link Path}. The path is
	 * assumed to be a directory containing many {@link TimeFrequencyHolder}
	 * instances, which are merged together.
	 *
	 * @param indexOut
	 *            the directory holding the index parts
	 * @return a new {@link TimeFrequencyHolder}, or null if the path does not
	 *         exist
	 * @throws IOException
	 */
	public static TimeFrequencyHolder readTimeIndex(Path indexOut) throws IOException {
		if (!HadoopToolsUtil.fileExists(indexOut.toString())) {
			return null;
		}
		System.out.println("Reading time index from: " + indexOut);
		final TimeFrequencyHolder tfh = new TimeFrequencyHolder();

		final FileSystem fs = HadoopToolsUtil.getFileSystem(indexOut);
		final FileStatus[] indexParts = fs.listStatus(indexOut);
		for (final FileStatus fileStatus : indexParts) {
			System.out.println("Reading index part: " + fileStatus.getPath());
			FSDataInputStream in = null;
			try {
				in = fs.open(fileStatus.getPath());
				final TimeFrequencyHolder tempTfh = IOUtils.read(in, TimeFrequencyHolder.class);
				tempTfh.forEachEntry(new TLongObjectProcedure<TimeFrequency>() {
					@Override
					public boolean execute(long a, TimeFrequency b) {
						// This is safe because each part should contain
						// completely unique times!
						tfh.put(a, b);
						return true;
					}
				});
			} finally {
				// Guard against fs.open having failed before in was assigned
				if (in != null)
					in.close();
			}
		}
		tfh.recalculateCumulativeFrequencies();
		return tfh;
	}
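
	// Example: load the merged index for a completed run, where outpath is
	// the tool's base output directory.
	//
	//   TimeFrequencyHolder index = readTimeIndex(constructIndexPath(outpath));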

	/**
	 * @param outpath
	 *            the base output location of the tool
	 * @return the location of the time index under the given output path
	 */
	public static Path constructIndexPath(Path outpath) {
		return new Path(new Path(outpath, CountTweetsInTimeperiod.TIMECOUNT_DIR), TIMEINDEX_FILE);
	}
}