package org.dataone.cn.index.processor;

import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.dataone.client.v2.formats.ObjectFormatCache;
import org.dataone.cn.hazelcast.HazelcastClientFactory;
import org.dataone.cn.index.task.IndexTask;
import org.dataone.cn.index.task.IndexTaskRepository;
import org.dataone.cn.index.task.ResourceMapIndexTask;
import org.dataone.cn.index.util.PerformanceLogger;
import org.dataone.cn.indexer.XmlDocumentUtility;
import org.dataone.cn.indexer.resourcemap.ForesiteResourceMap;
import org.dataone.cn.indexer.resourcemap.ResourceMapFactory;
import org.dataone.cn.indexer.solrhttp.HTTPService;
import org.dataone.cn.indexer.solrhttp.SolrDoc;
import org.dataone.configuration.Settings;
import org.dataone.service.exceptions.BaseException;
import org.dataone.service.types.v1.Identifier;
import org.dataone.service.types.v1.ObjectFormatIdentifier;
import org.dataone.service.types.v2.SystemMetadata;
import org.dspace.foresite.OREParserException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.orm.hibernate3.HibernateOptimisticLockingFailureException;
import org.w3c.dom.Document;

/* loaded from: input_file:org/dataone/cn/index/processor/IndexTaskProcessor.class */
/**
 * Drains the index task repository (NEW tasks first, then FAILED tasks whose next execution time
 * has passed) and hands each task to the delete or update {@link IndexTaskProcessingStrategy} on a
 * shared, fixed-size thread pool.
 *
 * <p>A resource map task is only dispatched once every document it references is either found in
 * the Solr index or is not meant to be visible there. While a resource map is being indexed, each
 * of its referenced ids is claimed in {@code referencedIdsMap} (id -> owning map pid), so that two
 * resource maps sharing a member are not indexed at the same time.
 */
public class IndexTaskProcessor {
    private static final String FORMAT_TYPE_DATA = "DATA";
    private static final String LOAD_LOGGER_NAME = "indexProcessorLoad";

    @Autowired
    private IndexTaskRepository repo;

    @Autowired
    private IndexTaskProcessingStrategy deleteProcessor;

    @Autowired
    private IndexTaskProcessingStrategy updateProcessor;

    @Autowired
    private HTTPService httpService;

    @Autowired
    private String solrQueryUri;

    private PerformanceLogger perfLog = PerformanceLogger.getInstance();

    private static final Logger logger = Logger.getLogger(IndexTaskProcessor.class.getName());

    /** Maximum number of tasks handed to the processors in a single batch. */
    private static final int BATCH_UPDATE_SIZE = Settings.getConfiguration().getInt("dataone.indexing.batchUpdateSize", 1000);

    /** Number of worker threads in {@link #executor}. */
    private static final int NUMOFPROCESSOR = Settings.getConfiguration().getInt("dataone.indexing.processThreadPoolSize", 10);

    /** Number of attempts (0.5 s apart) to claim a referenced id that another resource map holds. */
    private static final int MAXATTEMPTS = Settings.getConfiguration().getInt("dataone.indexing.resourceMapWait.maxAttempt", 10);

    private static final ExecutorService executor = Executors.newFixedThreadPool(NUMOFPROCESSOR);

    /**
     * Held while a task (or a whole batch) claims its referenced ids, so that claims made by
     * different tasks do not interleave. Releasing claims does not take this lock.
     */
    private static final Lock lock = new ReentrantLock();

    /** Referenced document id -> pid of the resource map currently indexing it. */
    private static final ConcurrentHashMap<String, String> referencedIdsMap = new ConcurrentHashMap<>();

