package org.dataone.cn.indexer;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.FileTime;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPathExpressionException;
import org.apache.commons.codec.EncoderException;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.SolrServerException;
import org.dataone.cn.indexer.annotation.OntologyModelService;
import org.dataone.cn.indexer.object.ObjectManagerFactory;
import org.dataone.configuration.Settings;
import org.dataone.exceptions.MarshallingException;
import org.dataone.indexer.queue.IndexQueueMessageParser;
import org.dataone.service.exceptions.InvalidRequest;
import org.dataone.service.exceptions.InvalidToken;
import org.dataone.service.exceptions.NotAuthorized;
import org.dataone.service.exceptions.NotFound;
import org.dataone.service.exceptions.NotImplemented;
import org.dataone.service.exceptions.ServiceFailure;
import org.dataone.service.exceptions.UnsupportedType;
import org.dataone.service.types.v1.Identifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.xml.sax.SAXException;

/* loaded from: input_file:org/dataone/cn/indexer/IndexWorker.class */
public class IndexWorker {
    public static final String CREATE_INDEX_TYPE = "create";
    public static final String DELETE_INDEX_TYPE = "delete";
    public static final String SYSMETA_CHANGE_TYPE = "sysmeta";
    public static final int HIGHEST_PRIORITY = 4;
    public static final int HIGH_PRIORITY = 3;
    public static final int MEDIUM_PRIORITY = 2;
    public static final int LOW_PRIORITY = 1;
    private static final String HEADER_ID = "id";
    private static final String HEADER_PATH = "path";
    private static final String HEADER_INDEX_TYPE = "index_type";
    private static final String EXCHANGE_NAME = "dataone-index";
    private static final String INDEX_QUEUE_NAME = "index";
    private static final String INDEX_ROUTING_KEY = "index";
    private static final String springConfigFileURL = "/index-parser-context.xml";
    private static final String ENV_NAME_OF_PROPERTIES_FILE = "DATAONE_INDEXER_CONFIG";
    protected boolean multipleThread;
    protected int nThreads;
    private Connection rabbitMQconnection;
    private Channel rabbitMQchannel;
    private ApplicationContext context;
    protected SolrIndex solrIndex;
    private ExecutorService executor;
    private ConnectionFactory factory;
    private final ReentrantLock connectionLock;
    private boolean isK8s;
    private Consumer consumer;
    private static Log logger = LogFactory.getLog(IndexWorker.class);
    private static String defaultExternalPropertiesFile = "/etc/dataone/dataone-indexer.properties";
    protected static String propertyFilePath = null;
    protected static int readinessInitialDelaySec = 2;
    protected static int readinessPeriodSec = 10;

    public static void main(String[] strArr) {
        logger.info("IndexWorker.main - Starting index worker...");
        String str = null;
        if (strArr != null && strArr.length == 1) {
            str = strArr[0];
            logger.debug("The external property file specified in the argument is " + str);
        }
        loadExternalPropertiesFile(str);
        try {
            IndexWorker indexWorker = new IndexWorker();
            indexWorker.start();
            indexWorker.startReadinessProbe();
            indexWorker.startLivenessProbe();
        } catch (Exception e) {
            logger.fatal("IndexWorker.main() exiting due to fatal error: " + e.getMessage(), e);
            System.exit(1);
        }
    }

    public static void loadExternalPropertiesFile(String str) {
        if (str != null && !str.trim().isEmpty()) {
            propertyFilePath = str;
            logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path specified by users is " + propertyFilePath);
            File file = new File(propertyFilePath);
            if (file.exists() && file.canRead()) {
                logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path users specified is  " + propertyFilePath + ". The file exists and is readable. So it will be used.");
            } else {
                logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path users specified is  " + propertyFilePath + ". But the file does NOT exist or is NOT readable. So it will NOT be used.");
                propertyFilePath = null;
            }
        }
        if (propertyFilePath == null || propertyFilePath.trim().isEmpty()) {
            propertyFilePath = System.getenv(ENV_NAME_OF_PROPERTIES_FILE);
            logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path from the env variable is " + propertyFilePath);
            if (propertyFilePath != null && !propertyFilePath.trim().equals("")) {
                File file2 = new File(propertyFilePath);
                if (file2.exists() && file2.canRead()) {
                    logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path can be read from the env variable DATAONE_INDEXER_CONFIG and its value is " + propertyFilePath + ". The file exists and it will be used.");
                } else {
                    logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path can be read from the env variable DATAONE_INDEXER_CONFIG and its value is " + propertyFilePath + ". But the file does NOT exist or is NOT readable. So it will NOT be used.");
                    propertyFilePath = null;
                }
            }
        }
        if (propertyFilePath == null || propertyFilePath.trim().isEmpty()) {
            File file3 = new File(defaultExternalPropertiesFile);
            if (file3.exists() && file3.canRead()) {
                logger.info("IndexWorker.loadExternalPropertiesFile - the configure path can't be read either by users specified or from the env variable DATAONE_INDEXER_CONFIG. However, the default external file " + defaultExternalPropertiesFile + " exists and it will be used.");
                propertyFilePath = defaultExternalPropertiesFile;
            }
        }
        if (propertyFilePath == null || propertyFilePath.trim().isEmpty()) {
            logger.info("IndexWorker.loadExternalPropertiesFile - can't load an external properties file from the env variable DATAONE_INDEXER_CONFIG or from the default path " + defaultExternalPropertiesFile + ". Dataone-indexer will use the properties file embedded in the jar file");
            return;
        }
        try {
            Settings.augmentConfiguration(propertyFilePath);
            logger.info("IndexWorker.loadExternalPropertiesFile - loaded the properties from the file " + propertyFilePath);
        } catch (ConfigurationException e) {
            logger.error("IndexWorker.loadExternalPropertiesFile - can't load any properties from the file " + propertyFilePath + " since " + e.getMessage() + ". It will use the default properties in the jar file.");
        }
    }

