JMS 发送程序代码
javahero1984
2010-04-18
CommandLineSupport.java
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.Arrays;
import java.io.IOException;

/**
 * Subscribes to a JMS topic on an ActiveMQ broker and prints each received
 * {@link MapMessage} as one line of separator-delimited values.
 *
 * <p>The broker URL, topic name, the names of the values to print and the
 * separator are loaded from a properties file by {@link #init(String)}.
 * A {@link TextMessage} with the text {@code SHUTDOWN} closes the connection.
 *
 * @Author: Vane Company: LinkTrust.com.cn Product: IntraSec
 * Module: (to be replaced by the module description)
 * Created on 2008-10-9
 */
public class JMSTopicListener implements MessageListener {

    private Connection connection;
    private Session session;
    private Topic topic;

    // Configuration shared by all instances; populated by init(String).
    private static String url;
    private static String user;
    private static String pw;
    private static String jmsTopicMessages;
    /** Semicolon-separated names of map entries to print. */
    private static String mapString;
    /** Semicolon-separated type codes, parallel to mapString: 0 = text, anything else = int. */
    private static String mapStringDateType;
    /** Semicolon-separated names of message properties to print. */
    private static String mapStringProperty;
    /** Separator appended after every printed value. */
    private static String compart;

    /**
     * Configures logging from {@code jms_log.cfg}, loads {@code JMS.cfg},
     * applies the command-line options and starts listening. Exits with
     * status -1 when {@code CommandLineSupport.setOptions} reports options it
     * did not recognise.
     *
     * @param argv command-line arguments passed to {@code CommandLineSupport.setOptions}
     */
    public static void main(String[] argv) {
        try {
            LogRecorder.Init("jms_log.cfg");
            init("JMS.cfg");
            JMSTopicListener l = new JMSTopicListener();
            String[] unknown = CommandLineSupport.setOptions(l, argv);
            if (unknown.length > 0) {
                LogRecorder.GetLog().info("Unknown options: " + Arrays.toString(unknown));
                System.exit(-1);
            }
            l.run();
        } catch (IOException e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
        } catch (JMSException e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
        }
    }

