/*
 * Decompiled with CFR 0.152.
 */
package org.dataone.cn.indexer;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.FileTime;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPathExpressionException;
import org.apache.commons.codec.EncoderException;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.SolrServerException;
import org.dataone.cn.indexer.SolrIndex;
import org.dataone.cn.indexer.annotation.OntologyModelService;
import org.dataone.cn.indexer.object.ObjectManagerFactory;
import org.dataone.configuration.Settings;
import org.dataone.exceptions.MarshallingException;
import org.dataone.indexer.queue.IndexQueueMessageParser;
import org.dataone.service.exceptions.InvalidRequest;
import org.dataone.service.exceptions.InvalidToken;
import org.dataone.service.exceptions.NotAuthorized;
import org.dataone.service.exceptions.NotFound;
import org.dataone.service.exceptions.NotImplemented;
import org.dataone.service.exceptions.ServiceFailure;
import org.dataone.service.exceptions.UnsupportedType;
import org.dataone.service.types.v1.Identifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.xml.sax.SAXException;

public class IndexWorker {
    /** Index type that {@code indexObject} handles with a Solr update where sysmetaOnly is false. */
    public static final String CREATE_INDEX_TYPE = "create";
    /** Index type that {@code indexObject} handles by removing the identifier from the Solr index. */
    public static final String DELETE_INDEX_TYPE = "delete";
    /** Index type that {@code indexObject} handles with a Solr update where sysmetaOnly is true. */
    public static final String SYSMETA_CHANGE_TYPE = "sysmeta";
    // Priority levels; not referenced by the code in this class.
    // NOTE(review): presumably message priorities used by the queue publishers - confirm.
    public static final int HIGHEST_PRIORITY = 4;
    public static final int HIGH_PRIORITY = 3;
    public static final int MEDIUM_PRIORITY = 2;
    public static final int LOW_PRIORITY = 1;
    // Header names; not referenced by the code in this class.
    // NOTE(review): presumably the message header keys read by IndexQueueMessageParser - confirm.
    private static final String HEADER_ID = "id";
    private static final String HEADER_PATH = "path";
    private static final String HEADER_INDEX_TYPE = "index_type";
    /** Name of the exchange declared and bound in {@code generateConnectionAndChannel}. */
    private static final String EXCHANGE_NAME = "dataone-index";
    // The queue/routing key value "index"; the methods of this class pass the literal "index" directly.
    private static final String INDEX_QUEUE_NAME = "index";
    private static final String INDEX_ROUTING_KEY = "index";
    /** Classpath location of the Spring configuration loaded by {@code initIndexParsers}. */
    private static final String springConfigFileURL = "/index-parser-context.xml";
    /** Name of the environment variable that {@code loadExternalPropertiesFile} reads a properties path from. */
    private static final String ENV_NAME_OF_PROPERTIES_FILE = "DATAONE_INDEXER_CONFIG";
    private static Log logger = LogFactory.getLog(IndexWorker.class);
    /** Last-resort properties path tried by {@code loadExternalPropertiesFile}. */
    private static String defaultExternalPropertiesFile = "/etc/dataone/dataone-indexer.properties";
    /** Properties path selected by {@code loadExternalPropertiesFile}; null when no candidate was usable. */
    protected static String propertyFilePath = null;
    /** Set by {@code initExecutorService}: false when the computed thread number is 1, true otherwise. */
    protected boolean multipleThread = true;
    /** Number of index threads; also passed to {@code basicQos} as the unacknowledged message limit. */
    protected int nThreads = 1;
    // Connection and channel are (re)assigned in generateConnectionAndChannel.
    private Connection rabbitMQconnection = null;
    private Channel rabbitMQchannel = null;
    /** Spring context created on first use in {@code initIndexParsers}. */
    private ApplicationContext context = null;
    /** The "solrIndex" bean looked up from the Spring context. */
    protected SolrIndex solrIndex = null;
    /** Thread pool for index tasks; only created when {@code nThreads} is not 1. */
    private ExecutorService executor = null;
    /** Factory configured in {@code initIndexQueue} and reused whenever a connection is created. */
    private ConnectionFactory factory = null;
    /** Held for the whole of {@code recreateConnection}. */
    private final ReentrantLock connectionLock = new ReentrantLock();
    /** True when the KUBERNETES_SERVICE_HOST environment variable is set (checked in the constructor). */
    private boolean isK8s = false;
    /** Consumer created in {@code start} and registered again by {@code recreateConnection}. */
    private Consumer consumer;
    // Initial delay and period, in seconds, of the readiness probe schedule.
    protected static int readinessInitialDelaySec = 2;
    protected static int readinessPeriodSec = 10;

