JMS 发送程序代码

javahero1984 2010-04-18
CommandLineSupport.java(注:以下贴出的代码实际上是 JMSTopicListener 类,CommandLineSupport 的源码未在本文中给出)
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

import java.util.Arrays;
import java.io.IOException;

/**
 * Subscribes to an ActiveMQ topic and prints every received {@link MapMessage}
 * as a single delimited line (to standard output and to the debug log).
 *
 * <p>Settings are read by {@link #init(String)} from a properties file with the
 * keys {@code url}, {@code jmsTopicMessages}, {@code mapString},
 * {@code mapStringDateType}, {@code mapStringProperty} and {@code compart}.
 * The three {@code map*} values are {@code ';'}-separated lists; {@code compart}
 * is the separator written after each output field.
 *
 * <p>A {@link TextMessage} whose body is exactly {@code "SHUTDOWN"} closes the
 * connection.
 *
 * <p>Company: LinkTrust.com.cn. Product: IntraSec. Module: (placeholder, to be
 * replaced by the module description). Created on 2008-10-9.
 *
 * @author Vane
 */
public class JMSTopicListener implements MessageListener {

    private Connection connection;

    private Session session;

    private Topic topic;

    private static String url;

    // user and pw are declared but never read or written in this class.
    private static String user;

    private static String pw;

    private static String jmsTopicMessages;

    private static String mapString;

    private static String mapStringDateType;

    private static String mapStringProperty;

    private static String compart;

    /**
     * Configures logging, loads {@code JMS.cfg}, applies command-line options
     * and starts listening. Exits with status -1 when unknown options are given.
     *
     * @param argv command-line options, handed to {@code CommandLineSupport.setOptions}
     */
    public static void main(String[] argv) {
        try {
            LogRecorder.Init("jms_log.cfg");
            init("JMS.cfg");
            JMSTopicListener l = new JMSTopicListener();
            String[] unknown = CommandLineSupport.setOptions(l, argv);
            if (unknown.length > 0) {
                LogRecorder.GetLog().info(
                        "Unknown options: " + Arrays.toString(unknown));
                System.exit(-1);
            }
            l.run();
        } catch (IOException e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
        } catch (JMSException e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
        }
    }

