/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *  Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *
 *   *  Redistributions in binary form must reproduce the above copyright notice,
 *      this list of conditions and the following disclaimer in the documentation
 *      and/or other materials provided with the distribution.
 *
 *   *  Neither the name of the University of Southampton nor the names of its
 *      contributors may be used to endorse or promote products derived from this
 *      software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.LinkedHashMap;
import java.util.Map.Entry;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.openimaj.hadoop.mapreduce.stage.StageProvider;
import org.openimaj.hadoop.mapreduce.stage.helper.SequenceFileTextStage;
import org.openimaj.hadoop.mapreduce.stage.helper.SimpleSequenceFileTextStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDF;
import org.openimaj.hadoop.tools.twitter.utils.WordDFIDFTimeSeries;
import org.openimaj.util.pair.IndependentPair;

import com.Ostermiller.util.CSVParser;

/**
 * Output the word/time values for each word
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 */
public class Values extends StageProvider {
	private String outputPath;
	private int valueReduceSplit;
	private boolean sortValueByTime;
	private boolean matlabOutput;

	/**
	 * Assign the output path for the stage
	 *
	 * @param outputPath the location to which the values are written
	 * @param valueReduceSplit the number of reduce tasks to use
	 * @param sortValueByTime whether the output values should be sorted by time
	 * @param matlabOutput whether Matlab-readable index files should also be written
	 */
	public Values(String outputPath, int valueReduceSplit, boolean sortValueByTime, boolean matlabOutput) {
		this.outputPath = outputPath;
		this.valueReduceSplit = valueReduceSplit;
		this.sortValueByTime = sortValueByTime;
		this.matlabOutput = matlabOutput;
	}

	/**
	 * The index location config option
	 */
	public static final String ARGS_KEY = "INDEX_ARGS";

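	/**
	 * Config option: whether Matlab-readable word and time index files should
	 * also be written when the job finishes
	 */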
	public static final String MATLAB_OUT = "org.openimaj.hadoop.tools.twitter.token.outputmode.sparsecsv.matlab_out";

	@Override
	public SequenceFileTextStage<?, ?, ?, ?, ?, ?> stage() {
		if (this.sortValueByTime) {
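			// Time-sorted variant: the mapper re-keys each record by its time
			// period so that the shuffle delivers records to the reducers in
			// time order.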
			return new SequenceFileTextStage<Text, BytesWritable, LongWritable, BytesWritable, NullWritable, Text>() {
				@Override
				public void setup(Job job) {
					job.setNumReduceTasks(valueReduceSplit);
					job.getConfiguration().setStrings(Values.ARGS_KEY, new String[] { outputPath });
					job.getConfiguration().setBoolean(MATLAB_OUT, matlabOutput);
				}

				@Override
				public Class<? extends Mapper<Text, BytesWritable, LongWritable, BytesWritable>> mapper() {
					return MapValuesByTime.class;
				}

				@Override
				public Class<? extends Reducer<LongWritable, BytesWritable, NullWritable, Text>> reducer() {
					return ReduceValuesByTime.class;
				}

				@Override
				public String outname() {
					return "values";
				}

				@Override
				public void finished(Job job) {
					if (matlabOutput) {
						try {
							WordIndex.writeToMatlab(outputPath);
							TimeIndex.writeToMatlab(outputPath);
							System.out.println("Done writing the word and time index files to Matlab");
						} catch (IOException e) {
							System.out.println("Failed to write the word and time index files: " + e.getMessage());
						}
					}
				}
			};
		} else {
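			// Unsorted variant: the values are emitted as CSV lines directly,
			// with no particular ordering guaranteed.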
			return new SimpleSequenceFileTextStage<Text, BytesWritable, NullWritable, Text>() {
				@Override
				public void setup(Job job) {
					job.setNumReduceTasks(valueReduceSplit);
					job.getConfiguration().setStrings(Values.ARGS_KEY, new String[] { outputPath });
				}

				@Override
				public Class<? extends Mapper<Text, BytesWritable, NullWritable, Text>> mapper() {
					return MapValuesByWord.class;
				}

				@Override
				public Class<? extends Reducer<NullWritable, Text, NullWritable, Text>> reducer() {
					return ReduceValuesByWord.class;
				}

				@Override
				public String outname() {
					return "values";
				}
			};
		}
	}

	/**
	 * Construct a time series per word.
	 *
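	 * <p>
	 * A minimal usage sketch, assuming {@code timeIndex} and {@code wordIndex}
	 * have already been loaded from the corresponding index files (the variable
	 * names here are illustrative only):
	 *
	 * <pre>
	 * {@code
	 * LinkedHashMap<String, WordDFIDFTimeSeries> series = Values.readWordDFIDF(path, timeIndex, wordIndex);
	 * WordDFIDFTimeSeries helloSeries = series.get("hello");
	 * }
	 * </pre>
	 *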
	 * @param path the job output directory; values are read from the "values" file beneath it
	 * @param timeIndex map from time period to a pair whose second element is that time's line number in the time index
	 * @param wordIndex map from word to a pair whose second element is that word's line number in the word index
	 * @return hashmap containing a {@link WordDFIDFTimeSeries} instance per word
	 * @throws IOException
	 */
	public static LinkedHashMap<String, WordDFIDFTimeSeries> readWordDFIDF(String path,
			LinkedHashMap<Long, IndependentPair<Long, Long>> timeIndex,
			LinkedHashMap<String, IndependentPair<Long, Long>> wordIndex) throws IOException {
		LinkedHashMap<String, WordDFIDFTimeSeries> tsMap = new LinkedHashMap<String, WordDFIDFTimeSeries>();

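		// Invert the time index: the CSV stores each time as its line number in
		// the time index file, so build an array mapping line number back to the
		// actual time.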
		long[] timeReverseIndex = new long[timeIndex.size()];
		for (Entry<Long, IndependentPair<Long, Long>> l : timeIndex.entrySet()) {
			long lineNum = l.getValue().secondObject();
			timeReverseIndex[(int) lineNum] = l.getKey();
		}

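		// Invert the word index in the same way: line number back to the word.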
		String[] wordReverseIndex = new String[wordIndex.size()];
		for (Entry<String, IndependentPair<Long, Long>> w : wordIndex.entrySet()) {
			long lineNum = w.getValue().secondObject();
			wordReverseIndex[(int) lineNum] = w.getKey();
		}

		String wordPath = path + "/values";
		Path p = HadoopToolsUtil.getInputPaths(wordPath)[0];
		FileSystem fs = HadoopToolsUtil.getFileSystem(p);
		FSDataInputStream toRead = fs.open(p);
		BufferedReader reader = new BufferedReader(new InputStreamReader(toRead));
		CSVParser csvreader = new CSVParser(reader);
		String[] next = null;

		// Each line of the values CSV was written as:
		// word index, time index, wf, tf, Twf, Ttf
		while ((next = csvreader.getLine()) != null && next.length > 0) {
			int wordI = Integer.parseInt(next[0]);
			int timeI = Integer.parseInt(next[1]);
			long wf = Long.parseLong(next[2]);
			long tf = Long.parseLong(next[3]);
			long Twf = Long.parseLong(next[4]);
			long Ttf = Long.parseLong(next[5]);
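			// Map the line numbers back to the actual time and word, then
			// rebuild the DFIDF record for this (word, time) pair.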
			long time = timeReverseIndex[timeI];
			WordDFIDF wordDFIDF = new WordDFIDF(time, wf, tf, Twf, Ttf);
			String word = wordReverseIndex[wordI];
			WordDFIDFTimeSeries current = tsMap.get(word);
			if (current == null) {
				tsMap.put(word, current = new WordDFIDFTimeSeries());
			}
			current.add(time, wordDFIDF);
		}
		reader.close();

		return tsMap;
	}
}