001/**
002 * Copyright (c) 2011, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.hadoop.tools.fastkmeans;
031
032import java.io.ByteArrayInputStream;
033import java.io.IOException;
034
035import org.apache.hadoop.io.BytesWritable;
036import org.apache.hadoop.io.Text;
037import org.apache.hadoop.mapreduce.Mapper;
038import org.apache.hadoop.mapreduce.Reducer;
039
040import org.openimaj.tools.clusterquantiser.FeatureFile;
041import org.openimaj.tools.clusterquantiser.FeatureFileFeature;
042import org.openimaj.tools.clusterquantiser.FileType;
043
044public class ImageFeatureSelect {
045        public static final String FILETYPE_KEY = "uk.ac.soton.ecs.jsh2.clusterquantiser.FileType";
046        public static final String NFEATURE_KEY = "uk.ac.soton.ecs.ss.hadoop.fastkmeans.nfeatures";
047        public static class Map extends Mapper<Text, BytesWritable, Text, BytesWritable> 
048        {
049                private int nfeatures = -1;
050                private static FileType fileType = null;
051                private IndexedByteArrayPriorityQueue queue = null; 
052                
053                @Override
054                protected void setup(Mapper<Text, BytesWritable, Text, BytesWritable>.Context context)throws IOException, InterruptedException{
055                        System.out.println("Setting up mapper");
056                        if (fileType == null) {
057                                fileType = FileType.valueOf(context.getConfiguration().getStrings(FILETYPE_KEY)[0]);
058                        }
059                        nfeatures  = Integer.parseInt(context.getConfiguration().getStrings(NFEATURE_KEY)[0]);
060                        if(nfeatures!=-1)
061                                queue = new IndexedByteArrayPriorityQueue(nfeatures);
062                }
063                
064                @Override
065                public void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
066                        System.out.println("Reading from file: " + key);
067                        try{
068                                FeatureFile input = fileType.read(new ByteArrayInputStream(value.getBytes()));
069                                int index = 0;
070                                for(FeatureFileFeature ff : input){
071                                        index++;
072                                        IndexedByteArray iba = new IndexedByteArray(ff.data);
073                                        if(queue!=null){
074                                                queue.insert(iba);
075                                        }
076                                        else{
077                                                context.write(new Text(iba.index + ""), new BytesWritable(iba.array));
078                                        }
079                                        
080                                }
081                        }
082                        catch(Throwable t){
083                                System.out.println("There was an error while reading the features in the map phase");
084                                t.printStackTrace();
085                        }
086//                      
087                }
088                
089                @Override
090                protected void cleanup(Context context) throws IOException, InterruptedException {
091                        System.out.println("Cleaning up (emitting)");
092                        try {
093                                if(queue !=null)
094                                        while(this.queue.size() > 0){
095                                                IndexedByteArray item = this.queue.pop();
096                                                context.write(new Text("" + item.index), new BytesWritable(item.array));
097                                        }
098                        } catch (Throwable e) {
099                                System.out.println("There was an error reading features in the cleanup phase");
100                                e.printStackTrace();
101                        }
102                }
103                
104        } 
105        public static class Reduce extends Reducer<Text, BytesWritable, Text, BytesWritable> {
106                private int nfeatures = -1;
107                private int seen = 0;
108                @Override
109                protected void setup(Context context)throws IOException, InterruptedException{
110                        if(nfeatures == -1){
111                                nfeatures  = Integer.parseInt(context.getConfiguration().getStrings(NFEATURE_KEY)[0]);
112                        }
113                }
114                
115                @Override
116                public void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
117                        if(nfeatures != -1 && seen >= nfeatures){
118                                return;
119                        }
120                        for (BytesWritable val : values) {
121                                context.write(new Text(seen+""), val);
122                                seen++;
123                                if(seen >= nfeatures){
124                                        return;
125                                }
126                        }
127                }
128        }
129}