package rice.p2p.replication;

import java.io.IOException;
import java.util.Iterator;
import rice.Continuation;
import rice.Destructable;
import rice.Executable;
import rice.environment.Environment;
import rice.environment.logging.Logger;
import rice.environment.params.Parameters;
import rice.p2p.commonapi.Application;
import rice.p2p.commonapi.Endpoint;
import rice.p2p.commonapi.Id;
import rice.p2p.commonapi.IdFactory;
import rice.p2p.commonapi.IdRange;
import rice.p2p.commonapi.IdSet;
import rice.p2p.commonapi.Message;
import rice.p2p.commonapi.Node;
import rice.p2p.commonapi.NodeHandle;
import rice.p2p.commonapi.NodeHandleSet;
import rice.p2p.commonapi.RangeCannotBeDeterminedException;
import rice.p2p.commonapi.RouteMessage;
import rice.p2p.commonapi.rawserialization.InputBuffer;
import rice.p2p.commonapi.rawserialization.MessageDeserializer;
import rice.p2p.commonapi.rawserialization.RawMessage;
import rice.p2p.replication.ReplicationPolicy;
import rice.p2p.replication.messaging.ReminderMessage;
import rice.p2p.replication.messaging.RequestMessage;
import rice.p2p.replication.messaging.ResponseMessage;
import rice.p2p.util.IdBloomFilter;

/* loaded from: input_file:rice/p2p/replication/ReplicationImpl.class */
public class ReplicationImpl implements Replication, Application, Destructable {
    /**
     * Value of the {@code p2p_replication_maintenance_interval} parameter. The constructor uses it
     * both as the upper bound of the random initial delay and as the period of the recurring
     * ReminderMessage (time unit is whatever Endpoint.scheduleMessage expects - TODO confirm).
     */
    public final int MAINTENANCE_INTERVAL;
    /**
     * Value of the {@code p2p_replication_max_keys_in_message} parameter; passed as the limit
     * argument of the bloom filter check when answering a RequestMessage.
     */
    public final int MAX_KEYS_IN_MESSAGE;
    /** Endpoint built for this application in the constructor; used for routing, scheduling and processing. */
    protected Endpoint endpoint;
    /** The local node handle, obtained from the endpoint. */
    protected NodeHandle handle;
    /** Factory used to build IdSets. */
    protected IdFactory factory;
    /** The client whose keys are scanned, whose range is updated, and which is asked to fetch missing keys. */
    protected ReplicationClient client;
    /** Policy used to compute which of the offered ids should be fetched; never null after construction. */
    protected ReplicationPolicy policy;
    /** Replication factor handed to Endpoint.range when computing the total range. */
    protected int replicationFactor;
    /** Instance name; used for the endpoint, the logger and in log messages. */
    protected String instance;
    /** Environment of the node this instance was created for. */
    Environment environment;
    /** Logger obtained from the environment's log manager for this class and instance name. */
    Logger logger;
    /** Set to true by destroy(); checked before replication requests are sent out. */
    protected boolean destroyed;

    /**
     * Executable that scans the client for the ids in a given range and wraps the result in an
     * {@link IdBloomFilter}.
     *
     * <p>Decompiler note: the access modifier of this class was changed from {@code protected};
     * it was loaded from {@code rice/p2p/replication/ReplicationImpl$BloomFilterExecutable.class}.
     */
    public class BloomFilterExecutable implements Executable {
        /** The range whose ids are placed into the filter. */
        protected IdRange range;

        /**
         * Creates an executable for the given range.
         *
         * @param idRange the range to scan when {@link #execute()} runs
         */
        public BloomFilterExecutable(IdRange idRange) {
            this.range = idRange;
        }

        /**
         * Describes this task by its range and the enclosing instance name.
         *
         * @return a description of the form {@code bloomfilter range <range> namespace <instance>}
         */
        public String toString() {
            StringBuilder description = new StringBuilder("bloomfilter range ");
            description.append(this.range);
            description.append(" namespace ");
            description.append(ReplicationImpl.this.instance);
            return description.toString();
        }

        /**
         * Scans the enclosing instance's client over {@link #range} and builds the filter.
         *
         * @return a new {@link IdBloomFilter} built from the scan result
         */
        @Override // rice.Executable
        public Object execute() {
            IdSet scanned = ReplicationImpl.this.client.scan(this.range);
            return new IdBloomFilter(scanned);
        }
    }

    /**
     * Creates a replication manager that uses a new
     * {@link ReplicationPolicy.DefaultReplicationPolicy}; delegates to the five-argument
     * constructor.
     *
     * @param node the node to build the endpoint on
     * @param replicationClient the client to replicate for
     * @param i the replication factor
     * @param str the instance name
     */
    public ReplicationImpl(Node node, ReplicationClient replicationClient, int i, String str) {
        this(node, replicationClient, i, str, new ReplicationPolicy.DefaultReplicationPolicy());
    }

