001package org.openimaj.picslurper;
002
003import java.io.File;
004import java.util.List;
005import java.util.Map;
006
007import org.openimaj.picslurper.output.OutputListener;
008import org.openimaj.twitter.collection.StreamJSONStatusList.ReadableWritableJSON;
009
010import twitter4j.Status;
011import twitter4j.internal.json.z_T4JInternalJSONImplFactory;
012import twitter4j.internal.org.json.JSONObject;
013import backtype.storm.task.OutputCollector;
014import backtype.storm.task.TopologyContext;
015import backtype.storm.topology.IRichBolt;
016import backtype.storm.topology.OutputFieldsDeclarer;
017import backtype.storm.tuple.Tuple;
018
019/**
020 * A download bolt instantiates {@link StatusConsumer} on {@link Status} recieved
021 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
022 *
023 */
024public class DownloadBolt implements IRichBolt {
025
026        /**
027         *
028         */
029        private static final long serialVersionUID = -6459004768597784620L;
030        private boolean stats;
031        private File globalStats;
032        private File outputLocation;
033        private OutputCollector collector;
034        private z_T4JInternalJSONImplFactory factory;
035        private List<OutputListener> outmodes;
036
037        /**
038         * Information for the {@link StatusConsumer} instances
039         * @param stats
040         * @param globalStats
041         * @param outputLocation
042         * @param outmodes
043         */
044        public DownloadBolt(boolean stats, File globalStats, File outputLocation,List<OutputListener> outmodes) {
045                this.stats = stats;
046                this.globalStats = globalStats;
047                this.outputLocation = outputLocation;
048                this.outmodes = outmodes;
049        }
050
051        @SuppressWarnings("rawtypes")
052        @Override
053        public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {
054                this.collector = collector;
055                factory = new z_T4JInternalJSONImplFactory(null);
056
057        }
058
059        @Override
060        public void execute(Tuple input) {
061                ReadableWritableJSON json = (ReadableWritableJSON) input.getValue(0);
062                StatusConsumer consumer = new StatusConsumer(stats,globalStats,outputLocation,outmodes);
063                try {
064                        consumer.consume(factory.createStatus(new JSONObject(json)));
065                        collector.ack(input);
066                } catch (Exception e) {
067                        collector.fail(input);
068                }
069        }
070
071        @Override
072        public void cleanup() {
073        }
074
075        @Override
076        public void declareOutputFields(OutputFieldsDeclarer declarer) {
077        }
078
079        @Override
080        public Map<String, Object> getComponentConfiguration() {
081                return null;
082        }
083
084}