001/** 002 * Copyright (c) 2012, The University of Southampton and the individual contributors. 003 * All rights reserved. 004 * 005 * Redistribution and use in source and binary forms, with or without modification, 006 * are permitted provided that the following conditions are met: 007 * 008 * * Redistributions of source code must retain the above copyright notice, 009 * this list of conditions and the following disclaimer. 010 * 011 * * Redistributions in binary form must reproduce the above copyright notice, 012 * this list of conditions and the following disclaimer in the documentation 013 * and/or other materials provided with the distribution. 014 * 015 * * Neither the name of the University of Southampton nor the names of its 016 * contributors may be used to endorse or promote products derived from this 017 * software without specific prior written permission. 018 * 019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR 023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 029 */ 030package org.openimaj.storm.util.graph; 031 032import java.util.HashMap; 033import java.util.Map; 034import java.util.Map.Entry; 035 036import org.jgrapht.DirectedGraph; 037import org.jgrapht.graph.DefaultEdge; 038import org.jgrapht.graph.ListenableDirectedGraph; 039import org.openimaj.storm.util.graph.StormGraphCreator.NamingStrategy.DefaultNamingStrategy; 040 041import backtype.storm.generated.Bolt; 042import backtype.storm.generated.ComponentCommon; 043import backtype.storm.generated.GlobalStreamId; 044import backtype.storm.generated.Grouping; 045import backtype.storm.generated.SpoutSpec; 046import backtype.storm.generated.StormTopology; 047 048/** 049 * Create {@link DirectedGraph} instances from {@link StormTopology} instances 050 * 051 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 052 * 053 */ 054public class StormGraphCreator { 055 /** 056 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 057 * 058 */ 059 public interface NamingStrategy { 060 /** 061 * @param name 062 * the id of a {@link ComponentCommon} 063 * @return the name to render 064 */ 065 public String name(String name); 066 067 /** 068 * Uses id as name 069 * 070 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 071 * 072 */ 073 public static class DefaultNamingStrategy implements NamingStrategy { 074 075 @Override 076 public String name(String name) { 077 return name; 078 } 079 080 } 081 082 /** 083 * Names {@link ComponentCommon} as A, B, C... and provides a lookup map 084 * 085 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 086 * 087 */ 088 public static class AlphabeticNamingStrategy implements NamingStrategy { 089 char currentLetter = 'A'; 090 /** 091 * maps names to their original name 092 */ 093 public Map<String, String> lookup = new HashMap<String, String>(); 094 private Map<String, String> seen = new HashMap<String, String>(); 095 096 @Override 097 public String name(String name) { 098 if (!seen.containsKey(name)) { 099 String newName = new String(new char[] { currentLetter++ }); 100 seen.put(name, newName); 101 lookup.put(newName, name); 102 } 103 return seen.get(name); 104 } 105 106 } 107 } 108 109 enum Type { 110 SPOUT, BOLT; 111 } 112 113 private HashMap<String, NamedNode> nns; 114 private NamingStrategy namingStrategy; 115 116 private StormGraphCreator() { 117 nns = new HashMap<String, NamedNode>(); 118 namingStrategy = new DefaultNamingStrategy(); 119 } 120 121 private StormGraphCreator(NamingStrategy strat) { 122 nns = new HashMap<String, NamedNode>(); 123 namingStrategy = strat; 124 } 125 126 /** 127 * A name and type node 128 * 129 * @author Sina Samangooei (ss@ecs.soton.ac.uk) 130 * 131 */ 132 public class NamedNode { 133 /** 134 * name of the node 135 */ 136 public String name; 137 /** 138 * the node's type 139 */ 140 public Type node; 141 142 /** 143 * @param name 144 * @param node 145 */ 146 public NamedNode(String name, Type node) { 147 this.name = namingStrategy.name(name); 148 this.node = node; 149 } 150 151 @Override 152 public boolean equals(Object other) { 153 if (!(other instanceof NamedNode)) 154 return false; 155 NamedNode that = (NamedNode) other; 156 if (that.node != this.node) 157 return false; 158 return this.hashCode() == that.hashCode(); 159 } 160 161 @Override 162 public int hashCode() { 163 return name.hashCode(); 164 } 165 166 @Override 167 public String toString() { 168 return "<" + name + ">"; 169 } 170 } 171 172 private ListenableDirectedGraph<NamedNode, DefaultEdge> _asGraph(StormTopology t) { 173 Map<String, Bolt> bolts = t.get_bolts(); 174 Map<String, SpoutSpec> spouts = t.get_spouts(); 175 ListenableDirectedGraph<NamedNode, DefaultEdge> ret = new ListenableDirectedGraph<NamedNode, DefaultEdge>(DefaultEdge.class); 176 177 createSpouts(spouts, ret); 178 createBolts(bolts, ret); 179 createConnections(bolts, ret); 180 return ret; 181 } 182 183 private void createConnections(Map<String, Bolt> bolts, ListenableDirectedGraph<NamedNode, DefaultEdge> ret) { 184 for (Entry<String, Bolt> boltspec : bolts.entrySet()) { 185 Bolt bolt = boltspec.getValue(); 186 String id = boltspec.getKey(); 187 Map<GlobalStreamId, Grouping> inputs = bolt.get_common().get_inputs(); 188 for (Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) { 189 GlobalStreamId from = input.getKey(); 190 // Grouping grouping = input.getValue(); 191 String fromId = from.get_componentId(); 192 // String streamId = from.get_streamId(); 193 ret.addEdge(nns.get(fromId), nns.get(id)); 194 } 195 196 } 197 } 198 199 private void createSpouts(Map<String, SpoutSpec> spouts, ListenableDirectedGraph<NamedNode, DefaultEdge> ret) { 200 for (Entry<String, SpoutSpec> spoutEntries : spouts.entrySet()) { 201 String name = spoutEntries.getKey(); 202 if (!nns.containsKey(name)) 203 nns.put(name, new NamedNode(name, Type.SPOUT)); 204 ret.addVertex(nns.get(name)); 205 } 206 } 207 208 private void createBolts(Map<String, Bolt> bolts, ListenableDirectedGraph<NamedNode, DefaultEdge> ret) { 209 for (Entry<String, Bolt> boltEntries : bolts.entrySet()) { 210 String name = boltEntries.getKey(); 211 if (!nns.containsKey(name)) 212 nns.put(name, new NamedNode(name, Type.BOLT)); 213 ret.addVertex(nns.get(name)); 214 } 215 } 216 217 /** 218 * @param t 219 * {@link StormTopology} to graph 220 * @return a {@link ListenableDirectedGraph} usable with JGraph 221 */ 222 public static ListenableDirectedGraph<NamedNode, DefaultEdge> asGraph(StormTopology t) { 223 StormGraphCreator creator = new StormGraphCreator(); 224 return creator._asGraph(t); 225 } 226 227 /** 228 * @param t 229 * {@link StormTopology} to graph 230 * @param strat 231 * the naming strategy 232 * @return a {@link ListenableDirectedGraph} usable with JGraph 233 */ 234 public static ListenableDirectedGraph<NamedNode, DefaultEdge> asGraph(StormTopology t, NamingStrategy strat) { 235 StormGraphCreator creator = new StormGraphCreator(strat); 236 return creator._asGraph(t); 237 } 238}