package rice.p2p.scribe;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Observable;
import java.util.Observer;
import java.util.Set;
import java.util.Vector;
import rice.Destructable;
import rice.environment.Environment;
import rice.environment.logging.Logger;
import rice.environment.params.Parameters;
import rice.p2p.commonapi.Application;
import rice.p2p.commonapi.CancellableTask;
import rice.p2p.commonapi.Endpoint;
import rice.p2p.commonapi.Id;
import rice.p2p.commonapi.Message;
import rice.p2p.commonapi.Node;
import rice.p2p.commonapi.NodeHandle;
import rice.p2p.commonapi.NodeHandleSet;
import rice.p2p.commonapi.RouteMessage;
import rice.p2p.commonapi.rawserialization.InputBuffer;
import rice.p2p.commonapi.rawserialization.MessageDeserializer;
import rice.p2p.commonapi.rawserialization.RawMessage;
import rice.p2p.scribe.ScribePolicy;
import rice.p2p.scribe.messaging.AnycastMessage;
import rice.p2p.scribe.messaging.DropMessage;
import rice.p2p.scribe.messaging.MaintenanceMessage;
import rice.p2p.scribe.messaging.PublishMessage;
import rice.p2p.scribe.messaging.PublishRequestMessage;
import rice.p2p.scribe.messaging.SubscribeAckMessage;
import rice.p2p.scribe.messaging.SubscribeFailedMessage;
import rice.p2p.scribe.messaging.SubscribeLostMessage;
import rice.p2p.scribe.messaging.SubscribeMessage;
import rice.p2p.scribe.messaging.UnsubscribeMessage;
import rice.p2p.scribe.rawserialization.JavaScribeContentDeserializer;
import rice.p2p.scribe.rawserialization.JavaSerializedScribeContent;
import rice.p2p.scribe.rawserialization.RawScribeContent;
import rice.p2p.scribe.rawserialization.ScribeContentDeserializer;

/* loaded from: input_file:rice/p2p/scribe/ScribeImpl.class */
public class ScribeImpl implements Application, Scribe {
    public final int MAINTENANCE_INTERVAL;
    public final int MESSAGE_TIMEOUT;
    public Hashtable topics;
    protected ScribePolicy policy;
    protected Endpoint endpoint;
    protected NodeHandle handle;
    private Hashtable outstanding;
    private Hashtable lost;
    private int id;
    Environment environment;
    Logger logger;
    private String instance;
    ScribeContentDeserializer contentDeserializer;

    /* loaded from: input_file:rice/p2p/scribe/ScribeImpl$TopicManager.class */
    public class TopicManager implements Destructable, Observer {
        protected Topic topic;
        protected Id[] pathToRoot;
        protected Vector clients;
        protected Vector children;
        protected NodeHandle parent;

        public TopicManager(ScribeImpl scribeImpl, Topic topic, ScribeClient scribeClient) {
            this(topic);
            addClient(scribeClient);
        }

        public TopicManager(ScribeImpl scribeImpl, Topic topic, NodeHandle nodeHandle) {
            this(topic);
            addChild(nodeHandle);
        }

        protected TopicManager(Topic topic) {
            this.topic = topic;
            this.clients = new Vector();
            this.children = new Vector();
            setPathToRoot(new Id[0]);
        }

        public Topic getTopic() {
            return this.topic;
        }

        public NodeHandle getParent() {
            return this.parent;
        }

        public ScribeClient[] getClients() {
            return (ScribeClient[]) this.clients.toArray(new ScribeClient[0]);
        }

        public NodeHandle[] getChildren() {
            return (NodeHandle[]) this.children.toArray(new NodeHandle[0]);
        }

        public Id[] getPathToRoot() {
            return this.pathToRoot;
        }

        public void setPathToRoot(Id[] idArr) {
            this.pathToRoot = new Id[idArr.length + 1];
            System.arraycopy(idArr, 0, this.pathToRoot, 0, idArr.length);
            this.pathToRoot[idArr.length] = ScribeImpl.this.endpoint.getId();
            NodeHandle[] children = getChildren();
            for (int i = 0; i < children.length; i++) {
                if (Arrays.asList(this.pathToRoot).contains(children[i].getId())) {
                    ScribeImpl.this.endpoint.route((Id) null, (RawMessage) new DropMessage(ScribeImpl.this.handle, this.topic), children[i]);
                    removeChild(children[i]);
                } else {
                    ScribeImpl.this.endpoint.route((Id) null, (RawMessage) new SubscribeAckMessage(ScribeImpl.this.handle, this.topic, getPathToRoot(), Logger.OFF), children[i]);
                }
            }
        }

        public void setParent(NodeHandle nodeHandle) {
            if (nodeHandle != null && this.parent != null && ScribeImpl.this.logger.level <= 900) {
                ScribeImpl.this.logger.log(ScribeImpl.this.endpoint.getId() + ": Unexpectedly changing parents for topic " + this.topic);
            }
            if (this.parent != null) {
                this.parent.deleteObserver(this);
            }
            this.parent = nodeHandle;
            setPathToRoot(new Id[0]);
            if (this.parent == null || !this.parent.isAlive()) {
                return;
            }
            this.parent.addObserver(this);
        }

