/*
 * Decompiled with CFR 0.152.
 */
package rice.pastry.socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import rice.p2p.commonapi.NodeHandle;
import rice.p2p.commonapi.rawserialization.InputBuffer;
import rice.p2p.commonapi.rawserialization.MessageDeserializer;
import rice.pastry.NodeHandleFactory;
import rice.pastry.messaging.Message;
import rice.pastry.messaging.PJavaSerializedMessage;
import rice.pastry.messaging.PRawMessage;
import rice.pastry.socket.SocketBuffer;
import rice.pastry.socket.SocketChannelReader;
import rice.pastry.socket.SocketChannelWriter;
import rice.pastry.socket.SocketCollectionManager;
import rice.pastry.socket.SourceRoute;
import rice.pastry.socket.messaging.LeafSetResponseMessage;
import rice.pastry.socket.messaging.NodeIdResponseMessage;
import rice.pastry.socket.messaging.RouteRowResponseMessage;
import rice.pastry.socket.messaging.RoutesResponseMessage;
import rice.selector.SelectionKeyHandler;
import rice.selector.TimerTask;

/**
 * Selection-key handler that owns one non-blocking {@link SocketChannel}
 * together with the reader and writer used on it.
 *
 * <p>An instance is created either for an accepted connection (the
 * {@link SourceRoute} is then unknown until the remote side announces it) or
 * for an outgoing connection along a given route. All registration and
 * interest-op changes go through the selector manager obtained from the
 * owning {@link SocketCollectionManager}'s pastry node.
 */
class SocketManager extends SelectionKeyHandler {
    // Logger thresholds used in this class. The values mirror
    // java.util.logging.Level; presumably they correspond to the project's own
    // Logger constants of the same names -- TODO confirm.
    private static final int LOG_FINER = 400;
    private static final int LOG_FINE = 500;
    private static final int LOG_WARNING = 900;
    private static final int LOG_SEVERE = 1000;

    private final SocketCollectionManager manager;
    protected SelectionKey key;
    protected SocketChannel channel;
    protected SocketChannelReader reader;
    protected SocketChannelWriter writer;
    protected TimerTask timer;
    /** Route of this connection; null on accepted sockets until the remote side announces it. */
    protected SourceRoute path;
    protected boolean bootstrap;
    MessageDeserializer deserializer = new SMDeserializer();

    /**
     * Creates a manager for an already accepted connection. Reader and writer
     * start without a path; it is filled in by {@link SMDeserializer} once the
     * remote side sends its route.
     *
     * @param manager the owning collection manager
     * @param key the key whose channel is the accepted socket channel
     * @throws IOException if registering the channel fails
     */
    public SocketManager(SocketCollectionManager manager, SelectionKey key) throws IOException {
        this.manager = manager;
        this.reader = new SocketChannelReader(manager.pastryNode, null);
        this.writer = new SocketChannelWriter(manager.pastryNode, null);
        this.bootstrap = false;
        this.acceptConnection(key);
    }

    /**
     * Opens an outgoing connection along {@code path} and queues the initial
     * buffers: first {@code new SocketBuffer(path, 0)}, then, for non-bootstrap
     * connections only, a buffer built from the reversed route.
     *
     * @param manager the owning collection manager
     * @param path the route to connect along
     * @param bootstrap whether this is a bootstrap connection (closed after its queue has been written)
     * @throws IOException if opening or registering the channel fails
     */
    public SocketManager(SocketCollectionManager manager, SourceRoute path, boolean bootstrap) throws IOException {
        this.manager = manager;
        this.reader = new SocketChannelReader(manager.pastryNode, path.reverse());
        this.writer = new SocketChannelWriter(manager.pastryNode, path);
        this.bootstrap = bootstrap;
        if (manager.logger.level <= LOG_FINE) {
            manager.logger.log("Opening connection with path " + path);
        }
        this.createConnection(path);
        this.send(new SocketBuffer(path, 0));
        if (!bootstrap) {
            this.send(new SocketBuffer(path.reverse(manager.localAddress)));
        }
    }

    /**
     * Arms the write-timeout timer if it is not armed already. When the timer
     * fires, a liveness check of the route's last hop is requested.
     */
    protected void setTimer() {
        if (this.timer != null) {
            return;
        }
        this.timer = new TimerTask(){

            public void run() {
                SocketCollectionManager owner = SocketManager.this.manager;
                if (owner.logger.level <= SocketManager.LOG_FINE) {
                    owner.logger.log("WRITE_TIMER::Timer expired, checking liveness...");
                }
                // An accepted socket has no path until the remote side announces
                // it; there is no hop to check in that case.
                SourceRoute current = SocketManager.this.path;
                if (current == null) {
                    return;
                }
                owner.manager.checkLiveness(current.getLastHop());
            }
        };
        this.manager.pastryNode.getEnvironment().getSelectorManager().schedule(this.timer, this.manager.WRITE_WAIT_TIME);
    }

