package org.dataone.cn.messaging;

import java.nio.charset.StandardCharsets;

import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/* loaded from: input_file:org/dataone/cn/messaging/QueueAccessIT.class */
/**
 * Manual integration tests for queue access through Spring AMQP.
 *
 * <p>Every test that talks to a broker is {@code @Ignore}d. They connect to {@code localhost}
 * with the {@code guest}/{@code guest} credentials, and several of them pause so that an
 * operator can stop/restart the broker or inspect queue depths by hand. Only
 * {@link #testPlaceHolderUntilCanGetMockBroker()} runs by default.
 *
 * <p>NOTE(review): nothing in this class declares {@link #testQueueName}; the tests that use it
 * presumably rely on the queue already existing on the broker - confirm.
 */
public class QueueAccessIT {
    private static final Logger logger = Logger.getLogger(QueueAccessIT.class.getName());

    /** How long the test sleeps while the operator stops or restarts the broker. */
    private static final long BROKER_TOGGLE_PAUSE_MILLIS = 15000L;

    /** How long the test sleeps while the operator inspects queue depths. */
    private static final long INSPECTION_PAUSE_MILLIS = 8000L;

    /** Name of the queue shared by most tests in this class. */
    static String testQueueName = "brokerTestQueue";

    // NOTE(review): this per-test factory is never destroyed. Consider an @After hook calling
    // connFactory.destroy() once these tests are re-enabled.
    CachingConnectionFactory connFactory = new CachingConnectionFactory("localhost");
    RabbitTemplate rabbitTemplate;

    /**
     * Configures the shared connection factory (guest credentials, publisher confirms on) and
     * builds the template used by the tests.
     */
    @Before
    public void setUp() throws Exception {
        this.connFactory.setUsername("guest");
        this.connFactory.setPassword("guest");
        this.connFactory.setPublisherConfirms(true);
        this.rabbitTemplate = new RabbitTemplate(this.connFactory);
    }

    /**
     * Expects an {@link AmqpException} when publishing with a routing key that matches no queue.
     *
     * <p>Disabled: per the {@code @Ignore} reason the broker does not appear to raise an error
     * in this situation.
     */
    @Test
    @Ignore("It doesn't look like an exceptionp throwing situation")
    public void testConfirmedPublishToNonExistingQueue_ShouldThrowAMQPException() {
        try {
            this.rabbitTemplate.convertAndSend("aNonExistingQueue", "foo");
            Assert.fail("publishing to a non-existing queue should throw exception");
        } catch (AmqpException e) {
            logger.info(e.getClass().getCanonicalName() + ": " + e.getMessage());
        }
    }

    /** Expects an {@link AmqpException} when receiving from a queue that does not exist. */
    @Test
    @Ignore
    public void testConsumeFromNonExistingQueue_ShouldThrowAMQUException() {
        try {
            this.rabbitTemplate.receiveAndConvert("aNonExistingQueue", 500L);
            Assert.fail("Should not successfully retrieve from non existing queue");
        } catch (AmqpException e) {
            logger.info(e.getClass().getCanonicalName() + ": " + e.getMessage());
        }
    }

    /** Asks the operator to stop the broker and waits 15 seconds for them to do so. */
    void stopBroker() {
        pauseForOperator("*#*#*#*#*#*#*#*#*#*    PLEASE STOP BROKER while test sleeps for 15 seconds     *#*#*#*#*#*#*#*#*#*");
    }

    /** Asks the operator to restart the broker and waits 15 seconds for them to do so. */
    void startBroker() {
        pauseForOperator("*#*#*#*#*#*#*#*#*#*    PLEASE RESTART BROKER while test sleeps for 15 seconds    *#*#*#*#*#*#*#*#*#*");
    }

    /**
     * Logs {@code banner} at WARN level and sleeps for {@link #BROKER_TOGGLE_PAUSE_MILLIS}.
     *
     * <p>An interrupt simply ends the wait early. NOTE(review): the interrupt flag is not
     * restored; the "moving on" message suggests that is deliberate - confirm.
     *
     * @param banner instruction shown to the operator
     */
    private static void pauseForOperator(String banner) {
        logger.warn(banner);
        try {
            Thread.sleep(BROKER_TOGGLE_PAUSE_MILLIS);
        } catch (InterruptedException ignored) {
            logger.warn("OK, moving on...");
        }
    }

    /**
     * Publishes a message, has the operator bounce the broker, and asserts the message can still
     * be received afterwards.
     */
    @Test
    @Ignore
    public void testMessageDurability_messageShouldSurviveBrokerRestart() {
        // Round-trip one message first to show the broker is reachable.
        this.rabbitTemplate.convertAndSend(testQueueName, "testBrokerStop1");
        String firstMessage = (String) this.rabbitTemplate.receiveAndConvert(testQueueName, 250L);
        logger.info("   =======> received firstMessage, so broker is up and running: " + firstMessage);

        this.rabbitTemplate.convertAndSend(testQueueName, "testBrokerStop2");
        stopBroker();
        try {
            // With the broker down the receive itself is expected to throw; reaching fail()
            // means the broker was still up. fail() raises AssertionError, which the catch
            // below does not intercept.
            String unexpected = (String) this.rabbitTemplate.receiveAndConvert(testQueueName, 250L);
            Assert.fail("Should not be able to get a message, so the stop probably didn't happen. " + unexpected);
        } catch (AmqpException e) {
            logger.info("Broker down, as planned... :" + e.getClass().getCanonicalName());
        }
        startBroker();

        String survivor = (String) this.rabbitTemplate.receiveAndConvert(testQueueName, 2000L);
        Assert.assertEquals("Message should have survived broker restart", "testBrokerStop2", survivor);
    }