    /**
     * Connects to the broker at the configured URL, registers this object as
     * the listener of the configured topic and starts the connection.
     *
     * @throws JMSException if the connection, session or consumer cannot be created
     */
    public void run() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        connection = factory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        topic = session.createTopic(jmsTopicMessages);
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(this);
        connection.start();
        LogRecorder.GetLog().info("connnect successful,waiting for messages...");
    }

    /**
     * Tells whether {@code m} is a {@link TextMessage} whose text equals {@code s}.
     * A {@link JMSException} while reading the text is logged and treated as "no match".
     */
    private static boolean checkText(Message m, String s) {
        try {
            return m instanceof TextMessage && ((TextMessage) m).getText().equals(s);
        } catch (JMSException e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
            return false;
        }
    }

    /**
     * Handles one incoming message: closes the connection on the
     * {@code SHUTDOWN} text message, otherwise prints the configured
     * properties and map entries of a {@link MapMessage} on one line.
     * Messages of any other type are logged and ignored instead of failing
     * with a {@link ClassCastException}.
     *
     * @param message the message delivered by the JMS provider
     */
    public void onMessage(Message message) {
        try {
            if (checkText(message, "SHUTDOWN")) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace(System.out);
                }
                return;
            }
            if (!(message instanceof MapMessage)) {
                LogRecorder.GetLog().info("Ignoring message that is not a MapMessage: " + message);
                return;
            }

            /*--------------------------------MapMessage----------------------------------------*/
            MapMessage mm = (MapMessage) message;
            StringBuilder out = new StringBuilder();
            appendProperties(mm, out);
            appendMapItems(mm, out);

            System.out.println(out.toString());
            LogRecorder.GetLog().debug(out.toString());
        } catch (JMSException e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
        }
    }

    /**
     * Appends the value of every message property named in
     * {@code mapStringProperty}, each followed by the separator. A property
     * that is not set contributes only the separator.
     */
    private static void appendProperties(MapMessage mm, StringBuilder out) throws JMSException {
        if (mapStringProperty == null || mapStringProperty.length() == 0) {
            return;
        }
        String[] names = mapStringProperty.split(";");
        for (int i = 0; i < names.length; i++) {
            // Read the message property; reading the name as a byte[] map
            // entry and passing it to String.valueOf printed "[B@..." instead.
            String value = mm.getStringProperty(names[i]);
            if (value != null) {
                out.append(value);
            }
            out.append(compart);
        }
    }

    /**
     * Appends the map entries named in {@code mapString}, each followed by the
     * separator. The parallel entry of {@code mapStringDateType} selects how
     * the entry is read: 0 as text, any other number via {@code getInt}.
     * An entry that does not exist contributes only the separator.
     */
    private static void appendMapItems(MapMessage mm, StringBuilder out) throws JMSException {
        if (mapString == null || mapString.length() == 0) {
            return;
        }
        String[] names = mapString.split(";");
        String[] dateType = mapStringDateType.split(";");
        for (int i = 0; i < names.length; i++) {
            String name = names[i];
            int type = Integer.parseInt(dateType[i]);
            if (mm.itemExists(name)) {
                if (type == 0) {
                    out.append(textOf(mm.getObject(name)));
                } else {
                    out.append(String.valueOf(mm.getInt(name)));
                }
            }
            out.append(compart);
        }
    }

    /**
     * Renders a map entry as text. A {@code byte[]} is decoded with the
     * platform default charset (as before); any other value, such as one
     * written with {@code setString}, is rendered with {@code String.valueOf}.
     */
    private static String textOf(Object value) {
        if (value instanceof byte[]) {
            return new String((byte[]) value);
        }
        return String.valueOf(value);
    }

    /** Overrides the broker URL loaded by {@link #init(String)}. */
    public void setUrl(String url) {
        JMSTopicListener.url = url;
    }

    /**
     * Loads the listener configuration from a properties file.
     *
     * @param fileName path of the properties file
     * @throws IOException if the file cannot be read
     */
    public static void init(String fileName) throws IOException {
        PropertyReader prop = new PropertyReader(fileName);
        url = prop.getProperty("url");
        jmsTopicMessages = prop.getProperty("jmsTopicMessages");
        mapString = prop.getProperty("mapString");
        mapStringDateType = prop.getProperty("mapStringDateType");
        mapStringProperty = prop.getProperty("mapStringProperty");
        compart = prop.getProperty("compart");
        LogRecorder.GetLog().info("jms url:" + url);
    }
}

JMSTopicListener.java

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.Arrays;
import java.io.IOException;

/**
 * Subscribes to a JMS topic on an ActiveMQ broker and prints each received
 * {@link MapMessage} as one line of separator-delimited values.
 *
 * <p>The broker URL, topic name, the names of the values to print and the
 * separator are loaded from a properties file by {@link #init(String)}.
 * A {@link TextMessage} with the text {@code SHUTDOWN} closes the connection.
 *
 * @Author: Vane Company: LinkTrust.com.cn Product: IntraSec
 * Module: (to be replaced by the module description)
 * Created on 2008-10-9
 */
public class JMSTopicListener implements MessageListener {

    private Connection connection;
    private Session session;
    private Topic topic;

    // Configuration shared by all instances; populated by init(String).
    private static String url;
    private static String user;
    private static String pw;
    private static String jmsTopicMessages;
    /** Semicolon-separated names of map entries to print. */
    private static String mapString;
    /** Semicolon-separated type codes, parallel to mapString: 0 = text, anything else = int. */
    private static String mapStringDateType;
    /** Semicolon-separated names of message properties to print. */
    private static String mapStringProperty;
    /** Separator appended after every printed value. */
    private static String compart;

