| 注册
请输入搜索内容

热门搜索

Java Linux MySQL PHP JavaScript Hibernate jQuery Nginx
hwl0420
10年前发布

MQ接收队列到本地文件的Java代码

MQ接收队列到本地文件
</div>

 

MQFileReceiver.java     

package com.mq.dpca.file;    import java.io.File;  import java.io.FileOutputStream;    import com.ibm.mq.MQEnvironment;  import com.ibm.mq.MQException;  import com.ibm.mq.MQGetMessageOptions;  import com.ibm.mq.MQMessage;  import com.ibm.mq.MQQueue;  import com.ibm.mq.MQQueueManager;  import com.ibm.mq.constants.MQConstants;  import com.mq.dpca.msg.MQConfig;  import com.mq.dpca.util.ReadCmdLine;  import com.mq.dpca.util.RenameUtil;    /**   *    * MQ分组接收文件功能   * 主动轮询   */  public class MQFileReceiver {   private MQQueueManager qmgr; // 连接到队列管理器     private MQQueue inQueue; // 传输队列     private String queueName = ""; // 队列名称     private String host = ""; //     private int port = 1414; // 侦听器的端口号     private String channel = ""; // 通道名称     private String qmgrName = ""; // 队列管理器     private MQMessage inMsg; // 创建消息缓冲     private MQGetMessageOptions gmo; // 设置获取消息选项     private static String fileName = null; // 接收队列上的消息并存入文件     private int ccsid = 0;     private static String file_dir = null;     /**    * 程序的入口    *     * @param args    */   public static void main(String args[]) {    MQFileReceiver mfs = new MQFileReceiver();    //初始化连接    mfs.initproperty();    //接收文件    mfs.runGoupReceiver();    //获取shell脚本名  //  String shellname = MQConfig.getValueByKey(fileName);  //  if(shellname!=null&&!"".equals(shellname)){  //   //调用shell  //   ReadCmdLine.callShell(shellname);  //  }else{  //   System.out.println("have no shell name,Only receive files.");  //  }     }     public void runGoupReceiver() {    try {     init();     getGroupMessages();     qmgr.commit();     System.out.println("\n Messages successfully Receive ");    } catch (MQException mqe) {     mqe.printStackTrace();     try {      System.out.println("\n Backing out Transaction ");      qmgr.backout();      System.exit(2);     } catch (Exception e) {      e.printStackTrace();      System.exit(2);     }    } catch (Exception e) {     e.printStackTrace();     System.exit(2);    }   }     /**    * 初始化服务器连接信息    *     * @throws Exception    */   private void init() throws Exception {    /* 为客户机连接设置MQEnvironment属性 */    MQEnvironment.hostname = host;    MQEnvironment.channel = channel;    MQEnvironment.port = port;      /* 连接到队列管理器 */    qmgr = new MQQueueManager(qmgrName);      /* 设置队列打开选项以输 */    int opnOptn = MQConstants.MQOO_INPUT_AS_Q_DEF      | MQConstants.MQOO_FAIL_IF_QUIESCING;      /* 打开队列以输 */    inQueue = qmgr.accessQueue(queueName, opnOptn, null, null, null);   }     /**    * 接受文件的主函数    *     * @throws Exception    */   public void getGroupMessages() {    /* 设置获取消息选项 */    gmo = new MQGetMessageOptions();    gmo.options = MQConstants.MQGMO_FAIL_IF_QUIESCING;    gmo.options = gmo.options + MQConstants.MQGMO_SYNCPOINT;    /* 等待消息 */    gmo.options = gmo.options + MQConstants.MQGMO_WAIT;    /* 设置等待时间限制 */    gmo.waitInterval = 5000;    /* 只获取消息 */    gmo.options = gmo.options + MQConstants.MQGMO_ALL_MSGS_AVAILABLE;    /* 以辑顺序获取消息 */    gmo.options = gmo.options + MQConstants.MQGMO_LOGICAL_ORDER;    gmo.matchOptions = MQConstants.MQMO_MATCH_GROUP_ID;    /* 创建消息缓冲 */    inMsg = new MQMessage();    try {     FileOutputStream fos = null;     /* 处理组消息 */     while (true) {      try {       inQueue.get(inMsg, gmo);       if (fos == null) {        try {         fileName = inMsg.getStringProperty("fileName");         String fileName_full = null;         fileName_full = file_dir + RenameUtil.rename(fileName);         fos = new FileOutputStream(new File(fileName_full));         int msgLength = inMsg.getMessageLength();         byte[] buffer = new byte[msgLength];         inMsg.readFully(buffer);         fos.write(buffer, 0, msgLength);         /* 查看是否是最后消息标识 */         char x = gmo.groupStatus;         if (x == MQConstants.MQGS_LAST_MSG_IN_GROUP) {          System.out.println("Last Msg in Group");          break;         }         inMsg.clearMessage();          } catch (Exception e) {         System.out           .println("Receiver the message without property,do nothing!");         inMsg.clearMessage();        }       } else {        int msgLength = inMsg.getMessageLength();        byte[] buffer = new byte[msgLength];        inMsg.readFully(buffer);        fos.write(buffer, 0, msgLength);        /* 查看是否是最后消息标识 */        char x = gmo.groupStatus;        if (x == MQConstants.MQGS_LAST_MSG_IN_GROUP) {         System.out.println("Last Msg in Group");         break;        }        inMsg.clearMessage();       }      } catch (Exception e) {       char x = gmo.groupStatus;       if (x == MQConstants.MQGS_LAST_MSG_IN_GROUP) {        System.out.println("Last Msg in Group");       }       break;      }     }     if (fos != null)      fos.close();    } catch (Exception e) {     System.out.println(e.getMessage());    }   }     public void initproperty() {    MQConfig config = new MQConfig().getInstance();    if (config.getMQ_MANAGER() != null) {     qmgrName = config.getMQ_MANAGER();     queueName = config.getMQ_QUEUE_NAME();     channel = config.getMQ_CHANNEL();     host = config.getMQ_HOST_NAME();     port = Integer.valueOf(config.getMQ_PROT());     ccsid = Integer.valueOf(config.getMQ_CCSID());     file_dir = config.getFILE_DIR();    }   }  }