multithreading - How to implement java production grade RabbitMQ consumer -
multithreading - How to implement java production grade RabbitMQ consumer -
I am having a problem with my RabbitMQ work queue implementation. I am currently running it in Tomcat, and I have the following class listening for new tasks in the queue. After a day or two it suddenly starts behaving strangely: the declareok object returned by channel.queuedeclare(taskqueuename, isdurable, false, false, null); reports a message count of zero. (I print this out in the log below, in the line mentioning "current poolsize".)
But the RabbitMQ admin tools (./rabbitmqadmin list queues, or the RabbitMQ admin portal) return a number greater than 0 (say, 1267 messages in the queue), and it does not drop to 0 until I restart Tomcat; only then is the class below able to detect that there are messages in the queue.
Initially I thought this class had terminated somehow, but it is still able to consume messages that newly arrive. It just does not consume the 1267 messages left hanging in the queue. For example, the 1267 messages in the queue are not consumed until I restart Tomcat.
Judging from the code below, is this caused by a buggy implementation, or is there a better way to implement a queue consumer for RabbitMQ? I have read a related Stack Overflow post (Producer/Consumer threads using a Queue), but I'm not sure if it helps.
Also, is it true that the consumer implementation below will not survive a RuntimeException?
mqconsumer class:
@service public class mqconsumer implements runnable{ private static final logger logger = loggerfactory.getlogger(mqconsumer.class); private final int max_alert_threshold = 10000; @autowired private asynchsystemconnections asynchsystemconnections; public mqconsumer(){ } @postconstruct private void init() { (new thread(new mqconsumer(asynchsystemconnections))).start(); } public mqconsumer(asynchsystemconnections asynchsystemconnections){ this.asynchsystemconnections = asynchsystemconnections; } @override public void run() { logger.info("execute consumer instance..."); while (true) { // infinite loop until die due server restart boolean tosleep = consume(asynchsystemconnections); if (tosleep){ logger.error("sleeping 1 second..."); seek { thread.sleep(1000); } grab (interruptedexception e) { logger.error("", e); } } } } private boolean consume(asynchsystemconnections asynchsystemconnections) { com.rabbitmq.client.connection mqconnection = null; channel mqchannel = null; datasiftmq dmq = null; seek { connectionfactory mill = new connectionfactory(); factory.sethost(asynchsystemconnections.getmqserverhost()); mqconnection = factory.newconnection(); mqchannel = mqconnection.createchannel(); //consumepushinteractionjob method forwards asynchtwservice.consume(connection, channel, asynchtwservice.push_interaction_queue ) dmq = asynchsystemconnections.getasynchservice().consumepushinteractionjob(mqconnection, mqchannel); int poolsize = asynchsystemconnections.getasynchservice().getpushinteractionqueuesize(); logger.info("current poolsize: " + poolsize); } catch(nullpointerexception e) { logger.error("", e); if (dmq != null) { seek { logger.error("removing json with" + dmq.getlogheader(dmq)); asynchsystemconnections.getasynchservice().ack(mqchannel, dmq.getdelivery()); logger.error("removed json with" + dmq.getlogheader(dmq)); } grab (ioexception e1) { logger.error("remove json failed: ", e); } } homecoming true; } grab (ioexception e) { logger.error("unable create new mq 
connection factory.", e); homecoming true; } grab (interruptedexception e) { logger.error("", e); homecoming true; } grab (classnotfoundexception e) { logger.error("", e); homecoming true; } grab (exception e) { logger.error("big problem, improve solve fast!!", e); asynchsystemconnections.getnotificationservice().notifysystemexception(null, e); homecoming true; } { seek { asynchsystemconnections.getasynchservice().ack(mqchannel, dmq.getdelivery()); asynchsystemconnections.getasynchservice().disconnect(mqconnection, mqchannel); } grab (ioexception e) { logger.error("", e); } } homecoming false; } asynchtwservice class:
@service("asynchtwservice") public class asynchtwservice implements asynchservice { static final string favourite_count_queue = "favourite_count_queue"; static final string friends_followers_queue = "friends_followers_queue"; static final string direct_message_receive_queue = "direct_message_receive_queue"; static final string push_interaction_queue = "push_interaction_queue"; private static string mqserverhost; private static final logger logger = loggerfactory.getlogger(asynchtwservice.class); private static final boolean isdurable = true; private boolean autoack = false; private concurrenthashmap<string, integer> currentqueuesize = new concurrenthashmap<string, integer>(); @override public connection getconnection() throws ioexception{ connectionfactory mill = new connectionfactory(); factory.sethost(mqserverhost); homecoming factory.newconnection(); } @override public void produce(connection connection, channel channel, object object, string taskqueuename) throws ioexception { sendtoqueue(connection, channel, object, taskqueuename); } @override public queueitem consume(connection connection, channel channel, string taskqueuename) throws ioexception, interruptedexception, classnotfoundexception{ serializer serializer = new serializer(); seek { delivery delivery = listenfromqueue(connection, channel, taskqueuename); object messageobj = serializer.toobject(delivery.getbody()); queueitem queueitem = (queueitem)messageobj; queueitem.setdelivery(delivery); homecoming queueitem; } grab (interruptedexception e) { throw e; } grab (classnotfoundexception e) { logger.error("unable serialize message queueitem object", e); throw e; } } @override public int getqueuesize(string taskqueuename){ homecoming this.currentqueuesize.get(taskqueuename); } private delivery listenfromqueue(connection connection, channel channel, string taskqueuename) throws ioexception, interruptedexception, classnotfoundexception{ seek { declareok ok = channel.queuedeclare(taskqueuename, isdurable, 
false, false, null); currentqueuesize.put(taskqueuename, ok.getmessagecount()); logger.info("queue ("+ taskqueuename + ") has items: " +ok.getmessagecount()); } grab (ioexception e) { // todo auto-generated grab block } logger.info(" [*] consuming "+taskqueuename+" message..."); queueingconsumer consumer = new queueingconsumer(channel); seek { channel.basicconsume(taskqueuename, autoack, consumer); } grab (ioexception e) { logger.error("", e); } seek { queueingconsumer.delivery delivery = consumer.nextdelivery(); homecoming delivery; } grab (shutdownsignalexception e) { logger.error("unable retrieve message queue", e); throw e; } grab (consumercancelledexception e) { logger.error("unable retrieve message queue", e); throw e; } grab (interruptedexception e) { logger.error("unable retrieve message queue", e); throw e; } } private void sendtoqueue(connection connection, channel channel, object object, string taskqueuename) throws ioexception{ //initialization, create message queue broker connection try{ channel.queuedeclare(taskqueuename, isdurable, false, false, null); }catch(ioexception e) { logger.error(e.getmessage()); logger.error("error create message queue connection queue name:" + taskqueuename, e); throw e; } //send message broker seek { long start = system.currenttimemillis(); serializer serializer = new serializer(); logger.info("sending twitter queueitem message queue..."); channel.basicpublish("", taskqueuename, messageproperties.persistent_text_plain, serializer.tobytes(object)); logger.info("queue sent, process took: " + (system.currenttimemillis()-start)+ "ms"); } grab (ioexception e) { logger.error("error while sending object queue : " + taskqueuename, e); throw e; } } public static string getmqserverhost() { homecoming mqserverhost; } public static void setmqserverhost(string mqserverhost) { asynchtwservice.mqserverhost = mqserverhost; } @override public void disconnect(connection connection, channel channel) throws ioexception{ seek { if (channel != 
null){ if (channel.isopen()){ channel.close(); } } if (connection != null){ if (connection.isopen()){ connection.close(); } } logger.debug("mq channel disconnected"); } grab (ioexception e) { throw e; } } @override public void ack(channel channel, queueingconsumer.delivery delivery) throws ioexception { // made method phone call avoid ack fast united nations intentionally seek { channel.basicack(delivery.getenvelope().getdeliverytag(), false); logger.info("[x] acked" ); } grab (ioexception e) { logger.error("unable acknowledge queue message", e); throw e; } } @override public datasiftmq consumedatasiftinteraction(connection connection, channel channel, string taskqueuename) throws ioexception, interruptedexception, classnotfoundexception { serializer serializer = new serializer(); seek { delivery delivery = listenfromqueue(connection, channel, taskqueuename); object messageobj = serializer.toobject(delivery.getbody()); datasiftmq dto = (datasiftmq)messageobj; dto.setdelivery(delivery); homecoming dto; } grab (interruptedexception e) { throw e; } grab (classnotfoundexception e) { logger.error("unable serialize message datasiftdto object", e); throw e; } } @override public void requeue(channel channel, delivery delivery) throws ioexception { seek { channel.basicnack(delivery.getenvelope().getdeliverytag(), false, true); logger.info("[x] nacked" ); } grab (ioexception e) { logger.error("unable acknowledge queue message", e); throw e; } } }
It seems you are missing some basics here.
Taken from here, together with some code of mine. Setting up the connection outside of the consumer thread:
//executed 1 time connectionfactory mill = new connectionfactory(); factory.sethost("somehost"); factory.setusername("user"); factory.setpassword("pass"); connection connection = factory.newconnection(); And this is what I have within the thread:
//consumer - executed in thread queueingconsumer consumer = new queueingconsumer(connection.createchannel()); boolean autoack = false; channel.basicconsume("hello", autoack, consumer); while (!thread.current().isinterrupted())) { queueingconsumer.delivery delivery = consumer.nextdelivery(); //... channel.basicack(delivery.getenvelope().getdeliverytag(), false); } In general, I would still recommend checking out the spring-amqp library, which integrates perfectly.
java multithreading tomcat rabbitmq
Comments
Post a Comment