    /**
     * Drains the NEW queue and then the due FAILED (retry) queue. Ready tasks are submitted to the
     * worker pool in batches of at most {@code BATCH_UPDATE_SIZE} tasks.
     */
    public void batchProcessIndexTaskQueue() {
        logProcessorLoad();

        List<IndexTask> queue = getIndexTaskQueue();
        List<IndexTask> batch = new ArrayList<>(BATCH_UPDATE_SIZE);
        logger.info("batchProcessIndexTaskQueue, queue size: " + queue.size() + " tasks");
        IndexTask next = getNextIndexTask(queue);
        while (next != null) {
            batch.add(next);
            logger.info("added task: " + next.getPid());
            next = getNextIndexTask(queue);
            if (next != null) {
                logger.info("next task: " + next.getPid());
            }
            logger.info("queue size: " + queue.size());
            if (batch.size() >= BATCH_UPDATE_SIZE) {
                batchProcessTasksOnThread(batch);
                // The submitted list now belongs to the worker; start a fresh one.
                batch = new ArrayList<>(BATCH_UPDATE_SIZE);
            }
        }
        batchProcessTasksOnThread(batch);

        List<IndexTask> retryQueue = getIndexTaskRetryQueue();
        List<IndexTask> retryBatch = new ArrayList<>(BATCH_UPDATE_SIZE);
        logger.info("batchProcessIndexTaskQueue, retry queue size: " + retryQueue.size() + " tasks");
        IndexTask nextRetry = getNextIndexTask(retryQueue);
        while (nextRetry != null) {
            retryBatch.add(nextRetry);
            nextRetry = getNextIndexTask(retryQueue);
            if (retryBatch.size() >= BATCH_UPDATE_SIZE) {
                batchProcessTasksOnThread(retryBatch);
                retryBatch = new ArrayList<>(BATCH_UPDATE_SIZE);
            }
        }
        batchProcessTasksOnThread(retryBatch);
    }

    /**
     * Drains the NEW queue and then the due FAILED (retry) queue, submitting each ready task to
     * the worker pool as its own job.
     */
    public void processIndexTaskQueue() {
        logProcessorLoad();
        submitEachTask(getIndexTaskQueue());
        // Fetched only after the NEW queue has been drained, as before.
        submitEachTask(getIndexTaskRetryQueue());
    }

    /** Submits every ready task taken from {@code queue} to the worker pool, one job per task. */
    private void submitEachTask(List<IndexTask> queue) {
        for (IndexTask task = getNextIndexTask(queue); task != null; task = getNextIndexTask(queue)) {
            processTaskOnThread(task);
        }
    }

    /** Logs the current NEW and FAILED task counts to the dedicated load logger. */
    private void logProcessorLoad() {
        Logger loadLogger = Logger.getLogger(LOAD_LOGGER_NAME);
        Long newCount = null;
        Long failedCount = null;
        try {
            newCount = this.repo.countByStatus("NEW");
            failedCount = this.repo.countByStatus("FAILED");
        } catch (Exception e) {
            logger.error("Unable to count NEW or FAILED tasks in task index repository.", e);
        }
        loadLogger.info("new:" + newCount + ", failed: " + failedCount);
    }

