/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.mode.dfidf;

import gnu.trove.procedure.TObjectIntProcedure;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.TreeSet;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.kohsuke.args4j.CmdLineException;
import org.openimaj.hadoop.mapreduce.stage.StageProvider;
import org.openimaj.hadoop.mapreduce.stage.helper.SimpleSequenceFileStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.HadoopTwitterTokenToolOptions;
import org.openimaj.hadoop.tools.twitter.token.mode.TextEntryType;
import org.openimaj.hadoop.tools.twitter.token.mode.WritableEnumCounter;
import org.openimaj.hadoop.tools.twitter.token.mode.dfidf.TimeFrequencyHolder.TimeFrequency;
import org.openimaj.hadoop.tools.twitter.utils.TimeperiodTweetCountWordCount;
import org.openimaj.hadoop.tools.twitter.utils.TweetCountWordMap;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF;
import org.openimaj.io.IOUtils;
import org.openimaj.io.wrappers.WriteableListBinary;
/**
 * A stage provider wrapping the functionality of
 * {@link CountWordsAcrossTimeperiod.Map} and
 * {@link CountWordsAcrossTimeperiod.Reduce}.
 *
 * The Map expects times as keys and maps of word counts as input. The mapper
 * emits words as keys, with pairs of the time period and the counts within
 * that time period as values.
 *
 * This, along with the TimeIndex generated by {@link CountTweetsInTimeperiod},
 * is used to construct a DFIDF score per word per time period.
 *
 * One interpretation of the DFIDF score is a weighting based on counts up to a
 * particular time period. This functionality is encoded in the
 * {@link NonCombinedTimesReducer}.
 *
 * Another interpretation is that the DFIDF score weights words based on word
 * occurrences at the end of some period of time, i.e. the count at some END.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
public class CountWordsAcrossTimeperiod extends StageProvider {
	private String[] nonHadoopArgs;
	private boolean combinedTimes = false;

	/**
	 * @param nonHadoopArgs
	 *            the arguments handed to the underlying
	 *            {@link HadoopTwitterTokenToolOptions}
	 */
	public CountWordsAcrossTimeperiod(String[] nonHadoopArgs) {
		this.nonHadoopArgs = nonHadoopArgs;
	}

	/**
	 * @param nonHadoopArgs
	 *            the arguments handed to the underlying
	 *            {@link HadoopTwitterTokenToolOptions}
	 * @param combinedTimes
	 *            whether the mapper expects time entries with values for each
	 *            word, i.e. combined times
	 */
	public CountWordsAcrossTimeperiod(String[] nonHadoopArgs, boolean combinedTimes) {
		this.nonHadoopArgs = nonHadoopArgs;
		this.combinedTimes = combinedTimes;
	}

	/**
	 * arg key
	 */
	public static final String ARGS_KEY = "TOKEN_ARGS";
	private static final LongWritable END_TIME = new LongWritable(-1);
	/**
	 * where the intermediate word counts should be stored
	 */
	public final static String WORDCOUNT_DIR = "wordtimeperiodDFIDF";
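
	/*
	 * Illustrative sketch only (never called by the job): one plausible
	 * reading of the score assembled by the reducers below. The reducer
	 * comments define DF = wf/tf and IDF = Ttf/Twf; how the two are combined
	 * is owned by WordDFIDF, so the arithmetic here (DF scaled by the log of
	 * IDF) is an assumption made for exposition, not the library's
	 * definition.
	 */
	private static double dfidfSketch(long wf, long tf, long Twf, long Ttf) {
		final double df = wf / (double) tf; // proportion of this period's tweets containing the word
		final double idf = Ttf / (double) Twf; // rarity of the word across all tweets counted
		return df * Math.log(idf);
	}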

	/**
	 * function(timePeriodLength)
	 *
	 * map input: <timePeriod: <<tweet:#freq>,<word:#freq>,<word:#freq>,...>>
	 *
	 * map output: [ word: <timeperiod, tweet:#freq, word:#freq>, word:
	 * <timeperiod, tweet:#freq, word:#freq>, ... ]
	 *
	 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
	 *
	 */
	public static class Map extends Mapper<LongWritable, BytesWritable, Text, BytesWritable> {

		/**
		 * Mapper constructor does nothing; Hadoop requires a public no-arg
		 * constructor, and all real initialisation happens in setup
		 */
		public Map() {

		}

		private static HadoopTwitterTokenToolOptions options;

		protected static synchronized void loadOptions(
				Mapper<LongWritable, BytesWritable, Text, BytesWritable>.Context context) throws IOException
		{
			if (options == null) {
				try {
					options = new HadoopTwitterTokenToolOptions(context.getConfiguration().getStrings(ARGS_KEY));
					options.prepare();
				} catch (final CmdLineException e) {
					throw new IOException(e);
				} catch (final Exception e) {
					throw new IOException(e);
				}
			}
		}

		@Override
		protected void setup(Mapper<LongWritable, BytesWritable, Text, BytesWritable>.Context context)
				throws IOException, InterruptedException
		{
			loadOptions(context);
		}

		@Override
		protected void map(final LongWritable key, BytesWritable value,
				final Mapper<LongWritable, BytesWritable, Text, BytesWritable>.Context context)
				throws java.io.IOException, InterruptedException
		{
			final TweetCountWordMap periodCountWordCount = IOUtils.read(new ByteArrayInputStream(value.getBytes()),
					TweetCountWordMap.class);
			final boolean written = periodCountWordCount.getTweetWordMap().forEachEntry(new TObjectIntProcedure<String>()
			{
				@Override
				public boolean execute(String word, int wordCount) {
					final TimeperiodTweetCountWordCount timeMap = new TimeperiodTweetCountWordCount(key.get(),
							wordCount, periodCountWordCount.getNTweets());
					final ByteArrayOutputStream os = new ByteArrayOutputStream();
					try {
						IOUtils.writeBinary(os, timeMap);
						final BytesWritable writeable = new BytesWritable(os.toByteArray());
						context.write(new Text(word), writeable);
					} catch (final IOException e) {
						return false;
					} catch (final InterruptedException e) {
						return false;
					}

					return true;
				}
			});
			if (!written) {
				throw new IOException("Couldn't write the TimeperiodTweetCountWordCount object");
			}
		}
	}
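
	/*
	 * Illustrative sketch only (never called by the job): the byte-level
	 * round-trip between Map and the reducers. The mapper binary-writes a
	 * TimeperiodTweetCountWordCount into a BytesWritable and the reducers
	 * read it back with IOUtils, exactly as in Map.map above and the reduce
	 * methods below.
	 */
	private static TimeperiodTweetCountWordCount roundTrip(TimeperiodTweetCountWordCount timeMap) throws IOException {
		final ByteArrayOutputStream os = new ByteArrayOutputStream();
		IOUtils.writeBinary(os, timeMap); // as written on the map side
		final BytesWritable bytes = new BytesWritable(os.toByteArray());
		// as read back on the reduce side
		return IOUtils.read(new ByteArrayInputStream(bytes.getBytes()), TimeperiodTweetCountWordCount.class);
	}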

	/**
	 * reduce input: <word: [ <timeperiod, tweet:#freq, word:#freq>,
	 * <timeperiod, tweet:#freq, word:#freq>, ... ]>
	 *
	 * reduce output: <word: <timePeriod, DFIDF>, ...>
	 *
	 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
	 *
	 */
	public static class Reduce extends Reducer<Text, BytesWritable, Text, BytesWritable> {

		/**
		 * default constructor does nothing
		 */
		public Reduce() {

		}

		@Override
		protected void reduce(Text word, Iterable<BytesWritable> values,
				Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException,
				InterruptedException
		{
			// read all time periods to objects; find the END_TIME instance and
			// hold the rest
			/*
			 * # read total tweet frequency from timeperiod -1: Ttf
			 * # read total word tweet frequency from timeperiod -1: Twf
			 * # read time period tweet frequency from entry: tf
			 * # read time period word frequency from entry: wf
			 */
			TimeperiodTweetCountWordCount endTime = null;
			final List<TimeperiodTweetCountWordCount> otherTimes = new ArrayList<TimeperiodTweetCountWordCount>();
			for (final BytesWritable inputArr : values) {
				final ByteArrayInputStream stream = new ByteArrayInputStream(inputArr.getBytes());
				final TimeperiodTweetCountWordCount instance = IOUtils.read(stream, TimeperiodTweetCountWordCount.class);
				if (instance.timeperiod == END_TIME.get())
					endTime = instance;
				else
					otherTimes.add(instance);
			}
			/*
			 * # for entry in input:
			 * #   DF = wf/tf
			 * #   IDF = Ttf/Twf
			 */
			// Total number of tweets in all time periods (the END_TIME entry
			// is assumed to always be present when times were combined)
			final long Ttf = endTime.tweetcount;
			// Number of tweets containing this word in all time periods
			final long Twf = endTime.wordcount;
			final TreeSet<WordDFIDF> allDFIDF = new TreeSet<WordDFIDF>();
			for (final TimeperiodTweetCountWordCount tcwc : otherTimes) {
				// Number of tweets in this time period
				final long tf = tcwc.tweetcount;
				// Number of tweets containing this word in this time period
				final long wf = tcwc.wordcount;

				final WordDFIDF dfidf = new WordDFIDF(tcwc.timeperiod, wf, tf, Twf, Ttf);
				allDFIDF.add(dfidf);
			}
			final List<WordDFIDF> listVersion = new ArrayList<WordDFIDF>();
			listVersion.addAll(allDFIDF);
			final WriteableListBinary<WordDFIDF> writeableCollection = new WriteableListBinary<WordDFIDF>(listVersion) {
				@Override
				protected void writeValue(WordDFIDF v, DataOutput out) throws IOException {
					v.writeBinary(out);
				}
			};
			context.write(word, new BytesWritable(IOUtils.serialize(writeableCollection)));
		}
	}
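
	/*
	 * Illustrative sketch only (never called by the job): the running total
	 * kept by the NonCombinedTimesReducer below. Walking the time periods in
	 * ascending order, the Twf handed to each WordDFIDF is the number of
	 * tweets containing the word up to and including that period, which is
	 * what makes the score a weighting based on counts up to a particular
	 * time period.
	 */
	private static long[] cumulativeWordFrequencies(long[] perPeriodWf) {
		final long[] Twf = new long[perPeriodWf.length];
		long runningTotal = 0;
		for (int i = 0; i < perPeriodWf.length; i++) {
			runningTotal += perPeriodWf[i]; // tweets containing the word so far
			Twf[i] = runningTotal;
		}
		return Twf;
	}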

	/**
	 * reduce input: <word: [ <timeperiod, tweet:#freq, word:#freq>,
	 * <timeperiod, tweet:#freq, word:#freq>, ... ]>, but unlike {@link Reduce}
	 * this expects that each timeperiod may appear multiple times (i.e. the
	 * timeperiods were not combined!)
	 *
	 * reduce output: <word: <timePeriod, DFIDF>, ...>
	 *
	 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
	 */
	public static class NonCombinedTimesReducer extends
			Reducer<Text, BytesWritable, Text, BytesWritable>
	{

		private HadoopTwitterTokenToolOptions options;
		private WritableEnumCounter<TextEntryType> tgs;
		private TimeFrequencyHolder timeIndex;

		/**
		 * default constructor does nothing
		 */
		public NonCombinedTimesReducer() {

		}

		protected synchronized void loadOptions(Reducer<Text, BytesWritable, Text, BytesWritable>.Context context)
				throws IOException
		{
			try {
				options = new HadoopTwitterTokenToolOptions(context.getConfiguration().getStrings(ARGS_KEY));
				options.prepare();
				final Path outpath = HadoopToolsUtil.getOutputPath(options);

				timeIndex = CountTweetsInTimeperiod.readTimeIndex(CountTweetsInTimeperiod.constructIndexPath(outpath));
				final Path timecountOut = new Path(outpath, CountTweetsInTimeperiod.TIMECOUNT_DIR);
				final Path statsout = new Path(timecountOut, CountTweetsInTimeperiod.GLOBAL_STATS_FILE);
				final FileSystem fs = HadoopToolsUtil.getFileSystem(statsout);
				final WritableEnumCounter<TextEntryType> et = new WritableEnumCounter<TextEntryType>() {
					@Override
					public TextEntryType valueOf(String str) {
						return TextEntryType.valueOf(str);
					}
				};
				tgs = IOUtils.read(fs.open(statsout), et);
			} catch (final CmdLineException e) {
				e.printStackTrace();
				throw new IOException(e);
			} catch (final Exception e) {
				e.printStackTrace();
				throw new IOException(e);
			}
		}

		@Override
		protected void setup(Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException,
				InterruptedException
		{
			loadOptions(context);
		}

		@Override
		protected void reduce(Text word, Iterable<BytesWritable> values,
				Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException,
				InterruptedException
		{
			// read all time periods to objects; find the END_TIME instances
			// and combine the rest
			/*
			 * Read the time period tweet frequency (tf) and word frequency
			 * (wf) from each entry and construct a single combined count per
			 * time period.
			 *
			 * Then go through the compiled time period counts in order,
			 * keeping a running total of tweets and of tweets containing this
			 * word up to each time period, and use the current totals to
			 * construct a WordDFIDF object per time period.
			 */
			TimeperiodTweetCountWordCount endTime = null;
			final TreeSet<Long> times = new TreeSet<Long>();
			final HashMap<Long, TimeperiodTweetCountWordCount> otherTimes = new HashMap<Long, TimeperiodTweetCountWordCount>();
			System.out.println("STARTING WORD: " + word);
			for (final BytesWritable inputArr : values) {
				final ByteArrayInputStream stream = new ByteArrayInputStream(inputArr.getBytes());
				final TimeperiodTweetCountWordCount instance = IOUtils.read(stream, TimeperiodTweetCountWordCount.class);
				System.out.println("... FOUND TIME INSTANCE:" + instance.timeperiod);
				if (instance.timeperiod == END_TIME.get())
				{
					if (endTime == null)
					{
						// System.out.println("... end time CREATED");
						endTime = instance;
						endTime.tweetcount = tgs.getValue(TextEntryType.VALID);
					}
					else
					{
						// System.out.println("... end time INCREMENTED");
						endTime.wordcount += instance.wordcount;
					}
					// Nothing more to do: the END_TIME entry is not used
					// below; the totals come from the time index instead
				}
				else
				{
					times.add(instance.timeperiod);
					final TimeperiodTweetCountWordCount currentTimeCounter = otherTimes.get(instance.timeperiod);
					System.out.println("Instance tweet count: " + instance.tweetcount);
					System.out.println("Instance word count: " + instance.wordcount);
					if (currentTimeCounter == null) {
						System.out.println("... time CREATED");
						otherTimes.put(instance.timeperiod, instance);
					}
					else {
						System.out.println("... time INCREMENTED");
						currentTimeCounter.tweetcount += instance.tweetcount;
						currentTimeCounter.wordcount += instance.wordcount;
					}
				}
			}
			// System.out.println("... TOTAL tweets = " + endTime.tweetcount);
			// System.out.println("... TOTAL tweets with THIS word = " +
			// endTime.wordcount);
			/*
			 * # for entry in input:
			 * #   DF = wf/tf
			 * #   IDF = Ttf/Twf
			 */
			// Number of tweets containing this word seen so far; the total
			// number of tweets seen so far (Ttf) comes from the time index
			long Twf = 0;
			final TreeSet<WordDFIDF> allDFIDF = new TreeSet<WordDFIDF>();
			for (final Long time : times) {
				final TimeperiodTweetCountWordCount tcwc = otherTimes.get(time);
				final TimeFrequency indexEntry = timeIndex.get(time);

				// Total number of tweets up to and including this time period
				final long Ttf = indexEntry.cumulativeFrequency;
				// Number of tweets in this time period
				final long tf = indexEntry.periodFrequency;
				// Number of tweets containing this word in this time period
				final long wf = tcwc.wordcount;
				Twf += wf;
				final WordDFIDF dfidf = new WordDFIDF(tcwc.timeperiod, wf, tf, Twf, Ttf);
				allDFIDF.add(dfidf);
			}
			final List<WordDFIDF> listVersion = new ArrayList<WordDFIDF>();
			listVersion.addAll(allDFIDF);
			final WriteableListBinary<WordDFIDF> writeableCollection = new WriteableListBinary<WordDFIDF>(listVersion) {
				@Override
				protected void writeValue(WordDFIDF v, DataOutput out) throws IOException {
					v.writeBinary(out);
				}
			};
			context.write(word, new BytesWritable(IOUtils.serialize(writeableCollection)));
		}
	}
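
	/*
	 * Illustrative sketch only (never called by the job): how both reducers
	 * above pack their sorted WordDFIDF lists into the output BytesWritable,
	 * isolating the WriteableListBinary pattern they share.
	 */
	private static BytesWritable packDFIDF(List<WordDFIDF> dfidfs) throws IOException {
		final WriteableListBinary<WordDFIDF> writeableCollection = new WriteableListBinary<WordDFIDF>(dfidfs) {
			@Override
			protected void writeValue(WordDFIDF v, DataOutput out) throws IOException {
				v.writeBinary(out); // each WordDFIDF writes its own binary form
			}
		};
		return new BytesWritable(IOUtils.serialize(writeableCollection));
	}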

	@Override
	public SimpleSequenceFileStage<LongWritable, BytesWritable, Text, BytesWritable> stage() {
		return new SimpleSequenceFileStage<LongWritable, BytesWritable, Text, BytesWritable>() {
			@Override
			public void setup(Job job) {
				job.getConfiguration().setStrings(CountWordsAcrossTimeperiod.ARGS_KEY, nonHadoopArgs);

				// If times are not combined, each reducer has to do a bit more
				// work than usual, so spread the load over more reduce tasks
				if (!CountWordsAcrossTimeperiod.this.combinedTimes)
					job.setNumReduceTasks(26);
			}

			@Override
			public Class<? extends Mapper<LongWritable, BytesWritable, Text, BytesWritable>> mapper() {
				return CountWordsAcrossTimeperiod.Map.class;
			}

			@Override
			public Class<? extends Reducer<Text, BytesWritable, Text, BytesWritable>> reducer() {
				if (CountWordsAcrossTimeperiod.this.combinedTimes)
					return CountWordsAcrossTimeperiod.Reduce.class;
				else
					return CountWordsAcrossTimeperiod.NonCombinedTimesReducer.class;
			}

			@Override
			public String outname() {
				return WORDCOUNT_DIR;
			}
		};
	}
}