
本文链接:
安装和配置的内容,可参考
下面是安装配置后的图片:


根据通道是本方还是我方,启用相应的通道(即服务)。

远程队列_remote发送到对方的本地队列_local。

比如ibm websphere mq搭建,在abc_remote上放入消息“123”,在cz_local队列上能够获得“123”。
更具体的理解请点击
import grp.pt.util.StringUtil;
import org.apache.log4j.Logger;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.MQSimpleConnectionManager;
import com.river.common.UploadFileUtil;
/**
* MQ配置 连接信息
*/
public class MqHandler {
private static Logger log = Logger.getLogger(MqHandler.class);
private static String MQ_IP = UploadFileUtil
.getFromPro("mqconfig", "MQ_IP");
private static int MQ_PORT = Integer.parseInt(UploadFileUtil.getFromPro(
"mqconfig", "MQ_PORT"));
// 队列管理器
private static String MQ_QMANAGER = UploadFileUtil.getFromPro("mqconfig",
"MQ_QMANAGER");
// mq服务方通道
private static String MQ_CHANNEL = UploadFileUtil.getFromPro("mqconfig",
"MQ_CHANNEL");
// mq通讯编码集
private static int MQ_CCSID = Integer.parseInt(UploadFileUtil.getFromPro(
"mqconfig", "MQ_CCSID"));
// mq请求方发送队列
private static String MQ_CLENT_SENDQUEUE = UploadFileUtil.getFromPro(
"mqconfig", "MQ_CLENT_SENDQUEUE");
// mq请求方接收队列
private static String MQ_CLENT_RECQUEUE = UploadFileUtil.getFromPro(
"mqconfig", "MQ_CLENT_RECQUEUE");
// MQ自带连接池
private static MQSimpleConnectionManager myConnMan = null;
// 异步长连接接收队列管理器
private MQQueueManager queueManager = null;
// 异步发送队列
private MQQueue SendQueue = null;
// 异步接收队列
public MQQueue ReceiveQueue = null;
/**
* 构造方法
*/
public MqHandler() throws Exception {
init();
}
/**
* 异步发送消息
*
* @param msg
* @param messageId
* @param charSet
* @throws Exception
*/
public void sendMsg(String msg, String messageId, String charSet)
throws Exception {
try {
if (StringUtil.isEmpty(charSet)) {
charSet = "GBK";
}
byte[] msgId = StringUtil.isEmpty(messageId) ? null : messageId
.getBytes(charSet);
// 打开队列
queueManager = new MQQueueManager(MQ_QMANAGER, myConnMan);
SendQueue = queueManager.accessQueue(MQ_CLENT_SENDQUEUE,
MQC.MQOO_OUTPUT | MQC.MQPMO_NEW_MSG_ID
| MQC.MQOO_FAIL_IF_QUIESCING, null, null, null);
// 初始化消息选项
MQPutMessageOptions pmo = new MQPutMessageOptions();
// 确保每次发送前为消息自动生成唯一的msgId
pmo.options = pmo.options + MQC.MQPMO_NEW_MSG_ID;
// 如果设置了该参数,则发送后必须调用commit功能。否则无法将消息发送出
pmo.options = pmo.options + MQC.MQPMO_SYNCPOINT;
// 创建消息对象
MQMessage outMsg = new MQMessage();
// 设置MQMD格式字段
outMsg.format = MQC.MQFMT_STRING;
outMsg.messageId = msgId == null ? MQC.MQMI_NONE : msgId;
outMsg.encoding = MQ_CCSID;
outMsg.characterSet = MQ_CCSID;
// 消息发送时必须以字节流的方式发送
outMsg.write(msg.getBytes(charSet));
// 在队列上放置消息
SendQueue.put(outMsg, pmo);
// 和MQC.MQPMO_SYNCPOINT属性对应。如果设置了该属性,则发送后需要提交。
queueManager.commit();
} finally {
close(SendQueue);
}
}
/**
* 接收消息
* @param correlationId
* @param charSet
* @return
* @throws Exception
*/
public String receive(String correlationId, String charSet)
throws Exception {
try {
if (StringUtil.isEmpty(charSet)) {
charSet = "GBK";
}
queueManager = new MQQueueManager(MQ_QMANAGER, myConnMan);
ReceiveQueue = queueManager.accessQueue(MQ_CLENT_RECQUEUE,
MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_INQUIRE
| MQC.MQOO_FAIL_IF_QUIESCING);
MQMessage respMessage = new MQMessage();
// 设置MQMD 格式字段
respMessage.format = MQC.MQFMT_STRING;
// 设置编码格式与MQ服务一致
respMessage.encoding = MQ_CCSID;
// 设置字符集与MQ服务一致
respMessage.characterSet = MQ_CCSID;
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options + MQC.MQGMO_WAIT; // 如果设置了该参数,则当前线程将阻塞,直到等到回复的消息或超时
gmo.waitInterval = MQC.MQWI_UNLIMITED;
if (StringUtil.isNotEmpty(correlationId)) {
// 如果设置了该参数,则根据消息的correlId去匹配对应的响应消息
gmo.matchOptions = MQC.MQMO_MATCH_CORREL_ID;
respMessage.correlationId = correlationId.getBytes(charSet);
}
ReceiveQueue.get(respMessage, gmo);
byte[] msgBuffer = new byte[respMessage.getMessageLength()];
respMessage.readFully(msgBuffer);
String respMsg = new String(msgBuffer, charSet);
return respMsg;
} finally {
close(ReceiveQueue);
}
}
/**
* 初始化连接
*
* @param type
* @throws MQException
*/
private void init() throws MQException {
MQEnvironment.hostname = MQ_IP;
MQEnvironment.port = MQ_PORT;
MQEnvironment.CCSID = MQ_CCSID;
MQEnvironment.channel = MQ_CHANNEL;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
MQC.TRANSPORT_MQSERIES_CLIENT);
myConnMan = new MQSimpleConnectionManager();
myConnMan.setActive(MQSimpleConnectionManager.MODE_AUTO);
myConnMan.setTimeout(3600000);
myConnMan.setMaxConnections(75);
myConnMan.setMaxUnusedConnections(50);
MQEnvironment.setDefaultConnectionManager(myConnMan);
log.info("初始化队列管理器receiverQueueManager....." + MQ_QMANAGER);
}
private void close(MQQueue queue) {
try {
if (queue != null && queue.isOpen) {
queue.close();
}
if (queueManager != null) {
queueManager.disconnect();
}
} catch (MQException ex) {
log.error("", ex);
}
}
}
网上类似的代码太多,下面对代码中的细节进行解释。