        public boolean containsClient(ScribeClient scribeClient) {
            return this.clients.contains(scribeClient);
        }

        @Override // java.util.Observer
        public void update(Observable observable, Object obj) {
            if (obj.equals(NodeHandle.DECLARED_DEAD)) {
                if (this.children.contains(observable)) {
                    if (ScribeImpl.this.logger.level <= 500) {
                        ScribeImpl.this.logger.log(ScribeImpl.this.endpoint.getId() + ": Child " + observable + " for topic " + this.topic + " has died - removing.");
                    }
                    ScribeImpl.this.removeChild(this.topic, (NodeHandle) observable);
                } else {
                    if (!observable.equals(this.parent)) {
                        if (ScribeImpl.this.logger.level <= 900) {
                            ScribeImpl.this.logger.log(ScribeImpl.this.endpoint.getId() + ": Received unexpected update from " + observable);
                        }
                        observable.deleteObserver(this);
                        return;
                    }
                    if (ScribeImpl.this.logger.level <= 500) {
                        ScribeImpl.this.logger.log(ScribeImpl.this.endpoint.getId() + ": Parent " + this.parent + " for topic " + this.topic + " has died - resubscribing.");
                    }
                    setParent(null);
                    if (this.clients.size() > 0) {
                        ScribeImpl.this.sendSubscribe(this.topic, (ScribeClient) this.clients.elementAt(0), null, ((NodeHandle) observable).getId());
                    } else {
                        ScribeImpl.this.sendSubscribe(this.topic, null, null, ((NodeHandle) observable).getId());
                    }
                }
            }
        }

        public void addClient(ScribeClient scribeClient) {
            if (this.clients.contains(scribeClient)) {
                return;
            }
            this.clients.add(scribeClient);
        }

        public boolean removeClient(ScribeClient scribeClient) {
            this.clients.remove(scribeClient);
            boolean z = this.clients.size() == 0 && this.children.size() == 0;
            if (z && this.parent != null) {
                this.parent.deleteObserver(this);
            }
            return z;
        }

        public void addChild(NodeHandle nodeHandle) {
            if (this.children.contains(nodeHandle) || !nodeHandle.isAlive()) {
                return;
            }
            this.children.add(nodeHandle);
            nodeHandle.addObserver(this);
        }

        public boolean removeChild(NodeHandle nodeHandle) {
            this.children.remove(nodeHandle);
            nodeHandle.deleteObserver(this);
            boolean z = this.clients.size() == 0 && this.children.size() == 0;
            if (z && this.parent != null) {
                this.parent.deleteObserver(this);
            }
            return z;
        }

        @Override // rice.Destructable
        public void destroy() {
            if (ScribeImpl.this.logger.level <= 500) {
                ScribeImpl.this.logger.log("Destroying " + this);
            }
            if (this.parent != null) {
                if (ScribeImpl.this.logger.level <= 400) {
                    ScribeImpl.this.logger.log(this.parent + ".deleteObserver()p");
                }
                this.parent.deleteObserver(this);
            }
            Iterator it = this.children.iterator();
            while (it.hasNext()) {
                NodeHandle nodeHandle = (NodeHandle) it.next();
                if (ScribeImpl.this.logger.level <= 400) {
                    ScribeImpl.this.logger.log(nodeHandle + ".deleteObserver()c");
                }
                nodeHandle.deleteObserver(this);
            }
        }
    }

    public ScribeImpl(Node node, String str) {
        this(node, new ScribePolicy.DefaultScribePolicy(node.getEnvironment()), str);
    }