    /**
     * Command-line entry point: loads the configuration, then starts the worker and its probes.
     *
     * @param args when exactly one argument is given it is used as the path of an external
     *             properties file; any other argument count is ignored
     */
    public static void main(String[] args) {
        logger.info("IndexWorker.main - Starting index worker...");
        boolean singleArgument = args != null && args.length == 1;
        String externalConfig = null;
        if (singleArgument) {
            externalConfig = args[0];
            logger.debug("The external property file specified in the argument is " + externalConfig);
        }
        IndexWorker.loadExternalPropertiesFile(externalConfig);
        try {
            IndexWorker indexer = new IndexWorker();
            indexer.start();
            indexer.startReadinessProbe();
            indexer.startLivenessProbe();
        } catch (Exception fatal) {
            // Any failure during start-up ends the process with a non-zero exit code.
            logger.fatal("IndexWorker.main() exiting due to fatal error: " + fatal.getMessage(), fatal);
            System.exit(1);
        }
    }

    /**
     * Picks an external properties file and merges it into the settings.
     * Candidates are tried in this order, and the first one that exists and is readable wins:
     * the given path, the path in the DATAONE_INDEXER_CONFIG environment variable, and the
     * default external path. When none is usable, nothing is loaded and only a message is logged.
     *
     * @param propertyFile the path supplied by the caller; may be null or blank
     */
    public static void loadExternalPropertiesFile(String propertyFile) {
        // Candidate 1: the path handed in by the caller.
        if (hasText(propertyFile)) {
            propertyFilePath = propertyFile;
            logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path specified by users is " + propertyFilePath);
            if (isReadableFile(propertyFilePath)) {
                logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path users specified is  " + propertyFilePath + ". The file exists and is readable. So it will be used.");
            } else {
                logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path users specified is  " + propertyFilePath + ". But the file does NOT exist or is NOT readable. So it will NOT be used.");
                propertyFilePath = null;
            }
        }

        // Candidate 2: the path named by the environment variable.
        if (!hasText(propertyFilePath)) {
            propertyFilePath = System.getenv(ENV_NAME_OF_PROPERTIES_FILE);
            logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path from the env variable is " + propertyFilePath);
            if (hasText(propertyFilePath)) {
                if (isReadableFile(propertyFilePath)) {
                    logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path can be read from the env variable DATAONE_INDEXER_CONFIG and its value is " + propertyFilePath + ". The file exists and it will be used.");
                } else {
                    logger.info("IndexWorker.loadExternalPropertiesFile - the configuration path can be read from the env variable DATAONE_INDEXER_CONFIG and its value is " + propertyFilePath + ". But the file does NOT exist or is NOT readable. So it will NOT be used.");
                    propertyFilePath = null;
                }
            }
        }

        // Candidate 3: the default external location.
        if (!hasText(propertyFilePath) && isReadableFile(defaultExternalPropertiesFile)) {
            logger.info("IndexWorker.loadExternalPropertiesFile - the configure path can't be read either by users specified or from the env variable DATAONE_INDEXER_CONFIG. However, the default external file " + defaultExternalPropertiesFile + " exists and it will be used.");
            propertyFilePath = defaultExternalPropertiesFile;
        }

        if (!hasText(propertyFilePath)) {
            logger.info("IndexWorker.loadExternalPropertiesFile - can't load an external properties file from the env variable DATAONE_INDEXER_CONFIG or from the default path " + defaultExternalPropertiesFile + ". Dataone-indexer will use the properties file embedded in the jar file");
            return;
        }
        try {
            Settings.augmentConfiguration(propertyFilePath);
            logger.info("IndexWorker.loadExternalPropertiesFile - loaded the properties from the file " + propertyFilePath);
        } catch (ConfigurationException loadFailure) {
            logger.error("IndexWorker.loadExternalPropertiesFile - can't load any properties from the file " + propertyFilePath + " since " + loadFailure.getMessage() + ". It will use the default properties in the jar file.");
        }
    }

    /** Returns true when the string is neither null nor made up of whitespace only. */
    private static boolean hasText(String candidate) {
        return candidate != null && !candidate.trim().isEmpty();
    }

