/*
 * Decompiled with CFR 0.152.
 */
package org.mpisws.p2p.transport.priority;

import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.mpisws.p2p.transport.ClosedChannelException;
import org.mpisws.p2p.transport.ErrorHandler;
import org.mpisws.p2p.transport.MessageCallback;
import org.mpisws.p2p.transport.MessageRequestHandle;
import org.mpisws.p2p.transport.P2PSocket;
import org.mpisws.p2p.transport.P2PSocketReceiver;
import org.mpisws.p2p.transport.SocketCallback;
import org.mpisws.p2p.transport.SocketRequestHandle;
import org.mpisws.p2p.transport.TransportLayer;
import org.mpisws.p2p.transport.TransportLayerCallback;
import org.mpisws.p2p.transport.TransportLayerListener;
import org.mpisws.p2p.transport.exception.NodeIsFaultyException;
import org.mpisws.p2p.transport.identity.MemoryExpiredException;
import org.mpisws.p2p.transport.liveness.LivenessListener;
import org.mpisws.p2p.transport.liveness.LivenessProvider;
import org.mpisws.p2p.transport.priority.MessageInfo;
import org.mpisws.p2p.transport.priority.MessageInfoImpl;
import org.mpisws.p2p.transport.priority.PriorityTransportLayer;
import org.mpisws.p2p.transport.priority.PriorityTransportLayerImpl;
import org.mpisws.p2p.transport.priority.PriorityTransportLayerListener;
import org.mpisws.p2p.transport.priority.QueueOverflowException;
import org.mpisws.p2p.transport.proximity.ProximityProvider;
import org.mpisws.p2p.transport.util.DefaultErrorHandler;
import org.mpisws.p2p.transport.util.SocketRequestHandleImpl;
import rice.environment.Environment;
import rice.environment.logging.Logger;
import rice.p2p.commonapi.Cancellable;
import rice.p2p.util.SortedLinkedList;
import rice.p2p.util.tuples.Tuple;
import rice.selector.SelectorManager;
import rice.selector.TimerTask;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class PriorityTransportLayerImpl<Identifier>
implements PriorityTransportLayer<Identifier>,
LivenessListener<Identifier>,
TransportLayerCallback<Identifier, ByteBuffer> {
    /** Lower transport layer; sockets and messages are opened/sent through it. */
    TransportLayer<Identifier, ByteBuffer> tl;
    /** Liveness source; this layer registers itself on it as a listener in the constructor. */
    LivenessProvider<Identifier> livenessProvider;
    /** Proximity source handed in at construction; only stored by the code visible in this part of the file. */
    ProximityProvider<Identifier> proximityProvider;
    /** Header byte identifying a socket that is handed straight to the upper-layer callback. */
    public static final byte PASSTHROUGH_SOCKET_B = 0;
    /** Header byte identifying a primary socket, which is handed to the peer's EntityManager. */
    public static final byte PRIMARY_SOCKET_B = 1;
    /** One-byte header written on sockets opened via openSocket(). */
    public static final byte[] PASSTHROUGH_SOCKET = new byte[]{0};
    /** One-byte header written on sockets opened as primary sockets. */
    public static final byte[] PRIMARY_SOCKET = new byte[]{1};
    /** Overwritten by the constructor argument maxMsgSize. NOTE(review): enforcement is not visible in this part of the file. */
    public int MAX_MSG_SIZE = 10000;
    /** Overwritten by the constructor argument maxQueueSize. NOTE(review): enforcement is not visible in this part of the file. */
    public int MAX_QUEUE_SIZE = 30;
    /** NOTE(review): never assigned or read in the visible code; possibly vestigial - confirm before relying on it. */
    public Hashtable sockets;
    /** Logger obtained from the environment's log manager. */
    public Logger logger;
    /** One EntityManager per remote identifier; every visible access synchronizes on this map. */
    protected Map<Identifier, EntityManager> entityManagers;
    /** Upper-layer callback that receives passthrough sockets and incoming messages. */
    private TransportLayerCallback<Identifier, ByteBuffer> callback;
    /** Receives exceptions raised on incoming sockets. */
    private ErrorHandler<Identifier> errorHandler;
    /** Selector taken from the environment; several EntityManager methods insist on running on its thread. */
    protected SelectorManager selectorManager;
    /** Environment supplied at construction. */
    protected Environment environment;
    /** Set to true by destroy() once it runs on the selector thread. */
    protected boolean destroyed = false;
    /** Listeners notified of reads and writes; guarded by synchronizing on the list itself. */
    ArrayList<TransportLayerListener<Identifier>> listeners = new ArrayList();
    /** Listeners notified of enqueued and dropped messages; guarded by synchronizing on the list itself. */
    ArrayList<PriorityTransportLayerListener<Identifier>> plisteners = new ArrayList();

    /**
     * Builds the priority layer on top of {@code tl}.
     *
     * @param tl lower transport layer; this object installs itself as its callback
     * @param livenessProvider liveness source; this object registers as a listener on it
     * @param proximityProvider proximity source, stored for later use
     * @param env supplies the logger and the selector manager
     * @param maxMsgSize value stored in {@link #MAX_MSG_SIZE}
     * @param maxQueueSize value stored in {@link #MAX_QUEUE_SIZE}
     * @param handler receives socket exceptions; when {@code null} a
     *        {@link DefaultErrorHandler} bound to this layer's logger is used
     */
    public PriorityTransportLayerImpl(TransportLayer<Identifier, ByteBuffer> tl, LivenessProvider<Identifier> livenessProvider, ProximityProvider<Identifier> proximityProvider, Environment env, int maxMsgSize, int maxQueueSize, ErrorHandler<Identifier> handler) {
        this.entityManagers = new HashMap<Identifier, EntityManager>();
        this.logger = env.getLogManager().getLogger(PriorityTransportLayerImpl.class, null);
        this.selectorManager = env.getSelectorManager();
        this.environment = env;
        this.MAX_MSG_SIZE = maxMsgSize;
        this.MAX_QUEUE_SIZE = maxQueueSize;
        this.tl = tl;
        if (this.logger.level <= 800) {
            this.logger.log("MAX_QUEUE_SIZE:" + this.MAX_QUEUE_SIZE + " MAX_MSG_SIZE:" + this.MAX_MSG_SIZE);
        }
        this.livenessProvider = livenessProvider;
        this.proximityProvider = proximityProvider;
        this.errorHandler = handler;
        if (this.errorHandler == null) {
            this.errorHandler = new DefaultErrorHandler(this.logger);
        }
        // Publish "this" only after every field (in particular errorHandler) is
        // set, so a callback that fires immediately never sees a half-built object.
        tl.setCallback(this);
        livenessProvider.addLivenessListener(this);
    }

    /**
     * Accepts a socket from the lower layer and reads its one-byte header to
     * decide where it goes: {@link #PASSTHROUGH_SOCKET_B} sockets are handed to
     * the upper-layer callback, {@link #PRIMARY_SOCKET_B} sockets to the
     * sender's EntityManager. Any other header byte closes the socket and is
     * reported to the error handler.
     *
     * @param s the newly accepted socket
     * @throws IOException if registering for the read fails
     */
    @Override
    public void incomingSocket(final P2PSocket<Identifier> s) throws IOException {
        s.register(true, false, new P2PSocketReceiver<Identifier>(){

            @Override
            public void receiveSelectResult(P2PSocket<Identifier> socket, boolean canRead, boolean canWrite) throws IOException {
                int ret;
                if (socket != s) {
                    throw new IllegalArgumentException("Sockets not equal!!! s:" + s + " socket:" + socket);
                }
                if (canWrite || !canRead) {
                    throw new IllegalArgumentException("Should only be able to read! canRead:" + canRead + " canWrite:" + canWrite);
                }
                ByteBuffer hdr = ByteBuffer.allocate(1);
                try {
                    ret = (int)socket.read(hdr);
                }
                catch (IOException ioe) {
                    socket.close();
                    return;
                }
                switch (ret) {
                    case -1: {
                        // Peer closed before sending the header.
                        socket.close();
                        break;
                    }
                    case 0: {
                        // Header not available yet; wait for the next read event.
                        socket.register(true, false, this);
                        break;
                    }
                    case 1: {
                        hdr.flip();
                        byte val = hdr.get();
                        switch (val) {
                            case PASSTHROUGH_SOCKET_B: {
                                PriorityTransportLayerImpl.this.callback.incomingSocket(s);
                                break;
                            }
                            case PRIMARY_SOCKET_B: {
                                if (PriorityTransportLayerImpl.this.logger.level <= 500) {
                                    PriorityTransportLayerImpl.this.logger.log("Opened Primary Socket from " + s.getIdentifier());
                                }
                                PriorityTransportLayerImpl.this.getEntityManager(s.getIdentifier()).primarySocketAvailable(s, null);
                                break;
                            }
                            default: {
                                // Unknown socket type: nobody would ever own this socket,
                                // so close it rather than leak it, and report the problem.
                                socket.close();
                                PriorityTransportLayerImpl.this.errorHandler.receivedException(s.getIdentifier(), new IOException("Unknown socket header byte " + val + " from " + s.getIdentifier()));
                            }
                        }
                        break;
                    }
                    default: {
                        socket.close();
                        throw new IllegalStateException("Read " + ret + " bytes?  Not good.  Expected to read 1 byte.");
                    }
                }
            }

            @Override
            public void receiveException(P2PSocket<Identifier> socket, Exception e) {
                PriorityTransportLayerImpl.this.errorHandler.receivedException(socket.getIdentifier(), e);
            }
        });
    }

    /**
     * Opens a passthrough socket to {@code i}: the lower layer opens the
     * socket, this layer writes the one-byte {@link #PASSTHROUGH_SOCKET} header
     * and then hands the socket to {@code deliverSocketToMe}.
     *
     * <p>The socket is only delivered once the header byte has actually been
     * written; a short write re-registers for writing and a closed channel is
     * reported through {@code receiveException}.
     *
     * @param i remote identifier
     * @param deliverSocketToMe receives the socket or the failure; must not be null
     * @param options passed to the lower layer and stored on the returned handle
     * @return handle whose cancellation is forwarded to the pending open or the socket
     * @throws IllegalArgumentException if {@code deliverSocketToMe} is null
     */
    @Override
    public SocketRequestHandle<Identifier> openSocket(Identifier i, final SocketCallback<Identifier> deliverSocketToMe, Map<String, Object> options) {
        if (deliverSocketToMe == null) {
            throw new IllegalArgumentException("No handle to return socket to! (deliverSocketToMe must be non-null!)");
        }
        final SocketRequestHandleImpl<Identifier> handle = new SocketRequestHandleImpl<Identifier>(i, options, this.logger);
        handle.setSubCancellable(this.tl.openSocket(i, new SocketCallback<Identifier>(){

            @Override
            public void receiveResult(SocketRequestHandle<Identifier> cancellable, final P2PSocket<Identifier> sock) {
                // From now on cancelling the request means closing the open socket.
                handle.setSubCancellable(new Cancellable(){

                    public boolean cancel() {
                        sock.close();
                        return true;
                    }
                });
                sock.register(false, true, new P2PSocketReceiver<Identifier>(){
                    // Kept across select callbacks so a short write can be resumed.
                    private final ByteBuffer header = ByteBuffer.wrap(PASSTHROUGH_SOCKET);

                    @Override
                    public void receiveSelectResult(P2PSocket<Identifier> socket, boolean canRead, boolean canWrite) throws IOException {
                        if (canRead || !canWrite) {
                            throw new IllegalArgumentException("expected to write!  canRead:" + canRead + " canWrite:" + canWrite);
                        }
                        if (socket.write(this.header) == -1L) {
                            deliverSocketToMe.receiveException(handle, new ClosedChannelException("Channel closed while writing."));
                            return;
                        }
                        if (this.header.hasRemaining()) {
                            // Header not out yet; the peer cannot classify the socket until it is.
                            socket.register(false, true, this);
                            return;
                        }
                        deliverSocketToMe.receiveResult(handle, socket);
                    }

                    @Override
                    public void receiveException(P2PSocket<Identifier> socket, Exception e) {
                        deliverSocketToMe.receiveException(handle, e);
                    }
                });
            }

            @Override
            public void receiveException(SocketRequestHandle<Identifier> s, Exception ex) {
                if (handle.getSubCancellable() != null && s != handle.getSubCancellable()) {
                    throw new IllegalArgumentException("s != handle.getSubCancellable() must be a bug. s:" + s + " sub:" + handle.getSubCancellable());
                }
                deliverSocketToMe.receiveException(handle, ex);
            }
        }, options));
        return handle;
    }

    /**
     * Forwards the accept-messages switch to the lower transport layer.
     *
     * @param b value passed through unchanged
     */
    @Override
    public void acceptMessages(boolean b) {
        final TransportLayer<Identifier, ByteBuffer> lower = this.tl;
        lower.acceptMessages(b);
    }

    /**
     * Forwards the accept-sockets switch to the lower transport layer.
     *
     * @param b value passed through unchanged
     */
    @Override
    public void acceptSockets(boolean b) {
        final TransportLayer<Identifier, ByteBuffer> lower = this.tl;
        lower.acceptSockets(b);
    }

    /**
     * @return the local identifier as reported by the lower transport layer
     */
    @Override
    public Identifier getLocalIdentifier() {
        final Identifier local = this.tl.getLocalIdentifier();
        return local;
    }

    /**
     * Hands an incoming message to the upper-layer callback and then tells the
     * transport listeners how many bytes were read.
     *
     * @param i sender of the message
     * @param m message payload
     * @param options options that accompanied the message
     * @throws IOException propagated from the callback; listeners are not notified in that case
     */
    @Override
    public void messageReceived(Identifier i, ByteBuffer m, Map<String, Object> options) throws IOException {
        // Measure before delivery: the callback may consume the buffer, after
        // which remaining() no longer reflects the size of the message.
        final int size = m.remaining();
        this.callback.messageReceived(i, m, options);
        this.notifyListenersRead(size, i, options);
    }

    /**
     * Sends {@code m} to {@code i}.
     *
     * <p>When {@code options} maps {@code "transport_type"} to the Integer 0 the
     * message bypasses the priority queue and goes straight to the lower
     * layer; listeners are told about the write or the drop, and the outcome is
     * forwarded to {@code deliverAckToMe}. Otherwise the message is handed to
     * the destination's EntityManager.
     *
     * @param i destination
     * @param m message payload
     * @param deliverAckToMe notified of ack / failure; may be null
     * @param options send options; may be null
     * @return handle for the pending send
     */
    @Override
    public MessageRequestHandle<Identifier, ByteBuffer> sendMessage(final Identifier i, ByteBuffer m, final MessageCallback<Identifier, ByteBuffer> deliverAckToMe, final Map<String, Object> options) {
        if (this.logger.level <= 500) {
            this.logger.log("sendMessage(" + i + "," + m + "," + deliverAckToMe + "," + options + ")");
        }
        Integer transportType = options == null ? null : (Integer)options.get("transport_type");
        if (transportType != null && transportType.intValue() == 0) {
            final int originalSize = m.remaining();
            return this.tl.sendMessage(i, m, new MessageCallback<Identifier, ByteBuffer>(){

                @Override
                public void ack(MessageRequestHandle<Identifier, ByteBuffer> msg) {
                    PriorityTransportLayerImpl.this.notifyListenersWrote(originalSize, i, options);
                    // The caller asked for the outcome; do not swallow it.
                    if (deliverAckToMe != null) {
                        deliverAckToMe.ack(msg);
                    }
                }

                @Override
                public void sendFailed(MessageRequestHandle<Identifier, ByteBuffer> msg, Exception reason) {
                    PriorityTransportLayerImpl.this.notifyListenersDropped(originalSize, i, options);
                    if (deliverAckToMe != null) {
                        deliverAckToMe.sendFailed(msg, reason);
                    }
                }
            }, options);
        }
        return this.getEntityManager(i).send(i, m, deliverAckToMe, options);
    }

    /**
     * Installs the upper-layer callback that receives passthrough sockets and
     * incoming messages.
     *
     * @param callback the new callback; stored as given
     */
    @Override
    public void setCallback(TransportLayerCallback<Identifier, ByteBuffer> callback) {
        final TransportLayerCallback<Identifier, ByteBuffer> replacement = callback;
        this.callback = replacement;
    }

    /**
     * Replaces the handler that receives socket exceptions.
     *
     * @param handler the new handler; {@code null} selects a
     *        {@link DefaultErrorHandler} bound to this layer's logger, matching
     *        what the constructor does, so later error reports never hit a null handler
     */
    @Override
    public void setErrorHandler(ErrorHandler<Identifier> handler) {
        this.errorHandler = handler;
        if (this.errorHandler == null) {
            this.errorHandler = new DefaultErrorHandler(this.logger);
        }
    }

    /**
     * Marks this layer destroyed and destroys the lower layer. The work is
     * done on the selector thread; a call from any other thread re-schedules
     * itself there. Calling it again after destruction is a no-op.
     */
    @Override
    public void destroy() {
        if (this.destroyed) {
            return;
        }
        if (!this.environment.getSelectorManager().isSelectorThread()) {
            // Wrong thread: hop onto the selector and retry from there.
            Runnable retry = new Runnable(){

                public void run() {
                    PriorityTransportLayerImpl.this.destroy();
                }
            };
            this.environment.getSelectorManager().invoke(retry);
            return;
        }
        this.destroyed = true;
        this.tl.destroy();
    }

    /**
     * Returns the EntityManager for {@code i}, creating and registering one on
     * first use. Lookup and insertion happen under the lock of the manager map.
     *
     * @param i remote identifier
     * @return the existing or newly created manager; never null
     */
    protected EntityManager getEntityManager(Identifier i) {
        synchronized (this.entityManagers) {
            EntityManager existing = this.entityManagers.get(i);
            if (existing != null) {
                return existing;
            }
            EntityManager created = new EntityManager(i);
            this.entityManagers.put(i, created);
            return created;
        }
    }

    /**
     * Looks up the EntityManager for {@code i} and, if one exists, clears its
     * state while holding the lock of the manager map.
     *
     * <p>NOTE(review): despite the name, the entry is not removed from the map
     * here - confirm whether that is intentional.
     *
     * @param i remote identifier
     * @return the manager whose state was cleared, or null if none was registered
     */
    protected EntityManager deleteEntityManager(Identifier i) {
        synchronized (this.entityManagers) {
            EntityManager manager = this.entityManagers.get(i);
            if (manager == null) {
                return null;
            }
            manager.clearState();
            return manager;
        }
    }

    /**
     * Liveness callback: once the reported value reaches 3 or more, the
     * corresponding EntityManager is marked dead. Lower values are ignored.
     *
     * @param i identifier whose liveness changed
     * @param val new liveness value
     * @param options options that accompanied the notification (unused)
     */
    @Override
    public void livenessChanged(Identifier i, int val, Map<String, Object> options) {
        if (val < 3) {
            return;
        }
        EntityManager manager = this.getEntityManager(i);
        manager.markDead();
    }

    /**
     * Stops the liveness checker of the EntityManager belonging to {@code i}
     * (the manager is created if it does not exist yet).
     *
     * @param i remote identifier
     */
    public void cancelLivenessChecker(Identifier i) {
        EntityManager manager = this.getEntityManager(i);
        manager.stopLivenessChecker();
    }

    /**
     * Logs queue statistics for every EntityManager while holding the lock of
     * the manager map. Nothing is logged when {@code logLevel} is above 500.
     * A per-manager line is written when {@code logLevel} is at most 300, or
     * when it is at most 400 and the manager has queued messages; a summary
     * line with the manager count and total queued messages always follows.
     *
     * @param logLevel verbosity selector, compared against 300 / 400 / 500
     */
    public void printMemStats(int logLevel) {
        if (logLevel > 500) {
            return;
        }
        synchronized (this.entityManagers) {
            int pendingTotal = 0;
            for (EntityManager manager : this.entityManagers.values()) {
                int pending = manager.queue.size();
                pendingTotal += pending;
                boolean wantDetail = logLevel <= 300 || (pending > 0 && logLevel <= 400);
                if (!wantDetail) {
                    continue;
                }
                Identifier id = manager.identifier.get();
                String liveness = "";
                if (id != null) {
                    // Use the options of the head-of-queue message, if any, for the liveness lookup.
                    EntityManager.MessageWrapper head = manager.peek();
                    Map<String, Object> options = null;
                    if (head != null) {
                        options = head.options;
                    }
                    liveness = "" + this.livenessProvider.getLiveness(id, options);
                }
                this.logger.log("EM{" + id + "," + liveness + "," + manager.writingSocket + "," + manager.pendingSocket + "} queue:" + pending + " reg:" + manager.registered + " lChecker:" + manager.livenessChecker);
            }
            this.logger.log("NumEMs:" + this.entityManagers.size() + " numPendingMsgs:" + pendingTotal);
        }
    }

    /**
     * Registers a listener for read/write notifications.
     *
     * @param listener listener to append; the list is modified under its own lock
     */
    @Override
    public void addTransportLayerListener(TransportLayerListener<Identifier> listener) {
        synchronized (this.listeners) {
            this.listeners.add(listener);
        }
    }

    /**
     * Unregisters a listener previously added for read/write notifications.
     *
     * @param listener listener to remove; the list is modified under its own lock
     */
    @Override
    public void removeTransportLayerListener(TransportLayerListener<Identifier> listener) {
        synchronized (this.listeners) {
            this.listeners.remove(listener);
        }
    }

    /**
     * Registers a priority listener. It is added to the priority list (under
     * that list's lock) and then also registered as a plain transport-layer
     * listener.
     *
     * @param listener listener to add
     */
    @Override
    public void addPriorityTransportLayerListener(PriorityTransportLayerListener<Identifier> listener) {
        synchronized (this.plisteners) {
            this.plisteners.add(listener);
        }
        addTransportLayerListener(listener);
    }

    /**
     * Unregisters a priority listener from the priority list (under that
     * list's lock) and then from the plain transport-layer listeners.
     *
     * @param listener listener to remove
     */
    @Override
    public void removePriorityTransportLayerListener(PriorityTransportLayerListener<Identifier> listener) {
        synchronized (this.plisteners) {
            this.plisteners.remove(listener);
        }
        removeTransportLayerListener(listener);
    }

    /**
     * Tells every transport-layer listener that {@code size} bytes were read.
     * The listener list is copied under its lock and the callbacks run on the
     * copy, outside the lock.
     *
     * @param size number of bytes
     * @param source where the bytes came from
     * @param options options associated with the read
     */
    public void notifyListenersRead(int size, Identifier source, Map<String, Object> options) {
        if (this.listeners.isEmpty()) {
            return;
        }
        List<TransportLayerListener<Identifier>> snapshot;
        synchronized (this.listeners) {
            snapshot = new ArrayList<TransportLayerListener<Identifier>>(this.listeners);
        }
        for (int idx = 0; idx < snapshot.size(); idx++) {
            snapshot.get(idx).read(size, source, options, true, true);
        }
    }

    /**
     * Tells every transport-layer listener that {@code size} bytes were
     * written. The listener list is copied under its lock and the callbacks
     * run on the copy, outside the lock.
     *
     * @param size number of bytes
     * @param dest where the bytes went
     * @param options options associated with the write
     */
    public void notifyListenersWrote(int size, Identifier dest, Map<String, Object> options) {
        if (this.listeners.isEmpty()) {
            return;
        }
        List<TransportLayerListener<Identifier>> snapshot;
        synchronized (this.listeners) {
            snapshot = new ArrayList<TransportLayerListener<Identifier>>(this.listeners);
        }
        for (int idx = 0; idx < snapshot.size(); idx++) {
            snapshot.get(idx).wrote(size, dest, options, true, true);
        }
    }

    /**
     * Tells every priority listener that a message of {@code size} bytes was
     * enqueued. The listener list is copied under its lock and the callbacks
     * run on the copy, outside the lock.
     *
     * @param size number of bytes
     * @param dest destination of the message
     * @param options options associated with the message
     */
    public void notifyListenersEnqueued(int size, Identifier dest, Map<String, Object> options) {
        if (this.plisteners.isEmpty()) {
            return;
        }
        List<PriorityTransportLayerListener<Identifier>> snapshot;
        synchronized (this.plisteners) {
            snapshot = new ArrayList<PriorityTransportLayerListener<Identifier>>(this.plisteners);
        }
        for (int idx = 0; idx < snapshot.size(); idx++) {
            snapshot.get(idx).enqueued(size, dest, options, true, true);
        }
    }

    /**
     * Tells every priority listener that a message of {@code size} bytes was
     * dropped. The listener list is copied under its lock and the callbacks
     * run on the copy, outside the lock.
     *
     * @param size number of bytes
     * @param dest destination of the message
     * @param options options associated with the message
     */
    public void notifyListenersDropped(int size, Identifier dest, Map<String, Object> options) {
        if (this.plisteners.isEmpty()) {
            return;
        }
        List<PriorityTransportLayerListener<Identifier>> snapshot;
        synchronized (this.plisteners) {
            snapshot = new ArrayList<PriorityTransportLayerListener<Identifier>>(this.plisteners);
        }
        for (int idx = 0; idx < snapshot.size(); idx++) {
            snapshot.get(idx).dropped(size, dest, options, true, true);
        }
    }

    /**
     * @param i remote identifier
     * @return the pending byte count reported by the EntityManager of {@code i}
     */
    @Override
    public long bytesPending(Identifier i) {
        EntityManager manager = this.getEntityManager(i);
        return manager.bytesPending();
    }

    /**
     * @param i remote identifier
     * @return the queue length reported by the EntityManager of {@code i}
     */
    @Override
    public int queueLength(Identifier i) {
        EntityManager manager = this.getEntityManager(i);
        return manager.queueLength();
    }

    /**
     * @param i remote identifier
     * @return the pending messages reported by the EntityManager of {@code i}
     */
    @Override
    public List<MessageInfo> getPendingMessages(Identifier i) {
        EntityManager manager = this.getEntityManager(i);
        return manager.getPendingMessages();
    }

    /**
     * Collects the identifiers of all EntityManagers that currently have a
     * message at the head of their queue. The manager map is locked while it
     * is scanned.
     *
     * <p>Managers hold their identifier through a weak reference; a manager
     * whose identifier has already been collected is skipped, so the result
     * never contains null.
     *
     * @return a new, caller-owned collection of identifiers
     */
    @Override
    public Collection<Identifier> nodesWithPendingMessages() {
        ArrayList<Identifier> ret = new ArrayList<Identifier>();
        synchronized (this.entityManagers) {
            for (EntityManager m : this.entityManagers.values()) {
                if (m.peek() == null) continue;
                Identifier id = m.identifier.get();
                if (id != null) {
                    ret.add(id);
                }
            }
        }
        return ret;
    }

    /**
     * Returns the options of the connection to {@code i}: those of the
     * writing socket if there is one, otherwise those of the pending socket
     * request, otherwise null.
     *
     * @param i remote identifier
     * @return the connection options, or null when nothing is open or pending
     */
    @Override
    public Map<String, Object> connectionOptions(Identifier i) {
        EntityManager manager = this.getEntityManager(i);
        // Read each field once into a local so the null check and the use agree.
        P2PSocket<Identifier> writer = manager.writingSocket;
        if (writer == null) {
            SocketRequestHandle<Identifier> pending = manager.pendingSocket;
            return pending == null ? null : pending.getOptions();
        }
        return writer.getOptions();
    }

    /**
     * Reports the connection state towards {@code i}.
     *
     * @param i remote identifier
     * @return 2 when a writing socket exists, 1 when only a socket request is
     *         pending, 0 otherwise
     */
    @Override
    public int connectionStatus(Identifier i) {
        EntityManager manager = this.getEntityManager(i);
        return manager.writingSocket != null ? 2 : (manager.pendingSocket != null ? 1 : 0);
    }

    /**
     * Asks the EntityManager of {@code i} to open its primary socket.
     *
     * @param i remote identifier
     * @param options options passed on to the socket-opening helper
     */
    @Override
    public void openPrimaryConnection(Identifier i, Map<String, Object> options) {
        EntityManager manager = this.getEntityManager(i);
        manager.openPrimarySocketHelper(i, options);
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    class EntityManager
    implements P2PSocketReceiver<Identifier> {
        // Counter starting at Integer.MIN_VALUE. NOTE(review): presumably a per-message sequence number; its use is not visible in this part of the file.
        int seq = Integer.MIN_VALUE;
        // Messages waiting to be sent to this peer; clearState() locks on this list while emptying it.
        SortedLinkedList<org.mpisws.p2p.transport.priority.PriorityTransportLayerImpl$EntityManager.MessageWrapper> queue;
        // All primary sockets currently known for this peer (outgoing and incoming).
        Collection<P2PSocket<Identifier>> sockets;
        // Weakly held identifier of the peer; get() may return null once it has been collected.
        WeakReference<Identifier> identifier;
        // Handle of an outgoing primary-socket request that has not completed yet, or null.
        SocketRequestHandle<Identifier> pendingSocket;
        // Socket chosen for writing queued messages, or null.
        P2PSocket<Identifier> writingSocket;
        // Set by closeMe() when the writing socket should be closed but a message is still being written.
        P2PSocket<Identifier> closeWritingSocket;
        // Message currently in flight on the writing socket, or null.
        org.mpisws.p2p.transport.priority.PriorityTransportLayerImpl$EntityManager.MessageWrapper messageThatIsBeingWritten;
        // True while this manager is registered for write events on writingSocket.
        private boolean registered = false;
        // Timer task created by startLivenessChecker(), or null when none is running.
        TimerTask livenessChecker = null;

        /**
         * Creates a manager for one peer with an empty queue and no sockets.
         *
         * @param identifier the peer; kept only through a weak reference
         */
        EntityManager(Identifier identifier) {
            this.sockets = new HashSet();
            this.queue = new SortedLinkedList();
            this.identifier = new WeakReference<Identifier>(identifier);
        }

        /**
         * @return {@code EM{<identifier>}}, where the identifier prints as null once collected
         */
        public String toString() {
            StringBuilder text = new StringBuilder("EM{");
            text.append(this.identifier.get());
            text.append("}");
            return text.toString();
        }

        /**
         * Drops everything this manager holds for its peer: closes all known
         * sockets, empties the queue, forgets the in-flight message and
         * cancels a pending socket request. Runs on the selector thread; a
         * call from any other thread re-schedules itself there and returns.
         */
        public void clearState() {
            if (PriorityTransportLayerImpl.this.selectorManager.isSelectorThread()) {
                Iterator<P2PSocket<Identifier>> open = this.sockets.iterator();
                while (open.hasNext()) {
                    open.next().close();
                }
                synchronized (this.queue) {
                    this.queue.clear();
                    this.messageThatIsBeingWritten = null;
                }
                synchronized (this) {
                    if (PriorityTransportLayerImpl.this.logger.level <= 800) {
                        PriorityTransportLayerImpl.this.logger.log(this + ".clearState() setting pendingSocket to null " + this.pendingSocket);
                    }
                    if (this.pendingSocket != null) {
                        this.pendingSocket.cancel();
                        this.stopLivenessChecker();
                    }
                    this.pendingSocket = null;
                }
                return;
            }
            // Not on the selector: defer the whole operation to it.
            PriorityTransportLayerImpl.this.selectorManager.invoke(new Runnable(){

                public void run() {
                    EntityManager.this.clearState();
                }
            });
        }

        /**
         * Closes {@code socket} unless it is the writing socket and a message
         * is still being written on it; in that case the close is only noted
         * in {@code closeWritingSocket}.
         *
         * @param socket the socket to close
         * @return true if the socket was closed now, false if the close was deferred
         */
        public boolean closeMe(P2PSocket<Identifier> socket) {
            if (PriorityTransportLayerImpl.this.logger.level <= 400) {
                PriorityTransportLayerImpl.this.logger.logException("closeMe(" + socket + "):" + (socket == this.writingSocket) + "," + this.messageThatIsBeingWritten, new Exception("Stack Trace"));
            }
            if (socket != this.writingSocket) {
                this.sockets.remove(socket);
                socket.close();
                return true;
            }
            if (this.messageThatIsBeingWritten != null) {
                // Mid-write: remember the request instead of cutting the message off.
                this.closeWritingSocket = this.writingSocket;
                return false;
            }
            this.sockets.remove(socket);
            socket.close();
            this.setWritingSocket(null);
            return true;
        }

        /**
         * Takes ownership of a primary socket for this peer. If {@code receipt}
         * is the currently pending request, that request is cleared and the
         * liveness checker stopped. The socket is then added to the known
         * sockets, a write is scheduled if needed, and a SizeReader is created
         * for it.
         *
         * @param s the socket that became available
         * @param receipt the request that produced it; null for incoming sockets
         * @throws IllegalStateException when not called on the selector thread
         */
        public void primarySocketAvailable(P2PSocket<Identifier> s, SocketRequestHandle<Identifier> receipt) {
            final Logger log = PriorityTransportLayerImpl.this.logger;
            if (!PriorityTransportLayerImpl.this.selectorManager.isSelectorThread()) {
                throw new IllegalStateException("Must be called on the selector");
            }
            if (log.level <= 500) {
                log.log("primarySocketAvailable(" + s + "," + receipt + ")");
            }
            synchronized (this) {
                boolean completesPending = receipt != null && receipt == this.pendingSocket;
                if (completesPending) {
                    if (log.level <= 800) {
                        log.log(this + ".primarySocketAvailable setting pendingSocket to null " + this.pendingSocket);
                    }
                    this.stopLivenessChecker();
                    if (log.level <= 500) {
                        log.log("got socket:" + s + " clearing pendingSocket:" + this.pendingSocket);
                    }
                    this.pendingSocket = null;
                }
            }
            this.sockets.add(s);
            this.scheduleToWriteIfNeeded();
            new SizeReader(s);
        }

        /**
         * Selects the socket used for writing queued messages.
         *
         * @param s the new writing socket; null clears it
         */
        public void setWritingSocket(P2PSocket<Identifier> s) {
            final Logger log = PriorityTransportLayerImpl.this.logger;
            if (log.level < 800) {
                log.log(this + ".setWritingSocket(" + s + ")");
            }
            this.writingSocket = s;
        }

        /**
         * Makes sure queued messages can be written. If the peer's identifier
         * has been collected the manager is purged. Without a writing socket,
         * one of the known sockets is picked; if there is none, no request is
         * pending and a message is queued, a primary socket is opened. Finally,
         * if a writing socket exists, a message is waiting and no write
         * registration is outstanding, this manager registers for write events.
         *
         * @throws IllegalStateException when not called on the selector thread
         */
        protected void scheduleToWriteIfNeeded() {
            if (!PriorityTransportLayerImpl.this.selectorManager.isSelectorThread()) {
                throw new IllegalStateException("Must be called on the selector");
            }
            Identifier peer = this.identifier.get();
            if (peer == null) {
                this.purge(new MemoryExpiredException("No record of identifier for " + this));
                return;
            }
            if (this.writingSocket == null) {
                this.registered = false;
                if (this.sockets.isEmpty()) {
                    if (this.pendingSocket == null) {
                        MessageWrapper head = this.peek();
                        if (head != null) {
                            this.openPrimarySocketHelper(peer, head.options);
                        }
                    }
                } else {
                    this.setWritingSocket(this.sockets.iterator().next());
                }
            }
            if (this.registered || this.writingSocket == null || !this.haveMessageToSend()) {
                return;
            }
            this.registered = true;
            if (PriorityTransportLayerImpl.this.logger.level <= 300) {
                PriorityTransportLayerImpl.this.logger.log(this + ".scheduleToWriteIfNeeded() registering to write on " + this.writingSocket);
            }
            this.writingSocket.register(false, true, this);
        }

        /**
         * Starts opening a primary socket to {@code i} unless one is already
         * pending or a writing socket exists. The whole method runs while
         * holding this manager's monitor.
         *
         * <p>On success of the lower-layer open, the one-byte PRIMARY_SOCKET
         * header is written and the socket is then handed to
         * primarySocketAvailable(); failures are routed to
         * receiveSocketException() of the peer's manager.
         *
         * @param i peer to connect to
         * @param options options for the socket request; startLivenessChecker()
         *        rejects null with an IllegalArgumentException
         */
        public void openPrimarySocketHelper(final Identifier i, Map<String, Object> options) {
            EntityManager entityManager = this;
            synchronized (entityManager) {
                // Only one primary connection attempt at a time, and none if we can already write.
                if (this.pendingSocket != null || this.writingSocket != null) {
                    return;
                }
                if (PriorityTransportLayerImpl.this.logger.level <= 500) {
                    PriorityTransportLayerImpl.this.logger.log("Opening Primary Socket to " + i);
                }
                // Cancelling the handle is reported to the manager as a closed channel
                // before the underlying cancellable is cancelled.
                final SocketRequestHandleImpl handle = new SocketRequestHandleImpl<Identifier>(i, options, PriorityTransportLayerImpl.this.logger){

                    @Override
                    public boolean cancel() {
                        PriorityTransportLayerImpl.this.getEntityManager(i).receiveSocketException(this, new ClosedChannelException("Channel cancelled."));
                        return super.cancel();
                    }
                };
                if (PriorityTransportLayerImpl.this.logger.level <= 800) {
                    PriorityTransportLayerImpl.this.logger.log(this + ".openPrimarySocketHelper() setting pendingSocket to " + handle);
                }
                // Record the pending request before calling openSocket so that the
                // callbacks below can match their handle against pendingSocket.
                this.pendingSocket = handle;
                this.startLivenessChecker(i, options);
                handle.setSubCancellable(PriorityTransportLayerImpl.this.tl.openSocket(i, new SocketCallback<Identifier>(){

                    @Override
                    public void receiveResult(SocketRequestHandle<Identifier> cancellable, final P2PSocket<Identifier> sock) {
                        // The socket exists now: cancelling the request means closing it.
                        handle.setSubCancellable(new Cancellable(){

                            public boolean cancel() {
                                sock.close();
                                return true;
                            }
                        });
                        sock.register(false, true, new P2PSocketReceiver<Identifier>(){
                            // Header buffer kept across callbacks so a short write can be resumed.
                            ByteBuffer writeMe = ByteBuffer.wrap(PRIMARY_SOCKET);

                            @Override
                            public void receiveSelectResult(P2PSocket<Identifier> socket, boolean canRead, boolean canWrite) throws IOException {
                                if (canRead || !canWrite) {
                                    throw new IllegalArgumentException("expected to write!  canRead:" + canRead + " canWrite:" + canWrite);
                                }
                                if (PriorityTransportLayerImpl.this.logger.level <= 500) {
                                    PriorityTransportLayerImpl.this.logger.log("Opened Primary socket " + socket + " to " + i);
                                }
                                // -1 from write means the channel is closed: give up on this attempt.
                                if (socket.write(this.writeMe) == -1L) {
                                    PriorityTransportLayerImpl.this.cancelLivenessChecker(i);
                                    PriorityTransportLayerImpl.this.getEntityManager(socket.getIdentifier()).receiveSocketException(handle, new ClosedChannelException("Channel closed while writing."));
                                    return;
                                }
                                if (this.writeMe.hasRemaining()) {
                                    // Header not written yet; wait for the next write event.
                                    socket.register(false, true, this);
                                } else {
                                    PriorityTransportLayerImpl.this.getEntityManager(socket.getIdentifier()).primarySocketAvailable(socket, handle);
                                }
                            }

                            @Override
                            public void receiveException(P2PSocket<Identifier> socket, Exception e) {
                                PriorityTransportLayerImpl.this.getEntityManager(socket.getIdentifier()).receiveSocketException(handle, e);
                            }

                            public String toString() {
                                return "PriorityTLi: Primary Socket shim to " + i;
                            }
                        });
                    }

                    @Override
                    public void receiveException(SocketRequestHandle<Identifier> s, Exception ex) {
                        // Sanity check: the failing request must be the one this handle wraps.
                        if (handle.getSubCancellable() != null && s != handle.getSubCancellable()) {
                            throw new IllegalArgumentException("s != handle.getSubCancellable() must be a bug. s:" + s + " sub:" + handle.getSubCancellable());
                        }
                        PriorityTransportLayerImpl.this.getEntityManager(s.getIdentifier()).receiveSocketException(handle, ex);
                    }
                }, options));
            }
        }

        /**
         * Arms a one-shot timer that gives up on the pending socket request if it is
         * still outstanding when the timer fires.
         * <p>
         * Nothing is scheduled when a checker is already armed. When the timer fires it
         * clears itself via {@code stopLivenessChecker()}, asks the liveness provider to
         * check {@code temp}, cancels and forgets the pending socket request (if there
         * still is one) and finally calls {@code scheduleToWriteIfNeeded()}.
         * <p>
         * The delay is four times the proximity reported for {@code temp}, clamped to
         * the range 5000..40000.
         *
         * @param temp    the identifier handed to the proximity and liveness providers
         * @param options the options forwarded to both providers; must not be {@code null}
         * @throws IllegalArgumentException if {@code options} is {@code null}
         */
        public void startLivenessChecker(final Identifier temp, final Map<String, Object> options) {
            if (options == null) {
                throw new IllegalArgumentException("Options is null");
            }
            if (this.livenessChecker != null) {
                // A checker is already armed; leave it running.
                return;
            }
            if (PriorityTransportLayerImpl.this.logger.level <= 400) {
                PriorityTransportLayerImpl.this.logger.log("startLivenessChecker(" + temp + "," + options + ") pend:" + this.pendingSocket + " writingS:" + this.writingSocket + " theQueue:" + this.queue.size());
            }
            this.livenessChecker = new TimerTask(){

                public void run() {
                    synchronized (EntityManager.this) {
                        // The task may have been cancelled while it was waiting for the lock.
                        if (this.cancelled) {
                            return;
                        }
                        EntityManager.this.stopLivenessChecker();
                        if (PriorityTransportLayerImpl.this.destroyed) {
                            return;
                        }
                        PriorityTransportLayerImpl.this.livenessProvider.checkLiveness(temp, options);
                        if (PriorityTransportLayerImpl.this.logger.level <= 800) {
                            PriorityTransportLayerImpl.this.logger.log(EntityManager.this + ".liveness checker setting pendingSocket to null " + EntityManager.this.pendingSocket);
                        }
                        // Same guard as purge(): the request may already have been cleared.
                        if (EntityManager.this.pendingSocket != null) {
                            EntityManager.this.pendingSocket.cancel();
                            EntityManager.this.pendingSocket = null;
                        }
                    }
                    EntityManager.this.scheduleToWriteIfNeeded();
                }
            };
            // Multiply in long so that a very large proximity cannot wrap around to a
            // negative value and end up clamped to the minimum instead of the maximum.
            long scaled = 4L * PriorityTransportLayerImpl.this.proximityProvider.proximity(temp, options);
            int delay = (int)Math.max(5000L, Math.min(40000L, scaled));
            PriorityTransportLayerImpl.this.selectorManager.schedule(this.livenessChecker, delay);
        }

        /**
         * Cancels the armed liveness timer and forgets it. Does nothing when no timer
         * is armed.
         */
        public void stopLivenessChecker() {
            if (this.livenessChecker != null) {
                Logger log = PriorityTransportLayerImpl.this.logger;
                if (log.level <= 400) {
                    log.log("stopLivenessChecker(" + this.identifier.get() + ") pend:" + this.pendingSocket + " writingS:" + this.writingSocket + " theQueue:" + this.queue.size());
                }
                this.livenessChecker.cancel();
                this.livenessChecker = null;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Returns, without changing any state, the message that is currently being
         * written, or the result of {@code queue.peek()} when no message is in flight.
         * The lookup runs while holding the queue's monitor.
         */
        private org.mpisws.p2p.transport.priority.PriorityTransportLayerImpl$EntityManager.MessageWrapper peek() {
            synchronized (this.queue) {
                MessageWrapper inFlight = this.messageThatIsBeingWritten;
                return inFlight != null ? inFlight : (MessageWrapper)this.queue.peek();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Returns the message to write next. When no message is in flight, the result
         * of {@code queue.poll()} becomes the in-flight message first. The whole
         * operation runs while holding the queue's monitor.
         */
        private org.mpisws.p2p.transport.priority.PriorityTransportLayerImpl$EntityManager.MessageWrapper poll() {
            synchronized (this.queue) {
                Logger log = PriorityTransportLayerImpl.this.logger;
                boolean nothingInFlight = this.messageThatIsBeingWritten == null;
                if (nothingInFlight) {
                    this.messageThatIsBeingWritten = (MessageWrapper)this.queue.poll();
                    if (log.level <= 300) {
                        log.log("poll(" + this.identifier.get() + ") set messageThatIsBeingWritten = " + this.messageThatIsBeingWritten);
                    }
                }
                boolean nearlyFull = this.queue.size() >= PriorityTransportLayerImpl.this.MAX_QUEUE_SIZE - 1;
                if (nearlyFull && log.level <= 800) {
                    log.log(this + "polling from full queue (this is a good thing) " + this.messageThatIsBeingWritten);
                }
                return this.messageThatIsBeingWritten;
            }
        }

        /**
         * Tells whether there is anything left to write: either a message is in
         * flight or the queue is not empty.
         */
        private boolean haveMessageToSend() {
            if (this.messageThatIsBeingWritten != null) {
                return true;
            }
            return !this.queue.isEmpty();
        }

        /**
         * Handles a failure reported for {@code socket}.
         * <p>
         * The socket is removed from {@code sockets} and closed unless the failure is a
         * {@code java.nio.channels.ClosedChannelException}. If it was the writing
         * socket, the in-flight message is handed to {@code clearAndEnqueue}. Finally
         * {@code scheduleToWriteIfNeeded()} is called.
         *
         * @param socket the socket the exception was reported for
         * @param ioe    the reported exception
         */
        @Override
        public void receiveException(P2PSocket<Identifier> socket, Exception ioe) {
            Logger log = PriorityTransportLayerImpl.this.logger;
            if (log.level <= 400) {
                log.logException(this + ".receiveException(" + socket + "," + ioe + "):" + this.messageThatIsBeingWritten + " wrS:" + this.writingSocket, ioe);
            } else if (log.level <= 800) {
                log.log(this + ".receiveException(" + socket + "," + ioe + "):" + this.messageThatIsBeingWritten + " wrS:" + this.writingSocket + " " + ioe);
            }
            this.registered = false;
            this.sockets.remove(socket);
            boolean channelClosed = ioe instanceof java.nio.channels.ClosedChannelException;
            if (!channelClosed) {
                socket.close();
            }
            if (this.writingSocket == socket) {
                this.clearAndEnqueue(this.messageThatIsBeingWritten);
            }
            this.scheduleToWriteIfNeeded();
        }

        /**
         * Called when {@code socket} is ready for writing. Results for any socket other
         * than the current writing socket are logged and ignored. Otherwise messages
         * are polled and written one after another until a message reports that it
         * could not be finished or nothing is left, and then
         * {@code scheduleToWriteIfNeeded()} is called.
         *
         * @param socket   the socket that became ready
         * @param canRead  must be {@code false}
         * @param canWrite must be {@code true}
         * @throws IllegalStateException if the result is not write-only
         * @throws IOException if writing a message fails
         */
        @Override
        public void receiveSelectResult(P2PSocket<Identifier> socket, boolean canRead, boolean canWrite) throws IOException {
            this.registered = false;
            boolean writeOnly = canWrite && !canRead;
            if (!writeOnly) {
                throw new IllegalStateException(this + " Expected only to write. canRead:" + canRead + " canWrite:" + canWrite + " socket:" + socket);
            }
            Logger log = PriorityTransportLayerImpl.this.logger;
            if (this.writingSocket != socket) {
                if (log.level <= 900) {
                    log.log("receivedSelectResult(" + socket + ", r:" + canRead + " w:" + canWrite + ") ws:" + this.writingSocket);
                }
                return;
            }
            if (log.level <= 300) {
                log.log("receivedSelectResult(" + socket + "," + canRead + "," + canWrite);
            }
            MessageWrapper next;
            do {
                next = this.poll();
            } while (next != null && next.receiveSelectResult(this.writingSocket));
            this.scheduleToWriteIfNeeded();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Reports that opening a socket failed. If {@code handle} is the pending socket
         * request, the liveness checker is stopped and the pending request is forgotten
         * (both under this object's monitor). {@code scheduleToWriteIfNeeded()} is
         * called in every case.
         *
         * @param handle the socket request that failed
         * @param ex     the failure; only used for logging here
         */
        public void receiveSocketException(SocketRequestHandleImpl<Identifier> handle, Exception ex) {
            synchronized (this) {
                if (this.pendingSocket == handle) {
                    Logger log = PriorityTransportLayerImpl.this.logger;
                    if (log.level <= 800) {
                        log.log(this + ".receiveSocketException(" + ex + ") setting pendingSocket to null " + this.pendingSocket);
                    }
                    this.stopLivenessChecker();
                    this.pendingSocket = null;
                }
            }
            this.scheduleToWriteIfNeeded();
        }

        /*
         * Ignored method signature, as it can't be verified against descriptor
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Adds {@code ret} to the queue and then, while the queue holds more than
         * {@code MAX_QUEUE_SIZE} entries, removes the last entry and calls its
         * {@code drop()}. Runs while holding the queue's monitor.
         *
         * @param ret the message to add
         */
        private void enqueue(MessageWrapper ret) {
            synchronized (this.queue) {
                this.queue.add(ret);
                while (this.queue.size() > PriorityTransportLayerImpl.this.MAX_QUEUE_SIZE) {
                    MessageWrapper evicted = (MessageWrapper)this.queue.removeLast();
                    Logger log = PriorityTransportLayerImpl.this.logger;
                    if (log.level <= 700) {
                        log.log("Dropping " + evicted + " because queue is full. MAX_QUEUE_SIZE:" + PriorityTransportLayerImpl.this.MAX_QUEUE_SIZE);
                    }
                    evicted.drop();
                }
            }
        }

        /**
         * Purges this entity with a {@code NodeIsFaultyException} built from its
         * identifier.
         */
        public void markDead() {
            NodeIsFaultyException faulty = new NodeIsFaultyException(this.identifier.get());
            this.purge(faulty);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Discards everything this entity is holding.
         * <p>
         * In order: the in-flight message and all queued messages are removed (under
         * the queue's monitor); every removed message that has a callback gets
         * {@code sendFailed(message, ioe)}; all sockets are closed and forgotten; the
         * writing socket is cleared; and any pending socket request is cancelled after
         * stopping the liveness checker.
         *
         * @param ioe the reason passed to every {@code sendFailed} callback
         */
        public void purge(IOException ioe) {
            Logger log = PriorityTransportLayerImpl.this.logger;
            if (log.level <= 500) {
                log.log(this + "purge(" + ioe + "):" + this.messageThatIsBeingWritten);
            }

            // Collect under the lock, notify after releasing it.
            List<Tuple<MessageCallback<Identifier, ByteBuffer>, MessageWrapper>> toFail =
                    new ArrayList<Tuple<MessageCallback<Identifier, ByteBuffer>, MessageWrapper>>();
            synchronized (this.queue) {
                MessageWrapper inFlight = this.messageThatIsBeingWritten;
                if (inFlight != null) {
                    inFlight.reset();
                    if (inFlight.deliverAckToMe != null) {
                        toFail.add(new Tuple<MessageCallback<Identifier, ByteBuffer>, MessageWrapper>(inFlight.deliverAckToMe, inFlight));
                    }
                    this.messageThatIsBeingWritten = null;
                }
                for (MessageWrapper queued : this.queue) {
                    if (queued.deliverAckToMe != null) {
                        toFail.add(new Tuple<MessageCallback<Identifier, ByteBuffer>, MessageWrapper>(queued.deliverAckToMe, queued));
                    }
                }
                this.queue.clear();
            }
            for (Tuple<MessageCallback<Identifier, ByteBuffer>, MessageWrapper> failed : toFail) {
                failed.a().sendFailed(failed.b(), ioe);
            }

            synchronized (this.sockets) {
                for (P2PSocket open : this.sockets) {
                    open.close();
                }
                this.sockets.clear();
            }
            this.setWritingSocket(null);

            synchronized (this) {
                if (log.level <= 800) {
                    log.log(this + ".purge setting pendingSocket to null " + this.pendingSocket);
                }
                if (this.pendingSocket != null) {
                    this.stopLivenessChecker();
                    this.pendingSocket.cancel();
                }
                this.pendingSocket = null;
            }
        }

        /**
         * Queues {@code message} for delivery to {@code temp}.
         * <p>
         * The priority is read from {@code options} under the key
         * {@code "OPTION_PRIORITY"} and defaults to 0. A message whose remaining size
         * exceeds {@code MAX_MSG_SIZE}, or whose destination has a liveness value of 3
         * or more, is not queued: the callback (if any) receives {@code sendFailed} and
         * the handle is returned immediately. Otherwise the message is enqueued and
         * {@code scheduleToWriteIfNeeded()} runs, directly when already on the selector
         * thread and via {@code selectorManager.invoke} otherwise.
         *
         * @param temp           the destination identifier
         * @param message        the payload
         * @param deliverAckToMe callback for success or failure; may be {@code null}
         * @param options        send options; may be {@code null}
         * @return the handle for this request
         */
        public MessageRequestHandle<Identifier, ByteBuffer> send(Identifier temp, ByteBuffer message, MessageCallback<Identifier, ByteBuffer> deliverAckToMe, Map<String, Object> options) {
            if (PriorityTransportLayerImpl.this.logger.level <= 400) {
                PriorityTransportLayerImpl.this.logger.log(this + "send(" + message + ")");
            }

            int priority = 0;
            boolean hasPriority = options != null && options.containsKey("OPTION_PRIORITY");
            if (hasPriority) {
                priority = ((Integer)options.get("OPTION_PRIORITY")).intValue();
            }

            final int remaining = message.remaining();
            if (remaining > PriorityTransportLayerImpl.this.MAX_MSG_SIZE) {
                // The wrapper is built before the exception on purpose: the exception
                // text includes the buffer, and building the wrapper reads from it.
                MessageWrapper tooLarge = new MessageWrapper(temp, message, deliverAckToMe, options, priority, 0);
                if (deliverAckToMe != null) {
                    deliverAckToMe.sendFailed(tooLarge, new SocketException("Message too large. msg:" + message + " size:" + remaining + " max:" + PriorityTransportLayerImpl.this.MAX_MSG_SIZE));
                }
                return tooLarge;
            }

            if (PriorityTransportLayerImpl.this.livenessProvider.getLiveness(temp, options) >= 3) {
                MessageWrapper undeliverable = new MessageWrapper(temp, message, deliverAckToMe, options, priority, 0);
                if (deliverAckToMe != null) {
                    deliverAckToMe.sendFailed(undeliverable, new NodeIsFaultyException(temp, message));
                }
                return undeliverable;
            }

            MessageWrapper queued = new MessageWrapper(temp, message, deliverAckToMe, options, priority, this.seq++);
            PriorityTransportLayerImpl.this.notifyListenersEnqueued(queued.originalSize, temp, options);
            this.enqueue(queued);
            if (!PriorityTransportLayerImpl.this.selectorManager.isSelectorThread()) {
                PriorityTransportLayerImpl.this.selectorManager.invoke(new Runnable(){

                    public void run() {
                        EntityManager.this.scheduleToWriteIfNeeded();
                    }
                });
            } else {
                this.scheduleToWriteIfNeeded();
            }
            return queued;
        }

        /*
         * Ignored method signature, as it can't be verified against descriptor
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Marks the in-flight message as finished: clears it, calls
         * {@code wrapper.complete()} and, if the writing socket is the one recorded in
         * {@code closeWritingSocket}, closes and forgets that socket.
         *
         * @param wrapper the message that finished; must be the in-flight message
         * @return {@code false} if the writing socket was closed here, {@code true}
         *         otherwise
         * @throws IllegalArgumentException if {@code wrapper} is not the in-flight message
         */
        protected boolean complete(MessageWrapper wrapper) {
            if (PriorityTransportLayerImpl.this.logger.level <= 500) {
                PriorityTransportLayerImpl.this.logger.log(this + ".complete(" + wrapper + ")");
            }
            if (this.messageThatIsBeingWritten != wrapper) {
                throw new IllegalArgumentException("Wrapper:" + wrapper + " messageThatIsBeingWritten:" + this.messageThatIsBeingWritten);
            }
            synchronized (this.queue) {
                this.messageThatIsBeingWritten = null;
            }
            wrapper.complete();

            boolean keepSocket = this.closeWritingSocket != this.writingSocket;
            if (keepSocket) {
                return true;
            }
            this.writingSocket.close();
            this.setWritingSocket(null);
            this.closeWritingSocket = null;
            return false;
        }

        /*
         * Ignored method signature, as it can't be verified against descriptor
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Abandons the current write: resets and clears the in-flight message, forgets
         * the writing socket (also removing it from {@code sockets}) and, when
         * {@code wrapper} is not {@code null}, resets it and puts it back on the queue.
         * Everything after the argument check runs under the queue's monitor.
         *
         * @param wrapper the in-flight message, or {@code null} when there is none
         * @throws IllegalArgumentException if {@code wrapper} is not the in-flight message
         */
        public void clearAndEnqueue(MessageWrapper wrapper) {
            if (this.messageThatIsBeingWritten != wrapper) {
                throw new IllegalArgumentException("Wrapper:" + wrapper + " messageThatIsBeingWritten:" + this.messageThatIsBeingWritten);
            }
            synchronized (this.queue) {
                MessageWrapper inFlight = this.messageThatIsBeingWritten;
                if (inFlight != null) {
                    inFlight.reset();
                }
                this.messageThatIsBeingWritten = null;

                if (this.writingSocket != null) {
                    this.sockets.remove(this.writingSocket);
                    this.setWritingSocket(null);
                }

                if (wrapper == null) {
                    return;
                }
                wrapper.reset();
                this.enqueue(wrapper);
            }
        }

        /**
         * Returns the number of queued messages, plus one if a message is in flight.
         */
        public int queueLength() {
            int queued = this.queue.size();
            return this.messageThatIsBeingWritten == null ? queued : queued + 1;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Sums {@code message.remaining()} over the in-flight message (if any) and all
         * queued messages, while holding the queue's monitor.
         *
         * @return the total number of remaining bytes
         */
        public long bytesPending() {
            synchronized (this.queue) {
                long total = 0L;
                MessageWrapper inFlight = this.messageThatIsBeingWritten;
                if (inFlight != null) {
                    total = inFlight.message.remaining();
                }
                for (MessageWrapper queued : this.queue) {
                    total += queued.message.remaining();
                }
                return total;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Builds a new list with the {@code MessageInfo} of every queued message, in
         * queue iteration order. The in-flight message is not part of the result.
         *
         * @return a freshly allocated list; never {@code null}
         */
        public List<MessageInfo> getPendingMessages() {
            synchronized (this.queue) {
                List<MessageInfo> infos = new ArrayList<MessageInfo>(this.queue.size());
                for (MessageWrapper queued : this.queue) {
                    MessageInfo info = queued.getMessageInfo();
                    infos.add(info);
                }
                return infos;
            }
        }

        /*
         * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
         */
        /**
         * Reads a fixed number of bytes from a socket and then hands them on.
         * <p>
         * The constructor allocates the buffer and registers this reader for read
         * events. Each select result reads into the buffer; once it is full,
         * {@link #done(P2PSocket)} is invoked, otherwise the reader registers again.
         */
        class BufferReader
        implements P2PSocketReceiver<Identifier> {
            /** Destination for the bytes being read; its capacity is the expected size. */
            ByteBuffer buf;

            /**
             * @param size   the number of bytes to read
             * @param socket the socket to read from
             */
            public BufferReader(int size, P2PSocket<Identifier> socket) {
                this.buf = ByteBuffer.allocate(size);
                socket.register(true, false, this);
            }

            /**
             * Reads what is available. End of stream closes the socket via
             * {@code closeMe}; an {@code IOException} from the read is routed to
             * {@link #receiveException(P2PSocket, Exception)}.
             *
             * @throws IllegalStateException if the result is not read-only
             */
            @Override
            public void receiveSelectResult(P2PSocket<Identifier> socket, boolean canRead, boolean canWrite) throws IOException {
                if (canWrite || !canRead) {
                    throw new IllegalStateException(EntityManager.this + " Expected only to read. canRead:" + canRead + " canWrite:" + canWrite + " socket:" + socket);
                }
                try {
                    if (socket.read(this.buf) == -1L) {
                        EntityManager.this.closeMe(socket);
                        return;
                    }
                }
                catch (IOException ioe) {
                    this.receiveException(socket, ioe);
                    return;
                }
                if (this.buf.remaining() == 0) {
                    this.buf.flip();
                    this.done(socket);
                } else {
                    // Not complete yet: wait for more data.
                    socket.register(true, false, this);
                }
            }

            /**
             * Handles a read failure. A {@code java.nio.channels.ClosedChannelException}
             * is ignored entirely. Any other failure closes the socket via
             * {@code closeMe}; it is also reported to the error handler unless it is
             * a {@code NodeIsFaultyException} or an {@code IOException} carrying the
             * "connection was aborted" message matched below.
             */
            @Override
            public void receiveException(P2PSocket<Identifier> socket, Exception e) {
                if (e instanceof java.nio.channels.ClosedChannelException) {
                    return;
                }
                boolean printError = true;
                if (e instanceof NodeIsFaultyException) {
                    printError = false;
                }
                // Constant-first equals: getMessage() is null for exceptions that were
                // created without a detail message.
                if (e instanceof IOException && "An established connection was aborted by the software in your host machine".equals(e.getMessage())) {
                    printError = false;
                }
                if (printError) {
                    PriorityTransportLayerImpl.this.errorHandler.receivedException(socket.getIdentifier(), e);
                }
                EntityManager.this.closeMe(socket);
            }

            /**
             * Called once the buffer is full (and flipped). Notifies the read
             * listeners, delivers the buffer to the transport callback and starts a
             * new {@code SizeReader} on the same socket.
             */
            public void done(P2PSocket<Identifier> socket) throws IOException {
                if (PriorityTransportLayerImpl.this.logger.level <= 500) {
                    PriorityTransportLayerImpl.this.logger.log(EntityManager.this + " read message of size " + this.buf.capacity() + " from " + socket);
                }
                PriorityTransportLayerImpl.this.notifyListenersRead(this.buf.capacity(), socket.getIdentifier(), socket.getOptions());
                PriorityTransportLayerImpl.this.callback.messageReceived(socket.getIdentifier(), this.buf, socket.getOptions());
                new SizeReader(socket);
            }

            public String toString() {
                return "BufferReader{" + this.buf + "}";
            }
        }

        /*
         * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
         */
        /**
         * Reads the 4-byte length prefix of the next message and then starts a
         * {@code BufferReader} for a body of that length.
         */
        class SizeReader
        extends BufferReader {
            /**
             * @param socket the socket to read the length prefix from
             */
            public SizeReader(P2PSocket<Identifier> socket) {
                super(4, socket);
            }

            /**
             * Interprets the four bytes read as an {@code int}. A length that is
             * negative or larger than {@code MAX_MSG_SIZE} is logged and the socket is
             * closed via {@code closeMe}; otherwise a {@code BufferReader} for the
             * body is started.
             */
            @Override
            public void done(P2PSocket<Identifier> socket) throws IOException {
                int msgSize = this.buf.asIntBuffer().get();
                if (PriorityTransportLayerImpl.this.logger.level <= 400) {
                    PriorityTransportLayerImpl.this.logger.log(EntityManager.this + " reading message of size " + msgSize);
                }
                // The length comes from the remote peer. A negative value must be
                // rejected here, otherwise ByteBuffer.allocate would throw
                // IllegalArgumentException when the body reader is created.
                if (msgSize < 0 || msgSize > PriorityTransportLayerImpl.this.MAX_MSG_SIZE) {
                    if (PriorityTransportLayerImpl.this.logger.level <= 900) {
                        PriorityTransportLayerImpl.this.logger.log(socket + " attempted to send a message of size " + msgSize + ". MAX_MSG_SIZE = " + PriorityTransportLayerImpl.this.MAX_MSG_SIZE);
                    }
                    EntityManager.this.closeMe(socket);
                    return;
                }
                new BufferReader(msgSize, socket);
            }

            @Override
            public String toString() {
                return "SizeReader";
            }
        }

        /*
         * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
         */
        /**
         * One outgoing message together with its send state. It is both the element
         * type of the send queue (ordered by {@link #compareTo}) and the
         * {@code MessageRequestHandle} returned to the sender.
         */
        class MessageWrapper
        implements Comparable<MessageWrapper>,
        MessageRequestHandle<Identifier, ByteBuffer> {
            int priority;
            int seq;
            Identifier myIdentifier;
            /** The socket this message has started writing to; {@code null} after {@link #reset()}. */
            P2PSocket socket;
            /** The buffer supplied by the sender. */
            ByteBuffer originalMessage;
            /** The framed copy that is written: a 4-byte length followed by the payload. */
            ByteBuffer message;
            MessageCallback<Identifier, ByteBuffer> deliverAckToMe;
            Map<String, Object> options;
            /** {@code message.remaining()} of the sender's buffer at construction time. */
            int originalSize;
            boolean cancelled = false;
            boolean completed = false;

            /**
             * Builds the framed copy of {@code message}. The payload is copied with a
             * relative bulk put, which advances the position of the sender's buffer to
             * its limit.
             *
             * @param temp           the destination identifier
             * @param message        the payload
             * @param deliverAckToMe callback for success or failure; may be {@code null}
             * @param options        the send options
             * @param priority       the sort priority
             * @param seq            the sequence number used to order equal priorities
             */
            MessageWrapper(Identifier temp, ByteBuffer message, MessageCallback<Identifier, ByteBuffer> deliverAckToMe, Map<String, Object> options, int priority, int seq) {
                int size = message.remaining();
                this.originalSize = size;
                this.myIdentifier = temp;
                this.originalMessage = message;
                this.message = ByteBuffer.allocate(size + 4);
                // A freshly allocated buffer is big-endian, so this writes the same
                // four bytes, most significant first, that SizeReader decodes.
                this.message.putInt(size);
                this.message.put(message);
                // Rewind to position 0 with limit == capacity, ready to be written.
                this.message.clear();
                this.deliverAckToMe = deliverAckToMe;
                this.options = options;
                this.priority = priority;
                this.seq = seq;
            }

            /** Returns a {@code MessageInfo} for the original buffer, options and priority. */
            public MessageInfo getMessageInfo() {
                return new MessageInfoImpl(this.originalMessage, this.options, this.priority);
            }

            /** Marks the message completed, acks the callback (if any) and notifies the write listeners. */
            public void complete() {
                this.completed = true;
                if (this.deliverAckToMe != null) {
                    this.deliverAckToMe.ack(this);
                }
                PriorityTransportLayerImpl.this.notifyListenersWrote(this.originalSize, this.myIdentifier, this.options);
            }

            /**
             * Writes as much of this message as {@code socket} accepts.
             *
             * @param socket the socket to write to
             * @return {@code true} when the caller may continue with the next message:
             *         either this message was cancelled before any byte was written,
             *         or it was written completely and {@code EntityManager.complete}
             *         returned {@code true}; {@code false} in every other case
             * @throws IOException if the write fails; it is logged and rethrown
             */
            public boolean receiveSelectResult(P2PSocket<Identifier> socket) throws IOException {
                if (PriorityTransportLayerImpl.this.logger.level <= 300) {
                    PriorityTransportLayerImpl.this.logger.log(this + ".receiveSelectResult(" + socket + ")");
                }
                try {
                    // This message already started on a different socket: do not
                    // continue it on this one.
                    if (this.socket != null && this.socket != socket) {
                        if (PriorityTransportLayerImpl.this.logger.level <= 900) {
                            PriorityTransportLayerImpl.this.logger.log(this + " Socket changed!!! can:" + this.cancelled + " comp:" + this.completed + " socket:" + socket + " writingSocket:" + EntityManager.this.writingSocket + " this.socket:" + this.socket);
                        }
                        socket.shutdownOutput();
                        return false;
                    }
                    this.socket = socket;
                    // A cancelled message is skipped only if nothing was written yet.
                    if (this.cancelled && this.message.position() == 0) {
                        if (PriorityTransportLayerImpl.this.logger.level <= 300) {
                            PriorityTransportLayerImpl.this.logger.log(this + ".rsr(" + socket + ") cancelled");
                        }
                        return true;
                    }
                    long bytesWritten = socket.write(this.message);
                    if (bytesWritten == -1L) {
                        if (PriorityTransportLayerImpl.this.logger.level <= 300) {
                            PriorityTransportLayerImpl.this.logger.log(this + ".rsr(" + socket + ") socket was closed");
                        }
                        EntityManager.this.clearAndEnqueue(this);
                        return false;
                    }
                    if (PriorityTransportLayerImpl.this.logger.level <= 400) {
                        PriorityTransportLayerImpl.this.logger.log(this + " wrote " + bytesWritten + " bytes of " + this.message.capacity() + " remaining:" + this.message.remaining());
                    }
                    if (this.message.hasRemaining()) {
                        if (PriorityTransportLayerImpl.this.logger.level <= 300) {
                            PriorityTransportLayerImpl.this.logger.log(this + ".rsr(" + socket + ") has remaining");
                        }
                        return false;
                    }
                    return EntityManager.this.complete(this);
                }
                catch (IOException ioe) {
                    if (PriorityTransportLayerImpl.this.logger.level <= 300) {
                        PriorityTransportLayerImpl.this.logger.logException(this + ".rsr(" + socket + ")", ioe);
                    }
                    throw ioe;
                }
            }

            /** Reports a {@code QueueOverflowException} to the callback (if any) and notifies the drop listeners. */
            public void drop() {
                if (this.deliverAckToMe != null) {
                    this.deliverAckToMe.sendFailed(this, new QueueOverflowException(EntityManager.this.identifier.get(), this.originalMessage));
                }
                PriorityTransportLayerImpl.this.notifyListenersDropped(this.originalSize, this.myIdentifier, this.options);
            }

            /**
             * Orders by {@code priority} (smaller value first) and, for equal
             * priorities, by {@code seq}.
             */
            @Override
            public int compareTo(MessageWrapper that) {
                if (this.priority == that.priority) {
                    // NOTE(review): the subtraction is kept as in the original;
                    // presumably it is intentional so that the order survives
                    // wrap-around of the seq counter - confirm.
                    return this.seq - that.seq;
                }
                // Compare instead of subtracting: the difference of two arbitrary
                // ints can overflow and report the wrong sign.
                return this.priority < that.priority ? -1 : 1;
            }

            @Override
            public Identifier getIdentifier() {
                return this.myIdentifier;
            }

            @Override
            public ByteBuffer getMessage() {
                return this.originalMessage;
            }

            @Override
            public Map<String, Object> getOptions() {
                return this.options;
            }

            /** Rewinds the framed buffer to the start and forgets the socket. */
            public void reset() {
                this.message.clear();
                this.socket = null;
            }

            /**
             * Cancels this message.
             *
             * @return for the in-flight message, {@code true} only if no byte has been
             *         written yet (it is then cleared as the in-flight message);
             *         otherwise the result of removing this message from the queue
             */
            @Override
            public boolean cancel() {
                this.cancelled = true;
                synchronized (EntityManager.this.queue) {
                    // equals() is not overridden in this class, so this is the same
                    // identity test the original expressed through equals().
                    if (this == EntityManager.this.messageThatIsBeingWritten) {
                        if (this.message.position() == 0) {
                            EntityManager.this.messageThatIsBeingWritten = null;
                            return true;
                        }
                        return false;
                    }
                    return EntityManager.this.queue.remove(this);
                }
            }

            public String toString() {
                return "MessagWrapper{" + this.message + "}@" + System.identityHashCode(this) + "->" + EntityManager.this.identifier.get() + " pri:" + this.priority + " seq:" + this.seq + " s:" + this.socket;
            }
        }
    }
}

