/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *  Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *
 *   *  Redistributions in binary form must reproduce the above copyright notice,
 *      this list of conditions and the following disclaimer in the documentation
 *      and/or other materials provided with the distribution.
 *
 *   *  Neither the name of the University of Southampton nor the names of its
 *      contributors may be used to endorse or promote products derived from this
 *      software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.twitter.token.mode.pointwisemi.count;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.openimaj.hadoop.mapreduce.stage.helper.TextByteByteStage;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.hadoop.tools.twitter.HadoopTwitterTokenToolOptions;
import org.openimaj.io.IOUtils;
/**
 * A Hadoop stage which counts the occurrences of token pairs within time
 * periods; this is the first step in calculating the pairwise mutual
 * information (PMI) of token pairs.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
public class PairMutualInformation extends TextByteByteStage {
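
	/*
	 * Minimal usage sketch (the driver code, paths and argument values here
	 * are hypothetical, for illustration only):
	 *
	 *   PairMutualInformation pmi = new PairMutualInformation(nonHadoopArgs, timedelta);
	 *   Job job = pmi.stage(new Path[] { input }, output, conf);
	 *   job.waitForCompletion(true);
	 *   pmi.finished(job);
	 *
	 * The statistics written by finished(Job) can be read back later with
	 * loadStats(Path) and loadTimeCounts(Path).
	 */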

	/**
	 * The time delta between time periods
	 */
	public static final String TIMEDELTA = "org.openimaj.hadoop.tools.twitter.token.mode.pairwisemi.timedelta";
	/**
	 * The location of the statistics file
	 */
	public static final String PAIR_STATS_FILE = "pairstats";
	/**
	 * The pairMI output directory
	 */
	public static final String PAIRMI_DIR = "pairmi";
	/**
	 * The root directory where timeperiod pair counts will be stored
	 */
	public static final String TIMEPERIOD_COUNT_OUTPUT_ROOT = "org.openimaj.hadoop.tools.twitter.token.mode.pairwisemi.timeoutloc";
	/**
	 * Name of the timeperiod count directory
	 */
	public static final String TIMEPERIOD_OUTPUT_NAME = "timeperiod_counts";
	private String[] nonHadoopArgs;
	private long timedelta;
	private Path actualOutputLocation;

	/**
	 * @param nonHadoopArgs the arguments for configuration
	 * @param timedelta the time delta between time periods
	 */
	public PairMutualInformation(String[] nonHadoopArgs, long timedelta) {
		this.nonHadoopArgs = nonHadoopArgs;
		this.timedelta = timedelta;
	}

	@Override
	public void setup(Job job) throws IOException {
		job.getConfiguration().setStrings(HadoopTwitterTokenToolOptions.ARGS_KEY, nonHadoopArgs);
		job.getConfiguration().setLong(TIMEDELTA, timedelta);
		Path tpcOutRoot = new Path(this.actualOutputLocation, TIMEPERIOD_OUTPUT_NAME);
		job.getConfiguration().set(TIMEPERIOD_COUNT_OUTPUT_ROOT, tpcOutRoot.toString());
		if (timedelta != -1) {
			// a time delta was specified, so there may be many time periods;
			// spread the output across 365 reducers (roughly one file per day of a year)
			job.setNumReduceTasks(365);
		}

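		// Custom sort, grouping and partitioning: the grouping and sort
		// comparators are set through the old-style JobConf view of the
		// configuration (which exposes these settings) so that counts for
		// related token pairs arrive together, in order, at the same reduce call.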
		((JobConf) job.getConfiguration()).setOutputValueGroupingComparator(TokenPairValueGroupingComparator.class);
		((JobConf) job.getConfiguration()).setOutputKeyComparatorClass(TokenPairKeyComparator.class);
		job.setPartitionerClass(TokenPairPartitioner.class);
	}

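	/**
	 * @return {@link PairEmit}, the mapper which emits token pair counts
	 */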
	@Override
	public Class<PairEmit> mapper() {
		return PairEmit.class;
	}

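	/**
	 * @return {@link PairEmitCombiner}, which partially aggregates pair counts
	 *         before they reach the reducer
	 */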
	@Override
	public Class<? extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>> combiner() {
		return PairEmitCombiner.class;
	}

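	/**
	 * Records the actual output location before delegating to the parent
	 * implementation; the location is used by {@link #setup(Job)} and
	 * {@link #finished(Job)}.
	 */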
	@Override
	public Job stage(Path[] inputs, Path output, Configuration conf) throws Exception {
		this.actualOutputLocation = output;
		return super.stage(inputs, output, conf);
	}

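	/**
	 * @return {@link PairEmitCounter}, the reducer which sums the counts for
	 *         each token pair
	 */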
	@Override
	public Class<? extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>> reducer() {
		return PairEmitCounter.class;
	}

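	/**
	 * @return {@link #PAIRMI_DIR}, the name of this stage's output directory
	 */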
	@Override
	public String outname() {
		return PAIRMI_DIR;
	}

	@Override
	public void finished(Job job) {
		Path out = new Path(actualOutputLocation, PAIR_STATS_FILE);
		try {
			FileSystem fs = HadoopToolsUtil.getFileSystem(out);
			FSDataOutputStream os = fs.create(out);
			IOUtils.writeASCII(os, new WritablePairEnum(job.getCounters(), PairEnum.values()));
			os.close();
		} catch (IOException e) {
			// report the failure rather than silently swallowing it
			e.printStackTrace();
		}
	}

	/**
	 * Load the pairwise MI statistics file from an output location (i.e.
	 * outpath/{@link PairMutualInformation#PAIRMI_DIR}/{@link PairMutualInformation#PAIR_STATS_FILE})
	 * @param outpath the stage output location
	 * @return a {@link WritablePairEnum} instance with the counter values filled
	 * @throws IOException
	 */
	public static WritablePairEnum loadStats(Path outpath) throws IOException {
		Path pmistats = new Path(outpath, PairMutualInformation.PAIRMI_DIR);
		pmistats = new Path(pmistats, PairMutualInformation.PAIR_STATS_FILE);
		FileSystem fs = HadoopToolsUtil.getFileSystem(pmistats);
		FSDataInputStream inp = fs.open(pmistats);
		WritablePairEnum ret = IOUtils.read(inp, WritablePairEnum.class);
		inp.close();
		return ret;
	}

	/**
	 * Load the total number of pairs seen in every time period from the pairmi
	 * location provided
	 * @param pairmiloc a directory which contains {@link #PAIRMI_DIR}/{@link #TIMEPERIOD_OUTPUT_NAME}
	 * @return a map from time period to pair count
	 * @throws IOException
	 */
	public static Map<Long, Long> loadTimeCounts(Path pairmiloc) throws IOException {
		Path dir = new Path(new Path(pairmiloc, PAIRMI_DIR), TIMEPERIOD_OUTPUT_NAME);
		FileSystem fs = HadoopToolsUtil.getFileSystem(dir);
		FileStatus[] timePaths = fs.listStatus(dir);

		Map<Long, Long> out = new HashMap<Long, Long>();
		for (FileStatus fileStatus : timePaths) {
			// each file is named for its time period and holds a single count
			Path fsp = fileStatus.getPath();
			long time = Long.parseLong(fsp.getName());
			BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(fsp)));
			try {
				long count = Long.parseLong(reader.readLine());
				out.put(time, count);
			} finally {
				reader.close();
			}
		}
		return out;
	}

}