/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *   * Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *   * Neither the name of the University of Southampton nor the names of its
 *     contributors may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */ 030package org.openimaj.hadoop.tools.fastkmeans; 031 032import java.io.ByteArrayInputStream; 033import java.io.IOException; 034 035import org.apache.hadoop.io.BytesWritable; 036import org.apache.hadoop.io.Text; 037import org.apache.hadoop.mapreduce.Mapper; 038import org.apache.hadoop.mapreduce.Reducer; 039 040import org.openimaj.tools.clusterquantiser.FeatureFile; 041import org.openimaj.tools.clusterquantiser.FeatureFileFeature; 042import org.openimaj.tools.clusterquantiser.FileType; 043 044public class ImageFeatureSelect { 045 public static final String FILETYPE_KEY = "uk.ac.soton.ecs.jsh2.clusterquantiser.FileType"; 046 public static final String NFEATURE_KEY = "uk.ac.soton.ecs.ss.hadoop.fastkmeans.nfeatures"; 047 public static class Map extends Mapper<Text, BytesWritable, Text, BytesWritable> 048 { 049 private int nfeatures = -1; 050 private static FileType fileType = null; 051 private IndexedByteArrayPriorityQueue queue = null; 052 053 @Override 054 protected void setup(Mapper<Text, BytesWritable, Text, BytesWritable>.Context context)throws IOException, InterruptedException{ 055 System.out.println("Setting up mapper"); 056 if (fileType == null) { 057 fileType = FileType.valueOf(context.getConfiguration().getStrings(FILETYPE_KEY)[0]); 058 } 059 nfeatures = Integer.parseInt(context.getConfiguration().getStrings(NFEATURE_KEY)[0]); 060 if(nfeatures!=-1) 061 queue = new IndexedByteArrayPriorityQueue(nfeatures); 062 } 063 064 @Override 065 public void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException { 066 System.out.println("Reading from file: " + key); 067 try{ 068 FeatureFile input = fileType.read(new ByteArrayInputStream(value.getBytes())); 069 int index = 0; 070 for(FeatureFileFeature ff : input){ 071 index++; 072 IndexedByteArray iba = new IndexedByteArray(ff.data); 073 if(queue!=null){ 074 queue.insert(iba); 075 } 076 else{ 077 context.write(new Text(iba.index + ""), new BytesWritable(iba.array)); 078 } 079 080 
} 081 } 082 catch(Throwable t){ 083 System.out.println("There was an error while reading the features in the map phase"); 084 t.printStackTrace(); 085 } 086// 087 } 088 089 @Override 090 protected void cleanup(Context context) throws IOException, InterruptedException { 091 System.out.println("Cleaning up (emitting)"); 092 try { 093 if(queue !=null) 094 while(this.queue.size() > 0){ 095 IndexedByteArray item = this.queue.pop(); 096 context.write(new Text("" + item.index), new BytesWritable(item.array)); 097 } 098 } catch (Throwable e) { 099 System.out.println("There was an error reading features in the cleanup phase"); 100 e.printStackTrace(); 101 } 102 } 103 104 } 105 public static class Reduce extends Reducer<Text, BytesWritable, Text, BytesWritable> { 106 private int nfeatures = -1; 107 private int seen = 0; 108 @Override 109 protected void setup(Context context)throws IOException, InterruptedException{ 110 if(nfeatures == -1){ 111 nfeatures = Integer.parseInt(context.getConfiguration().getStrings(NFEATURE_KEY)[0]); 112 } 113 } 114 115 @Override 116 public void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException { 117 if(nfeatures != -1 && seen >= nfeatures){ 118 return; 119 } 120 for (BytesWritable val : values) { 121 context.write(new Text(seen+""), val); 122 seen++; 123 if(seen >= nfeatures){ 124 return; 125 } 126 } 127 } 128 } 129}