View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activespace.jms;
19  
20  import org.codehaus.activespace.Space;
21  import org.codehaus.activespace.SpaceException;
22  import org.codehaus.activespace.SpaceFactory;
23  import org.codehaus.activespace.SpaceListener;
24  
25  import javax.jms.Connection;
26  import javax.jms.DeliveryMode;
27  import javax.jms.Destination;
28  import javax.jms.JMSException;
29  import javax.jms.Message;
30  import javax.jms.MessageConsumer;
31  import javax.jms.MessageProducer;
32  import javax.jms.Session;
33  
34  /***
35   * An implementation of Space which uses JMS
36   *
37   * @version $Revision: 1.2 $
38   */
39  public class JmsSpace implements Space {
40  
41      public static final int DEFAULT_PRIORITY = 10;
42      public static final int DEFAULT_DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
43      public static final DefaultMarshaller DEFAULT_MARSHALLER = new DefaultMarshaller();
44      public static final int DEFAULT_ACKNOWLEGE_MODE = Session.AUTO_ACKNOWLEDGE;
45      public static final boolean DEFAULT_TRANSACTED = false;
46  
47      private JmsSpaceFactory factory;
48      private Connection connection;
49      private String name;
50      private String selector;
51      private int dispatchMode;
52      private boolean transacted;
53      private Marshaller marshaller;
54      private int deliveryMode;
55      private int priority;
56      private int acknowledgeMode;
57  
58      // stuff we lazily create
59      private Session session;
60      private Session asyncSession;
61      private Destination destination;
62      private SpaceMessageListener messageListener;
63      private MessageConsumer consumer;
64      private MessageProducer producer;
65      private Object syncLock = new Object();
66      private MessageConsumer asyncConsumer;
67  
68      public JmsSpace(JmsSpaceFactory factory, Connection connection, String name, int dispatchMode, String selector) {
69          this(factory, connection, name, dispatchMode, selector, DEFAULT_TRANSACTED, DEFAULT_MARSHALLER, DEFAULT_DELIVERY_MODE, DEFAULT_PRIORITY, DEFAULT_ACKNOWLEGE_MODE);
70      }
71  
72      /***
73       * Constructs a new JMS implementation of the Space
74       *
75       * @param factory         may be null, is only used to notify the factory when this space is closed
76       * @param connection      the JMS connection to use
77       * @param name            the subject name of the space, either a dispatchMode name or topic name which can include wildcards
78       * @param dispatchMode    whether only one consumer of the space will receive the each object or whether each
79       *                        consumer in the space will get their own copy of the object
80       * @param selector        the SQL 92 selector to filter on or null if no selector is required
81       * @param transacted      whether the space should work in transacted mode or not
82       * @param marshaller      the marshaller used to move objects into and out of JMS messages
83       * @param deliveryMode    the delivery mode, see {@link DeliveryMode}
84       * @param priority        the default priority
85       * @param acknowledgeMode the acknowlegement mode, see {@link Session}
86       */
87      public JmsSpace(JmsSpaceFactory factory, Connection connection, String name, int dispatchMode, String selector, boolean transacted, Marshaller marshaller, int deliveryMode, int priority, int acknowledgeMode) {
88          this.factory = factory;
89          this.connection = connection;
90          this.name = name;
91          this.selector = selector;
92          this.dispatchMode = dispatchMode;
93          this.transacted = transacted;
94          this.marshaller = marshaller;
95          this.deliveryMode = deliveryMode;
96          this.priority = priority;
97          this.acknowledgeMode = acknowledgeMode;
98          if (factory != null) {
99              factory.onSpaceStart(this);
100         }
101     }
102 
103 
104     // Space interface
105     //-------------------------------------------------------------------------
106     public void put(Object value) {
107         synchronized (syncLock) {
108             try {
109                 getProducer().send(getDestination(), createMessage(value));
110             }
111             catch (JMSException e) {
112                 handleException(e);
113             }
114         }
115     }
116 
117     public void put(Object value, long lease) {
118         synchronized (syncLock) {
119             try {
120                 getProducer().send(getDestination(), createMessage(value), deliveryMode, priority, lease);
121             }
122             catch (JMSException e) {
123                 handleException(e);
124             }
125         }
126     }
127 
128     public Object take() {
129         synchronized (syncLock) {
130             try {
131                 return unmarshall(getConsumer().receive());
132             }
133             catch (JMSException e) {
134                 handleException(e);
135                 return null;
136             }
137         }
138     }
139 
140     public Object take(long timeout) {
141         synchronized (syncLock) {
142             try {
143                 return unmarshall(getConsumer().receive(timeout));
144             }
145             catch (JMSException e) {
146                 handleException(e);
147                 return null;
148             }
149         }
150     }
151 
152     public Object takeNoWait() {
153         synchronized (syncLock) {
154             try {
155                 return unmarshall(getConsumer().receiveNoWait());
156             }
157             catch (JMSException e) {
158                 handleException(e);
159                 return null;
160             }
161         }
162     }
163 
164     public void addSpaceListener(SpaceListener listener) {
165         try {
166             if (messageListener == null) {
167                 messageListener = new SpaceMessageListener(this);
168                 asyncConsumer = getAsyncSession().createConsumer(getDestination(), selector);
169                 asyncConsumer.setMessageListener(messageListener);
170             }
171             messageListener.addSpaceListener(listener);
172         }
173         catch (JMSException e) {
174             handleException(e);
175         }
176     }
177 
178     public void removeSpaceListener(SpaceListener listener) {
179         if (messageListener != null) {
180             messageListener.removeSpaceListener(listener);
181             if (messageListener.isEmpty()) {
182                 try {
183                     asyncConsumer.close();
184                 }
185                 catch (JMSException e) {
186                     handleException(e);
187                 }
188                 asyncConsumer = null;
189                 messageListener = null;
190             }
191         }
192     }
193 
194     public Space createChildSpace(String query) throws SpaceException {
195         String newSelector = query;
196         if (selector != null) {
197             newSelector = "(" + selector + ") and (" + query + ")";
198         }
199         return new JmsSpace(factory, connection, name, dispatchMode, newSelector, transacted, marshaller, deliveryMode, priority, acknowledgeMode);
200     }
201 
202     public String getName() {
203         return name;
204     }
205 
206     public int getDispatchMode() {
207         return dispatchMode;
208     }
209 
210     public void close() throws SpaceException {
211         JMSException lastException = null;
212         if (producer != null) {
213             try {
214                 producer.close();
215             }
216             catch (JMSException e) {
217                 if (lastException == null) {
218                     lastException = e;
219                 }
220             }
221         }
222         if (consumer != null) {
223             try {
224                 consumer.close();
225             }
226             catch (JMSException e) {
227                 if (lastException == null) {
228                     lastException = e;
229                 }
230             }
231         }
232         if (session != null) {
233             try {
234                 session.close();
235             }
236             catch (JMSException e) {
237                 if (lastException == null) {
238                     lastException = e;
239                 }
240             }
241         }
242         if (factory != null) {
243             try {
244                 factory.onSpaceClose(this);
245             }
246             catch (JMSException e) {
247                 if (lastException == null) {
248                     lastException = e;
249                 }
250             }
251         }
252         if (lastException != null) {
253             throw new SpaceException(lastException);
254         }
255     }
256 
257     // Properties
258     //-------------------------------------------------------------------------
259     public Marshaller getMarshaller() {
260         return marshaller;
261     }
262 
263     public void setMarshaller(Marshaller marshaller) {
264         this.marshaller = marshaller;
265     }
266 
267     public Destination getDestination() throws JMSException {
268         if (destination == null) {
269             destination = createDestination();
270         }
271         return destination;
272     }
273 
274     public void setDestination(Destination destination) {
275         this.destination = destination;
276     }
277 
278     public String getSelector() {
279         return selector;
280     }
281 
282     public void setSelector(String selector) {
283         this.selector = selector;
284     }
285 
286     public int getDeliveryMode() {
287         return deliveryMode;
288     }
289 
290     public void setDeliveryMode(int deliveryMode) {
291         this.deliveryMode = deliveryMode;
292     }
293 
294     public int getPriority() {
295         return priority;
296     }
297 
298     public void setPriority(int priority) {
299         this.priority = priority;
300     }
301 
302 
303     // Implementation methods
304     //-------------------------------------------------------------------------
305     protected Session getSession() throws JMSException {
306         if (session == null) {
307             session = connection.createSession(transacted, acknowledgeMode);
308         }
309         return session;
310     }
311 
312     public Session getAsyncSession() throws JMSException {
313         if (asyncSession == null) {
314             asyncSession = connection.createSession(transacted, acknowledgeMode);
315         }
316         return asyncSession;
317     }
318 
319     protected MessageConsumer getConsumer() throws JMSException {
320         if (consumer == null) {
321             consumer = getSession().createConsumer(getDestination(), selector);
322         }
323         return consumer;
324     }
325 
326     protected MessageProducer getProducer() throws JMSException {
327         if (producer == null) {
328             producer = getSession().createProducer(getDestination());
329         }
330         return producer;
331     }
332 
333     protected Destination createDestination() throws JMSException {
334         switch (dispatchMode) {
335             case SpaceFactory.DISPATCH_ALL_CONSUMERS:
336                 return getSession().createTopic(name);
337 
338             case SpaceFactory.DISPATCH_ONE_CONSUMER_EXCLUSIVE:
339                 // TODO implement exclusive queues
340 
341             case SpaceFactory.DISPATCH_ONE_CONSUMER:
342                 return getSession().createQueue(name);
343 
344             default:
345                 throw new JMSException("Invalid DispatchMode. Do not understand value: " + dispatchMode);
346         }
347     }
348 
349 
350     protected Message createMessage(Object value) throws JMSException {
351         return marshaller.marshall(session, value);
352     }
353 
354     protected Object unmarshall(Message message) throws JMSException {
355         return marshaller.unmarshall(message);
356     }
357 
358     protected void handleException(JMSException e) {
359         throw new RuntimeException(e);
360     }
361 
362 }