/*
 * Decompiled with CFR 0.152.
 */
package rice.pastry.socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.WeakHashMap;
import rice.environment.Environment;
import rice.environment.logging.Logger;
import rice.environment.params.Parameters;
import rice.environment.time.TimeSource;
import rice.p2p.commonapi.Message;
import rice.p2p.commonapi.NodeHandle;
import rice.p2p.commonapi.rawserialization.InputBuffer;
import rice.p2p.commonapi.rawserialization.MessageDeserializer;
import rice.p2p.util.MathUtils;
import rice.p2p.util.TimerWeakHashMap;
import rice.pastry.messaging.PRawMessage;
import rice.pastry.socket.EpochInetSocketAddress;
import rice.pastry.socket.PingResponseListener;
import rice.pastry.socket.SocketBuffer;
import rice.pastry.socket.SocketChannelRepeater;
import rice.pastry.socket.SocketCollectionManager;
import rice.pastry.socket.SocketPastryNode;
import rice.pastry.socket.SocketSourceRouteManager;
import rice.pastry.socket.SourceRoute;
import rice.pastry.socket.messaging.DatagramMessage;
import rice.pastry.socket.messaging.IPAddressRequestMessage;
import rice.pastry.socket.messaging.IPAddressResponseMessage;
import rice.pastry.socket.messaging.PingMessage;
import rice.pastry.socket.messaging.PingResponseMessage;
import rice.pastry.socket.messaging.WrongEpochMessage;
import rice.selector.SelectionKeyHandler;