    /**
     * Declares a queue with the durable flag set, has the operator bounce the broker, then
     * publishes to that queue.
     *
     * <p>NOTE(review): the queue is named "aNonDurableQueue" although it is declared durable,
     * and the test makes no assertion after the restart - it only passes if the publish does not
     * throw.
     */
    @Test
    @Ignore
    public void testQueueDurability_queueDeclaredAsDurableShouldSurviveBrokerRestart() throws InterruptedException {
        RabbitAdmin admin = new RabbitAdmin(this.connFactory);
        admin.declareQueue(new Queue("aNonDurableQueue", true));
        stopBroker();
        startBroker();
        this.rabbitTemplate.convertAndSend("aNonDurableQueue", "a test message that could have been anything");
    }

    /**
     * Publishes one message, consumes it without acknowledging, then acknowledges it, pausing
     * between steps so the operator can compare the broker's message count with the logged
     * expectation.
     */
    @Test
    @Ignore
    public void testDelayedConsumeAcknowledgement() throws InterruptedException {
        this.rabbitTemplate.convertAndSend(testQueueName, "simpleMessage1");
        logger.warn("Please check broker for message count. should be 1");
        Thread.sleep(INSPECTION_PAUSE_MILLIS);

        QueueAccess access = new QueueAccess(this.connFactory, testQueueName);
        Message pending = access.consumeNextNoAck(200L);
        logger.warn("Please check broker for message count. should be 1");
        Thread.sleep(INSPECTION_PAUSE_MILLIS);

        access.acknowledgeConsume(pending);
        logger.warn("Please check broker for message count. should be 0");
        Thread.sleep(INSPECTION_PAUSE_MILLIS);
    }

    /**
     * Demonstrates a four-stage pipeline: a message is published to {@link #testQueueName} and
     * then moved through pipe1..pipe4, one {@link #processStep(String, String)} per hop.
     */
    @Test
    @Ignore
    public void examplePipeline() throws InterruptedException {
        RabbitAdmin admin = new RabbitAdmin(this.connFactory);
        String[] stageNames = {"pipe1", "pipe2", "pipe3", "pipe4"};

        // Declare every stage (non-durable) before any message is sent.
        for (String stageName : stageNames) {
            admin.declareQueue(new Queue(stageName, false));
        }

        this.rabbitTemplate.convertAndSend(testQueueName, "demoMessage");

        String from = testQueueName;
        for (String to : stageNames) {
            processStep(from, to);
            from = to;
        }
    }

    /**
     * Moves one message from {@code sourceQueue} to {@code targetQueue}: consume without ack,
     * publish to the target, then acknowledge the original. Sleeps first so the operator can
     * list the queues.
     *
     * @param sourceQueue queue to consume from
     * @param targetQueue queue to publish the consumed message to
     * @throws InterruptedException if the inspection pause is interrupted
     */
    private void processStep(String sourceQueue, String targetQueue) throws InterruptedException {
        logger.warn("PAUSING:  please list_queues");
        Thread.sleep(INSPECTION_PAUSE_MILLIS);

        QueueAccess source = new QueueAccess(this.connFactory, sourceQueue);
        Message message = source.consumeNextNoAck(200L);
        // consumeNextNoAck's behaviour on timeout is not visible here; guard in case it returns
        // null so the failure names the queue instead of surfacing as a NullPointerException.
        Assert.assertNotNull("No message available on queue " + sourceQueue, message);

        // Decode the payload: byte[].toString() would only print the array identity.
        // Assumes the body is UTF-8 text, as produced by the template's String conversion.
        String payload = new String(message.getBody(), StandardCharsets.UTF_8);
        logger.info(String.format("Got message from queue %s: %s", sourceQueue, payload));

        // Publish downstream before acknowledging the original message.
        new QueueAccess(this.connFactory, targetQueue).publish(message);
        source.acknowledgeConsume(message);
    }

    /**
     * Sends a string to {@link #testQueueName} over a dedicated connection factory and asserts
     * that the same string is received back.
     */
    @Test
    @Ignore
    public void simpleroundTripTest() throws Exception {
        logger.info("********** running the test");
        Thread.sleep(2000L);

        CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
        try {
            factory.setPassword("guest");
            factory.setUsername("guest");
            factory.setPublisherConfirms(true);
            RabbitTemplate template = new RabbitTemplate(factory);

            template.convertAndSend(testQueueName, "hello world message!");
            Thread.sleep(6000L);
            logger.info("************ message sent / trying to retrieve...");

            String received = (String) template.receiveAndConvert(testQueueName, 10000L);
            logger.info("************* message received");
            Assert.assertEquals("Message in should match message out.", "hello world message!", received);
            logger.info("************* done.");
        } finally {
            // Release the connection opened by this test's private factory.
            factory.destroy();
        }
    }

    /** Empty test so the class has one runnable test while the broker-dependent ones are ignored. */
    @Test
    public void testPlaceHolderUntilCanGetMockBroker() {
    }
}
