/**
 * Copyright (c) 2011, The University of Southampton and the individual contributors.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 *   *  Redistributions of source code must retain the above copyright notice,
 *      this list of conditions and the following disclaimer.
 *
 *   *  Redistributions in binary form must reproduce the above copyright notice,
 *      this list of conditions and the following disclaimer in the documentation
 *      and/or other materials provided with the distribution.
 *
 *   *  Neither the name of the University of Southampton nor the names of its
 *      contributors may be used to endorse or promote products derived from this
 *      software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
package org.openimaj.hadoop.tools.globalfeature;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.openimaj.feature.FeatureVector;
import org.openimaj.hadoop.mapreduce.TextBytesJobUtil;
import org.openimaj.hadoop.sequencefile.MetadataConfiguration;
import org.openimaj.hadoop.tools.HadoopToolsUtil;
import org.openimaj.image.ImageUtilities;
import org.openimaj.image.MBFImage;
import org.openimaj.io.IOUtils;

/**
 * A Hadoop version of the GlobalFeaturesTool, capable of extracting global
 * image features from very large corpora of images stored in
 * {@link SequenceFile}s.
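 * <p>
 * Input images are read as &lt;key, image-bytes&gt; records and the extracted
 * feature for each image is written out as a &lt;key, feature-bytes&gt;
 * record under the same key, so features can be matched back to their
 * source images.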
 *
 * @author Jonathon Hare (jsh2@ecs.soton.ac.uk)
 */
public class HadoopGlobalFeaturesTool extends Configured implements Tool
{
	private static final String ARGS_KEY = "globalfeatures.args";
	private static final Logger logger = Logger.getLogger(HadoopGlobalFeaturesTool.class);

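	/**
	 * Map-only stage: decodes each image record, applies the configured
	 * global feature extractor and writes the encoded {@link FeatureVector}
	 * back out under the original key.
	 */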
	static class GlobalFeaturesMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
		private HadoopGlobalFeaturesOptions options;

		public GlobalFeaturesMapper() {}

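		/**
		 * Rebuild the tool options inside each task from the serialised
		 * command-line arguments stashed in the job configuration.
		 */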
		@Override
		protected void setup(Mapper<Text, BytesWritable, Text, BytesWritable>.Context context) {
			options = new HadoopGlobalFeaturesOptions(context.getConfiguration().getStrings(ARGS_KEY));
		}

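		/**
		 * Decode the image bytes, extract the feature and emit it encoded as
		 * requested; images that fail to decode or process are logged and
		 * skipped rather than failing the whole job.
		 */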
		@Override
		protected void map(Text key, BytesWritable value, Mapper<Text, BytesWritable, Text, BytesWritable>.Context context) throws InterruptedException {
			try {
				// only the first getLength() bytes of the backing buffer are valid
				MBFImage img = ImageUtilities.readMBF(new ByteArrayInputStream(value.getBytes(), 0, value.getLength()));
				FeatureVector fv = options.featureOp.extract(img);

				ByteArrayOutputStream baos = new ByteArrayOutputStream();
				if (options.binary)
					IOUtils.writeBinary(baos, fv);
				else
					IOUtils.writeASCII(baos, fv);

				context.write(key, new BytesWritable(baos.toByteArray()));
			} catch (Exception e) {
				logger.warn("Problem processing image " + key, e);
			}
		}
	}

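	/**
	 * Configures and runs the map-only extraction job, forwarding the raw
	 * tool arguments to the mappers through the job configuration.
	 */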
	@Override
	public int run(String[] args) throws Exception {
		HadoopGlobalFeaturesOptions options = new HadoopGlobalFeaturesOptions(args, true);

		Map<String, String> metadata = new HashMap<String, String>();
		metadata.put(MetadataConfiguration.CONTENT_TYPE_KEY, "application/globalfeature-" + options.feature + "-" + (options.binary ? "bin" : "ascii"));

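		// The "clusterquantiser.filetype" key mirrors the encoding so that
		// downstream tools (presumably the OpenIMAJ cluster quantiser) can
		// discover whether the features are binary or ASCII.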
		metadata.put("clusterquantiser.filetype", (options.binary ? "bin" : "ascii"));

		List<Path> allPaths = new ArrayList<Path>();
		for (String p : options.input) {
			allPaths.addAll(Arrays.asList(HadoopToolsUtil.getInputPaths(p)));
		}

		Job job = TextBytesJobUtil.createJob(allPaths, new Path(options.output), metadata, this.getConf());
		job.setJarByClass(this.getClass());
		job.setMapperClass(GlobalFeaturesMapper.class);
		job.getConfiguration().setStrings(ARGS_KEY, args);
		job.setNumReduceTasks(0); // map-only: features are written directly by the mappers

		return job.waitForCompletion(true) ? 0 : 1;
	}


	/**
	 * The main method for the tool.
	 * @param args the command-line arguments
	 * @throws Exception if an error occurs
	 */
	public static void main(String[] args) throws Exception
	{
		System.exit(ToolRunner.run(new HadoopGlobalFeaturesTool(), args));
	}
}