    /**
     * Configures logging from {@code jms_log.cfg}, loads {@code JMS.cfg},
     * applies the command-line options and starts listening. Exits with
     * status -1 when {@code CommandLineSupport.setOptions} reports options it
     * did not recognise.
     *
     * @param argv command-line arguments passed to {@code CommandLineSupport.setOptions}
     */
    public static void main(String[] argv) {
        try {
            LogRecorder.Init("jms_log.cfg");
            init("JMS.cfg");
            JMSTopicListener l = new JMSTopicListener();
            String[] unknown = CommandLineSupport.setOptions(l, argv);
            if (unknown.length > 0) {
                LogRecorder.GetLog().info("Unknown options: " + Arrays.toString(unknown));
                System.exit(-1);
            }
            l.run();
        } catch (IOException e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
        } catch (JMSException e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
        }
    }

    /**
     * Connects to the broker at the configured URL, registers this object as
     * the listener of the configured topic and starts the connection.
     *
     * @throws JMSException if the connection, session or consumer cannot be created
     */
    public void run() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        connection = factory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        topic = session.createTopic(jmsTopicMessages);
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(this);
        connection.start();
        LogRecorder.GetLog().info("connnect successful,waiting for messages...");
    }

    /**
     * Tells whether {@code m} is a {@link TextMessage} whose text equals {@code s}.
     * A {@link JMSException} while reading the text is logged and treated as "no match".
     */
    private static boolean checkText(Message m, String s) {
        try {
            return m instanceof TextMessage && ((TextMessage) m).getText().equals(s);
        } catch (JMSException e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
            return false;
        }
    }

    /**
     * Handles one incoming message: closes the connection on the
     * {@code SHUTDOWN} text message, otherwise prints the configured
     * properties and map entries of a {@link MapMessage} on one line.
     * Messages of any other type are logged and ignored instead of failing
     * with a {@link ClassCastException}.
     *
     * @param message the message delivered by the JMS provider
     */
    public void onMessage(Message message) {
        try {
            if (checkText(message, "SHUTDOWN")) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace(System.out);
                }
                return;
            }
            if (!(message instanceof MapMessage)) {
                LogRecorder.GetLog().info("Ignoring message that is not a MapMessage: " + message);
                return;
            }

            /*--------------------------------MapMessage----------------------------------------*/
            MapMessage mm = (MapMessage) message;
            StringBuilder out = new StringBuilder();
            appendProperties(mm, out);
            appendMapItems(mm, out);

            System.out.println(out.toString());
            LogRecorder.GetLog().debug(out.toString());
        } catch (JMSException e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
        }
    }

    /**
     * Appends the value of every message property named in
     * {@code mapStringProperty}, each followed by the separator. A property
     * that is not set contributes only the separator.
     */
    private static void appendProperties(MapMessage mm, StringBuilder out) throws JMSException {
        if (mapStringProperty == null || mapStringProperty.length() == 0) {
            return;
        }
        String[] names = mapStringProperty.split(";");
        for (int i = 0; i < names.length; i++) {
            // Read the message property; reading the name as a byte[] map
            // entry and passing it to String.valueOf printed "[B@..." instead.
            String value = mm.getStringProperty(names[i]);
            if (value != null) {
                out.append(value);
            }
            out.append(compart);
        }
    }

    /**
     * Appends the map entries named in {@code mapString}, each followed by the
     * separator. The parallel entry of {@code mapStringDateType} selects how
     * the entry is read: 0 as text, any other number via {@code getInt}.
     * An entry that does not exist contributes only the separator.
     */
    private static void appendMapItems(MapMessage mm, StringBuilder out) throws JMSException {
        if (mapString == null || mapString.length() == 0) {
            return;
        }
        String[] names = mapString.split(";");
        String[] dateType = mapStringDateType.split(";");
        for (int i = 0; i < names.length; i++) {
            String name = names[i];
            int type = Integer.parseInt(dateType[i]);
            if (mm.itemExists(name)) {
                if (type == 0) {
                    out.append(textOf(mm.getObject(name)));
                } else {
                    out.append(String.valueOf(mm.getInt(name)));
                }
            }
            out.append(compart);
        }
    }

    /**
     * Renders a map entry as text. A {@code byte[]} is decoded with the
     * platform default charset (as before); any other value, such as one
     * written with {@code setString}, is rendered with {@code String.valueOf}.
     */
    private static String textOf(Object value) {
        if (value instanceof byte[]) {
            return new String((byte[]) value);
        }
        return String.valueOf(value);
    }

    /** Overrides the broker URL loaded by {@link #init(String)}. */
    public void setUrl(String url) {
        JMSTopicListener.url = url;
    }

    /**
     * Loads the listener configuration from a properties file.
     *
     * @param fileName path of the properties file
     * @throws IOException if the file cannot be read
     */
    public static void init(String fileName) throws IOException {
        PropertyReader prop = new PropertyReader(fileName);
        url = prop.getProperty("url");
        jmsTopicMessages = prop.getProperty("jmsTopicMessages");
        mapString = prop.getProperty("mapString");
        mapStringDateType = prop.getProperty("mapStringDateType");
        mapStringProperty = prop.getProperty("mapStringProperty");
        compart = prop.getProperty("compart");
        LogRecorder.GetLog().info("jms url:" + url);
    }
}

