Clover coverage report - ActiveSpace - 1.0-SNAPSHOT
Coverage timestamp: Sat Oct 23 2004 15:04:04 BST
file stats: LOC: 173   Methods: 10
NCLOC: 112   Classes: 1
30 day Evaluation Version distributed via the Maven Jar Repository. Clover is not free. You have 30 days to evaluate it. Please visit http://www.thecortex.net/clover to obtain a licensed version of Clover
 
 Source file Conditionals Statements Methods TOTAL
ClusteredCacheManager.java 33.3% 74.5% 100% 68.4%
coverage coverage
 1   
 /**
 2   
  * 
 3   
  * Copyright 2004 Protique Ltd
 4   
  * 
 5   
  * Licensed under the Apache License, Version 2.0 (the "License"); 
 6   
  * you may not use this file except in compliance with the License. 
 7   
  * You may obtain a copy of the License at 
 8   
  * 
 9   
  * http://www.apache.org/licenses/LICENSE-2.0
 10   
  * 
 11   
  * Unless required by applicable law or agreed to in writing, software
 12   
  * distributed under the License is distributed on an "AS IS" BASIS, 
 13   
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 14   
  * See the License for the specific language governing permissions and 
 15   
  * limitations under the License. 
 16   
  * 
 17   
  **/
 18   
 package org.codehaus.activespace.cache;
 19   
 
 20   
 import org.apache.commons.logging.Log;
 21   
 import org.apache.commons.logging.LogFactory;
 22   
 import org.codehaus.activespace.Space;
 23   
 import org.codehaus.activespace.SpaceEvent;
 24   
 import org.codehaus.activespace.SpaceListener;
 25   
 import org.codehaus.activespace.cache.impl.CacheCommand;
 26   
 import org.codehaus.activespace.cache.impl.CompositeCacheCommand;
 27   
 import org.codehaus.activespace.cache.impl.ThreadCache;
 28   
 
 29   
 import javax.cache.Cache;
 30   
 import java.util.Collection;
 31   
 import java.util.HashMap;
 32   
 import java.util.Iterator;
 33   
 import java.util.Map;
 34   
 
 35   
 /**
 36   
  * @version $Revision: 1.9 $
 37   
  */
 38   
 public class ClusteredCacheManager extends TransactionalCacheManager {
 39   
     private static final Log log = LogFactory.getLog(ClusteredCacheManager.class);
 40   
 
 41   
     private Space space;
 42   
     private SpaceListener listener;
 43   
     private Map locks = new HashMap();
 44   
 
 45  4
     public ClusteredCacheManager(Space space) {
 46  4
         this.space = space;
 47  4
         listener = new SpaceListener() {
 48  18
             public void onEvent(SpaceEvent event) {
 49  18
                 onSpaceEvent(event);
 50   
             }
 51   
         };
 52  4
         this.space.addSpaceListener(listener);
 53   
     }
 54   
 
 55  4
     public void stop() throws Exception {
 56  4
         space.close();
 57   
     }
 58   
 
 59   
     /**
 60   
      * Performs a thread local commit of all pending transactions on all the caches
 61   
      */
 62  9
     public void commit() {
 63  9
         Collection caches = getTransactionalCaches();
 64  9
         CompositeCacheCommand command = createCommitCommand(caches);
 65  9
         if (!command.isEmpty()) {
 66  9
             String originator = command.getOriginator();
 67  9
             Object lock = getLockForOriginator(originator);
 68   
 
 69  9
             space.put(command);
 70  9
             waitUntilTransactionApplied(lock, originator);
 71   
 
 72  9
             resetLocalChanges(caches);
 73  9
             handleCommitException(caches);
 74   
         }
 75   
     }
 76   
 
 77   
 
 78  18
     protected void onSpaceEvent(SpaceEvent event) {
 79  18
         Object entry = event.getEntry();
 80  18
         if (entry instanceof CacheCommand) {
 81  18
             CacheCommand command = (CacheCommand) entry;
 82  18
             command.run(this);
 83  9
             notifyOriginator(command.getOriginator());
 84   
         }
 85   
     }
 86   
 
 87  9
     protected void notifyOriginator(String originator) {
 88  9
         Object lock = null;
 89  9
         synchronized (locks) {
 90  9
             lock = locks.get(originator);
 91   
         }
 92  9
         if (lock != null) {
 93  9
             if (log.isTraceEnabled()) {
 94  0
                 log.trace("Notifying orginator: " + originator + " with lock: " + lock);
 95   
             }
 96  9
             synchronized (lock) {
 97  9
                 lock.notifyAll();
 98   
             }
 99   
             // now lets remove the lock in case
 100   
             // the other thread misses the notify
 101  9
             synchronized (locks) {
 102  9
                 locks.remove(originator);
 103   
             }
 104   
         }
 105   
         else {
 106  0
             if (log.isTraceEnabled()) {
 107  0
                 log.trace("Igoring notification from originator from another JVM: " + originator);
 108   
             }
 109   
         }
 110   
     }
 111   
 
 112  9
     protected Object getLockForOriginator(String originator) {
 113  9
         synchronized (locks) {
 114  9
             Object answer = locks.get(originator);
 115  9
             if (answer == null) {
 116  9
                 answer = new Object();
 117  9
                 locks.put(originator, answer);
 118   
             }
 119  9
             return answer;
 120   
         }
 121   
     }
 122   
 
 123   
 
 124   
     /**
 125   
      * Wait around up to some timeout period until the local transaction
 126   
      * has been distributed around the cluster and applied locally to the backing
 127   
      * cache
 128   
      */
 129  9
     protected void waitUntilTransactionApplied(Object lock, String originator) {
 130   
         // One strategy could be to not wait at all
 131   
         // and just use the local transaction view.
 132   
         // To do this we'd need to be able to snapshot the local changes
 133   
         // to a savepoint
 134   
         // so that a rollback after a commit() would only rollback the new changes
 135   
         // since the last savepoint. The savepoints can be discarded when the transaction
 136   
         // is eventually applied to the backing cache
 137   
         //
 138   
         // however we'd never be able to fail a transaction with
 139   
         // an optimistic transaction failure - but it would be super fast :)
 140   
 
 141  9
         while (hasLockFor(originator)) {
 142  0
             try {
 143  0
                 if (log.isTraceEnabled()) {
 144  0
                     log.trace("Waiting for transaction to commit for originator: " + originator + " with lock: " + lock);
 145   
                 }
 146  0
                 System.out.println("Waiting for transaction to commit ===>");
 147  0
                 synchronized (lock) {
 148   
                     // TODO we might wanna add some timeout or retries here
 149  0
                     lock.wait(2000L);
 150   
 
 151  0
                     if (log.isTraceEnabled()) {
 152  0
                         log.trace("Transaction commited for originator: " + originator + " with lock: " + lock);
 153   
                     }
 154  0
                     return;
 155   
                 }
 156   
             }
 157   
             catch (InterruptedException e) {
 158  0
                 log.trace("Ignored interupted exception: " + e, e);
 159   
             }
 160   
         }
 161   
     }
 162   
 
 163  9
     protected boolean hasLockFor(String originator) {
 164  9
         synchronized (locks) {
 165  9
             return locks.containsKey(originator);
 166   
         }
 167   
     }
 168   
 
 169  4
     protected TransactionalCache createTransactionalCache(Cache backingCache, String name) {
 170  4
         return new ClusteredCache(space, backingCache);
 171   
     }
 172   
 }
 173