/*
 * Decompiled with CFR 0.152.
 */
package org.mpisws.p2p.transport.priority;

import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import org.mpisws.p2p.transport.ErrorHandler;
import org.mpisws.p2p.transport.MessageCallback;
import org.mpisws.p2p.transport.MessageRequestHandle;
import org.mpisws.p2p.transport.P2PSocket;
import org.mpisws.p2p.transport.P2PSocketReceiver;
import org.mpisws.p2p.transport.SocketCallback;
import org.mpisws.p2p.transport.SocketRequestHandle;
import org.mpisws.p2p.transport.TransportLayer;
import org.mpisws.p2p.transport.TransportLayerCallback;
import org.mpisws.p2p.transport.exception.NodeIsFaultyException;
import org.mpisws.p2p.transport.liveness.LivenessListener;
import org.mpisws.p2p.transport.liveness.LivenessProvider;
import org.mpisws.p2p.transport.priority.PriorityTransportLayer;
import org.mpisws.p2p.transport.priority.PriorityTransportLayerImpl;
import org.mpisws.p2p.transport.priority.QueueOverflowException;
import org.mpisws.p2p.transport.util.DefaultErrorHandler;
import org.mpisws.p2p.transport.util.SocketRequestHandleImpl;
import rice.environment.Environment;
import rice.environment.logging.Logger;
import rice.selector.SelectorManager;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class PriorityTransportLayerImpl<Identifier>
implements PriorityTransportLayer<Identifier>,
LivenessListener<Identifier>,
TransportLayerCallback<Identifier, ByteBuffer> {
    /** Wrapped lower transport layer; sockets and pass-through messages are delegated to it. */
    TransportLayer<Identifier, ByteBuffer> tl;
    /** Liveness source: queried in {@code EntityManager.send} and listened to via {@code livenessChanged}. */
    LivenessProvider<Identifier> livenessProvider;
    /** Header byte (0): {@code incomingSocket} hands sockets starting with this byte to the upper callback. */
    public static final byte PASSTHROUGH_SOCKET_B = 0;
    /** Header byte (1): {@code incomingSocket} hands sockets starting with this byte to the peer's EntityManager. */
    public static final byte PRIMARY_SOCKET_B = 1;
    /** One-byte header written by {@code openSocket} on every socket it opens. */
    public static final byte[] PASSTHROUGH_SOCKET = new byte[]{0};
    /** One-byte header written by {@code openPrimarySocket} on every socket it opens. */
    public static final byte[] PRIMARY_SOCKET = new byte[]{1};
    /** Largest payload, in bytes, accepted for sending and for receiving; overwritten by the constructor. */
    public int MAX_MSG_SIZE = 10000;
    /** Per-peer queue bound used when dropping messages in {@code EntityManager.send}; overwritten by the constructor. */
    public int MAX_QUEUE_SIZE = 30;
    /** Raw table that is not referenced anywhere in the visible portion of this file. */
    public Hashtable sockets;
    /** Logger obtained from the Environment in the constructor. */
    public Logger logger;
    /** Per-peer state, keyed by identifier; accessed under {@code synchronized (entityManagers)} in get/deleteEntityManager. */
    protected Map<Identifier, EntityManager> entityManagers = new HashMap<Identifier, EntityManager>();
    /** Upper-layer callback that receives pass-through sockets and all delivered messages. */
    private TransportLayerCallback<Identifier, ByteBuffer> callback;
    /** Receives I/O exceptions raised on incoming sockets and by the message readers. */
    private ErrorHandler<Identifier> errorHandler;
    /** Selector used by {@code EntityManager.send} to schedule socket/queue work via {@code invoke}. */
    private SelectorManager selectorManager;

    /**
     * Builds the priority layer on top of {@code tl} and hooks it into the
     * lower layer and the liveness provider.
     *
     * @param tl               lower transport layer; this object becomes its callback
     * @param livenessProvider liveness source; this object is added as a listener
     * @param env              supplies the logger and the selector manager
     * @param maxMsgSize       value stored in {@code MAX_MSG_SIZE}
     * @param maxQueueSize     value stored in {@code MAX_QUEUE_SIZE}
     * @param handler          error handler, or {@code null} to use a {@code DefaultErrorHandler}
     */
    public PriorityTransportLayerImpl(TransportLayer<Identifier, ByteBuffer> tl, LivenessProvider<Identifier> livenessProvider, Environment env, int maxMsgSize, int maxQueueSize, ErrorHandler<Identifier> handler) {
        logger = env.getLogManager().getLogger(PriorityTransportLayerImpl.class, null);
        selectorManager = env.getSelectorManager();

        // Limits and collaborators.
        MAX_MSG_SIZE = maxMsgSize;
        MAX_QUEUE_SIZE = maxQueueSize;
        this.tl = tl;
        this.livenessProvider = livenessProvider;

        // Start receiving events from the layers below.
        tl.setCallback(this);
        livenessProvider.addLivenessListener(this);

        // Fall back to the default handler when the caller supplied none.
        if (handler != null) {
            errorHandler = handler;
        } else {
            errorHandler = new DefaultErrorHandler(logger);
        }
    }

    /**
     * Passes the accept-messages flag straight through to the wrapped transport layer.
     *
     * @param b value forwarded unchanged
     */
    @Override
    public void acceptMessages(boolean b) {
        tl.acceptMessages(b);
    }

    /**
     * Passes the accept-sockets flag straight through to the wrapped transport layer.
     *
     * @param b value forwarded unchanged
     */
    @Override
    public void acceptSockets(boolean b) {
        tl.acceptSockets(b);
    }

    /**
     * @return the local identifier as reported by the wrapped transport layer
     */
    @Override
    public Identifier getLocalIdentifier() {
        return tl.getLocalIdentifier();
    }

    /**
     * Opens a pass-through socket to {@code i}. Once the lower layer delivers
     * the socket and it becomes writable, the {@code PASSTHROUGH_SOCKET} header
     * byte is written and the socket is handed to {@code deliverSocketToMe}.
     *
     * @param i                 remote identifier
     * @param deliverSocketToMe receives the socket or the failure; must not be null
     * @param options           passed to the handle and to the lower layer
     * @return a handle whose sub-cancellable is the lower layer's request
     * @throws IllegalArgumentException if {@code deliverSocketToMe} is null
     */
    @Override
    public SocketRequestHandle<Identifier> openSocket(Identifier i, final SocketCallback<Identifier> deliverSocketToMe, Map<String, Integer> options) {
        if (deliverSocketToMe == null) {
            throw new IllegalArgumentException("No handle to return socket to! (deliverSocketToMe must be non-null!)");
        }
        final SocketRequestHandleImpl<Identifier> handle = new SocketRequestHandleImpl<Identifier>(i, options);

        // deliverSocketToMe is known to be non-null from here on, so the
        // callbacks below invoke it directly.
        SocketCallback<Identifier> lowerLayerCallback = new SocketCallback<Identifier>(){

            @Override
            public void receiveResult(SocketRequestHandle<Identifier> cancellable, P2PSocket<Identifier> sock) {
                P2PSocketReceiver<Identifier> headerWriter = new P2PSocketReceiver<Identifier>(){

                    @Override
                    public void receiveSelectResult(P2PSocket<Identifier> socket, boolean canRead, boolean canWrite) throws IOException {
                        boolean writeOnly = canWrite && !canRead;
                        if (!writeOnly) {
                            throw new IllegalArgumentException("expected to write!  canRead:" + canRead + " canWrite:" + canWrite);
                        }
                        socket.write(ByteBuffer.wrap(PASSTHROUGH_SOCKET));
                        deliverSocketToMe.receiveResult(handle, socket);
                    }

                    @Override
                    public void receiveException(P2PSocket<Identifier> socket, IOException e) {
                        deliverSocketToMe.receiveException(handle, e);
                    }
                };
                // Wait for writability before sending the header byte.
                sock.register(false, true, headerWriter);
            }

            @Override
            public void receiveException(SocketRequestHandle<Identifier> s, IOException ex) {
                if (s != handle.getSubCancellable()) {
                    throw new IllegalArgumentException("s != handle.getSubCancellable() must be a bug. s:" + s + " sub:" + handle.getSubCancellable());
                }
                deliverSocketToMe.receiveException(handle, ex);
            }
        };

        handle.setSubCancellable(this.tl.openSocket(i, lowerLayerCallback, options));
        return handle;
    }

    /**
     * Opens a primary socket to {@code i}. When the socket becomes writable
     * the {@code PRIMARY_SOCKET} header byte is written and the socket is given
     * to the peer's EntityManager; failures are reported to that EntityManager
     * through {@code receiveSocketException}.
     *
     * @param i       remote identifier
     * @param options passed to the handle and to the lower layer
     * @return a handle whose sub-cancellable is the lower layer's request
     */
    protected SocketRequestHandle<Identifier> openPrimarySocket(Identifier i, Map<String, Integer> options) {
        if (logger.level <= 500) {
            logger.log("Opening Primary Socket to " + i);
        }
        final SocketRequestHandleImpl<Identifier> handle = new SocketRequestHandleImpl<Identifier>(i, options);

        SocketCallback<Identifier> lowerLayerCallback = new SocketCallback<Identifier>(){

            @Override
            public void receiveResult(SocketRequestHandle<Identifier> cancellable, P2PSocket<Identifier> sock) {
                P2PSocketReceiver<Identifier> headerWriter = new P2PSocketReceiver<Identifier>(){

                    @Override
                    public void receiveSelectResult(P2PSocket<Identifier> socket, boolean canRead, boolean canWrite) throws IOException {
                        boolean writeOnly = canWrite && !canRead;
                        if (!writeOnly) {
                            throw new IllegalArgumentException("expected to write!  canRead:" + canRead + " canWrite:" + canWrite);
                        }
                        socket.write(ByteBuffer.wrap(PRIMARY_SOCKET));
                        EntityManager manager = PriorityTransportLayerImpl.this.getEntityManager(socket.getIdentifier());
                        manager.incomingSocket(socket, handle);
                    }

                    @Override
                    public void receiveException(P2PSocket<Identifier> socket, IOException e) {
                        EntityManager manager = PriorityTransportLayerImpl.this.getEntityManager(socket.getIdentifier());
                        manager.receiveSocketException(handle, e);
                    }
                };
                // Wait for writability before sending the header byte.
                sock.register(false, true, headerWriter);
            }

            @Override
            public void receiveException(SocketRequestHandle<Identifier> s, IOException ex) {
                if (s != handle.getSubCancellable()) {
                    throw new IllegalArgumentException("s != handle.getSubCancellable() must be a bug. s:" + s + " sub:" + handle.getSubCancellable());
                }
                EntityManager manager = PriorityTransportLayerImpl.this.getEntityManager(s.getIdentifier());
                manager.receiveSocketException(handle, ex);
            }
        };

        handle.setSubCancellable(this.tl.openSocket(i, lowerLayerCallback, options));
        return handle;
    }

    /**
     * Handles a socket accepted by the lower layer. The first byte read from
     * the socket selects its role: 0 ({@code PASSTHROUGH_SOCKET_B}) forwards the
     * socket to the upper-layer callback, 1 ({@code PRIMARY_SOCKET_B}) gives it
     * to the peer's EntityManager.
     *
     * <p>A socket whose header byte is neither value is closed (and the event
     * logged) instead of being silently left open and unowned.
     *
     * @param s the newly accepted socket
     * @throws IOException declared by the callback interface
     */
    @Override
    public void incomingSocket(final P2PSocket<Identifier> s) throws IOException {
        s.register(true, false, new P2PSocketReceiver<Identifier>(){

            @Override
            public void receiveSelectResult(P2PSocket<Identifier> socket, boolean canRead, boolean canWrite) throws IOException {
                if (socket != s) {
                    throw new IllegalArgumentException("Sockets not equal!!! s:" + s + " socket:" + socket);
                }
                if (canWrite || !canRead) {
                    throw new IllegalArgumentException("Should only be able to read! canRead:" + canRead + " canWrite:" + canWrite);
                }
                ByteBuffer hdr = ByteBuffer.allocate(1);
                int ret = (int)socket.read(hdr);
                switch (ret) {
                    case -1: {
                        // Remote side closed before sending the header.
                        socket.close();
                        break;
                    }
                    case 0: {
                        // Nothing available yet; wait for the next read event.
                        socket.register(true, false, this);
                        break;
                    }
                    case 1: {
                        hdr.flip();
                        byte val = hdr.get();
                        switch (val) {
                            case 0: { // PASSTHROUGH_SOCKET_B
                                PriorityTransportLayerImpl.this.callback.incomingSocket(s);
                                break;
                            }
                            case 1: { // PRIMARY_SOCKET_B
                                if (PriorityTransportLayerImpl.this.logger.level <= 500) {
                                    PriorityTransportLayerImpl.this.logger.log("Opened Primary Socket from " + s.getIdentifier());
                                }
                                PriorityTransportLayerImpl.this.getEntityManager(s.getIdentifier()).incomingSocket(s, null);
                                break;
                            }
                            default: {
                                // Unknown header: nobody will ever own this socket, so release it.
                                if (PriorityTransportLayerImpl.this.logger.level <= 900) {
                                    PriorityTransportLayerImpl.this.logger.log("Unknown socket header byte " + val + " from " + s.getIdentifier() + "; closing socket.");
                                }
                                socket.close();
                            }
                        }
                        break;
                    }
                    default: {
                        socket.close();
                        throw new IllegalStateException("Read " + ret + " bytes?  Not good.  Expected to read 1 byte.");
                    }
                }
            }

            @Override
            public void receiveException(P2PSocket<Identifier> socket, IOException e) {
                PriorityTransportLayerImpl.this.errorHandler.receivedException(socket.getIdentifier(), e);
            }
        });
    }

    /**
     * Relays a message delivered by the lower layer to the upper-layer callback unchanged.
     *
     * @param i       sender identifier
     * @param m       message payload
     * @param options options accompanying the message
     * @throws IOException if the callback throws it
     */
    @Override
    public void messageReceived(Identifier i, ByteBuffer m, Map<String, Integer> options) throws IOException {
        callback.messageReceived(i, m, options);
    }

    /**
     * Sends {@code m} to {@code i}. If {@code options} maps
     * {@code "transport_type"} to 0 the message goes directly through the
     * lower layer; otherwise it is queued on the peer's EntityManager.
     *
     * @param i              destination identifier
     * @param m              message payload
     * @param deliverAckToMe completion callback, passed through unchanged
     * @param options        send options, may be null
     * @return the handle returned by whichever path handled the message
     */
    @Override
    public MessageRequestHandle<Identifier, ByteBuffer> sendMessage(Identifier i, ByteBuffer m, MessageCallback<Identifier, ByteBuffer> deliverAckToMe, Map<String, Integer> options) {
        if (options != null && options.containsKey("transport_type")) {
            Integer transportType = options.get("transport_type");
            if (transportType != null && transportType.intValue() == 0) {
                return tl.sendMessage(i, m, deliverAckToMe, options);
            }
        }
        EntityManager manager = getEntityManager(i);
        return manager.send(m, deliverAckToMe, options);
    }

    /**
     * Installs the upper-layer callback that receives pass-through sockets
     * and delivered messages.
     *
     * @param callback the new callback; stored as-is
     */
    @Override
    public void setCallback(TransportLayerCallback<Identifier, ByteBuffer> callback) {
        this.callback = callback;
    }

    /**
     * Replaces the error handler.
     *
     * @param handler the new handler; stored as-is.
     *                NOTE(review): unlike the constructor, a null value is not
     *                replaced by a default handler here.
     */
    @Override
    public void setErrorHandler(ErrorHandler<Identifier> handler) {
        this.errorHandler = handler;
    }

    /**
     * Destroys the wrapped transport layer. Per-peer state held by this
     * layer is not touched here.
     */
    @Override
    public void destroy() {
        tl.destroy();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the EntityManager for {@code i}, creating and registering one
     * if none exists yet. Lookup and creation happen under the lock on
     * {@code entityManagers}.
     *
     * @param i peer identifier
     * @return the existing or newly created manager, never null
     */
    protected EntityManager getEntityManager(Identifier i) {
        synchronized (this.entityManagers) {
            EntityManager existing = this.entityManagers.get(i);
            if (existing != null) {
                return existing;
            }
            EntityManager created = new EntityManager(i);
            this.entityManagers.put(i, created);
            return created;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Looks up the EntityManager for {@code i} and, if present, clears its
     * state via {@code clearState()}, all under the lock on {@code entityManagers}.
     *
     * <p>NOTE(review): despite the name, the entry is not removed from the
     * map; confirm whether that is intended.
     *
     * @param i peer identifier
     * @return the manager that was cleared, or null if none was registered
     */
    protected EntityManager deleteEntityManager(Identifier i) {
        EntityManager manager;
        synchronized (this.entityManagers) {
            manager = this.entityManagers.get(i);
            if (manager != null) {
                manager.clearState();
            }
        }
        return manager;
    }

    /**
     * Liveness callback: marks the peer's EntityManager dead when the
     * reported value is 3 or higher (presumably the "dead" liveness level;
     * confirm against the liveness constants).
     *
     * @param i       peer whose liveness changed
     * @param val     new liveness value
     * @param options unused here
     */
    @Override
    public void livenessChanged(Identifier i, int val, Map<String, Integer> options) {
        if (val < 3) {
            return;
        }
        getEntityManager(i).markDead();
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    class EntityManager {
        // The decompiler emitted the binary name
        // "PriorityTransportLayerImpl$EntityManager.MessageWrapper" for the
        // inner class; the source-level name MessageWrapper is used instead.

        /** Sequence number handed to each queued MessageWrapper; incremented in send(). */
        int seq = Integer.MIN_VALUE;
        /** Messages waiting to be written; also used as the lock guarding queue access. */
        Queue<MessageWrapper> queue;
        /** Primary sockets currently known for this peer. */
        Collection<P2PSocket<Identifier>> sockets;
        /** The peer this manager is responsible for. */
        Identifier identifier;
        /** Outstanding openPrimarySocket request, or null if none is in flight. */
        SocketRequestHandle<Identifier> pendingSocket;
        /** Socket that queued messages are currently written to, or null. */
        P2PSocket<Identifier> writingSocket;
        /** Set by closeMe() when the writing socket must be closed after the in-flight message completes. */
        P2PSocket<Identifier> closeWritingSocket;
        /** Message currently being written to writingSocket, or null. */
        MessageWrapper messageThatIsBeingWritten;

        /**
         * Creates the per-peer state with an empty priority queue and an
         * empty socket set.
         *
         * @param identifier the peer this manager serves
         */
        EntityManager(Identifier identifier) {
            this.identifier = identifier;
            // Typed collections replace the raw HashSet and the decompiler's
            // binary ("$") type name, which does not resolve at source level.
            this.queue = new PriorityQueue<MessageWrapper>();
            this.sockets = new HashSet<P2PSocket<Identifier>>();
        }

        /**
         * Closes every known socket, empties the queue, forgets the in-flight
         * message and cancels any pending socket request.
         *
         * <p>NOTE(review): the closed sockets stay in {@code sockets} and
         * {@code writingSocket} is left untouched; confirm that is intended.
         */
        public void clearState() {
            Iterator<P2PSocket<Identifier>> open = this.sockets.iterator();
            while (open.hasNext()) {
                open.next().close();
            }
            this.queue.clear();
            this.messageThatIsBeingWritten = null;
            if (this.pendingSocket == null) {
                return;
            }
            this.pendingSocket.cancel();
            this.pendingSocket = null;
        }

        /**
         * @return {@code "EM{" + identifier + "}"}
         */
        public String toString() {
            StringBuilder text = new StringBuilder("EM{");
            text.append(this.identifier);
            text.append("}");
            return text.toString();
        }

        /**
         * Requests that {@code socket} be closed.
         *
         * <p>If it is the writing socket and a message is currently being
         * written, the close is deferred: the socket is recorded in
         * {@code closeWritingSocket} and {@code false} is returned. Otherwise the
         * socket is removed from {@code sockets} and closed immediately; if it
         * was the writing socket, another known socket (if any) takes over.
         *
         * @param socket the socket to close
         * @return true if the socket was closed now, false if the close was deferred
         */
        public boolean closeMe(P2PSocket<Identifier> socket) {
            boolean isWriter = socket == this.writingSocket;

            // Mid-message on the writing socket: postpone the close.
            if (isWriter && this.messageThatIsBeingWritten != null) {
                this.closeWritingSocket = this.writingSocket;
                return false;
            }

            this.sockets.remove(socket);
            socket.close();
            if (isWriter) {
                this.writingSocket = null;
                if (!this.sockets.isEmpty()) {
                    this.writingSocket = this.sockets.iterator().next();
                }
            }
            return true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Registers a primary socket for this peer, whether opened locally
         * ({@code receipt} non-null) or accepted from the remote side
         * ({@code receipt} null).
         *
         * <p>If no writing socket exists, {@code s} becomes the writing socket
         * and the next queued message (if any) is registered on it. A
         * SizeReader is always started on {@code s} to receive messages.
         *
         * @param s       the new socket
         * @param receipt handle of the open request that produced {@code s}, or null
         * @throws IllegalStateException if a message is marked in-flight while
         *                               there is no writing socket
         */
        public void incomingSocket(P2PSocket<Identifier> s, SocketRequestHandle<Identifier> receipt) {
            if (receipt != null && receipt == this.pendingSocket) {
                // Our own open request completed.
                this.pendingSocket = null;
            } else if (receipt != null) {
                PriorityTransportLayerImpl.this.logger.log("receipt != pendingSocket!!! receipt:" + receipt + " pendingSocket:" + this.pendingSocket);
            }

            this.sockets.add(s);

            boolean needWriter = this.writingSocket == null;
            if (needWriter) {
                if (this.messageThatIsBeingWritten != null) {
                    throw new IllegalStateException("This is a bug, if there is no writingSocket, there should be no messageThatIsBeingWritten. writingSocket:" + this.writingSocket + " pending:" + this.messageThatIsBeingWritten);
                }
                this.writingSocket = s;
                synchronized (this.queue) {
                    if (!this.queue.isEmpty()) {
                        this.messageThatIsBeingWritten = (MessageWrapper)this.queue.poll();
                    }
                }
                if (this.messageThatIsBeingWritten != null) {
                    this.messageThatIsBeingWritten.register(this.writingSocket);
                }
            }

            // The reader registers itself on the socket from its constructor.
            new SizeReader(s);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Handles failure of a primary-socket open request.
         *
         * <p>A NodeIsFaultyException marks the peer dead. Otherwise, if
         * {@code handle} is the currently pending request, the request is
         * cleared and, when there is still something to send, either a new
         * primary socket is opened (no sockets known) or an existing socket is
         * selected as the writing socket and the next message is registered
         * on it. Failures for any other handle are ignored.
         *
         * @param handle the open request that failed
         * @param ex     the failure reported for it
         */
        public void receiveSocketException(SocketRequestHandleImpl<Identifier> handle, IOException ex) {
            if (ex instanceof NodeIsFaultyException) {
                this.markDead();
                return;
            }
            if (handle == this.pendingSocket) {
                this.pendingSocket = null;
                // Nothing in flight and nothing queued: no socket is needed.
                if (this.messageThatIsBeingWritten == null && this.queue.isEmpty()) {
                    return;
                }
                if (this.sockets.isEmpty()) {
                    // Always true here, since pendingSocket was cleared just above.
                    if (this.pendingSocket == null) {
                        // Retry with the same options as the failed request.
                        this.pendingSocket = PriorityTransportLayerImpl.this.openPrimarySocket(this.identifier, handle.getOptions());
                    }
                } else {
                    // NOTE(review): this overwrites writingSocket even if one was already set.
                    this.writingSocket = this.sockets.iterator().next();
                    if (this.messageThatIsBeingWritten == null) {
                        Queue<org.mpisws.p2p.transport.priority.PriorityTransportLayerImpl$EntityManager.MessageWrapper> queue = this.queue;
                        synchronized (queue) {
                            this.messageThatIsBeingWritten = (MessageWrapper)this.queue.poll();
                            if (this.messageThatIsBeingWritten != null) {
                                this.messageThatIsBeingWritten.register(this.writingSocket);
                            }
                        }
                    } else if (this.messageThatIsBeingWritten.socket == null) {
                        // In-flight message not yet bound to a socket: bind it now.
                        this.messageThatIsBeingWritten.register(this.writingSocket);
                    }
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Queues {@code message} for delivery to this peer and schedules the
         * socket/queue work on the selector.
         *
         * <p>The message is rejected immediately (via {@code sendFailed}) when it
         * is larger than {@code MAX_MSG_SIZE} or when the liveness provider
         * reports a value of 3 or more for the peer. Otherwise it is added to
         * the priority queue; while the queue holds more than
         * {@code MAX_QUEUE_SIZE} messages, the one that orders last (the one
         * {@code poll()} would return last) is dropped.
         *
         * <p>Changes from the decompiled original: the overflow check compares
         * against {@code MAX_QUEUE_SIZE} rather than {@code MAX_MSG_SIZE}; the
         * victim is chosen by {@code compareTo} instead of by iteration
         * position, because PriorityQueue iteration order is unspecified; a
         * null result of {@code poll()} is no longer dereferenced; and a null
         * value under {@code "OPTION_PRIORITY"} is treated as priority 0.
         *
         * @param message        payload to send
         * @param deliverAckToMe completion callback, may be null
         * @param options        send options, may be null
         * @return the wrapper that represents this send request
         */
        public MessageRequestHandle<Identifier, ByteBuffer> send(ByteBuffer message, MessageCallback<Identifier, ByteBuffer> deliverAckToMe, final Map<String, Integer> options) {
            if (PriorityTransportLayerImpl.this.logger.level <= 400) {
                PriorityTransportLayerImpl.this.logger.log(this + "send(" + message + ")");
            }
            int priority = 0;
            if (options != null && options.containsKey("OPTION_PRIORITY")) {
                Integer requested = options.get("OPTION_PRIORITY");
                if (requested != null) {
                    priority = requested.intValue();
                }
            }

            // Reject oversized messages up front.
            int remaining = message.remaining();
            if (remaining > PriorityTransportLayerImpl.this.MAX_MSG_SIZE) {
                MessageWrapper rejected = new MessageWrapper(message, deliverAckToMe, options, priority, 0);
                if (deliverAckToMe != null) {
                    deliverAckToMe.sendFailed(rejected, new SocketException("Message too large. msg:" + message + " size:" + remaining + " max:" + PriorityTransportLayerImpl.this.MAX_MSG_SIZE));
                }
                return rejected;
            }

            // Reject when the liveness provider reports the peer at level 3 or above.
            if (PriorityTransportLayerImpl.this.livenessProvider.getLiveness(this.identifier, options) >= 3) {
                MessageWrapper rejected = new MessageWrapper(message, deliverAckToMe, options, priority, 0);
                if (deliverAckToMe != null) {
                    deliverAckToMe.sendFailed(rejected, new NodeIsFaultyException(this.identifier, message));
                }
                return rejected;
            }

            MessageWrapper ret;
            synchronized (this.queue) {
                ret = new MessageWrapper(message, deliverAckToMe, options, priority, this.seq++);
                this.queue.add(ret);
                // Trim the queue back to MAX_QUEUE_SIZE, dropping the message
                // that orders last each time.
                while (this.queue.size() > PriorityTransportLayerImpl.this.MAX_QUEUE_SIZE) {
                    MessageWrapper victim = null;
                    for (MessageWrapper candidate : this.queue) {
                        if (victim == null || candidate.compareTo(victim) > 0) {
                            victim = candidate;
                        }
                    }
                    this.queue.remove(victim);
                    if (PriorityTransportLayerImpl.this.logger.level <= 700) {
                        PriorityTransportLayerImpl.this.logger.log("Dropping " + victim + " because queue is full. MAX_QUEUE_SIZE:" + PriorityTransportLayerImpl.this.MAX_QUEUE_SIZE);
                    }
                    victim.drop();
                }
            }

            // Socket selection and registration are done through the selector.
            PriorityTransportLayerImpl.this.selectorManager.invoke(new Runnable(){

                @Override
                public void run() {
                    if (EntityManager.this.messageThatIsBeingWritten == null && EntityManager.this.queue.isEmpty()) {
                        return;
                    }
                    if (EntityManager.this.sockets.isEmpty()) {
                        if (EntityManager.this.pendingSocket == null) {
                            EntityManager.this.pendingSocket = PriorityTransportLayerImpl.this.openPrimarySocket(EntityManager.this.identifier, options);
                        }
                    } else if (EntityManager.this.writingSocket == null) {
                        EntityManager.this.writingSocket = EntityManager.this.sockets.iterator().next();
                    }
                    if (EntityManager.this.writingSocket != null && EntityManager.this.messageThatIsBeingWritten == null) {
                        synchronized (EntityManager.this.queue) {
                            EntityManager.this.messageThatIsBeingWritten = (MessageWrapper)EntityManager.this.queue.poll();
                            // poll() yields null if the queue emptied since the check above.
                            if (EntityManager.this.messageThatIsBeingWritten != null) {
                                EntityManager.this.messageThatIsBeingWritten.register(EntityManager.this.writingSocket);
                            }
                        }
                    }
                }
            });
            return ret;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Fails every outstanding message for this peer with a
         * NodeIsFaultyException, empties the queue, closes all known sockets
         * and cancels any pending socket request.
         *
         * <p>Changes from the decompiled original: the locks are taken on the
         * fields directly (the generated {@code Collection<Object>} alias was not
         * type-correct), and {@code pendingSocket} is cleared after being
         * cancelled, as {@code clearState()} does, so that a later send can
         * open a fresh socket.
         *
         * <p>NOTE(review): the closed sockets stay in {@code sockets} and
         * {@code writingSocket} is left untouched; confirm that is intended.
         */
        public void markDead() {
            synchronized (this.queue) {
                if (this.messageThatIsBeingWritten != null) {
                    if (this.messageThatIsBeingWritten.deliverAckToMe != null) {
                        this.messageThatIsBeingWritten.deliverAckToMe.sendFailed(this.messageThatIsBeingWritten, new NodeIsFaultyException(this.identifier));
                    }
                    this.messageThatIsBeingWritten = null;
                }
                for (MessageWrapper queued : this.queue) {
                    if (queued.deliverAckToMe == null) {
                        continue;
                    }
                    queued.deliverAckToMe.sendFailed(queued, new NodeIsFaultyException(this.identifier));
                }
                this.queue.clear();
            }
            synchronized (this.sockets) {
                for (P2PSocket<Identifier> open : this.sockets) {
                    open.close();
                }
            }
            if (this.pendingSocket != null) {
                this.pendingSocket.cancel();
                this.pendingSocket = null;
            }
        }

        /*
         * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
         */
        /**
         * Reads exactly {@code size} bytes from a socket into {@code buf} and
         * then calls {@link #done}. The reader registers itself for read
         * events from its constructor and re-registers until the buffer is full.
         */
        class BufferReader
        implements P2PSocketReceiver<Identifier> {
            /** Destination buffer; full (and flipped) when {@link #done} runs. */
            ByteBuffer buf;

            /**
             * @param size   number of bytes to read
             * @param socket socket to read from; registered for read events immediately
             */
            public BufferReader(int size, P2PSocket<Identifier> socket) {
                this.buf = ByteBuffer.allocate(size);
                socket.register(true, false, this);
            }

            /**
             * Reads whatever is available. End-of-stream closes the socket via
             * {@code closeMe}; an IOException is routed to
             * {@link #receiveException}; a full buffer triggers {@link #done}.
             */
            @Override
            public void receiveSelectResult(P2PSocket<Identifier> socket, boolean canRead, boolean canWrite) throws IOException {
                boolean readOnly = canRead && !canWrite;
                if (!readOnly) {
                    throw new IllegalStateException(EntityManager.this + " Expected only to read. canRead:" + canRead + " canWrite:" + canWrite + " socket:" + socket);
                }

                long bytesRead;
                try {
                    bytesRead = socket.read(this.buf);
                }
                catch (IOException ioe) {
                    this.receiveException(socket, ioe);
                    return;
                }
                if (bytesRead == -1L) {
                    EntityManager.this.closeMe(socket);
                    return;
                }

                if (this.buf.remaining() != 0) {
                    // Still incomplete: wait for more data.
                    socket.register(true, false, this);
                    return;
                }
                this.buf.flip();
                this.done(socket);
            }

            /** Reports the error to the error handler, then asks the manager to close the socket. */
            @Override
            public void receiveException(P2PSocket<Identifier> socket, IOException e) {
                PriorityTransportLayerImpl.this.errorHandler.receivedException(socket.getIdentifier(), e);
                EntityManager.this.closeMe(socket);
            }

            /**
             * Delivers the completed buffer to the upper-layer callback and
             * starts a SizeReader for the next message on the same socket.
             */
            public void done(P2PSocket<Identifier> socket) throws IOException {
                Logger log = PriorityTransportLayerImpl.this.logger;
                if (log.level <= 400) {
                    log.log(EntityManager.this + " read message of size " + this.buf.capacity() + " from " + socket);
                }
                PriorityTransportLayerImpl.this.callback.messageReceived(EntityManager.this.identifier, this.buf, socket.getOptions());
                new SizeReader(socket);
            }

            public String toString() {
                return "BufferReader{" + this.buf + "}";
            }
        }

        /*
         * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
         */
        /**
         * Reads the 4-byte length prefix of the next message and then starts
         * a BufferReader for a payload of that length.
         *
         * <p>The length comes from the remote peer. A value that is negative
         * or larger than {@code MAX_MSG_SIZE} causes the socket to be closed
         * via {@code closeMe}. The negative case was previously unchecked and
         * made {@code ByteBuffer.allocate} throw IllegalArgumentException.
         */
        class SizeReader
        extends BufferReader {
            /**
             * @param socket socket to read the length prefix from
             */
            public SizeReader(P2PSocket<Identifier> socket) {
                super(4, socket);
            }

            /**
             * Called once the four length bytes have been read.
             *
             * @param socket the socket the prefix was read from
             */
            @Override
            public void done(P2PSocket<Identifier> socket) throws IOException {
                int msgSize = this.buf.asIntBuffer().get();
                if (PriorityTransportLayerImpl.this.logger.level <= 400) {
                    PriorityTransportLayerImpl.this.logger.log(EntityManager.this + " reading message of size " + msgSize);
                }
                // Reject lengths that cannot be allocated or exceed the limit.
                if (msgSize < 0 || msgSize > PriorityTransportLayerImpl.this.MAX_MSG_SIZE) {
                    if (PriorityTransportLayerImpl.this.logger.level <= 900) {
                        PriorityTransportLayerImpl.this.logger.log(socket + " attempted to send a message of size " + msgSize + ". MAX_MSG_SIZE = " + PriorityTransportLayerImpl.this.MAX_MSG_SIZE);
                    }
                    EntityManager.this.closeMe(socket);
                    return;
                }
                new BufferReader(msgSize, socket);
            }

            @Override
            public String toString() {
                return "SizeReader";
            }
        }

        /*
         * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
         */
        class MessageWrapper
        implements Comparable<org.mpisws.p2p.transport.priority.PriorityTransportLayerImpl$EntityManager.MessageWrapper>,
        MessageRequestHandle<Identifier, ByteBuffer>,
        P2PSocketReceiver<Identifier> {
            /** Priority passed to the constructor; send() reads it from the "OPTION_PRIORITY" option, defaulting to 0. */
            int priority;
            /** Sequence number passed to the constructor; send() supplies the manager's running counter. */
            int seq;
            /** Socket this message is bound to once writing has started; null until then. */
            P2PSocket socket;
            /** The caller's buffer as passed in; reported back in a QueueOverflowException by drop(). */
            ByteBuffer originalMessage;
            /** Wire form: 4-byte size prefix followed by the payload. */
            ByteBuffer message;
            /** Completion callback for ack / sendFailed; may be null. */
            MessageCallback<Identifier, ByteBuffer> deliverAckToMe;
            /** Options supplied with the send request. */
            Map<String, Integer> options;
            /** When true, an unstarted message is skipped and no ack is delivered (see receiveSelectResult). */
            boolean cancelled = false;

            /**
             * Wraps a payload for sending: copies the remaining bytes of
             * {@code message} into a new buffer behind a 4-byte big-endian
             * size prefix. Copying advances the position of {@code message}.
             *
             * @param message        payload; its remaining bytes are consumed
             * @param deliverAckToMe completion callback, may be null
             * @param options        send options
             * @param priority       priority stored on the wrapper
             * @param seq            sequence number stored on the wrapper
             */
            MessageWrapper(ByteBuffer message, MessageCallback<Identifier, ByteBuffer> deliverAckToMe, Map<String, Integer> options, int priority, int seq) {
                int payloadLength = message.remaining();

                // ByteBuffer is big-endian by default, so putInt writes the
                // same four bytes, most significant first.
                ByteBuffer framed = ByteBuffer.allocate(payloadLength + 4);
                framed.putInt(payloadLength);
                framed.put(message);
                framed.clear(); // rewind to position 0 with limit == capacity

                this.originalMessage = message;
                this.message = framed;
                this.deliverAckToMe = deliverAckToMe;
                this.options = options;
                this.priority = priority;
                this.seq = seq;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void receiveSelectResult(P2PSocket<Identifier> socket, boolean canRead, boolean canWrite) throws IOException {
                if (canRead || !canWrite) {
                    throw new IllegalStateException(this + " Expected only to write. canRead:" + canRead + " canWrite:" + canWrite + " socket:" + socket);
                }
                if (this.socket != null && this.socket != socket) {
                    PriorityTransportLayerImpl.this.logger.log(this + "Socket changed!!! socket:" + socket + " writingSocket:" + EntityManager.this.writingSocket);
                    socket.shutdownOutput();
                    return;
                }
                this.socket = socket;
                if (!this.cancelled || this.message.position() != 0) {
                    long bytesWritten = socket.write(this.message);
                    if (bytesWritten == -1L) {
                        EntityManager.this.sockets.remove(socket);
                        socket.close();
                        this.reset();
                        EntityManager.this.writingSocket = null;
                        socket = null;
                        EntityManager.this.messageThatIsBeingWritten = null;
                        Queue<org.mpisws.p2p.transport.priority.PriorityTransportLayerImpl$EntityManager.MessageWrapper> queue = EntityManager.this.queue;
                        synchronized (queue) {
                            EntityManager.this.queue.add((org.mpisws.p2p.transport.priority.PriorityTransportLayerImpl$EntityManager.MessageWrapper)this);
                        }
                        if (EntityManager.this.sockets.isEmpty()) {
                            if (EntityManager.this.pendingSocket == null) {
                                EntityManager.this.pendingSocket = PriorityTransportLayerImpl.this.openPrimarySocket(EntityManager.this.identifier, socket.getOptions());
                            }
                        } else {
                            EntityManager.this.writingSocket = EntityManager.this.sockets.iterator().next();
                            queue = EntityManager.this.queue;
                            synchronized (queue) {
                                EntityManager.this.messageThatIsBeingWritten = (MessageWrapper)EntityManager.this.queue.poll();
                            }
                        }
                        return;
                    }
                    if (PriorityTransportLayerImpl.this.logger.level <= 400) {
                        PriorityTransportLayerImpl.this.logger.log(this + " wrote " + bytesWritten + " bytes of " + this.message.capacity() + " remaining:" + this.message.remaining());
                    }
                    if (this.message.hasRemaining()) {
                        socket.register(false, true, this);
                        return;
                    }
                }
                if (!this.cancelled && this.deliverAckToMe != null) {
                    this.deliverAckToMe.ack(this);
                }
                EntityManager.this.messageThatIsBeingWritten = null;
                if (EntityManager.this.closeWritingSocket == EntityManager.this.writingSocket) {
                    EntityManager.this.writingSocket.close();
                    EntityManager.this.writingSocket = null;
                    EntityManager.this.closeWritingSocket = null;
                    if (EntityManager.this.sockets.isEmpty()) {
                        boolean emptyQueue = EntityManager.this.queue.isEmpty();
                        if (!emptyQueue && EntityManager.this.pendingSocket == null) {
                            EntityManager.this.pendingSocket = PriorityTransportLayerImpl.this.openPrimarySocket(EntityManager.this.identifier, null);
                            return;
                        }
                    } else {
                        EntityManager.this.writingSocket = EntityManager.this.sockets.iterator().next();
                    }
                }
                if (EntityManager.this.writingSocket != null) {
                    Queue<org.mpisws.p2p.transport.priority.PriorityTransportLayerImpl$EntityManager.MessageWrapper> queue = EntityManager.this.queue;
                    synchronized (queue) {
                        if (!EntityManager.this.queue.isEmpty()) {
                            EntityManager.this.messageThatIsBeingWritten = (MessageWrapper)EntityManager.this.queue.poll();
                        }
                    }
                    if (EntityManager.this.messageThatIsBeingWritten != null) {
                        EntityManager.this.messageThatIsBeingWritten.receiveSelectResult(socket, canRead, canWrite);
                    }
                }
            }

            /**
             * Notifies the registered callback, if any, that this message was dropped.
             *
             * <p>The failure is reported through {@code deliverAckToMe.sendFailed} with a
             * {@link QueueOverflowException} carrying the destination identifier and the
             * original message. When no callback is registered this method does nothing.
             */
            public void drop() {
                if (this.deliverAckToMe == null) {
                    return;
                }
                QueueOverflowException overflow = new QueueOverflowException(EntityManager.this.identifier, this.originalMessage);
                this.deliverAckToMe.sendFailed(this, overflow);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            /**
             * Handles an I/O failure reported on a socket this wrapper was registered with.
             *
             * <p>The failed socket is removed from the manager's socket set and closed. If
             * this wrapper was bound to that socket it is reset, and if it was also the
             * message currently being written it is put back on the queue. When the failed
             * socket was the manager's writing socket, the manager either requests a new
             * primary socket (no sockets left and messages still queued) or fails over to
             * another open socket and registers the next queued message on it.
             *
             * @param socket the socket on which the failure was reported
             * @param e the reported failure; not inspected by this method
             * @throws IllegalStateException if the writing socket failed while a message is
             *         still recorded as being written
             */
            @Override
            public void receiveException(P2PSocket<Identifier> socket, IOException e) {
                EntityManager.this.sockets.remove(socket);
                socket.close();
                if (this.socket == socket) {
                    this.socket = null;
                    this.reset();
                    if (this == EntityManager.this.messageThatIsBeingWritten) {
                        // Give up the in-flight slot and requeue so the message can be written again.
                        EntityManager.this.messageThatIsBeingWritten = null;
                        synchronized (EntityManager.this.queue) {
                            EntityManager.this.queue.add(this);
                        }
                    }
                }
                if (socket != EntityManager.this.writingSocket) {
                    return;
                }
                if (EntityManager.this.messageThatIsBeingWritten != null) {
                    throw new IllegalStateException("Pending should be null! pending:" + EntityManager.this.messageThatIsBeingWritten + " this:" + this + " socket:" + socket);
                }
                EntityManager.this.writingSocket = null;
                EntityManager.this.closeWritingSocket = null;
                if (EntityManager.this.sockets.isEmpty()) {
                    // Snapshot the head under the queue lock, like every other queue access here;
                    // the socket itself is opened outside the lock.
                    MessageWrapper head;
                    synchronized (EntityManager.this.queue) {
                        head = (MessageWrapper) EntityManager.this.queue.peek();
                    }
                    if (head != null && EntityManager.this.pendingSocket == null) {
                        EntityManager.this.pendingSocket = PriorityTransportLayerImpl.this.openPrimarySocket(EntityManager.this.identifier, head.getOptions());
                    }
                    return;
                }
                EntityManager.this.writingSocket = EntityManager.this.sockets.iterator().next();
                MessageWrapper next;
                synchronized (EntityManager.this.queue) {
                    // poll() returns null on an empty queue.
                    next = (MessageWrapper) EntityManager.this.queue.poll();
                }
                if (next != null) {
                    // Only register when there is something to write; the previous code
                    // dereferenced a null message here when the queue was empty.
                    EntityManager.this.messageThatIsBeingWritten = next;
                    next.register(EntityManager.this.writingSocket);
                }
            }

            /**
             * Binds this wrapper to {@code socket} and asks the socket to notify it when
             * the socket becomes writable.
             *
             * <p>If the wrapper was previously bound to a different socket (or to none),
             * it is {@link #reset() reset} first.
             *
             * @param socket the socket to write this message on
             */
            public void register(P2PSocket<Identifier> socket) {
                boolean switchingSockets = this.socket != socket;
                if (switchingSockets) {
                    this.reset();
                }
                this.socket = socket;
                // Arguments (false, true): no read interest, write interest only.
                socket.register(false, true, this);
            }

            /*
             * Ignored method signature, as it can't be verified against descriptor
             */
            /**
             * Orders wrappers by {@code priority} (lower value first) and, for equal
             * priorities, by {@code seq}.
             *
             * @param that the wrapper to compare against
             * @return a negative value, zero, or a positive value as this wrapper sorts
             *         before, equal to, or after {@code that}
             */
            @Override
            public int compareTo(MessageWrapper that) {
                if (this.priority != that.priority) {
                    // Compare explicitly rather than subtracting: the difference of two
                    // extreme priorities can overflow and flip the sign. Integer.compare
                    // is avoided because it needs Java 7.
                    return this.priority < that.priority ? -1 : 1;
                }
                // NOTE(review): seq is left as a subtraction. Presumably it is a
                // monotonically increasing counter, in which case the subtraction keeps
                // the intended order even if the counter wraps around - confirm.
                return this.seq - that.seq;
            }

            /**
             * Returns the identifier held by the enclosing {@code EntityManager}.
             *
             * @return the enclosing manager's {@code identifier}
             */
            @Override
            public Identifier getIdentifier() {
                final Identifier target = EntityManager.this.identifier;
                return target;
            }

            /**
             * Returns the buffer stored as {@code originalMessage}; this is a separate
             * field from the {@code message} buffer the write path operates on.
             *
             * @return the original message buffer
             */
            @Override
            public ByteBuffer getMessage() {
                final ByteBuffer original = this.originalMessage;
                return original;
            }

            /**
             * Returns the options map associated with this message.
             *
             * @return the stored options; the same map instance, not a copy
             */
            @Override
            public Map<String, Integer> getOptions() {
                final Map<String, Integer> stored = this.options;
                return stored;
            }

            /**
             * Returns this wrapper to its unbound state: clears the {@code message}
             * buffer and forgets the socket it was bound to.
             */
            public void reset() {
                // Presumably a java.nio buffer, in which case clear() moves the position
                // back to zero so a later write starts from the beginning - confirm.
                message.clear();
                socket = null;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            /**
             * Marks this message as cancelled and tries to withdraw it.
             *
             * <p>The {@code cancelled} flag is always set. If this message is the one
             * currently being written, it is not removed; the result only reports whether
             * the buffer position is still zero. Otherwise the message is removed from the
             * queue under the queue's monitor.
             *
             * @return for the in-flight message, {@code true} if the buffer position is
             *         zero; otherwise {@code true} if the message was found in the queue
             *         and removed
             */
            @Override
            public boolean cancel() {
                this.cancelled = true;
                boolean inFlight = this.equals(EntityManager.this.messageThatIsBeingWritten);
                if (!inFlight) {
                    synchronized (EntityManager.this.queue) {
                        return EntityManager.this.queue.remove(this);
                    }
                }
                return this.message.position() == 0;
            }

            /**
             * Builds a diagnostic description containing the message buffer, the
             * destination identifier, the priority and the sequence number.
             *
             * @return the textual description of this wrapper
             */
            public String toString() {
                StringBuilder text = new StringBuilder("MessagWrapper{");
                text.append(this.message).append("}->").append(EntityManager.this.identifier);
                text.append(" pri:").append(this.priority);
                text.append(" seq:").append(this.seq);
                return text.toString();
            }
        }
    }
}

