/*
 * Decompiled with CFR 0.152.
 */
package rice.pastry.socket;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import rice.environment.Environment;
import rice.environment.logging.Logger;
import rice.pastry.commonapi.PastryEndpointMessage;
import rice.pastry.routing.RouteMessage;
import rice.pastry.socket.SocketBuffer;
import rice.pastry.socket.SocketPastryNode;
import rice.pastry.socket.SourceRoute;

public class SocketChannelReader {
    private SocketPastryNode spn;
    private int objectSize = -1;
    private ByteBuffer buffer;
    private ByteBuffer sizeBuffer;
    protected SourceRoute path;
    protected Environment environment;
    protected Logger logger;
    byte[] sizeArray = new byte[4];
    ByteArrayInputStream bais = new ByteArrayInputStream(this.sizeArray);
    DataInputStream dis = new DataInputStream(this.bais);

    public SocketChannelReader(SocketPastryNode spn, SourceRoute path) {
        this(spn.getEnvironment(), path);
        this.spn = spn;
    }

    public SocketChannelReader(Environment env, SourceRoute path) {
        this.environment = env;
        this.path = path;
        this.logger = env.getLogManager().getLogger(SocketChannelReader.class, null);
        this.sizeBuffer = ByteBuffer.allocateDirect(4);
    }

    protected void setPath(SourceRoute path) {
        this.path = path;
    }

    public SocketBuffer read(SocketChannel sc) throws IOException {
        int read;
        if (this.objectSize == -1) {
            read = sc.read(this.sizeBuffer);
            if (read == -1) {
                throw new IOException("Error on read - the channel has been closed.");
            }
            if (this.sizeBuffer.remaining() == 0) {
                this.initializeObjectBuffer(sc);
            } else {
                return null;
            }
        }
        if (this.objectSize != -1) {
            read = sc.read(this.buffer);
            if (this.logger.level <= 300) {
                this.logger.log("(R) Read " + read + " bytes of object... " + this.buffer.remaining() + " remaining.");
            }
            if (read == -1) {
                throw new ClosedChannelException();
            }
            if (this.buffer.remaining() == 0) {
                this.buffer.flip();
                byte[] objectArray = new byte[this.objectSize];
                this.buffer.get(objectArray);
                int size = this.objectSize + 8;
                this.reset();
                SocketBuffer obj = new SocketBuffer(objectArray, this.spn);
                if (this.logger.level <= 400) {
                    this.logger.log("(R) Deserialized bytes into object " + obj);
                }
                if (this.spn != null) {
                    this.spn.broadcastReceivedListeners(obj, this.path == null ? (InetSocketAddress)sc.socket().getRemoteSocketAddress() : this.path.getLastHop().address, size, 0);
                }
                this.record(obj, size, this.path);
                return obj;
            }
        }
        return null;
    }

    protected void record(Object obj, int size, SourceRoute path) {
        boolean recorded = false;
        try {
            if (obj instanceof RouteMessage) {
                this.record(((RouteMessage)obj).unwrap(), size, path);
                recorded = true;
            } else if (obj instanceof PastryEndpointMessage) {
                this.record(((PastryEndpointMessage)obj).getMessage(), size, path);
                recorded = true;
            }
        }
        catch (NoClassDefFoundError exc) {
            // empty catch block
        }
        if (!recorded && this.logger.level <= 400) {
            this.logger.log("COUNT: Read message(5) " + obj + " of size " + size + " from " + path);
        }
    }

    public void reset() {
        this.objectSize = -1;
        this.buffer = null;
        this.sizeBuffer.clear();
    }

    private void initializeObjectBuffer(SocketChannel sc) throws IOException {
        this.sizeBuffer.flip();
        this.sizeBuffer.get(this.sizeArray, 0, this.sizeArray.length);
        this.dis.reset();
        this.bais.reset();
        this.objectSize = this.dis.readInt();
        if (this.objectSize <= 0) {
            throw new IOException("Found message of improper number of bytes - " + this.objectSize + " bytes");
        }
        if (this.logger.level <= 400) {
            this.logger.log("(R) Found object of " + this.objectSize + " bytes from " + sc.socket().getRemoteSocketAddress());
        }
        try {
            this.buffer = ByteBuffer.allocateDirect(this.objectSize);
        }
        catch (OutOfMemoryError oome) {
            if (this.logger.level <= 1000) {
                this.logger.logException("SCR ran out of memory allocating an object of size " + this.objectSize + " from " + this.path, oome);
            }
            throw oome;
        }
    }
}