初始化
MQEnvironment的静态属性:hostname、port、CCSID等初始化后,MQQueueManager利用这种属性来构造实例对象。
MQSimpleConnectionManager是简单的连接池,当设定了连接池后MQEnvironment.setDefaultConnectionManager(myConnMan)ibm websphere mq搭建,再使用close方法时,并不会破坏连接,而是归还池中。
连接池详细内容可以参考官网的API

消息参数
更多参数参见官方表明
import org.jfree.util.Log;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.ctjsoft.treasury.util.MqHandler;
@RunWith(JUnit4.class)
public class MqTest {
@Test
public void mqSend(){
try {
MqHandler mq = new MqHandler();
mq.sendMsg("sq--send1", null, "gbk");
mq.sendMsg("sq--send2", null, "gbk");
mq.sendMsg("sq--send3", null, "gbk");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Test
public void mqReceive(){
try {
MqHandler mq = new MqHandler();
String msg = mq.receive(null, "gbk");
System.out.println(msg);
String msg1 = mq.receive(null, "gbk");
System.out.println(msg1);
String msg2 = mq.receive(null, "gbk");
System.out.println(msg2);
System.out.println("任务完成");
} catch (Exception e) {
Log.error("", e);
}
}
}
测试过程中发生的问题举例:
感觉有用请点赞
本文来自电脑杂谈,转载请注明本文网址:
http://www.pc-fly.com/a/jisuanjixue/article-121212-1.html
英雄所见略同
奶粉是干的不
说糟糕点的话虫子能密封不透气的袋子存活吗