/*
 * Decompiled with CFR 0.152.
 */
package rice.p2p.past;

import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Hashtable;
import java.util.WeakHashMap;
import rice.Continuation;
import rice.environment.Environment;
import rice.environment.logging.Logger;
import rice.environment.params.Parameters;
import rice.p2p.commonapi.Application;
import rice.p2p.commonapi.CancellableTask;
import rice.p2p.commonapi.Endpoint;
import rice.p2p.commonapi.Id;
import rice.p2p.commonapi.IdFactory;
import rice.p2p.commonapi.IdRange;
import rice.p2p.commonapi.IdSet;
import rice.p2p.commonapi.Message;
import rice.p2p.commonapi.Node;
import rice.p2p.commonapi.NodeHandle;
import rice.p2p.commonapi.NodeHandleSet;
import rice.p2p.commonapi.RouteMessage;
import rice.p2p.commonapi.appsocket.AppSocket;
import rice.p2p.commonapi.appsocket.AppSocketReceiver;
import rice.p2p.commonapi.rawserialization.InputBuffer;
import rice.p2p.commonapi.rawserialization.MessageDeserializer;
import rice.p2p.past.Past;
import rice.p2p.past.PastContent;
import rice.p2p.past.PastContentHandle;
import rice.p2p.past.PastException;
import rice.p2p.past.PastPolicy;
import rice.p2p.past.messaging.CacheMessage;
import rice.p2p.past.messaging.ContinuationMessage;
import rice.p2p.past.messaging.FetchHandleMessage;
import rice.p2p.past.messaging.FetchMessage;
import rice.p2p.past.messaging.InsertMessage;
import rice.p2p.past.messaging.LookupHandlesMessage;
import rice.p2p.past.messaging.LookupMessage;
import rice.p2p.past.messaging.MessageLostMessage;
import rice.p2p.past.messaging.PastMessage;
import rice.p2p.past.rawserialization.DefaultSocketStrategy;
import rice.p2p.past.rawserialization.JavaPastContentDeserializer;
import rice.p2p.past.rawserialization.JavaPastContentHandleDeserializer;
import rice.p2p.past.rawserialization.PastContentDeserializer;
import rice.p2p.past.rawserialization.PastContentHandleDeserializer;
import rice.p2p.past.rawserialization.SocketStrategy;
import rice.p2p.replication.Replication;
import rice.p2p.replication.manager.ReplicationManager;
import rice.p2p.replication.manager.ReplicationManagerClient;
import rice.p2p.replication.manager.ReplicationManagerImpl;
import rice.p2p.util.MathUtils;
import rice.p2p.util.rawserialization.SimpleInputBuffer;
import rice.p2p.util.rawserialization.SimpleOutputBuffer;
import rice.persistence.Cache;
import rice.persistence.LockManager;
import rice.persistence.LockManagerImpl;
import rice.persistence.StorageManager;

