/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.mode.pointwisemi.count;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.openimaj.hadoop.mapreduce.stage.helper.TextByteByteStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.HadoopTwitterTokenToolOptions;
import org.openimaj.io.IOUtils;

/**
 * A map-reduce stage which counts the occurrences of token pairs in each time
 * period, the counting step of the pointwise mutual information (PMI) mode.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 */
public class PairMutualInformation extends TextByteByteStage {

	/**
	 * The time delta between time periods
	 */
	public static final String TIMEDELTA = "org.openimaj.hadoop.tools.twitter.token.mode.pairwisemi.timedelta";
	/**
	 * The location of the statistics file
	 */
	public static final String PAIR_STATS_FILE = "pairstats";
	/**
	 * The pairMI output directory
	 */
	public static final String PAIRMI_DIR = "pairmi";
	/**
	 * The root directory where timeperiod pair counts will be stored
	 */
	public static final String TIMEPERIOD_COUNT_OUTPUT_ROOT = "org.openimaj.hadoop.tools.twitter.token.mode.pairwisemi.timeoutloc";
	/**
	 * Name of the timeperiod count directory
	 */
	public static final String TIMEPERIOD_OUTPUT_NAME = "timeperiod_counts";

	private String[] nonHadoopArgs;
	private long timedelta;
	private Path actualOutputLocation;

	/**
	 * @param nonHadoopArgs the arguments for configuration
	 * @param timedelta the time delta between time periods
	 */
	public PairMutualInformation(String[] nonHadoopArgs, long timedelta) {
		this.nonHadoopArgs = nonHadoopArgs;
		this.timedelta = timedelta;
	}

	@Override
	public void setup(Job job) throws IOException {
		job.getConfiguration().setStrings(HadoopTwitterTokenToolOptions.ARGS_KEY, nonHadoopArgs);
		job.getConfiguration().setLong(TIMEDELTA, timedelta);
		Path tpcOutRoot = new Path(this.actualOutputLocation, TIMEPERIOD_OUTPUT_NAME);
		job.getConfiguration().set(TIMEPERIOD_COUNT_OUTPUT_ROOT, tpcOutRoot.toString());
		if (timedelta != -1) {
			// if there are multiple time periods, split the output into a file per day
			job.setNumReduceTasks(365);
		}

		((JobConf) job.getConfiguration()).setOutputValueGroupingComparator(TokenPairValueGroupingComparator.class);
		((JobConf) job.getConfiguration()).setOutputKeyComparatorClass(TokenPairKeyComparator.class);
		job.setPartitionerClass(TokenPairPartitioner.class);
	}

	@Override
	public Class<PairEmit> mapper() {
		return PairEmit.class;
	}

	@Override
	public Class<? extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>> combiner() {
		return PairEmitCombiner.class;
	}

	@Override
	public Job stage(Path[] inputs, Path output, Configuration conf) throws Exception {
		this.actualOutputLocation = output;
		return super.stage(inputs, output, conf);
	}

	@Override
	public Class<? extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>> reducer() {
		return PairEmitCounter.class;
	}

	@Override
	public String outname() {
		return PAIRMI_DIR;
	}

	@Override
	public void finished(Job job) {
		Path out = new Path(actualOutputLocation, PAIR_STATS_FILE);
		FileSystem fs;
		try {
			fs = HadoopToolsUtil.getFileSystem(out);
			FSDataOutputStream os = fs.create(out);
			IOUtils.writeASCII(os, new WritablePairEnum(job.getCounters(), PairEnum.values()));
			os.close();
		} catch (IOException e) {
			// writing the stats file is best-effort; report and continue
			e.printStackTrace();
		}
	}

	/**
	 * Load the pointwise PMI stats file from an output location (i.e.
	 * outpath/{@link PairMutualInformation#PAIRMI_DIR}/{@link PairMutualInformation#PAIR_STATS_FILE}).
	 *
	 * @param outpath the output location of this stage
	 * @return a WritablePairEnum instance with the counter values filled
	 * @throws IOException
	 */
	public static WritablePairEnum loadStats(Path outpath) throws IOException {
		Path pmistats = new Path(outpath, PairMutualInformation.PAIRMI_DIR);
		pmistats = new Path(pmistats, PairMutualInformation.PAIR_STATS_FILE);
		FileSystem fs = HadoopToolsUtil.getFileSystem(pmistats);
		FSDataInputStream inp = fs.open(pmistats);
		WritablePairEnum ret = IOUtils.read(inp, WritablePairEnum.class);
		inp.close();
		return ret;
	}

	/**
	 * Load the total pairs seen in every time period from the pairmi location provided.
	 *
	 * @param pairmiloc a directory which contains {@link #PAIRMI_DIR}/{@link #TIMEPERIOD_OUTPUT_NAME}
	 * @return map of a time period to a count
	 * @throws IOException
	 */
	public static Map<Long, Long> loadTimeCounts(Path pairmiloc) throws IOException {
		Path dir = new Path(new Path(pairmiloc, PAIRMI_DIR), TIMEPERIOD_OUTPUT_NAME);
		FileSystem fs = HadoopToolsUtil.getFileSystem(dir);
		FileStatus[] timePaths = fs.listStatus(dir);

		Map<Long, Long> out = new HashMap<Long, Long>();
		for (FileStatus fileStatus : timePaths) {
			Path fsp = fileStatus.getPath();
			Long time = Long.parseLong(fsp.getName());
			BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(fsp)));
			Long count = Long.parseLong(reader.readLine());
			reader.close();
			out.put(time, count);
		}
		return out;
	}

}
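/*
 * Illustrative usage sketch (not part of the class). Once the stage has run, the
 * pair statistics written by finished() and the per-time-period totals written by
 * the reducers can be read back with the static loaders above. The path below is
 * hypothetical; it stands for the directory that contains the "pairmi" output.
 *
 *   Path output = new Path("hdfs:///twitter/pmi-output");
 *   WritablePairEnum stats = PairMutualInformation.loadStats(output);
 *   Map<Long, Long> timeCounts = PairMutualInformation.loadTimeCounts(output);
 *   for (Map.Entry<Long, Long> e : timeCounts.entrySet())
 *       System.out.println("period " + e.getKey() + ": " + e.getValue() + " pairs");
 */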