View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activespace.cache.impl;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activespace.cache.OptimisticTransactionException;
23  import org.codehaus.activespace.cache.TransactionException;
24  import org.codehaus.activespace.cache.TransactionalCache;
25  import org.codehaus.activespace.cache.TransactionalCacheManager;
26  
27  import javax.cache.Cache;
28  import javax.cache.CacheEntry;
29  import java.util.Iterator;
30  import java.util.Map;
31  
32  
33  /***
34   * <p><code>TransactionalCacheCommand</code> is the default
35   * implementation of a CacheCommand which can include adding
36   * new items, updating items and removing items.
37   *
38   * @author James Strachan
39   * @version 1.7
40   */
41  public class TransactionalCacheCommand extends CacheCommand {
42      private static final Log log = LogFactory.getLog(TransactionalCache.class);
43  
44      private String name;
45      private Map changes;
46      private Map removalVersions;
47      private Map readVersions;
48      private boolean clear;
49      private String originator;
50  
51      public TransactionalCacheCommand(String name, Map changes, Map removalKeys, boolean clear, String originator) {
52          this.name = name;
53          this.changes = changes;
54          this.removalVersions = removalKeys;
55          this.clear = clear;
56          this.originator = originator;
57      }
58  
59      public String getOriginator() {
60          return originator;
61      }
62  
63      public void setOriginator(String originator) {
64          this.originator = originator;
65      }
66  
67      public void run(TransactionalCacheManager cacheManager) {
68          TransactionalCache transactionCache = cacheManager.getTransactionalCache(name);
69          if (transactionCache != null) {
70              runOnCache(transactionCache);
71          }
72          else {
73              log.warn("Discarding command for unknown cache: " + name);
74          }
75      }
76  
77      public void runOnCache(TransactionalCache transactionCache) {
78          // lets get the thread cache of the originator of the transaction
79          ThreadCache threadCache = transactionCache.getThreadCache(originator);
80          if (threadCache == null) {
81              // we are not the JVM which initiated the transaction to silently discard
82              // any transaction failures
83              try {
84                  updateBackingCache(transactionCache.getBackingCache());
85              }
86              catch (TransactionException e) {
87                  if (log.isDebugEnabled()) {
88                      log.debug("Discarding optimistic transaction failure: " + e, e);
89                  }
90                  //System.out.println("Discarding optimistic transaction failure: " + e + " as no cache could be found for: " + originator + " on transactionCache: " + transactionCache.hashCode());
91              }
92              finally {
93                  threadCache.resetLocalChanges();
94              }
95          }
96          else {
97              runOnCache(threadCache);
98          }
99      }
100 
101     public void runOnCache(ThreadCache threadCache) {
102         try {
103             updateBackingCache(threadCache.getDelegate());
104             threadCache.onCommitException(null);
105         }
106         catch (TransactionException e) {
107             threadCache.onCommitException(e);
108         }
109         finally {
110             threadCache.resetLocalChanges();
111         }
112     }
113 
114     public void updateBackingCache(Cache cache) {
115         synchronized (cache) {
116             checkForConcurrencyFailure(cache);
117             if (clear) {
118                 cache.clear();
119             }
120             else {
121                 for (Iterator iter = removalVersions.keySet().iterator(); iter.hasNext();) {
122                     cache.remove(iter.next());
123                 }
124             }
125             applyChanges(cache);
126         }
127     }
128 
129     public Map getReadVersions() {
130         return readVersions;
131     }
132 
133     public void setReadVersions(Map readVersions) {
134         this.readVersions = readVersions;
135     }
136 
137     protected void checkForConcurrencyFailure(Cache cache) {
138         for (Iterator iter = changes.entrySet().iterator(); iter.hasNext();) {
139             Map.Entry entry = (Map.Entry) iter.next();
140             Object key = entry.getKey();
141 
142             VersionedValue change = (VersionedValue) entry.getValue();
143             Object updateVersion = change.getVersion();
144             Object version = JCacheHelper.getEntryVersion(cache, key);
145 
146             if (log.isDebugEnabled()) {
147                 log.debug("key: " + key + " update version: " + updateVersion + " on current version: " + version + " on cache: " + System.identityHashCode(cache));
148             }
149 
150             if (!versionsCompatible(updateVersion, version)) {
151                 if (log.isDebugEnabled()) {
152                     log.debug("Incompatible change: version: " + version + " not compatible with: " + updateVersion);
153                 }
154                 throw new OptimisticTransactionException(cache, key, updateVersion, version);
155             }
156         }
157         for (Iterator iter = removalVersions.entrySet().iterator(); iter.hasNext();) {
158             Map.Entry entry = (Map.Entry) iter.next();
159             Object key = entry.getKey();
160             Object updateVersion = entry.getKey();
161             Object version = JCacheHelper.getEntryVersion(cache, key);
162 
163             if (log.isDebugEnabled()) {
164                 log.debug("key: " + key + " remove version: " + updateVersion + " on current version: " + version + " on cache: " + System.identityHashCode(cache));
165             }
166 
167             if (!versionsCompatible(updateVersion, version)) {
168                 if (log.isDebugEnabled()) {
169                     log.debug("Incompatible remove: version: " + version + " not compatible with: " + updateVersion);
170                 }
171                 throw new OptimisticTransactionException(cache, key, updateVersion, version);
172             }
173         }
174         if (readVersions != null) {
175             for (Iterator iter = readVersions.entrySet().iterator(); iter.hasNext();) {
176                 Map.Entry entry = (Map.Entry) iter.next();
177                 Object key = entry.getKey();
178                 Object updateVersion = entry.getKey();
179                 Object version = JCacheHelper.getEntryVersion(cache, key);
180 
181                 if (log.isDebugEnabled()) {
182                     log.debug("key: " + key + " read version: " + updateVersion + " on current version: " + version + " on cache: " + System.identityHashCode(cache));
183                 }
184 
185                 if (!versionsCompatible(updateVersion, version)) {
186                     if (log.isDebugEnabled()) {
187                         log.debug("Incompatible read: version: " + version + " not compatible with: " + updateVersion);
188                     }
189                     throw new OptimisticTransactionException(cache, key, updateVersion, version);
190                 }
191             }
192         }
193     }
194 
195 
196     /***
197      * Returns true if the given version of the entry is compatible with this
198      * change
199      */
200     protected boolean versionsCompatible(Object updateVersion, Object currentVersion) {
201         return currentVersion == updateVersion || (currentVersion != null && currentVersion.equals(updateVersion));
202     }
203 
204     protected void applyChanges(Cache cache) {
205         for (Iterator iter = changes.entrySet().iterator(); iter.hasNext();) {
206             Map.Entry entry = (Map.Entry) iter.next();
207             Object key = entry.getKey();
208             VersionedValue change = (VersionedValue) entry.getValue();
209 
210             // TODO should put do this by default?
211             CacheEntry cacheEntry = cache.getCacheEntry(key);
212             if (cacheEntry != null) {
213                 cacheEntry.setValue(change.getValue());
214             }
215             else {
216                 cache.put(key, change.getValue());
217             }
218 
219             //System.out.println("Entry: " + key + " has value " + cache.get(key) + " with version: " + JCacheHelper.getEntryVersion(cache, key) + " for cache: " + System.identityHashCode(cache));
220         }
221     }
222 }