/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * 	Redistributions of source code must retain the above copyright notice,
 * 	this list of conditions and the following disclaimer.
 *
 *   *	Redistributions in binary form must reproduce the above copyright notice,
 * 	this list of conditions and the following disclaimer in the documentation
 * 	and/or other materials provided with the distribution.
 *
 *   *	Neither the name of the University of Southampton nor the names of its
 * 	contributors may be used to endorse or promote products derived from this
 * 	software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.LinkedHashMap;
import java.util.Map.Entry;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.openimaj.hadoop.mapreduce.stage.StageProvider;
import org.openimaj.hadoop.mapreduce.stage.helper.SequenceFileTextStage;
import org.openimaj.hadoop.mapreduce.stage.helper.SimpleSequenceFileTextStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDFTimeSeries;
import org.openimaj.util.pair.IndependentPair;

import com.Ostermiller.util.CSVParser;


/**
 * Output the word/time values for each word
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
public class Values extends StageProvider{
	private final String outputPath;
	private final int valueReduceSplit;
	private final boolean sortValueByTime;
	private final boolean matlabOutput;

	/**
	 * Assign the output path for the stage
	 * @param outputPath the location stored in the job configuration under {@link #ARGS_KEY}
	 * @param valueReduceSplit the number of reduce tasks requested for the stage's job
	 * @param sortValueByTime if true the stage uses {@link MapValuesByTime} and
	 *            {@link ReduceValuesByTime}, otherwise {@link MapValuesByWord}
	 *            and {@link ReduceValuesByWord}
	 * @param matlabOutput if true (and sortValueByTime is set) the flag is stored in the job
	 *            configuration under {@link #MATLAB_OUT} and the word and time indexes
	 *            are written to matlab once the job has finished
	 */
	public Values(String outputPath, int valueReduceSplit, boolean sortValueByTime, boolean matlabOutput) {
		this.outputPath = outputPath;
		this.valueReduceSplit = valueReduceSplit;
		this.sortValueByTime = sortValueByTime;
		this.matlabOutput = matlabOutput;
	}

	/**
	 * The index location config option
	 */
	public static final String ARGS_KEY = "INDEX_ARGS";

	/**
	 * The config option holding the matlab output flag
	 */
	public static final String MATLAB_OUT = "org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv.matlab_out";

	@Override
	public SequenceFileTextStage<?,?,?, ?,?, ?> stage() {
		if(this.sortValueByTime){
			return new SequenceFileTextStage<Text, BytesWritable, LongWritable, BytesWritable,NullWritable, Text> () {
				@Override
				public void setup(Job job) {
					job.setNumReduceTasks(valueReduceSplit);
					job.getConfiguration().setStrings(Values.ARGS_KEY, new String[]{outputPath.toString()});
					job.getConfiguration().setBoolean(MATLAB_OUT, matlabOutput);
				}
				@Override
				public Class<? extends Mapper<Text, BytesWritable, LongWritable, BytesWritable>> mapper() {
					return MapValuesByTime.class;
				}
				@Override
				public Class<? extends Reducer<LongWritable,BytesWritable,NullWritable,Text>> reducer() {
					return ReduceValuesByTime.class;
				}
				@Override
				public String outname() {
					return "values";
				}

				@Override
				public void finished(Job job) {
					if(matlabOutput){
						try {
							WordIndex.writeToMatlab(outputPath.toString());
							TimeIndex.writeToMatlab(outputPath.toString());
							System.out.println("Done writing the word and time index files to matlab");
						} catch (IOException e) {
							// Best effort: the job itself has already completed, so the failure is
							// reported (including its cause) rather than rethrown.
							System.out.println("Failed to write the word and time index files: " + e);
						}
					}
				}
			};
		}
		else{
			// NOTE(review): this branch neither sets MATLAB_OUT nor writes the matlab
			// indexes on completion; confirm that matlab output is only meant to be
			// supported together with sortValueByTime.
			return new SimpleSequenceFileTextStage<Text, BytesWritable, NullWritable, Text> () {
				@Override
				public void setup(Job job) {
					job.setNumReduceTasks(valueReduceSplit);
					job.getConfiguration().setStrings(Values.ARGS_KEY, new String[]{outputPath.toString()});
				}
				@Override
				public Class<? extends Mapper<Text, BytesWritable, NullWritable, Text>> mapper() {
					return MapValuesByWord.class;
				}
				@Override
				public Class<? extends Reducer<NullWritable,Text,NullWritable,Text>> reducer() {
					return ReduceValuesByWord.class;
				}
				@Override
				public String outname() {
					return "values";
				}
			};
		}
	}

	/**
	 * Construct a time series per word by reading the CSV found under
	 * <code>path + "/values"</code>.
	 *
	 * @param path the directory containing the "values" output
	 * @param timeIndex maps each time to a pair whose second object is the index used
	 *            for that time in the values file
	 * @param wordIndex maps each word to a pair whose second object is the index used
	 *            for that word in the values file
	 * @return hashmap containing a {@link WordDFIDFTimeSeries} instance per word
	 * @throws IOException if the values file cannot be opened or read
	 */
	public static LinkedHashMap<String, WordDFIDFTimeSeries> readWordDFIDF(String path,LinkedHashMap<Long, IndependentPair<Long, Long>> timeIndex,LinkedHashMap<String, IndependentPair<Long, Long>> wordIndex) throws IOException {
		LinkedHashMap<String, WordDFIDFTimeSeries> tsMap = new LinkedHashMap<String, WordDFIDFTimeSeries>();

		// Invert the indexes: position in the array -> time / word.
		long[] timeReverseIndex = new long[timeIndex.size()];
		for (Entry<Long, IndependentPair<Long, Long>> l : timeIndex.entrySet()) {
			long lineNum = l.getValue().secondObject();
			timeReverseIndex[(int) lineNum] = l.getKey();
		}

		String[] wordReverseIndex = new String[wordIndex.size()];
		for (Entry<String, IndependentPair<Long, Long>> w : wordIndex.entrySet()) {
			long lineNum = w.getValue().secondObject();
			wordReverseIndex[(int) lineNum] = w.getKey();
		}

		String wordPath = path + "/values";
		// NOTE(review): only the first input path is read; if the stage ran with more
		// than one reducer there may be further part files - confirm against
		// HadoopToolsUtil.getInputPaths.
		Path p = HadoopToolsUtil.getInputPaths(wordPath)[0];
		FileSystem fs = HadoopToolsUtil.getFileSystem(p);
		FSDataInputStream toRead = fs.open(p);
		try {
			// Decode explicitly as UTF-8 rather than relying on the platform default charset.
			BufferedReader reader = new BufferedReader(new InputStreamReader(toRead, "UTF-8"));
			CSVParser csvreader = new CSVParser(reader);
			String[] next = null;

			// Each row holds: word index, time index, wf, tf, Twf, Ttf
			while((next = csvreader.getLine())!=null && next.length > 0){
				int wordI = Integer.parseInt(next[0]);
				int timeI = Integer.parseInt(next[1]);
				long wf = Long.parseLong(next[2]);
				long tf = Long.parseLong(next[3]);
				long Twf = Long.parseLong(next[4]);
				long Ttf = Long.parseLong(next[5]);
				long time = timeReverseIndex[timeI];
				WordDFIDF wordDFIDF = new WordDFIDF(time, wf, tf, Twf, Ttf);
				String word = wordReverseIndex[wordI];
				WordDFIDFTimeSeries current = tsMap.get(word);
				if(current == null){
					tsMap.put(word, current = new WordDFIDFTimeSeries());
				}
				current.add(time, wordDFIDF);
			}
		} finally {
			// Always release the underlying stream, including when parsing fails part way.
			toRead.close();
		}

		return tsMap;
	}
}