1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| @Value("${rocketmq.name-server}") private String namesrv; // Populated from the "rocketmq.name-server" property placeholder; registerOrderListener passes it to consumer.setNamesrvAddr(...).
/**
 * Creates, configures and starts an orderly RocketMQ push consumer for one
 * consumer-group / topic / tag combination, then records it in {@code consumerMap}.
 *
 * <p>The consumer is pointed at {@code namesrv}, starts consuming from the first
 * offset, and hands every received message body (decoded as UTF-8) to
 * {@code handler}. If the handler returns anything other than
 * {@code MqStatus.SUCCESS}, the listener stops processing the current batch and
 * returns {@code SUSPEND_CURRENT_QUEUE_A_MOMENT}.
 *
 * @param consumerGroups consumer group name passed to the {@code DefaultMQPushConsumer} constructor
 * @param topic          topic passed to {@code subscribe}
 * @param tag            subscription expression passed to {@code subscribe}
 * @param handler        callback invoked once per message with the decoded body
 * @throws SystemException with {@code MQ_CONSUMER_REPEAT_ERROR} when a consumer is already
 *                         stored under the same group/topic/tag key, or with
 *                         {@code MQ_CONSUMER_INIT_ERROR} when RocketMQ raises
 *                         {@code MQClientException} during subscribe/start
 */
@Override
public void registerOrderListener(String consumerGroups, String topic, String tag, MqHandler handler) {
    // NOTE(review): the key is a plain concatenation, so ("a", "bc", ..) and ("ab", "c", ..)
    // collide. Left unchanged because other code may look consumers up with this format.
    String mapKey = consumerGroups + topic + tag;
    if (consumerMap.containsKey(mapKey)) {
        throw new SystemException(GlobalError.MQ_CONSUMER_REPEAT_ERROR.code, GlobalError.MQ_CONSUMER_REPEAT_ERROR.msg);
    }

    try {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroups, true);
        consumer.setNamesrvAddr(namesrv);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe(topic, tag);
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt message : msgs) {
                    long startedAt = System.currentTimeMillis();
                    String key = message.getKeys();
                    String msgTag = message.getTags();
                    String msgId = message.getMsgId();
                    // The Charset overload cannot throw UnsupportedEncodingException, so no
                    // catch block is needed. Fully qualified to avoid requiring a new import.
                    String content = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    log.info("MQ消息key:{} tag:{} msgid:{} content:{}", key, msgTag, msgId, content);

                    MqStatus status = handler.methodT(content);
                    if (!MqStatus.SUCCESS.equals(status)) {
                        log.warn("MQ消息key:{} 消费结束:{},耗时:{}ms", key, status, (System.currentTimeMillis() - startedAt));
                        // Report the failure through the return value instead of throwing
                        // out of the listener.
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        consumerMap.put(mapKey, consumer);
    } catch (MQClientException e) {
        // Pass the exception as the final argument so the stack trace is logged too.
        log.error("MQ注册消费者失败:{}", e.getErrorMessage(), e);
        throw new SystemException(GlobalError.MQ_CONSUMER_INIT_ERROR.code, GlobalError.MQ_CONSUMER_INIT_ERROR.msg);
    }
}
|