public class PastImpl
implements Past,
Application,
ReplicationManagerClient {
    /** Delay passed to {@code endpoint.scheduleMessage} for the MessageLostMessage of each outstanding request; loaded from the "p2p_past_messageTimeout" parameter (presumably milliseconds - TODO confirm). */
    public final int MESSAGE_TIMEOUT;
    /** Fraction of replica inserts that must report {@code Boolean.TRUE} for an insert to count as done; loaded from "p2p_past_successfulInsertThreshold". */
    public final double SUCCESSFUL_INSERT_THRESHOLD;
    /** Endpoint built on the node for this instance; used for routing, scheduling and app sockets. */
    protected Endpoint endpoint;
    /** Local store consulted by lookups/fetches and written by inserts and caching. */
    protected StorageManager storage;
    /** Assigned from the constructor argument; not otherwise used in the visible part of this file. */
    protected StorageManager trash;
    /** Assigned from the constructor argument; not otherwise used in the visible part of this file. */
    protected Cache backup;
    /** Replica count given at construction; requests ask for {@code replicationFactor + 1} handles. */
    protected int replicationFactor;
    /** Result of {@link #buildReplicationManager(Node, String)}. */
    protected ReplicationManager replicaManager;
    /** Per-id lock taken while handling an incoming insert. */
    protected LockManager lockManager;
    /** Policy asked (via {@code allowInsert}) whether an incoming insert is accepted. */
    protected PastPolicy policy;
    /** Next message UID; starts at {@code Integer.MIN_VALUE} and is post-incremented by {@link #getUID()}. */
    private int id;
    /** Pending continuations keyed by boxed message UID (Integer -> Continuation). */
    private Hashtable outstanding;
    /** Timeout tasks keyed by boxed message UID (Integer -> CancellableTask). */
    private Hashtable timers;
    /** Id factory obtained from the node. */
    protected IdFactory factory;
    /** Instance name used for the endpoint and logger, and handed to {@code returnResponse}. */
    protected String instance;
    /** Count of accepted incoming insert requests. */
    public int inserts = 0;
    /** Count of incoming lookup and fetch requests. */
    public int lookups = 0;
    /** Count of incoming fetch-handle requests. */
    public int fetchHandles = 0;
    /** Counter that is not updated in the visible part of this file. */
    public int other = 0;
    /** Environment taken from the node; source of the logger and parameters. */
    protected Environment environment;
    /** Logger for this class/instance pair. */
    protected Logger logger;
    /** Content deserializer; defaults to {@link JavaPastContentDeserializer}. */
    protected PastContentDeserializer contentDeserializer;
    /** Content-handle deserializer; defaults to {@link JavaPastContentHandleDeserializer}. */
    protected PastContentHandleDeserializer contentHandleDeserializer;
    /** Decides (via {@code sendAlongSocket}) whether a message travels over an app socket instead of being routed. */
    public SocketStrategy socketStrategy;
    /** Partially received socket messages: AppSocket -> one-element ByteBuffer[] holding the body being filled. */
    WeakHashMap pendingSocketTransactions = new WeakHashMap();

    /**
     * Creates a Past instance that applies {@link PastPolicy.DefaultPastPolicy}.
     *
     * @param node the node to build the endpoint on
     * @param manager the local storage manager
     * @param replicas the replication factor
     * @param instance the instance name
     */
    public PastImpl(Node node, StorageManager manager, int replicas, String instance) {
        this(
            node,
            manager,
            replicas,
            instance,
            new PastPolicy.DefaultPastPolicy());
    }

    /**
     * Creates a Past instance with an explicit policy and neither a backup
     * cache nor a trash store (both are passed on as {@code null}).
     *
     * @param node the node to build the endpoint on
     * @param manager the local storage manager
     * @param replicas the replication factor
     * @param instance the instance name
     * @param policy the insert policy
     */
    public PastImpl(Node node, StorageManager manager, int replicas, String instance, PastPolicy policy) {
        this(
            node,
            manager,
            null,
            replicas,
            instance,
            policy,
            null);
    }

    /**
     * Creates a Past instance whose default socket strategy is built with
     * {@code useOwnSocket == false}.
     *
     * @param node the node to build the endpoint on
     * @param manager the local storage manager
     * @param backup the backup cache (may be {@code null})
     * @param replicas the replication factor
     * @param instance the instance name
     * @param policy the insert policy
     * @param trash the trash store (may be {@code null})
     */
    public PastImpl(Node node, StorageManager manager, Cache backup, int replicas, String instance, PastPolicy policy, StorageManager trash) {
        this(
            node,
            manager,
            backup,
            replicas,
            instance,
            policy,
            trash,
            false);
    }

    /**
     * Creates a Past instance using a {@link DefaultSocketStrategy} configured
     * with the given flag.
     *
     * @param node the node to build the endpoint on
     * @param manager the local storage manager
     * @param backup the backup cache (may be {@code null})
     * @param replicas the replication factor
     * @param instance the instance name
     * @param policy the insert policy
     * @param trash the trash store (may be {@code null})
     * @param useOwnSocket flag handed to the {@link DefaultSocketStrategy} constructor
     */
    public PastImpl(Node node, StorageManager manager, Cache backup, int replicas, String instance, PastPolicy policy, StorageManager trash, boolean useOwnSocket) {
        this(
            node,
            manager,
            backup,
            replicas,
            instance,
            policy,
            trash,
            new DefaultSocketStrategy(useOwnSocket));
    }

    /**
     * Primary constructor: wires up storage, policy, endpoint, replication
     * manager and lock manager, then installs the receiver for incoming app
     * sockets and registers the endpoint.
     *
     * <p>Socket wire format (mirrors {@code sendViaSocket}): a 4-byte length
     * prefix, then {@code length} bytes consisting of a short message type
     * followed by the serialized message.
     *
     * @param node the node to build the endpoint on
     * @param manager the local storage manager
     * @param backup the backup cache (may be {@code null})
     * @param replicas the replication factor
     * @param instance the instance name (endpoint and logger name)
     * @param policy the insert policy
     * @param trash the trash store (may be {@code null})
     * @param strategy decides which messages travel over app sockets
     */
    public PastImpl(Node node, StorageManager manager, Cache backup, int replicas, String instance, PastPolicy policy, StorageManager trash, SocketStrategy strategy) {
        this.environment = node.getEnvironment();
        this.logger = this.environment.getLogManager().getLogger(this.getClass(), instance);
        Parameters p = this.environment.getParameters();
        this.MESSAGE_TIMEOUT = p.getInt("p2p_past_messageTimeout");
        this.SUCCESSFUL_INSERT_THRESHOLD = p.getDouble("p2p_past_successfulInsertThreshold");
        this.socketStrategy = strategy;
        this.storage = manager;
        this.backup = backup;
        this.contentDeserializer = new JavaPastContentDeserializer();
        this.contentHandleDeserializer = new JavaPastContentHandleDeserializer();
        this.endpoint = node.buildEndpoint(this, instance);
        this.endpoint.setDeserializer(new PastDeserializer());
        this.factory = node.getIdFactory();
        this.policy = policy;
        this.instance = instance;
        this.trash = trash;
        this.id = Integer.MIN_VALUE;
        this.outstanding = new Hashtable();
        this.timers = new Hashtable();
        // replicationFactor must be set before buildReplicationManager reads it.
        this.replicationFactor = replicas;
        this.replicaManager = this.buildReplicationManager(node, instance);
        this.lockManager = new LockManagerImpl(this.environment);
        this.endpoint.accept(new AppSocketReceiver(){

            /** A peer opened a socket: start reading from it and keep accepting further sockets. */
            public void receiveSocket(AppSocket socket) {
                if (PastImpl.this.logger.level <= 500) {
                    PastImpl.this.logger.log("Received Socket from " + socket);
                }
                socket.register(true, false, 10000, this);
                PastImpl.this.endpoint.accept(this);
            }

            /** Reads the length prefix (first call per message) and then as much of the body as is available. */
            public void receiveSelectResult(AppSocket socket, boolean canRead, boolean canWrite) {
                if (PastImpl.this.logger.level <= 400) {
                    PastImpl.this.logger.log("Reading from " + socket);
                }
                try {
                    ByteBuffer[] bb = (ByteBuffer[])PastImpl.this.pendingSocketTransactions.get(socket);
                    if (bb == null) {
                        // No message in progress on this socket: read the 4-byte length prefix.
                        bb = new ByteBuffer[]{ByteBuffer.allocate(4)};
                        if (socket.read(bb, 0, 1) == -1L) {
                            this.close(socket);
                            return;
                        }
                        // NOTE(review): a short read of the prefix is not handled; the size is
                        // decoded from whatever bytes arrived in this single read.
                        byte[] sizeArr = bb[0].array();
                        int size = MathUtils.byteArrayToInt(sizeArr);
                        if (PastImpl.this.logger.level <= 400) {
                            PastImpl.this.logger.log("Found object of size " + size + " from " + socket);
                        }
                        if (size < 0) {
                            // ByteBuffer.allocate would throw an unchecked exception out of this
                            // callback; treat a negative length as a protocol error instead.
                            this.receiveException(socket, new IOException("Invalid message size " + size + " from " + socket));
                            return;
                        }
                        bb[0] = ByteBuffer.allocate(size);
                        PastImpl.this.pendingSocketTransactions.put(socket, bb);
                    }
                    if (socket.read(bb, 0, 1) == -1L) {
                        // Peer closed before the body completed: stop here rather than
                        // deserializing or re-registering a socket we just closed.
                        this.close(socket);
                        return;
                    }
                    if (bb[0].remaining() == 0) {
                        // Body complete: forget the buffer so the next message starts fresh.
                        PastImpl.this.pendingSocketTransactions.remove(socket);
                        if (PastImpl.this.logger.level <= 300) {
                            PastImpl.this.logger.log("bb[0].limit() " + bb[0].limit() + " bb[0].remaining() " + bb[0].remaining() + " from " + socket);
                        }
                        SimpleInputBuffer sib = new SimpleInputBuffer(bb[0].array());
                        short type = sib.readShort();
                        PastMessage result = (PastMessage)PastImpl.this.endpoint.getDeserializer().deserialize(sib, type, 0, null);
                        PastImpl.this.deliver(null, result);
                    }
                    // Keep listening: either more body bytes are due or the peer will close.
                    socket.register(true, false, 10000, this);
                }
                catch (IOException ioe) {
                    this.receiveException(socket, ioe);
                }
            }

            /** Logs the failure and closes the socket. */
            public void receiveException(AppSocket socket, Exception e) {
                if (PastImpl.this.logger.level <= 900) {
                    PastImpl.this.logger.logException("Error receiving message", e);
                }
                this.close(socket);
            }

            /** Drops any partial message for the socket and closes it; tolerates {@code null}. */
            public void close(AppSocket socket) {
                if (socket == null) {
                    return;
                }
                PastImpl.this.pendingSocketTransactions.remove(socket);
                socket.close();
            }
        });
        this.endpoint.register();
    }

    /**
     * Returns {@code "PastImpl[<endpoint instance>]"}, or the default
     * {@code Object} representation while no endpoint is set.
     */
    public String toString() {
        return this.endpoint != null
            ? "PastImpl[" + this.endpoint.getInstance() + "]"
            : super.toString();
    }

    /**
     * @return the environment obtained from the node at construction time
     */
    public Environment getEnvironment() {
        return environment;
    }

    /**
     * Creates the replication manager for this instance. Called from the
     * constructor after {@code replicationFactor} has been assigned.
     *
     * @param node the node the manager runs on
     * @param instance the instance name
     * @return a new {@link ReplicationManagerImpl} with this object as its client
     */
    protected ReplicationManager buildReplicationManager(Node node, String instance) {
        ReplicationManager manager = new ReplicationManagerImpl(node, this, this.replicationFactor, instance);
        return manager;
    }

    /**
     * Returns a snapshot of the continuations still waiting for a response.
     *
     * @return a new array holding every value currently in the pending table
     */
    public Continuation[] getOutstandingMessages() {
        // The table is a raw Hashtable, so toArray(Object[]) is typed Object[];
        // the runtime array is a Continuation[] because of the argument, hence the cast.
        return (Continuation[])this.outstanding.values().toArray(new Continuation[0]);
    }

    /**
     * @return the endpoint built for this instance
     */
    public Endpoint getEndpoint() {
        return endpoint;
    }

    /**
     * Hands out the next message UID and advances the counter.
     *
     * @return the counter value before the increment
     */
    protected synchronized int getUID() {
        int uid = this.id;
        this.id = uid + 1;
        return uid;
    }

    /**
     * Builds the continuation used to answer a request: the outcome (result or
     * exception) is recorded on the message, which is then routed back to its
     * source.
     *
     * @param msg the request; must be a {@link ContinuationMessage}
     * @return a continuation that records the outcome and sends the reply
     */
    protected Continuation getResponseContinuation(final PastMessage msg) {
        if (this.logger.level <= 400) {
            this.logger.log("Getting the Continuation to respond to the message " + msg);
        }
        final ContinuationMessage reply = (ContinuationMessage)msg;
        return new Continuation(){

            /** Routes the (now filled-in) message straight to the requester. */
            private void sendBack() {
                PastImpl.this.endpoint.route(null, reply, msg.getSource());
            }

            public void receiveResult(Object o) {
                reply.receiveResult(o);
                this.sendBack();
            }

            public void receiveException(Exception e) {
                reply.receiveException(e);
                this.sendBack();
            }
        };
    }

    /**
     * Like {@link #getResponseContinuation(PastMessage)}, but a successful
     * result is sent over an app socket when the socket strategy asks for it.
     * Exceptions are always routed.
     *
     * @param msg the fetch request; must be a {@link ContinuationMessage}
     * @return a continuation that records the outcome and sends the reply
     */
    protected Continuation getFetchResponseContinuation(final PastMessage msg) {
        final ContinuationMessage reply = (ContinuationMessage)msg;
        return new Continuation(){

            public void receiveResult(Object o) {
                reply.receiveResult(o);
                PastContent fetched = (PastContent)o;
                boolean viaSocket = PastImpl.this.socketStrategy.sendAlongSocket(2, fetched);
                if (!viaSocket) {
                    PastImpl.this.endpoint.route(null, reply, msg.getSource());
                    return;
                }
                // No continuation: nothing is expected back for a response.
                PastImpl.this.sendViaSocket(msg.getSource(), reply, null);
            }

            public void receiveException(Exception e) {
                reply.receiveException(e);
                PastImpl.this.endpoint.route(null, reply, msg.getSource());
            }
        };
    }

    /**
     * Sends a message to {@code handle} over a freshly opened app socket.
     *
     * <p>Frame layout: 4-byte length, then a short message type and the
     * serialized message (the length counts everything after the prefix).
     *
     * <p>If serialization fails, the failure is reported and nothing is sent;
     * previously the partially written frame was transmitted anyway. The
     * socket is closed on write failure and on a reported socket exception.
     *
     * @param handle the destination node
     * @param m the message to send
     * @param c continuation to notify of failures and, through the pending
     *          table, of the response; {@code null} when no response is expected
     */
    private void sendViaSocket(final NodeHandle handle, final PastMessage m, final Continuation c) {
        if (c != null) {
            // A response is expected: arm the lost-message timeout and remember the continuation.
            CancellableTask timer = this.endpoint.scheduleMessage(new MessageLostMessage(m.getUID(), this.getLocalNodeHandle(), null, m, handle), this.MESSAGE_TIMEOUT);
            this.insertPending(m.getUID(), timer, c);
        }
        SimpleOutputBuffer sob = new SimpleOutputBuffer();
        try {
            // Placeholder for the length prefix; patched once the size is known.
            sob.writeInt(0);
            sob.writeShort(m.getType());
            m.serialize(sob);
        }
        catch (IOException ioe) {
            if (c != null) {
                c.receiveException(ioe);
            } else if (this.logger.level <= 900) {
                this.logger.logException("Error sending " + m, ioe);
            }
            // NOTE(review): as in the other failure paths, the pending entry/timeout
            // registered above is left in place.
            return;
        }
        int size = sob.getWritten() - 4;
        if (this.logger.level <= 400) {
            this.logger.log("Sending size of " + size + " to " + handle + " to send " + m);
        }
        byte[] bytes = sob.getBytes();
        MathUtils.intToByteArray(size, bytes, 0);
        final ByteBuffer[] bb = new ByteBuffer[]{ByteBuffer.wrap(bytes, 0, sob.getWritten())};
        if (this.logger.level <= 500) {
            this.logger.log("Opening socket to " + handle + " to send " + m);
        }
        this.endpoint.connect(handle, new AppSocketReceiver(){

            /** Connection established: wait until the socket is writable. */
            public void receiveSocket(AppSocket socket) {
                if (PastImpl.this.logger.level <= 400) {
                    PastImpl.this.logger.log("Opened socket to " + handle + ":" + socket + " to send " + m);
                }
                socket.register(false, true, 10000, this);
            }

            /** Writes as much of the frame as possible; re-registers until the buffer is drained. */
            public void receiveSelectResult(AppSocket socket, boolean canRead, boolean canWrite) {
                if (PastImpl.this.logger.level <= 300) {
                    PastImpl.this.logger.log("Writing to " + handle + ":" + socket + " to send " + m);
                }
                try {
                    socket.write(bb, 0, 1);
                }
                catch (IOException ioe) {
                    if (c != null) {
                        c.receiveException(ioe);
                    } else if (PastImpl.this.logger.level <= 900) {
                        PastImpl.this.logger.logException("Error sending " + m, ioe);
                    }
                    // We stop using the socket here, so release it.
                    socket.close();
                    return;
                }
                if (bb[0].remaining() > 0) {
                    socket.register(false, true, 10000, this);
                } else {
                    socket.close();
                }
            }

            /** Reports a connect/socket failure and releases the socket if one exists. */
            public void receiveException(AppSocket socket, Exception e) {
                if (c != null) {
                    c.receiveException(e);
                } else if (PastImpl.this.logger.level <= 900) {
                    PastImpl.this.logger.logException("Error sending " + m, e);
                }
                if (socket != null) {
                    socket.close();
                }
            }
        }, 10000);
    }

    /**
     * Routes a request towards {@code id} without a first-hop hint.
     *
     * @param id the destination id
     * @param message the request
     * @param command continuation stored in the pending table for this request
     */
    protected void sendRequest(Id id, PastMessage message, Continuation command) {
        final NodeHandle noHint = null;
        this.sendRequest(id, message, noHint, command);
    }

    /**
     * Sends a request to a specific node (no destination id, handle as hint).
     *
     * @param handle the node to send to
     * @param message the request
     * @param command continuation stored in the pending table for this request
     */
    protected void sendRequest(NodeHandle handle, PastMessage message, Continuation command) {
        final Id noId = null;
        this.sendRequest(noId, message, handle, command);
    }

    /**
     * Sends a request: schedules a {@link MessageLostMessage} after
     * {@code MESSAGE_TIMEOUT}, records the continuation under the message UID,
     * then routes the message.
     *
     * @param id the destination id (may be {@code null} when a hint is given)
     * @param message the request
     * @param hint the first-hop / target node (may be {@code null})
     * @param command continuation stored in the pending table for this request
     */
    protected void sendRequest(Id id, PastMessage message, NodeHandle hint, Continuation command) {
        if (this.logger.level <= 400) {
            this.logger.log("Sending request message " + message + " {" + message.getUID() + "} to id " + id + " via " + hint);
        }
        MessageLostMessage lostNotice = new MessageLostMessage(message.getUID(), this.getLocalNodeHandle(), id, message, hint);
        CancellableTask timeoutTask = this.endpoint.scheduleMessage(lostNotice, this.MESSAGE_TIMEOUT);
        this.insertPending(message.getUID(), timeoutTask, command);
        this.endpoint.route(id, message, hint);
    }

    /**
     * Records the timeout task and continuation of an outgoing request under
     * its UID.
     *
     * @param uid the request UID
     * @param timer the scheduled timeout task
     * @param command the continuation waiting for the response
     */
    private void insertPending(int uid, CancellableTask timer, Continuation command) {
        if (this.logger.level <= 400) {
            this.logger.log("Loading continuation " + uid + " into pending table");
        }
        Integer key = new Integer(uid);
        this.timers.put(key, timer);
        this.outstanding.put(key, command);
    }

    /**
     * Removes the pending entry for a UID, cancelling its timeout task if one
     * was registered.
     *
     * @param uid the request UID
     * @return the stored continuation, or {@code null} if none was pending
     */
    private Continuation removePending(int uid) {
        if (this.logger.level <= 400) {
            this.logger.log("Removing and returning continuation " + uid + " from pending table");
        }
        Integer key = new Integer(uid);
        CancellableTask timeoutTask = (CancellableTask)this.timers.remove(key);
        if (timeoutTask != null) {
            timeoutTask.cancel();
        }
        return (Continuation)this.outstanding.remove(key);
    }

    /**
     * Dispatches a response to the continuation registered for its UID.
     * Responses with no pending entry are dropped silently.
     *
     * @param message the response message
     */
    private void handleResponse(PastMessage message) {
        if (this.logger.level <= 500) {
            this.logger.log("handling reponse message " + message + " from the request");
        }
        Continuation waiting = this.removePending(message.getUID());
        if (waiting == null) {
            return;
        }
        message.returnResponse(waiting, this.environment, this.instance);
    }

    /**
     * Obtains up to {@code max} replica handles for {@code id}. The local
     * replica set is used when it already has exactly {@code max} entries;
     * otherwise a {@link LookupHandlesMessage} is routed to {@code id}.
     *
     * <p>A remote answer is rejected with a {@link PastException} when it has
     * fewer handles than {@code min(max, size of the local node's own replica
     * set for replicationFactor + 1)}.
     *
     * @param id the id whose replicas are wanted
     * @param max the number of handles requested
     * @param command receives the {@code NodeHandleSet} or an exception
     */
    protected void getHandles(Id id, final int max, Continuation command) {
        NodeHandleSet localView = this.endpoint.replicaSet(id, max);
        if (localView.size() == max) {
            command.receiveResult(localView);
            return;
        }
        PastMessage request = new LookupHandlesMessage(this.getUID(), id, max, this.getLocalNodeHandle(), id);
        Continuation onReply = new Continuation.StandardContinuation(command){

            public void receiveResult(Object o) {
                NodeHandleSet replicas = (NodeHandleSet)o;
                Id localId = PastImpl.this.endpoint.getLocalNodeHandle().getId();
                int known = PastImpl.this.endpoint.replicaSet(localId, PastImpl.this.replicationFactor + 1).size();
                int expected = Math.min(max, known);
                if (expected <= replicas.size()) {
                    this.parent.receiveResult(replicas);
                    return;
                }
                this.parent.receiveException(new PastException("Only received " + replicas.size() + " replicas - cannot insert as we know about more nodes."));
            }
        };
        this.sendRequest(id, request, onReply);
    }

    /**
     * Caches the content, handing the outcome to a
     * {@code Continuation.ListenerContinuation} named after the content.
     *
     * @param content the object to cache
     */
    private void cache(PastContent content) {
        Continuation listener = new Continuation.ListenerContinuation("Caching of " + content, this.environment);
        this.cache(content, listener);
    }

    /**
     * Inserts the content into the local cache when it is non-null and
     * immutable. Otherwise nothing is stored and {@code command} immediately
     * receives {@code true}.
     *
     * @param content the object to cache (may be {@code null})
     * @param command notified when caching completed or was skipped
     */
    public void cache(PastContent content, Continuation command) {
        if (this.logger.level <= 400) {
            this.logger.log("Inserting PastContent object " + content + " into cache");
        }
        boolean cacheable = content != null && !content.isMutable();
        if (!cacheable) {
            command.receiveResult(new Boolean(true));
            return;
        }
        this.storage.cache(content.getId(), null, content, command);
    }

    /**
     * Sends one message per replica of {@code id} and aggregates the replies.
     *
     * <p>Replica handles ({@code replicationFactor + 1} of them) come from
     * {@link #getHandles}. A separate message is built for each replica via
     * {@code builder.buildMessage()} and sent either over an app socket or by
     * routing, depending on {@code useSocket}.
     *
     * <p>On success {@code command} receives a {@code Boolean[]} with one
     * entry per replica (see {@code getResult} below). If all replies are in
     * without reaching the success threshold, a {@link PastException} is
     * thrown from {@code isDone}.
     *
     * @param id the id being inserted under
     * @param builder factory for the per-replica message
     * @param command receives the aggregated result or an exception
     * @param useSocket {@code true} to send via app sockets instead of routing
     */
    protected void doInsert(final Id id, final MessageBuilder builder, Continuation command, final boolean useSocket) {
        this.getHandles(id, this.replicationFactor + 1, new Continuation.StandardContinuation(command){

            public void receiveResult(Object o) {
                NodeHandleSet replicas = (NodeHandleSet)o;
                if (PastImpl.this.logger.level <= 400) {
                    PastImpl.this.logger.log("Received replicas " + replicas + " for id " + id);
                }
                // One sub-continuation slot per replica.
                Continuation.MultiContinuation multi = new Continuation.MultiContinuation(this.parent, replicas.size()){

                    public boolean isDone() throws Exception {
                        int i;
                        int numSuccess = 0;
                        // Count slots that have answered with Boolean.TRUE.
                        for (i = 0; i < this.haveResult.length; ++i) {
                            if (!this.haveResult[i] || !Boolean.TRUE.equals(this.result[i])) continue;
                            ++numSuccess;
                        }
                        // Finish early once enough replicas have succeeded.
                        if ((double)numSuccess >= PastImpl.this.SUCCESSFUL_INSERT_THRESHOLD * (double)this.haveResult.length) {
                            return true;
                        }
                        // The superclass reports done (presumably every slot has answered - TODO confirm)
                        // but the threshold was missed: log each exception result, then fail.
                        if (super.isDone()) {
                            for (i = 0; i < this.result.length; ++i) {
                                if (!(this.result[i] instanceof Exception) || PastImpl.this.logger.level > 900) continue;
                                PastImpl.this.logger.logException("result[" + i + "]:", (Exception)this.result[i]);
                            }
                            throw new PastException("Had only " + numSuccess + " successful inserts out of " + this.result.length + " - aborting.");
                        }
                        return false;
                    }

                    public Object getResult() {
                        // A slot is reported true when its result is TRUE or still null
                        // (null presumably means "no reply yet" - TODO confirm).
                        Boolean[] b = new Boolean[this.result.length];
                        for (int i = 0; i < b.length; ++i) {
                            b[i] = new Boolean(this.result[i] == null || Boolean.TRUE.equals(this.result[i]));
                        }
                        return b;
                    }
                };
                for (int i = 0; i < replicas.size(); ++i) {
                    NodeHandle handle = replicas.getHandle(i);
                    // Fresh message for every replica.
                    PastMessage m = builder.buildMessage();
                    Continuation.NamedContinuation c = new Continuation.NamedContinuation("InsertMessage to " + replicas.getHandle(i) + " for " + id, multi.getSubContinuation(i));
                    if (useSocket) {
                        PastImpl.this.sendViaSocket(handle, m, c);
                        continue;
                    }
                    PastImpl.this.sendRequest(handle, m, (Continuation)c);
                }
            }
        });
    }

    /**
     * Inserts an object into Past. An {@link InsertMessage} is sent to each
     * replica through {@link #doInsert}; once that succeeds the object is
     * offered to the local cache and the per-replica result array is handed to
     * {@code command}.
     *
     * @param obj the content to insert
     * @param command receives the result array produced by {@code doInsert},
     *                or an exception
     */
    public void insert(final PastContent obj, Continuation command) {
        if (this.logger.level <= 400) {
            this.logger.log("Inserting the object " + obj + " with the id " + obj.getId());
        }
        if (this.logger.level <= 300) {
            this.logger.log(" Inserting data of class " + obj.getClass().getName() + " under " + obj.getId().toStringFull());
        }
        final Id targetId = obj.getId();
        MessageBuilder insertBuilder = new MessageBuilder(){

            public PastMessage buildMessage() {
                return new InsertMessage(PastImpl.this.getUID(), obj, PastImpl.this.getLocalNodeHandle(), obj.getId());
            }
        };
        Continuation cacheThenReport = new Continuation.StandardContinuation(command){

            public void receiveResult(final Object array) {
                // Report the insert result only after the caching attempt has completed.
                PastImpl.this.cache(obj, new Continuation.SimpleContinuation(){

                    public void receiveResult(Object o) {
                        parent.receiveResult(array);
                    }
                });
            }
        };
        boolean viaSocket = this.socketStrategy.sendAlongSocket(1, obj);
        this.doInsert(targetId, insertBuilder, cacheThenReport, viaSocket);
    }

    /**
     * Looks up the object stored under {@code id}, caching a remotely
     * retrieved result locally.
     *
     * @param id the id to look up
     * @param command receives the object, {@code null} if none was found, or an exception
     */
    public void lookup(Id id, Continuation command) {
        final boolean cacheResult = true;
        this.lookup(id, cacheResult, command);
    }

    /**
     * Looks up the object stored under {@code id} in three stages:
     * <ol>
     *   <li>local storage - a hit is returned immediately;</li>
     *   <li>a {@link LookupMessage} routed towards {@code id};</li>
     *   <li>if that yields {@code null} or an exception, the replica handles
     *       are collected and the first non-null handle is fetched.</li>
     * </ol>
     * When {@code cache} is true, an object obtained in stage 2 or 3 is passed
     * to {@link #cache(PastContent, Continuation)} before being delivered.
     *
     * @param id the id to look up
     * @param cache whether to cache a remotely retrieved object locally
     * @param command receives the object, {@code null} if nothing was found,
     *                or an exception
     */
    public void lookup(final Id id, final boolean cache, final Continuation command) {
        if (this.logger.level <= 400) {
            this.logger.log(" Performing lookup on " + id.toStringFull());
        }
        // Stage 1: local storage.
        this.storage.getObject(id, new Continuation.StandardContinuation(command){

            public void receiveResult(Object o) {
                if (o != null) {
                    command.receiveResult(o);
                } else {
                    // Stage 2: ask the network.
                    PastImpl.this.sendRequest(id, (PastMessage)new LookupMessage(PastImpl.this.getUID(), id, PastImpl.this.getLocalNodeHandle(), id), (Continuation)new Continuation.NamedContinuation("LookupMessage for " + id, this){

                        public void receiveResult(final Object o) {
                            if (o != null) {
                                if (cache) {
                                    // Deliver only after the caching attempt has completed.
                                    PastImpl.this.cache((PastContent)o, new Continuation.SimpleContinuation(){

                                        public void receiveResult(Object object) {
                                            command.receiveResult(o);
                                        }
                                    });
                                } else {
                                    command.receiveResult(o);
                                }
                            } else {
                                // Stage 3: collect handles from the replicas and fetch from one of them.
                                PastImpl.this.lookupHandles(id, PastImpl.this.replicationFactor + 1, new Continuation(){

                                    public void receiveResult(Object o) {
                                        PastContentHandle[] handles = (PastContentHandle[])o;
                                        for (int i = 0; i < handles.length; ++i) {
                                            if (handles[i] == null) continue;
                                            // Only the first non-null handle is tried; 'parent' here is the
                                            // field of the enclosing NamedContinuation.
                                            PastImpl.this.fetch(handles[i], new Continuation.StandardContinuation(parent){

                                                public void receiveResult(final Object o) {
                                                    if (cache) {
                                                        PastImpl.this.cache((PastContent)o, new Continuation.SimpleContinuation(){

                                                            public void receiveResult(Object object) {
                                                                command.receiveResult(o);
                                                            }
                                                        });
                                                    } else {
                                                        command.receiveResult(o);
                                                    }
                                                }
                                            });
                                            return;
                                        }
                                        // No replica returned a handle: report "not found".
                                        command.receiveResult(null);
                                    }

                                    public void receiveException(Exception e) {
                                        command.receiveException(e);
                                    }
                                });
                            }
                        }

                        public void receiveException(Exception e) {
                            // A failed routed lookup is treated like a miss and falls through to stage 3.
                            this.receiveResult(null);
                        }
                    });
                }
            }
        });
    }

    /**
     * Collects the content handles held by up to {@code max} replicas of
     * {@code id}. Each replica is asked via {@link #lookupHandle}; the replies
     * are combined into a {@code PastContentHandle[]} with one slot per
     * replica, left {@code null} where the reply was not a handle.
     *
     * @param id the id whose handles are wanted
     * @param max the number of replicas to ask
     * @param command receives the {@code PastContentHandle[]} or an exception
     */
    public void lookupHandles(final Id id, int max, Continuation command) {
        if (this.logger.level <= 500) {
            this.logger.log("Retrieving handles of up to " + max + " replicas of the object stored in Past with id " + id);
        }
        if (this.logger.level <= 400) {
            this.logger.log("Fetching up to " + max + " handles of " + id.toStringFull());
        }
        Continuation onReplicas = new Continuation.StandardContinuation(command){

            public void receiveResult(Object o) {
                NodeHandleSet replicas = (NodeHandleSet)o;
                if (PastImpl.this.logger.level <= 400) {
                    PastImpl.this.logger.log("Receiving replicas " + replicas + " for lookup Id " + id);
                }
                Continuation.MultiContinuation collector = new Continuation.MultiContinuation(this.parent, replicas.size()){

                    public Object getResult() {
                        PastContentHandle[] handles = new PastContentHandle[this.result.length];
                        for (int slot = 0; slot < this.result.length; ++slot) {
                            if (this.result[slot] instanceof PastContentHandle) {
                                handles[slot] = (PastContentHandle)this.result[slot];
                            }
                        }
                        return handles;
                    }
                };
                int index = 0;
                while (index < replicas.size()) {
                    PastImpl.this.lookupHandle(id, replicas.getHandle(index), collector.getSubContinuation(index));
                    ++index;
                }
            }
        };
        this.getHandles(id, max, onReplicas);
    }

    /**
     * Asks one specific node for its content handle of {@code id} by sending
     * it a {@link FetchHandleMessage}.
     *
     * @param id the id of the object
     * @param handle the node to ask
     * @param command receives the reply or an exception
     */
    public void lookupHandle(Id id, NodeHandle handle, Continuation command) {
        if (this.logger.level <= 500) {
            this.logger.log("Retrieving handle for id " + id + " from node " + handle);
        }
        PastMessage request = new FetchHandleMessage(this.getUID(), id, this.getLocalNodeHandle(), handle.getId());
        Continuation named = new Continuation.NamedContinuation("FetchHandleMessage to " + handle + " for " + id, command);
        this.sendRequest(handle, request, named);
    }

    /**
     * Retrieves the object a content handle refers to by sending a
     * {@link FetchMessage} to the node named in the handle.
     *
     * @param handle the content handle to resolve
     * @param command receives the reply or an exception
     */
    public void fetch(PastContentHandle handle, Continuation command) {
        if (this.logger.level <= 500) {
            this.logger.log("Retrieving object associated with content handle " + handle);
        }
        if (this.logger.level <= 400) {
            this.logger.log("Fetching object under id " + handle.getId().toStringFull() + " on " + handle.getNodeHandle());
        }
        NodeHandle holder = handle.getNodeHandle();
        PastMessage request = new FetchMessage(this.getUID(), handle, this.getLocalNodeHandle(), holder.getId());
        Continuation named = new Continuation.NamedContinuation("FetchMessage to " + handle.getNodeHandle() + " for " + handle.getId(), command);
        this.sendRequest(holder, request, named);
    }

    /**
     * @return the local node's handle, as reported by the endpoint
     */
    public NodeHandle getLocalNodeHandle() {
        NodeHandle local = this.endpoint.getLocalNodeHandle();
        return local;
    }

    /**
     * @return the replication factor given at construction time
     */
    public int getReplicationFactor() {
        return replicationFactor;
    }

    /**
     * Inspects a message passing through this node and answers it locally
     * where possible:
     * <ul>
     *   <li>a {@link LookupMessage} request whose id exists in local storage;</li>
     *   <li>a {@link LookupHandlesMessage} request for which the local replica
     *       set already has exactly the requested number of handles.</li>
     * </ul>
     * In both cases the message is delivered locally and forwarding stops.
     *
     * @param message the message being routed
     * @return {@code false} if the message was handled here, {@code true} to
     *         let it continue
     * @throws RuntimeException wrapping an {@link IOException} raised while
     *         deserializing the payload
     */
    public boolean forward(RouteMessage message) {
        final Message payload;
        try {
            payload = message.getMessage(this.endpoint.getDeserializer());
        }
        catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
        if (payload instanceof LookupMessage) {
            LookupMessage lookup = (LookupMessage)payload;
            Id target = lookup.getId();
            if (lookup.isResponse()) {
                return true;
            }
            if (this.logger.level <= 400) {
                this.logger.log("Lookup message " + lookup + " is a request; look in the cache");
            }
            if (!this.storage.exists(target)) {
                return true;
            }
            if (this.logger.level <= 500) {
                this.logger.log("Request for " + target + " satisfied locally - responding");
            }
            this.deliver(this.endpoint.getId(), lookup);
            return false;
        }
        if (!(payload instanceof LookupHandlesMessage)) {
            return true;
        }
        LookupHandlesMessage handlesRequest = (LookupHandlesMessage)payload;
        if (handlesRequest.isResponse()) {
            return true;
        }
        NodeHandleSet localReplicas = this.endpoint.replicaSet(handlesRequest.getId(), handlesRequest.getMax());
        if (localReplicas.size() != handlesRequest.getMax()) {
            return true;
        }
        if (this.logger.level <= 500) {
            this.logger.log("Hijacking lookup handles request for " + handlesRequest.getId());
        }
        this.deliver(this.endpoint.getId(), handlesRequest);
        return false;
    }

    public void deliver(Id id, Message message) {
        final PastMessage msg = (PastMessage)message;
        if (msg.isResponse()) {
            this.handleResponse((PastMessage)message);
        } else {
            if (this.logger.level <= 800) {
                this.logger.log("Received message " + message + " with destination " + id);
            }
            if (msg instanceof InsertMessage) {
                final InsertMessage imsg = (InsertMessage)msg;
                if (this.policy.allowInsert(imsg.getContent())) {
                    ++this.inserts;
                    final Id msgid = imsg.getContent().getId();
                    this.lockManager.lock(msgid, new Continuation.StandardContinuation(this.getResponseContinuation(msg)){

                        public void receiveResult(Object result) {
                            PastImpl.this.storage.getObject(msgid, new Continuation.StandardContinuation(this.parent){

                                public void receiveResult(Object o) {
                                    try {
                                        PastContent content = imsg.getContent().checkInsert(msgid, (PastContent)o);
                                        PastImpl.this.storage.store(msgid, null, content, new Continuation.StandardContinuation(this.parent){

                                            public void receiveResult(Object result) {
                                                PastImpl.this.getResponseContinuation(msg).receiveResult(result);
                                                PastImpl.this.lockManager.unlock(msgid);
                                            }
                                        });
                                    }
                                    catch (PastException e) {
                                        this.parent.receiveException(e);
                                    }
                                }
                            });
                        }
                    });
                } else {
                    this.getResponseContinuation(msg).receiveResult(new Boolean(false));
                }
            } else if (msg instanceof LookupMessage) {
                final LookupMessage lmsg = (LookupMessage)msg;
                ++this.lookups;
                this.storage.getObject(lmsg.getId(), new Continuation.StandardContinuation(this.getResponseContinuation(lmsg)){

                    public void receiveResult(Object o) {
                        if (PastImpl.this.logger.level <= 500) {
                            PastImpl.this.logger.log("Received object " + o + " for id " + lmsg.getId());
                        }
                        this.parent.receiveResult(o);
                        if (lmsg.getPreviousNodeHandle() != null && o != null && !((PastContent)o).isMutable()) {
                            NodeHandle handle = lmsg.getPreviousNodeHandle();
                            if (PastImpl.this.logger.level <= 500) {
                                PastImpl.this.logger.log("Pushing cached copy of " + ((PastContent)o).getId() + " to " + handle);
                            }
                            CacheMessage cacheMessage = new CacheMessage(PastImpl.this.getUID(), (PastContent)o, PastImpl.this.getLocalNodeHandle(), handle.getId());
                        }
                    }
                });
            } else if (msg instanceof LookupHandlesMessage) {
                LookupHandlesMessage lmsg = (LookupHandlesMessage)msg;
                NodeHandleSet set = this.endpoint.replicaSet(lmsg.getId(), lmsg.getMax());
                if (this.logger.level <= 400) {
                    this.logger.log("Returning replica set " + set + " for lookup handles of id " + lmsg.getId() + " max " + lmsg.getMax() + " at " + this.endpoint.getId());
                }
                this.getResponseContinuation(msg).receiveResult(set);
            } else if (msg instanceof FetchMessage) {
                FetchMessage fmsg = (FetchMessage)msg;
                ++this.lookups;
                Continuation c = this.getFetchResponseContinuation(msg);
                this.storage.getObject(fmsg.getHandle().getId(), c);
            } else if (msg instanceof FetchHandleMessage) {
                final FetchHandleMessage fmsg = (FetchHandleMessage)msg;
                ++this.fetchHandles;
                this.storage.getObject(fmsg.getId(), new Continuation.StandardContinuation(this.getResponseContinuation(msg)){

                    public void receiveResult(Object o) {
                        PastContent content = (PastContent)o;
                        if (content != null) {
                            if (PastImpl.this.logger.level <= 500) {
                                PastImpl.this.logger.log("Retrieved data for fetch handles of id " + fmsg.getId());
                            }
                            this.parent.receiveResult(content.getHandle(PastImpl.this));
                        } else {
                            this.parent.receiveResult(null);
                        }
                    }
                });
            } else if (msg instanceof CacheMessage) {
                this.cache(((CacheMessage)msg).getContent());
            } else if (this.logger.level <= 1000) {
                this.logger.log("ERROR - Received message " + msg + "of unknown type.");
            }
        }
    }

    /**
     * Intentionally empty: this class takes no action on the notification.
     * NOTE(review): presumably the node join/leave callback of the
     * {@code Application} interface - confirm against that interface.
     *
     * @param handle the node handle the notification refers to (unused)
     * @param joined flag delivered with the notification (unused)
     */
    public void update(NodeHandle handle, boolean joined) {
    }

    /**
     * Asks the policy to fetch the object for {@code id} and, when one is
     * returned, stores it in the storage obtained from
     * {@code storage.getStorage()}.
     *
     * <p>Outcomes reported to {@code command}:
     * <ul>
     *   <li>policy returned {@code null}: {@code receiveResult(Boolean.FALSE)};</li>
     *   <li>policy returned something that is not a {@link PastContent}:
     *       {@code receiveException(ClassCastException)} instead of letting the
     *       cast blow up inside the callback and leaving {@code command}
     *       without any notification;</li>
     *   <li>otherwise: whatever the store operation reports.</li>
     * </ul>
     *
     * @param id the id of the object to fetch
     * @param hint node handle passed through to the policy
     * @param command continuation notified of the outcome
     */
    public void fetch(final Id id, NodeHandle hint, Continuation command) {
        if (this.logger.level <= 400) {
            this.logger.log("Sending out replication fetch request for the id " + id);
        }
        this.policy.fetch(id, hint, this.backup, this, new Continuation.StandardContinuation(command){

            public void receiveResult(Object o) {
                if (o == null) {
                    if (PastImpl.this.logger.level <= 900) {
                        PastImpl.this.logger.log("Could not fetch id " + id + " - policy returned null in namespace " + PastImpl.this.instance);
                    }
                    // Shared constant instead of the deprecated Boolean constructor.
                    this.parent.receiveResult(Boolean.FALSE);
                    return;
                }
                if (PastImpl.this.logger.level <= 300) {
                    PastImpl.this.logger.log("inserting replica of id " + id);
                }
                if (!(o instanceof PastContent)) {
                    String problem = "ERROR! Not PastContent " + o.getClass().getName() + " " + o;
                    if (PastImpl.this.logger.level <= 900) {
                        PastImpl.this.logger.log(problem);
                    }
                    // Report the failure to the caller rather than casting anyway.
                    this.parent.receiveException(new ClassCastException(problem));
                    return;
                }
                PastContent content = (PastContent)o;
                PastImpl.this.storage.getStorage().store(content.getId(), null, content, this.parent);
            }
        });
    }

    /**
     * Removes the object stored under {@code id}.
     *
     * <p>Without a backup, the object is simply unstored. With a backup, the
     * object is first read from storage and handed, together with its
     * metadata, to {@code backup.cache(...)}; only once that completes is the
     * object unstored.
     *
     * @param id the id of the object to remove
     * @param command continuation that receives the result of the unstore
     *                (or any exception raised along the way)
     */
    public void remove(final Id id, Continuation command) {
        if (this.backup == null) {
            this.storage.unstore(id, command);
            return;
        }
        this.storage.getObject(id, new Continuation.StandardContinuation(command){

            public void receiveResult(Object stored) {
                Continuation.StandardContinuation unstoreAfterBackup = new Continuation.StandardContinuation(this.parent){

                    public void receiveResult(Object ignored) {
                        PastImpl.this.storage.unstore(id, this.parent);
                    }
                };
                PastImpl.this.backup.cache(id, PastImpl.this.storage.getMetadata(id), (Serializable)stored, unstoreAfterBackup);
            }
        });
    }

    /**
     * Delegates to {@code scan(range)} on the storage returned by
     * {@code storage.getStorage()}.
     *
     * @param range the id range to scan
     * @return the id set reported by the underlying storage for that range
     */
    public IdSet scan(IdRange range) {
        return storage.getStorage().scan(range);
    }

    /**
     * Delegates to the no-argument {@code scan()} on the storage returned by
     * {@code storage.getStorage()}.
     *
     * @return the id set reported by the underlying storage
     */
    public IdSet scan() {
        return storage.getStorage().scan();
    }

    /**
     * Reports whether the storage returned by {@code storage.getStorage()}
     * holds an entry for {@code id}.
     *
     * @param id the id to look for
     * @return the answer of the underlying storage's {@code exists(id)}
     */
    public boolean exists(Id id) {
        return storage.getStorage().exists(id);
    }

    /**
     * Looks up handles for {@code id} (requesting {@code replicationFactor + 1}
     * of them) and reports whether any entry of the resulting array is a
     * {@link PastContentHandle}.
     *
     * @param id the id to look for
     * @param command receives {@code Boolean.TRUE} if at least one handle was
     *                found, {@code Boolean.FALSE} otherwise
     */
    public void existsInOverlay(Id id, Continuation command) {
        this.lookupHandles(id, this.replicationFactor + 1, new Continuation.StandardContinuation(command){

            public void receiveResult(Object result) {
                Boolean found = Boolean.FALSE;
                for (Object candidate : (Object[])result) {
                    if (candidate instanceof PastContentHandle) {
                        found = Boolean.TRUE;
                        break;
                    }
                }
                this.parent.receiveResult(found);
            }
        });
    }

    /**
     * Reads the object stored under {@code id} and passes it to
     * {@code insert(...)}, then collapses the resulting {@code Boolean[]}
     * into a single answer.
     *
     * @param id the id of the locally stored object to insert again
     * @param command receives {@code Boolean.TRUE} if any element of the
     *                insert result is true, {@code Boolean.FALSE} otherwise
     */
    public void reInsert(Id id, Continuation command) {
        this.storage.getObject(id, new Continuation.StandardContinuation(command){

            public void receiveResult(Object stored) {
                PastImpl.this.insert((PastContent)stored, new Continuation.StandardContinuation(this.parent){

                    public void receiveResult(Object result) {
                        Boolean anySucceeded = Boolean.FALSE;
                        for (Boolean outcome : (Boolean[])result) {
                            if (outcome.booleanValue()) {
                                anySucceeded = Boolean.TRUE;
                                break;
                            }
                        }
                        this.parent.receiveResult(anySucceeded);
                    }
                });
            }
        });
    }

    /**
     * @return the replication object obtained from the replica manager
     */
    public Replication getReplication() {
        return replicaManager.getReplication();
    }

    /**
     * @return the storage manager held in the {@code storage} field
     */
    public StorageManager getStorageManager() {
        return storage;
    }

    /**
     * @return the instance name held in the {@code instance} field
     */
    public String getInstance() {
        return instance;
    }

    /**
     * Replaces the deserializer stored in {@code contentDeserializer}.
     *
     * @param deserializer the new content deserializer
     */
    public void setContentDeserializer(PastContentDeserializer deserializer) {
        contentDeserializer = deserializer;
    }

    /**
     * Replaces the deserializer stored in {@code contentHandleDeserializer}.
     *
     * @param deserializer the new content-handle deserializer
     */
    public void setContentHandleDeserializer(PastContentHandleDeserializer deserializer) {
        contentHandleDeserializer = deserializer;
    }

    public static interface MessageBuilder {
        public PastMessage buildMessage();
    }

    /**
     * Message deserializer that maps a numeric message type (1-6) to the
     * matching message class's {@code build(...)} factory.
     * NOTE(review): the numeric codes presumably mirror each message class's
     * own type constant - confirm against those classes.
     */
    protected class PastDeserializer
    implements MessageDeserializer {
        protected PastDeserializer() {
        }

        /**
         * Rebuilds a message of the given type from {@code buf}.
         *
         * @param buf the buffer to read the message from
         * @param type the numeric message type selecting the factory
         * @param priority not used by this implementation
         * @param sender not used by this implementation
         * @return the deserialized message
         * @throws IOException if the selected factory fails; the failure is
         *         logged (when the log level permits) and rethrown unchanged
         * @throws IllegalArgumentException if {@code type} is not one of 1-6
         */
        public Message deserialize(InputBuffer buf, short type, int priority, NodeHandle sender) throws IOException {
            try {
                return this.buildForType(buf, type);
            }
            catch (IOException e) {
                if (PastImpl.this.logger.level <= 1000) {
                    PastImpl.this.logger.log("Exception in deserializer in " + PastImpl.this.endpoint.toString() + ":" + PastImpl.this.instance + " " + e);
                }
                throw e;
            }
        }

        /** Dispatches on the numeric type code to the matching message factory. */
        private Message buildForType(InputBuffer buf, short type) throws IOException {
            switch (type) {
                case 1:
                    return CacheMessage.build(buf, PastImpl.this.endpoint, PastImpl.this.contentDeserializer);
                case 2:
                    return FetchHandleMessage.build(buf, PastImpl.this.endpoint, PastImpl.this.contentHandleDeserializer);
                case 3:
                    return FetchMessage.build(buf, PastImpl.this.endpoint, PastImpl.this.contentDeserializer, PastImpl.this.contentHandleDeserializer);
                case 4:
                    return InsertMessage.build(buf, PastImpl.this.endpoint, PastImpl.this.contentDeserializer);
                case 5:
                    return LookupHandlesMessage.build(buf, PastImpl.this.endpoint);
                case 6:
                    return LookupMessage.build(buf, PastImpl.this.endpoint, PastImpl.this.contentDeserializer);
                default:
                    throw new IllegalArgumentException("Unknown type:" + type + " in " + PastImpl.this.toString());
            }
        }
    }
}