    public static void loadAdditionalPropertyFile(String str) {
        if (str == null || str.trim().isEmpty()) {
            logger.info("IndexWorker.loadAdditionalPropertyFile - can't load an additional property file since its path is null or blank.");
            return;
        }
        try {
            Settings.augmentConfiguration(str);
            logger.info("IndexWorker.loadAdditionalPropertyFile - loaded the properties from the file " + str);
        } catch (ConfigurationException e) {
            logger.error("IndexWorker.loadAdditionalPropertyFile - can't load any properties from the file " + str + " since " + e.getMessage() + ".");
        }
    }

    public IndexWorker() throws IOException, TimeoutException, ServiceFailure, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
        this(true);
    }

    public IndexWorker(Boolean bool) throws IOException, TimeoutException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
        this.multipleThread = true;
        this.nThreads = 1;
        this.rabbitMQconnection = null;
        this.rabbitMQchannel = null;
        this.context = null;
        this.solrIndex = null;
        this.executor = null;
        this.factory = null;
        this.connectionLock = new ReentrantLock();
        this.isK8s = false;
        if (System.getenv("KUBERNETES_SERVICE_HOST") != null) {
            this.isK8s = true;
            logger.info("The index worker is in the k8s environment.");
        }
        if (bool.booleanValue()) {
            initExecutorService();
            initIndexQueue();
            initIndexParsers();
            ObjectManagerFactory.getObjectManager();
            OntologyModelService.getInstance();
        }
    }

    private void initIndexQueue() throws IOException, TimeoutException {
        String string = Settings.getConfiguration().getString("index.rabbitmq.hostname", "localhost");
        int i = Settings.getConfiguration().getInt("index.rabbitmq.hostport", 5672);
        String string2 = Settings.getConfiguration().getString("index.rabbitmq.username", "guest");
        String string3 = Settings.getConfiguration().getString("index.rabbitmq.password", "guest");
        this.factory = new ConnectionFactory();
        this.factory.setHost(string);
        this.factory.setPort(i);
        this.factory.setPassword(string3);
        this.factory.setUsername(string2);
        this.factory.setAutomaticRecoveryEnabled(true);
        this.factory.setNetworkRecoveryInterval(10000);
        logger.debug("Set RabbitMQ host to: " + string);
        logger.debug("Set RabbitMQ port to: " + i);
        generateConnectionAndChannel();
    }

    private void generateConnectionAndChannel() throws IOException, TimeoutException {
        int i = Settings.getConfiguration().getInt("index.rabbitmq.max.priority");
        this.rabbitMQconnection = this.factory.newConnection();
        this.rabbitMQchannel = this.rabbitMQconnection.createChannel();
        this.rabbitMQchannel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        HashMap hashMap = new HashMap();
        hashMap.put("x-max-priority", Integer.valueOf(i));
        logger.debug("Set RabbitMQ max priority to: " + i);
        this.rabbitMQchannel.queueDeclare("index", true, false, false, hashMap);
        this.rabbitMQchannel.queueBind("index", EXCHANGE_NAME, "index");
        logger.info("The allowed unacknowledged message(s) number is " + this.nThreads);
        this.rabbitMQchannel.basicQos(this.nThreads);
        logger.debug("Connected to the RabbitMQ queue with the name of index");
    }

    protected void initIndexParsers() {
        if (this.context == null) {
            synchronized (IndexWorker.class) {
                if (this.context == null) {
                    this.context = new ClassPathXmlApplicationContext(springConfigFileURL);
                }
            }
        }
        this.solrIndex = (SolrIndex) this.context.getBean("solrIndex");
    }

    protected void initExecutorService() {
        int i;
        String string = Settings.getConfiguration().getString("index.thread.number", "0");
        try {
            i = Integer.parseInt(string);
        } catch (NumberFormatException e) {
            i = 0;
            logger.warn("IndexWorker.initExecutorService - IndexWorker cannot parse the string " + string + " specified by property index.thread.number into a number since " + e.getLocalizedMessage() + ". The default value 0 will be used as the specified value");
        }
        int availableProcessors = Runtime.getRuntime().availableProcessors() - 1;
        this.nThreads = Math.max(1, availableProcessors);
        if (i > 0 && i < this.nThreads) {
            this.nThreads = i;
        }
        if (this.nThreads == 1) {
            logger.info("IndexWorker.initExecutorService - the size of index thread pool specified in the propery file is " + i + ". The size computed from the available processors is " + availableProcessors + ". Final computed thread pool size for index executor: " + this.nThreads + ". Since its value is 1, we do NOT need the executor service and use a single thread way.");
            this.multipleThread = false;
        } else {
            logger.info("IndexWorker.initExecutorService - the size of index thread pool specified in the propery file is " + i + ". The size computed from the available processors is " + availableProcessors + ". Final computed thread pool size for index executor: " + this.nThreads);
            this.executor = Executors.newFixedThreadPool(this.nThreads);
            this.multipleThread = true;
        }
    }

    public void start() throws IOException {
        this.consumer = new DefaultConsumer(this.rabbitMQchannel) { // from class: org.dataone.cn.indexer.IndexWorker.1
            public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                IndexWorker.logger.debug("Received message with delivery tag: " + envelope.getDeliveryTag());
                IndexWorker.this.rabbitMQchannel.basicAck(envelope.getDeliveryTag(), false);
                final IndexQueueMessageParser indexQueueMessageParser = new IndexQueueMessageParser();
                try {
                    indexQueueMessageParser.parse(basicProperties, bArr);
                    if (IndexWorker.this.multipleThread) {
                        IndexWorker.logger.debug("using multiple threads to index identifier " + indexQueueMessageParser.getIdentifier().getValue());
                        IndexWorker.this.executor.submit(new Runnable() { // from class: org.dataone.cn.indexer.IndexWorker.1.1
                            @Override // java.lang.Runnable
                            public void run() {
                                IndexWorker.this.indexObject(indexQueueMessageParser, IndexWorker.this.multipleThread);
                            }
                        });
                    } else {
                        IndexWorker.logger.debug("using single thread to index identifier " + indexQueueMessageParser.getIdentifier().getValue());
                        IndexWorker.this.indexObject(indexQueueMessageParser, IndexWorker.this.multipleThread);
                    }
                } catch (InvalidRequest e) {
                    String str2 = "Cannot index the task for the object since " + e.getMessage();
                    if (indexQueueMessageParser.getIdentifier() != null) {
                        str2 = str2 + " with the identifier " + indexQueueMessageParser.getIdentifier().getValue();
                    }
                    IndexWorker.logger.error(str2);
                    IndexWorker.this.rabbitMQchannel.basicReject(envelope.getDeliveryTag(), false);
                }
            }
        };
        this.rabbitMQchannel.basicConsume("index", false, this.consumer);
        logger.info("IndexWorker.start - Calling basicConsume and waiting for the coming messages");
    }

    private void recreateConnection() throws IOException {
        this.connectionLock.lock();
        try {
            if (this.rabbitMQchannel != null && this.rabbitMQchannel.isOpen()) {
                try {
                    this.rabbitMQchannel.close();
                    logger.debug("After closing the RabbitMQ channel.");
                } catch (Exception e) {
                    logger.warn("The rabbitmq channel can't be closed since " + e.getMessage());
                }
            }
            if (this.rabbitMQconnection != null && this.rabbitMQconnection.isOpen()) {
                try {
                    this.rabbitMQconnection.close();
                    logger.debug("After closing the RabbitMQ connection.");
                } catch (Exception e2) {
                    logger.warn("The rabbitmq connection can't be closed since " + e2.getMessage());
                }
            }
            try {
                generateConnectionAndChannel();
                if (this.consumer == null) {
                    throw new RuntimeException("The consumer object is null and hasn't been initialized. IndexWorker.start should be called first.");
                }
                this.rabbitMQchannel.basicConsume("index", false, this.consumer);
                logger.debug("RabbitMQ connection and channel successfully re-created");
            } catch (IOException | TimeoutException e3) {
                throw new IOException("Exception trying to re-initialize connection and channel: " + e3.getMessage(), e3);
            }
        } finally {
            this.connectionLock.unlock();
            logger.debug("The connection lock was released");
        }
    }

    private void indexObject(IndexQueueMessageParser indexQueueMessageParser, boolean z) {
        long currentTimeMillis = System.currentTimeMillis();
        Identifier identifier = indexQueueMessageParser.getIdentifier();
        String indexType = indexQueueMessageParser.getIndexType();
        int priority = indexQueueMessageParser.getPriority();
        String docId = indexQueueMessageParser.getDocId();
        try {
            long id = Thread.currentThread().getId();
            Log log = logger;
            log.info("IndexWorker.consumer.indexObject by multiple thread? " + z + ", with the thread id " + id + " - Received the index task from the index queue with the identifier: " + log + " , the index type: " + identifier.getValue() + ", the priority: " + indexType + ", the docId(can be null): " + priority);
            boolean z2 = -1;
            switch (indexType.hashCode()) {
                case -1737578926:
                    if (indexType.equals(SYSMETA_CHANGE_TYPE)) {
                        z2 = true;
                        break;
                    }
                    break;
                case -1352294148:
                    if (indexType.equals(CREATE_INDEX_TYPE)) {
                        z2 = false;
                        break;
                    }
                    break;
                case -1335458389:
                    if (indexType.equals(DELETE_INDEX_TYPE)) {
                        z2 = 2;
                        break;
                    }
                    break;
            }
            switch (z2) {
                case false:
                    this.solrIndex.update(identifier, false, docId);
                    break;
                case LOW_PRIORITY /* 1 */:
                    this.solrIndex.update(identifier, true, docId);
                    break;
                case MEDIUM_PRIORITY /* 2 */:
                    this.solrIndex.remove(identifier);
                    break;
                default:
                    throw new InvalidRequest("0000", "DataONE indexer does not know the index type: " + indexType + " in the index task");
            }
            long currentTimeMillis2 = System.currentTimeMillis();
            Log log2 = logger;
            long j = currentTimeMillis2 - currentTimeMillis;
            log2.info("IndexWorker.indexOjbect with the thread id " + id + " - Completed the index task from the index queue with the identifier: " + log2 + " , the index type: " + identifier.getValue() + ", the priority: " + indexType + " and the time taking is " + priority + " milliseconds");
        } catch (InvalidToken | NotAuthorized | NotImplemented | NotFound | InvalidRequest | ServiceFailure | IOException | ClassNotFoundException | IllegalAccessException | InstantiationException | InterruptedException | XPathExpressionException | UnsupportedType | NoSuchMethodException | InvocationTargetException | ParserConfigurationException | EncoderException | SAXException | SolrServerException | MarshallingException e) {
            logger.error("Cannot index the task for identifier " + identifier.getValue() + " since " + e.getMessage(), e);
        }
    }

    private void startLivenessProbe() {
        if (!this.isK8s) {
            logger.debug("This is a non-k8s deployment and LivenessProbe do nothing.");
            return;
        }
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(1);
        Path path = Paths.get("./livenessprobe", new String[0]);
        newScheduledThreadPool.scheduleAtFixedRate(() -> {
            try {
                Files.setLastModifiedTime(path, FileTime.fromMillis(System.currentTimeMillis()));
            } catch (IOException e) {
                logger.error("IndexWorker.startLivenessProbe - failed to update file: " + path, e);
            }
        }, 0L, 10L, TimeUnit.SECONDS);
        logger.info("IndexWorker.startLivenessProbe - livenessProbe started");
    }

    public void startReadinessProbe() {
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(1);
        Path path = Paths.get("./readinessprobe", new String[0]);
        newScheduledThreadPool.scheduleAtFixedRate(() -> {
            try {
                if (this.isK8s && !Files.exists(path, new LinkOption[0])) {
                    Files.createFile(path, new FileAttribute[0]);
                }
                if (this.rabbitMQconnection == null || this.rabbitMQchannel == null || !this.rabbitMQconnection.isOpen() || !this.rabbitMQchannel.isOpen()) {
                    logger.error("The RabbitMQ connection or channel were closed. DataONE-indexer has a mechanism to restore them. However, if this error message shows up repeatedly and there is no network outage, intervention may be required (e.g. checking configuration)");
                    try {
                        recreateConnection();
                    } catch (IOException | RuntimeException e) {
                        logger.error("DataONE-indexer cannot recreate the RabbitMQ connections/channels since " + e.getMessage());
                    }
                } else {
                    if (this.isK8s) {
                        Files.setLastModifiedTime(path, FileTime.fromMillis(System.currentTimeMillis()));
                    }
                    logger.debug("The RabbitMQ connection and channel are healthy.");
                }
            } catch (IOException e2) {
                logger.error("Failed to update file: " + path, e2);
            }
        }, readinessInitialDelaySec, readinessPeriodSec, TimeUnit.SECONDS);
        logger.info("ReadinessProb started");
    }

    protected Connection getRabbitMQconnection() {
        return this.rabbitMQconnection;
    }

    protected Channel getRabbitMQchannel() {
        return this.rabbitMQchannel;
    }
}