    public String toString() {
        return "SM " + this.channel;
    }

    /**
     * Shuts down the output half of the socket, reports the socket as closed
     * to the collection manager and asks the selector to re-evaluate the key.
     * If shutting down output fails, the connection is closed completely.
     */
    public void shutdown() {
        try {
            if (this.manager.logger.level <= LOG_FINE) {
                this.manager.logger.log("Shutting down output on connection with path " + this.path);
            }
            if (this.channel != null) {
                this.channel.socket().shutdownOutput();
            } else if (this.manager.logger.level <= LOG_SEVERE) {
                this.manager.logger.log("ERROR: Unable to shutdown output on channel; channel is null!");
            }
            this.manager.socketClosed(this.path, this);
            // close() nulls the key; guard the same way send(SocketBuffer) does.
            if (this.key != null) {
                this.manager.pastryNode.getEnvironment().getSelectorManager().modifyKey(this.key);
            }
        }
        catch (IOException e) {
            if (this.manager.logger.level <= LOG_SEVERE) {
                this.manager.logger.log("ERROR: Received exception " + e + " while shutting down output.");
            }
            this.close();
        }
    }

    /**
     * Closes the connection: cancels the timer and the selection key, closes
     * the channel, reports the socket as closed and hands any still-queued
     * buffers to the route manager for rerouting.
     *
     * <p>The path cleanup runs even when closing the channel throws, so a
     * failing {@code channel.close()} cannot leave the path registered or drop
     * the queued buffers. A null channel is tolerated.
     */
    public void close() {
        try {
            if (this.manager.logger.level <= LOG_FINE) {
                if (this.path != null) {
                    this.manager.logger.log("Closing connection with path " + this.path);
                } else {
                    this.manager.logger.log("Closing connection to " + this.remoteAddress());
                }
            }
            // Without a channel there is no remote address to report.
            if (this.manager.pastryNode != null && this.channel != null) {
                this.manager.pastryNode.broadcastChannelClosed(this.remoteAddress());
            }
            this.clearTimer();
            if (this.key != null) {
                if (this.manager.logger.level <= LOG_WARNING && !this.manager.pastryNode.getEnvironment().getSelectorManager().isSelectorThread()) {
                    this.manager.logger.logException("WARNING: cancelling key:" + this.key + " on the wrong thread.", new Exception("Stack Trace"));
                }
                this.key.cancel();
                this.key.attach(null);
                this.key = null;
            }
            this.manager.unIdentifiedSM.remove(this);
            try {
                if (this.channel != null) {
                    this.channel.close();
                }
            }
            finally {
                if (this.path != null) {
                    this.manager.socketClosed(this.path, this);
                    // The iterator is deliberately obtained before reset(), as in
                    // the original code. NOTE(review): this relies on reset()
                    // replacing the queue rather than clearing it in place --
                    // confirm against SocketChannelWriter.
                    Iterator pending = this.writer.getQueue().iterator();
                    this.writer.reset();
                    while (pending.hasNext()) {
                        Object queued = pending.next();
                        // NOTE(review): the check is on Message but the cast is to
                        // SocketBuffer; kept as-is, confirm the intended type.
                        if (queued instanceof Message && this.manager.manager != null) {
                            this.manager.manager.reroute(this.path.getLastHop(), (SocketBuffer)queued);
                        }
                    }
                    this.path = null;
                }
            }
        }
        catch (IOException e) {
            if (this.manager.logger.level <= LOG_SEVERE) {
                this.manager.logger.log("ERROR: Recevied exception " + e + " while closing socket!");
            }
        }
    }

    /**
     * Returns the remote address of the channel's socket, or null when there
     * is no channel (or the socket reports no remote address).
     */
    private InetSocketAddress remoteAddress() {
        if (this.channel == null) {
            return null;
        }
        return (InetSocketAddress)this.channel.socket().getRemoteSocketAddress();
    }

    /**
     * Serializes {@code msg} into a new {@link SocketBuffer} and queues it.
     * Messages that are not already {@link PRawMessage}s are wrapped in a
     * {@link PJavaSerializedMessage} first.
     *
     * @param msg the message to send
     * @throws IOException if serialization fails
     */
    public void send(Message msg) throws IOException {
        PRawMessage rm = msg instanceof PRawMessage ? (PRawMessage)msg : new PJavaSerializedMessage(msg);
        SocketBuffer buffer = new SocketBuffer(this.manager.defaultDeserializer, (NodeHandleFactory)this.manager.pastryNode);
        buffer.serialize(rm, true);
        this.send(buffer);
    }

