package org.openimaj.picslurper;

import java.io.File;
import java.util.List;
import java.util.Map;

import org.openimaj.picslurper.output.OutputListener;
import org.openimaj.twitter.collection.StreamJSONStatusList.ReadableWritableJSON;

import twitter4j.Status;
import twitter4j.internal.json.z_T4JInternalJSONImplFactory;
import twitter4j.internal.org.json.JSONObject;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * A download bolt instantiates a {@link StatusConsumer} for each {@link Status}
 * received and hands the status to it.
 * <p>
 * Every incoming tuple is expected to carry a {@link ReadableWritableJSON} as
 * its first value. The tuple is acked when the consumer finishes without
 * throwing and failed otherwise.
 *
 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
 *
 */
public class DownloadBolt implements IRichBolt {

    private static final long serialVersionUID = -6459004768597784620L;

    // Settings forwarded unchanged to every StatusConsumer created in execute().
    private final boolean stats;
    private final File globalStats;
    private final File outputLocation;
    private final List<OutputListener> outmodes;

    // Runtime state, assigned in prepare().
    private OutputCollector collector;
    private z_T4JInternalJSONImplFactory factory;

    /**
     * Information for the {@link StatusConsumer} instances. All four arguments
     * are stored and passed as-is to the {@link StatusConsumer} constructor
     * for every tuple processed.
     *
     * @param stats
     *            first argument handed to each {@link StatusConsumer}
     * @param globalStats
     *            second argument handed to each {@link StatusConsumer}
     * @param outputLocation
     *            third argument handed to each {@link StatusConsumer}
     * @param outmodes
     *            fourth argument handed to each {@link StatusConsumer}
     */
    public DownloadBolt(boolean stats, File globalStats, File outputLocation, List<OutputListener> outmodes) {
        this.stats = stats;
        this.globalStats = globalStats;
        this.outputLocation = outputLocation;
        this.outmodes = outmodes;
    }

    /**
     * Stores the collector used to ack/fail tuples and creates the factory
     * that turns JSON objects into {@link Status} instances.
     *
     * @param stormConf
     *            the storm configuration (unused)
     * @param context
     *            the topology context (unused)
     * @param collector
     *            the collector used by {@link #execute(Tuple)}
     */
    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // The factory is constructed with a null argument, exactly as before.
        this.factory = new z_T4JInternalJSONImplFactory(null);
    }

    /**
     * Reads the first value of the tuple as a {@link ReadableWritableJSON},
     * converts it to a {@link Status} and passes it to a freshly created
     * {@link StatusConsumer}.
     * <p>
     * On success the tuple is acked. If conversion or consumption throws, the
     * exception is reported through the collector (so the cause is not lost)
     * and the tuple is failed.
     *
     * @param input
     *            the tuple to process
     */
    @Override
    public void execute(Tuple input) {
        final ReadableWritableJSON json = (ReadableWritableJSON) input.getValue(0);
        final StatusConsumer consumer = new StatusConsumer(stats, globalStats, outputLocation, outmodes);
        try {
            final Status status = factory.createStatus(new JSONObject(json));
            consumer.consume(status);
            collector.ack(input);
        } catch (Exception e) {
            // Surface the cause instead of silently dropping it, then fail the tuple.
            collector.reportError(e);
            collector.fail(input);
        }
    }

    /**
     * Nothing to release.
     */
    @Override
    public void cleanup() {
    }

    /**
     * This bolt declares no output fields.
     *
     * @param declarer
     *            unused
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    /**
     * @return always {@code null}; no component-specific configuration is
     *         supplied by this bolt
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

}