JMSTopicPublisher.java

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.Arrays;
import java.io.IOException;
import java.io.Serializable;

/**
 * Publishes 1000 sample {@link MapMessage}s to the configured JMS topic on an
 * ActiveMQ broker and then closes the connection.
 *
 * <p>{@code size}, {@code subscribers}, {@code messages} and {@code batch} are
 * loaded from the configuration but are not used by {@link #run()}.
 *
 * @Author: Vane
 * Company: LinkTrust.com.cn
 * Product: IntraSec
 * Module: (to be replaced by the module description)
 * Created on 2008-10-8
 */
public class JMSTopicPublisher implements MessageListener {

    private final Object mutex = new Object();
    private Connection connection;
    private Session session;
    private MessageProducer publisher;
    private Topic topic;
    private Topic control;

    private static String url;
    private static int size;
    private static int subscribers;
    private static int remaining;
    private static int messages;
    private long delay;
    private static int batch;
    private static String jmsTopicMessages;

    /**
     * Loads {@code JMS.cfg}, applies the command-line options and publishes
     * the messages. Exits with status -1 when
     * {@code CommandLineSupport.setOptions} reports unrecognised options.
     */
    public static void main(String[] argv) throws Exception {
        init("JMS.cfg");
        JMSTopicPublisher p = new JMSTopicPublisher();
        String[] unknown = CommandLineSupport.setOptions(p, argv);
        if (unknown.length > 0) {
            System.out.println("Unknown options: " + Arrays.toString(unknown));
            System.exit(-1);
        }
        p.run();
    }

