package org.apache.zookeeper.server.quorum;

import com.rabbitmq.client.LongString;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zookeeper/server/quorum/ObserverMaster.class */
public class ObserverMaster extends LearnerMaster implements Runnable {
    /** Quorum peer that owns this observer master; supplies tick, limit and address settings. */
    private QuorumPeer self;
    /** Follower server used for session tracking, JMX registration and observer requests. */
    private FollowerZooKeeperServer zks;
    /** TCP port the listener socket is bound to in {@code start()}. */
    private int port;
    /** Default value of {@link #pktsSizeLimit} (32 * 1024 * 1024 = 33554432). */
    private static final int PKTS_SIZE_LIMIT = 32 * 1024 * 1024;
    /** Zxid of the packet most recently handed to {@code sendPacket}. */
    private long lastProposedZxid;
    /** Thread running the accept loop ({@code run()}). */
    private Thread thread;
    /** Listener socket created by {@code start()} and closed by {@code stop()}. */
    private ServerSocket ss;
    /**
     * True while the accept loop should keep running. Volatile because it is written by
     * {@code start()}/{@code stop()} under the instance monitor but read without it by the
     * accept loop in {@code run()} and by {@code addLearnerHandler()}.
     */
    private volatile boolean listenerRunning;
    /** Scheduler that periodically runs {@code ping}. */
    private ScheduledExecutorService pinger;
    private static final Logger LOG = LoggerFactory.getLogger(ObserverMaster.class);
    /** Upper bound for {@link #pktsSize}; overridable via system property {@code zookeeper.observerMaster.sizeLimit}. */
    private static volatile int pktsSizeLimit = Integer.getInteger("zookeeper.observerMaster.sizeLimit", PKTS_SIZE_LIMIT);
    /** Counter handed out (and decremented) by {@code getAndDecrementFollowerCounter()}; starts at -1. */
    private final AtomicLong followerCounter = new AtomicLong(-1);
    /** Handlers that receive forwarded packets and pings; backed by a concurrent map. */
    private Set<LearnerHandler> activeObservers = Collections.newSetFromMap(new ConcurrentHashMap<LearnerHandler, Boolean>());
    /** JMX beans registered per handler, so they can be unregistered later. */
    private final ConcurrentHashMap<LearnerHandler, LearnerHandlerBean> connectionBeans = new ConcurrentHashMap<>();
    /** Proposals received but not yet committed, in arrival order. */
    private ConcurrentLinkedQueue<QuorumPacket> proposedPkts = new ConcurrentLinkedQueue<>();
    /** Committed packets retained so that newly connecting observers can be synced from them. */
    private ConcurrentLinkedQueue<QuorumPacket> committedPkts = new ConcurrentLinkedQueue<>();
    /** Running sum of {@code LearnerHandler.packetSize(...)} over the packets in {@link #committedPkts}. */
    private int pktsSize = 0;
    /** Guards enqueueing a pending revalidation together with forwarding its packet. */
    private final Object revalidateSessionLock = new Object();
    /** Revalidations forwarded upstream and still awaiting a reply, in request order. */
    private final ConcurrentLinkedQueue<Revalidation> pendingRevalidations = new ConcurrentLinkedQueue<>();
    /** Task that pings every handler currently in the active-observer set. */
    Runnable ping = new Runnable() {
        @Override
        public void run() {
            for (LearnerHandler observer : ObserverMaster.this.activeObservers) {
                observer.ping();
            }
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/zookeeper/server/quorum/ObserverMaster$Revalidation.class */
    /**
     * Immutable value describing one pending session revalidation: the session id, its
     * timeout, and the handler that asked for it. Equality and hashing use all three fields.
     */
    public static class Revalidation {
        public final long sessionId;
        public final int timeout;
        public final LearnerHandler handler;

        /**
         * @param l session id; unboxed immediately, so {@code null} raises a NullPointerException
         * @param i session timeout
         * @param learnerHandler handler that requested the revalidation
         */
        Revalidation(Long l, int i, LearnerHandler learnerHandler) {
            sessionId = l.longValue();
            timeout = i;
            handler = learnerHandler;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (obj.getClass() != getClass()) {
                return false;
            }
            Revalidation other = (Revalidation) obj;
            if (sessionId != other.sessionId) {
                return false;
            }
            if (timeout != other.timeout) {
                return false;
            }
            return handler.equals(other.handler);
        }

        @Override
        public int hashCode() {
            // Fold the 64-bit id into 32 bits, then mix in the remaining fields with factor 31.
            int hash = (int) (sessionId ^ (sessionId >>> 32));
            hash = 31 * hash + timeout;
            hash = 31 * hash + handler.hashCode();
            return hash;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the collaborators; no socket or thread is created until {@code start()} is called.
     *
     * @param quorumPeer peer this observer master belongs to
     * @param followerZooKeeperServer follower server backing this observer master
     * @param i port to listen on
     */
    public ObserverMaster(QuorumPeer quorumPeer, FollowerZooKeeperServer followerZooKeeperServer, int i) {
        this.port = i;
        this.zks = followerZooKeeperServer;
        this.self = quorumPeer;
    }

    /**
     * Only verifies that the listener is running; the handler itself is not stored here.
     *
     * @param learnerHandler handler being announced (unused)
     * @throws RuntimeException if the listener is not running
     */
    @Override
    public void addLearnerHandler(LearnerHandler learnerHandler) {
        if (listenerRunning) {
            return;
        }
        throw new RuntimeException("ObserverMaster is not running");
    }

    /**
     * Drops the handler from the active-observer set; a handler that is not present is ignored.
     *
     * @param learnerHandler handler to remove
     */
    @Override
    public void removeLearnerHandler(LearnerHandler learnerHandler) {
        activeObservers.remove(learnerHandler);
    }

    /**
     * @return the peer's sync limit multiplied by its tick time
     */
    @Override
    public int syncTimeout() {
        int syncLimit = self.getSyncLimit();
        int tickTime = self.getTickTime();
        return syncLimit * tickTime;
    }

    /**
     * @return the peer's current tick plus its sync limit
     */
    @Override
    public int getTickOfNextAckDeadline() {
        int currentTick = self.tick.get();
        return currentTick + self.syncLimit;
    }

    /**
     * @return the peer's current tick plus its init limit plus its sync limit
     */
    @Override
    public int getTickOfInitialAckDeadline() {
        int currentTick = self.tick.get();
        return currentTick + self.initLimit + self.syncLimit;
    }

    /**
     * Atomically decrements the follower counter.
     *
     * @return the counter value before the decrement
     */
    @Override
    public long getAndDecrementFollowerCounter() {
        return followerCounter.getAndDecrement();
    }

    /**
     * No-op in this implementation: returns immediately without waiting.
     *
     * @param j unused
     * @param stateSummary unused
     */
    @Override
    public void waitForEpochAck(long j, StateSummary stateSummary) throws IOException, InterruptedException {
        // Intentionally empty.
    }

    /** No-op in this implementation: returns immediately without waiting. */
    @Override
    public void waitForStartup() throws InterruptedException {
        // Intentionally empty.
    }

    /**
     * @return the zxid recorded by the most recent packet broadcast, read under the instance monitor
     */
    @Override
    public synchronized long getLastProposed() {
        return lastProposedZxid;
    }

    /**
     * Returns the peer's current epoch; both arguments are ignored.
     *
     * @param j unused
     * @param j2 unused
     * @return {@code self.getCurrentEpoch()}
     */
    @Override
    public long getEpochToPropose(long j, long j2) throws InterruptedException, IOException {
        long currentEpoch = self.getCurrentEpoch();
        return currentEpoch;
    }

    /**
     * @return the database of the backing follower server
     */
    @Override
    public ZKDatabase getZKDatabase() {
        return zks.getZKDatabase();
    }

    /**
     * No-op in this implementation: returns immediately without waiting.
     *
     * @param j unused
     * @param j2 unused
     */
    @Override
    public void waitForNewLeaderAck(long j, long j2) throws InterruptedException {
        // Intentionally empty.
    }

    /**
     * @return the peer's current tick counter value
     */
    @Override
    public int getCurrentTick() {
        return self.tick.get();
    }

    /**
     * Accepts an ACK only when the low 32 bits of its zxid are all zero; any other ACK is
     * rejected, because observers are not expected to acknowledge proposals.
     *
     * <p>The mask is written as a literal rather than borrowed from an unrelated library
     * constant that merely happens to have the same value.
     *
     * @param j unused
     * @param j2 zxid carried by the ACK
     * @param socketAddress unused
     * @throws RuntimeException if any of the low 32 bits of {@code j2} is set
     */
    @Override
    public void processAck(long j, long j2, SocketAddress socketAddress) {
        // NOTE(review): the low 32 bits are presumably the per-epoch counter of the zxid - confirm.
        if ((j2 & 0xffffffffL) == 0) {
            return;
        }
        throw new RuntimeException("Observers shouldn't send ACKS ack = " + Long.toHexString(j2));
    }

    /**
     * Forwards to {@code touchSession} on the follower server's session tracker.
     *
     * @param j session id
     * @param i session timeout
     */
    @Override
    public void touch(long j, int i) {
        zks.getSessionTracker().touchSession(j, i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Matches a revalidation reply against the oldest pending revalidation and, on a match,
     * relays a copy of the reply to the handler that requested it.
     *
     * <p>The packet payload is read as a {@code long} session id followed by a {@code boolean}
     * validity flag. Only the head of the pending queue is considered.
     *
     * @param quorumPacket the revalidation reply
     * @return {@code true} if the reply belonged to the head pending revalidation and was
     *         relayed; {@code false} if nothing is pending or the session ids differ
     * @throws IOException if the payload is too short to contain the id and the flag
     */
    public boolean revalidateLearnerSession(QuorumPacket quorumPacket) throws IOException {
        DataInputStream payload = new DataInputStream(new ByteArrayInputStream(quorumPacket.getData()));
        long sessionId = payload.readLong();
        boolean valid = payload.readBoolean();

        Iterator<Revalidation> pending = this.pendingRevalidations.iterator();
        if (!pending.hasNext()) {
            return false;
        }
        Revalidation head = pending.next();
        if (head.sessionId != sessionId) {
            return false;
        }
        pending.remove();

        // Relay a copy (fresh data array and auth list) rather than the caller's packet instance.
        QuorumPacket copy = new QuorumPacket(
                quorumPacket.getType(),
                quorumPacket.getZxid(),
                Arrays.copyOf(quorumPacket.getData(), quorumPacket.getData().length),
                quorumPacket.getAuthinfo() == null ? null : new ArrayList<>(quorumPacket.getAuthinfo()));
        head.handler.queuePacket(copy);

        // The session is touched only when the reply marks it as valid.
        if (valid) {
            touch(head.sessionId, head.timeout);
        }
        return true;
    }

    /**
     * Records a pending revalidation for the requesting handler and forwards the request
     * through the follower server's learner, if one is available.
     *
     * <p>The payload is read as a {@code long} session id followed by an {@code int} timeout.
     * Enqueueing and forwarding happen under one lock so that both occur in the same order.
     *
     * @param quorumPacket the revalidation request
     * @param learnerHandler handler that should receive the eventual reply
     * @throws IOException if the payload cannot be read or the packet cannot be written
     */
    @Override
    public void revalidateSession(QuorumPacket quorumPacket, LearnerHandler learnerHandler) throws IOException {
        ByteArrayInputStream bytes = new ByteArrayInputStream(quorumPacket.getData());
        DataInputStream payload = new DataInputStream(bytes);
        long sessionId = payload.readLong();
        int sessionTimeout = payload.readInt();
        synchronized (revalidateSessionLock) {
            Revalidation request = new Revalidation(Long.valueOf(sessionId), sessionTimeout, learnerHandler);
            pendingRevalidations.add(request);
            Learner upstream = zks.getLearner();
            if (upstream == null) {
                return;
            }
            upstream.writePacket(quorumPacket, true);
        }
    }

    /**
     * Hands the request to the follower server's observer-request processing.
     *
     * @param request request received from an observer
     */
    @Override
    public void submitLearnerRequest(Request request) {
        zks.processObserverRequest(request);
    }

    /**
     * Replays retained committed packets newer than {@code j} to the handler and then adds it
     * to the active-observer set.
     *
     * <p>If the oldest retained packet is already beyond {@code j + 1}, the handler cannot be
     * caught up from the cache: it is shut down and {@code -1} is returned.
     *
     * @param learnerHandler handler to sync and activate
     * @param j last zxid the observer has seen
     * @return the last proposed zxid, or {@code -1} if the handler was disconnected
     */
    @Override
    public synchronized long startForwarding(LearnerHandler learnerHandler, long j) {
        Iterator<QuorumPacket> retained = committedPkts.iterator();
        if (retained.hasNext()) {
            QuorumPacket packet = retained.next();
            long nextExpected = j + 1;
            if (packet.getZxid() > nextExpected) {
                LOG.error("LearnerHandler is too far behind (0x{} < 0x{}), disconnecting {} at {}", Long.toHexString(nextExpected), Long.toHexString(packet.getZxid()), Long.valueOf(learnerHandler.getSid()), learnerHandler.getRemoteAddress());
                learnerHandler.shutdown();
                return -1L;
            }
            if (packet.getZxid() == nextExpected) {
                learnerHandler.queuePacket(packet);
            }
            long queueHeadZxid = packet.getZxid();
            // The head packet counts toward the byte total whether or not it was queued.
            long queueBytesUsed = LearnerHandler.packetSize(packet);
            while (retained.hasNext()) {
                packet = retained.next();
                if (packet.getZxid() <= j) {
                    continue;
                }
                learnerHandler.queuePacket(packet);
                queueBytesUsed += LearnerHandler.packetSize(packet);
            }
            LOG.info("finished syncing observer from retained commit queue: sid {}, queue head 0x{}, queue tail 0x{}, sync position 0x{}, num packets used {}, num bytes used {}", Long.valueOf(learnerHandler.getSid()), Long.toHexString(queueHeadZxid), Long.toHexString(packet.getZxid()), Long.toHexString(j), Long.valueOf(packet.getZxid() - j), Long.valueOf(queueBytesUsed));
        }
        activeObservers.add(learnerHandler);
        return lastProposedZxid;
    }

    /**
     * @return the version reported by the peer's current quorum verifier
     */
    @Override
    public long getQuorumVerifierVersion() {
        return self.getQuorumVerifier().getVersion();
    }

    /**
     * Looks up a server in the peer's view.
     *
     * @param j server id
     * @return the server's string form, or the empty string if the view has no entry for it
     */
    @Override
    public String getPeerInfo(long j) {
        QuorumPeer.QuorumServer server = self.getView().get(Long.valueOf(j));
        if (server != null) {
            return server.toString();
        }
        return "";
    }

    /**
     * Serializes the peer's last-seen quorum verifier as the bytes of its string form.
     *
     * <p>The charset is pinned to UTF-8 so the result does not vary with the JVM's platform
     * default encoding.
     *
     * @return UTF-8 bytes of {@code self.getLastSeenQuorumVerifier().toString()}
     */
    @Override
    public byte[] getQuorumVerifierBytes() {
        String verifierText = this.self.getLastSeenQuorumVerifier().toString();
        return verifierText.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * @return the peer's auth server, or {@code null} when no peer is set
     */
    @Override
    public QuorumAuthServer getQuorumAuthServer() {
        return self == null ? null : self.authServer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Queues a new packet of type 8, carrying the proposal's zxid and data and no auth info,
     * until the proposal is committed.
     *
     * @param quorumPacket the received proposal
     */
    public void proposalReceived(QuorumPacket quorumPacket) {
        // NOTE(review): 8 is presumably the INFORM packet type - confirm against Leader's constants.
        QuorumPacket pending = new QuorumPacket(8, quorumPacket.getZxid(), quorumPacket.getData(), null);
        proposedPkts.add(pending);
    }

    /**
     * Removes and returns the head of the proposal queue if it carries the given zxid.
     *
     * @param j zxid that was committed
     * @return the matching proposal, or {@code null} when the queue is empty or its head has a
     *         larger zxid (the proposal is treated as missing and ignored)
     * @throws RuntimeException if the head has a smaller zxid than {@code j}
     */
    private synchronized QuorumPacket removeProposedPacket(long j) {
        QuorumPacket head = this.proposedPkts.peek();
        if (head == null || head.getZxid() > j) {
            LOG.debug("ignore missing proposal packet for {}", Long.toHexString(j));
            return null;
        }
        if (head.getZxid() != j) {
            // %x, not %d: the message labels the values with an "0x" prefix, so print hexadecimal.
            String message = String.format("Unexpected proposal packet on commit ack, expected zxid 0x%x got zxid 0x%x", j, head.getZxid());
            LOG.error(message);
            throw new RuntimeException(message);
        }
        this.proposedPkts.remove();
        return head;
    }

    /**
     * Appends a committed packet to the retained cache and trims the cache by size.
     *
     * <p>Trimming happens in two stages: while the tracked size exceeds 80% of the limit, up to
     * five of the oldest packets are evicted; after that, packets are evicted until the tracked
     * size no longer exceeds the limit.
     *
     * @param quorumPacket committed packet to retain
     */
    private synchronized void cacheCommittedPacket(QuorumPacket quorumPacket) {
        committedPkts.add(quorumPacket);
        pktsSize += LearnerHandler.packetSize(quorumPacket);

        // Soft trim: at most five evictions per added packet once past 80% of the limit.
        for (int evicted = 0; pktsSize > pktsSizeLimit * 0.8d && evicted < 5; evicted++) {
            QuorumPacket oldest = committedPkts.poll();
            if (oldest == null) {
                // Queue is empty, so reset the size counter.
                pktsSize = 0;
                break;
            }
            pktsSize -= LearnerHandler.packetSize(oldest);
        }

        // Hard cap: keep evicting until the tracked size is within the limit.
        while (pktsSize > pktsSizeLimit) {
            QuorumPacket oldest = committedPkts.poll();
            if (oldest == null) {
                pktsSize = 0;
                return;
            }
            pktsSize -= LearnerHandler.packetSize(oldest);
        }
    }

    /**
     * Queues the packet on every active observer and records its zxid as the last proposed one.
     *
     * @param quorumPacket packet to broadcast
     */
    private synchronized void sendPacket(QuorumPacket quorumPacket) {
        for (LearnerHandler observer : activeObservers) {
            observer.queuePacket(quorumPacket);
        }
        lastProposedZxid = quorumPacket.getZxid();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Handles a commit: if the matching proposal is queued, it is moved into the retained
     * cache and broadcast to the active observers; otherwise nothing happens.
     *
     * @param j zxid that was committed
     */
    public synchronized void proposalCommitted(long j) {
        QuorumPacket committed = removeProposedPacket(j);
        if (committed != null) {
            cacheCommittedPacket(committed);
            sendPacket(committed);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Like a commit, but the queued proposal's data is re-wrapped by
     * {@code Leader.buildInformAndActivePacket} before being cached and broadcast. Does nothing
     * when no matching proposal is queued.
     *
     * @param j zxid that was committed
     * @param j2 second argument passed through to {@code Leader.buildInformAndActivePacket}
     */
    public synchronized void informAndActivate(long j, long j2) {
        QuorumPacket proposal = removeProposedPacket(j);
        if (proposal != null) {
            QuorumPacket activation = Leader.buildInformAndActivePacket(j, j2, proposal.getData());
            cacheCommittedPacket(activation);
            sendPacket(activation);
        }
    }

    /**
     * Binds the listener socket, starts the accept thread and schedules the periodic ping.
     * Does nothing if the accept thread is already alive.
     *
     * <p>A unified (TLS-capable) server socket is used when port unification or SSL quorum is
     * enabled, otherwise a plain one. The socket binds to all interfaces when the peer is
     * configured to listen on all IPs, otherwise to the peer's quorum address. The ping runs
     * every half tick.
     *
     * <p>The running flag is raised only after the socket has been created, so a failed bind
     * does not leave this object claiming to be running.
     *
     * @throws IOException if the listener socket cannot be created
     */
    public synchronized void start() throws IOException {
        if (this.thread != null && this.thread.isAlive()) {
            return;
        }
        InetAddress address = this.self.getQuorumAddress().getReachableOrOne().getAddress();
        if (this.self.shouldUsePortUnification() || this.self.isSslQuorum()) {
            boolean shouldUsePortUnification = this.self.shouldUsePortUnification();
            if (this.self.getQuorumListenOnAllIPs()) {
                this.ss = new UnifiedServerSocket(this.self.getX509Util(), shouldUsePortUnification, this.port, 10);
            } else {
                this.ss = new UnifiedServerSocket(this.self.getX509Util(), shouldUsePortUnification, this.port, 10, address);
            }
        } else if (this.self.getQuorumListenOnAllIPs()) {
            this.ss = new ServerSocket(this.port, 10);
        } else {
            this.ss = new ServerSocket(this.port, 10, address);
        }
        // Must be set before the accept thread starts, since its loop condition reads this flag.
        this.listenerRunning = true;
        this.thread = new Thread(this, "ObserverMaster");
        this.thread.start();
        this.pinger = Executors.newSingleThreadScheduledExecutor();
        this.pinger.scheduleAtFixedRate(this.ping, this.self.tickTime / 2, this.self.tickTime / 2, TimeUnit.MILLISECONDS);
    }

    /**
     * Accept loop: while the listener is running, accepts observer connections and starts a
     * {@code LearnerHandler} for each one.
     *
     * <p>Each accepted socket gets a read timeout of {@code tickTime * initLimit}. If anything
     * fails after a connection has been accepted, that socket is closed so it does not leak.
     * Failures are logged at debug level and the loop continues.
     */
    @Override
    public void run() {
        ServerSocket serverSocket;
        synchronized (this) {
            serverSocket = this.ss;
        }
        while (this.listenerRunning) {
            Socket accepted = null;
            try {
                accepted = serverSocket.accept();
                accepted.setSoTimeout(this.self.tickTime * this.self.initLimit);
                BufferedInputStream input = new BufferedInputStream(accepted.getInputStream());
                new LearnerHandler(accepted, input, this).start();
            } catch (Exception e) {
                if (accepted != null) {
                    // Setup failed after accept(): release the connection instead of leaking it.
                    try {
                        accepted.close();
                    } catch (IOException closeFailure) {
                        LOG.debug("Failed to close socket after accept error", closeFailure);
                    }
                }
                // stop() clears the flag before closing the listener, so a cleared flag means
                // shutdown; otherwise the failure came from the connection itself.
                if (this.listenerRunning) {
                    LOG.debug("Ignoring accept exception (maybe client closed)", e);
                } else {
                    LOG.debug("Ignoring accept exception (maybe shutting down)", e);
                }
            }
        }
    }

    /**
     * Stops the listener: clears the running flag, cancels the ping scheduler, closes the
     * listener socket and shuts down every active observer handler.
     *
     * <p>A failure to close the socket is logged and does not prevent the handlers from being
     * shut down.
     */
    public synchronized void stop() {
        // Clear the flag first so the accept loop exits once the socket close interrupts accept().
        this.listenerRunning = false;
        if (this.pinger != null) {
            this.pinger.shutdownNow();
        }
        if (this.ss != null) {
            try {
                this.ss.close();
            } catch (IOException e) {
                LOG.warn("Failed to close ObserverMaster listener socket", e);
            }
        }
        for (LearnerHandler observer : this.activeObservers) {
            observer.shutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the current size of the active-observer set
     */
    public int getNumActiveObservers() {
        return activeObservers.size();
    }

    /**
     * Collects the info map of every active observer handler.
     *
     * @return a new set holding each handler's {@code getLearnerHandlerInfo()} result; maps
     *         that compare equal are collapsed by the set
     */
    public Iterable<Map<String, Object>> getActiveObservers() {
        Set<Map<String, Object>> info = new HashSet<>();
        for (LearnerHandler observer : this.activeObservers) {
            info.add(observer.getLearnerHandlerInfo());
        }
        return info;
    }

    /** Calls {@code resetObserverConnectionStats()} on every active observer handler. */
    public void resetObserverConnectionStats() {
        for (LearnerHandler observer : activeObservers) {
            observer.resetObserverConnectionStats();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the current size limit of the retained committed-packet cache
     */
    public int getPktsSizeLimit() {
        int limit = pktsSizeLimit;
        return limit;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Replaces the size limit of the retained committed-packet cache. The limit is a static
     * field, so the change applies to every instance.
     *
     * @param i new limit
     */
    public static void setPktsSizeLimit(int i) {
        ObserverMaster.pktsSizeLimit = i;
    }

    /**
     * Creates a JMX bean for the handler and registers it through the follower server. The
     * bean is remembered only if registration succeeds.
     *
     * @param learnerHandler handler the bean describes
     * @param socket socket passed to the bean
     */
    @Override
    public void registerLearnerHandlerBean(LearnerHandler learnerHandler, Socket socket) {
        LearnerHandlerBean bean = new LearnerHandlerBean(learnerHandler, socket);
        boolean registered = zks.registerJMX(bean);
        if (!registered) {
            return;
        }
        connectionBeans.put(learnerHandler, bean);
    }

    /**
     * Forgets the handler's JMX bean and unregisters it from the MBean registry; does nothing
     * if no bean was recorded for the handler.
     *
     * @param learnerHandler handler whose bean should be removed
     */
    @Override
    public void unregisterLearnerHandlerBean(LearnerHandler learnerHandler) {
        LearnerHandlerBean bean = connectionBeans.remove(learnerHandler);
        if (bean == null) {
            return;
        }
        MBeanRegistry.getInstance().unregister(bean);
    }
}
