/*
 * Decompiled with CFR 0.152.
 */
package org.mpisws.p2p.transport.sourceroute;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import org.mpisws.p2p.transport.ErrorHandler;
import org.mpisws.p2p.transport.MessageCallback;
import org.mpisws.p2p.transport.MessageRequestHandle;
import org.mpisws.p2p.transport.P2PSocket;
import org.mpisws.p2p.transport.P2PSocketReceiver;
import org.mpisws.p2p.transport.SocketCallback;
import org.mpisws.p2p.transport.SocketRequestHandle;
import org.mpisws.p2p.transport.TransportLayer;
import org.mpisws.p2p.transport.TransportLayerCallback;
import org.mpisws.p2p.transport.sourceroute.DefaultForwardSourceRouteStrategy;
import org.mpisws.p2p.transport.sourceroute.Forwarder;
import org.mpisws.p2p.transport.sourceroute.SourceRoute;
import org.mpisws.p2p.transport.sourceroute.SourceRouteFactory;
import org.mpisws.p2p.transport.sourceroute.SourceRouteForwardStrategy;
import org.mpisws.p2p.transport.sourceroute.SourceRouteTap;
import org.mpisws.p2p.transport.sourceroute.SourceRouteTransportLayer;
import org.mpisws.p2p.transport.util.DefaultErrorHandler;
import org.mpisws.p2p.transport.util.InsufficientBytesException;
import org.mpisws.p2p.transport.util.MessageRequestHandleImpl;
import org.mpisws.p2p.transport.util.SocketInputBuffer;
import org.mpisws.p2p.transport.util.SocketRequestHandleImpl;
import org.mpisws.p2p.transport.util.SocketWrapperSocket;
import rice.environment.Environment;
import rice.environment.logging.Logger;
import rice.p2p.commonapi.Cancellable;
import rice.p2p.util.rawserialization.SimpleInputBuffer;
import rice.p2p.util.rawserialization.SimpleOutputBuffer;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class SourceRouteTransportLayerImpl<Identifier>
implements SourceRouteTransportLayer<Identifier>,
TransportLayerCallback<Identifier, ByteBuffer> {
    int MAX_NUM_HOPS;
    TransportLayerCallback<SourceRoute<Identifier>, ByteBuffer> callback;
    ErrorHandler<SourceRoute<Identifier>> errorHandler;
    TransportLayer<Identifier, ByteBuffer> etl;
    Environment environment;
    Logger logger;
    SourceRoute<Identifier> localIdentifier;
    Collection<SourceRouteTap> taps;
    SourceRouteFactory<Identifier> srFactory;
    SourceRouteForwardStrategy<Identifier> forwardSourceRouteStrategy;

    public SourceRouteTransportLayerImpl(SourceRouteFactory<Identifier> srFactory, TransportLayer<Identifier, ByteBuffer> etl, SourceRouteForwardStrategy<Identifier> fSRs, Environment env, ErrorHandler<SourceRoute<Identifier>> errorHandler) {
        this.etl = etl;
        this.environment = env;
        this.logger = env.getLogManager().getLogger(SourceRouteTransportLayerImpl.class, null);
        this.srFactory = srFactory;
        this.errorHandler = errorHandler;
        this.localIdentifier = this.srFactory.getSourceRoute(etl.getLocalIdentifier());
        this.taps = new ArrayList<SourceRouteTap>();
        this.MAX_NUM_HOPS = env.getParameters().getInt("transport_sr_max_num_hops");
        this.forwardSourceRouteStrategy = fSRs;
        if (this.forwardSourceRouteStrategy == null) {
            this.forwardSourceRouteStrategy = new DefaultForwardSourceRouteStrategy();
        }
        if (this.errorHandler == null) {
            this.errorHandler = new DefaultErrorHandler<SourceRoute<Identifier>>(this.logger);
        }
        etl.setCallback(this);
    }

    @Override
    public SocketRequestHandle<SourceRoute<Identifier>> openSocket(final SourceRoute<Identifier> i, final SocketCallback<SourceRoute<Identifier>> deliverSocketToMe, Map<String, Object> options) {
        if (deliverSocketToMe == null) {
            throw new IllegalArgumentException("deliverSocketToMe must be non-null!");
        }
        if (this.logger.level <= 750) {
            this.logger.log("openSocket(" + i + "," + deliverSocketToMe + "," + options + ")");
        }
        if (i.getNumHops() <= 1) {
            throw new IllegalArgumentException("SourceRoute must have more than 1 hop! sr:" + i);
        }
        if (!i.getFirstHop().equals(this.etl.getLocalIdentifier())) {
            throw new IllegalArgumentException("SourceRoute must start with self! sr:" + i + " self:" + this.etl.getLocalIdentifier());
        }
        if (this.logger.level <= 500) {
            this.logger.log("openSocket(" + i + ")");
        }
        final SocketRequestHandleImpl<SourceRoute<Identifier>> handle = new SocketRequestHandleImpl<SourceRoute<Identifier>>(i, options, this.logger);
        SimpleOutputBuffer sob = new SimpleOutputBuffer(i.getSerializedLength());
        try {
            i.serialize(sob);
        }
        catch (IOException ioe) {
            deliverSocketToMe.receiveException(handle, ioe);
            return handle;
        }
        final ByteBuffer b = ByteBuffer.wrap(sob.getBytes());
        handle.setSubCancellable(this.etl.openSocket(i.getHop(1), new SocketCallback<Identifier>(){

            @Override
            public void receiveResult(SocketRequestHandle<Identifier> c, final P2PSocket<Identifier> result) {
                if (handle.getSubCancellable() != null && c != handle.getSubCancellable()) {
                    throw new RuntimeException("c != handle.getSubCancellable() (indicates a bug in the code) c:" + c + " sub:" + handle.getSubCancellable());
                }
                handle.setSubCancellable(new Cancellable(){

                    public boolean cancel() {
                        result.close();
                        return true;
                    }
                });
                if (SourceRouteTransportLayerImpl.this.logger.level <= 400) {
                    SourceRouteTransportLayerImpl.this.logger.log("openSocket(" + i + "):receiveResult(" + result + ")");
                }
                result.register(false, true, new P2PSocketReceiver<Identifier>(){

                    @Override
                    public void receiveSelectResult(P2PSocket<Identifier> socket, boolean canRead, boolean canWrite) throws IOException {
                        if (canRead || !canWrite) {
                            throw new IOException("Expected to write! " + canRead + "," + canWrite);
                        }
                        socket.write(b);
                        if (b.hasRemaining()) {
                            socket.register(false, true, this);
                        } else {
                            SourceRouteTransportLayerImpl.this.openSocketHelper(deliverSocketToMe, handle, socket, i);
                        }
                    }

                    @Override
                    public void receiveException(P2PSocket<Identifier> socket, Exception e) {
                        deliverSocketToMe.receiveException(handle, e);
                    }
                });
            }

            @Override
            public void receiveException(SocketRequestHandle<Identifier> c, Exception exception) {
                deliverSocketToMe.receiveException(handle, exception);
            }
        }, options));
        return handle;
    }

    protected void openSocketHelper(SocketCallback<SourceRoute<Identifier>> deliverSocketToMe, SocketRequestHandleImpl<SourceRoute<Identifier>> handle, P2PSocket<Identifier> socket, SourceRoute<Identifier> i) {
        deliverSocketToMe.receiveResult(handle, new SocketWrapperSocket<SourceRoute<Identifier>, Identifier>(i, socket, this.logger, socket.getOptions()));
    }

    protected void incomingSocketHelper(P2PSocket<Identifier> socket, SourceRoute<Identifier> sr) throws IOException {
        this.callback.incomingSocket(new SocketWrapperSocket<SourceRoute<Identifier>, Identifier>(sr, socket, this.logger, socket.getOptions()));
    }

    @Override
    public void incomingSocket(final P2PSocket<Identifier> socka) throws IOException {
        if (this.logger.level <= 500) {
            this.logger.log("incomingSocket(" + socka + ")");
        }
        final SocketInputBuffer sib = new SocketInputBuffer(socka, 1024);
        socka.register(true, false, new P2PSocketReceiver<Identifier>(){

            @Override
            public void receiveSelectResult(P2PSocket<Identifier> socket, boolean canRead, boolean canWrite) throws IOException {
                if (SourceRouteTransportLayerImpl.this.logger.level <= 400) {
                    SourceRouteTransportLayerImpl.this.logger.log("incomingSocket(" + socket + "):receiveSelectResult()");
                }
                if (canWrite || !canRead) {
                    throw new IOException("Expected to read! " + canRead + "," + canWrite);
                }
                try {
                    final SourceRoute sr = SourceRouteTransportLayerImpl.this.srFactory.build(sib);
                    if (SourceRouteTransportLayerImpl.this.logger.level <= 300) {
                        SourceRouteTransportLayerImpl.this.logger.log("Read socket " + sr);
                    }
                    if (sr.getLastHop().equals(SourceRouteTransportLayerImpl.this.etl.getLocalIdentifier())) {
                        SourceRouteTransportLayerImpl.this.incomingSocketHelper(socket, SourceRouteTransportLayerImpl.this.srFactory.reverse(sr));
                    } else {
                        Object nextHop;
                        int hopNum = sr.getHop(SourceRouteTransportLayerImpl.this.etl.getLocalIdentifier());
                        if (hopNum < 1) {
                            sib.reset();
                            byte[] dump = new byte[sib.size()];
                            sib.read(dump);
                            SourceRouteTransportLayerImpl.this.errorHandler.receivedUnexpectedData(sr, dump, 0, null);
                            socka.close();
                            return;
                        }
                        sib.reset();
                        byte[] srbytes = new byte[sib.size()];
                        sib.read(srbytes);
                        final ByteBuffer b = ByteBuffer.wrap(srbytes);
                        if (SourceRouteTransportLayerImpl.this.logger.level <= 400) {
                            SourceRouteTransportLayerImpl.this.logger.log("I'm hop " + hopNum + " in " + sr);
                        }
                        if (SourceRouteTransportLayerImpl.this.forwardSourceRouteStrategy.forward(nextHop = sr.getHop(hopNum + 1), sr, true, socka.getOptions())) {
                            if (SourceRouteTransportLayerImpl.this.logger.level <= 300) {
                                SourceRouteTransportLayerImpl.this.logger.log("Attempting to open next hop " + nextHop + " <" + hopNum + "> in " + sr);
                            }
                            SourceRouteTransportLayerImpl.this.etl.openSocket(nextHop, new SocketCallback<Identifier>(){

                                @Override
                                public void receiveResult(SocketRequestHandle<Identifier> cancellable, final P2PSocket<Identifier> sockb) {
                                    sockb.register(false, true, new P2PSocketReceiver<Identifier>(){

                                        @Override
                                        public void receiveSelectResult(P2PSocket<Identifier> socket, boolean canRead, boolean canWrite) throws IOException {
                                            if (canRead || !canWrite) {
                                                throw new IOException("Expected to write! " + canRead + "," + canWrite);
                                            }
                                            socket.write(b);
                                            if (b.hasRemaining()) {
                                                socket.register(false, true, this);
                                            } else {
                                                for (SourceRouteTap tap : SourceRouteTransportLayerImpl.this.taps) {
                                                    tap.socketOpened(sr, socka, sockb);
                                                }
                                                new Forwarder(sr, socka, sockb, SourceRouteTransportLayerImpl.this.logger);
                                            }
                                        }

                                        @Override
                                        public void receiveException(P2PSocket<Identifier> socket, Exception e) {
                                            SourceRouteTransportLayerImpl.this.errorHandler.receivedException(sr, e);
                                            socka.close();
                                            sockb.close();
                                        }
                                    });
                                }

                                @Override
                                public void receiveException(SocketRequestHandle<Identifier> s, Exception ex) {
                                    socka.close();
                                }
                            }, null);
                        } else {
                            if (SourceRouteTransportLayerImpl.this.logger.level <= 800) {
                                SourceRouteTransportLayerImpl.this.logger.log("Rejecting opening next hop " + nextHop + " <" + hopNum + "> in " + sr);
                            }
                            socka.close();
                        }
                    }
                }
                catch (InsufficientBytesException ibe) {
                    socket.register(true, false, this);
                }
                catch (IOException e) {
                    if (SourceRouteTransportLayerImpl.this.logger.level <= 800) {
                        SourceRouteTransportLayerImpl.this.errorHandler.receivedException(SourceRouteTransportLayerImpl.this.srFactory.getSourceRoute(SourceRouteTransportLayerImpl.this.etl.getLocalIdentifier(), socket.getIdentifier()), e);
                    }
                    socka.close();
                }
            }

            @Override
            public void receiveException(P2PSocket<Identifier> socket, Exception e) {
                SourceRouteTransportLayerImpl.this.errorHandler.receivedException(SourceRouteTransportLayerImpl.this.srFactory.getSourceRoute(SourceRouteTransportLayerImpl.this.etl.getLocalIdentifier(), socket.getIdentifier()), e);
            }
        });
    }

    @Override
    public MessageRequestHandle<SourceRoute<Identifier>, ByteBuffer> sendMessage(final SourceRoute<Identifier> i, ByteBuffer m, final MessageCallback<SourceRoute<Identifier>, ByteBuffer> deliverAckToMe, Map<String, Object> options) {
        if (this.logger.level <= 500) {
            this.logger.log("sendMessage(" + i + "," + m + ")");
        }
        if (i.getNumHops() <= 1) {
            throw new IllegalArgumentException("SourceRoute must have more than 1 hop! sr:" + i);
        }
        if (!i.getFirstHop().equals(this.etl.getLocalIdentifier())) {
            throw new IllegalArgumentException("SourceRoute must start with self! sr:" + i + " self:" + this.etl.getLocalIdentifier());
        }
        final MessageRequestHandleImpl<SourceRoute<Identifier>, ByteBuffer> handle = new MessageRequestHandleImpl<SourceRoute<Identifier>, ByteBuffer>(i, m, options);
        SimpleOutputBuffer sob = new SimpleOutputBuffer(m.remaining() + i.getSerializedLength());
        try {
            i.serialize(sob);
            sob.write(m.array(), m.position(), m.remaining());
        }
        catch (IOException ioe) {
            if (deliverAckToMe == null) {
                this.errorHandler.receivedException(i, ioe);
            } else {
                deliverAckToMe.sendFailed(handle, ioe);
            }
            return null;
        }
        ByteBuffer buf = ByteBuffer.wrap(sob.getBytes());
        handle.setSubCancellable(this.etl.sendMessage(i.getHop(1), buf, new MessageCallback<Identifier, ByteBuffer>(){

            @Override
            public void ack(MessageRequestHandle<Identifier, ByteBuffer> msg) {
                if (handle.getSubCancellable() != null && msg != handle.getSubCancellable()) {
                    throw new RuntimeException("msg != handle.getSubCancellable() (indicates a bug in the code) msg:" + msg + " sub:" + handle.getSubCancellable());
                }
                if (deliverAckToMe != null) {
                    deliverAckToMe.ack(handle);
                }
            }

            @Override
            public void sendFailed(MessageRequestHandle<Identifier, ByteBuffer> msg, Exception ex) {
                if (handle.getSubCancellable() != null && msg != handle.getSubCancellable()) {
                    throw new RuntimeException("msg != handle.getSubCancellable() (indicates a bug in the code) msg:" + msg + " sub:" + handle.getSubCancellable());
                }
                if (deliverAckToMe == null) {
                    SourceRouteTransportLayerImpl.this.errorHandler.receivedException(i, ex);
                } else {
                    deliverAckToMe.sendFailed(handle, ex);
                }
            }
        }, options));
        return handle;
    }

    @Override
    public void messageReceived(Identifier i, ByteBuffer m, Map<String, Object> options) throws IOException {
        SourceRoute<Identifier> tempSr;
        if (!m.hasRemaining()) {
            this.errorHandler.receivedUnexpectedData(this.srFactory.getSourceRoute(this.etl.getLocalIdentifier(), i), m.array(), m.position(), null);
        }
        int pos = m.position();
        SimpleInputBuffer sib = new SimpleInputBuffer(m.array(), pos);
        try {
            tempSr = this.srFactory.build(sib);
        }
        catch (Exception e) {
            this.errorHandler.receivedException(this.srFactory.getSourceRoute(this.etl.getLocalIdentifier(), i), e);
            return;
        }
        SourceRoute<Identifier> sr = tempSr;
        m.position(m.array().length - sib.bytesRemaining());
        if (sr.getLastHop().equals(this.etl.getLocalIdentifier())) {
            this.callback.messageReceived(this.srFactory.reverse(sr), m, options);
        } else {
            int hopNum = sr.getHop(this.etl.getLocalIdentifier());
            if (hopNum < 1) {
                this.errorHandler.receivedUnexpectedData(sr, m.array(), pos, null);
                return;
            }
            if (this.logger.level <= 400) {
                this.logger.log("I'm hop " + hopNum + " in " + sr);
            }
            for (SourceRouteTap tap : this.taps) {
                byte[] retArr = new byte[m.array().length];
                System.arraycopy(m.array(), 0, retArr, 0, retArr.length);
                ByteBuffer ret = ByteBuffer.wrap(retArr);
                ret.position(m.position());
                tap.receivedMessage(ret, sr);
            }
            m.position(pos);
            this.etl.sendMessage(sr.getHop(hopNum + 1), m, null, null);
        }
    }

    @Override
    public void setCallback(TransportLayerCallback<SourceRoute<Identifier>, ByteBuffer> callback) {
        this.callback = callback;
    }

    @Override
    public void setErrorHandler(ErrorHandler<SourceRoute<Identifier>> errorHandler) {
        this.errorHandler = errorHandler;
    }

    @Override
    public void acceptMessages(boolean b) {
        this.etl.acceptMessages(b);
    }

    @Override
    public void acceptSockets(boolean b) {
        this.etl.acceptSockets(b);
    }

    @Override
    public SourceRoute getLocalIdentifier() {
        return this.localIdentifier;
    }

    @Override
    public void destroy() {
        this.etl.destroy();
    }

    @Override
    public void addSourceRouteTap(SourceRouteTap tap) {
        this.taps.add(tap);
    }

    @Override
    public boolean removeSourceRouteTap(SourceRouteTap tap) {
        return this.taps.remove(tap);
    }
}