    /**
     * Creates a replication manager: reads its configuration from the node's environment, builds
     * and registers an endpoint under the instance name, installs the message deserializer and
     * schedules the recurring {@link ReminderMessage}.
     *
     * @param node the node to build the endpoint on
     * @param replicationClient the client to replicate for
     * @param i the replication factor
     * @param str the instance name (also used for the logger and the endpoint)
     * @param replicationPolicy the policy to use; when null a
     *     {@link ReplicationPolicy.DefaultReplicationPolicy} is substituted
     */
    public ReplicationImpl(Node node, ReplicationClient replicationClient, int i, String str, ReplicationPolicy replicationPolicy) {
        this.destroyed = false;

        // Environment-derived state: destructable registration, logger and tunables.
        this.environment = node.getEnvironment();
        this.environment.addDestructable(this);
        this.logger = this.environment.getLogManager().getLogger(ReplicationImpl.class, str);
        Parameters config = this.environment.getParameters();
        this.MAINTENANCE_INTERVAL = config.getInt("p2p_replication_maintenance_interval");
        this.MAX_KEYS_IN_MESSAGE = config.getInt("p2p_replication_max_keys_in_message");

        this.client = replicationClient;
        this.replicationFactor = i;
        this.factory = node.getIdFactory();
        this.policy = replicationPolicy;
        this.instance = str;

        this.endpoint = node.buildEndpoint(this, str);
        this.endpoint.setDeserializer(new MessageDeserializer() { // from class: rice.p2p.replication.ReplicationImpl.1
            /**
             * Rebuilds a message from its wire form; type 2 is built as a RequestMessage and
             * type 3 as a ResponseMessage, anything else is rejected.
             */
            @Override // rice.p2p.commonapi.rawserialization.MessageDeserializer
            public Message deserialize(InputBuffer inputBuffer, short s, int i2, NodeHandle nodeHandle) throws IOException {
                if (s == 2) {
                    return RequestMessage.build(inputBuffer, ReplicationImpl.this.endpoint);
                }
                if (s == 3) {
                    return ResponseMessage.build(inputBuffer, ReplicationImpl.this.endpoint);
                }
                throw new IllegalArgumentException("Unknown type:" + s);
            }
        });

        // Fall back to the default policy when the caller supplied none.
        if (this.policy == null) {
            this.policy = new ReplicationPolicy.DefaultReplicationPolicy();
        }
        this.handle = this.endpoint.getLocalNodeHandle();

        if (this.logger.level <= 400) {
            this.logger.log("Starting up ReplicationImpl with client " + replicationClient + " and factor " + i);
        }

        this.endpoint.register();

        // First reminder fires after a random delay below the interval, then once per interval.
        ReminderMessage reminder = new ReminderMessage(this.handle);
        int initialDelay = this.environment.getRandomSource().nextInt(this.MAINTENANCE_INTERVAL);
        this.endpoint.scheduleMessage(reminder, initialDelay, this.MAINTENANCE_INTERVAL);
    }

    /**
     * Builds a new set containing every id of both given sets; neither input is modified by this
     * method.
     *
     * @param idFactory factory used to build the result set
     * @param idSet the first set, copied first
     * @param idSet2 the second set, copied second
     * @return a newly built set holding the ids of both inputs
     */
    public static IdSet merge(IdFactory idFactory, IdSet idSet, IdSet idSet2) {
        IdSet merged = idFactory.buildIdSet();
        IdSet[] sources = {idSet, idSet2};
        for (IdSet source : sources) {
            for (Iterator ids = source.getIterator(); ids.hasNext(); ) {
                merged.addId((Id) ids.next());
            }
        }
        return merged;
    }

    /**
     * Asks the endpoint for the range of the local handle at the configured replication factor.
     *
     * @return the range, or null when the endpoint throws
     *     {@link RangeCannotBeDeterminedException} (the failure is logged when the logger level
     *     is at most 900)
     */
    protected IdRange getTotalRange() {
        IdRange totalRange = null;
        try {
            totalRange = this.endpoint.range(this.handle, this.replicationFactor, this.handle.getId(), true);
        } catch (RangeCannotBeDeterminedException e) {
            if (this.logger.level <= 900) {
                this.logger.log("ReplicationImpl.getTotalRange():" + e + " returning null.");
            }
        }
        return totalRange;
    }

    /**
     * Pushes the current total range to the client, if it can be determined.
     *
     * <p>The range is computed exactly once so that the value that is logged, null-checked and
     * handed to the client is the same object, and so that a failing range lookup logs its
     * warning only once per update.
     */
    private void updateClient() {
        IdRange totalRange = getTotalRange();
        if (this.logger.level <= 500) {
            this.logger.log("Updating client with range " + totalRange);
        }
        if (totalRange != null) {
            this.client.setRange(totalRange);
        }
    }