    public ScribeImpl(Node node, ScribePolicy scribePolicy, String str) {
        this.environment = node.getEnvironment();
        this.logger = this.environment.getLogManager().getLogger(ScribeImpl.class, str);
        Parameters parameters = this.environment.getParameters();
        this.MAINTENANCE_INTERVAL = parameters.getInt("p2p_scribe_maintenance_interval");
        this.MESSAGE_TIMEOUT = parameters.getInt("p2p_scribe_message_timeout");
        this.instance = str;
        this.endpoint = node.buildEndpoint(this, str);
        this.contentDeserializer = new JavaScribeContentDeserializer();
        this.endpoint.setDeserializer(new MessageDeserializer() { // from class: rice.p2p.scribe.ScribeImpl.1
            @Override // rice.p2p.commonapi.rawserialization.MessageDeserializer
            public Message deserialize(InputBuffer inputBuffer, short s, byte b, NodeHandle nodeHandle) throws IOException {
                try {
                    switch (s) {
                        case 1:
                            return AnycastMessage.build(inputBuffer, ScribeImpl.this.endpoint, ScribeImpl.this.contentDeserializer);
                        case 2:
                            return SubscribeMessage.buildSM(inputBuffer, ScribeImpl.this.endpoint, ScribeImpl.this.contentDeserializer);
                        case 3:
                            return SubscribeAckMessage.build(inputBuffer, ScribeImpl.this.endpoint);
                        case 4:
                            return SubscribeFailedMessage.build(inputBuffer, ScribeImpl.this.endpoint);
                        case 5:
                        case 7:
                        default:
                            throw new IllegalArgumentException("Unknown type:" + ((int) s));
                        case 6:
                            return DropMessage.build(inputBuffer, ScribeImpl.this.endpoint);
                        case 8:
                            return PublishMessage.build(inputBuffer, ScribeImpl.this.endpoint, ScribeImpl.this.contentDeserializer);
                        case 9:
                            return PublishRequestMessage.build(inputBuffer, ScribeImpl.this.endpoint, ScribeImpl.this.contentDeserializer);
                        case 10:
                            return UnsubscribeMessage.build(inputBuffer, ScribeImpl.this.endpoint);
                    }
                } catch (IOException e) {
                    if (ScribeImpl.this.logger.level <= 1000) {
                        ScribeImpl.this.logger.log("Exception in deserializer in " + ScribeImpl.this.endpoint.toString() + ":" + ScribeImpl.this.instance + " " + ScribeImpl.this.contentDeserializer + " " + e);
                    }
                    throw e;
                }
            }
        });
        this.topics = new Hashtable();
        this.outstanding = new Hashtable();
        this.lost = new Hashtable();
        this.policy = scribePolicy;
        this.handle = this.endpoint.getLocalNodeHandle();
        this.id = Logger.ALL;
        this.endpoint.register();
        this.endpoint.scheduleMessage(new MaintenanceMessage(), this.environment.getRandomSource().nextInt(this.MAINTENANCE_INTERVAL), this.MAINTENANCE_INTERVAL);
        if (this.logger.level <= 400) {
            this.logger.log(this.endpoint.getId() + ": Starting up Scribe");
        }
    }

    @Override // rice.p2p.scribe.Scribe
    public Environment getEnvironment() {
        return this.environment;
    }

    @Override // rice.p2p.scribe.Scribe
    public ScribePolicy getPolicy() {
        return this.policy;
    }

    public Id getId() {
        return this.endpoint.getId();
    }

    public ScribeClient[] getClients(Topic topic) {
        return this.topics.get(topic) != null ? ((TopicManager) this.topics.get(topic)).getClients() : new ScribeClient[0];
    }

    @Override // rice.p2p.scribe.Scribe
    public NodeHandle[] getChildren(Topic topic) {
        return this.topics.get(topic) != null ? ((TopicManager) this.topics.get(topic)).getChildren() : new NodeHandle[0];
    }

    @Override // rice.p2p.scribe.Scribe
    public NodeHandle getParent(Topic topic) {
        if (this.topics.get(topic) != null) {
            return ((TopicManager) this.topics.get(topic)).getParent();
        }
        return null;
    }

    @Override // rice.p2p.scribe.Scribe
    public boolean isRoot(Topic topic) {
        NodeHandleSet replicaSet = this.endpoint.replicaSet(topic.getId(), 1);
        if (replicaSet.size() == 0) {
            return false;
        }
        return replicaSet.getHandle(0).getId().equals(this.endpoint.getId());
    }

    @Override // rice.p2p.scribe.Scribe
    public Topic[] getTopics(ScribeClient scribeClient) {
        Vector vector = new Vector();
        Enumeration keys = this.topics.keys();
        while (keys.hasMoreElements()) {
            Topic topic = (Topic) keys.nextElement();
            if (((TopicManager) this.topics.get(topic)).containsClient(scribeClient)) {
                vector.add(topic);
            }
        }
        return (Topic[]) vector.toArray(new Topic[0]);
    }

    @Override // rice.p2p.scribe.Scribe
    public void setPolicy(ScribePolicy scribePolicy) {
        this.policy = scribePolicy;
    }

    @Override // rice.p2p.scribe.Scribe
    public void setContentDeserializer(ScribeContentDeserializer scribeContentDeserializer) {
        this.contentDeserializer = scribeContentDeserializer;
    }