    /**
     * Connects to the broker at the configured URL, registers this object as
     * the listener of the configured topic and starts message delivery.
     *
     * @throws JMSException if any step of the connection setup fails; the
     *                      connection is closed again before the exception
     *                      is rethrown
     */
    public void run() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        connection = factory.createConnection();
        try {
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            topic = session.createTopic(jmsTopicMessages);

            MessageConsumer consumer = session.createConsumer(topic);
            consumer.setMessageListener(this);

            connection.start();
        } catch (JMSException e) {
            // Do not leave a half-initialised connection open.
            closeConnection();
            throw e;
        }
        LogRecorder.GetLog()
                .info("connnect successful,waiting for messages...");
    }

    /**
     * Tells whether {@code m} is a {@link TextMessage} whose body equals {@code s}.
     *
     * @param m the message to inspect
     * @param s the expected text; must not be {@code null}
     * @return {@code true} on an exact match; {@code false} otherwise, including
     *         when the body is {@code null} or cannot be read
     */
    private static boolean checkText(Message m, String s) {
        try {
            // s.equals(...) rather than getText().equals(s): the body may be null.
            return m instanceof TextMessage
                    && s.equals(((TextMessage) m).getText());
        } catch (JMSException e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
            return false;
        }
    }

    /**
     * Handles one delivered message: closes the connection on the
     * {@code "SHUTDOWN"} text message, prints map messages as a delimited
     * line, and logs and skips every other message type.
     *
     * @param message the delivered message
     */
    public void onMessage(Message message) {
        try {
            if (checkText(message, "SHUTDOWN")) {
                closeConnection();
                return;
            }
            if (!(message instanceof MapMessage)) {
                // An unchecked cast here would throw ClassCastException out of the callback.
                LogRecorder.GetLog().info(
                        "Ignoring message that is not a MapMessage: " + message);
                return;
            }
            String line = formatMessage((MapMessage) message);
            System.out.println(line);
            LogRecorder.GetLog().debug(line);
        } catch (JMSException e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
        } catch (NumberFormatException e) {
            // MapMessage.getInt on a non-numeric string value.
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
        }
    }

    /**
     * Builds the output line for one map message: first the fields named in
     * {@code mapStringProperty}, then the fields named in {@code mapString},
     * each followed by the {@code compart} separator. A field that is absent
     * contributes only the separator.
     */
    private static String formatMessage(MapMessage mm) throws JMSException {
        StringBuilder out = new StringBuilder();
        if (mapStringProperty != null && mapStringProperty.length() > 0) {
            String[] propertyNames = mapStringProperty.split(";");
            for (int i = 0; i < propertyNames.length; i++) {
                String name = propertyNames[i];
                // NOTE(review): these names are presumably message header properties
                // (the publisher sets them with setStringProperty) - confirm. The map
                // body is consulted as a fallback so both layouts produce readable text.
                if (mm.propertyExists(name)) {
                    out.append(mm.getStringProperty(name));
                } else if (mm.itemExists(name)) {
                    out.append(asText(mm.getObject(name)));
                }
                out.append(compart);
            }
        }
        if (mapString != null && mapString.length() > 0) {
            String[] itemNames = mapString.split(";");
            String[] typeCodes = mapStringDateType == null
                    ? new String[0]
                    : mapStringDateType.split(";");
            for (int i = 0; i < itemNames.length; i++) {
                String name = itemNames[i];
                if (mm.itemExists(name)) {
                    if (typeCode(typeCodes, i) == 0) {
                        out.append(asText(mm.getObject(name)));
                    } else {
                        out.append(mm.getInt(name));
                    }
                }
                out.append(compart);
            }
        }
        return out.toString();
    }

    /**
     * Returns the type code configured for field {@code index}: 0 selects text
     * rendering, any other value selects {@code getInt}. A missing or
     * non-numeric code is treated as 0 so a short or malformed
     * {@code mapStringDateType} cannot abort message handling.
     */
    private static int typeCode(String[] typeCodes, int index) {
        if (index >= typeCodes.length) {
            return 0;
        }
        try {
            return Integer.parseInt(typeCodes[index]);
        } catch (NumberFormatException e) {
            return 0;
        }
    }

    /**
     * Renders a map entry as text. Byte arrays are decoded with the platform
     * default charset; every other value goes through {@link String#valueOf(Object)}.
     */
    private static String asText(Object value) {
        if (value instanceof byte[]) {
            return new String((byte[]) value);
        }
        return String.valueOf(value);
    }

    /** Closes the connection, logging (not propagating) any failure. */
    private void closeConnection() {
        try {
            connection.close();
        } catch (Exception e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
        }
    }

    /**
     * Overrides the broker URL. The URL is a static field, so the change
     * applies to every instance.
     *
     * @param url the broker URL passed to {@code ActiveMQConnectionFactory}
     */
    public void setUrl(String url) {
        JMSTopicListener.url = url;
    }

    /**
     * Loads the listener settings from a properties file and logs the broker URL.
     *
     * @param fileName path of the properties file
     * @throws IOException if the file cannot be read
     */
    public static void init(String fileName) throws IOException {
        PropertyReader prop = new PropertyReader(fileName);
        url = prop.getProperty("url");
        jmsTopicMessages = prop.getProperty("jmsTopicMessages");
        mapString = prop.getProperty("mapString");
        mapStringDateType = prop.getProperty("mapStringDateType");
        mapStringProperty = prop.getProperty("mapStringProperty");
        compart = prop.getProperty("compart");
        LogRecorder.GetLog().info("jms url:" + url);
    }

}


JMSTopicListener.java

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

import java.util.Arrays;
import java.io.IOException;

/**
 * Subscribes to an ActiveMQ topic and prints every received {@link MapMessage}
 * as a single delimited line (to standard output and to the debug log).
 *
 * <p>Settings are read by {@link #init(String)} from a properties file with the
 * keys {@code url}, {@code jmsTopicMessages}, {@code mapString},
 * {@code mapStringDateType}, {@code mapStringProperty} and {@code compart}.
 * The three {@code map*} values are {@code ';'}-separated lists; {@code compart}
 * is the separator written after each output field.
 *
 * <p>A {@link TextMessage} whose body is exactly {@code "SHUTDOWN"} closes the
 * connection.
 *
 * <p>Company: LinkTrust.com.cn. Product: IntraSec. Module: (placeholder, to be
 * replaced by the module description). Created on 2008-10-9.
 *
 * @author Vane
 */
public class JMSTopicListener implements MessageListener {

    private Connection connection;

    private Session session;

    private Topic topic;

    private static String url;

    // user and pw are declared but never read or written in this class.
    private static String user;

    private static String pw;

    private static String jmsTopicMessages;

    private static String mapString;

    private static String mapStringDateType;

    private static String mapStringProperty;

    private static String compart;

