package org.apache.kafka.streams.tests;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;

/* loaded from: input_file:org/apache/kafka/streams/tests/StreamsStandByReplicaTest.class */
public class StreamsStandByReplicaTest {
    public static void main(String[] strArr) throws IOException {
        if (strArr.length < 2) {
            System.err.println("StreamsStandByReplicaTest are expecting two parameters: propFile, additionalConfigs; but only see " + strArr.length + " parameter");
            System.exit(1);
        }
        System.out.println("StreamsTest instance started");
        String str = strArr[0];
        String str2 = strArr[1];
        Properties loadProps = Utils.loadProps(str);
        if (loadProps.getProperty("bootstrap.servers") == null) {
            System.err.println("No bootstrap kafka servers specified in bootstrap.servers");
            System.exit(1);
        }
        loadProps.put("application.id", "kafka-streams-standby-tasks");
        loadProps.put("commit.interval.ms", 100);
        loadProps.put("num.standby.replicas", 1);
        loadProps.put("cache.max.bytes.buffering", 0);
        loadProps.put("default.key.serde", Serdes.String().getClass());
        loadProps.put("default.value.serde", Serdes.String().getClass());
        loadProps.put(StreamsConfig.producerPrefix("enable.idempotence"), true);
        if (str2 == null) {
            System.err.println("additional configs are not provided");
            System.err.flush();
            System.exit(1);
        }
        Map<String, String> parseConfigs = SystemTestUtil.parseConfigs(str2);
        System.out.println("Updating configs with " + parseConfigs);
        String remove = parseConfigs.remove("sourceTopic");
        String remove2 = parseConfigs.remove("sinkTopic1");
        String remove3 = parseConfigs.remove("sinkTopic2");
        if (remove == null || remove2 == null || remove3 == null) {
            System.err.println(String.format("one or more required topics null sourceTopic[%s], sinkTopic1[%s], sinkTopic2[%s]", remove, remove2, remove3));
            System.err.flush();
            System.exit(1);
        }
        loadProps.putAll(parseConfigs);
        if (!confirmCorrectConfigs(loadProps)) {
            System.err.println(String.format("ERROR: Did not have all required configs expected  to contain %s, %s,  %s,  %s", StreamsConfig.consumerPrefix("max.poll.interval.ms"), StreamsConfig.producerPrefix("retries"), StreamsConfig.producerPrefix("request.timeout.ms"), StreamsConfig.producerPrefix("max.block.ms")));
            System.exit(1);
        }
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KeyValueBytesStoreSupplier inMemoryKeyValueStore = Stores.inMemoryKeyValueStore("in-memory-store");
        KeyValueBytesStoreSupplier persistentKeyValueStore = Stores.persistentKeyValueStore("persistent-memory-store");
        Serde String = Serdes.String();
        ValueMapper valueMapper = (v0) -> {
            return v0.toString();
        };
        KStream stream = streamsBuilder.stream(remove, Consumed.with(String, String));
        stream.groupByKey().count(Materialized.as(inMemoryKeyValueStore)).toStream().mapValues(valueMapper).to(remove2, Produced.with(String, String));
        stream.groupByKey().count(Materialized.as(persistentKeyValueStore)).toStream().mapValues(valueMapper).to(remove3, Produced.with(String, String));
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), loadProps);
        kafkaStreams.setUncaughtExceptionHandler((thread, th) -> {
            System.err.println("FATAL: An unexpected exception " + th);
            th.printStackTrace(System.err);
            System.err.flush();
            shutdown(kafkaStreams);
        });
        kafkaStreams.setStateListener((state, state2) -> {
            if (state == KafkaStreams.State.RUNNING && state2 == KafkaStreams.State.REBALANCING) {
                for (ThreadMetadata threadMetadata : kafkaStreams.localThreadsMetadata()) {
                    System.out.println("ACTIVE_TASKS:" + threadMetadata.activeTasks().size() + " STANDBY_TASKS:" + threadMetadata.standbyTasks().size());
                }
            }
        });
        System.out.println("Start Kafka Streams");
        kafkaStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            shutdown(kafkaStreams);
            System.out.println("Shut down streams now");
        }));
    }

    private static void shutdown(KafkaStreams kafkaStreams) {
        kafkaStreams.close(Duration.ofSeconds(10L));
    }

    private static boolean confirmCorrectConfigs(Properties properties) {
        return properties.containsKey(StreamsConfig.consumerPrefix("max.poll.interval.ms")) && properties.containsKey(StreamsConfig.producerPrefix("retries")) && properties.containsKey(StreamsConfig.producerPrefix("request.timeout.ms")) && properties.containsKey(StreamsConfig.producerPrefix("max.block.ms"));
    }
}
