package org.apache.kudu.flume.sink;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.test.ClientTestUtil;
import org.apache.kudu.test.KuduTestHarness;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.class */
public class KeyedKuduOperationsProducerTest {
    private static final Logger LOG = LoggerFactory.getLogger(KeyedKuduOperationsProducerTest.class);

    @Rule
    public KuduTestHarness harness = new KuduTestHarness();

    private KuduTable createNewTable(String str) throws Exception {
        LOG.info("Creating new table...");
        ArrayList arrayList = new ArrayList(2);
        arrayList.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING).key(true).build());
        arrayList.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(false).build());
        KuduTable createTable = this.harness.getClient().createTable(str, new Schema(arrayList), new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key")).setNumReplicas(1));
        LOG.info("Created new table.");
        return createTable;
    }

    @Test
    public void testEmptyChannelWithInsert() throws Exception {
        testEvents(0, "insert");
    }

    @Test
    public void testOneEventWithInsert() throws Exception {
        testEvents(1, "insert");
    }

    @Test
    public void testThreeEventsWithInsert() throws Exception {
        testEvents(3, "insert");
    }

    @Test
    public void testEmptyChannelWithUpsert() throws Exception {
        testEvents(0, "upsert");
    }

    @Test
    public void testOneEventWithUpsert() throws Exception {
        testEvents(1, "upsert");
    }

    @Test
    public void testThreeEventsWithUpsert() throws Exception {
        testEvents(3, "upsert");
    }

    @Test
    public void testDuplicateRowsWithUpsert() throws Exception {
        LOG.info("Testing events with upsert...");
        KuduTable createNewTable = createNewTable("testDupUpsertEvents");
        KuduSink createSink = KuduSinkTestUtil.createSink(this.harness.getClient(), createNewTable.getName(), new Context(ImmutableMap.of("producer.operation", "upsert", "producer", SimpleKeyedKuduOperationsProducer.class.getName())));
        createSink.start();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 3; i++) {
            Event withBody = EventBuilder.withBody(String.format("payload body %s", Integer.valueOf(i)), StandardCharsets.UTF_8);
            withBody.setHeaders(ImmutableMap.of("key", String.format("key %s", Integer.valueOf(i))));
            arrayList.add(withBody);
        }
        KuduSinkTestUtil.processEvents(createSink, arrayList);
        List scanTableToStrings = ClientTestUtil.scanTableToStrings(createNewTable, new KuduPredicate[0]);
        Assert.assertEquals("3 row(s) expected", 3, scanTableToStrings.size());
        for (int i2 = 0; i2 < 3; i2++) {
            Assert.assertTrue("incorrect payload", ((String) scanTableToStrings.get(i2)).contains("payload body " + i2));
        }
        Event withBody2 = EventBuilder.withBody("payload body upserted".getBytes(StandardCharsets.UTF_8));
        withBody2.setHeaders(ImmutableMap.of("key", String.format("key %s", 0)));
        KuduSinkTestUtil.processEvents(createSink, ImmutableList.of(withBody2));
        List scanTableToStrings2 = ClientTestUtil.scanTableToStrings(createNewTable, new KuduPredicate[0]);
        Assert.assertEquals("3 row(s) expected", 3, scanTableToStrings2.size());
        Assert.assertTrue("incorrect payload", ((String) scanTableToStrings2.get(0)).contains("payload body upserted"));
        for (int i3 = 1; i3 < 3; i3++) {
            Assert.assertTrue("incorrect payload", ((String) scanTableToStrings2.get(i3)).contains("payload body " + i3));
        }
        LOG.info("Testing events with upsert finished successfully.");
    }

    private void testEvents(int i, String str) throws Exception {
        LOG.info("Testing {} events...", Integer.valueOf(i));
        KuduTable createNewTable = createNewTable("test" + i + "events" + str);
        KuduSinkTestUtil.processEventsCreatingSink(this.harness.getClient(), new Context(ImmutableMap.of("producer.operation", str, "producer", SimpleKeyedKuduOperationsProducer.class.getName())), createNewTable.getName(), getEvents(i));
        List scanTableToStrings = ClientTestUtil.scanTableToStrings(createNewTable, new KuduPredicate[0]);
        Assert.assertEquals(i + " row(s) expected", i, scanTableToStrings.size());
        for (int i2 = 0; i2 < i; i2++) {
            Assert.assertTrue("incorrect payload", ((String) scanTableToStrings.get(i2)).contains("payload body " + i2));
        }
        LOG.info("Testing {} events finished successfully.", Integer.valueOf(i));
    }

    private List<Event> getEvents(int i) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            Event withBody = EventBuilder.withBody(String.format("payload body %s", Integer.valueOf(i2)).getBytes(StandardCharsets.UTF_8));
            withBody.setHeaders(ImmutableMap.of("key", String.format("key %s", Integer.valueOf(i2))));
            arrayList.add(withBody);
        }
        return arrayList;
    }
}