    /**
     * Queues {@code message} on the writer and, if a selection key exists,
     * asks the selector manager to re-evaluate it.
     *
     * @param message the buffer to enqueue
     */
    public void send(SocketBuffer message) {
        this.writer.enqueue(message);
        if (this.key != null) {
            this.manager.pastryNode.getEnvironment().getSelectorManager().modifyKey(this.key);
        }
    }

    /**
     * Adjusts write interest on {@code key}: removes it (and cancels the
     * timer) once output has been shut down; otherwise adds it (and arms the
     * timer) when the writer has queued data and write interest is not set.
     *
     * @param key the selection key to adjust
     */
    public synchronized void modifyKey(SelectionKey key) {
        if (this.channel.socket().isOutputShutdown()) {
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
            this.clearTimer();
        } else if (!this.writer.isEmpty() && (key.interestOps() & SelectionKey.OP_WRITE) == 0) {
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
            this.setTimer();
        }
    }

    /**
     * Completes a pending non-blocking connect. On success connect interest is
     * dropped and the path is marked alive; on any failure the path is marked
     * dead and the connection is closed.
     *
     * @param key the key that became connectable
     */
    public void connect(SelectionKey key) {
        try {
            if (this.channel.finishConnect()) {
                key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
            }
            // NOTE(review): the path is marked alive even when finishConnect()
            // returned false; kept as in the original.
            this.manager.manager.markAlive(this.path);
            if (this.manager.logger.level <= LOG_FINE) {
                this.manager.logger.log("(SM) Found connectable channel - completed connection");
            }
        }
        catch (Exception e) {
            if (this.manager.logger.level <= LOG_FINE) {
                this.manager.logger.logException("(SM) Unable to connect to path " + this.path + " (" + e + ") marking as dead.", e);
            }
            this.manager.manager.markDead(this.path);
            this.close();
        }
    }

    /**
     * Reads from the channel and dispatches a buffer if the reader produced
     * one. On an I/O error the connection is closed; a liveness check is
     * requested first when the path is known and output was not shut down.
     *
     * @param key the key that became readable
     */
    public void read(SelectionKey key) {
        try {
            SocketBuffer incoming = this.reader.read(this.channel);
            if (incoming != null) {
                if (this.manager.logger.level <= LOG_FINE) {
                    this.manager.logger.log("(SM) Read message " + incoming + " from socket.");
                }
                this.receive(incoming);
            }
        }
        catch (IOException e) {
            if (this.manager.logger.level <= LOG_FINE) {
                this.manager.logger.log("(SM) WARNING " + e + " reading - cancelling.");
            }
            if (this.path != null && !((SocketChannel)key.channel()).socket().isOutputShutdown()) {
                this.manager.checkLiveness(this.path);
            }
            this.close();
        }
    }

    /**
     * Writes queued data to the channel. When the writer reports that it is
     * done, write interest is removed and bootstrap connections are closed;
     * otherwise the write-timeout timer is re-armed.
     *
     * @param key the key that became writable
     */
    public synchronized void write(SelectionKey key) {
        try {
            this.clearTimer();
            if (this.writer.write(this.channel)) {
                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                if (this.bootstrap) {
                    this.close();
                }
            } else {
                this.setTimer();
            }
        }
        catch (IOException e) {
            if (this.manager.logger.level <= LOG_WARNING) {
                this.manager.logger.log("(SM) ERROR " + e + " writing - cancelling.");
            }
            this.close();
        }
    }

    /**
     * Adopts the channel of an accepted connection and registers it with the
     * selector manager for reading.
     *
     * @param key the key whose channel was accepted
     * @throws IOException if registration fails
     */
    protected void acceptConnection(SelectionKey key) throws IOException {
        this.channel = (SocketChannel)key.channel();
        this.key = this.manager.pastryNode.getEnvironment().getSelectorManager().register(key.channel(), this, 0);
        this.key.interestOps(SelectionKey.OP_READ);
        if (this.manager.logger.level <= LOG_FINE) {
            this.manager.logger.log("(SM) Accepted connection from " + this.channel.socket().getRemoteSocketAddress());
        }
    }

    /**
     * Opens a non-blocking channel to the first hop of {@code path} and
     * registers it. If the connect completes immediately only read interest is
     * set; otherwise read and connect interest are set.
     *
     * @param path the route to connect along
     * @throws IOException if the channel cannot be opened, configured or connected
     */
    protected void createConnection(SourceRoute path) throws IOException {
        this.path = path;
        this.channel = SocketChannel.open();
        this.channel.socket().setSendBufferSize(this.manager.SOCKET_BUFFER_SIZE);
        this.channel.socket().setReceiveBufferSize(this.manager.SOCKET_BUFFER_SIZE);
        this.channel.configureBlocking(false);
        this.key = this.manager.pastryNode.getEnvironment().getSelectorManager().register(this.channel, this, 0);
        if (this.manager.logger.level <= LOG_FINE) {
            this.manager.logger.log("(SM) Initiating socket connection to path " + path);
        }
        this.manager.pastryNode.broadcastChannelOpened(path.getFirstHop().getAddress(), 0);
        if (this.channel.connect(path.getFirstHop().getAddress())) {
            this.key.interestOps(SelectionKey.OP_READ);
        } else {
            this.key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
        }
    }