    /**
     * Configures logging, loads {@code JMS.cfg}, applies command-line options
     * and starts listening. Exits with status -1 when unknown options are given.
     *
     * @param argv command-line options, handed to {@code CommandLineSupport.setOptions}
     */
    public static void main(String[] argv) {
        try {
            LogRecorder.Init("jms_log.cfg");
            init("JMS.cfg");
            JMSTopicListener l = new JMSTopicListener();
            String[] unknown = CommandLineSupport.setOptions(l, argv);
            if (unknown.length > 0) {
                LogRecorder.GetLog().info(
                        "Unknown options: " + Arrays.toString(unknown));
                System.exit(-1);
            }
            l.run();
        } catch (IOException e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
        } catch (JMSException e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
        }
    }

    /**
     * Connects to the broker at the configured URL, registers this object as
     * the listener of the configured topic and starts message delivery.
     *
     * @throws JMSException if any step of the connection setup fails; the
     *                      connection is closed again before the exception
     *                      is rethrown
     */
    public void run() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        connection = factory.createConnection();
        try {
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            topic = session.createTopic(jmsTopicMessages);

            MessageConsumer consumer = session.createConsumer(topic);
            consumer.setMessageListener(this);

            connection.start();
        } catch (JMSException e) {
            // Do not leave a half-initialised connection open.
            closeConnection();
            throw e;
        }
        LogRecorder.GetLog()
                .info("connnect successful,waiting for messages...");
    }

    /**
     * Tells whether {@code m} is a {@link TextMessage} whose body equals {@code s}.
     *
     * @param m the message to inspect
     * @param s the expected text; must not be {@code null}
     * @return {@code true} on an exact match; {@code false} otherwise, including
     *         when the body is {@code null} or cannot be read
     */
    private static boolean checkText(Message m, String s) {
        try {
            // s.equals(...) rather than getText().equals(s): the body may be null.
            return m instanceof TextMessage
                    && s.equals(((TextMessage) m).getText());
        } catch (JMSException e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
            return false;
        }
    }

    /**
     * Handles one delivered message: closes the connection on the
     * {@code "SHUTDOWN"} text message, prints map messages as a delimited
     * line, and logs and skips every other message type.
     *
     * @param message the delivered message
     */
    public void onMessage(Message message) {
        try {
            if (checkText(message, "SHUTDOWN")) {
                closeConnection();
                return;
            }
            if (!(message instanceof MapMessage)) {
                // An unchecked cast here would throw ClassCastException out of the callback.
                LogRecorder.GetLog().info(
                        "Ignoring message that is not a MapMessage: " + message);
                return;
            }
            String line = formatMessage((MapMessage) message);
            System.out.println(line);
            LogRecorder.GetLog().debug(line);
        } catch (JMSException e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
        } catch (NumberFormatException e) {
            // MapMessage.getInt on a non-numeric string value.
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
        }
    }

    /**
     * Builds the output line for one map message: first the fields named in
     * {@code mapStringProperty}, then the fields named in {@code mapString},
     * each followed by the {@code compart} separator. A field that is absent
     * contributes only the separator.
     */
    private static String formatMessage(MapMessage mm) throws JMSException {
        StringBuilder out = new StringBuilder();
        if (mapStringProperty != null && mapStringProperty.length() > 0) {
            String[] propertyNames = mapStringProperty.split(";");
            for (int i = 0; i < propertyNames.length; i++) {
                String name = propertyNames[i];
                // NOTE(review): these names are presumably message header properties
                // (the publisher sets them with setStringProperty) - confirm. The map
                // body is consulted as a fallback so both layouts produce readable text.
                if (mm.propertyExists(name)) {
                    out.append(mm.getStringProperty(name));
                } else if (mm.itemExists(name)) {
                    out.append(asText(mm.getObject(name)));
                }
                out.append(compart);
            }
        }
        if (mapString != null && mapString.length() > 0) {
            String[] itemNames = mapString.split(";");
            String[] typeCodes = mapStringDateType == null
                    ? new String[0]
                    : mapStringDateType.split(";");
            for (int i = 0; i < itemNames.length; i++) {
                String name = itemNames[i];
                if (mm.itemExists(name)) {
                    if (typeCode(typeCodes, i) == 0) {
                        out.append(asText(mm.getObject(name)));
                    } else {
                        out.append(mm.getInt(name));
                    }
                }
                out.append(compart);
            }
        }
        return out.toString();
    }

    /**
     * Returns the type code configured for field {@code index}: 0 selects text
     * rendering, any other value selects {@code getInt}. A missing or
     * non-numeric code is treated as 0 so a short or malformed
     * {@code mapStringDateType} cannot abort message handling.
     */
    private static int typeCode(String[] typeCodes, int index) {
        if (index >= typeCodes.length) {
            return 0;
        }
        try {
            return Integer.parseInt(typeCodes[index]);
        } catch (NumberFormatException e) {
            return 0;
        }
    }

    /**
     * Renders a map entry as text. Byte arrays are decoded with the platform
     * default charset; every other value goes through {@link String#valueOf(Object)}.
     */
    private static String asText(Object value) {
        if (value instanceof byte[]) {
            return new String((byte[]) value);
        }
        return String.valueOf(value);
    }

    /** Closes the connection, logging (not propagating) any failure. */
    private void closeConnection() {
        try {
            connection.close();
        } catch (Exception e) {
            LogRecorder.GetLog().info(JMSTopicListener.class, e);
        }
    }

    /**
     * Overrides the broker URL. The URL is a static field, so the change
     * applies to every instance.
     *
     * @param url the broker URL passed to {@code ActiveMQConnectionFactory}
     */
    public void setUrl(String url) {
        JMSTopicListener.url = url;
    }

    /**
     * Loads the listener settings from a properties file and logs the broker URL.
     *
     * @param fileName path of the properties file
     * @throws IOException if the file cannot be read
     */
    public static void init(String fileName) throws IOException {
        PropertyReader prop = new PropertyReader(fileName);
        url = prop.getProperty("url");
        jmsTopicMessages = prop.getProperty("jmsTopicMessages");
        mapString = prop.getProperty("mapString");
        mapStringDateType = prop.getProperty("mapStringDateType");
        mapStringProperty = prop.getProperty("mapStringProperty");
        compart = prop.getProperty("compart");
        LogRecorder.GetLog().info("jms url:" + url);
    }

}