    /**
     * Connects to the broker, sends 1000 non-persistent map messages to the
     * configured topic and closes the connection, even when sending fails.
     */
    private void run() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        connection = factory.createConnection();
        try {
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            topic = session.createTopic(jmsTopicMessages);
            control = session.createTopic("topictest.control");

            publisher = session.createProducer(topic);
            publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            for (int i = 0; i < 1000; i++) {
                // Parenthesised so the 1-based index is added, not concatenated ("01", "11", ...).
                System.out.println("=================" + (i + 1) + "==================");
                MapMessage message = session.createMapMessage();
                message.setStringProperty("SENSE", "IBM ITM 6.1.1[" + i + "]");
                message.setStringProperty("CLASS", "COMPUTERSYSTEM[" + i + "]");
                message.setString("CPULOAD:NO=0", "0.1[" + i + "]");
                message.setString("CPUFREE:NO=0", "0.9[" + i + "]");
                // TestVO vo = new TestVO();
                // vo.setName("name"+i);
                // vo.setDesc("desc"+i);
                // vo.setTitle("title"+i);
                publisher.send(message);
                //publisher.send(session.createTextMessage("abcdefghijklmnopqistuvwxyz"));
            }
            connection.stop();
        } finally {
            // Release the broker connection on every path.
            connection.close();
        }
    }

    /**
     * Prints a received report, decrements the remaining-report counter and
     * notifies a waiter on the mutex once it reaches zero.
     */
    public void onMessage(Message message) {
        synchronized (mutex) {
            System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");
            if (remaining == 0) {
                mutex.notify();
            }
        }
    }

    /**
     * Returns the text of a {@link TextMessage}. Any other message is rendered
     * with {@code String.valueOf} instead of failing with a
     * {@link ClassCastException}; a {@link JMSException} is printed and its
     * string form returned.
     */
    Object getReport(Message m) {
        if (!(m instanceof TextMessage)) {
            return String.valueOf(m);
        }
        try {
            return ((TextMessage) m).getText();
        } catch (JMSException e) {
            e.printStackTrace(System.out);
            return e.toString();
        }
    }

    public void setBatch(int batch) {
        JMSTopicPublisher.batch = batch;
    }

    public void setDelay(long delay) {
        this.delay = delay;
    }

    public void setMessages(int messages) {
        JMSTopicPublisher.messages = messages;
    }

    public void setSize(int size) {
        JMSTopicPublisher.size = size;
    }

    public void setSubscribers(int subscribers) {
        JMSTopicPublisher.subscribers = subscribers;
    }

    public void setUrl(String url) {
        JMSTopicPublisher.url = url;
    }

    /**
     * Loads the publisher configuration from a properties file and prints it.
     *
     * @param fileName path of the properties file
     * @throws IOException if the file cannot be read
     * @throws NumberFormatException if a numeric setting is missing or not an integer
     */
    public static void init(String fileName) throws IOException {
        PropertyReader prop = new PropertyReader(fileName);
        url = prop.getProperty("url");
        size = Integer.parseInt(prop.getProperty("size"));
        subscribers = Integer.parseInt(prop.getProperty("subscribers"));
        messages = Integer.parseInt(prop.getProperty("messages"));
        batch = Integer.parseInt(prop.getProperty("batch"));
        jmsTopicMessages = prop.getProperty("jmsTopicMessages");
        System.out.println("==============================");
        System.out.println(url + " " + size + " " + subscribers + " " + messages + " " + batch);
        System.out.println("==============================");
    }
}

LogRecorder.java

import org.apache.log4j.Logger;

/**
 * Holds the single log4j {@link Logger} shared by the JMS sample classes.
 *
 * @Author: Vane
 * Company: LinkTrust.com.cn
 * Product: IntraSec
 * Module: (to be replaced by the module description)
 * Created on 2008-10-13
 */
public class LogRecorder {

    private static Logger log;

    /**
     * Configures log4j from the given properties file and creates the shared logger.
     *
     * @param config path of the log4j properties file
     */
    public static void Init(String config) {
        org.apache.log4j.PropertyConfigurator.configure(config);
        log = Logger.getLogger(LogRecorder.class);
    }

    /**
     * Returns the shared logger. If {@link #Init(String)} has not been called
     * yet, the logger is obtained without configuring log4j, so callers never
     * receive {@code null}.
     */
    public static Logger GetLog() {
        if (log == null) {
            log = Logger.getLogger(LogRecorder.class);
        }
        return log;
    }
}

PropertyReader---

import java.util.Properties;
import java.io.IOException;
import java.io.FileInputStream;
import java.io.InputStream;

/**
 * Reads settings from a {@code .properties} file on the file system.
 *
 * @Author: Vane
 * Company: LinkTrust.com.cn
 * Product: IntraSec
 * Module: (to be replaced by the module description)
 * Created on 2008-10-8
 */
public class PropertyReader {

    protected Properties prop = new Properties();

    /**
     * Loads the given properties file.
     *
     * @param propertyFile path of the file to load
     * @throws IOException if the file cannot be opened or read
     */
    public PropertyReader(String propertyFile) throws IOException {
        propertyRead(propertyFile);
    }

    /**
     * Discards any loaded settings and loads {@code propertyFile}; the file
     * stream is closed whether or not loading succeeds.
     *
     * @param propertyFile path of the file to load
     * @throws IOException if the file cannot be opened or read
     */
    protected void propertyRead(String propertyFile) throws IOException {
        prop.clear();
        //InputStream is = getClass().getResourceAsStream(propertyFile);
        InputStream is = new FileInputStream(propertyFile);
        try {
            prop.load(is);
        } finally {
            is.close();
        }
    }

