java - How to run a topology on a Storm cluster? I can't see the output log
I'm trying to execute a topology on a cluster. I wrote the topology, compiled the jar, and registered it with the cluster. For some reason it looks like the topology is not running. I want to use Storm here as a pipeline. The command I use to register it: ./storm jar /tmp/storm_test.jar storm.topology.mytopology /tmp/bigfile.log
topology:
package storm.topology; import storm.spouts.linereaderspout; import backtype.storm.config; import backtype.storm.stormsubmitter; import backtype.storm.topology.topologybuilder; import storm.bolts.bolt; public class mytopology { public static long tuplecounter = 0; public static long endtime = 0; public static void main(string[] args) throws exception { config config = new config(); config.put("inputfile", args[0]); config.put(config.topology_workers, 4); config.setdebug(true); config.put(config.topology_max_spout_pending, 10); topologybuilder builder = new topologybuilder(); builder.setspout("line-reader-spout", new linereaderspout()); builder.setbolt("bolta", new bolt()).shufflegrouping("line-reader-spout"); stormsubmitter.submittopology("mytopology", config, builder.createtopology()); } }
bolt:
package storm.bolts; import backtype.storm.task.outputcollector; import backtype.storm.task.topologycontext; import backtype.storm.topology.irichbolt; import backtype.storm.topology.outputfieldsdeclarer; import backtype.storm.tuple.tuple; import java.io.filenotfoundexception; import java.io.printwriter; import java.io.unsupportedencodingexception; import storm.topology.mytopology2; import storm.spouts.linereaderspout; import java.text.numberformat; import java.util.hashmap; import java.util.locale; import java.util.map; import java.util.logging.level; import java.util.logging.logger; public class bolt implements irichbolt { integer id; string name; static long totaltime; map<string, integer> counters; private outputcollector collector; @override public void prepare(map stormconf, topologycontext context, outputcollector collector) { this.counters = new hashmap<string, integer>(); this.collector = collector; this.name = context.getthiscomponentid(); this.id = context.getthistaskid(); } @override public void execute(tuple input) { //string str = input.getstring(0); mytopology2.tuplecounter++; if (input.getstring(0).contains("end")) { mytopology.endtime = system.nanotime(); system.out.println("===================================================="); system.out.println("number of tuples: " + mytopology.tuplecounter); totaltime = mytopology.endtime - linereaderspout.starttime; double tuplepersec = mytopology.tuplecounter / (totaltime / 1000000000d); system.out.println("test results: " + numberformat.getnumberinstance(locale.us).format(tuplepersec) + "tuple/sec"); totaltime = mytopology2.endtime - linereaderspout.star`enter code here`ttime; system.out.println("total run time: " + totaltime + " nsec"); system.out.println("===================================================="); printwriter writer; seek { author = new printwriter("/tmp/storm_results.log", "utf-8"); writer.println("number of tuples: " + mytopology.tuplecounter); writer.println("test results: " + 
numberformat.getnumberinstance(locale.us).format(tuplepersec) + "tuple/sec"); writer.println("total run time: " + totaltime + " nsec"); writer.println("===================================================="); writer.close(); } grab (filenotfoundexception ex) { logger.getlogger(testbolt.class.getname()).log(level.severe, null, ex); } grab (unsupportedencodingexception ex) { logger.getlogger(testbolt.class.getname()).log(level.severe, null, ex); } } } @override public void cleanup() { } @override public void declareoutputfields(outputfieldsdeclarer declarer) { // todo auto-generated method stub } @override public map<string, object> getcomponentconfiguration() { // todo auto-generated method stub homecoming null; } }
spout:
package storm.spouts; import java.io.bufferedreader; import java.io.filenotfoundexception; import java.io.filereader; import java.io.ioexception; import java.util.map; import backtype.storm.spout.spoutoutputcollector; import backtype.storm.task.topologycontext; import backtype.storm.topology.irichspout; import backtype.storm.topology.outputfieldsdeclarer; import backtype.storm.tuple.fields; import backtype.storm.tuple.values; public class linereaderspout implements irichspout { private spoutoutputcollector collector; private filereader filereader; private boolean completed = false; private topologycontext context; public static long starttime; @override public void open(map conf, topologycontext context, spoutoutputcollector collector) { seek { this.context = context; this.filereader = new filereader(conf.get("inputfile").tostring()); starttime = system.nanotime(); } grab (filenotfoundexception e) { throw new runtimeexception("error reading file " + conf.get("inputfile")); } this.collector = collector; } @override public void nexttuple() { if (completed) { } string str; bufferedreader reader = new bufferedreader(filereader); seek { while ((str = reader.readline()) != null) { this.collector.emit(new values(str), str); } } grab (exception e) { throw new runtimeexception("error reading typle", e); } { completed = true; } } @override public void declareoutputfields(outputfieldsdeclarer declarer) { declarer.declare(new fields("line")); } @override public void close() { seek { filereader.close(); } grab (ioexception e) { // todo auto-generated grab block e.printstacktrace(); } } public boolean isdistributed() { homecoming false; } @override public void activate() { // todo auto-generated method stub } @override public void deactivate() { // todo auto-generated method stub } @override public void ack(object msgid) { } @override public void fail(object msgid) { } @override public map<string, object> getcomponentconfiguration() { homecoming null; } }
You can use a local cluster and submit the topology to it; that way you don't need to set up a production cluster on a remote machine. The code looks like this:
localcluster cluster = new localcluster(); cluster.submittopology("mytopology", conf, builder.createtopology());
java cloud bigdata apache-storm
Comments
Post a Comment