/*
 * Decompiled with CFR 0.152.
 */
package rice.pastry.socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import rice.environment.logging.Logger;
import rice.environment.params.Parameters;
import rice.environment.random.RandomSource;
import rice.p2p.commonapi.appsocket.AppSocketReceiver;
import rice.p2p.commonapi.rawserialization.MessageDeserializer;
import rice.p2p.util.MathUtils;
import rice.pastry.messaging.JavaSerializedDeserializer;
import rice.pastry.messaging.Message;
import rice.pastry.socket.EpochInetSocketAddress;
import rice.pastry.socket.PingManager;
import rice.pastry.socket.PingResponseListener;
import rice.pastry.socket.SocketAppSocket;
import rice.pastry.socket.SocketBuffer;
import rice.pastry.socket.SocketChannelRepeater;
import rice.pastry.socket.SocketManager;
import rice.pastry.socket.SocketPastryNode;
import rice.pastry.socket.SocketSourceRouteManager;
import rice.pastry.socket.SourceRoute;
import rice.selector.SelectionKeyHandler;
import rice.selector.TimerTask;

public class SocketCollectionManager
extends SelectionKeyHandler {
    public final int MAX_OPEN_SOCKETS;
    public final int MAX_OPEN_SOURCE_ROUTES;
    public final int SOCKET_BUFFER_SIZE;
    public final int PING_DELAY;
    public final float PING_JITTER;
    public final int NUM_PING_TRIES;
    public final int WRITE_WAIT_TIME;
    public final long BACKOFF_INITIAL;
    public final int BACKOFF_LIMIT;
    SocketPastryNode pastryNode;
    EpochInetSocketAddress localAddress;
    private LinkedList socketQueue;
    public Hashtable sockets;
    public LinkedList appSockets;
    HashSet unIdentifiedSM = new HashSet();
    private LinkedList sourceRouteQueue;
    private SelectionKey key;
    private PingManager pingManager;
    SocketSourceRouteManager manager;
    private boolean resigned;
    protected Logger logger;
    protected RandomSource random;
    MessageDeserializer defaultDeserializer;
    protected static byte[] HEADER_DIRECT = new byte[]{6, 27, 73, 116};
    protected static byte[] HEADER_SOURCE_ROUTE = new byte[]{25, 83, 19, 0};
    public static int HEADER_SIZE = HEADER_DIRECT.length;
    protected static byte[] PASTRY_MAGIC_NUMBER = new byte[]{39, 64, 117, 58};
    public static int TOTAL_HEADER_SIZE = PASTRY_MAGIC_NUMBER.length + 4 + HEADER_SIZE;

    public SocketCollectionManager(SocketPastryNode node, SocketSourceRouteManager manager, EpochInetSocketAddress bindAddress, EpochInetSocketAddress proxyAddress, RandomSource random) throws IOException {
        this.pastryNode = node;
        this.defaultDeserializer = new JavaSerializedDeserializer(node);
        this.manager = manager;
        this.localAddress = proxyAddress;
        this.pingManager = new PingManager(node, manager, bindAddress, proxyAddress);
        this.socketQueue = new LinkedList();
        this.appSockets = new LinkedList();
        this.sockets = new Hashtable();
        this.sourceRouteQueue = new LinkedList();
        this.resigned = false;
        this.logger = node.getEnvironment().getLogManager().getLogger(SocketCollectionManager.class, null);
        this.random = random;
        if (random == null) {
            this.random = node.getEnvironment().getRandomSource();
        }
        Parameters p = this.pastryNode.getEnvironment().getParameters();
        this.MAX_OPEN_SOCKETS = p.getInt("pastry_socket_scm_max_open_sockets");
        this.MAX_OPEN_SOURCE_ROUTES = p.getInt("pastry_socket_scm_max_open_source_routes");
        this.SOCKET_BUFFER_SIZE = p.getInt("pastry_socket_scm_socket_buffer_size");
        this.PING_DELAY = p.getInt("pastry_socket_scm_ping_delay");
        this.PING_JITTER = p.getFloat("pastry_socket_scm_ping_jitter");
        this.NUM_PING_TRIES = p.getInt("pastry_socket_scm_num_ping_tries");
        this.WRITE_WAIT_TIME = p.getInt("pastry_socket_scm_write_wait_time");
        this.BACKOFF_INITIAL = p.getInt("pastry_socket_scm_backoff_initial");
        this.BACKOFF_LIMIT = p.getInt("pastry_socket_scm_backoff_limit");
        if (this.logger.level <= 500) {
            this.logger.log("BINDING TO ADDRESS " + bindAddress + " AND CLAIMING " + this.localAddress);
        }
        ServerSocketChannel temp = null;
        try {
            ServerSocketChannel channel;
            temp = channel = ServerSocketChannel.open();
            channel.configureBlocking(false);
            channel.socket().setReuseAddress(true);
            channel.socket().bind(bindAddress.getAddress());
            this.key = this.pastryNode.getEnvironment().getSelectorManager().register(channel, this, 0);
            this.key.interestOps(16);
        }
        catch (IOException e) {
            try {
                if (temp != null) {
                    temp.close();
                }
            }
            catch (IOException e2) {
                // empty catch block
            }
            this.pingManager.resign();
            throw e;
        }
    }

    public boolean isOpen(SourceRoute route) {
        return this.sockets.containsKey(route);
    }

    protected SourceRoute getSocketToClose() {
        for (int i = this.socketQueue.size() - 1; i >= 0; --i) {
            if (!((SocketManager)this.sockets.get(this.socketQueue.get((int)i))).writer.isEmpty()) continue;
            return (SourceRoute)this.socketQueue.get(i);
        }
        return null;
    }

    public int getNumSourceRoutes() {
        return this.sourceRouteQueue.size();
    }

    public int getNumSockets() {
        return this.socketQueue.size();
    }

    public PingManager getPingManager() {
        return this.pingManager;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void bootstrap(SourceRoute path, Message message) throws IOException {
        if (!this.resigned) {
            Hashtable hashtable = this.sockets;
            synchronized (hashtable) {
                this.openSocket(path, true);
                ((SocketManager)this.sockets.get(path)).send(message);
            }
        }
    }

    public void send(SourceRoute path, SocketBuffer message, SocketSourceRouteManager.AddressManager am) {
        if (!this.sendInternal(path, message)) {
            new MessageRetry(path, message, am);
        }
    }

    public void connect(SourceRoute path, int appId, AppSocketReceiver receiver, int timeout) {
        this.openAppSocket(path, appId, receiver, timeout);
    }

    public void ping(SourceRoute route) {
        if (!this.resigned) {
            this.pingManager.ping(route, null);
        }
    }

    protected void checkLiveness(SourceRoute path) {
        if (path.getLastHop().equals(this.localAddress)) {
            return;
        }
        if (!this.resigned) {
            int prox = this.manager.proximity(path);
            if (prox < 0) {
                this.manager.proximity(path);
            }
            if (this.logger.level <= 500) {
                this.logger.log("CHECK DEAD: " + this.localAddress + " CHECKING DEATH OF PATH " + path + " prox:" + prox);
            }
            DeadChecker checker = new DeadChecker(path, this.NUM_PING_TRIES);
            int delay = prox * 2;
            if (delay > this.PING_DELAY) {
                delay = this.PING_DELAY;
            }
            this.pastryNode.getTimer().schedule(checker, delay);
            this.pingManager.ping(path, checker);
        }
    }

    public void declaredDead(EpochInetSocketAddress address) {
        SourceRoute[] routes = this.sockets.keySet().toArray(new SourceRoute[0]);
        for (int i = 0; i < routes.length; ++i) {
            if (!routes[i].getLastHop().equals(address)) continue;
            if (this.logger.level <= 500) {
                this.logger.log("WRITE_TIMER::Closing active socket to " + routes[i]);
            }
            ((SocketManager)this.sockets.get(routes[i])).close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean sendInternal(SourceRoute path, SocketBuffer message) {
        if (!this.resigned) {
            Hashtable hashtable = this.sockets;
            synchronized (hashtable) {
                if (!this.sockets.containsKey(path)) {
                    if (this.logger.level <= 500) {
                        this.logger.log("(SCM) No connection open to path " + path + " - opening one");
                    }
                    this.openSocket(path, false);
                }
                if (this.sockets.containsKey(path)) {
                    if (this.logger.level <= 500) {
                        this.logger.log("(SCM) Found connection open to path " + path + " - sending now");
                    }
                    ((SocketManager)this.sockets.get(path)).send(message);
                    this.socketUpdated(path);
                    return true;
                }
                if (this.logger.level <= 900) {
                    this.logger.log("(SCM) ERROR: Could not connect to remote address " + path + " delaying " + message);
                }
                return false;
            }
        }
        return true;
    }

    public void accept(SelectionKey key) {
        block2: {
            try {
                new SocketAccepter(key);
            }
            catch (IOException e) {
                if (this.logger.level > 900) break block2;
                this.logger.log("ERROR (accepting connection): " + e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void openSocket(SourceRoute path, boolean bootstrap) {
        try {
            Hashtable hashtable = this.sockets;
            synchronized (hashtable) {
                if (!(this.sockets.containsKey(path) || this.sockets.size() >= this.MAX_OPEN_SOCKETS && this.getSocketToClose() == null)) {
                    this.socketOpened(path, new SocketManager(this, path, bootstrap));
                }
            }
        }
        catch (IOException e) {
            if (this.logger.level <= 900) {
                this.logger.logException("GOT ERROR " + e + " OPENING PATH - MARKING PATH " + path + " AS DEAD!", e);
            }
            this.closeSocket(path);
            this.manager.markDead(path);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void openAppSocket(SourceRoute path, int appId, AppSocketReceiver connector, int timeout) {
        try {
            Hashtable hashtable = this.sockets;
            synchronized (hashtable) {
                this.appSocketOpened(new SocketAppSocket(this, path, appId, connector, timeout));
            }
        }
        catch (IOException e) {
            if (this.logger.level <= 900) {
                this.logger.logException("GOT ERROR " + e + " OPENING PATH - MARKING PATH " + path + " AS DEAD!", e);
            }
            this.manager.markDead(path);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void closeSocket(SourceRoute path) {
        Hashtable hashtable = this.sockets;
        synchronized (hashtable) {
            if (this.sockets.containsKey(path)) {
                ((SocketManager)this.sockets.get(path)).shutdown();
            } else if (this.logger.level <= 1000) {
                this.logger.log("(SCM) SERIOUS ERROR: Request to close socket to non-open handle to path " + path);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void socketOpened(SourceRoute path, SocketManager manager) {
        Hashtable hashtable = this.sockets;
        synchronized (hashtable) {
            if (!this.sockets.containsKey(path)) {
                this.unIdentifiedSM.remove(manager);
                this.sockets.put(path, manager);
                this.socketQueue.addFirst(path);
                if (this.logger.level <= 500) {
                    this.logger.log("(SCM) Recorded opening of socket to path " + path);
                }
                if (this.sockets.size() + this.appSockets.size() > this.MAX_OPEN_SOCKETS) {
                    this.closeOneSocket();
                }
            } else {
                if (this.logger.level <= 500) {
                    this.logger.logException("(SCM) Request to record path opening for already-open path " + path, new Exception("stack trace"));
                }
                String local = "" + this.localAddress.getAddress().getAddress().getHostAddress() + ":" + this.localAddress.getAddress().getPort();
                String remote = "" + path.getLastHop().getAddress().getAddress().getHostAddress() + ":" + path.getLastHop().getAddress().getPort();
                if (this.logger.level <= 500) {
                    this.logger.log("(SCM) RESOLVE: Comparing paths " + local + " and " + remote);
                }
                if (remote.compareTo(local) < 0) {
                    if (this.logger.level <= 500) {
                        this.logger.log("(SCM) RESOLVE: Cancelling existing connection to " + path);
                    }
                    SocketManager toClose = (SocketManager)this.sockets.get(path);
                    this.socketClosed(path, toClose);
                    this.socketOpened(path, manager);
                    toClose.close();
                } else if (this.logger.level <= 500) {
                    this.logger.log("(SCM) RESOLVE: Implicitly cancelling new connection to path " + path);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void appSocketOpened(SocketAppSocket sas) {
        Hashtable hashtable = this.sockets;
        synchronized (hashtable) {
            if (this.logger.level <= 500) {
                this.logger.log("(SCM) Recorded opening of app socket " + sas);
            }
            this.appSockets.addFirst(this.manager);
            if (this.sockets.size() + this.appSockets.size() > this.MAX_OPEN_SOCKETS) {
                this.closeOneSocket();
            }
        }
    }

    protected void closeOneSocket() {
        SourceRoute toClose = this.getSocketToClose();
        this.socketQueue.remove(toClose);
        if (this.logger.level <= 500) {
            this.logger.log("(SCM) Too many sockets open - closing currently unused socket to path " + toClose);
        }
        this.closeSocket(toClose);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void socketClosed(SourceRoute path, SocketManager manager) {
        Hashtable hashtable = this.sockets;
        synchronized (hashtable) {
            if (this.sockets.containsKey(path)) {
                if (this.sockets.get(path) == manager) {
                    if (this.logger.level <= 500) {
                        this.logger.log("(SCM) Recorded closing of socket to " + path);
                    }
                    this.socketQueue.remove(path);
                    this.sockets.remove(path);
                } else if (this.logger.level <= 500) {
                    this.logger.log("(SCM) SocketClosed called with corrent address, but incorrect manager - not a big deal.");
                }
            } else if (this.logger.level <= 500) {
                this.logger.log("(SCM) SocketClosed called with socket not in the list: path:" + path + " manager:" + manager);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void appSocketClosed(SocketAppSocket sas) {
        Hashtable hashtable = this.sockets;
        synchronized (hashtable) {
            if (this.appSockets.contains(sas)) {
                if (this.logger.level <= 500) {
                    this.logger.log("(SCM) Recorded closing of app socket to " + sas);
                }
                this.appSockets.remove(sas);
            } else if (this.logger.level <= 500) {
                this.logger.log("(SCM) appSocketClosed called with socket not in the list: path:" + sas);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void socketUpdated(SourceRoute path) {
        Hashtable hashtable = this.sockets;
        synchronized (hashtable) {
            if (this.sockets.containsKey(path)) {
                this.socketQueue.remove(path);
                this.socketQueue.addFirst(path);
            } else if (this.logger.level <= 1000) {
                this.logger.log("(SCM) SERIOUS ERROR: Request to record update for non-existant socket to " + path);
            }
        }
    }

    protected void sourceRouteOpened(SourceRouteManager manager) {
        if (!this.sourceRouteQueue.contains(manager)) {
            this.sourceRouteQueue.addFirst(manager);
            if (this.logger.level <= 500) {
                this.logger.log("(SCM) Recorded opening of source route manager " + manager);
            }
            if (this.sourceRouteQueue.size() > this.MAX_OPEN_SOURCE_ROUTES) {
                SourceRouteManager toClose = (SourceRouteManager)this.sourceRouteQueue.removeLast();
                if (this.logger.level <= 500) {
                    this.logger.log("(SCM) Too many source routes open - closing source route manager " + toClose);
                }
                toClose.close();
                this.sourceRouteClosed(toClose);
            }
        } else {
            if (this.logger.level <= 500) {
                this.logger.log("(SCM) ERROR: Request to record source route opening for already-open manager " + manager);
            }
            this.sourceRouteUpdated(manager);
        }
    }

    protected void sourceRouteClosed(SourceRouteManager manager) {
        if (this.sourceRouteQueue.contains(manager)) {
            this.sourceRouteQueue.remove(manager);
            if (this.logger.level <= 500) {
                this.logger.log("(SCM) Recorded closing of source route manager " + manager);
            }
        } else if (this.logger.level <= 900) {
            this.logger.log("(SCM) ERROR: Request to record source route closing for unknown manager " + manager);
        }
    }

    protected void sourceRouteUpdated(SourceRouteManager manager) {
        if (this.sourceRouteQueue.contains(manager)) {
            this.sourceRouteQueue.remove(manager);
            this.sourceRouteQueue.addFirst(manager);
        } else if (this.logger.level <= 1000) {
            this.logger.log("(SCM) SERIOUS ERROR: Request to record update for unknown source route " + manager);
        }
    }

    public void destroy() throws IOException {
        this.resigned = true;
        this.pingManager.resign();
        while (this.socketQueue.size() > 0) {
            ((SocketManager)this.sockets.get(this.socketQueue.getFirst())).close();
        }
        while (this.sourceRouteQueue.size() > 0) {
            ((SourceRouteManager)this.sourceRouteQueue.getFirst()).close();
        }
        while (this.sockets.size() > 0) {
            ((SocketManager)this.sockets.values().iterator().next()).close();
        }
        while (this.unIdentifiedSM.size() > 0) {
            ((SocketManager)this.unIdentifiedSM.iterator().next()).close();
        }
        this.key.channel().close();
        this.key.cancel();
        this.key.attach(null);
    }

    public void stall() {
        this.key.interestOps(this.key.interestOps() & 0xFFFFFFEF);
        Iterator i = this.sockets.keySet().iterator();
        while (i.hasNext()) {
            SelectionKey key = ((SocketManager)this.sockets.get(i.next())).key;
            key.interestOps(key.interestOps() & 0xFFFFFFFE);
        }
        this.pingManager.stall();
    }

    protected class SocketAccepter
    extends SelectionKeyHandler {
        private SelectionKey key;
        private ByteBuffer buffer = ByteBuffer.allocateDirect(TOTAL_HEADER_SIZE);
        ByteBuffer appTypeBuffer = null;
        byte[] array = null;

        public SocketAccepter(SelectionKey key) throws IOException {
            this.acceptConnection(key);
        }

        public void close() {
            block3: {
                try {
                    if (this.key != null) {
                        this.key.channel().close();
                        this.key.cancel();
                        this.key.attach(null);
                        this.key = null;
                    }
                }
                catch (IOException e) {
                    if (SocketCollectionManager.this.logger.level > 900) break block3;
                    SocketCollectionManager.this.logger.log("(SA) ERROR: Recevied exception " + e + " while closing just accepted socket!");
                }
            }
        }

        protected void acceptConnection(SelectionKey serverKey) throws IOException {
            SocketChannel channel = ((ServerSocketChannel)serverKey.channel()).accept();
            channel.socket().setSendBufferSize(SocketCollectionManager.this.SOCKET_BUFFER_SIZE);
            channel.socket().setReceiveBufferSize(SocketCollectionManager.this.SOCKET_BUFFER_SIZE);
            channel.configureBlocking(false);
            if (SocketCollectionManager.this.logger.level <= 500) {
                SocketCollectionManager.this.logger.log("(SA) Accepted incoming connection from " + channel.socket().getRemoteSocketAddress());
            }
            SocketCollectionManager.this.pastryNode.broadcastChannelOpened((InetSocketAddress)channel.socket().getRemoteSocketAddress(), 3);
            this.key = SocketCollectionManager.this.pastryNode.getEnvironment().getSelectorManager().register(channel, this, 1);
        }

        public void read(SelectionKey key) {
            try {
                int read = ((SocketChannel)key.channel()).read(this.buffer);
                if (SocketCollectionManager.this.logger.level <= 500) {
                    SocketCollectionManager.this.logger.log("(SA)1 Read " + read + " bytes from newly accepted connection.");
                }
                if (read == -1) {
                    throw new IOException("Error on read - the channel has been closed.");
                }
                if (this.buffer.remaining() == 0) {
                    this.processBuffer();
                }
            }
            catch (IOException e) {
                if (SocketCollectionManager.this.logger.level <= 500) {
                    SocketCollectionManager.this.logger.log("(SA) ERROR " + e + " reading source route - cancelling.");
                }
                this.close();
            }
        }

        private void processBuffer() throws IOException {
            if (this.appTypeBuffer == null) {
                this.buffer.flip();
                this.array = new byte[HEADER_SIZE];
                this.buffer.get(this.array, 0, HEADER_SIZE);
                if (!Arrays.equals(this.array, PASTRY_MAGIC_NUMBER)) {
                    throw new IOException("Not a pastry socket:" + this.array[0] + "," + this.array[1] + "," + this.array[2] + "," + this.array[3]);
                }
                this.buffer.get(this.array, 0, HEADER_SIZE);
                int version = MathUtils.byteArrayToInt(this.array);
                if (version != 0) {
                    throw new IOException("Unknown Version:" + version);
                }
                this.buffer.get(this.array, 0, HEADER_SIZE);
                this.appTypeBuffer = ByteBuffer.allocateDirect(4);
            }
            if (Arrays.equals(this.array, HEADER_DIRECT)) {
                int read = ((SocketChannel)this.key.channel()).read(this.appTypeBuffer);
                if (SocketCollectionManager.this.logger.level <= 500) {
                    SocketCollectionManager.this.logger.log("(SA)2 Read " + read + " bytes from newly accepted connection.");
                }
                if (this.appTypeBuffer.hasRemaining()) {
                    return;
                }
                this.appTypeBuffer.flip();
                byte[] appIDbytes = new byte[4];
                this.appTypeBuffer.get(appIDbytes, 0, 4);
                int appId = MathUtils.byteArrayToInt(appIDbytes);
                if (appId == 0) {
                    SocketCollectionManager.this.unIdentifiedSM.add(new SocketManager(SocketCollectionManager.this, this.key));
                } else {
                    if (SocketCollectionManager.this.logger.level <= 500) {
                        SocketCollectionManager.this.logger.log("Found connection with AppId " + appId);
                    }
                    SocketCollectionManager.this.appSockets.add(new SocketAppSocket(SocketCollectionManager.this, this.key, appId));
                }
            } else if (Arrays.equals(this.array, HEADER_SOURCE_ROUTE)) {
                new SourceRouteManager(this.key);
            } else {
                if (SocketCollectionManager.this.logger.level <= 900) {
                    SocketCollectionManager.this.logger.log("ERROR: Improperly formatted header received accepted connection - ignoring.");
                }
                if (SocketCollectionManager.this.logger.level <= 900) {
                    SocketCollectionManager.this.logger.log("READ " + this.array[0] + " " + this.array[1] + " " + this.array[2] + " " + this.array[3]);
                }
                throw new IOException("Improperly formatted header received - unknown header.");
            }
        }
    }

    protected class SourceRouteManager
    extends SelectionKeyHandler {
        private SocketChannel channel1;
        private SocketChannel channel2;
        private SocketChannelRepeater repeater;

        public SourceRouteManager(SelectionKey key) throws IOException {
            this.repeater = new SocketChannelRepeater(SocketCollectionManager.this.pastryNode, this);
            SocketCollectionManager.this.sourceRouteOpened(this);
            this.acceptConnection(key);
        }

        SocketChannel otherChannel(SelectableChannel channel) {
            return channel == this.channel1 ? this.channel2 : this.channel1;
        }

        protected void addInterestOp(SelectableChannel channel, int op) throws IOException {
            String k = "unlogged";
            if (SocketCollectionManager.this.logger.level <= 400) {
                k = channel == this.channel1 ? "1" : "2";
                SocketCollectionManager.this.logger.log("(SRM) " + this + "   adding interest op " + op + " to key " + k);
            }
            if (SocketCollectionManager.this.pastryNode.getEnvironment().getSelectorManager().getKey(channel) == null) {
                if (SocketCollectionManager.this.logger.level <= 400) {
                    SocketCollectionManager.this.logger.log("(SRM) " + this + "   key " + k + " is null - reregistering with ops " + op);
                }
                SocketCollectionManager.this.pastryNode.getEnvironment().getSelectorManager().register(channel, this, op);
            } else {
                SocketCollectionManager.this.pastryNode.getEnvironment().getSelectorManager().register(channel, this, SocketCollectionManager.this.pastryNode.getEnvironment().getSelectorManager().getKey(channel).interestOps() | op);
                if (SocketCollectionManager.this.logger.level <= 400) {
                    SocketCollectionManager.this.logger.log("(SRM) " + this + "   interest ops for key " + k + " are now " + SocketCollectionManager.this.pastryNode.getEnvironment().getSelectorManager().getKey(channel).interestOps());
                }
            }
        }

        protected void removeInterestOp(SelectableChannel channel, int op) throws IOException {
            SelectionKey key;
            String k = "unlogged";
            if (SocketCollectionManager.this.logger.level <= 400) {
                k = channel == this.channel1 ? "1" : "2";
                SocketCollectionManager.this.logger.log("(SRM) " + this + "   removing interest op " + op + " from key " + k);
            }
            if ((key = SocketCollectionManager.this.pastryNode.getEnvironment().getSelectorManager().getKey(channel)) != null) {
                key.interestOps(key.interestOps() & ~op);
                if (key.interestOps() == 0) {
                    if (SocketCollectionManager.this.logger.level <= 400) {
                        SocketCollectionManager.this.logger.log("(SRM) " + this + "   key " + k + " has no interest ops - cancelling");
                    }
                    SocketCollectionManager.this.pastryNode.getEnvironment().getSelectorManager().cancel(key);
                }
            }
        }

        public void shutdown(SocketChannel channel) {
            try {
                if (SocketCollectionManager.this.logger.level <= 500) {
                    SocketCollectionManager.this.logger.log("(SRM) " + this + " shutting down output to key " + (channel == this.channel1 ? "1" : "2"));
                }
                channel.socket().shutdownOutput();
                SocketCollectionManager.this.sourceRouteClosed(this);
            }
            catch (IOException e) {
                if (SocketCollectionManager.this.logger.level <= 1000) {
                    SocketCollectionManager.this.logger.log("ERROR: Received exception " + e + " while shutting down SR output.");
                }
                this.close();
            }
        }

        public void close() {
            block7: {
                if (SocketCollectionManager.this.logger.level <= 500) {
                    SocketCollectionManager.this.logger.log("(SRM) " + this + " closing source route");
                }
                try {
                    SelectionKey key;
                    if (this.channel1 != null) {
                        key = SocketCollectionManager.this.pastryNode.getEnvironment().getSelectorManager().getKey(this.channel1);
                        if (key != null) {
                            key.cancel();
                        }
                        this.channel1.close();
                        this.channel1 = null;
                    }
                    if (this.channel2 != null) {
                        key = SocketCollectionManager.this.pastryNode.getEnvironment().getSelectorManager().getKey(this.channel2);
                        if (key != null) {
                            key.cancel();
                        }
                        this.channel2.close();
                        this.channel2 = null;
                    }
                    SocketCollectionManager.this.sourceRouteClosed(this);
                }
                catch (IOException e) {
                    if (SocketCollectionManager.this.logger.level > 900) break block7;
                    SocketCollectionManager.this.logger.log("ERROR: Recevied exception " + e + " while closing intermediateSourceRoute!");
                }
            }
        }

        public void connect(SelectionKey key) {
            if (SocketCollectionManager.this.logger.level <= 500) {
                SocketCollectionManager.this.logger.log("(SRM) " + this + " connecting to key " + (key.channel() == this.channel1 ? "1" : "2"));
            }
            try {
                if (((SocketChannel)key.channel()).finishConnect()) {
                    this.removeInterestOp(key.channel(), 8);
                }
                if (SocketCollectionManager.this.logger.level <= 500) {
                    SocketCollectionManager.this.logger.log("(SRM) Found connectable source route channel - completed connection");
                }
            }
            catch (IOException e) {
                if (SocketCollectionManager.this.logger.level <= 900) {
                    SocketCollectionManager.this.logger.log("(SRM) Got exception " + e + " on connect - killing off source route");
                }
                this.close();
            }
        }

        public void read(SelectionKey key) {
            block12: {
                String k = "unlogged";
                if (SocketCollectionManager.this.logger.level <= 500) {
                    k = key.channel() == this.channel1 ? "1" : "2";
                    SocketCollectionManager.this.logger.log("(SRM) " + this + " reading from key " + k + " " + key.interestOps());
                }
                try {
                    try {
                        if (this.repeater.read((SocketChannel)key.channel())) {
                            this.addInterestOp(this.otherChannel(key.channel()), 4);
                            this.removeInterestOp(key.channel(), 1);
                        }
                        if (SocketCollectionManager.this.logger.level <= 500) {
                            SocketCollectionManager.this.logger.log("(SRM) " + this + " done reading from key " + k);
                        }
                    }
                    catch (ClosedChannelException e) {
                        if (SocketCollectionManager.this.logger.level <= 500) {
                            SocketCollectionManager.this.logger.log("(SRM) " + this + " reading from key " + k + " returned -1 - processing shutdown");
                        }
                        if (this.otherChannel(key.channel()).socket().isInputShutdown()) {
                            if (SocketCollectionManager.this.logger.level <= 500) {
                                SocketCollectionManager.this.logger.log("(SRM) " + this + " other key is shut down - closing");
                            }
                            this.close();
                            break block12;
                        }
                        ((SocketChannel)key.channel()).socket().shutdownInput();
                        this.removeInterestOp(key.channel(), 1);
                        this.removeInterestOp(this.otherChannel(key.channel()), 4);
                        if (SocketCollectionManager.this.logger.level <= 500) {
                            SocketCollectionManager.this.logger.log("(SRM) " + this + " other key not yet closed - shutting it down");
                        }
                        this.shutdown(this.otherChannel(key.channel()));
                    }
                }
                catch (IOException e) {
                    if (SocketCollectionManager.this.logger.level <= 500) {
                        SocketCollectionManager.this.logger.logException("(SRM) ERROR " + e + " reading source route - cancelling.", e);
                    }
                    this.close();
                }
            }
        }

        public synchronized void write(SelectionKey key) {
            String k;
            String string = k = key.channel() == this.channel1 ? "1" : "2";
            if (SocketCollectionManager.this.logger.level <= 400) {
                SocketCollectionManager.this.logger.log("(SRM) " + this + " writing to key " + k + " " + key.interestOps());
            }
            try {
                if (this.repeater.write((SocketChannel)key.channel())) {
                    this.addInterestOp(this.otherChannel(key.channel()), 1);
                    this.removeInterestOp(key.channel(), 4);
                }
                if (SocketCollectionManager.this.logger.level <= 400) {
                    SocketCollectionManager.this.logger.log("(SRM) " + this + " done writing to key " + k);
                }
            }
            catch (IOException e) {
                if (SocketCollectionManager.this.logger.level <= 900) {
                    SocketCollectionManager.this.logger.log("ERROR " + e + " writing source route - cancelling.");
                }
                this.close();
            }
        }

        protected void acceptConnection(SelectionKey key) throws IOException {
            if (SocketCollectionManager.this.logger.level <= 500) {
                SocketCollectionManager.this.logger.log("(SRM) " + this + " accepted connection for key 1 as " + ((SocketChannel)key.channel()).socket().getRemoteSocketAddress());
            }
            if (SocketCollectionManager.this.logger.level <= 500) {
                SocketCollectionManager.this.logger.log("(SRM) Accepted source route connection from " + ((SocketChannel)key.channel()).socket().getRemoteSocketAddress());
            }
            SocketCollectionManager.this.pastryNode.getEnvironment().getSelectorManager().register(key.channel(), this, 1);
            this.channel1 = (SocketChannel)key.channel();
        }

        protected void createConnection(EpochInetSocketAddress address) throws IOException {
            if (SocketCollectionManager.this.logger.level <= 500) {
                SocketCollectionManager.this.logger.log("(SRM) " + this + " creating connection for key 2 as " + address.getAddress());
            }
            this.channel2 = SocketChannel.open();
            this.channel2.socket().setSendBufferSize(SocketCollectionManager.this.SOCKET_BUFFER_SIZE);
            this.channel2.socket().setReceiveBufferSize(SocketCollectionManager.this.SOCKET_BUFFER_SIZE);
            this.channel2.configureBlocking(false);
            if (SocketCollectionManager.this.logger.level <= 500) {
                SocketCollectionManager.this.logger.log("(SRM) Initiating source route connection to " + address);
            }
            SocketCollectionManager.this.pastryNode.broadcastChannelOpened(address.address, 1);
            boolean done = this.channel2.connect(address.getAddress());
            if (done) {
                SocketCollectionManager.this.pastryNode.getEnvironment().getSelectorManager().register(this.channel2, this, 1);
            } else {
                SocketCollectionManager.this.pastryNode.getEnvironment().getSelectorManager().register(this.channel2, this, 9);
            }
            if (SocketCollectionManager.this.logger.level <= 500) {
                SocketCollectionManager.this.logger.log("(SRM) " + this + "   setting initial ops to " + 1 + " for key 2");
            }
        }

        public String toString() {
            String s1 = null;
            if (this.channel1 != null) {
                s1 = this.channel1.socket() != null ? (this.channel1.socket().getRemoteSocketAddress() != null ? this.channel1.socket().getRemoteSocketAddress().toString() : this.channel1.socket().toString()) : this.channel1.toString();
            }
            String s2 = null;
            if (this.channel2 != null) {
                s2 = this.channel2.socket() != null ? (this.channel2.socket().getRemoteSocketAddress() != null ? this.channel2.socket().getRemoteSocketAddress().toString() : this.channel2.socket().toString()) : this.channel2.toString();
            }
            return "SourceRouteManager " + s1 + " to " + s2;
        }
    }

    protected class DeadChecker
    extends TimerTask
    implements PingResponseListener {
        protected int tries = 1;
        protected int numTries;
        protected SourceRoute path;

        public DeadChecker(SourceRoute path, int numTries) {
            if (SocketCollectionManager.this.logger.level <= 500) {
                SocketCollectionManager.this.logger.log("DeadChecker(" + path + ") started.");
            }
            this.path = path;
            this.numTries = numTries;
        }

        public void pingResponse(SourceRoute path, long RTT, long timeHeardFrom) {
            if (SocketCollectionManager.this.logger.level <= 500) {
                SocketCollectionManager.this.logger.log("Terminated DeadChecker(" + path + ") due to ping.");
            }
            SocketCollectionManager.this.manager.markAlive(path);
            this.cancel();
        }

        public void run() {
            if (this.tries < this.numTries) {
                ++this.tries;
                if (SocketCollectionManager.this.manager.getLiveness(this.path.getLastHop()) == 1) {
                    SocketCollectionManager.this.manager.markSuspected(this.path);
                }
                SocketCollectionManager.this.pingManager.ping(this.path, this);
                int absPD = (int)((double)SocketCollectionManager.this.PING_DELAY * Math.pow(2.0, this.tries - 1));
                int jitterAmt = (int)((float)absPD * SocketCollectionManager.this.PING_JITTER);
                int scheduledTime = absPD - jitterAmt + SocketCollectionManager.this.random.nextInt(jitterAmt * 2);
                SocketCollectionManager.this.pastryNode.getTimer().schedule(this, scheduledTime);
            } else {
                if (SocketCollectionManager.this.logger.level <= 500) {
                    SocketCollectionManager.this.logger.log("DeadChecker(" + this.path + ") expired - marking as dead.");
                }
                SocketCollectionManager.this.manager.markDead(this.path);
                this.cancel();
            }
        }

        public boolean cancel() {
            SocketCollectionManager.this.pingManager.removePingResponseListener(this.path, this);
            return super.cancel();
        }

        public String toString() {
            return "DeadChecker(" + this.path + " #" + System.identityHashCode(this) + "):" + this.tries;
        }
    }

    protected class MessageRetry
    extends TimerTask {
        protected int tries = 0;
        protected long timeout;
        protected SourceRoute route;
        protected SocketBuffer message;
        protected SocketSourceRouteManager.AddressManager am;

        public MessageRetry(SourceRoute route, SocketBuffer message, SocketSourceRouteManager.AddressManager am) {
            this.timeout = SocketCollectionManager.this.BACKOFF_INITIAL;
            this.am = am;
            this.message = message;
            this.route = route;
            this.timeout = (long)((double)this.timeout * (0.8 + 0.4 * SocketCollectionManager.this.random.nextDouble()));
            SocketCollectionManager.this.pastryNode.getTimer().schedule(this, this.timeout);
        }

        public void run() {
            if (!SocketCollectionManager.this.sendInternal(this.route, this.message)) {
                if (SocketCollectionManager.this.logger.level <= 500) {
                    SocketCollectionManager.this.logger.log("BACKOFF: Could not send message " + this.message + " after " + this.tries + " timeout " + this.timeout + " retries - retrying.");
                }
                if (this.tries < SocketCollectionManager.this.BACKOFF_LIMIT) {
                    ++this.tries;
                    this.timeout = (long)((double)(2L * this.timeout) * (0.8 + 0.4 * SocketCollectionManager.this.random.nextDouble()));
                    SocketCollectionManager.this.pastryNode.getTimer().schedule(this, this.timeout);
                } else if (SocketCollectionManager.this.logger.level <= 900) {
                    SocketCollectionManager.this.logger.log("WARNING: Could not send message " + this.message + " after " + this.tries + " retries.  Dropping on the floor.");
                }
            } else if (SocketCollectionManager.this.logger.level <= 500) {
                SocketCollectionManager.this.logger.log("BACKOFF: Was able to send message " + this.message + " after " + this.tries + " timeout " + this.timeout + " retries.");
            }
        }
    }
}