public class PingManager
extends SelectionKeyHandler {
    /** Value of the {@code pastry_socket_pingmanager_datagram_receive_buffer_size} parameter; applied to the socket's receive buffer in the constructor. */
    public final int DATAGRAM_RECEIVE_BUFFER_SIZE;
    /**
     * Value of the {@code pastry_socket_pingmanager_datagram_send_buffer_size} parameter; applied to the
     * socket's send buffer in the constructor and also used there as the capacity of {@link #buffer}.
     */
    public final int DATAGRAM_SEND_BUFFER_SIZE;
    /** Maps a SourceRoute to the ArrayList of PingResponseListeners waiting on it; accessed under {@code synchronized (pingListeners)}. */
    protected WeakHashMap pingListeners;
    /** Maps a SourceRoute to the Long timestamp (ms) recorded by {@code ping()} when a ping was last sent on it. */
    protected WeakHashMap lastPingTime;
    /** Envelopes queued for {@code write()}; accessed under {@code synchronized (pendingMsgs)}. */
    protected ArrayList pendingMsgs;
    // Direct buffer that read() receives datagrams into and readHeader() parses.
    // NOTE(review): its capacity comes from the *send* size parameter, not the receive size - confirm intended.
    private ByteBuffer buffer;
    // Non-blocking UDP channel opened and bound in the constructor.
    private DatagramChannel channel;
    // Selector registration of `channel`; its interest ops are toggled by stall()/write()/modifyKey().
    private SelectionKey key;
    // Receives markAlive / markProximity / markDead calls from receiveMessage().
    private SocketSourceRouteManager manager;
    // Assigned from the constructor's proxyAddress argument; compared against route hops to decide whether a packet is for this node.
    private EpochInetSocketAddress localAddress;
    // Used for broadcastSentListeners / broadcastReceivedListeners; callers in this class null-check it first.
    private SocketPastryNode spn;
    private Logger logger;
    private TimeSource timeSource;
    private Environment environment;
    // When true, read() drops any datagram whose source port has the same parity as the local port.
    private boolean testSourceRouting;
    // Deserializer handed to SocketBuffer.deserialize() for inbound datagrams (a PMDeserializer).
    MessageDeserializer deserializer;
    // Time (ms) at which write() last logged an I/O error; used to log at most once per second.
    long lastTimePrinted = 0L;
    /** Throttle interval in milliseconds; matches the interval within which {@code ping()} suppresses repeat listener-less pings on a path. */
    public static final int PING_THROTTLE = 500;
    /**
     * Length in bytes of {@code SocketCollectionManager.PASTRY_MAGIC_NUMBER}; used as the offset of the
     * version field in outgoing and incoming packets.
     * NOTE(review): public, static and non-final - any code can reassign it; consider making it final.
     */
    public static int HEADER_SIZE = SocketCollectionManager.PASTRY_MAGIC_NUMBER.length;

    /**
     * Creates the ping manager: opens a non-blocking UDP channel bound to
     * {@code bindAddress}, sizes its socket buffers from the environment's
     * parameters and registers it with the selector for read events.
     *
     * @param spn the owning node; supplies the environment (logger, time source, parameters, selector)
     * @param manager the source-route manager notified about liveness and proximity
     * @param bindAddress the address the datagram socket is bound to
     * @param proxyAddress the address recorded as this node's local address
     * @throws IOException if the channel cannot be opened, configured or bound; the channel is
     *         closed before the exception propagates
     */
    public PingManager(SocketPastryNode spn, SocketSourceRouteManager manager, EpochInetSocketAddress bindAddress, EpochInetSocketAddress proxyAddress) throws IOException {
        this.spn = spn;
        this.environment = spn.getEnvironment();
        this.logger = this.environment.getLogManager().getLogger(PingManager.class, null);
        this.deserializer = new PMDeserializer(this.logger);
        this.timeSource = this.environment.getTimeSource();
        Parameters p = this.environment.getParameters();
        this.pingListeners = new TimerWeakHashMap(this.environment.getSelectorManager().getTimer(), 60000);
        this.lastPingTime = new TimerWeakHashMap(this.environment.getSelectorManager().getTimer(), 5000);
        this.manager = manager;
        this.pendingMsgs = new ArrayList();
        this.localAddress = proxyAddress;
        this.testSourceRouting = p.getBoolean("pastry_socket_pingmanager_testSourceRouting");
        this.DATAGRAM_RECEIVE_BUFFER_SIZE = p.getInt("pastry_socket_pingmanager_datagram_receive_buffer_size");
        this.DATAGRAM_SEND_BUFFER_SIZE = p.getInt("pastry_socket_pingmanager_datagram_send_buffer_size");
        // NOTE(review): the receive-side buffer is sized from the send parameter - confirm intended.
        this.buffer = ByteBuffer.allocateDirect(this.DATAGRAM_SEND_BUFFER_SIZE);
        this.channel = DatagramChannel.open();
        boolean registered = false;
        try {
            this.channel.configureBlocking(false);
            this.channel.socket().setReuseAddress(true);
            this.channel.socket().bind(bindAddress.getAddress());
            this.channel.socket().setSendBufferSize(this.DATAGRAM_SEND_BUFFER_SIZE);
            this.channel.socket().setReceiveBufferSize(this.DATAGRAM_RECEIVE_BUFFER_SIZE);
            // Register with no interest first, then enable reads.
            this.key = this.environment.getSelectorManager().register(this.channel, this, 0);
            this.key.interestOps(SelectionKey.OP_READ);
            registered = true;
        }
        finally {
            if (!registered) {
                // Setup failed part-way (e.g. the port is in use): release the socket instead of leaking it.
                try {
                    this.channel.close();
                }
                catch (IOException ignored) {
                    // Best-effort cleanup; the original failure is the one that must propagate.
                }
            }
        }
        if (this.logger.level <= 800) {
            this.logger.log("PingManager binding to " + bindAddress);
        }
    }

    /**
     * Sends a ping along {@code path} and, if {@code prl} is non-null, registers
     * it to be notified when the response arrives.
     *
     * Pings without a listener are skipped when the path ends at the local
     * address, and are throttled so that at most one is sent per
     * {@link #PING_THROTTLE} milliseconds on a given path. Pings with a listener
     * are always sent.
     *
     * @param path the source route to ping
     * @param prl the listener to notify on response, or {@code null} for none
     */
    protected void ping(SourceRoute path, PingResponseListener prl) {
        if (prl == null && path.getLastHop().equals(this.localAddress)) {
            return;
        }
        long curTime = this.timeSource.currentTimeMillis();
        if (prl == null) {
            Long lastSent = (Long)this.lastPingTime.get(path);
            if (lastSent != null && lastSent.longValue() + PING_THROTTLE > curTime) {
                if (this.logger.level <= 500) {
                    this.logger.log("(PM) Suppressing ping via path " + path + " local " + this.localAddress);
                }
                return;
            }
        }
        if (this.logger.level <= 500) {
            this.logger.log("(PM) Sending Ping[" + curTime + "] via path " + path + " local " + this.localAddress + " for " + prl);
        }
        this.lastPingTime.put(path, Long.valueOf(curTime));
        this.addPingResponseListener(path, prl);
        this.enqueue(path, new PingMessage(curTime));
    }

    /**
     * Shuts this manager down: closes the channel behind the selection key (if
     * any), cancels the key and detaches its attachment. Does nothing when no
     * key was ever registered.
     *
     * @throws IOException if closing the channel fails
     */
    protected void resign() throws IOException {
        SelectionKey registered = this.key;
        if (registered == null) {
            return;
        }
        if (registered.channel() != null) {
            registered.channel().close();
        }
        registered.cancel();
        registered.attach(null);
    }

    /**
     * Stops reading from the datagram channel by clearing the read bit from
     * the selection key's interest set; other interest bits are left as they are.
     */
    public void stall() {
        int currentOps = this.key.interestOps();
        this.key.interestOps(currentOps & ~SelectionKey.OP_READ);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes every registration of {@code prl} for {@code path}. A listener
     * added several times is removed several times. A {@code null} listener or
     * a path with no listeners is a no-op.
     *
     * @param path the source route the listener was registered on
     * @param prl the listener to remove, may be {@code null}
     */
    protected void removePingResponseListener(SourceRoute path, PingResponseListener prl) {
        if (prl != null) {
            synchronized (this.pingListeners) {
                ArrayList listeners = (ArrayList)this.pingListeners.get(path);
                if (listeners != null) {
                    boolean removedOne;
                    do {
                        removedOne = listeners.remove(prl);
                    } while (removedOne);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers {@code prl} to be notified when a ping response arrives on
     * {@code path}, creating the path's listener list on first use. A
     * {@code null} listener is ignored.
     *
     * @param path the source route being pinged
     * @param prl the listener to register, may be {@code null}
     */
    protected void addPingResponseListener(SourceRoute path, PingResponseListener prl) {
        if (prl != null) {
            synchronized (this.pingListeners) {
                ArrayList<PingResponseListener> listeners = (ArrayList<PingResponseListener>)this.pingListeners.get(path);
                if (listeners == null) {
                    listeners = new ArrayList<PingResponseListener>();
                    this.pingListeners.put(path, listeners);
                }
                listeners.add(prl);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Delivers a ping response to every listener registered on {@code path}.
     * The listener list is detached from the map under the lock, and the
     * callbacks are then invoked outside it, so each registration fires once.
     *
     * @param path the source route the response arrived on
     * @param proximity the measured round-trip time passed to the listeners
     * @param lastTimePinged the send timestamp carried by the ping
     */
    protected void notifyPingResponseListeners(SourceRoute path, int proximity, long lastTimePinged) {
        ArrayList listeners;
        synchronized (this.pingListeners) {
            listeners = (ArrayList)this.pingListeners.remove(path);
        }
        if (listeners == null) {
            return;
        }
        for (Iterator it = listeners.iterator(); it.hasNext(); ) {
            PingResponseListener listener = (PingResponseListener)it.next();
            listener.pingResponse(path, proximity, lastTimePinged);
        }
    }

    /**
     * Wraps {@code msg} in a SocketBuffer addressed along {@code path} and
     * queues it for sending. An I/O failure while doing so is logged and the
     * message is dropped.
     *
     * @param path the source route the message should travel
     * @param msg the message to send
     */
    public void enqueue(SourceRoute path, PRawMessage msg) {
        if (this.logger.level <= 397) {
            this.logger.log("enqueue(" + path + "," + msg + ")");
        }
        try {
            SocketBuffer serialized = new SocketBuffer(this.localAddress, path, msg);
            this.enqueue(path, serialized);
        }
        catch (IOException e) {
            if (this.logger.level <= 1000) {
                this.logger.log("ERROR: Received exceptoin " + e + " while enqueuing ping " + msg);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queues an already-serialized message for the first hop of {@code path},
     * reports it to the node's sent-listeners (when a node is attached) and
     * asks the selector to re-evaluate this manager's key so the write gets
     * scheduled.
     *
     * @param path the source route the message should travel
     * @param msg the serialized message
     */
    public void enqueue(SourceRoute path, SocketBuffer msg) {
        synchronized (this.pendingMsgs) {
            Envelope envelope = new Envelope(path.getFirstHop(), msg);
            this.pendingMsgs.add(envelope);
        }
        if (this.spn != null) {
            this.spn.broadcastSentListeners(msg, path.getLastHop().address, msg.getBuffer().limit(), 1);
        }
        if (this.logger.level <= 397) {
            this.logger.log("COUNT: Sent message " + msg.getType() + " of size " + msg.getBuffer().limit() + " to " + path);
        }
        this.environment.getSelectorManager().modifyKey(this.key);
    }

    /**
     * Handles a datagram message that was addressed to this node.
     *
     * <ul>
     * <li>PingMessage: replies with a PingResponseMessage carrying the same start time.</li>
     * <li>PingResponseMessage: computes the round-trip time; when it is positive, marks the
     *     path alive, records the proximity and notifies the path's listeners.</li>
     * <li>WrongEpochMessage: marks the path alive and the reported incorrect address dead.</li>
     * <li>IPAddressRequestMessage: replies to {@code fromPath} with the address the request
     *     was received from.</li>
     * </ul>
     * A {@code null} or unrecognized message is logged and dropped.
     *
     * @param sr the source route decoded from the packet header; its last hop is removed to
     *        obtain the inbound path
     * @param dm the deserialized message, or {@code null} if deserialization produced nothing
     * @param size the payload size in bytes, used for listener notification and logging
     * @param fromPath a single-hop route built from the datagram's source address
     * @throws IOException declared for callers; propagated from the operations invoked here
     */
    public void receiveMessage(SourceRoute sr, DatagramMessage dm, int size, SourceRoute fromPath) throws IOException {
        if (dm == null) {
            // The deserializer returns null for types it does not know; treat that like any
            // other unknown message instead of failing with a NullPointerException below.
            if (this.logger.level <= 900) {
                this.logger.log("ERROR: Received unknown DatagramMessage " + dm);
            }
            return;
        }
        long start = dm.getStartTime();
        SourceRoute inboundPath = sr.removeLastHop();
        SourceRoute outboundPath = inboundPath.reverse();
        if (this.spn != null) {
            this.spn.broadcastReceivedListeners(dm, outboundPath.getLastHop().address, size, 1);
        }
        if (dm instanceof PingMessage) {
            if (this.logger.level <= 500) {
                this.logger.log("(PM) Sending PingResponse[" + start + "] via path " + outboundPath + " local " + this.localAddress + " sr:" + sr + " ib:" + inboundPath);
            }
            // Checked independently: as an else-branch of the test above it could never run,
            // because any level <= 400 is also <= 500.
            if (this.logger.level <= 400) {
                this.logger.log("COUNT: Read message(1) " + dm.getClass() + " of size " + size + " from " + outboundPath);
            }
            this.enqueue(outboundPath, new PingResponseMessage(start));
        } else if (dm instanceof PingResponseMessage) {
            int ping = (int)(this.environment.getTimeSource().currentTimeMillis() - start);
            if (this.logger.level <= 500) {
                this.logger.log("COUNT: Read PingResponse[" + start + "]:RTT=" + ping + " of size " + size + " from " + inboundPath);
            }
            // NOTE(review): an RTT of exactly 0 ms is treated as invalid here, so such a
            // response neither marks the path alive nor notifies listeners - confirm intended.
            if (ping > 0) {
                this.manager.markAlive(outboundPath);
                this.manager.markProximity(outboundPath, ping);
                this.notifyPingResponseListeners(outboundPath, ping, start);
            } else if (this.logger.level <= 900) {
                this.logger.log("COUNT: Read PingResponse[" + start + "]:RTT=" + ping + "!!! of size " + size + " from " + inboundPath);
            }
        } else if (dm instanceof WrongEpochMessage) {
            WrongEpochMessage wem = (WrongEpochMessage)dm;
            if (this.logger.level <= 395) {
                this.logger.log("COUNT: Read message(3) " + dm.getClass() + " of size " + size + " from " + outboundPath.reverse());
            }
            this.manager.markAlive(outboundPath);
            this.manager.markDead(wem.getIncorrect());
        } else if (dm instanceof IPAddressRequestMessage) {
            if (this.logger.level <= 395) {
                this.logger.log("COUNT: Read message(4) " + dm.getClass() + " of size " + size + " from " + fromPath);
            }
            this.enqueue(fromPath, new IPAddressResponseMessage(fromPath.path[0].address, this.environment.getTimeSource().currentTimeMillis()));
        } else if (this.logger.level <= 900) {
            this.logger.log("ERROR: Received unknown DatagramMessage " + dm);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Selector callback: drains every datagram currently available on the
     * channel and hands each one to {@code readHeader}.
     *
     * The shared receive buffer is reset after every datagram. {@code readHeader}
     * does not always leave it cleared (its forwarding and wrong-epoch paths
     * leave position and limit mid-packet), and receiving into such a buffer
     * would truncate or discard the next datagram and re-parse stale bytes.
     *
     * I/O errors are logged and end the current drain; the buffer is always
     * cleared on exit.
     *
     * @param key the selection key that became readable (unused; the manager's own channel is read)
     */
    public void read(SelectionKey key) {
        try {
            InetSocketAddress address;
            while ((address = (InetSocketAddress)this.channel.receive(this.buffer)) != null) {
                this.buffer.flip();
                if (this.testSourceRouting && address.getPort() % 2 == this.localAddress.getAddress().getPort() % 2) {
                    // Test mode: simulate an unreachable peer. The finally block resets the buffer.
                    if (this.logger.level <= 800) {
                        this.logger.log("Dropping packet");
                    }
                    return;
                }
                if (this.buffer.remaining() <= 0) {
                    if (this.logger.level <= 800) {
                        this.logger.log("(PM) Read from datagram channel, but no bytes were there - no bad, but wierd.");
                    }
                    break;
                }
                this.readHeader(address);
                // Make the whole buffer available again before receiving the next datagram.
                this.buffer.clear();
            }
        }
        catch (IOException e) {
            if (this.logger.level <= 900) {
                this.logger.logException("ERROR (datagrammanager:read): ", e);
            }
        }
        finally {
            this.buffer.clear();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Selector callback: sends queued envelopes in order until the queue is
     * empty or the channel cannot take a whole datagram.
     *
     * An envelope is removed from the queue once it has been sent, when
     * sending it throws, or when its version byte is invalid. Removing invalid
     * envelopes keeps one bad entry from blocking everything queued behind it.
     * I/O errors are logged at most once per second. Write interest is cleared
     * on exit when nothing is left to send.
     *
     * @param key the selection key that became writable
     */
    public void write(SelectionKey key) {
        Envelope write = null;
        try {
            synchronized (this.pendingMsgs) {
                Iterator i = this.pendingMsgs.iterator();
                while (i.hasNext()) {
                    write = (Envelope)i.next();
                    if (this.logger.level <= 300) {
                        // Trace the hop index, hop count and every hop of the outgoing source route.
                        byte[] metadata = new byte[]{write.data.getBuffer().get(HEADER_SIZE + 4), write.data.getBuffer().get(HEADER_SIZE + 5)};
                        byte[] route = new byte[SocketChannelRepeater.HEADER_BUFFER_SIZE * metadata[1]];
                        System.arraycopy(write.data.getBuffer().array(), HEADER_SIZE + 6, route, 0, route.length);
                        this.logger.log("write(" + write.destination + ") (" + metadata[0] + " " + metadata[1] + ") local " + this.localAddress);
                        for (int ii = 0; ii < metadata[1]; ++ii) {
                            this.logger.log("  " + SocketChannelRepeater.decodeHeader(route, ii));
                        }
                    }
                    if (write.data.getBuffer().get(HEADER_SIZE) != 0) {
                        // Drop the unsendable envelope; leaving it at the head would make every
                        // later write() fail on it again and never reach the entries behind it.
                        i.remove();
                        throw new IOException("Attempting to send Invalid version");
                    }
                    try {
                        if (this.channel.send(write.data.getBuffer(), write.destination.getAddress()) != write.data.getBuffer().limit()) {
                            // Nothing was sent (no room in the socket buffer); retry on the next writable event.
                            break;
                        }
                        i.remove();
                    }
                    catch (IOException e) {
                        i.remove();
                        throw e;
                    }
                }
            }
        }
        catch (IOException e) {
            if (this.logger.level <= 900) {
                // Rate-limit so a network outage does not flood the log.
                long now = this.timeSource.currentTimeMillis();
                if (this.lastTimePrinted + 1000L <= now) {
                    this.lastTimePrinted = now;
                    this.logger.logException("ERROR (datagrammanager:write) to " + (write == null ? null : write.destination.getAddress()), e);
                }
            }
        }
        finally {
            synchronized (this.pendingMsgs) {
                if (this.pendingMsgs.isEmpty()) {
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Selector callback: turns on write interest for {@code key} when there
     * are messages waiting to be sent; otherwise leaves the key untouched.
     *
     * @param key the selection key to update
     */
    public void modifyKey(SelectionKey key) {
        synchronized (this.pendingMsgs) {
            if (this.pendingMsgs.isEmpty()) {
                return;
            }
            int currentOps = key.interestOps();
            key.interestOps(currentOps | SelectionKey.OP_WRITE);
        }
    }

    /**
     * Decodes a packed route header into a SourceRoute. The header is read as
     * consecutive entries of {@code SocketChannelRepeater.HEADER_BUFFER_SIZE}
     * bytes, one per hop; trailing bytes that do not fill a whole entry are ignored.
     *
     * @param header the raw route bytes
     * @return the source route made of the decoded hops, in header order
     * @throws IOException if a hop cannot be decoded
     */
    public SourceRoute decodeHeader(byte[] header) throws IOException {
        int hopCount = header.length / SocketChannelRepeater.HEADER_BUFFER_SIZE;
        EpochInetSocketAddress[] hops = new EpochInetSocketAddress[hopCount];
        int hop = 0;
        while (hop < hopCount) {
            hops[hop] = SocketChannelRepeater.decodeHeader(header, hop);
            hop++;
        }
        return SourceRoute.build(hops);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Parses the datagram currently held in the receive buffer and acts on it.
     *
     * Layout as read here: magic number ({@code HEADER_SIZE} bytes), version
     * ({@code HEADER_SIZE} bytes, must decode to 0), two metadata bytes (index
     * of the current hop, number of hops), the packed route, then the payload.
     *
     * <ul>
     * <li>Current hop is this node and it is the last hop: the payload is deserialized and
     *     passed to {@code receiveMessage}.</li>
     * <li>Current hop is this node but more hops follow: the whole packet is re-queued for
     *     the next hop with its hop index incremented.</li>
     * <li>Current hop has this node's address but a different epoch: a WrongEpochMessage is
     *     sent back along the hops already travelled.</li>
     * <li>Anything else: logged and rejected.</li>
     * </ul>
     *
     * The packet comes straight off the network, so every length and index in
     * the header is validated before use. Malformed packets are rejected with
     * an {@code IOException} rather than an unchecked exception.
     *
     * @param address the address the datagram was received from
     * @throws IOException if the packet is truncated, is not a pastry packet, has an unknown
     *         version or an invalid route header, or is addressed to another node
     */
    protected void readHeader(InetSocketAddress address) throws IOException {
        byte[] header = new byte[HEADER_SIZE];
        this.requireRemaining(HEADER_SIZE, address);
        this.buffer.get(header, 0, HEADER_SIZE);
        if (!Arrays.equals(header, SocketCollectionManager.PASTRY_MAGIC_NUMBER)) {
            throw new IOException("Not a pastry message from " + address + ":" + header[0] + "," + header[1] + "," + header[2] + "," + header[3]);
        }
        this.requireRemaining(HEADER_SIZE, address);
        this.buffer.get(header, 0, HEADER_SIZE);
        int version = MathUtils.byteArrayToInt(header);
        if (version != 0) {
            throw new IOException("Unknown Version:" + version);
        }
        this.requireRemaining(2, address);
        byte[] metadata = new byte[2];
        this.buffer.get(metadata);
        // metadata[0] is the index of the hop being visited, metadata[1] the number of hops.
        if (metadata[1] <= 0 || metadata[0] < 0 || metadata[0] >= metadata[1]) {
            throw new IOException("Invalid source route header (" + metadata[0] + " " + metadata[1] + ") from " + address);
        }
        byte[] route = new byte[SocketChannelRepeater.HEADER_BUFFER_SIZE * metadata[1]];
        this.requireRemaining(route.length, address);
        this.buffer.get(route);
        if (this.logger.level <= 300) {
            this.logger.log("readHeader(" + address + ") (" + metadata[0] + " " + metadata[1] + ") local " + this.localAddress);
            for (int i = 0; i < metadata[1]; ++i) {
                this.logger.log("  " + SocketChannelRepeater.decodeHeader(route, i));
            }
        }
        EpochInetSocketAddress eisa = SocketChannelRepeater.decodeHeader(route, metadata[0]);
        SourceRoute fromPath = SourceRoute.build(new EpochInetSocketAddress(address));
        if (eisa.equals(this.localAddress) || eisa.getAddress().equals(this.localAddress.getAddress()) && eisa.getEpoch() == -1L) {
            if (metadata[0] + 1 == metadata[1]) {
                // Final hop: everything after the route is the message payload.
                byte[] array = new byte[this.buffer.remaining()];
                this.buffer.get(array);
                this.buffer.clear();
                SourceRoute inbound = this.decodeHeader(route);
                SocketBuffer delivery = new SocketBuffer(array, this.spn);
                this.receiveMessage(inbound, (DatagramMessage)delivery.deserialize(this.deserializer), array.length, fromPath);
            } else {
                // Intermediate hop: copy the entire packet (header included) and pass it on.
                EpochInetSocketAddress next = SocketChannelRepeater.decodeHeader(route, metadata[0] + 1);
                this.buffer.position(0);
                byte[] packet = new byte[this.buffer.remaining()];
                this.buffer.get(packet);
                // Advance the hop index stored in the first metadata byte.
                int n = HEADER_SIZE + 4;
                packet[n] = (byte)(packet[n] + 1);
                if (this.logger.level <= 500) {
                    this.logger.log("Forwarding (" + metadata[0] + " " + metadata[1] + ") from " + address + " to " + next + " at " + this.localAddress);
                }
                if (this.spn != null) {
                    this.spn.broadcastReceivedListeners(packet, address, packet.length, 17);
                    this.spn.broadcastSentListeners(packet, next.address, packet.length, 17);
                }
                synchronized (this.pendingMsgs) {
                    this.pendingMsgs.add(new Envelope(next, new SocketBuffer(packet)));
                }
                this.environment.getSelectorManager().modifyKey(this.key);
            }
        } else if (eisa.getAddress().equals(this.localAddress.getAddress())) {
            // Same address, different epoch: tell the sender which epoch is current.
            SourceRoute back = SourceRoute.build(new EpochInetSocketAddress[0]);
            // NOTE(review): 'outbound' is built but never used below - candidate for removal
            // once SourceRoute.append is confirmed to have no side effects.
            SourceRoute outbound = SourceRoute.build(new EpochInetSocketAddress[0]);
            for (int i = 0; i < metadata[0]; ++i) {
                back = back.append(SocketChannelRepeater.decodeHeader(route, i));
                if (i <= 0) continue;
                outbound = outbound.append(SocketChannelRepeater.decodeHeader(route, i));
            }
            outbound = outbound.append(this.localAddress);
            WrongEpochMessage wem = new WrongEpochMessage(eisa, this.localAddress, this.environment.getTimeSource().currentTimeMillis());
            if (this.spn != null) {
                this.spn.broadcastReceivedListeners(null, address, this.buffer.remaining(), 1);
            }
            this.enqueue(back.reverse(), wem);
        } else {
            if (this.logger.level <= 900) {
                this.logger.log("WARNING: Received packet destined for EISA (" + metadata[0] + " " + metadata[1] + ") " + eisa + " but the local address is " + this.localAddress + " - dropping silently.");
            }
            throw new IOException("Received packet destined for EISA (" + metadata[0] + " " + metadata[1] + ") " + eisa + " but the local address is " + this.localAddress + " - dropping silently.");
        }
    }

    /**
     * Verifies that the receive buffer still holds at least {@code needed} unread bytes.
     *
     * @param needed the number of bytes about to be read
     * @param from the datagram's source address, for the error message
     * @throws IOException if fewer than {@code needed} bytes remain
     */
    private void requireRemaining(int needed, InetSocketAddress from) throws IOException {
        int available = this.buffer.remaining();
        if (available < needed) {
            throw new IOException("Truncated datagram from " + from + ": need " + needed + " more bytes but only " + available + " remain");
        }
    }

    /**
     * Deserializer for the datagram message types handled by the ping manager.
     * Dispatches on the numeric message type; unknown types are logged and
     * yield {@code null}.
     */
    static class PMDeserializer
    implements MessageDeserializer {
        Logger logger;

        /**
         * @param logger the logger used to report unknown message types
         */
        public PMDeserializer(Logger logger) {
            this.logger = logger;
        }

        /**
         * Builds the message identified by {@code type} from {@code buf}.
         *
         * @param buf the buffer positioned at the message body
         * @param type the numeric message type
         * @param priority unused
         * @param sender unused
         * @return the deserialized message, or {@code null} when {@code type} is not recognized
         * @throws IOException if the selected message constructor fails to read the buffer
         */
        public Message deserialize(InputBuffer buf, short type, byte priority, NodeHandle sender) throws IOException {
            switch (type) {
                case 2:
                    return new IPAddressRequestMessage(buf);
                case 3:
                    return new IPAddressResponseMessage(buf);
                case 8:
                    return new PingMessage(buf);
                case 9:
                    return new PingResponseMessage(buf);
                case 14:
                    return new WrongEpochMessage(buf);
                default:
                    if (this.logger.level <= 1000) {
                        this.logger.logException("PM SERIOUS ERROR: Received unknown message address: 0 type:" + type, new Exception("stack trace"));
                    }
                    return null;
            }
        }
    }

    /**
     * A queued outgoing datagram: the serialized packet together with the
     * address of the hop it must be sent to.
     */
    public class Envelope {
        protected EpochInetSocketAddress destination;
        protected SocketBuffer data;

        /**
         * Creates an envelope, tracing its source route at the most verbose
         * log level, and rejects packets whose version byte is not 0.
         *
         * @param destination the next hop to send the packet to
         * @param data the serialized packet
         * @throws RuntimeException if the byte at offset {@code HEADER_SIZE} of the packet is non-zero
         */
        public Envelope(EpochInetSocketAddress destination, SocketBuffer data) {
            this.destination = destination;
            this.data = data;
            if (PingManager.this.logger.level <= 300) {
                this.traceRoute();
            }
            if (data.getBuffer().get(HEADER_SIZE) != 0) {
                throw new RuntimeException("Attempting to send Invalid version");
            }
        }

        /**
         * Logs the hop index, hop count and every hop of the packet's source
         * route; a decoding failure is logged rather than propagated.
         */
        private void traceRoute() {
            try {
                byte hopIndex = this.data.getBuffer().get(HEADER_SIZE + 4);
                byte hopCount = this.data.getBuffer().get(HEADER_SIZE + 5);
                byte[] route = new byte[SocketChannelRepeater.HEADER_BUFFER_SIZE * hopCount];
                System.arraycopy(this.data.getBuffer().array(), HEADER_SIZE + 6, route, 0, route.length);
                PingManager.this.logger.log("enqueue(" + this.destination + ") (" + hopIndex + " " + hopCount + ") local " + PingManager.this.localAddress);
                for (int hop = 0; hop < hopCount; ++hop) {
                    PingManager.this.logger.log("  " + SocketChannelRepeater.decodeHeader(route, hop));
                }
            }
            catch (IOException ioe) {
                PingManager.this.logger.logException("", ioe);
            }
        }
    }
}