JMSTopicPublisher.java

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.Arrays;
import java.io.IOException;
import java.io.Serializable;

/**
* @Author: Vane
* Company: LinkTrust.com.cn
* Product: IntraSec
* Module:(此处由模块说明替换)
* Created on 2008-10-8
*/
public class JMSTopicPublisher implements MessageListener {
    private final Object mutex = new Object();
    private Connection connection;
    private Session session;
    private MessageProducer publisher;
    private Topic topic;
    private Topic control;

    private static String url ;
    private static int size ;
    private static int subscribers ;
    private static int remaining;
    private static int messages ;
    private long delay;
    private static int batch ;

    private static String jmsTopicMessages;

    public static void main(String[] argv) throws Exception {
        init("JMS.cfg");
        JMSTopicPublisher p = new JMSTopicPublisher();
        String[] unknown = CommandLineSupport.setOptions(p, argv);
        if (unknown.length > 0) {
            System.out.println("Unknown options: " + Arrays.toString(unknown));
            System.exit(-1);
        }
        p.run();
    }

    private void run() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        connection = factory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        topic = session.createTopic(jmsTopicMessages);
        control = session.createTopic("topictest.control");

        publisher = session.createProducer(topic);
        publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        for(int i=0 ; i<1000 ; i++){
            System.out.println("================="+i+1+"==================");
            MapMessage message = session.createMapMessage();
            message.setStringProperty("SENSE", "IBM ITM 6.1.1["+i+"]");
            message.setStringProperty("CLASS", "COMPUTERSYSTEM["+i+"]");
            message.setString("CPULOAD:NO=0","0.1["+i+"]");
            message.setString("CPUFREE:NO=0","0.9["+i+"]");
//            TestVO vo = new TestVO();
//            vo.setName("name"+i);
//            vo.setDesc("desc"+i);
//            vo.setTitle("title"+i);
            publisher.send(message);
            //publisher.send(session.createTextMessage("abcdefghijklmnopqistuvwxyz"));
        }
        connection.stop();
        connection.close();
    }

    public void onMessage(Message message) {
        synchronized (mutex) {
            System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");
            if (remaining == 0) {
                mutex.notify();
            }
        }
    }

    Object getReport(Message m) {
        try {
            return ((TextMessage)m).getText();
        } catch (JMSException e) {
            e.printStackTrace(System.out);
            return e.toString();
        }
    }


    public void setBatch(int batch) {
        this.batch = batch;
    }

    public void setDelay(long delay) {
        this.delay = delay;
    }

    public void setMessages(int messages) {
        this.messages = messages;
    }

    public void setSize(int size) {
        this.size = size;
    }

    public void setSubscribers(int subscribers) {
        this.subscribers = subscribers;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public static void init(String fileName) throws IOException {
        PropertyReader prop = new PropertyReader(fileName);
        url = prop.getProperty("url");
        size = Integer.parseInt(prop.getProperty("size"));
        subscribers = Integer.parseInt(prop.getProperty("subscribers"));
        messages = Integer.parseInt(prop.getProperty("messages"));
        batch = Integer.parseInt(prop.getProperty("batch"));
        jmsTopicMessages = prop.getProperty("jmsTopicMessages");
        System.out.println("==============================");
        System.out.println(url + " " + size + " " + subscribers +" "+messages +" "+batch);
        System.out.println("==============================");
    }

}


