1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activespace.jms;
19
20 import org.codehaus.activespace.Space;
21 import org.codehaus.activespace.SpaceException;
22 import org.codehaus.activespace.SpaceFactory;
23 import org.codehaus.activespace.SpaceListener;
24
25 import javax.jms.Connection;
26 import javax.jms.DeliveryMode;
27 import javax.jms.Destination;
28 import javax.jms.JMSException;
29 import javax.jms.Message;
30 import javax.jms.MessageConsumer;
31 import javax.jms.MessageProducer;
32 import javax.jms.Session;
33
34 /***
35 * An implementation of Space which uses JMS
36 *
37 * @version $Revision: 1.2 $
38 */
39 public class JmsSpace implements Space {
40
41 public static final int DEFAULT_PRIORITY = 10;
42 public static final int DEFAULT_DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
43 public static final DefaultMarshaller DEFAULT_MARSHALLER = new DefaultMarshaller();
44 public static final int DEFAULT_ACKNOWLEGE_MODE = Session.AUTO_ACKNOWLEDGE;
45 public static final boolean DEFAULT_TRANSACTED = false;
46
47 private JmsSpaceFactory factory;
48 private Connection connection;
49 private String name;
50 private String selector;
51 private int dispatchMode;
52 private boolean transacted;
53 private Marshaller marshaller;
54 private int deliveryMode;
55 private int priority;
56 private int acknowledgeMode;
57
58
59 private Session session;
60 private Session asyncSession;
61 private Destination destination;
62 private SpaceMessageListener messageListener;
63 private MessageConsumer consumer;
64 private MessageProducer producer;
65 private Object syncLock = new Object();
66 private MessageConsumer asyncConsumer;
67
68 public JmsSpace(JmsSpaceFactory factory, Connection connection, String name, int dispatchMode, String selector) {
69 this(factory, connection, name, dispatchMode, selector, DEFAULT_TRANSACTED, DEFAULT_MARSHALLER, DEFAULT_DELIVERY_MODE, DEFAULT_PRIORITY, DEFAULT_ACKNOWLEGE_MODE);
70 }
71
72 /***
73 * Constructs a new JMS implementation of the Space
74 *
75 * @param factory may be null, is only used to notify the factory when this space is closed
76 * @param connection the JMS connection to use
77 * @param name the subject name of the space, either a dispatchMode name or topic name which can include wildcards
78 * @param dispatchMode whether only one consumer of the space will receive the each object or whether each
79 * consumer in the space will get their own copy of the object
80 * @param selector the SQL 92 selector to filter on or null if no selector is required
81 * @param transacted whether the space should work in transacted mode or not
82 * @param marshaller the marshaller used to move objects into and out of JMS messages
83 * @param deliveryMode the delivery mode, see {@link DeliveryMode}
84 * @param priority the default priority
85 * @param acknowledgeMode the acknowlegement mode, see {@link Session}
86 */
87 public JmsSpace(JmsSpaceFactory factory, Connection connection, String name, int dispatchMode, String selector, boolean transacted, Marshaller marshaller, int deliveryMode, int priority, int acknowledgeMode) {
88 this.factory = factory;
89 this.connection = connection;
90 this.name = name;
91 this.selector = selector;
92 this.dispatchMode = dispatchMode;
93 this.transacted = transacted;
94 this.marshaller = marshaller;
95 this.deliveryMode = deliveryMode;
96 this.priority = priority;
97 this.acknowledgeMode = acknowledgeMode;
98 if (factory != null) {
99 factory.onSpaceStart(this);
100 }
101 }
102
103
104
105
106 public void put(Object value) {
107 synchronized (syncLock) {
108 try {
109 getProducer().send(getDestination(), createMessage(value));
110 }
111 catch (JMSException e) {
112 handleException(e);
113 }
114 }
115 }
116
117 public void put(Object value, long lease) {
118 synchronized (syncLock) {
119 try {
120 getProducer().send(getDestination(), createMessage(value), deliveryMode, priority, lease);
121 }
122 catch (JMSException e) {
123 handleException(e);
124 }
125 }
126 }
127
128 public Object take() {
129 synchronized (syncLock) {
130 try {
131 return unmarshall(getConsumer().receive());
132 }
133 catch (JMSException e) {
134 handleException(e);
135 return null;
136 }
137 }
138 }
139
140 public Object take(long timeout) {
141 synchronized (syncLock) {
142 try {
143 return unmarshall(getConsumer().receive(timeout));
144 }
145 catch (JMSException e) {
146 handleException(e);
147 return null;
148 }
149 }
150 }
151
152 public Object takeNoWait() {
153 synchronized (syncLock) {
154 try {
155 return unmarshall(getConsumer().receiveNoWait());
156 }
157 catch (JMSException e) {
158 handleException(e);
159 return null;
160 }
161 }
162 }
163
164 public void addSpaceListener(SpaceListener listener) {
165 try {
166 if (messageListener == null) {
167 messageListener = new SpaceMessageListener(this);
168 asyncConsumer = getAsyncSession().createConsumer(getDestination(), selector);
169 asyncConsumer.setMessageListener(messageListener);
170 }
171 messageListener.addSpaceListener(listener);
172 }
173 catch (JMSException e) {
174 handleException(e);
175 }
176 }
177
178 public void removeSpaceListener(SpaceListener listener) {
179 if (messageListener != null) {
180 messageListener.removeSpaceListener(listener);
181 if (messageListener.isEmpty()) {
182 try {
183 asyncConsumer.close();
184 }
185 catch (JMSException e) {
186 handleException(e);
187 }
188 asyncConsumer = null;
189 messageListener = null;
190 }
191 }
192 }
193
194 public Space createChildSpace(String query) throws SpaceException {
195 String newSelector = query;
196 if (selector != null) {
197 newSelector = "(" + selector + ") and (" + query + ")";
198 }
199 return new JmsSpace(factory, connection, name, dispatchMode, newSelector, transacted, marshaller, deliveryMode, priority, acknowledgeMode);
200 }
201
202 public String getName() {
203 return name;
204 }
205
206 public int getDispatchMode() {
207 return dispatchMode;
208 }
209
210 public void close() throws SpaceException {
211 JMSException lastException = null;
212 if (producer != null) {
213 try {
214 producer.close();
215 }
216 catch (JMSException e) {
217 if (lastException == null) {
218 lastException = e;
219 }
220 }
221 }
222 if (consumer != null) {
223 try {
224 consumer.close();
225 }
226 catch (JMSException e) {
227 if (lastException == null) {
228 lastException = e;
229 }
230 }
231 }
232 if (session != null) {
233 try {
234 session.close();
235 }
236 catch (JMSException e) {
237 if (lastException == null) {
238 lastException = e;
239 }
240 }
241 }
242 if (factory != null) {
243 try {
244 factory.onSpaceClose(this);
245 }
246 catch (JMSException e) {
247 if (lastException == null) {
248 lastException = e;
249 }
250 }
251 }
252 if (lastException != null) {
253 throw new SpaceException(lastException);
254 }
255 }
256
257
258
259 public Marshaller getMarshaller() {
260 return marshaller;
261 }
262
263 public void setMarshaller(Marshaller marshaller) {
264 this.marshaller = marshaller;
265 }
266
267 public Destination getDestination() throws JMSException {
268 if (destination == null) {
269 destination = createDestination();
270 }
271 return destination;
272 }
273
274 public void setDestination(Destination destination) {
275 this.destination = destination;
276 }
277
278 public String getSelector() {
279 return selector;
280 }
281
282 public void setSelector(String selector) {
283 this.selector = selector;
284 }
285
286 public int getDeliveryMode() {
287 return deliveryMode;
288 }
289
290 public void setDeliveryMode(int deliveryMode) {
291 this.deliveryMode = deliveryMode;
292 }
293
294 public int getPriority() {
295 return priority;
296 }
297
298 public void setPriority(int priority) {
299 this.priority = priority;
300 }
301
302
303
304
305 protected Session getSession() throws JMSException {
306 if (session == null) {
307 session = connection.createSession(transacted, acknowledgeMode);
308 }
309 return session;
310 }
311
312 public Session getAsyncSession() throws JMSException {
313 if (asyncSession == null) {
314 asyncSession = connection.createSession(transacted, acknowledgeMode);
315 }
316 return asyncSession;
317 }
318
319 protected MessageConsumer getConsumer() throws JMSException {
320 if (consumer == null) {
321 consumer = getSession().createConsumer(getDestination(), selector);
322 }
323 return consumer;
324 }
325
326 protected MessageProducer getProducer() throws JMSException {
327 if (producer == null) {
328 producer = getSession().createProducer(getDestination());
329 }
330 return producer;
331 }
332
333 protected Destination createDestination() throws JMSException {
334 switch (dispatchMode) {
335 case SpaceFactory.DISPATCH_ALL_CONSUMERS:
336 return getSession().createTopic(name);
337
338 case SpaceFactory.DISPATCH_ONE_CONSUMER_EXCLUSIVE:
339
340
341 case SpaceFactory.DISPATCH_ONE_CONSUMER:
342 return getSession().createQueue(name);
343
344 default:
345 throw new JMSException("Invalid DispatchMode. Do not understand value: " + dispatchMode);
346 }
347 }
348
349
350 protected Message createMessage(Object value) throws JMSException {
351 return marshaller.marshall(session, value);
352 }
353
354 protected Object unmarshall(Message message) throws JMSException {
355 return marshaller.unmarshall(message);
356 }
357
358 protected void handleException(JMSException e) {
359 throw new RuntimeException(e);
360 }
361
362 }