    /** Runs {@link #processTask(IndexTask)} for {@code indexTask} on the worker pool. */
    private void processTaskOnThread(final IndexTask indexTask) {
        logger.info("using multiple threads to process index");
        executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    processTask(indexTask);
                } catch (RuntimeException e) {
                    // submit() would otherwise capture this in a Future that nobody reads.
                    logger.error("Unexpected error while processing task for pid: " + indexTask.getPid(), e);
                }
            }
        });
    }

    /**
     * Indexes a single task. On success the task is deleted from the repository. On failure it
     * is marked FAILED and saved. Any referenced-id claims the task holds are always released.
     *
     * <p>Called from worker threads; kept public for backward compatibility.
     */
    public void processTask(IndexTask indexTask) {
        long start = System.currentTimeMillis();
        try {
            checkReadinessProcessResourceMap(indexTask);
            if (indexTask.isDeleteTask()) {
                logger.info("Indexing delete task for pid: " + indexTask.getPid());
                this.deleteProcessor.process(indexTask);
            } else {
                logger.info("Indexing update task for pid: " + indexTask.getPid());
                this.updateProcessor.process(indexTask);
            }
            this.repo.delete(indexTask);
            logger.info("Indexing complete for pid: " + indexTask.getPid());
            this.perfLog.log("IndexTaskProcessor.processTasks process pid " + indexTask.getPid(), System.currentTimeMillis() - start);
        } catch (Exception e) {
            logger.error("Unable to process task for pid: " + indexTask.getPid(), e);
            handleFailedTask(indexTask);
        } finally {
            removeIdsFromResourceMapReferencedSet(indexTask);
        }
    }

    /**
     * For a {@link ResourceMapIndexTask}, claims every non-blank referenced id for this map.
     * Other task types pass through untouched.
     *
     * @throws IllegalStateException if an id stays claimed by another resource map for
     *     {@code MAXATTEMPTS} attempts; this task's own claims are released first
     * @throws InterruptedException if interrupted while waiting (the interrupt flag is restored)
     */
    private void checkReadinessProcessResourceMap(IndexTask indexTask) throws InterruptedException {
        if (!(indexTask instanceof ResourceMapIndexTask)) {
            return;
        }
        ResourceMapIndexTask resourceMapTask = (ResourceMapIndexTask) indexTask;
        List<String> referencedIds = resourceMapTask.getReferencedIds();
        if (referencedIds == null) {
            return;
        }
        String pid = resourceMapTask.getPid();
        lock.lock();
        try {
            for (String id : referencedIds) {
                if (id == null || id.trim().isEmpty()) {
                    // Nothing to contend for; previously this exhausted all attempts and failed the task.
                    continue;
                }
                if (!claimReferencedId(id, pid)) {
                    removeIdsFromResourceMapReferencedSet(resourceMapTask);
                    String message = "We waited for another thread to finish indexing a resource map which has the referenced id " + id + " for a while. Now we quited and can't index id " + pid;
                    logger.error(message);
                    throw new IllegalStateException(message);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Tries up to {@code MAXATTEMPTS} times, 0.5 s apart, to claim {@code id} for resource map
     * {@code pid}.
     *
     * @return true if {@code id} is now (or already was) claimed by {@code pid}
     */
    private boolean claimReferencedId(String id, String pid) throws InterruptedException {
        for (int attempt = 0; attempt < MAXATTEMPTS; attempt++) {
            // Atomic check-and-claim; null means the id was free and is now ours.
            String owner = referencedIdsMap.putIfAbsent(id, pid);
            if (owner == null || owner.equals(pid)) {
                return true;
            }
            logger.info("Another resource map is process the referenced id " + id + " as well. So the thread to process id " + pid + " has to wait 0.5 seconds.");
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw e;
            }
        }
        return false;
    }

    /**
     * Releases the referenced-id claims that {@code indexTask} (a resource map) holds. Claims
     * owned by a different resource map are left in place.
     */
    private void removeIdsFromResourceMapReferencedSet(IndexTask indexTask) {
        if (!(indexTask instanceof ResourceMapIndexTask)) {
            return;
        }
        List<String> referencedIds = ((ResourceMapIndexTask) indexTask).getReferencedIds();
        String pid = indexTask.getPid();
        if (referencedIds == null || pid == null) {
            return;
        }
        for (String id : referencedIds) {
            if (id != null) {
                // Only remove the entry if this map is still its owner.
                referencedIdsMap.remove(id, pid);
            }
        }
    }

    /** Runs {@link #batchProcessTasks(List)} for {@code list} on the worker pool; empty batches are skipped. */
    private void batchProcessTasksOnThread(final List<IndexTask> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        logger.info("using multiple threads to process BATCHED index tasks.");
        executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    batchProcessTasks(list);
                } catch (RuntimeException e) {
                    // submit() would otherwise capture this in a Future that nobody reads.
                    logger.error("Unexpected error while batch processing " + list.size() + " tasks", e);
                }
            }
        });
    }

    /**
     * Indexes a batch. The batch is split into delete and update tasks, and each group is sent to
     * its processor in one call. Tasks in a group that succeeds are deleted from the repository.
     * Tasks in a group that fails are marked FAILED. If the referenced-id claim step fails, every
     * task in the batch is marked FAILED so it is retried. Claims are always released afterwards.
     *
     * <p>Called from worker threads; kept public for backward compatibility.
     */
    public void batchProcessTasks(List<IndexTask> list) {
        if (list == null) {
            return;
        }
        long start = System.currentTimeMillis();
        int size = list.size();
        logger.info("batch processing: " + size + " tasks");
        List<IndexTask> updateTasks = new ArrayList<>();
        List<IndexTask> deleteTasks = new ArrayList<>();
        for (IndexTask task : list) {
            if (task.isDeleteTask()) {
                logger.info("Adding delete task to be processed for pid: " + task.getPid());
                deleteTasks.add(task);
            } else {
                logger.info("Adding update task to be processed for pid: " + task.getPid());
                updateTasks.add(task);
            }
        }
        logger.info("update tasks: " + updateTasks.size());
        logger.info("delete tasks: " + deleteTasks.size());
        try {
            batchCheckReadinessProcessResourceMap(list);
            processTaskGroup(this.deleteProcessor, deleteTasks);
            processTaskGroup(this.updateProcessor, updateTasks);
        } catch (Exception e) {
            logger.error("Couldn't batch indexing the tasks since " + e.getMessage(), e);
            // Without this the tasks would stay IN PROGRESS and never be picked up again.
            handleFailedTasks(list);
        } finally {
            batchRemoveIdsFromResourceMapReferencedSet(list);
        }
        this.perfLog.log("IndexTaskProcessor.batchProcessTasks process " + size + " objects in ", System.currentTimeMillis() - start);
    }

    /**
     * Sends {@code tasks} to {@code processor} in one call. On success each task is deleted from
     * the repository. On failure these same tasks are marked FAILED.
     */
    private void processTaskGroup(IndexTaskProcessingStrategy processor, List<IndexTask> tasks) {
        try {
            processor.process(tasks);
            for (IndexTask task : tasks) {
                this.repo.delete(task);
                logger.info("Indexing complete for pid: " + task.getPid());
            }
        } catch (Exception e) {
            StringBuilder pids = new StringBuilder();
            for (IndexTask task : tasks) {
                pids.append(task.getPid()).append(", ");
            }
            logger.error("Unable to process tasks for pids: " + pids.toString(), e);
            handleFailedTasks(tasks);
        }
    }

    /**
     * Claims the referenced ids of every resource map task in {@code list}. The lock is held for
     * the whole batch, so claims from other threads cannot interleave with this batch's claims.
     */
    private void batchCheckReadinessProcessResourceMap(List<IndexTask> list) throws InterruptedException {
        if (list == null) {
            return;
        }
        lock.lock();
        try {
            for (IndexTask task : list) {
                checkReadinessProcessResourceMap(task);
            }
        } finally {
            lock.unlock();
        }
    }

    /** Releases the referenced-id claims held by each task in {@code list}. */
    private void batchRemoveIdsFromResourceMapReferencedSet(List<IndexTask> list) {
        if (list == null) {
            return;
        }
        for (IndexTask task : list) {
            removeIdsFromResourceMapReferencedSet(task);
        }
    }

    /** Marks each task FAILED and saves it. */
    private void handleFailedTasks(List<IndexTask> tasks) {
        for (IndexTask task : tasks) {
            handleFailedTask(task);
        }
    }

    /** Marks the task FAILED and saves it. */
    private void handleFailedTask(IndexTask indexTask) {
        indexTask.markFailed();
        saveTask(indexTask);
    }

    /**
     * Removes tasks from the head of {@code queue} until one is ready to index, and returns it.
     * Each taken task is saved as IN PROGRESS. A task whose object path or referenced documents
     * are not ready is saved as NEW again and skipped. A ready resource map task is returned as a
     * {@link ResourceMapIndexTask}.
     *
     * @return the next ready task, or {@code null} once {@code queue} is empty
     */
    private IndexTask getNextIndexTask(List<IndexTask> queue) {
        while (!queue.isEmpty()) {
            IndexTask candidate = queue.remove(0);
            if (candidate == null) {
                continue;
            }
            candidate.markInProgress();
            IndexTask task = saveTask(candidate);
            if (task == null) {
                // The IN PROGRESS save hit an optimistic-lock failure (logged by saveTask); skip it.
                continue;
            }
            logger.info("Start of indexing pid: " + task.getPid());
            if (task.isDeleteTask()) {
                return task;
            }
            if (!isObjectPathReady(task)) {
                task.markNew();
                saveTask(task);
                logger.info("Task for pid: " + task.getPid() + " not processed since the object path is not ready.");
                continue;
            }
            if (!representsResourceMap(task)) {
                return task;
            }
            IndexTask resourceMapTask = prepareResourceMapTask(task);
            if (resourceMapTask != null) {
                return resourceMapTask;
            }
        }
        return null;
    }

    /**
     * Loads the resource map behind {@code task} and checks that every document it references is
     * indexed (or not meant to be visible).
     *
     * @return a {@link ResourceMapIndexTask} copy of {@code task} carrying the referenced ids; or
     *     {@code null} after saving {@code task} as NEW when the map cannot be loaded or parsed, or
     *     a referenced document is not ready
     */
    private IndexTask prepareResourceMapTask(IndexTask task) {
        List<String> referencedIds = null;
        boolean ready = true;
        try {
            referencedIds = ResourceMapFactory.buildResourceMap(task.getObjectPath()).getAllDocumentIDs();
            // Exclude the map's own pid so it does not wait on itself.
            referencedIds.remove(task.getPid());
            if (!areAllReferencedDocsIndexed(referencedIds)) {
                logger.info("Not all map resource references indexed for map: " + task.getPid() + ".  Marking new and continuing...");
                ready = false;
            }
        } catch (OREParserException e) {
            // Must precede catch (Exception): the more specific handler has to come first.
            ready = false;
            // NOTE(review): the message says the task will not be re-tried, yet it is marked NEW
            // below and will be picked up again — confirm the intended behavior.
            logger.error("Unable to parse ORE doc: " + task.getPid() + ".  Unrecoverable parse error: task will not be re-tried.");
            if (logger.isTraceEnabled()) {
                logger.trace("ORE parse failure for pid: " + task.getPid(), e);
            }
        } catch (Exception e) {
            ready = false;
            logger.error("unable to load resource for pid: " + task.getPid() + " at object path: " + task.getObjectPath() + ".  Marking new and continuing...", e);
        }

        if (!ready) {
            task.markNew();
            saveTask(task);
            logger.info("Task for resource map pid: " + task.getPid() + " not processed.");
            return null;
        }

        logger.info("the original index task - " + task.toString());
        ResourceMapIndexTask resourceMapTask = new ResourceMapIndexTask();
        resourceMapTask.copy(task);
        resourceMapTask.setReferencedIds(referencedIds);
        logger.info("the new index task is a ResourceMapIndexTask");
        logger.info("the new index task - " + resourceMapTask.toString());
        return resourceMapTask;
    }

    /**
     * Checks that each id either matches a Solr document (by identifier or series id) or has system
     * metadata that is not visible in the index. Returns false on any lookup error.
     */
    private boolean areAllReferencedDocsIndexed(List<String> referencedIds) {
        if (referencedIds == null || referencedIds.isEmpty()) {
            return true;
        }
        try {
            List<SolrDoc> indexedDocs = this.httpService.getDocumentsById(this.solrQueryUri, referencedIds);
            for (String id : referencedIds) {
                if (isInIndex(id, indexedDocs)) {
                    continue;
                }
                logger.debug("Identifier " + id + " was not found in the referenced id list in the Solr search index.");
                Identifier identifier = new Identifier();
                identifier.setValue(id);
                SystemMetadata systemMetadata = (SystemMetadata) HazelcastClientFactory.getSystemMetadataMap().get(identifier);
                if (!notVisibleInIndex(systemMetadata)) {
                    return false;
                }
            }
            return true;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return false;
        }
    }

    /** True if some document in {@code docs} has {@code id} as its identifier or series id. */
    private static boolean isInIndex(String id, List<SolrDoc> docs) {
        for (SolrDoc doc : docs) {
            // id.equals(...) tolerates documents whose identifier is null.
            if (id.equals(doc.getIdentifier()) || id.equals(doc.getSeriesId())) {
                return true;
            }
        }
        return false;
    }

    /** True when system metadata is present and {@link SolrDoc#visibleInIndex} rejects it. */
    private boolean notVisibleInIndex(SystemMetadata systemMetadata) {
        return systemMetadata != null && !SolrDoc.visibleInIndex(systemMetadata);
    }

    private boolean representsResourceMap(IndexTask indexTask) {
        return ForesiteResourceMap.representsResourceMap(indexTask.getFormatId());
    }

    /**
     * Resolves and validates the object file path for non-data tasks. A missing path is looked up
     * in the Hazelcast object-path map; if it is still absent, the map entry is evicted. A path
     * whose file does not exist also counts as not ready. Data objects always count as ready.
     */
    private boolean isObjectPathReady(IndexTask indexTask) {
        boolean ready = true;
        boolean isDataObject = isDataObject(indexTask);
        if (indexTask.getObjectPath() == null && !isDataObject) {
            String objectPath = retrieveObjectPath(indexTask.getPid());
            if (objectPath == null) {
                ready = false;
                evictObjectPathEntry(indexTask.getPid());
                logger.info("Object path for pid: " + indexTask.getPid() + " is not available.  Object path entry will be evicting from map.  Task will be retried.");
            }
            indexTask.setObjectPath(objectPath);
        }
        if (indexTask.getObjectPath() != null && !isDataObject && !new File(indexTask.getObjectPath()).exists()) {
            ready = false;
            logger.info("Object path exists for pid: " + indexTask.getPid() + " however the file location: " + indexTask.getObjectPath() + " does not exist.  Marking not ready - task will be marked new and retried.");
        }
        return ready;
    }

    /** True if the task's format has type {@code DATA}; false (after logging) on lookup failure. */
    private boolean isDataObject(IndexTask indexTask) {
        try {
            ObjectFormatIdentifier formatId = new ObjectFormatIdentifier();
            formatId.setValue(indexTask.getFormatId());
            return FORMAT_TYPE_DATA.equals(ObjectFormatCache.getInstance().getFormat(formatId).getFormatType());
        } catch (BaseException e) {
            logger.error(e.getMessage(), e);
            return false;
        }
    }

    private String retrieveObjectPath(String pid) {
        Identifier identifier = new Identifier();
        identifier.setValue(pid);
        return (String) HazelcastClientFactory.getObjectPathMap().get(identifier);
    }

    private void evictObjectPathEntry(String pid) {
        Identifier identifier = new Identifier();
        identifier.setValue(pid);
        HazelcastClientFactory.getObjectPathMap().evict(identifier);
    }

    /** Fetches NEW tasks ordered by priority, then by modification date. */
    private List<IndexTask> getIndexTaskQueue() {
        long start = System.currentTimeMillis();
        List<IndexTask> tasks = this.repo.findByStatusOrderByPriorityAscTaskModifiedDateAsc("NEW");
        this.perfLog.log("IndexTaskProcessor.getIndexTaskQueue() fetching NEW IndexTasks from repo", System.currentTimeMillis() - start);
        return tasks;
    }

    /** Fetches FAILED tasks whose next execution time is earlier than now. */
    private List<IndexTask> getIndexTaskRetryQueue() {
        return this.repo.findByStatusAndNextExecutionLessThan("FAILED", System.currentTimeMillis());
    }

    /**
     * Saves the task.
     *
     * @return the saved entity, or {@code null} if the save hit an optimistic locking failure
     */
    private IndexTask saveTask(IndexTask indexTask) {
        try {
            return (IndexTask) this.repo.save(indexTask);
        } catch (HibernateOptimisticLockingFailureException e) {
            logger.error("Unable to update index task for pid: " + indexTask.getPid() + ".");
            return null;
        }
    }

    private Document loadDocument(IndexTask indexTask) {
        Document document = null;
        try {
            document = XmlDocumentUtility.loadDocument(indexTask.getObjectPath());
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        if (document == null) {
            logger.error("Could not load OBJECT file for ID,Path=" + indexTask.getPid() + ", " + indexTask.getObjectPath());
        }
        return document;
    }

    public void setSolrQueryUri(String str) {
        this.solrQueryUri = str;
    }
}
