package org.dataone.cn.batch.synchronization.tasks;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.log4j.Logger;
import org.dataone.cn.batch.exceptions.ExecutionDisabledException;
import org.dataone.cn.batch.exceptions.RetryableException;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/* loaded from: input_file:org/dataone/cn/batch/synchronization/tasks/QueueProcessorCallable.class */
public abstract class QueueProcessorCallable<E, V> implements Callable<String> {
    protected Queue<E> queue;
    private static ThreadPoolTaskExecutor taskExecutor;
    private static final long EXECUTION_THREAD_TIMEOUT = 900000;
    Logger logger = Logger.getLogger(QueueProcessorCallable.class.getName());
    protected boolean inactivate = false;
    protected DelayQueue<DelayWrapper<E>> pendingQueueItem = new DelayQueue<>();
    protected CircularFifoQueue<V> latestResults = new CircularFifoQueue<>(50);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/dataone/cn/batch/synchronization/tasks/QueueProcessorCallable$FutureStat.class */
    public class FutureStat {
        Date start;
        E queueItem;

        FutureStat(Date date, E e) {
            this.start = date;
            this.queueItem = e;
        }
    }

    public void setQueue(Queue<E> queue) {
        this.queue = queue;
    }

    public void setThreadPoolTaskExecutor(ThreadPoolTaskExecutor threadPoolTaskExecutor) {
        taskExecutor = threadPoolTaskExecutor;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public String call() throws Exception {
        HashMap<FutureTask<V>, QueueProcessorCallable<E, V>.FutureStat> hashMap = new HashMap<>();
        while (true) {
            try {
                reapFutures(hashMap);
                if (isInactivated()) {
                    if (hashMap.isEmpty() && this.pendingQueueItem.isEmpty()) {
                        break;
                    }
                    if (this.pendingQueueItem.isEmpty()) {
                        interruptableSleep(3000L);
                    }
                }
                E nextItem = getNextItem();
                if (nextItem != null) {
                    try {
                        hashMap.put(scheduleExecution(prepareTask(nextItem)), new FutureStat(new Date(), nextItem));
                    } catch (TaskRejectedException e) {
                        this.pendingQueueItem.add((DelayQueue<DelayWrapper<E>>) new DelayWrapper<>(nextItem));
                        interruptableSleep(2000L);
                    }
                    cancelStuckTasks(hashMap);
                }
            } catch (InterruptedException e2) {
                e2.printStackTrace();
                return "Interrupted";
            }
        }
        this.logger.info("All Tasks are complete. Shutting down\n");
        throw new ExecutionDisabledException();
    }

    private E getNextItem() throws InterruptedException {
        DelayWrapper<E> poll = this.pendingQueueItem.poll();
        if (poll != null) {
            return poll.getWrappedObject();
        }
        if (isInactivated()) {
            return null;
        }
        return this.queue instanceof BlockingQueue ? (E) ((BlockingQueue) this.queue).poll(250L, TimeUnit.MILLISECONDS) : this.queue.poll();
    }

    private FutureTask<V> scheduleExecution(Callable<V> callable) throws TaskRejectedException {
        FutureTask<V> futureTask = new FutureTask<>(callable);
        taskExecutor.execute(futureTask);
        return futureTask;
    }

    private void interruptableSleep(Long l) {
        try {
            Thread.sleep(l.longValue());
        } catch (InterruptedException e) {
            this.logger.debug("sleep interrupted");
        }
    }

    private void cancelStuckTasks(HashMap<FutureTask<V>, QueueProcessorCallable<E, V>.FutureStat> hashMap) {
        if (taskExecutor.getActiveCount() >= taskExecutor.getPoolSize()) {
            if (taskExecutor.getPoolSize() == taskExecutor.getMaxPoolSize() && hashMap.isEmpty()) {
                for (Runnable runnable : (Runnable[]) taskExecutor.getThreadPoolExecutor().getQueue().toArray(new Runnable[0])) {
                    taskExecutor.getThreadPoolExecutor().remove(runnable);
                }
            }
            taskExecutor.getThreadPoolExecutor().purge();
        }
    }

    protected abstract Callable<V> prepareTask(E e);

    protected abstract void cleanupTask(E e);

    protected void setIsInactivated(boolean z) {
        this.logger.warn("Setting processor inactivation: " + z);
        this.inactivate = z;
    }

    protected boolean isInactivated() {
        return this.inactivate;
    }

    private void reapFutures(HashMap<FutureTask<V>, QueueProcessorCallable<E, V>.FutureStat> hashMap) throws InterruptedException {
        if (hashMap.size() > 0) {
            this.logger.info("waiting on " + hashMap.size() + " futures");
        } else {
            this.logger.debug("Polling empty queue");
        }
        if (hashMap.isEmpty()) {
            return;
        }
        ArrayList arrayList = new ArrayList();
        for (FutureTask<V> futureTask : hashMap.keySet()) {
            try {
                this.latestResults.add(futureTask.get(250L, TimeUnit.MILLISECONDS));
                cleanupTask(hashMap.get(futureTask).queueItem);
                arrayList.add(futureTask);
            } catch (CancellationException e) {
                cleanupTask(hashMap.get(futureTask).queueItem);
                this.pendingQueueItem.add((DelayQueue<DelayWrapper<E>>) new DelayWrapper<>(hashMap.get(futureTask).queueItem, 0L));
                arrayList.add(futureTask);
            } catch (ExecutionException e2) {
                if (e2.getCause() != null && (e2.getCause() instanceof RetryableException)) {
                    this.logger.debug("Adding item to pendingQueue...");
                    this.pendingQueueItem.add((DelayQueue<DelayWrapper<E>>) new DelayWrapper<>(hashMap.get(futureTask).queueItem, ((RetryableException) e2.getCause()).getDelay()));
                }
                cleanupTask(hashMap.get(futureTask).queueItem);
                arrayList.add(futureTask);
            } catch (TimeoutException e3) {
                if (new Date().getTime() - hashMap.get(futureTask).start.getTime() > EXECUTION_THREAD_TIMEOUT) {
                    if (futureTask.cancel(true)) {
                        cleanupTask(hashMap.get(futureTask).queueItem);
                        arrayList.add(futureTask);
                    } else {
                        this.logger.warn("unable to cancel this future task!");
                    }
                    if (taskExecutor.getThreadPoolExecutor().remove(futureTask)) {
                    }
                }
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        Iterator<E> it = arrayList.iterator();
        while (it.hasNext()) {
            hashMap.remove((Future) it.next());
        }
    }
}