    private void sendSubscribe(Topic topic, ScribeClient scribeClient, RawScribeContent rawScribeContent) {
        sendSubscribe(topic, scribeClient, rawScribeContent, null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendSubscribe(Topic topic, ScribeClient scribeClient, RawScribeContent rawScribeContent, Id id) {
        this.id++;
        if (this.logger.level <= 300) {
            this.logger.log(this.endpoint.getId() + ": Sending subscribe message for topic " + topic + " client:" + scribeClient);
        }
        if (scribeClient == null) {
            ScribeClient[] clients = getClients(topic);
            if (clients.length > 0) {
                scribeClient = clients[0];
            }
        }
        if (scribeClient != null) {
            this.outstanding.put(new Integer(this.id), scribeClient);
        }
        this.endpoint.route(topic.getId(), (RawMessage) new SubscribeMessage(this.handle, topic, id, this.id, rawScribeContent), (NodeHandle) null);
        this.lost.put(new Integer(this.id), this.endpoint.scheduleMessage(new SubscribeLostMessage(this.handle, topic, this.id), this.MESSAGE_TIMEOUT));
    }

    private void ackMessageReceived(SubscribeAckMessage subscribeAckMessage) {
        ScribeClient scribeClient = (ScribeClient) this.outstanding.remove(new Integer(subscribeAckMessage.getId()));
        if (this.logger.level <= 400) {
            this.logger.log(this.endpoint.getId() + ": Removing client " + scribeClient + " from list of outstanding for ack " + subscribeAckMessage.getId());
        }
        CancellableTask cancellableTask = (CancellableTask) this.lost.remove(new Integer(subscribeAckMessage.getId()));
        if (cancellableTask != null) {
            cancellableTask.cancel();
        }
    }

    private void failedMessageReceived(SubscribeFailedMessage subscribeFailedMessage) {
        ScribeClient scribeClient = (ScribeClient) this.outstanding.remove(new Integer(subscribeFailedMessage.getId()));
        this.lost.remove(new Integer(subscribeFailedMessage.getId()));
        if (this.logger.level <= 400) {
            this.logger.log(this.endpoint.getId() + ": Telling client " + scribeClient + " about FAILURE for outstanding ack " + subscribeFailedMessage.getId());
        }
        if (scribeClient != null) {
            scribeClient.subscribeFailed(subscribeFailedMessage.getTopic());
        }
    }

    private void lostMessageReceived(SubscribeLostMessage subscribeLostMessage) {
        ScribeClient scribeClient = (ScribeClient) this.outstanding.remove(new Integer(subscribeLostMessage.getId()));
        this.lost.remove(new Integer(subscribeLostMessage.getId()));
        if (this.logger.level <= 400) {
            this.logger.log(this.endpoint.getId() + ": Telling client " + scribeClient + " about LOSS for outstanding ack " + subscribeLostMessage.getId());
        }
        if (scribeClient != null) {
            scribeClient.subscribeFailed(subscribeLostMessage.getTopic());
        }
    }

    @Override // rice.p2p.scribe.Scribe
    public void subscribe(Topic topic, ScribeClient scribeClient) {
        subscribe(topic, scribeClient, (RawScribeContent) null);
    }

    @Override // rice.p2p.scribe.Scribe
    public void subscribe(Topic topic, ScribeClient scribeClient, ScribeContent scribeContent) {
        subscribe(topic, scribeClient, scribeContent instanceof RawScribeContent ? (RawScribeContent) scribeContent : new JavaSerializedScribeContent(scribeContent));
    }

    @Override // rice.p2p.scribe.Scribe
    public void subscribe(Topic topic, ScribeClient scribeClient, RawScribeContent rawScribeContent) {
        if (this.logger.level <= 400) {
            this.logger.log(this.endpoint.getId() + ": Subscribing client " + scribeClient + " to topic " + topic);
        }
        if (this.topics.get(topic) == null) {
            this.topics.put(topic, new TopicManager(this, topic, scribeClient));
            sendSubscribe(topic, scribeClient, rawScribeContent);
            return;
        }
        TopicManager topicManager = (TopicManager) this.topics.get(topic);
        topicManager.addClient(scribeClient);
        if (topicManager.getParent() != null || isRoot(topic)) {
            return;
        }
        sendSubscribe(topic, scribeClient, rawScribeContent);
    }

    @Override // rice.p2p.scribe.Scribe
    public void unsubscribe(Topic topic, ScribeClient scribeClient) {
        if (this.logger.level <= 400) {
            this.logger.log(this.endpoint.getId() + ": Unsubscribing client " + scribeClient + " from topic " + topic);
        }
        if (this.topics.get(topic) == null) {
            if (this.logger.level <= 900) {
                this.logger.log(this.endpoint.getId() + ": Attempt to unsubscribe client " + scribeClient + " from unknown topic " + topic);
                return;
            }
            return;
        }
        TopicManager topicManager = (TopicManager) this.topics.get(topic);
        if (topicManager.removeClient(scribeClient)) {
            this.topics.remove(topic);
            NodeHandle parent = topicManager.getParent();
            if (parent != null) {
                this.endpoint.route((Id) null, (RawMessage) new UnsubscribeMessage(this.handle, topic), parent);
            }
        }
    }

    @Override // rice.p2p.scribe.Scribe
    public void publish(Topic topic, ScribeContent scribeContent) {
        publish(topic, scribeContent instanceof RawScribeContent ? (RawScribeContent) scribeContent : new JavaSerializedScribeContent(scribeContent));
    }

    @Override // rice.p2p.scribe.Scribe
    public void publish(Topic topic, RawScribeContent rawScribeContent) {
        if (this.logger.level <= 400) {
            this.logger.log(this.endpoint.getId() + ": Publishing content " + rawScribeContent + " to topic " + topic);
        }
        this.endpoint.route(topic.getId(), (RawMessage) new PublishRequestMessage(this.handle, topic, rawScribeContent), (NodeHandle) null);
    }

    @Override // rice.p2p.scribe.Scribe
    public void anycast(Topic topic, ScribeContent scribeContent) {
        if (scribeContent instanceof RawScribeContent) {
            anycast(topic, (RawScribeContent) scribeContent);
        } else {
            anycast(topic, (RawScribeContent) new JavaSerializedScribeContent(scribeContent));
        }
    }

    @Override // rice.p2p.scribe.Scribe
    public void anycast(Topic topic, RawScribeContent rawScribeContent) {
        if (this.logger.level <= 400) {
            this.logger.log(this.endpoint.getId() + ": Anycasting content " + rawScribeContent + " to topic " + topic);
        }
        this.endpoint.route(topic.getId(), (RawMessage) new AnycastMessage(this.handle, topic, rawScribeContent), (NodeHandle) null);
    }

    @Override // rice.p2p.scribe.Scribe
    public void addChild(Topic topic, NodeHandle nodeHandle) {
        addChild(topic, nodeHandle, Logger.OFF);
    }

    protected void addChild(Topic topic, NodeHandle nodeHandle, int i) {
        if (this.logger.level <= 400) {
            this.logger.log(this.endpoint.getId() + ": Adding child " + nodeHandle + " to topic " + topic);
        }
        TopicManager topicManager = (TopicManager) this.topics.get(topic);
        if (topicManager == null) {
            topicManager = new TopicManager(this, topic, nodeHandle);
            this.topics.put(topic, topicManager);
            if (this.logger.level <= 400) {
                this.logger.log(this.endpoint.getId() + ": Implicitly subscribing to topic " + topic);
            }
            sendSubscribe(topic, null, null);
        } else {
            topicManager.addChild(nodeHandle);
        }
        this.endpoint.route((Id) null, (RawMessage) new SubscribeAckMessage(this.handle, topic, topicManager.getPathToRoot(), i), nodeHandle);
        this.policy.childAdded(topic, nodeHandle);
        for (ScribeClient scribeClient : topicManager.getClients()) {
            scribeClient.childAdded(topic, nodeHandle);
        }
    }

    @Override // rice.p2p.scribe.Scribe
    public void removeChild(Topic topic, NodeHandle nodeHandle) {
        removeChild(topic, nodeHandle, true);
    }

    protected void removeChild(Topic topic, NodeHandle nodeHandle, boolean z) {
        if (this.logger.level <= 500) {
            this.logger.log(this.endpoint.getId() + ": Removing child " + nodeHandle + " from topic " + topic);
        }
        if (this.topics.get(topic) == null) {
            if (this.logger.level <= 900) {
                this.logger.log(this.endpoint.getId() + ": Unexpected attempt to remove child " + nodeHandle + " from unknown topic " + topic);
                return;
            }
            return;
        }
        TopicManager topicManager = (TopicManager) this.topics.get(topic);
        if (topicManager.removeChild(nodeHandle)) {
            this.topics.remove(topic);
            NodeHandle parent = topicManager.getParent();
            if (this.logger.level <= 500) {
                this.logger.log(this.endpoint.getId() + ": We no longer need topic " + topic + " - unsubscribing from parent " + parent);
            }
            if (parent != null) {
                this.endpoint.route((Id) null, (RawMessage) new UnsubscribeMessage(this.handle, topic), parent);
            }
        }
        if (z && nodeHandle.isAlive()) {
            if (this.logger.level <= 500) {
                this.logger.log(this.endpoint.getId() + ": Informing child " + nodeHandle + " that he has been dropped from topic " + topic);
            }
            this.endpoint.route((Id) null, (RawMessage) new DropMessage(this.handle, topic), nodeHandle);
        }
        this.policy.childRemoved(topic, nodeHandle);
        for (ScribeClient scribeClient : topicManager.getClients()) {
            scribeClient.childRemoved(topic, nodeHandle);
        }
    }

    @Override // rice.p2p.commonapi.Application
    public boolean forward(RouteMessage routeMessage) {
        NodeHandle nodeHandle;
        try {
            Message message = routeMessage.getMessage(this.endpoint.getDeserializer());
            if (this.logger.level <= 300) {
                this.logger.log(this.endpoint.getId() + ": Forward called with " + message);
            }
            if (!(message instanceof AnycastMessage)) {
                return true;
            }
            AnycastMessage anycastMessage = (AnycastMessage) message;
            TopicManager topicManager = (TopicManager) this.topics.get(anycastMessage.getTopic());
            if (message instanceof SubscribeMessage) {
                SubscribeMessage subscribeMessage = (SubscribeMessage) message;
                if (subscribeMessage.getSource().getId().equals(this.endpoint.getId())) {
                    return true;
                }
                if (topicManager != null) {
                    if (Arrays.asList(topicManager.getPathToRoot()).contains(subscribeMessage.getPreviousParent())) {
                        if (this.logger.level > 800) {
                            return true;
                        }
                        this.logger.log(this.endpoint.getId() + ": Rejecting subscribe message from " + subscribeMessage.getSubscriber() + " for topic " + subscribeMessage.getTopic() + " because we are on the subscriber's path to the root.");
                        return true;
                    }
                }
                ScribeClient[] scribeClientArr = new ScribeClient[0];
                NodeHandle[] nodeHandleArr = new NodeHandle[0];
                if (topicManager != null) {
                    scribeClientArr = topicManager.getClients();
                    nodeHandleArr = topicManager.getChildren();
                }
                if (Arrays.asList(nodeHandleArr).contains(subscribeMessage.getSubscriber())) {
                    return false;
                }
                if (this.policy.allowSubscribe(subscribeMessage, scribeClientArr, nodeHandleArr)) {
                    if (this.logger.level <= 400) {
                        this.logger.log(this.endpoint.getId() + ": Hijacking subscribe message from " + subscribeMessage.getSubscriber() + " for topic " + subscribeMessage.getTopic());
                    }
                    addChild(subscribeMessage.getTopic(), subscribeMessage.getSubscriber(), subscribeMessage.getId());
                    return false;
                }
                if (this.logger.level <= 400) {
                    this.logger.log(this.endpoint.getId() + ": Rejecting subscribe message from " + subscribeMessage.getSubscriber() + " for topic " + subscribeMessage.getTopic());
                }
                if (topicManager == null) {
                    return true;
                }
            } else {
                if (topicManager == null) {
                    return true;
                }
                for (ScribeClient scribeClient : topicManager.getClients()) {
                    if (scribeClient.anycast(anycastMessage.getTopic(), anycastMessage.getContent())) {
                        if (this.logger.level > 400) {
                            return false;
                        }
                        this.logger.log(this.endpoint.getId() + ": Accepting anycast message from " + anycastMessage.getSource() + " for topic " + anycastMessage.getTopic());
                        return false;
                    }
                }
                if (anycastMessage.getSource().getId().equals(this.endpoint.getId()) && routeMessage.getNextHopHandle() != null && !this.handle.equals(routeMessage.getNextHopHandle())) {
                    return true;
                }
                if (this.logger.level <= 400) {
                    this.logger.log(this.endpoint.getId() + ": Rejecting anycast message from " + anycastMessage.getSource() + " for topic " + anycastMessage.getTopic());
                }
            }
            anycastMessage.addVisited(this.endpoint.getLocalNodeHandle());
            this.policy.directAnycast(anycastMessage, topicManager.getParent(), topicManager.getChildren());
            anycastMessage.setSource(this.endpoint.getLocalNodeHandle());
            NodeHandle next = anycastMessage.getNext();
            while (true) {
                nodeHandle = next;
                if (nodeHandle == null || nodeHandle.isAlive()) {
                    break;
                }
                next = anycastMessage.getNext();
            }
            if (this.logger.level <= 400) {
                this.logger.log(this.endpoint.getId() + ": Forwarding anycast message for topic " + anycastMessage.getTopic() + "on to " + nodeHandle);
            }
            if (nodeHandle != null) {
                this.endpoint.route((Id) null, (RawMessage) anycastMessage, nodeHandle);
                return false;
            }
            if (this.logger.level <= 500) {
                this.logger.log(this.endpoint.getId() + ": Anycast " + anycastMessage + " failed.");
            }
            if (!(anycastMessage instanceof SubscribeMessage)) {
                return false;
            }
            SubscribeMessage subscribeMessage2 = (SubscribeMessage) anycastMessage;
            if (this.logger.level <= 400) {
                this.logger.log(this.endpoint.getId() + ": Sending SubscribeFailedMessage to " + subscribeMessage2.getSubscriber());
            }
            this.endpoint.route((Id) null, (RawMessage) new SubscribeFailedMessage(nodeHandle, subscribeMessage2.getTopic(), subscribeMessage2.getId()), subscribeMessage2.getSubscriber());
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // rice.p2p.commonapi.Application
    public void deliver(Id id, Message message) {
        if (this.logger.level <= 300) {
            this.logger.log(this.endpoint.getId() + ": Deliver called with " + id + " " + message);
        }
        if (message instanceof AnycastMessage) {
            AnycastMessage anycastMessage = (AnycastMessage) message;
            if (anycastMessage.getSource().getId().equals(this.endpoint.getId())) {
                if (!(anycastMessage instanceof SubscribeMessage)) {
                    if (this.logger.level <= 900) {
                        this.logger.log(this.endpoint.getId() + ": Received unexpected delivered anycast message " + anycastMessage + " for topic " + anycastMessage.getTopic() + " - was generated by us.");
                        return;
                    }
                    return;
                } else {
                    this.outstanding.remove(new Integer(((SubscribeMessage) message).getId()));
                    if (this.logger.level <= 500) {
                        this.logger.log(this.endpoint.getId() + ": Received our own subscribe message " + anycastMessage + " for topic " + anycastMessage.getTopic() + " - we are the root.");
                        return;
                    }
                    return;
                }
            }
            if (!(anycastMessage instanceof SubscribeMessage)) {
                if (this.logger.level <= 900) {
                    this.logger.log(this.endpoint.getId() + ": Received unexpected delivered anycast message " + anycastMessage + " for topic " + anycastMessage.getTopic() + " - not generated by us, but was expected to be.");
                    return;
                }
                return;
            } else {
                SubscribeMessage subscribeMessage = (SubscribeMessage) anycastMessage;
                if (this.logger.level <= 500) {
                    this.logger.log(this.endpoint.getId() + ": Sending SubscribeFailedMessage (at root) to " + subscribeMessage.getSubscriber());
                }
                this.endpoint.route((Id) null, (RawMessage) new SubscribeFailedMessage(this.handle, subscribeMessage.getTopic(), subscribeMessage.getId()), subscribeMessage.getSubscriber());
                return;
            }
        }
        if (message instanceof SubscribeAckMessage) {
            SubscribeAckMessage subscribeAckMessage = (SubscribeAckMessage) message;
            TopicManager topicManager = (TopicManager) this.topics.get(subscribeAckMessage.getTopic());
            ackMessageReceived(subscribeAckMessage);
            if (this.logger.level <= 500) {
                this.logger.log(this.endpoint.getId() + ": Received subscribe ack message from " + subscribeAckMessage.getSource() + " for topic " + subscribeAckMessage.getTopic());
            }
            if (!subscribeAckMessage.getSource().isAlive() && this.logger.level <= 900) {
                this.logger.log(this.endpoint.getId() + ": Received subscribe ack message from " + subscribeAckMessage.getSource() + " for topic " + subscribeAckMessage.getTopic());
            }
            if (isRoot(subscribeAckMessage.getTopic())) {
                if (this.logger.level <= 500) {
                    this.logger.log(this.endpoint.getId() + ": Received unexpected subscribe ack message (we are the root) from " + subscribeAckMessage.getSource() + " for topic " + subscribeAckMessage.getTopic());
                }
                this.endpoint.route((Id) null, (RawMessage) new UnsubscribeMessage(this.handle, subscribeAckMessage.getTopic()), subscribeAckMessage.getSource());
                return;
            }
            if (topicManager == null) {
                if (this.logger.level <= 900) {
                    this.logger.log(this.endpoint.getId() + ": Received unexpected subscribe ack message from " + subscribeAckMessage.getSource() + " for unknown topic " + subscribeAckMessage.getTopic());
                }
                this.endpoint.route((Id) null, (RawMessage) new UnsubscribeMessage(this.handle, subscribeAckMessage.getTopic()), subscribeAckMessage.getSource());
                return;
            }
            if (topicManager.getParent() == null) {
                topicManager.setParent(subscribeAckMessage.getSource());
            }
            if (topicManager.getParent().equals(subscribeAckMessage.getSource())) {
                topicManager.setPathToRoot(subscribeAckMessage.getPathToRoot());
                return;
            }
            if (this.logger.level <= 900) {
                this.logger.log(this.endpoint.getId() + ": Received somewhat unexpected subscribe ack message (already have parent " + topicManager.getParent() + ") from " + subscribeAckMessage.getSource() + " for topic " + subscribeAckMessage.getTopic() + " - the new policy is now to accept the message");
            }
            NodeHandle parent = topicManager.getParent();
            topicManager.setParent(subscribeAckMessage.getSource());
            topicManager.setPathToRoot(subscribeAckMessage.getPathToRoot());
            this.endpoint.route((Id) null, (RawMessage) new UnsubscribeMessage(this.handle, subscribeAckMessage.getTopic()), parent);
            return;
        }
        if (message instanceof SubscribeLostMessage) {
            lostMessageReceived((SubscribeLostMessage) message);
            return;
        }
        if (message instanceof SubscribeFailedMessage) {
            failedMessageReceived((SubscribeFailedMessage) message);
            return;
        }
        if (message instanceof PublishRequestMessage) {
            PublishRequestMessage publishRequestMessage = (PublishRequestMessage) message;
            TopicManager topicManager2 = (TopicManager) this.topics.get(publishRequestMessage.getTopic());
            if (this.logger.level <= 400) {
                this.logger.log(this.endpoint.getId() + ": Received publish request message with data " + publishRequestMessage.getContent() + " for topic " + publishRequestMessage.getTopic());
            }
            if (topicManager2 != null) {
                deliver(publishRequestMessage.getTopic().getId(), new PublishMessage(publishRequestMessage.getSource(), publishRequestMessage.getTopic(), publishRequestMessage.getContent()));
                return;
            } else {
                if (this.logger.level <= 500) {
                    this.logger.log(this.endpoint.getId() + ": Received publish request message for non-existent topic " + publishRequestMessage.getTopic() + " - dropping on floor.");
                    return;
                }
                return;
            }
        }
        if (message instanceof PublishMessage) {
            PublishMessage publishMessage = (PublishMessage) message;
            TopicManager topicManager3 = (TopicManager) this.topics.get(publishMessage.getTopic());
            if (this.logger.level <= 400) {
                this.logger.log(this.endpoint.getId() + ": Received publish message with data " + publishMessage.getContent() + " for topic " + publishMessage.getTopic());
            }
            if (topicManager3 == null || !(topicManager3.getParent() == null || topicManager3.getParent().equals(publishMessage.getSource()))) {
                if (this.logger.level <= 900) {
                    this.logger.log(this.endpoint.getId() + ": Received unexpected publish message from " + publishMessage.getSource() + " for unknown topic " + publishMessage.getTopic());
                }
                this.endpoint.route((Id) null, (RawMessage) new UnsubscribeMessage(this.handle, publishMessage.getTopic()), publishMessage.getSource());
                return;
            }
            publishMessage.setSource(this.handle);
            ScribeClient[] clients = topicManager3.getClients();
            for (int i = 0; i < clients.length; i++) {
                if (this.logger.level <= 400) {
                    this.logger.log(this.endpoint.getId() + ": Delivering publish message with data " + publishMessage.getContent() + " for topic " + publishMessage.getTopic() + " to client " + clients[i]);
                }
                clients[i].deliver(publishMessage.getTopic(), publishMessage.getContent());
            }
            NodeHandle[] children = topicManager3.getChildren();
            for (int i2 = 0; i2 < children.length; i2++) {
                if (this.logger.level <= 400) {
                    this.logger.log(this.endpoint.getId() + ": Forwarding publish message with data " + publishMessage.getContent() + " for topic " + publishMessage.getTopic() + " to child " + children[i2]);
                }
                this.endpoint.route((Id) null, (RawMessage) new PublishMessage(this.endpoint.getLocalNodeHandle(), publishMessage.getTopic(), publishMessage.getContent()), children[i2]);
            }
            return;
        }
        if (message instanceof UnsubscribeMessage) {
            UnsubscribeMessage unsubscribeMessage = (UnsubscribeMessage) message;
            if (this.logger.level <= 500) {
                this.logger.log(this.endpoint.getId() + ": Received unsubscribe message from " + unsubscribeMessage.getSource() + " for topic " + unsubscribeMessage.getTopic());
            }
            removeChild(unsubscribeMessage.getTopic(), unsubscribeMessage.getSource(), false);
            return;
        }
        if (!(message instanceof DropMessage)) {
            if (!(message instanceof MaintenanceMessage)) {
                if (this.logger.level <= 900) {
                    this.logger.log(this.endpoint.getId() + ": Received unknown message " + message + " - dropping on floor.");
                    return;
                }
                return;
            }
            if (this.logger.level <= 500) {
                this.logger.log(this.endpoint.getId() + ": Received maintenance message");
            }
            for (TopicManager topicManager4 : this.topics.values()) {
                NodeHandle parent2 = topicManager4.getParent();
                if (parent2 != null) {
                    this.endpoint.route(topicManager4.getTopic().getId(), (RawMessage) new SubscribeMessage(this.handle, topicManager4.getTopic(), this.handle.getId(), -1, null), parent2);
                    parent2.checkLiveness();
                }
            }
            return;
        }
        DropMessage dropMessage = (DropMessage) message;
        if (this.logger.level <= 500) {
            this.logger.log(this.endpoint.getId() + ": Received drop message from " + dropMessage.getSource() + " for topic " + dropMessage.getTopic());
        }
        TopicManager topicManager5 = (TopicManager) this.topics.get(dropMessage.getTopic());
        if (topicManager5 == null) {
            if (this.logger.level <= 900) {
                this.logger.log(this.endpoint.getId() + ": Received unexpected drop message from " + dropMessage.getSource() + " for unknown topic " + dropMessage.getTopic() + " - ignoring");
            }
        } else if (topicManager5.getParent() == null || !topicManager5.getParent().equals(dropMessage.getSource())) {
            if (this.logger.level <= 900) {
                this.logger.log(this.endpoint.getId() + ": Received unexpected drop message from non-parent " + dropMessage.getSource() + " for topic " + dropMessage.getTopic() + " - ignoring");
            }
        } else {
            topicManager5.setParent(null);
            ScribeClient[] clients2 = topicManager5.getClients();
            if (clients2.length > 0) {
                sendSubscribe(dropMessage.getTopic(), clients2[0], null);
            } else {
                sendSubscribe(dropMessage.getTopic(), null, null);
            }
        }
    }

    @Override // rice.p2p.commonapi.Application
    public void update(NodeHandle nodeHandle, boolean z) {
        Iterator it;
        Set keySet = this.topics.keySet();
        synchronized (keySet) {
            it = new ArrayList(keySet).iterator();
        }
        while (it.hasNext()) {
            Topic topic = (Topic) it.next();
            TopicManager topicManager = (TopicManager) this.topics.get(topic);
            if (z) {
                if (topicManager.getParent() == null) {
                    sendSubscribe(topic, null, null);
                }
            } else if (isRoot(topic) && topicManager.getParent() != null) {
                this.endpoint.route((Id) null, (RawMessage) new UnsubscribeMessage(nodeHandle, topic), topicManager.getParent());
                topicManager.setParent(null);
            }
        }
    }

    @Override // rice.p2p.scribe.Scribe, rice.Destructable
    public void destroy() {
        if (this.logger.level <= 800) {
            this.logger.log("Destroying " + this);
        }
        Iterator it = this.topics.values().iterator();
        while (it.hasNext()) {
            ((TopicManager) it.next()).destroy();
        }
    }
}
