* The program contains a DurableSubscriber class, a MultiplePublisher class,
* a main method, and a method that instantiates the classes and calls their
* methods in sequence.
*
* The program begins like any publish/subscribe program: the subscriber starts,
* the publisher publishes some messages, and the subscriber receives them.
*
* At this point the subscriber closes itself. The publisher then publishes
* some messages while the subscriber is not active. The subscriber then
* restarts and receives the messages.
*
* Specify a topic name on the command line when you run the program.
*
* @author Kim Haase
* @version 1.6, 08/18/00
*/
public class DurableSubscriberExample {
String topicName = null;
int exitResult = 0;
static int startindex = 0;
/**
* The DurableSubscriber class contains a constructor, a startSubscriber
* method, a closeSubscriber method, and a finish method.
*
* The class fetches messages asynchronously, using a message listener,
* TextListener.
*
* @author Kim Haase
* @version 1.6, 08/18/00
*/
public class DurableSubscriber {
    /** Client identifier assigned to the connection in the constructor. */
    private static final String CLIENT_ID = "EE";

    /**
     * Name of the durable subscription. The same name must be used when
     * the subscriber is created and when the subscription is removed, so
     * it is kept in one place.
     */
    private static final String SUBSCRIPTION_NAME = "MakeItLast";

    TopicConnection topicConnection = null;
    TopicSession topicSession = null;
    Topic topic = null;
    TopicSubscriber topicSubscriber = null;
    TextListener topicListener = null;

    /**
     * The TextListener class implements the MessageListener interface by
     * defining an onMessage method for the DurableSubscriber class.
     *
     * @author Kim Haase
     * @version 1.6, 08/18/00
     */
    private class TextListener implements MessageListener {
        /**
         * Latch shared with closeSubscriber(): onMessage() calls
         * allDone() on it and closeSubscriber() calls waitTillDone().
         */
        final SampleUtilities.DoneLatch monitor =
            new SampleUtilities.DoneLatch();

        /**
         * Displays the text of a TextMessage. Any other kind of message
         * is interpreted as the end of the message stream, and the
         * listener signals its monitor by calling allDone().
         *
         * @param message the incoming message
         */
        public void onMessage(Message message) {
            if (message instanceof TextMessage) {
                TextMessage msg = (TextMessage) message;
                try {
                    System.out.println("SUBSCRIBER: Reading message: "
                        + msg.getText());
                } catch (JMSException e) {
                    System.out.println("Exception in onMessage(): "
                        + e.toString());
                }
            } else {
                // Non-text message: treated as the end-of-stream marker.
                monitor.allDone();
            }
        }
    }

    /**
     * Constructor: looks up a connection factory and topic and creates a
     * connection (with client ID {@code "EE"}) and a non-transacted,
     * auto-acknowledge session. The topic name is taken from the
     * enclosing example's topicName field.
     *
     * On any failure the problem is reported, the connection (if one was
     * created) is closed, and the program exits with status 1.
     */
    public DurableSubscriber() {
        try {
            TopicConnectionFactory topicConnectionFactory =
                SampleUtilities.getTopicConnectionFactory();
            topicConnection =
                topicConnectionFactory.createTopicConnection();
            topicConnection.setClientID(CLIENT_ID);
            topicSession = topicConnection.createTopicSession(false,
                Session.AUTO_ACKNOWLEDGE);
            topic = SampleUtilities.getTopic(topicName, topicSession);
        } catch (Exception e) {
            System.out.println("Connection problem: " + e.toString());
            if (topicConnection != null) {
                try {
                    topicConnection.close();
                } catch (JMSException ignored) {
                    // Best-effort cleanup: the program is about to exit
                    // and the original failure was already reported.
                }
            }
            System.exit(1);
        }
    }

    /**
     * Stops the connection, then creates the durable subscriber, registers
     * a new message listener (TextListener), and starts message delivery;
     * the listener displays the messages obtained.
     *
     * If a JMSException occurs it is reported and the enclosing example's
     * exitResult is set to 1.
     */
    public void startSubscriber() {
        try {
            System.out.println("Starting subscriber");
            // Delivery is paused while the subscriber and its listener
            // are being set up, and resumed once both are in place.
            topicConnection.stop();
            topicSubscriber = topicSession.createDurableSubscriber(topic,
                SUBSCRIPTION_NAME);
            topicListener = new TextListener();
            topicSubscriber.setMessageListener(topicListener);
            topicConnection.start();
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            exitResult = 1;
        }
    }

    /**
     * Waits on the listener's monitor (presumably until the publisher has
     * issued a control message indicating the end of the publish stream;
     * see SampleUtilities.DoneLatch), then closes the subscriber.
     *
     * If startSubscriber() never managed to create a subscriber and
     * listener, there is nothing to wait for or close, and the method
     * returns after reporting that fact.
     */
    public void closeSubscriber() {
        if (topicListener == null || topicSubscriber == null) {
            // A failed first startSubscriber() leaves both fields null;
            // dereferencing them would throw NullPointerException.
            System.out.println(
                "Subscriber was not started; nothing to close");
            return;
        }
        try {
            topicListener.monitor.waitTillDone();
            System.out.println("Closing subscriber");
            topicSubscriber.close();
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            exitResult = 1;
        }
    }

    /**
     * Removes the durable subscription and closes the connection.
     *
     * The connection is closed even if removing the subscription fails.
     * Any JMSException from either step is reported and the enclosing
     * example's exitResult is set to 1.
     */
    public void finish() {
        if (topicConnection == null) {
            return;
        }
        try {
            if (topicSession != null) {
                topicSession.unsubscribe(SUBSCRIPTION_NAME);
            }
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
            exitResult = 1;
        } finally {
            // Always release the connection, whatever happened above.
            try {
                topicConnection.close();
            } catch (JMSException e) {
                System.out.println("Exception occurred: " + e.toString());
                exitResult = 1;
            }
        }
    }
}