    /**
     * Dispatches a buffer read from the socket. Buffers with address 0 are
     * deserialized with this manager's own deserializer; all others are handed
     * to the pastry node, and the delivery time is logged at the finer level.
     *
     * @param delivery the buffer that was read
     */
    protected void receive(SocketBuffer delivery) {
        if (delivery.getAddress() == 0) {
            try {
                delivery.deserialize(this.deserializer);
            }
            catch (IOException ioe) {
                if (this.manager.logger.level <= LOG_SEVERE) {
                    this.manager.logger.logException("Internal error while deserializing.", ioe);
                }
            }
            return;
        }
        long start = this.manager.pastryNode.getEnvironment().getTimeSource().currentTimeMillis();
        this.manager.pastryNode.receiveMessage(delivery);
        if (this.manager.logger.level <= LOG_FINER) {
            this.manager.logger.log("ST: " + (this.manager.pastryNode.getEnvironment().getTimeSource().currentTimeMillis() - start) + " deliver of " + delivery);
        }
    }

    /** Cancels the write-timeout timer, if any, and forgets it. */
    protected void clearTimer() {
        if (this.timer != null) {
            this.timer.cancel();
        }
        this.timer = null;
    }

    /**
     * Deserializer for address-0 buffers. It never returns a message: each
     * recognised type is handled by side effect (recording the announced path
     * or queuing a response on this connection) and {@code null} is returned.
     */
    class SMDeserializer
    implements MessageDeserializer {
        SMDeserializer() {
        }

        /**
         * Handles one address-0 message.
         *
         * <p>The numeric type codes presumably match the TYPE constants of
         * the corresponding message classes -- TODO confirm.
         *
         * @return always {@code null}
         * @throws IOException if the buffer cannot be read or carries an unknown version
         */
        public rice.p2p.commonapi.Message deserialize(InputBuffer buf, short type, byte priority, NodeHandle sender) throws IOException {
            SocketCollectionManager owner = SocketManager.this.manager;
            switch (type) {
                case 1: {
                    // Route announcement from the remote side.
                    SourceRoute tempPath = SourceRoute.build(buf);
                    if (SocketManager.this.path == null) {
                        SocketManager.this.path = tempPath;
                        owner.socketOpened(SocketManager.this.path, SocketManager.this);
                        owner.manager.markAlive(SocketManager.this.path);
                        SocketManager.this.writer.setPath(SocketManager.this.path);
                        SocketManager.this.reader.setPath(SocketManager.this.path.reverse());
                        if (owner.logger.level <= LOG_FINE) {
                            owner.logger.log("Read open connection with path " + SocketManager.this.path);
                        }
                    } else if (owner.logger.level <= LOG_SEVERE) {
                        owner.logger.log("SERIOUS ERROR: Received duplicate path assignments: " + SocketManager.this.path + " and " + tempPath);
                    }
                    return null;
                }
                case 6: {
                    this.requireVersionZero(buf);
                    SocketManager.this.send(new NodeIdResponseMessage(owner.pastryNode.getNodeId(), owner.localAddress.getEpoch()));
                    return null;
                }
                case 4: {
                    this.requireVersionZero(buf);
                    SocketManager.this.send(new LeafSetResponseMessage(owner.pastryNode.getLeafSet()));
                    return null;
                }
                case 12: {
                    this.requireVersionZero(buf);
                    SocketManager.this.send(new RoutesResponseMessage(owner.manager.getBest().values().toArray(new SourceRoute[0])));
                    return null;
                }
                case 10: {
                    this.requireVersionZero(buf);
                    // The requested row index follows the version byte.
                    SocketManager.this.send(new RouteRowResponseMessage(owner.pastryNode.getRoutingTable().getRow(buf.readInt())));
                    return null;
                }
            }
            if (owner.logger.level <= LOG_SEVERE) {
                owner.logger.log("SERIOUS ERROR: Received unknown message address: 0type:" + type);
            }
            return null;
        }

        /**
         * Reads the version byte of a request and rejects anything but 0.
         *
         * @throws IOException if the byte cannot be read or is not 0
         */
        private void requireVersionZero(InputBuffer buf) throws IOException {
            byte version = buf.readByte();
            if (version != 0) {
                throw new IOException("Unknown Version: " + version);
            }
        }
    }
}