    /**
     * Starts one round of replication maintenance.
     *
     * <p>A bloom filter over the local node's own range (replication factor 0) is built through
     * the endpoint's processing facility. Once it is available, each neighbor's own range is
     * intersected with the local total range; for every non-empty intersection a second filter
     * is built and a {@link RequestMessage} carrying both ranges and both filters is routed to
     * that neighbor. Nothing is sent once this instance has been destroyed, or when the total
     * range cannot be determined.
     */
    @Override // rice.p2p.replication.Replication
    public void replicate() {
        final NodeHandleSet neighborSet = this.endpoint.neighborSet(Integer.MAX_VALUE);
        final IdRange range = this.endpoint.range(this.handle, 0, this.handle.getId());
        this.endpoint.process(new BloomFilterExecutable(range), new Continuation.ListenerContinuation("Creation of our bloom filter", this.environment) { // from class: rice.p2p.replication.ReplicationImpl.2
            /** Number of per-neighbor filter computations (and thus requests) started in this round. */
            int total = 0;

            @Override // rice.Continuation.ListenerContinuation, rice.Continuation
            public void receiveResult(Object obj) {
                if (ReplicationImpl.this.destroyed) {
                    return;
                }
                final IdBloomFilter idBloomFilter = (IdBloomFilter) obj;
                // Computed once per round: getTotalRange() returns null when the range cannot be
                // determined, and a null must not be handed to intersectRange().
                final IdRange totalRange = ReplicationImpl.this.getTotalRange();
                if (totalRange != null) {
                    for (int i = 0; i < neighborSet.size(); i++) {
                        final NodeHandle handle = neighborSet.getHandle(i);
                        try {
                            final IdRange intersectRange = ReplicationImpl.this.endpoint.range(handle, 0, handle.getId()).intersectRange(totalRange);
                            if (intersectRange != null && !intersectRange.intersectRange(totalRange).isEmpty()) {
                                ReplicationImpl.this.endpoint.process(new BloomFilterExecutable(intersectRange), new Continuation.StandardContinuation(this) { // from class: rice.p2p.replication.ReplicationImpl.2.1
                                    @Override // rice.Continuation
                                    public void receiveResult(Object obj2) {
                                        IdBloomFilter idBloomFilter2 = (IdBloomFilter) obj2;
                                        if (ReplicationImpl.this.logger.level <= 500) {
                                            ReplicationImpl.this.logger.log("COUNT: Sending request to " + handle + " for range " + intersectRange + ", " + range + " in instance " + ReplicationImpl.this.instance);
                                        }
                                        ReplicationImpl.this.endpoint.route((Id) null, (RawMessage) new RequestMessage(ReplicationImpl.this.handle, new IdRange[]{intersectRange, range}, new IdBloomFilter[]{idBloomFilter2, idBloomFilter}), handle);
                                    }
                                });
                                // Count the request so the summary log below reports a real number.
                                this.total++;
                            }
                        } catch (RangeCannotBeDeterminedException ignored) {
                            // Deliberately skipped: this neighbor's range is not known here, so
                            // there is nothing to request from it in this round.
                            // NOTE(review): presumably expected for distant neighbors - confirm.
                        }
                    }
                }
                // this.logger is the continuation's own logger, not the enclosing instance's.
                if (this.logger.level <= 500) {
                    this.logger.log("COUNT: Done sending replications requests with " + this.total + " in instance " + ReplicationImpl.this.instance);
                }
            }
        });
    }

    /**
     * Application callback for a message being routed through this node; this implementation
     * never inspects the message.
     *
     * @param routeMessage the message being routed (unused)
     * @return always true
     */
    @Override // rice.p2p.commonapi.Application
    public boolean forward(RouteMessage routeMessage) {
        return true;
    }

    /**
     * Handles a message delivered to this application.
     *
     * <ul>
     *   <li>{@link RequestMessage}: the local keys are checked against the sender's filters and
     *       the result is returned in a {@link ResponseMessage}.</li>
     *   <li>{@link ResponseMessage}: the offered ids are compared with the local keys and the
     *       missing ones are fetched through the client.</li>
     *   <li>{@link ReminderMessage}: a maintenance round is started and the client's range is
     *       refreshed.</li>
     *   <li>Anything else is logged (when the logger level is at most 900) and dropped.</li>
     * </ul>
     *
     * @param id the id the message was delivered for (unused)
     * @param message the delivered message
     */
    @Override // rice.p2p.commonapi.Application
    public void deliver(Id id, Message message) {
        if (this.logger.level <= 500) {
            this.logger.log("COUNT: Replication " + this.instance + " received message " + message);
        }
        if (message instanceof RequestMessage) {
            deliverRequest((RequestMessage) message);
        } else if (message instanceof ResponseMessage) {
            deliverResponse((ResponseMessage) message);
        } else if (message instanceof ReminderMessage) {
            replicate();
            updateClient();
        } else if (this.logger.level <= 900) {
            this.logger.log("Received unknown message " + message + " - dropping on floor.");
        }
    }