    /** Returns true when the path points to an existing file that can be read. */
    private static boolean isReadableFile(String location) {
        File target = new File(location);
        return target.exists() && target.canRead();
    }

    /**
     * Merges one more properties file into the settings.
     * A null or blank path, or a file that fails to load, is only logged; nothing is thrown.
     *
     * @param propertyFile the path of the additional properties file
     */
    public static void loadAdditionalPropertyFile(String propertyFile) {
        boolean pathMissing = propertyFile == null || propertyFile.trim().isEmpty();
        if (pathMissing) {
            logger.info("IndexWorker.loadAdditionalPropertyFile - can't load an additional property file since its path is null or blank.");
            return;
        }
        try {
            Settings.augmentConfiguration(propertyFile);
            logger.info("IndexWorker.loadAdditionalPropertyFile - loaded the properties from the file " + propertyFile);
        } catch (ConfigurationException loadFailure) {
            logger.error("IndexWorker.loadAdditionalPropertyFile - can't load any properties from the file " + propertyFile + " since " + loadFailure.getMessage() + ".");
        }
    }

    /**
     * Creates a fully initialized worker; equivalent to {@code new IndexWorker(true)}.
     */
    public IndexWorker() throws IOException, TimeoutException, ServiceFailure, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
        this(Boolean.TRUE);
    }

    /**
     * Creates a worker and records whether it runs inside Kubernetes, which is detected by the
     * presence of the KUBERNETES_SERVICE_HOST environment variable.
     *
     * @param initialize when true, the executor service, the RabbitMQ queue, the index parsers,
     *                   the object manager and the ontology model service are set up as well;
     *                   when false, none of them is touched
     */
    public IndexWorker(Boolean initialize) throws IOException, TimeoutException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
        if (System.getenv("KUBERNETES_SERVICE_HOST") != null) {
            this.isK8s = true;
            logger.info("The index worker is in the k8s environment.");
        }
        if (!initialize.booleanValue()) {
            return;
        }
        // The executor is set up first because the queue set-up reads nThreads for basicQos.
        initExecutorService();
        initIndexQueue();
        initIndexParsers();
        ObjectManagerFactory.getObjectManager();
        OntologyModelService.getInstance();
    }

    /**
     * Builds the RabbitMQ connection factory from the settings and opens the first
     * connection and channel. Host, port, user name and password fall back to
     * localhost, 5672, guest and guest when they are not configured.
     *
     * @throws IOException if the connection or channel cannot be set up
     * @throws TimeoutException if opening the connection times out
     */
    private void initIndexQueue() throws IOException, TimeoutException {
        String host = Settings.getConfiguration().getString("index.rabbitmq.hostname", "localhost");
        int port = Settings.getConfiguration().getInt("index.rabbitmq.hostport", 5672);
        String user = Settings.getConfiguration().getString("index.rabbitmq.username", "guest");
        String secret = Settings.getConfiguration().getString("index.rabbitmq.password", "guest");

        ConnectionFactory connectionFactory = new ConnectionFactory();
        this.factory = connectionFactory;
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setPassword(secret);
        connectionFactory.setUsername(user);
        // Let the client library try to recover the connection, with 10000 ms between attempts.
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(10000);

        logger.debug("Set RabbitMQ host to: " + host);
        logger.debug("Set RabbitMQ port to: " + port);
        generateConnectionAndChannel();
    }

    /**
     * Opens a new RabbitMQ connection and channel from the configured factory, then declares
     * the durable direct exchange and the durable priority queue "index", binds them, and
     * limits the number of unacknowledged messages to {@code nThreads}.
     * The new connection and channel replace the ones held in the fields.
     *
     * @throws IOException if the connection, channel, exchange or queue cannot be set up
     * @throws TimeoutException if opening the connection times out
     */
    private void generateConnectionAndChannel() throws IOException, TimeoutException {
        // No default is given, so the property index.rabbitmq.max.priority must be configured.
        int rabbitMQMaxPriority = Settings.getConfiguration().getInt("index.rabbitmq.max.priority");
        boolean durable = true;
        boolean exclusive = false;
        boolean autoDelete = false;

        this.rabbitMQconnection = this.factory.newConnection();
        this.rabbitMQchannel = this.rabbitMQconnection.createChannel();
        this.rabbitMQchannel.exchangeDeclare(EXCHANGE_NAME, "direct", durable);

        // queueDeclare takes a Map<String, Object>; a map with Integer values does not compile.
        HashMap<String, Object> args = new HashMap<String, Object>();
        args.put("x-max-priority", rabbitMQMaxPriority);
        logger.debug("Set RabbitMQ max priority to: " + rabbitMQMaxPriority);
        this.rabbitMQchannel.queueDeclare("index", durable, exclusive, autoDelete, args);
        this.rabbitMQchannel.queueBind("index", EXCHANGE_NAME, "index");

        logger.info("The allowed unacknowledged message(s) number is " + this.nThreads);
        this.rabbitMQchannel.basicQos(this.nThreads);
        logger.debug("Connected to the RabbitMQ queue with the name of index");
    }

    /**
     * Creates the Spring application context on first use and looks up the "solrIndex" bean.
     * The context is created while holding the {@code IndexWorker.class} monitor and the
     * null check is repeated inside it, so threads that share this worker do not create the
     * context twice.
     */
    protected void initIndexParsers() {
        if (this.context == null) {
            // The decompiled source had only MONITORENTER/MONITOREXIT comments here; this
            // block restores the lock they stood for.
            synchronized (IndexWorker.class) {
                if (this.context == null) {
                    this.context = new ClassPathXmlApplicationContext(springConfigFileURL);
                }
            }
        }
        this.solrIndex = (SolrIndex)this.context.getBean("solrIndex");
    }

    /**
     * Decides how many index threads to use and creates the thread pool when more than one
     * is needed. The upper bound is the number of available processors minus one (at least 1);
     * the property index.thread.number lowers it when it is positive and below that bound.
     * With a single thread no executor is created and {@code multipleThread} is set to false.
     */
    protected void initExecutorService() {
        String configured = Settings.getConfiguration().getString("index.thread.number", "0");
        int requested;
        try {
            requested = Integer.parseInt(configured);
        } catch (NumberFormatException badNumber) {
            requested = 0;
            logger.warn("IndexWorker.initExecutorService - IndexWorker cannot parse the string " + configured + " specified by property index.thread.number into a number since " + badNumber.getLocalizedMessage() + ". The default value 0 will be used as the specified value");
        }

        // One processor is set aside; this reduced value is also the one reported in the log.
        int spareProcessors = Runtime.getRuntime().availableProcessors() - 1;
        int poolSize = Math.max(1, spareProcessors);
        if (requested > 0 && requested < poolSize) {
            poolSize = requested;
        }
        this.nThreads = poolSize;

        String summary = "IndexWorker.initExecutorService - the size of index thread pool specified in the propery file is " + requested + ". The size computed from the available processors is " + spareProcessors + ". Final computed thread pool size for index executor: " + this.nThreads;
        if (this.nThreads == 1) {
            logger.info(summary + ". Since its value is 1, we do NOT need the executor service and use a single thread way.");
            this.multipleThread = false;
            return;
        }
        logger.info(summary);
        this.executor = Executors.newFixedThreadPool(this.nThreads);
        this.multipleThread = true;
    }

    /**
     * Creates the message consumer and registers it on the "index" queue with manual
     * acknowledgement.
     * Each delivery is acknowledged before it is parsed or indexed, so a task that fails
     * later is not redelivered. A parsed task runs on the executor when
     * {@code multipleThread} is true, otherwise directly on the consumer thread.
     *
     * @throws IOException if the consumer cannot be registered on the channel
     */
    public void start() throws IOException {
        this.consumer = new DefaultConsumer(this.rabbitMQchannel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                logger.debug("Received message with delivery tag: " + envelope.getDeliveryTag());
                // The field is read on every delivery, so a channel replaced by
                // recreateConnection is the one that gets the acknowledgement.
                IndexWorker.this.rabbitMQchannel.basicAck(envelope.getDeliveryTag(), false);
                final IndexQueueMessageParser parser = new IndexQueueMessageParser();
                try {
                    parser.parse(properties, body);
                    if (IndexWorker.this.multipleThread) {
                        logger.debug("using multiple threads to index identifier " + parser.getIdentifier().getValue());
                        IndexWorker.this.executor.submit(() -> IndexWorker.this.indexObject(parser, IndexWorker.this.multipleThread));
                    } else {
                        logger.debug("using single thread to index identifier " + parser.getIdentifier().getValue());
                        IndexWorker.this.indexObject(parser, IndexWorker.this.multipleThread);
                    }
                } catch (InvalidRequest e) {
                    String error = "Cannot index the task for the object since " + e.getMessage();
                    if (parser.getIdentifier() != null) {
                        error = error + " with the identifier " + parser.getIdentifier().getValue();
                    }
                    // The delivery was already acknowledged above. Rejecting the same delivery
                    // tag again is a protocol error that makes the broker close the channel,
                    // so the invalid message is only logged.
                    logger.error(error, e);
                }
            }
        };
        this.rabbitMQchannel.basicConsume("index", false, this.consumer);
        logger.info("IndexWorker.start - Calling basicConsume and waiting for the coming messages");
    }

    /**
     * Replaces the RabbitMQ connection and channel and registers the consumer again.
     * The whole operation runs while holding {@code connectionLock}. Failures while closing
     * the old channel or connection are only logged.
     *
     * @throws IOException if the new connection or channel cannot be created, or the consumer
     *                     cannot be registered
     * @throws RuntimeException if no consumer exists yet, i.e. {@code start} was not called
     */
    private void recreateConnection() throws IOException {
        this.connectionLock.lock();
        try {
            // Close whatever is still open; a failure here must not stop the re-creation.
            Channel oldChannel = this.rabbitMQchannel;
            if (oldChannel != null && oldChannel.isOpen()) {
                try {
                    oldChannel.close();
                    logger.debug("After closing the RabbitMQ channel.");
                } catch (Exception closeFailure) {
                    logger.warn("The rabbitmq channel can't be closed since " + closeFailure.getMessage());
                }
            }
            Connection oldConnection = this.rabbitMQconnection;
            if (oldConnection != null && oldConnection.isOpen()) {
                try {
                    oldConnection.close();
                    logger.debug("After closing the RabbitMQ connection.");
                } catch (Exception closeFailure) {
                    logger.warn("The rabbitmq connection can't be closed since " + closeFailure.getMessage());
                }
            }

            try {
                generateConnectionAndChannel();
            } catch (IOException | TimeoutException setupFailure) {
                throw new IOException("Exception trying to re-initialize connection and channel: " + setupFailure.getMessage(), setupFailure);
            }

            if (this.consumer == null) {
                throw new RuntimeException("The consumer object is null and hasn't been initialized. IndexWorker.start should be called first.");
            }
            this.rabbitMQchannel.basicConsume("index", false, this.consumer);
            logger.debug("RabbitMQ connection and channel successfully re-created");
        } finally {
            this.connectionLock.unlock();
            logger.debug("The connection lock was released");
        }
    }

    /**
     * Runs one index task against Solr and logs how long it took.
     * The index types "create" and "sysmeta" update the index (with sysmetaOnly false and
     * true respectively), and "delete" removes the identifier. Any other type is reported as
     * an InvalidRequest. The listed checked exceptions are logged here and never propagate
     * to the caller.
     *
     * @param parser the parsed queue message that holds identifier, index type, priority and docId
     * @param multipleThread whether the task runs on the executor; only used in the log message
     */
    private void indexObject(IndexQueueMessageParser parser, boolean multipleThread) {
        long start = System.currentTimeMillis();
        Identifier pid = parser.getIdentifier();
        String indexType = parser.getIndexType();
        int priority = parser.getPriority();
        String docId = parser.getDocId();
        try {
            long threadId = Thread.currentThread().getId();
            logger.info("IndexWorker.consumer.indexObject by multiple thread? " + multipleThread + ", with the thread id " + threadId + " - Received the index task from the index queue with the identifier: " + pid.getValue() + " , the index type: " + indexType + ", the priority: " + priority + ", the docId(can be null): " + docId);
            switch (indexType) {
                case "create": {
                    boolean sysmetaOnly = false;
                    this.solrIndex.update(pid, sysmetaOnly, docId);
                    break;
                }
                case "sysmeta": {
                    boolean sysmetaOnly = true;
                    this.solrIndex.update(pid, sysmetaOnly, docId);
                    break;
                }
                case "delete": {
                    this.solrIndex.remove(pid);
                    break;
                }
                default: {
                    throw new InvalidRequest("0000", "DataONE indexer does not know the index type: " + indexType + " in the index task");
                }
            }
            long end = System.currentTimeMillis();
            logger.info("IndexWorker.indexOjbect with the thread id " + threadId + " - Completed the index task from the index queue with the identifier: " + pid.getValue() + " , the index type: " + indexType + ", the priority: " + priority + " and the time taking is " + (end - start) + " milliseconds");
        }
        catch (IOException | ClassNotFoundException | IllegalAccessException | InstantiationException | InterruptedException | NoSuchMethodException | InvocationTargetException | ParserConfigurationException | XPathExpressionException | EncoderException | SolrServerException | MarshallingException | InvalidRequest | InvalidToken | NotAuthorized | NotFound | NotImplemented | ServiceFailure | UnsupportedType | SAXException e) {
            if (e instanceof InterruptedException) {
                // Catching the exception clears the interrupt flag; set it again so the
                // thread that runs this task can still see the interruption.
                Thread.currentThread().interrupt();
            }
            logger.error("Cannot index the task for identifier " + pid.getValue() + " since " + e.getMessage(), e);
        }
    }

    /**
     * Starts a task that refreshes the modification time of the file ./livenessprobe every
     * 10 seconds. Does nothing outside Kubernetes.
     * The file is created when it does not exist yet, in the same way the readiness probe
     * handles its own file; without that, every refresh of a missing file would fail.
     */
    private void startLivenessProbe() {
        if (!this.isK8s) {
            logger.debug("This is a non-k8s deployment and LivenessProbe do nothing.");
            return;
        }
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        Path path = Paths.get("./livenessprobe");
        Runnable task = () -> {
            try {
                if (!Files.exists(path)) {
                    Files.createFile(path);
                }
                Files.setLastModifiedTime(path, FileTime.fromMillis(System.currentTimeMillis()));
            }
            catch (IOException e) {
                logger.error("IndexWorker.startLivenessProbe - failed to update file: " + path, e);
            }
        };
        scheduler.scheduleAtFixedRate(task, 0L, 10L, TimeUnit.SECONDS);
        logger.info("IndexWorker.startLivenessProbe - livenessProbe started");
    }

    /**
     * Starts a periodic health check of the RabbitMQ connection and channel.
     * When both are open, the modification time of ./readinessprobe is refreshed (only in
     * Kubernetes, where the file is also created if missing). Otherwise the connection and
     * channel are re-created. The schedule is taken from {@code readinessInitialDelaySec}
     * and {@code readinessPeriodSec}.
     */
    public void startReadinessProbe() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        Path path = Paths.get("./readinessprobe");
        Runnable task = () -> {
            try {
                if (this.isK8s && !Files.exists(path)) {
                    Files.createFile(path);
                }
                if (this.rabbitMQconnection != null && this.rabbitMQchannel != null && this.rabbitMQconnection.isOpen() && this.rabbitMQchannel.isOpen()) {
                    if (this.isK8s) {
                        Files.setLastModifiedTime(path, FileTime.fromMillis(System.currentTimeMillis()));
                    }
                    logger.debug("The RabbitMQ connection and channel are healthy.");
                } else {
                    logger.error("The RabbitMQ connection or channel were closed. DataONE-indexer has a mechanism to restore them. However, if this error message shows up repeatedly and there is no network outage, intervention may be required (e.g. checking configuration)");
                    try {
                        this.recreateConnection();
                    }
                    catch (IOException | RuntimeException e) {
                        // Keep the cause so the reason of a failed reconnection can be traced.
                        logger.error("DataONE-indexer cannot recreate the RabbitMQ connections/channels since " + e.getMessage(), e);
                    }
                }
            }
            catch (IOException e) {
                logger.error("Failed to update file: " + path, e);
            }
            catch (RuntimeException e) {
                // An exception that escapes a task scheduled at a fixed rate cancels all later
                // runs, which would stop both the probe and the reconnection attempts.
                logger.error("The readiness check failed unexpectedly since " + e.getMessage(), e);
            }
        };
        scheduler.scheduleAtFixedRate(task, readinessInitialDelaySec, readinessPeriodSec, TimeUnit.SECONDS);
        logger.info("ReadinessProb started");
    }

    /**
     * Returns the RabbitMQ connection currently held by this worker.
     *
     * @return the connection, or null when none has been created
     */
    protected Connection getRabbitMQconnection() {
        return rabbitMQconnection;
    }

    /**
     * Returns the RabbitMQ channel currently held by this worker.
     *
     * @return the channel, or null when none has been created
     */
    protected Channel getRabbitMQchannel() {
        return rabbitMQchannel;
    }
}

