1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activespace.cache.impl;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activespace.cache.OptimisticTransactionException;
23 import org.codehaus.activespace.cache.TransactionException;
24 import org.codehaus.activespace.cache.TransactionalCache;
25 import org.codehaus.activespace.cache.TransactionalCacheManager;
26
27 import javax.cache.Cache;
28 import javax.cache.CacheEntry;
29 import java.util.Iterator;
30 import java.util.Map;
31
32
33 /***
34 * <p><code>TransactionalCacheCommand</code> is the default
35 * implementation of a CacheCommand which can include adding
36 * new items, updating items and removing items.
37 *
38 * @author James Strachan
39 * @version 1.7
40 */
41 public class TransactionalCacheCommand extends CacheCommand {
42 private static final Log log = LogFactory.getLog(TransactionalCache.class);
43
44 private String name;
45 private Map changes;
46 private Map removalVersions;
47 private Map readVersions;
48 private boolean clear;
49 private String originator;
50
51 public TransactionalCacheCommand(String name, Map changes, Map removalKeys, boolean clear, String originator) {
52 this.name = name;
53 this.changes = changes;
54 this.removalVersions = removalKeys;
55 this.clear = clear;
56 this.originator = originator;
57 }
58
59 public String getOriginator() {
60 return originator;
61 }
62
63 public void setOriginator(String originator) {
64 this.originator = originator;
65 }
66
67 public void run(TransactionalCacheManager cacheManager) {
68 TransactionalCache transactionCache = cacheManager.getTransactionalCache(name);
69 if (transactionCache != null) {
70 runOnCache(transactionCache);
71 }
72 else {
73 log.warn("Discarding command for unknown cache: " + name);
74 }
75 }
76
77 public void runOnCache(TransactionalCache transactionCache) {
78
79 ThreadCache threadCache = transactionCache.getThreadCache(originator);
80 if (threadCache == null) {
81
82
83 try {
84 updateBackingCache(transactionCache.getBackingCache());
85 }
86 catch (TransactionException e) {
87 if (log.isDebugEnabled()) {
88 log.debug("Discarding optimistic transaction failure: " + e, e);
89 }
90
91 }
92 finally {
93 threadCache.resetLocalChanges();
94 }
95 }
96 else {
97 runOnCache(threadCache);
98 }
99 }
100
101 public void runOnCache(ThreadCache threadCache) {
102 try {
103 updateBackingCache(threadCache.getDelegate());
104 threadCache.onCommitException(null);
105 }
106 catch (TransactionException e) {
107 threadCache.onCommitException(e);
108 }
109 finally {
110 threadCache.resetLocalChanges();
111 }
112 }
113
114 public void updateBackingCache(Cache cache) {
115 synchronized (cache) {
116 checkForConcurrencyFailure(cache);
117 if (clear) {
118 cache.clear();
119 }
120 else {
121 for (Iterator iter = removalVersions.keySet().iterator(); iter.hasNext();) {
122 cache.remove(iter.next());
123 }
124 }
125 applyChanges(cache);
126 }
127 }
128
129 public Map getReadVersions() {
130 return readVersions;
131 }
132
133 public void setReadVersions(Map readVersions) {
134 this.readVersions = readVersions;
135 }
136
137 protected void checkForConcurrencyFailure(Cache cache) {
138 for (Iterator iter = changes.entrySet().iterator(); iter.hasNext();) {
139 Map.Entry entry = (Map.Entry) iter.next();
140 Object key = entry.getKey();
141
142 VersionedValue change = (VersionedValue) entry.getValue();
143 Object updateVersion = change.getVersion();
144 Object version = JCacheHelper.getEntryVersion(cache, key);
145
146 if (log.isDebugEnabled()) {
147 log.debug("key: " + key + " update version: " + updateVersion + " on current version: " + version + " on cache: " + System.identityHashCode(cache));
148 }
149
150 if (!versionsCompatible(updateVersion, version)) {
151 if (log.isDebugEnabled()) {
152 log.debug("Incompatible change: version: " + version + " not compatible with: " + updateVersion);
153 }
154 throw new OptimisticTransactionException(cache, key, updateVersion, version);
155 }
156 }
157 for (Iterator iter = removalVersions.entrySet().iterator(); iter.hasNext();) {
158 Map.Entry entry = (Map.Entry) iter.next();
159 Object key = entry.getKey();
160 Object updateVersion = entry.getKey();
161 Object version = JCacheHelper.getEntryVersion(cache, key);
162
163 if (log.isDebugEnabled()) {
164 log.debug("key: " + key + " remove version: " + updateVersion + " on current version: " + version + " on cache: " + System.identityHashCode(cache));
165 }
166
167 if (!versionsCompatible(updateVersion, version)) {
168 if (log.isDebugEnabled()) {
169 log.debug("Incompatible remove: version: " + version + " not compatible with: " + updateVersion);
170 }
171 throw new OptimisticTransactionException(cache, key, updateVersion, version);
172 }
173 }
174 if (readVersions != null) {
175 for (Iterator iter = readVersions.entrySet().iterator(); iter.hasNext();) {
176 Map.Entry entry = (Map.Entry) iter.next();
177 Object key = entry.getKey();
178 Object updateVersion = entry.getKey();
179 Object version = JCacheHelper.getEntryVersion(cache, key);
180
181 if (log.isDebugEnabled()) {
182 log.debug("key: " + key + " read version: " + updateVersion + " on current version: " + version + " on cache: " + System.identityHashCode(cache));
183 }
184
185 if (!versionsCompatible(updateVersion, version)) {
186 if (log.isDebugEnabled()) {
187 log.debug("Incompatible read: version: " + version + " not compatible with: " + updateVersion);
188 }
189 throw new OptimisticTransactionException(cache, key, updateVersion, version);
190 }
191 }
192 }
193 }
194
195
196 /***
197 * Returns true if the given version of the entry is compatible with this
198 * change
199 */
200 protected boolean versionsCompatible(Object updateVersion, Object currentVersion) {
201 return currentVersion == updateVersion || (currentVersion != null && currentVersion.equals(updateVersion));
202 }
203
204 protected void applyChanges(Cache cache) {
205 for (Iterator iter = changes.entrySet().iterator(); iter.hasNext();) {
206 Map.Entry entry = (Map.Entry) iter.next();
207 Object key = entry.getKey();
208 VersionedValue change = (VersionedValue) entry.getValue();
209
210
211 CacheEntry cacheEntry = cache.getCacheEntry(key);
212 if (cacheEntry != null) {
213 cacheEntry.setValue(change.getValue());
214 }
215 else {
216 cache.put(key, change.getValue());
217 }
218
219
220 }
221 }
222 }