    /**
     * Answers a request: for every (range, filter) pair in the request, checks the local scan of
     * the range against the filter, then routes all resulting id sets back to the requester.
     */
    private void deliverRequest(final RequestMessage requestMessage) {
        Continuation.MultiContinuation multiContinuation = new Continuation.MultiContinuation(new Continuation.ListenerContinuation("Processing of RequestMessage", this.environment) { // from class: rice.p2p.replication.ReplicationImpl.3
            @Override // rice.Continuation.ListenerContinuation, rice.Continuation
            public void receiveResult(Object obj) {
                Object[] objArr = (Object[]) obj;
                IdSet[] idSetArr = new IdSet[objArr.length];
                boolean firstIsError = objArr.length > 0 && objArr[0] instanceof Throwable;
                // this.logger below is the continuation's own logger, not the enclosing instance's.
                if (firstIsError) {
                    if (this.logger.level <= 1000) {
                        this.logger.logException("Errors in Multicontinuation:", (Throwable) objArr[0]);
                    }
                    // NOTE(review): idSetArr is left holding only nulls here and is still sent
                    // below; confirm that ResponseMessage tolerates null id sets.
                } else {
                    try {
                        System.arraycopy(objArr, 0, idSetArr, 0, objArr.length);
                    } catch (ArrayStoreException e) {
                        // A result that is not an IdSet: log what was found and propagate.
                        if (objArr.length > 0) {
                            if (this.logger.level <= 1000) {
                                this.logger.logException("Error copying " + objArr[0].getClass().getName() + ":" + objArr.length, e);
                            }
                            throw e;
                        }
                    }
                }
                if (this.logger.level <= 500) {
                    this.logger.log("COUNT: Telling node " + requestMessage.getSource() + " to fetch");
                }
                ReplicationImpl.this.endpoint.route((Id) null, (RawMessage) new ResponseMessage(ReplicationImpl.this.handle, requestMessage.getRanges(), idSetArr), requestMessage.getSource());
            }
        }, requestMessage.getRanges().length);
        for (int i = 0; i < requestMessage.getRanges().length; i++) {
            final int i2 = i;
            this.endpoint.process(new Executable() { // from class: rice.p2p.replication.ReplicationImpl.4
                public String toString() {
                    return "process " + i2 + " of " + requestMessage.getRanges().length + " namespace " + ReplicationImpl.this.instance;
                }

                @Override // rice.Executable
                public Object execute() {
                    IdSet buildIdSet = ReplicationImpl.this.factory.buildIdSet();
                    // The filter check fills buildIdSet, limited by MAX_KEYS_IN_MESSAGE.
                    requestMessage.getFilters()[i2].check(ReplicationImpl.this.client.scan(requestMessage.getRanges()[i2]), buildIdSet, ReplicationImpl.this.MAX_KEYS_IN_MESSAGE);
                    return buildIdSet;
                }
            }, multiContinuation.getSubContinuation(i));
        }
    }

    /**
     * Processes a response: for every returned id set, asks the policy which of the offered ids
     * differ from the local scan of the matching range and has the client fetch them from the
     * responder.
     */
    private void deliverResponse(ResponseMessage responseMessage) {
        for (int i3 = 0; i3 < responseMessage.getIdSets().length; i3++) {
            IdSet offered = this.factory.buildIdSet();
            for (Id id2 : responseMessage.getIdSets()[i3]) {
                offered.addId(id2);
            }
            IdSet difference = this.policy.difference(this.client.scan(responseMessage.getRanges()[i3]), offered, this.factory);
            if (this.logger.level <= 500) {
                this.logger.log("COUNT: Was told to fetch " + difference.numElements() + " in instance " + this.instance);
            }
            if (difference.numElements() > 0) {
                this.client.fetch(difference, responseMessage.getSource());
            }
        }
    }

    /**
     * Application callback; whatever the arguments are, the client's range is refreshed from the
     * current total range.
     *
     * @param nodeHandle the handle the notification is about (unused)
     * @param z flag supplied with the notification (unused)
     */
    @Override // rice.p2p.commonapi.Application
    public void update(NodeHandle nodeHandle, boolean z) {
        updateClient();
    }

    /**
     * Marks this instance as destroyed. The flag is checked in replicate()'s continuation, which
     * then sends no requests.
     * NOTE(review): the ReminderMessage scheduled in the constructor is not cancelled here.
     */
    @Override // rice.Destructable
    public void destroy() {
        this.destroyed = true;
    }
}