    /**
     * Returns the trimmed value of a setting.
     *
     * @param aprop the key to look up
     * @return the trimmed value, or {@code null} when the key is not present
     */
    public String getProperty(String aprop) {
        String value = prop.getProperty(aprop);
        return value == null ? null : value.trim();
    }
}

TopicListener.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.util.Arrays;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Use in conjunction with TopicPublisher to test the performance of ActiveMQ
 * Topics.
 */
public class TopicListener implements MessageListener {

    private Connection connection;
    private MessageProducer producer;
    private Session session;
    private int count;
    private long start;
    private Topic topic;
    private Topic control;

    private String url = "tcp://localhost:61616";

    /**
     * Applies the command-line options and starts listening. Exits with
     * status -1 when {@code CommandLineSupport.setOptions} reports
     * unrecognised options.
     */
    public static void main(String[] argv) throws Exception {
        TopicListener l = new TopicListener();
        String[] unknown = CommandLineSupport.setOptions(l, argv);
        if (unknown.length > 0) {
            System.out.println("Unknown options: " + Arrays.toString(unknown));
            System.exit(-1);
        }
        l.run();
    }

    /**
     * Connects to the broker, listens on {@code topictest.messages} and
     * creates a producer for {@code topictest.control}.
     *
     * @throws JMSException if the connection, session, consumer or producer cannot be created
     */
    public void run() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        connection = factory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        topic = session.createTopic("topictest.messages");
        control = session.createTopic("topictest.control");

        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(this);

        connection.start();

        producer = session.createProducer(control);
        System.out.println("Waiting for messages...");
    }

    /**
     * Tells whether {@code m} is a {@link TextMessage} whose text equals {@code s}.
     * A {@link JMSException} while reading the text is printed and treated as "no match".
     */
    private static boolean checkText(Message m, String s) {
        try {
            return m instanceof TextMessage && ((TextMessage) m).getText().equals(s);
        } catch (JMSException e) {
            e.printStackTrace(System.out);
            return false;
        }
    }

    /**
     * Handles one message: {@code SHUTDOWN} closes the connection,
     * {@code REPORT} sends the count and elapsed time to the control topic and
     * resets the count, and any other message is counted, with progress
     * printed every 1000 messages.
     */
    public void onMessage(Message message) {
        try {
            if (checkText(message, "SHUTDOWN")) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace(System.out);
                }
            } else if (checkText(message, "REPORT")) {
                // send a report:
                try {
                    long time = System.currentTimeMillis() - start;
                    String msg = "Received " + count + " in " + time + "ms";
                    producer.send(session.createTextMessage(msg));
                } catch (Exception e) {
                    e.printStackTrace(System.out);
                }
                count = 0;
            } else {
                if (count == 0) {
                    start = System.currentTimeMillis();
                }
                if (++count % 1000 == 0) {
                    // Only a BytesMessage has a byte to show; other message
                    // types used to fail here with a ClassCastException.
                    if (message instanceof BytesMessage) {
                        BytesMessage txtMsg = (BytesMessage) message;
                        System.out.println("Received =" + txtMsg.readByte());
                    }
                    System.out.println("Received " + count + " messages.");
                }
            }
        } catch (JMSException e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

    /** Sets the broker URL used by {@link #run()}. */
    public void setUrl(String url) {
        this.url = url;
    }
} |
|
javahero1984
2010-04-20
http://fusesource.com/documentation/fuse-service-framework-documentation/---fuse 怎么部署
http://fusesource.com/documentation/fuse-message-broker-documentation/ -- 文档概览 |
相关讨论
相关资源推荐
- C#实现图像储存到sql server 2000 或储存到文件夹及图像自数据库读取
- C#中如何把图片保存入SQL Server
- C# 如何把图片放到sql server数据库中
- 如何在 SQL Server2000 中保存图像及读取图像信息
- c#中实现存储图片到SQLServer2005
- Hibernate第十篇【Hibernate查询详解、分页查询】
- Hibernate如何实践union,order by,分页功能共存
- Hibernate实现分页的工具类
- hibernate mysql类型转换_精通hibernate-Hibernate 5.X的使用概述与示例(转)
- 将 varchar 值转换为 JDBC 数据类型 DATE 时发生错误。