LogRecorder.java

import org.apache.log4j.Logger;

/**
* @Author: Vane
* Company: LinkTrust.com.cn
* Product: IntraSec
* Module:(此处由模块说明替换)
* Created on 2008-10-13
*/
public class LogRecorder {
    private static Logger log;

public static void Init(String config) {
org.apache.log4j.PropertyConfigurator.configure(config);
log = Logger.getLogger(LogRecorder.class);
}

public static Logger GetLog() {
return log;
}
}

PropertyReader.java
import java.util.Properties;
import java.io.IOException;
import java.io.FileInputStream;
import java.io.InputStream;

/**
 * Loads a {@code java.util.Properties} file from the file system and exposes
 * its values with surrounding whitespace removed.
 *
 * <p>Company: LinkTrust.com.cn. Product: IntraSec. Module: (placeholder, to be
 * replaced by the module description). Created on 2008-10-8.
 *
 * @author Vane
 */
public class PropertyReader {
    protected Properties prop = new Properties();

    /**
     * Creates a reader and immediately loads the given file.
     *
     * @param propertyFile path of the properties file
     * @throws IOException if the file cannot be opened or read
     */
    public PropertyReader(String propertyFile) throws IOException {
        propertyRead(propertyFile);
    }

    /**
     * Discards any previously loaded entries and loads the given file.
     *
     * @param propertyFile path of the properties file
     * @throws IOException if the file cannot be opened or read
     */
    protected void propertyRead(String propertyFile) throws IOException {
        prop.clear();
        InputStream is = new FileInputStream(propertyFile);
        try {
            prop.load(is);
        } finally {
            // Release the file handle even when load() fails.
            is.close();
        }
    }

    /**
     * Looks up a property value.
     *
     * @param aprop the property key
     * @return the value with leading and trailing whitespace removed, or
     *         {@code null} when the key is not present
     */
    public String getProperty(String aprop) {
        String value = prop.getProperty(aprop);
        return value == null ? null : value.trim();
    }
}


TopicListener.java


/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Arrays;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* Use in conjunction with TopicPublisher to test the performance of ActiveMQ
* Topics.
*/
public class TopicListener implements MessageListener {

    private Connection connection;
    private MessageProducer producer;
    private Session session;
    private int count;
    private long start;
    private Topic topic;
    private Topic control;

    private String url = "tcp://localhost:61616";

    public static void main(String[] argv) throws Exception {
        TopicListener l = new TopicListener();
        String[] unknown = CommandLineSupport.setOptions(l, argv);
        if (unknown.length > 0) {
            System.out.println("Unknown options: " + Arrays.toString(unknown));
            System.exit(-1);
        }
        l.run();
    }

    public void run() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        connection = factory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        topic = session.createTopic("topictest.messages");
        control = session.createTopic("topictest.control");

        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(this);

        connection.start();

        producer = session.createProducer(control);
        System.out.println("Waiting for messages...");
    }

    private static boolean checkText(Message m, String s) {
        try {
            return m instanceof TextMessage && ((TextMessage)m).getText().equals(s);
        } catch (JMSException e) {
            e.printStackTrace(System.out);
            return false;
        }
    }

    public void onMessage(Message message) {
        try {

        if (checkText(message, "SHUTDOWN")) {

            try {
                connection.close();
            } catch (Exception e) {
                e.printStackTrace(System.out);
            }

        } else if (checkText(message, "REPORT")) {
            // send a report:

            try {
                long time = System.currentTimeMillis() - start;
                String msg = "Received " + count + " in " + time + "ms";
                producer.send(session.createTextMessage(msg));
            } catch (Exception e) {
                e.printStackTrace(System.out);
            }
            count = 0;

        } else {

            if (count == 0) {
                start = System.currentTimeMillis();
            }

            if (++count % 1000 == 0) {
                BytesMessage txtMsg = (BytesMessage)message;
System.out.println("Received =" + txtMsg.readByte());
System.out.println("Received " + count + " messages.");
            }
        }
} catch (JMSException e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

    public void setUrl(String url) {
        this.url = url;
    }

}
javahero1984 2010-04-20
http://fusesource.com/documentation/fuse-service-framework-documentation/---fuse 怎么部署
http://fusesource.com/documentation/fuse-message-broker-documentation/ -- 文档概览
Global site tag (gtag.js) - Google Analytics