/*
 * Decompiled with CFR 0.152.
 */
package rice.tutorial.splitstream;

import rice.environment.random.RandomSource;
import rice.p2p.commonapi.Application;
import rice.p2p.commonapi.CancellableTask;
import rice.p2p.commonapi.Endpoint;
import rice.p2p.commonapi.Id;
import rice.p2p.commonapi.Message;
import rice.p2p.commonapi.Node;
import rice.p2p.commonapi.NodeHandle;
import rice.p2p.commonapi.RouteMessage;
import rice.p2p.splitstream.Channel;
import rice.p2p.splitstream.ChannelId;
import rice.p2p.splitstream.SplitStream;
import rice.p2p.splitstream.SplitStreamClient;
import rice.p2p.splitstream.SplitStreamImpl;
import rice.p2p.splitstream.Stripe;
import rice.pastry.commonapi.PastryIdFactory;

/**
 * Tutorial client that both publishes to and receives from every stripe of a
 * single SplitStream channel.
 *
 * <p>Typical use, as implied by the methods below: construct the client, call
 * {@link #subscribe()} to attach to the channel and subscribe to its stripes,
 * then optionally call {@link #startPublishTask()} to publish periodically.
 * Each publish round sends one {@link #DATA_LENGTH}-byte payload per stripe;
 * byte 0 carries the sequence number and byte 1 the stripe index, the rest is
 * random filler.
 */
public class MySplitStreamClient
implements SplitStreamClient,
Application {
    /** Size in bytes of every published payload. */
    public static final int DATA_LENGTH = 10;
    /** Number of publish rounds after which the periodic task is cancelled. */
    public static final int NUM_PUBLISHES = 10;
    /** Initial delay and repeat period, in milliseconds, of the publish timer. */
    private static final long PUBLISH_PERIOD_MILLIS = 5000L;

    /** Sequence number written into byte 0 of each payload; bumped once per round. */
    byte seqNum = 0;
    SplitStream mySplitStream;
    ChannelId myChannelId;
    /** Set by {@link #subscribe()}; null until then. */
    Channel myChannel;
    /** Set by {@link #subscribe()}; null until then. */
    Stripe[] myStripes;
    protected RandomSource random;
    /** Handle of the periodic publish timer; null until {@link #startPublishTask()} runs. */
    CancellableTask publishTask;
    protected Endpoint endpoint;

    /**
     * Builds the endpoint and SplitStream instance on the given node, derives
     * the channel id from the fixed string "my channel", and registers the
     * endpoint.
     *
     * @param node the node this client runs on
     */
    public MySplitStreamClient(Node node) {
        this.endpoint = node.buildEndpoint(this, "myinstance");
        this.random = this.endpoint.getEnvironment().getRandomSource();
        this.mySplitStream = new SplitStreamImpl(node, "splitStreamTutorial");
        Id temp = new PastryIdFactory(node.getEnvironment()).buildId("my channel");
        this.myChannelId = new ChannelId(temp);
        this.endpoint.register();
    }

    /**
     * Attaches to the channel and subscribes this client to every stripe of it.
     */
    public void subscribe() {
        this.myChannel = this.mySplitStream.attachChannel(this.myChannelId);
        this.myStripes = this.myChannel.getStripes();
        for (int curStripe = 0; curStripe < this.myStripes.length; ++curStripe) {
            this.myStripes[curStripe].subscribe(this);
        }
    }

    /**
     * Schedules a recurring {@link PublishContent} message to this endpoint;
     * each delivery of that message triggers {@link #publish()}.
     */
    public void startPublishTask() {
        this.publishTask = this.endpoint.scheduleMessage(
                new PublishContent(), PUBLISH_PERIOD_MILLIS, PUBLISH_PERIOD_MILLIS);
    }

    /**
     * Handles messages delivered to the endpoint. Only the self-scheduled
     * {@link PublishContent} tick is acted upon; anything else is ignored.
     *
     * @param id      the id the message was delivered for (unused)
     * @param message the delivered message
     */
    public void deliver(Id id, Message message) {
        if (message instanceof PublishContent) {
            this.publish();
        }
    }

    /**
     * Publishes one payload on every stripe, then advances the sequence number
     * and cancels the periodic task once {@link #NUM_PUBLISHES} rounds are done.
     *
     * @throws IllegalStateException if {@link #subscribe()} has not been called
     */
    public void publish() {
        if (this.myStripes == null) {
            throw new IllegalStateException("subscribe() must be called before publish()");
        }
        for (int curStripe = 0; curStripe < this.myStripes.length; ++curStripe) {
            byte[] data = new byte[DATA_LENGTH];
            this.random.nextBytes(data);
            // Overwrite the first two random bytes with the header.
            data[0] = this.seqNum;
            // Explicit narrowing: the payload only has one byte for the stripe index.
            data[1] = (byte) curStripe;
            System.out.println("Node " + this.endpoint.getLocalNodeHandle() + " publishing " + this.seqNum + " " + this.printData(data));
            this.myStripes[curStripe].publish(data);
        }
        this.seqNum = (byte)(this.seqNum + 1);
        // publish() is public and may be driven without the timer, so the task can be null.
        if (this.seqNum >= NUM_PUBLISHES && this.publishTask != null) {
            this.publishTask.cancel();
        }
    }

    /**
     * Renders the bytes as comma-separated signed decimal values.
     *
     * @param data the bytes to render
     * @return the joined text, or an empty string for an empty array
     */
    private String printData(byte[] data) {
        if (data.length == 0) {
            return "";
        }
        // Local, single-threaded buffer: StringBuilder is sufficient.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < data.length - 1; ++i) {
            sb.append(data[i]);
            sb.append(',');
        }
        sb.append(data[data.length - 1]);
        return sb.toString();
    }

    /**
     * Called when data arrives on a subscribed stripe; prints the sequence
     * number (byte 0), stripe index (byte 1) and the full payload.
     *
     * @param s    the stripe the data arrived on
     * @param data the received payload
     */
    public void deliver(Stripe s, byte[] data) {
        if (data == null || data.length < 2) {
            // Too short to carry the two header bytes; report instead of throwing.
            System.out.println(this.endpoint.getId() + " deliver(" + s + "):payload too short: "
                    + (data == null ? "null" : this.printData(data)));
            return;
        }
        System.out.println(this.endpoint.getId() + " deliver(" + s + "):seq:" + data[0] + " stripe:" + data[1] + " " + this.printData(data) + ")");
    }

    /**
     * Called when subscribing to a stripe failed; only logs the event.
     *
     * @param s the stripe that could not be joined
     */
    public void joinFailed(Stripe s) {
        System.out.println("joinFailed(" + s + ")");
    }

    /**
     * Not expected to be invoked for this application; always throws.
     *
     * @param message the message being routed
     * @return never returns normally
     * @throws RuntimeException always
     */
    public boolean forward(RouteMessage message) {
        throw new RuntimeException("Cant happen.");
    }

    /**
     * Neighbour-set change notification; intentionally a no-op.
     *
     * @param handle the node that changed
     * @param joined whether the node joined or left
     */
    public void update(NodeHandle handle, boolean joined) {
    }

    /**
     * Marker message this client schedules to itself to trigger a publish round.
     */
    class PublishContent
    implements Message {
        PublishContent() {
        }

        /**
         * @return the constant priority 0
         */
        public int getPriority() {
            return 0;
        }
    }
}

