/*
 * Decompiled with CFR 0.152.
 */
package org.mpisws.p2p.transport.identity;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.mpisws.p2p.transport.ErrorHandler;
import org.mpisws.p2p.transport.MessageCallback;
import org.mpisws.p2p.transport.MessageRequestHandle;
import org.mpisws.p2p.transport.P2PSocket;
import org.mpisws.p2p.transport.P2PSocketReceiver;
import org.mpisws.p2p.transport.SocketCallback;
import org.mpisws.p2p.transport.SocketRequestHandle;
import org.mpisws.p2p.transport.TransportLayer;
import org.mpisws.p2p.transport.TransportLayerCallback;
import org.mpisws.p2p.transport.exception.NodeIsFaultyException;
import org.mpisws.p2p.transport.identity.IdentitySerializer;
import org.mpisws.p2p.transport.identity.LowerIdentity;
import org.mpisws.p2p.transport.identity.NodeChangeStrategy;
import org.mpisws.p2p.transport.identity.SanityChecker;
import org.mpisws.p2p.transport.identity.UpperIdentity;
import org.mpisws.p2p.transport.liveness.LivenessListener;
import org.mpisws.p2p.transport.liveness.LivenessProvider;
import org.mpisws.p2p.transport.proximity.ProximityListener;
import org.mpisws.p2p.transport.proximity.ProximityProvider;
import org.mpisws.p2p.transport.util.DefaultErrorHandler;
import org.mpisws.p2p.transport.util.InsufficientBytesException;
import org.mpisws.p2p.transport.util.MessageRequestHandleImpl;
import org.mpisws.p2p.transport.util.OptionsFactory;
import org.mpisws.p2p.transport.util.SocketInputBuffer;
import org.mpisws.p2p.transport.util.SocketRequestHandleImpl;
import org.mpisws.p2p.transport.util.SocketWrapperSocket;
import rice.environment.Environment;
import rice.environment.logging.Logger;
import rice.p2p.commonapi.Cancellable;
import rice.p2p.util.rawserialization.SimpleInputBuffer;
import rice.p2p.util.rawserialization.SimpleOutputBuffer;
import rice.pastry.socket.SocketNodeHandle;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class IdentityImpl<UpperIdentifier, MiddleIdentifier, UpperMsgType, LowerIdentifier> {
    protected byte[] localIdentifier;
    protected LowerIdentityImpl lower;
    protected UpperIdentityImpl upper;
    protected Map<UpperIdentifier, Set<IdentityMessageHandle>> pendingMessages;
    protected Set<UpperIdentifier> deadForever;
    protected Environment environment;
    protected Logger logger;
    protected IdentitySerializer<UpperIdentifier, MiddleIdentifier, LowerIdentifier> serializer;
    protected NodeChangeStrategy<UpperIdentifier, LowerIdentifier> nodeChangeStrategy;
    protected SanityChecker<UpperIdentifier, MiddleIdentifier> sanityChecker;
    protected Map<LowerIdentifier, UpperIdentifier> bindings;
    Map<Integer, UpperIdentifier> intendedDest;
    Map<UpperIdentifier, Integer> reverseIntendedDest;
    int intendedDestCtr = Integer.MIN_VALUE;
    public static final byte SUCCESS = 1;
    public static final byte FAILURE = 0;
    public static final byte NO_ID = 2;
    public static final byte NORMAL = 1;
    public static final byte INCORRECT_IDENTITY = 0;
    public static final String NODE_HANDLE_TO_INDEX = "identity.node_handle_to_index";
    public static final String NODE_HANDLE_FROM_INDEX = "identity.node_handle_to_index";

    public IdentityImpl(byte[] localIdentifier, IdentitySerializer<UpperIdentifier, MiddleIdentifier, LowerIdentifier> serializer, NodeChangeStrategy<UpperIdentifier, LowerIdentifier> nodeChangeStrategy, SanityChecker<UpperIdentifier, MiddleIdentifier> sanityChecker, Environment environment) {
        this.logger = environment.getLogManager().getLogger(IdentityImpl.class, null);
        this.sanityChecker = sanityChecker;
        if (sanityChecker == null) {
            throw new IllegalArgumentException("SanityChecker is null");
        }
        this.localIdentifier = localIdentifier;
        this.serializer = serializer;
        this.nodeChangeStrategy = nodeChangeStrategy;
        this.environment = environment;
        this.pendingMessages = new HashMap<UpperIdentifier, Set<IdentityMessageHandle>>();
        this.deadForever = Collections.synchronizedSet(new HashSet());
        this.intendedDest = new HashMap<Integer, UpperIdentifier>();
        this.reverseIntendedDest = new HashMap<UpperIdentifier, Integer>();
        this.bindings = new HashMap<LowerIdentifier, UpperIdentifier>();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addPendingMessage(UpperIdentifier i, IdentityMessageHandle ret) {
        if (this.logger.level <= 400) {
            this.logger.log("addPendingMessage(" + i + "," + ret + ")");
        }
        Map<UpperIdentifier, Set<IdentityMessageHandle>> map = this.pendingMessages;
        synchronized (map) {
            Set<IdentityMessageHandle> set = this.pendingMessages.get(i);
            if (set == null) {
                set = new HashSet<IdentityMessageHandle>();
                this.pendingMessages.put(i, set);
            }
            set.add(ret);
        }
    }

    public void setDeadForever(UpperIdentifier i, Map<String, Integer> options) {
        if (this.deadForever.contains(i)) {
            return;
        }
        if (this.logger.level <= 800) {
            this.logger.log("setDeadForever(" + i + ")");
        }
        this.deadForever.add(i);
        this.upper.notifyLivenessListeners(i, 4, options);
        Set<IdentityMessageHandle> cancelMe = this.pendingMessages.remove(i);
        if (cancelMe != null) {
            for (IdentityMessageHandle msg : cancelMe) {
                msg.deadForever();
            }
        }
        this.upper.clearState(i);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected int addIntendedDest(UpperIdentifier i) {
        Map<Integer, UpperIdentifier> map = this.intendedDest;
        synchronized (map) {
            if (this.reverseIntendedDest.containsKey(i)) {
                return this.reverseIntendedDest.get(i);
            }
            this.intendedDest.put(this.intendedDestCtr, i);
            this.reverseIntendedDest.put(i, this.intendedDestCtr);
            ++this.intendedDestCtr;
            if (this.logger.level <= 400) {
                SocketNodeHandle snh;
                this.logger.log("addIntendedDest(" + i + " hash:" + i.hashCode() + "):" + (this.intendedDestCtr - 1));
                if (i instanceof SocketNodeHandle && (snh = (SocketNodeHandle)i).getId().toString().startsWith("<0x000")) {
                    this.logger.logException("StackTrace snh:" + i + " epoch:" + snh.getEpoch(), new Exception("foo"));
                }
            }
            return this.intendedDestCtr - 1;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean addBinding(UpperIdentifier u, LowerIdentifier l, Map<String, Integer> options) {
        Map<LowerIdentifier, UpperIdentifier> map = this.bindings;
        synchronized (map) {
            if (this.deadForever.contains(u)) {
                return false;
            }
            UpperIdentifier old = this.bindings.get(l);
            if (old == null) {
                this.bindings.put(l, u);
                return true;
            }
            if (old.equals(u)) {
                return true;
            }
            if (this.destinationChanged(old, u, l, options)) {
                this.bindings.put(l, u);
                return true;
            }
            if (this.logger.level <= 900) {
                this.logger.log("The nodeChangeStrategy found identifier " + u + " to be stale.  Should be using " + old);
            }
            this.setDeadForever(u, options);
            return false;
        }
    }

    public boolean destinationChanged(UpperIdentifier oldDest, UpperIdentifier newDest, LowerIdentifier i, Map<String, Integer> options) {
        if (oldDest.equals(newDest)) {
            return true;
        }
        if (this.deadForever.contains(oldDest)) {
            return true;
        }
        if (this.nodeChangeStrategy.canChange(oldDest, newDest, i)) {
            this.setDeadForever(oldDest, options);
            return true;
        }
        return false;
    }

    public void initLowerLayer(TransportLayer<LowerIdentifier, ByteBuffer> tl, ErrorHandler<LowerIdentifier> handler) {
        this.lower = new LowerIdentityImpl(tl, handler);
    }

    public LowerIdentity<LowerIdentifier, ByteBuffer> getLowerIdentity() {
        return this.lower;
    }

    public UpperIdentity<UpperIdentifier, UpperMsgType> getUpperIdentity() {
        return this.upper;
    }

    public void initUpperLayer(UpperIdentifier localIdentifier, TransportLayer<MiddleIdentifier, UpperMsgType> tl, LivenessProvider<MiddleIdentifier> live, ProximityProvider<MiddleIdentifier> prox) {
        if (this.upper != null) {
            throw new IllegalStateException("upper already initialized:" + this.upper);
        }
        this.upper = new UpperIdentityImpl(localIdentifier, tl, live, prox);
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    class IdentityMessageHandle
    implements MessageRequestHandle<UpperIdentifier, UpperMsgType>,
    MessageCallback<MiddleIdentifier, UpperMsgType> {
        private Cancellable subCancellable;
        private UpperIdentifier identifier;
        private UpperMsgType message;
        private Map<String, Integer> options;
        private MessageCallback<UpperIdentifier, UpperMsgType> deliverAckToMe;

        public IdentityMessageHandle(UpperIdentifier identifier, UpperMsgType message, Map<String, Integer> options, MessageCallback<UpperIdentifier, UpperMsgType> deliverAckToMe) {
            this.identifier = identifier;
            this.message = message;
            this.options = options;
            this.deliverAckToMe = deliverAckToMe;
        }

        @Override
        public UpperIdentifier getIdentifier() {
            return this.identifier;
        }

        @Override
        public UpperMsgType getMessage() {
            return this.message;
        }

        @Override
        public Map<String, Integer> getOptions() {
            return this.options;
        }

        void deadForever() {
            this.cancel();
            if (this.deliverAckToMe != null) {
                this.deliverAckToMe.sendFailed(this, new NodeIsFaultyException(this.identifier, this.message));
            }
        }

        @Override
        public boolean cancel() {
            IdentityImpl.this.pendingMessages.get(this.identifier).remove(this);
            return this.subCancellable.cancel();
        }

        public void setSubCancellable(Cancellable cancellable) {
            this.subCancellable = cancellable;
        }

        public Cancellable getSubCancellable() {
            return this.subCancellable;
        }

        @Override
        public void ack(MessageRequestHandle<MiddleIdentifier, UpperMsgType> msg) {
            IdentityImpl.this.pendingMessages.get(this.identifier).remove(this);
            if (this.deliverAckToMe != null) {
                this.deliverAckToMe.ack(this);
            }
        }

        @Override
        public void sendFailed(MessageRequestHandle<MiddleIdentifier, UpperMsgType> msg, IOException reason) {
            IdentityImpl.this.pendingMessages.get(this.identifier).remove(this);
            if (this.deliverAckToMe != null) {
                this.deliverAckToMe.sendFailed(this, reason);
            }
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    class UpperIdentityImpl
    implements UpperIdentity<UpperIdentifier, UpperMsgType>,
    TransportLayerCallback<MiddleIdentifier, UpperMsgType>,
    LivenessListener<MiddleIdentifier>,
    ProximityListener<MiddleIdentifier> {
        TransportLayer<MiddleIdentifier, UpperMsgType> tl;
        ProximityProvider<MiddleIdentifier> prox;
        private ErrorHandler<UpperIdentifier> errorHandler;
        private TransportLayerCallback<UpperIdentifier, UpperMsgType> callback;
        Logger logger;
        private LivenessProvider<MiddleIdentifier> livenessProvider;
        private UpperIdentifier localIdentifier;
        List<LivenessListener<UpperIdentifier>> livenessListeners = new ArrayList();
        Collection<ProximityListener<UpperIdentifier>> proxListeners = new ArrayList();

        public UpperIdentityImpl(UpperIdentifier local, TransportLayer<MiddleIdentifier, UpperMsgType> tl, LivenessProvider<MiddleIdentifier> live, ProximityProvider<MiddleIdentifier> prox) {
            this.localIdentifier = local;
            this.tl = tl;
            this.livenessProvider = live;
            this.prox = prox;
            this.logger = IdentityImpl.this.environment.getLogManager().getLogger(IdentityImpl.class, "upper");
            tl.setCallback(this);
            this.livenessProvider.addLivenessListener(this);
            prox.addProximityListener(this);
        }

        @Override
        public void clearState(UpperIdentifier i) {
            this.livenessProvider.clearState(IdentityImpl.this.serializer.translateDown(i));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public SocketRequestHandle<UpperIdentifier> openSocket(final UpperIdentifier i, final SocketCallback<UpperIdentifier> deliverSocketToMe, final Map<String, Integer> options) {
            if (this.logger.level <= 500) {
                this.logger.log("openSocket(" + i + "," + deliverSocketToMe + "," + options + ")");
            }
            final SocketRequestHandleImpl handle = new SocketRequestHandleImpl(i, options);
            Set set = IdentityImpl.this.deadForever;
            synchronized (set) {
                if (IdentityImpl.this.deadForever.contains(i)) {
                    deliverSocketToMe.receiveException(handle, new NodeIsFaultyException(i));
                    return handle;
                }
            }
            Map<String, Integer> newOptions = OptionsFactory.copyOptions(options);
            newOptions.put("identity.node_handle_to_index", IdentityImpl.this.addIntendedDest(i));
            handle.setSubCancellable(this.tl.openSocket(IdentityImpl.this.serializer.translateDown(i), new SocketCallback<MiddleIdentifier>(){

                @Override
                public void receiveException(SocketRequestHandle<MiddleIdentifier> s, IOException ex) {
                    deliverSocketToMe.receiveException(handle, ex);
                }

                @Override
                public void receiveResult(SocketRequestHandle<MiddleIdentifier> cancellable, P2PSocket<MiddleIdentifier> sock) {
                    deliverSocketToMe.receiveResult(handle, new SocketWrapperSocket(i, sock, UpperIdentityImpl.this.logger, options));
                }
            }, newOptions));
            return handle;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public MessageRequestHandle<UpperIdentifier, UpperMsgType> sendMessage(UpperIdentifier i, UpperMsgType m, MessageCallback<UpperIdentifier, UpperMsgType> deliverAckToMe, Map<String, Integer> options) {
            IdentityMessageHandle ret;
            if (this.logger.level <= 500) {
                this.logger.log("sendMessage(" + i + "," + m + "," + options + ")");
            }
            Set set = IdentityImpl.this.deadForever;
            synchronized (set) {
                if (IdentityImpl.this.deadForever.contains(i)) {
                    MessageRequestHandleImpl mrh = new MessageRequestHandleImpl(i, m, options);
                    deliverAckToMe.sendFailed(mrh, new NodeIsFaultyException(i, m));
                    return mrh;
                }
                options = OptionsFactory.copyOptions(options);
                options.put("identity.node_handle_to_index", IdentityImpl.this.addIntendedDest(i));
                ret = new IdentityMessageHandle(i, m, options, deliverAckToMe);
                IdentityImpl.this.addPendingMessage(i, ret);
            }
            ret.setSubCancellable(this.tl.sendMessage(IdentityImpl.this.serializer.translateDown(i), m, ret, options));
            return ret;
        }

        @Override
        public void incomingSocket(P2PSocket<MiddleIdentifier> s) throws IOException {
            int index;
            Object from;
            if (this.logger.level <= 500) {
                this.logger.log("incomingSocket(" + s + ")");
            }
            if (IdentityImpl.this.sanityChecker.isSane(from = IdentityImpl.this.intendedDest.get(index = s.getOptions().get("identity.node_handle_to_index").intValue()), s.getIdentifier())) {
                this.callback.incomingSocket(new SocketWrapperSocket(from, s, this.logger, s.getOptions()));
            } else {
                if (this.logger.level <= 900) {
                    this.logger.logException("incomingSocket() Sanity checker did not match " + from + " to " + s.getIdentifier() + " options:" + s.getOptions(), new Exception("Stack Trace"));
                }
                s.close();
            }
        }

        @Override
        public void messageReceived(MiddleIdentifier i, UpperMsgType m, Map<String, Integer> options) throws IOException {
            int index;
            Object from;
            if (this.logger.level <= 500) {
                this.logger.log("messageReceived(" + i + "," + m + "," + options + ")");
            }
            if (IdentityImpl.this.sanityChecker.isSane(from = IdentityImpl.this.intendedDest.get(index = options.get("identity.node_handle_to_index").intValue()), i)) {
                this.callback.messageReceived(from, m, options);
            } else if (this.logger.level <= 900) {
                this.logger.logException("messageReceived() Sanity checker did not match " + from + " to " + i + " options:" + options, new Exception("Stack Trace"));
            }
        }

        @Override
        public boolean checkLiveness(UpperIdentifier i, Map<String, Integer> options) {
            if (this.logger.level <= 500) {
                this.logger.log("checkLiveness(" + i + "," + options + ")");
            }
            if (IdentityImpl.this.deadForever.contains(i)) {
                return false;
            }
            options = OptionsFactory.copyOptions(options);
            options.put("identity.node_handle_to_index", IdentityImpl.this.addIntendedDest(i));
            return this.livenessProvider.checkLiveness(IdentityImpl.this.serializer.translateDown(i), options);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void addLivenessListener(LivenessListener<UpperIdentifier> name) {
            List list = this.livenessListeners;
            synchronized (list) {
                this.livenessListeners.add(name);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public boolean removeLivenessListener(LivenessListener<UpperIdentifier> name) {
            List list = this.livenessListeners;
            synchronized (list) {
                return this.livenessListeners.remove(name);
            }
        }

        @Override
        public int getLiveness(UpperIdentifier i, Map<String, Integer> options) {
            if (this.logger.level <= 400) {
                this.logger.log("getLiveness(" + i + "," + options + ")");
            }
            if (IdentityImpl.this.deadForever.contains(i)) {
                return 4;
            }
            options = OptionsFactory.copyOptions(options);
            options.put("identity.node_handle_to_index", IdentityImpl.this.addIntendedDest(i));
            return this.livenessProvider.getLiveness(IdentityImpl.this.serializer.translateDown(i), options);
        }

        @Override
        public void livenessChanged(MiddleIdentifier i, int val, Map<String, Integer> options) {
            if (IdentityImpl.this.deadForever.contains(i)) {
                if (val < 3 && this.logger.level <= 1000) {
                    this.logger.log("Node " + i + " came back from the dead!  It's a miracle! " + val + " Ignoring.");
                }
                return;
            }
            Object upper = IdentityImpl.this.serializer.translateUp(i);
            this.notifyLivenessListeners(upper, val, options);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void notifyLivenessListeners(UpperIdentifier i, int liveness, Map<String, Integer> options) {
            ArrayList temp;
            if (this.logger.level <= 400) {
                this.logger.log("notifyLivenessListeners(" + i + "," + liveness + ")");
            }
            List list = this.livenessListeners;
            synchronized (list) {
                temp = new ArrayList(this.livenessListeners);
            }
            for (LivenessListener livenessListener : temp) {
                livenessListener.livenessChanged(i, liveness, options);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void addProximityListener(ProximityListener<UpperIdentifier> name) {
            Collection collection = this.proxListeners;
            synchronized (collection) {
                this.proxListeners.add(name);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public boolean removeProximityListener(ProximityListener<UpperIdentifier> name) {
            Collection collection = this.proxListeners;
            synchronized (collection) {
                return this.proxListeners.remove(name);
            }
        }

        @Override
        public int proximity(UpperIdentifier i) {
            if (this.logger.level <= 500) {
                this.logger.log("proximity(" + i + ")");
            }
            if (IdentityImpl.this.deadForever.contains(i)) {
                return Integer.MAX_VALUE;
            }
            return this.prox.proximity(IdentityImpl.this.serializer.translateDown(i));
        }

        @Override
        public void proximityChanged(MiddleIdentifier i, int newProx, Map<String, Integer> options) {
            this.notifyProximityListeners(IdentityImpl.this.serializer.translateUp(i), newProx, options);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void notifyProximityListeners(UpperIdentifier i, int newProx, Map<String, Integer> options) {
            ArrayList temp;
            if (this.logger.level <= 400) {
                this.logger.log("notifyProximityListeners(" + i + "," + newProx + ")");
            }
            Collection collection = this.proxListeners;
            synchronized (collection) {
                temp = new ArrayList(this.proxListeners);
            }
            for (ProximityListener proximityListener : temp) {
                proximityListener.proximityChanged(i, newProx, options);
            }
        }

        @Override
        public void acceptMessages(boolean b) {
            this.tl.acceptMessages(b);
        }

        @Override
        public void acceptSockets(boolean b) {
            this.tl.acceptSockets(b);
        }

        @Override
        public UpperIdentifier getLocalIdentifier() {
            return this.localIdentifier;
        }

        @Override
        public void setCallback(TransportLayerCallback<UpperIdentifier, UpperMsgType> callback) {
            this.callback = callback;
        }

        @Override
        public void setErrorHandler(ErrorHandler<UpperIdentifier> handler) {
            this.errorHandler = handler;
        }

        @Override
        public void destroy() {
            if (this.logger.level <= 800) {
                this.logger.log("destroy()");
            }
            this.tl.destroy();
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    class LowerIdentityImpl
    implements LowerIdentity<LowerIdentifier, ByteBuffer>,
    TransportLayerCallback<LowerIdentifier, ByteBuffer> {
        TransportLayer<LowerIdentifier, ByteBuffer> tl;
        TransportLayerCallback<LowerIdentifier, ByteBuffer> callback;
        ErrorHandler<LowerIdentifier> handler;
        Logger logger;

        public LowerIdentityImpl(TransportLayer<LowerIdentifier, ByteBuffer> tl, ErrorHandler<LowerIdentifier> handler) {
            this.tl = tl;
            this.logger = IdentityImpl.this.environment.getLogManager().getLogger(IdentityImpl.class, "lower");
            this.handler = handler != null ? handler : new DefaultErrorHandler(this.logger);
            tl.setCallback(this);
        }

        @Override
        public SocketRequestHandle<LowerIdentifier> openSocket(final LowerIdentifier i, final SocketCallback<LowerIdentifier> deliverSocketToMe, final Map<String, Integer> options) {
            ByteBuffer buf;
            final SocketRequestHandleImpl ret = new SocketRequestHandleImpl(i, options);
            int index = options.get("identity.node_handle_to_index");
            Object dest = IdentityImpl.this.intendedDest.get(index);
            if (!IdentityImpl.this.addBinding(dest, i, options)) {
                deliverSocketToMe.receiveException(ret, new NodeIsFaultyException(i));
                return ret;
            }
            if (this.logger.level <= 500) {
                this.logger.log("openSocket(" + i + ") dest:" + dest);
            }
            try {
                SimpleOutputBuffer sob = new SimpleOutputBuffer((int)((double)IdentityImpl.this.localIdentifier.length * 2.5));
                IdentityImpl.this.serializer.serialize(sob, dest);
                sob.write(IdentityImpl.this.localIdentifier);
                buf = ByteBuffer.wrap(sob.getBytes());
            }
            catch (IOException ioe) {
                deliverSocketToMe.receiveException(ret, ioe);
                return ret;
            }
            ret.setSubCancellable(this.tl.openSocket(i, new SocketCallback<LowerIdentifier>(){

                @Override
                public void receiveException(SocketRequestHandle<LowerIdentifier> s, IOException ex) {
                    deliverSocketToMe.receiveException(ret, ex);
                }

                @Override
                public void receiveResult(SocketRequestHandle<LowerIdentifier> cancellable, P2PSocket<LowerIdentifier> sock) {
                    sock.register(false, true, new P2PSocketReceiver<LowerIdentifier>(){

                        @Override
                        public void receiveSelectResult(P2PSocket<LowerIdentifier> socket, boolean canRead, boolean canWrite) throws IOException {
                            if (canRead) {
                                throw new IOException("Never asked to read!");
                            }
                            if (!canWrite) {
                                throw new IOException("Can't write!");
                            }
                            socket.write(buf);
                            if (buf.hasRemaining()) {
                                socket.register(false, true, this);
                            } else {
                                socket.register(true, false, new P2PSocketReceiver<LowerIdentifier>(){
                                    ByteBuffer responseBuffer = ByteBuffer.allocate(1);

                                    @Override
                                    public void receiveException(P2PSocket<LowerIdentifier> socket, IOException ioe) {
                                        deliverSocketToMe.receiveException(ret, ioe);
                                    }

                                    @Override
                                    public void receiveSelectResult(P2PSocket<LowerIdentifier> socket, boolean canRead, boolean canWrite) throws IOException {
                                        if (!canRead) {
                                            throw new IOException("Can't read!");
                                        }
                                        if (canWrite) {
                                            throw new IOException("Never asked to write!");
                                        }
                                        if (socket.read(this.responseBuffer) == -1L) {
                                            socket.close();
                                            deliverSocketToMe.receiveException(ret, new ClosedChannelException());
                                            return;
                                        }
                                        if (this.responseBuffer.remaining() > 0) {
                                            socket.register(true, false, this);
                                        } else {
                                            byte answer = this.responseBuffer.array()[0];
                                            if (answer == 0) {
                                                if (LowerIdentityImpl.this.logger.level <= 800) {
                                                    LowerIdentityImpl.this.logger.log("openSocket(" + i + "," + deliverSocketToMe + ") answer = FAILURE");
                                                }
                                                Object newDest = IdentityImpl.this.serializer.deserialize(new SocketInputBuffer(socket, IdentityImpl.this.localIdentifier.length), i);
                                                IdentityImpl.this.addBinding(newDest, i, options);
                                                IdentityImpl.this.upper.notifyLivenessListeners(newDest, 1, options);
                                                deliverSocketToMe.receiveException(ret, new NodeIsFaultyException(i));
                                            } else {
                                                deliverSocketToMe.receiveResult(ret, socket);
                                            }
                                        }
                                    }
                                });
                            }
                        }

                        @Override
                        public void receiveException(P2PSocket<LowerIdentifier> socket, IOException ioe) {
                            deliverSocketToMe.receiveException(ret, ioe);
                        }
                    });
                }
            }, options));
            return ret;
        }

        @Override
        public void incomingSocket(P2PSocket<LowerIdentifier> s) throws IOException {
            if (this.logger.level <= 500) {
                this.logger.log("incomingSocket(" + s + ")");
            }
            s.register(true, false, new P2PSocketReceiver<LowerIdentifier>(){
                ByteBuffer buf;
                {
                    this.buf = ByteBuffer.allocate(IdentityImpl.this.localIdentifier.length);
                }

                @Override
                public void receiveException(P2PSocket<LowerIdentifier> socket, IOException ioe) {
                    LowerIdentityImpl.this.handler.receivedException(socket.getIdentifier(), ioe);
                }

                @Override
                public void receiveSelectResult(P2PSocket<LowerIdentifier> socket, boolean canRead, boolean canWrite) throws IOException {
                    if (canWrite) {
                        throw new IOException("Never asked to write!");
                    }
                    if (!canRead) {
                        throw new IOException("Can't read!");
                    }
                    if (socket.read(this.buf) == -1L) {
                        LowerIdentityImpl.this.handler.receivedException(socket.getIdentifier(), new ClosedChannelException());
                        return;
                    }
                    if (this.buf.hasRemaining()) {
                        socket.register(true, false, this);
                        return;
                    }
                    if (Arrays.equals(this.buf.array(), IdentityImpl.this.localIdentifier)) {
                        final SocketInputBuffer sib = new SocketInputBuffer(socket, 1024);
                        new P2PSocketReceiver<LowerIdentifier>(){

                            @Override
                            public void receiveException(P2PSocket<LowerIdentifier> socket, IOException ioe) {
                                LowerIdentityImpl.this.handler.receivedException(socket.getIdentifier(), ioe);
                            }

                            @Override
                            public void receiveSelectResult(P2PSocket<LowerIdentifier> socket, boolean canRead, boolean canWrite) throws IOException {
                                Object from;
                                if (canWrite) {
                                    throw new IOException("Never asked to write!");
                                }
                                if (!canRead) {
                                    throw new IOException("Can't read!");
                                }
                                final Map<String, Integer> newOptions = OptionsFactory.copyOptions(socket.getOptions());
                                try {
                                    from = IdentityImpl.this.serializer.deserialize(sib, socket.getIdentifier());
                                    newOptions.put("identity.node_handle_to_index", IdentityImpl.this.addIntendedDest(from));
                                }
                                catch (InsufficientBytesException ibe) {
                                    socket.register(true, false, this);
                                    return;
                                }
                                if (!IdentityImpl.this.addBinding(from, socket.getIdentifier(), socket.getOptions())) {
                                    if (LowerIdentityImpl.this.logger.level <= 900) {
                                        LowerIdentityImpl.this.logger.log("Serious error.  There was an attempt to open a socket from a supposedly stale identifier:" + from + ". Current identifier is " + IdentityImpl.this.bindings.get(socket.getIdentifier()) + " lower:" + socket.getIdentifier());
                                    }
                                    socket.close();
                                    return;
                                }
                                byte[] result = new byte[]{1};
                                final ByteBuffer writeMe = ByteBuffer.wrap(result);
                                socket.register(false, true, new P2PSocketReceiver<LowerIdentifier>(){

                                    @Override
                                    public void receiveException(P2PSocket<LowerIdentifier> socket, IOException ioe) {
                                        LowerIdentityImpl.this.handler.receivedException(socket.getIdentifier(), ioe);
                                    }

                                    @Override
                                    public void receiveSelectResult(P2PSocket<LowerIdentifier> socket, boolean canRead, boolean canWrite) throws IOException {
                                        if (canRead) {
                                            throw new IOException("Not expecting to read.");
                                        }
                                        if (!canWrite) {
                                            throw new IOException("Expecting to write.");
                                        }
                                        if (socket.write(writeMe) == -1L) {
                                            LowerIdentityImpl.this.handler.receivedException(socket.getIdentifier(), new ClosedChannelException());
                                            return;
                                        }
                                        if (writeMe.hasRemaining()) {
                                            socket.register(false, true, this);
                                            return;
                                        }
                                        SocketWrapperSocket returnMe = new SocketWrapperSocket(socket.getIdentifier(), socket, LowerIdentityImpl.this.logger, newOptions);
                                        LowerIdentityImpl.this.callback.incomingSocket(returnMe);
                                    }
                                });
                            }
                        }.receiveSelectResult(socket, canRead, canWrite);
                    } else {
                        if (LowerIdentityImpl.this.logger.level <= 800) {
                            LowerIdentityImpl.this.logger.log("incomingSocket() FAILURE expected " + Arrays.toString(this.buf.array()) + " me:" + Arrays.toString(IdentityImpl.this.localIdentifier));
                        }
                        byte[] result = new byte[1 + IdentityImpl.this.localIdentifier.length];
                        result[0] = 0;
                        System.arraycopy(IdentityImpl.this.localIdentifier, 0, result, 1, IdentityImpl.this.localIdentifier.length);
                        final ByteBuffer writeMe = ByteBuffer.wrap(result);
                        socket.register(false, true, new P2PSocketReceiver<LowerIdentifier>(){

                            @Override
                            public void receiveException(P2PSocket<LowerIdentifier> socket, IOException ioe) {
                                LowerIdentityImpl.this.handler.receivedException(socket.getIdentifier(), ioe);
                            }

                            @Override
                            public void receiveSelectResult(P2PSocket<LowerIdentifier> socket, boolean canRead, boolean canWrite) throws IOException {
                                if (canRead) {
                                    throw new IOException("Not expecting to read.");
                                }
                                if (!canWrite) {
                                    throw new IOException("Expecting to write.");
                                }
                                if (socket.write(writeMe) == -1L) {
                                    LowerIdentityImpl.this.handler.receivedException(socket.getIdentifier(), new ClosedChannelException());
                                    return;
                                }
                                if (buf.hasRemaining()) {
                                    socket.register(false, true, this);
                                    return;
                                }
                            }
                        });
                    }
                }
            });
        }

        @Override
        public MessageRequestHandle<LowerIdentifier, ByteBuffer> sendMessage(final LowerIdentifier i, ByteBuffer m, final MessageCallback<LowerIdentifier, ByteBuffer> deliverAckToMe, Map<String, Integer> options) {
            byte[] msgWithHeader;
            if (this.logger.level <= 300) {
                byte[] b = new byte[m.remaining()];
                System.arraycopy(m.array(), m.position(), b, 0, b.length);
                this.logger.log("sendMessage(" + i + "," + m + ")" + Arrays.toString(b));
            } else if (this.logger.level <= 500) {
                this.logger.log("sendMessage(" + i + "," + m + ")");
            }
            final MessageRequestHandleImpl ret = new MessageRequestHandleImpl(i, m, options);
            Integer index = null;
            if (options != null) {
                index = options.get("identity.node_handle_to_index");
            }
            if (index == null) {
                msgWithHeader = new byte[1 + IdentityImpl.this.localIdentifier.length + m.remaining()];
                msgWithHeader[0] = 2;
                System.arraycopy(IdentityImpl.this.localIdentifier, 0, msgWithHeader, 1, IdentityImpl.this.localIdentifier.length);
                m.get(msgWithHeader, 1 + IdentityImpl.this.localIdentifier.length, m.remaining());
            } else {
                byte[] destBytes;
                Object dest = IdentityImpl.this.intendedDest.get((int)index);
                if (!IdentityImpl.this.addBinding(dest, i, options)) {
                    deliverAckToMe.sendFailed(ret, new NodeIsFaultyException(i, m));
                    return ret;
                }
                try {
                    SimpleOutputBuffer sob = new SimpleOutputBuffer((int)((double)IdentityImpl.this.localIdentifier.length * 2.5));
                    IdentityImpl.this.serializer.serialize(sob, dest);
                    destBytes = sob.getBytes();
                }
                catch (IOException ioe) {
                    deliverAckToMe.sendFailed(ret, ioe);
                    return ret;
                }
                msgWithHeader = new byte[1 + destBytes.length + IdentityImpl.this.localIdentifier.length + m.remaining()];
                msgWithHeader[0] = 1;
                System.arraycopy(destBytes, 0, msgWithHeader, 1, destBytes.length);
                System.arraycopy(IdentityImpl.this.localIdentifier, 0, msgWithHeader, 1 + destBytes.length, IdentityImpl.this.localIdentifier.length);
                m.get(msgWithHeader, 1 + destBytes.length + IdentityImpl.this.localIdentifier.length, m.remaining());
            }
            ByteBuffer buf = ByteBuffer.wrap(msgWithHeader);
            ret.setSubCancellable(this.tl.sendMessage(i, buf, new MessageCallback<LowerIdentifier, ByteBuffer>(){

                @Override
                public void ack(MessageRequestHandle<LowerIdentifier, ByteBuffer> msg) {
                    if (ret.getSubCancellable() != null && msg != ret.getSubCancellable()) {
                        throw new RuntimeException("msg != cancellable.getSubCancellable() (indicates a bug in the code) msg:" + msg + " sub:" + ret.getSubCancellable());
                    }
                    if (deliverAckToMe != null) {
                        deliverAckToMe.ack(ret);
                    }
                }

                @Override
                public void sendFailed(MessageRequestHandle<LowerIdentifier, ByteBuffer> msg, IOException ex) {
                    if (ret.getSubCancellable() != null && msg != ret.getSubCancellable()) {
                        throw new RuntimeException("msg != cancellable.getSubCancellable() (indicates a bug in the code) msg:" + msg + " sub:" + ret.getSubCancellable());
                    }
                    if (deliverAckToMe == null) {
                        LowerIdentityImpl.this.handler.receivedException(i, ex);
                    } else {
                        deliverAckToMe.sendFailed(ret, ex);
                    }
                }
            }, options));
            return ret;
        }

        @Override
        public void messageReceived(LowerIdentifier i, ByteBuffer m, Map<String, Integer> options) throws IOException {
            HashMap<String, Integer> newOptions = new HashMap<String, Integer>(options);
            byte msgType = m.get();
            if (this.logger.level <= 500) {
                this.logger.log("messageReceived(" + i + "," + m + "):" + msgType);
            }
            switch (msgType) {
                case 1: {
                    byte[] dest = new byte[IdentityImpl.this.localIdentifier.length];
                    m.get(dest);
                    if (!Arrays.equals(dest, IdentityImpl.this.localIdentifier)) {
                        if (this.logger.level <= 800) {
                            this.logger.log("received message for wrong node from:" + i + " intended:" + Arrays.toString(dest) + " me:" + Arrays.toString(IdentityImpl.this.localIdentifier));
                        }
                        byte[] errorMessage = new byte[1 + IdentityImpl.this.localIdentifier.length];
                        errorMessage[0] = 0;
                        System.arraycopy(IdentityImpl.this.localIdentifier, 0, errorMessage, 1, IdentityImpl.this.localIdentifier.length);
                        ByteBuffer buf = ByteBuffer.wrap(errorMessage);
                        this.tl.sendMessage(i, buf, null, options);
                        return;
                    }
                }
                case 2: {
                    SimpleInputBuffer sib = new SimpleInputBuffer(m.array(), m.position());
                    Object from = IdentityImpl.this.serializer.deserialize(sib, i);
                    m.position(m.array().length - sib.bytesRemaining());
                    if (!IdentityImpl.this.addBinding(from, i, options)) {
                        if (this.logger.level <= 900) {
                            this.logger.log("Warning.  Received message from stale identifier:" + from + ". Current identifier is " + IdentityImpl.this.bindings.get(i) + " lower:" + i + " Probably a delayed message, dropping.");
                        }
                        this.handler.receivedUnexpectedData(i, m.array(), m.position(), newOptions);
                        return;
                    }
                    newOptions.put("identity.node_handle_to_index", IdentityImpl.this.addIntendedDest(from));
                    if (this.logger.level <= 300) {
                        byte[] b = new byte[m.remaining()];
                        System.arraycopy(m.array(), m.position(), b, 0, b.length);
                        this.logger.log("received message for me from:" + from + "(" + from + "(" + i + ")) " + Arrays.toString(b));
                    } else if (this.logger.level <= 400) {
                        this.logger.log("received message for me from:" + from + "(" + i + ") " + m);
                    }
                    this.callback.messageReceived(i, m, newOptions);
                    break;
                }
                case 0: {
                    Object oldDest = IdentityImpl.this.bindings.get(i);
                    Object newDest = IdentityImpl.this.serializer.deserialize(new SimpleInputBuffer(m.array(), m.position()), i);
                    if (this.logger.level <= 800) {
                        this.logger.log("received INCORRECT_IDENTITY:" + i + " old:" + oldDest + " new:" + newDest);
                    }
                    IdentityImpl.this.addBinding(newDest, i, options);
                    IdentityImpl.this.upper.notifyLivenessListeners(newDest, 1, options);
                }
            }
        }

        @Override
        public void acceptMessages(boolean b) {
            this.tl.acceptMessages(b);
        }

        @Override
        public void acceptSockets(boolean b) {
            this.tl.acceptMessages(b);
        }

        @Override
        public LowerIdentifier getLocalIdentifier() {
            return this.tl.getLocalIdentifier();
        }

        @Override
        public void setCallback(TransportLayerCallback<LowerIdentifier, ByteBuffer> callback) {
            this.callback = callback;
        }

        @Override
        public void setErrorHandler(ErrorHandler<LowerIdentifier> handler) {
            this.handler = handler;
        }

        @Override
        public void destroy() {
            if (this.logger.level <= 800) {
                this.logger.log("destroy()");
            }
            this.tl.destroy();
        }
    }
}

