/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.mode.dfidf;

import gnu.trove.map.hash.TObjectIntHashMap;
import gnu.trove.procedure.TLongObjectProcedure;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.joda.time.DateTime;
import org.kohsuke.args4j.CmdLineException;
import org.openimaj.hadoop.mapreduce.stage.IdentityReducer;
import org.openimaj.hadoop.mapreduce.stage.StageProvider;
import org.openimaj.hadoop.mapreduce.stage.helper.TextLongByteStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.HadoopTwitterTokenToolOptions;
import org.openimaj.hadoop.tools.twitter.JsonPathFilterSet;
import org.openimaj.hadoop.tools.twitter.token.mode.TextEntryType;
import org.openimaj.hadoop.tools.twitter.token.mode.WritableEnumCounter;
import org.openimaj.hadoop.tools.twitter.token.mode.dfidf.TimeFrequencyHolder.TimeFrequency;
import org.openimaj.hadoop.tools.twitter.utils.TweetCountWordMap;
import org.openimaj.io.IOUtils;
import org.openimaj.twitter.USMFStatus;

import com.jayway.jsonpath.JsonPath;

/**
 * A mapper/reducer whose purpose is to count tweets and words as a function of
 * timePeriodLength, so a word in a tweet is counted in the time period between
 * t - 1 and t.
 *
 * First task:
 * map input: tweetstatus # json twitter status with JSONPath to words
 * map output: <timePeriod: <word:#freq,tweets:#freq>, -1:<word:#freq,tweets:#freq>>
 * reduce input: <timePeriod: [<word:#freq,tweets:#freq>,...,<word:#freq,tweets:#freq>]>
 * reduce output: <timePeriod: <<tweet:#freq>,<word:#freq>,<word:#freq>,...>>
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
public class CountTweetsInTimeperiod extends StageProvider {

    private String[] nonHadoopArgs;
    private boolean inmemoryCombine;
    private boolean buildTimeIndex = true;
    private long timedelta;

    /**
     * Option for the timecount directory location
     */
    public final static String TIMECOUNT_DIR = "timeperiodTweet";

    /**
     * A time index holding tweet totals and cumulative totals for each time
     * period
     */
    public final static String TIMEINDEX_FILE = "timeindex";

    /**
     * Where to find the global stats file
     */
    public final static String GLOBAL_STATS_FILE = "globalstats";

    private static final String TIMEDELTA = "org.openimaj.hadoop.tools.twitter.token.mode.dfidf.timedelta";

    /**
     * Configuration property holding the location to which the time index is
     * written
     */
    public final static String TIMEINDEX_LOCATION_PROP = "org.openimaj.hadoop.tools.twitter.token.mode.dfidf.timeindex";

    /**
     * @param nonHadoopArgs
     *            to be sent to the stage
     * @param timedelta
     *            the time delta between which to quantise time periods
     */
    public CountTweetsInTimeperiod(String[] nonHadoopArgs, long timedelta) {
        this.nonHadoopArgs = nonHadoopArgs;
        this.inmemoryCombine = false;
        this.timedelta = timedelta;
    }

    /**
     * @param nonHadoopArgs
     *            to be sent to the stage
     * @param inMemoryCombine
     *            whether an in-memory combination of word counts should be
     *            performed
     * @param timedelta
     *            the time delta between which to quantise time periods
     */
    public CountTweetsInTimeperiod(String[] nonHadoopArgs, boolean inMemoryCombine, long timedelta) {
        this.nonHadoopArgs = nonHadoopArgs;
        this.inmemoryCombine = inMemoryCombine;
        this.timedelta = timedelta;
    }

    /**
     * map input: tweetstatus # json twitter status with JSONPath to words
     * map output: <timePeriod: <word:#freq,tweets:#freq>, -1:<word:#freq,tweets:#freq>>
     *
     * @author Jonathon Hare (jsh2@ecs.soton.ac.uk), Sina Samangooei
     *         (ss@ecs.soton.ac.uk)
     *
     */
    public static class Map extends Mapper<LongWritable, Text, LongWritable, BytesWritable> {

        /**
         * Default constructor; mappers are instantiated reflectively by Hadoop
         */
        public Map() {

        }

        /**
         * The time used to signify the end, used to count total numbers of
         * times a given word appears
         */
        public static final LongWritable END_TIME = new LongWritable(-1);

        /**
         * A total of the number of tweets; must be ignored!
         */
        public static final LongWritable TOTAL_TIME = new LongWritable(-2);
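
        // Illustrative note (added commentary, not part of the original
        // source): the mapper emits each time period's counts twice -- once
        // under the period's own key and once under the END_TIME sentinel
        // (-1), which is used downstream to recover the total number of times
        // each word appears. Conceptually, a tweet containing "cat" in period
        // 1000 contributes:
        //
        //     1000 -> {cat: 1, tweets: 1}
        //       -1 -> {cat: 1, tweets: 1}
        //
        // TOTAL_TIME (-2) is a further sentinel key; per its documentation
        // above, counts recorded under it must be ignored by consumers.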

        private HadoopTwitterTokenToolOptions options;
        private long timeDeltaMillis;
        private JsonPath jsonPath;
        private JsonPathFilterSet filters;

        protected synchronized void loadOptions(Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context)
                throws IOException
        {
            if (options == null) {
                try {
                    options = new HadoopTwitterTokenToolOptions(context.getConfiguration().getStrings(
                            HadoopTwitterTokenToolOptions.ARGS_KEY));
                    options.prepare();
                    filters = options.getFilters();
                    timeDeltaMillis = context.getConfiguration().getLong(CountTweetsInTimeperiod.TIMEDELTA, 60) * 60 * 1000;
                    jsonPath = JsonPath.compile(options.getJsonPath());
                } catch (final CmdLineException e) {
                    throw new IOException(e);
                } catch (final Exception e) {
                    throw new IOException(e);
                }
            }
        }

        private HashMap<Long, TweetCountWordMap> tweetWordMap;

        @Override
        protected void setup(Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context)
                throws IOException, InterruptedException
        {
            loadOptions(context);
            this.tweetWordMap = new HashMap<Long, TweetCountWordMap>();
        }

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context)
                throws java.io.IOException, InterruptedException
        {
            List<String> tokens = null;
            USMFStatus status = null;
            DateTime time = null;
            try {
                final String svalue = value.toString();
                status = new USMFStatus(options.getStatusType().type());
                status.fillFromString(svalue);
                if (status.isInvalid())
                    return;
                if (!filters.filter(svalue))
                    return;
                tokens = jsonPath.read(svalue);
                if (tokens == null) {
                    context.getCounter(TextEntryType.INVALID_JSON).increment(1);
                    // System.err.println("Couldn't read the tokens from the tweet");
                    return;
                }
                if (tokens.size() == 0) {
                    context.getCounter(TextEntryType.INVALID_ZEROLENGTH).increment(1);
                    return; // Quietly quit, value exists but was empty
                }
                time = status.createdAt();
                if (time == null) {
                    context.getCounter(TextEntryType.INVALID_TIME).increment(1);
                    // System.err.println("Time was null, this usually means the original tweet had no time. Skip this tweet.");
                    return;
                }
            } catch (final Exception e) {
                // System.out.println("Couldn't get tokens from:\n" + value +
                // "\nwith jsonpath:\n" + jsonPath);
                return;
            }
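
            // Worked example (added commentary): with the default timedelta of
            // 60, timeDeltaMillis = 60 * 60 * 1000 = 3600000 (timedelta is
            // interpreted in minutes), so a tweet created at 10:42:17 UTC is
            // floored to the start of its enclosing hour:
            //
            //     timeIndex = (millisOf(10:42:17) / 3600000) * 3600000
            //               =  millisOf(10:00:00)
            //
            // Integer division followed by multiplication rounds the epoch
            // timestamp down to the nearest timeDeltaMillis boundary.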
            // Quantise the time to a specific index
            final long timeIndex = (time.getMillis() / timeDeltaMillis) * timeDeltaMillis;
            TweetCountWordMap timeWordMap = this.tweetWordMap.get(timeIndex);
            // System.out.println("Tweet time: " + time.getMillis());
            // System.out.println("Tweet timeindex: " + timeIndex);
            if (timeWordMap == null) {
                this.tweetWordMap.put(timeIndex, timeWordMap = new TweetCountWordMap());
            }
            final TObjectIntHashMap<String> tpMap = timeWordMap.getTweetWordMap();
            timeWordMap.incrementTweetCount(1);
            final List<String> seen = new ArrayList<String>();
            for (final String token : tokens) {
                // Apply stop words?
                // Apply junk words?
                // Already seen it?
                if (seen.contains(token))
                    continue;
                seen.add(token);
                tpMap.adjustOrPutValue(token, 1, 1);
                // if (token.equals("...")) {
                // System.out.println("TOKEN: " + token);
                // System.out.println("TIME: " + timeIndex);
                // System.out.println("NEW VALUE: " + newv);
                // }
            }
            context.getCounter(TextEntryType.VALID).increment(1);
        }

        @Override
        protected void cleanup(Mapper<LongWritable, Text, LongWritable, BytesWritable>.Context context)
                throws IOException, InterruptedException
        {
            System.out.println("Cleaning up mapper, seen " + this.tweetWordMap.entrySet().size() + " time slots");
            for (final Entry<Long, TweetCountWordMap> tpMapEntry : this.tweetWordMap.entrySet()) {
                final Long time = tpMapEntry.getKey();
                final TweetCountWordMap map = tpMapEntry.getValue();
                System.out.println("... time( " + time + ") seen " + map.getTweetWordMap().size() + " words");
                final ByteArrayOutputStream outarr = new ByteArrayOutputStream();
                IOUtils.writeBinary(outarr, map);
                final byte[] arr = outarr.toByteArray();
                final BytesWritable toWrite = new BytesWritable(arr);
                context.write(END_TIME, toWrite);
                context.write(new LongWritable(time), toWrite);
                context.getCounter(TextEntryType.ACUAL_EMITS).increment(1);
            }
        }
    }
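
    // Design note (added commentary): Map accumulates counts in an in-memory
    // HashMap across all map() calls and only emits them in cleanup(), one
    // record per time slot -- the classic "in-mapper combiner" pattern. This
    // trades mapper heap for a large reduction in shuffled data compared with
    // emitting one record per token occurrence.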

    /**
     * Identical to the {@link IdentityReducer}, but also constructs a time
     * index which is written to {@link #TIMEINDEX_FILE}
     *
     * @author Sina Samangooei (ss@ecs.soton.ac.uk)
     *
     */
    public static class TimeIndexReducer extends
            Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>
    {
        private TimeFrequencyHolder timeMap;

        /**
         * Default constructor
         */
        public TimeIndexReducer() {
            timeMap = new TimeFrequencyHolder();
        }

        @Override
        protected void reduce(LongWritable time, Iterable<BytesWritable> values, Context context)
                throws IOException, InterruptedException
        {
            if (time.get() == Map.END_TIME.get()) {
                // End times can be ignored entirely in terms of the time
                // index, but must still be passed on!
                for (final BytesWritable tweetwordmapbytes : values) {
                    context.write(time, tweetwordmapbytes);
                }
            }
            else {
                final TweetCountWordMap accum = new TweetCountWordMap();
                for (final BytesWritable tweetwordmapbytes : values) {
                    TweetCountWordMap tweetwordmap = null;
                    tweetwordmap = IOUtils.read(new ByteArrayInputStream(tweetwordmapbytes.getBytes()),
                            TweetCountWordMap.class);
                    accum.combine(tweetwordmap);
                    context.write(time, tweetwordmapbytes);
                }
                final TimeFrequency tf = new TimeFrequency(time.get(), accum.getNTweets());
                timeMap.put(tf.time, tf);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            final String output = context.getConfiguration().getStrings(TIMEINDEX_LOCATION_PROP)[0];
            final Path indexOut = new Path(output + "/" + context.getTaskAttemptID());
            System.out.println("Writing time index to: " + indexOut);
            System.out.println("Timemap contains: " + this.timeMap.size());
            CountTweetsInTimeperiod.writeTimeIndex(this.timeMap, indexOut);
        }
    }

    /**
     * reduce input: <timePeriod:
     * [<word:#freq,tweets:#freq>,...,<word:#freq,tweets:#freq>]>
     * reduce output: <timePeriod: <<tweet:#freq>,<word:#freq>,<word:#freq>,...>>
     *
     * @author Sina Samangooei (ss@ecs.soton.ac.uk)
     *
     */
    public static class InMemoryCombiningReducer extends
            Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>
    {

        /**
         * Default constructor; does nothing
         */
        public InMemoryCombiningReducer() {

        }

        @Override
        protected void reduce(LongWritable key, Iterable<BytesWritable> values,
                Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>.Context context)
                throws IOException, InterruptedException
        {
            final TweetCountWordMap accum = new TweetCountWordMap();
            for (final BytesWritable tweetwordmapbytes : values) {
                TweetCountWordMap tweetwordmap = null;
                tweetwordmap = IOUtils.read(new ByteArrayInputStream(tweetwordmapbytes.getBytes()),
                        TweetCountWordMap.class);
                accum.combine(tweetwordmap);
            }
            final ByteArrayOutputStream outstream = new ByteArrayOutputStream();
            IOUtils.writeBinary(outstream, accum);
            context.write(key, new BytesWritable(outstream.toByteArray()));
        }
    }
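
    // Summary of the wiring chosen by stage() below (added commentary),
    // derived from its setup() and reducer() overrides:
    //
    //     inmemoryCombine  buildTimeIndex  reducer
    //     true             (either)        InMemoryCombiningReducer
    //     false            true            TimeIndexReducer (10 reduce tasks)
    //     false            false           identity (0 reduce tasks, map-only)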

    @Override
    public TextLongByteStage stage() {
        final TextLongByteStage s = new TextLongByteStage() {
            private Path actualOutputLocation;

            @Override
            public void setup(Job job) {
                job.getConfiguration().setStrings(HadoopTwitterTokenToolOptions.ARGS_KEY, nonHadoopArgs);
                job.getConfiguration().setLong(TIMEDELTA, timedelta);
                job.getConfiguration().setStrings(TIMEINDEX_LOCATION_PROP,
                        new Path(actualOutputLocation, TIMEINDEX_FILE).toString());
                if (!inmemoryCombine) {
                    if (!buildTimeIndex) {
                        job.setNumReduceTasks(0);
                    }
                    else {
                        job.setNumReduceTasks(10);
                    }
                }
            }

            @Override
            public Class<? extends Mapper<LongWritable, Text, LongWritable, BytesWritable>> mapper() {
                return CountTweetsInTimeperiod.Map.class;
            }

            @Override
            public Class<? extends Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable>> reducer() {
                if (inmemoryCombine)
                    return CountTweetsInTimeperiod.InMemoryCombiningReducer.class;
                else if (buildTimeIndex)
                    return CountTweetsInTimeperiod.TimeIndexReducer.class;
                else
                    return super.reducer();
            }

            @Override
            public Job stage(Path[] inputs, Path output, Configuration conf) throws Exception {
                this.actualOutputLocation = output;
                return super.stage(inputs, output, conf);
            }

            @Override
            public String outname() {
                return TIMECOUNT_DIR;
            }

            @Override
            public void finished(Job job) {
                Counters counters;
                try {
                    counters = job.getCounters();
                } catch (final IOException e) {
                    // Counters not found; nothing to write
                    return;
                }
                // Prepare a writer to the actual output location
                final Path out = new Path(actualOutputLocation, GLOBAL_STATS_FILE);

                FileSystem fs;
                try {
                    fs = HadoopToolsUtil.getFileSystem(out);
                    final FSDataOutputStream os = fs.create(out);
                    IOUtils.writeASCII(os, new WritableEnumCounter<TextEntryType>(counters, TextEntryType.values()) {
                        @Override
                        public TextEntryType valueOf(String str) {
                            return TextEntryType.valueOf(str);
                        }
                    });
                    os.close();
                } catch (final IOException e) {
                    // Failure to write the global stats file is not fatal
                }
            }
        };
        return s;
    }

    /**
     * Write a time index to a {@link Path}
     *
     * @param timeMap
     *            the index to write
     * @param indexOut
     *            the location to write to
     * @throws IOException
     */
    public static void writeTimeIndex(TimeFrequencyHolder timeMap, Path indexOut) throws IOException {
        FSDataOutputStream os = null;
        try {
            final FileSystem fs = HadoopToolsUtil.getFileSystem(indexOut);
            os = fs.create(indexOut, true);
            IOUtils.writeBinary(os, timeMap);
            os.flush();
        } finally {
            if (os != null)
                os.close();
        }
    }

    /**
     * Read a {@link TimeFrequencyHolder} from a {@link Path}. The path is
     * assumed to be a directory containing many {@link TimeFrequencyHolder}
     * instances, which are merged together.
     *
     * @param indexOut
     *            the directory holding the index parts
     * @return a new {@link TimeFrequencyHolder}, or null if the index does
     *         not exist
     * @throws IOException
     */
    public static TimeFrequencyHolder readTimeIndex(Path indexOut) throws IOException {
        if (!HadoopToolsUtil.fileExists(indexOut.toString())) {
            return null;
        }
        System.out.println("Reading time index from: " + indexOut);
        final TimeFrequencyHolder tfh = new TimeFrequencyHolder();

        final FileSystem fs = HadoopToolsUtil.getFileSystem(indexOut);
        final FileStatus[] indexParts = fs.listStatus(indexOut);
        for (final FileStatus fileStatus : indexParts) {
            System.out.println("Reading index part: " + fileStatus.getPath());
            FSDataInputStream in = null;
            try {
                in = fs.open(fileStatus.getPath());
                final TimeFrequencyHolder tempTfh = IOUtils.read(in, TimeFrequencyHolder.class);
                tempTfh.forEachEntry(new TLongObjectProcedure<TimeFrequency>() {
                    @Override
                    public boolean execute(long a, TimeFrequency b) {
                        // This is safe because each part should contain
                        // completely unique times!
                        tfh.put(a, b);
                        return true;
                    }
                });
            } finally {
                if (in != null)
                    in.close();
            }
        }
        tfh.recalculateCumulativeFrequencies();
        return tfh;
    }

    /**
     * @param outpath
     *            the base output path of the tool
     * @return the location at which the time index is expected to be found
     */
    public static Path constructIndexPath(Path outpath) {
        final Path retPath = new Path(new Path(outpath, CountTweetsInTimeperiod.TIMECOUNT_DIR), TIMEINDEX_FILE);
        return retPath;
    }
}
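
// A minimal usage sketch (added commentary; the variables below are
// hypothetical, and in practice this stage is driven by the Hadoop Twitter
// token tool's machinery rather than invoked directly):
//
//     final CountTweetsInTimeperiod counter =
//             new CountTweetsInTimeperiod(nonHadoopArgs, 60); // 60-minute periods
//     final Job job = counter.stage().stage(inputs, output, conf);
//     job.waitForCompletion(true);
//
//     // Once the job completes, the merged time index (if built) can be
//     // recovered from the tool's base output directory:
//     final TimeFrequencyHolder index = CountTweetsInTimeperiod
//             .readTimeIndex(CountTweetsInTimeperiod.constructIndexPath(basePath));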