package org.apache.flume.api;

import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.thrift.Status;
import org.apache.flume.thrift.ThriftFlumeEvent;
import org.apache.flume.thrift.ThriftSourceProtocol;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/api/ThriftRpcClient.class */
public class ThriftRpcClient extends SSLContextAwareAbstractRpcClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(ThriftRpcClient.class);
    public static final String CONFIG_PROTOCOL = "protocol";
    public static final String BINARY_PROTOCOL = "binary";
    public static final String COMPACT_PROTOCOL = "compact";
    private long requestTimeout;
    private String hostname;
    private int port;
    private ConnectionPoolManager connectionManager;
    private String protocol;
    private final Random random = new Random();
    private final Lock stateLock = new ReentrantLock(true);
    private State connState = State.INIT;
    private final AtomicLong threadCounter = new AtomicLong(0);
    private final ExecutorService callTimeoutPool = Executors.newCachedThreadPool(new ThreadFactory() { // from class: org.apache.flume.api.ThriftRpcClient.1
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setName("Flume Thrift RPC thread - " + String.valueOf(ThriftRpcClient.this.threadCounter.incrementAndGet()));
            return thread;
        }
    });

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flume/api/ThriftRpcClient$ClientWrapper.class */
    public class ClientWrapper {
        public final ThriftSourceProtocol.Client client;
        public final TTransport transport;
        private final int hashCode;

        public ClientWrapper() throws Exception {
            this.transport = ThriftRpcClient.this.getTransport(ThriftRpcClient.this.enableSsl ? ThriftRpcClient.createSSLSocket(ThriftRpcClient.createSSLContext(ThriftRpcClient.this.truststore, ThriftRpcClient.this.truststorePassword, ThriftRpcClient.this.truststoreType).getSocketFactory(), ThriftRpcClient.this.hostname, ThriftRpcClient.this.port, 120000, ThriftRpcClient.this.excludeProtocols, ThriftRpcClient.this.includeProtocols, ThriftRpcClient.this.excludeCipherSuites, ThriftRpcClient.this.includeCipherSuites) : new TSocket(ThriftRpcClient.this.hostname, ThriftRpcClient.this.port));
            if (!this.transport.isOpen()) {
                this.transport.open();
            }
            if (ThriftRpcClient.this.protocol.equals("binary")) {
                ThriftRpcClient.LOGGER.info("Using TBinaryProtocol");
                this.client = new ThriftSourceProtocol.Client(new TBinaryProtocol(this.transport));
            } else {
                ThriftRpcClient.LOGGER.info("Using TCompactProtocol");
                this.client = new ThriftSourceProtocol.Client(new TCompactProtocol(this.transport));
            }
            this.hashCode = ThriftRpcClient.this.random.nextInt();
        }

        public boolean equals(Object obj) {
            return obj != null && this == obj;
        }

        public int hashCode() {
            return this.hashCode;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flume/api/ThriftRpcClient$ConnectionPoolManager.class */
    public class ConnectionPoolManager {
        private final int maxPoolSize;
        private final Queue<ClientWrapper> availableClients = new LinkedList();
        private final Set<ClientWrapper> checkedOutClients = new HashSet();
        private final Lock poolLock = new ReentrantLock();
        private final Condition availableClientsCondition = this.poolLock.newCondition();
        private int currentPoolSize = 0;

        public ConnectionPoolManager(int i) {
            this.maxPoolSize = i;
        }

        public ClientWrapper checkout() throws Exception {
            this.poolLock.lock();
            try {
                if (this.availableClients.isEmpty() && this.currentPoolSize < this.maxPoolSize) {
                    ClientWrapper clientWrapper = new ClientWrapper();
                    this.currentPoolSize++;
                    this.checkedOutClients.add(clientWrapper);
                    return clientWrapper;
                }
                while (this.availableClients.isEmpty()) {
                    this.availableClientsCondition.await();
                }
                ClientWrapper poll = this.availableClients.poll();
                this.checkedOutClients.add(poll);
                return poll;
            } finally {
                this.poolLock.unlock();
            }
        }

        public void checkIn(ClientWrapper clientWrapper) {
            this.poolLock.lock();
            try {
                this.availableClients.add(clientWrapper);
                this.checkedOutClients.remove(clientWrapper);
                this.availableClientsCondition.signal();
            } finally {
                this.poolLock.unlock();
            }
        }

        public void destroy(ClientWrapper clientWrapper) {
            this.poolLock.lock();
            try {
                this.checkedOutClients.remove(clientWrapper);
                this.currentPoolSize--;
                clientWrapper.transport.close();
            } finally {
                this.poolLock.unlock();
            }
        }

        public void closeAll() {
            this.poolLock.lock();
            try {
                Iterator<ClientWrapper> it = this.availableClients.iterator();
                while (it.hasNext()) {
                    it.next().transport.close();
                    this.currentPoolSize--;
                }
                Iterator<ClientWrapper> it2 = this.checkedOutClients.iterator();
                while (it2.hasNext()) {
                    it2.next().transport.close();
                    this.currentPoolSize--;
                }
            } finally {
                this.poolLock.unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flume/api/ThriftRpcClient$State.class */
    public enum State {
        INIT,
        READY,
        DEAD
    }

    @Override // org.apache.flume.api.AbstractRpcClient, org.apache.flume.api.RpcClient
    public void append(Event event) throws EventDeliveryException {
        try {
            try {
                if (!isActive()) {
                    throw new EventDeliveryException("Client was closed due to error. Please create a new client");
                }
                ClientWrapper checkout = this.connectionManager.checkout();
                doAppend(checkout, new ThriftFlumeEvent(event.getHeaders(), ByteBuffer.wrap(event.getBody()))).get(this.requestTimeout, TimeUnit.MILLISECONDS);
                if (checkout == null || 0 != 0) {
                    return;
                }
                this.connectionManager.checkIn(checkout);
            } catch (Throwable th) {
                if (th instanceof ExecutionException) {
                    Throwable cause = th.getCause();
                    if (cause instanceof EventDeliveryException) {
                        throw ((EventDeliveryException) cause);
                    }
                    if (cause instanceof TimeoutException) {
                        throw new EventDeliveryException("Append call timeout", cause);
                    }
                }
                if (0 != 0) {
                    this.connectionManager.destroy(null);
                }
                if (th instanceof Error) {
                    throw ((Error) th);
                }
                if (!(th instanceof RuntimeException)) {
                    throw new EventDeliveryException("Failed to send event. ", th);
                }
                throw ((RuntimeException) th);
            }
        } catch (Throwable th2) {
            if (0 != 0 && 0 == 0) {
                this.connectionManager.checkIn(null);
            }
            throw th2;
        }
    }

    @Override // org.apache.flume.api.AbstractRpcClient, org.apache.flume.api.RpcClient
    public void appendBatch(List<Event> list) throws EventDeliveryException {
        try {
            try {
                if (!isActive()) {
                    throw new EventDeliveryException("Client was closed due to error or is not yet configured.");
                }
                ClientWrapper checkout = this.connectionManager.checkout();
                ArrayList arrayList = new ArrayList();
                Iterator<Event> it = list.iterator();
                while (it.hasNext()) {
                    arrayList.clear();
                    for (int i = 0; i < this.batchSize && it.hasNext(); i++) {
                        Event next = it.next();
                        arrayList.add(new ThriftFlumeEvent(next.getHeaders(), ByteBuffer.wrap(next.getBody())));
                    }
                    if (!arrayList.isEmpty()) {
                        doAppendBatch(checkout, arrayList).get(this.requestTimeout, TimeUnit.MILLISECONDS);
                    }
                }
                if (checkout == null || 0 != 0) {
                    return;
                }
                this.connectionManager.checkIn(checkout);
            } catch (Throwable th) {
                if (th instanceof ExecutionException) {
                    Throwable cause = th.getCause();
                    if (cause instanceof EventDeliveryException) {
                        throw ((EventDeliveryException) cause);
                    }
                    if (cause instanceof TimeoutException) {
                        throw new EventDeliveryException("Append call timeout", cause);
                    }
                }
                if (0 != 0) {
                    this.connectionManager.destroy(null);
                }
                if (th instanceof Error) {
                    throw ((Error) th);
                }
                if (!(th instanceof RuntimeException)) {
                    throw new EventDeliveryException("Failed to send event. ", th);
                }
                throw ((RuntimeException) th);
            }
        } catch (Throwable th2) {
            if (0 != 0 && 0 == 0) {
                this.connectionManager.checkIn(null);
            }
            throw th2;
        }
    }

    private Future<Void> doAppend(final ClientWrapper clientWrapper, final ThriftFlumeEvent thriftFlumeEvent) throws Exception {
        return this.callTimeoutPool.submit(new Callable<Void>() { // from class: org.apache.flume.api.ThriftRpcClient.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                Status append = clientWrapper.client.append(thriftFlumeEvent);
                if (append != Status.OK) {
                    throw new EventDeliveryException("Failed to deliver events. Server returned status : " + append.name());
                }
                return null;
            }
        });
    }

    private Future<Void> doAppendBatch(final ClientWrapper clientWrapper, final List<ThriftFlumeEvent> list) throws Exception {
        return this.callTimeoutPool.submit(new Callable<Void>() { // from class: org.apache.flume.api.ThriftRpcClient.3
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                Status appendBatch = clientWrapper.client.appendBatch(list);
                if (appendBatch != Status.OK) {
                    throw new EventDeliveryException("Failed to deliver events. Server returned status : " + appendBatch.name());
                }
                return null;
            }
        });
    }

    @Override // org.apache.flume.api.AbstractRpcClient, org.apache.flume.api.RpcClient
    public boolean isActive() {
        this.stateLock.lock();
        try {
            return this.connState == State.READY;
        } finally {
            this.stateLock.unlock();
        }
    }

    @Override // org.apache.flume.api.AbstractRpcClient, org.apache.flume.api.RpcClient
    public void close() throws FlumeException {
        try {
            try {
                this.stateLock.lock();
                this.connState = State.DEAD;
                this.connectionManager.closeAll();
                this.callTimeoutPool.shutdown();
                if (!this.callTimeoutPool.awaitTermination(5L, TimeUnit.SECONDS)) {
                    this.callTimeoutPool.shutdownNow();
                }
            } catch (Throwable th) {
                if (th instanceof Error) {
                    throw ((Error) th);
                }
                if (!(th instanceof RuntimeException)) {
                    throw new FlumeException("Failed to close RPC client. ", th);
                }
                throw ((RuntimeException) th);
            }
        } finally {
            this.stateLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flume.api.AbstractRpcClient
    public void configure(Properties properties) throws FlumeException {
        if (isActive()) {
            throw new FlumeException("Attempting to re-configured an already configured client!");
        }
        this.stateLock.lock();
        try {
            try {
                HostInfo hostInfo = HostInfo.getHostInfoList(properties).get(0);
                this.hostname = hostInfo.getHostName();
                this.port = hostInfo.getPortNumber();
                this.protocol = properties.getProperty("protocol");
                if (this.protocol == null) {
                    this.protocol = "compact";
                }
                if (!this.protocol.equalsIgnoreCase("binary") && !this.protocol.equalsIgnoreCase("compact")) {
                    LOGGER.warn("'binary' or 'compact' are the only valid Thrift protocol types to choose from. Defaulting to 'compact'.");
                    this.protocol = "compact";
                }
                this.batchSize = parseBatchSize(properties);
                this.requestTimeout = Long.parseLong(properties.getProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, String.valueOf(RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS)));
                if (this.requestTimeout < 1000) {
                    LOGGER.warn("Request timeout specified less than 1s. Using default value instead.");
                    this.requestTimeout = RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;
                }
                int parseInt = Integer.parseInt(properties.getProperty(RpcClientConfigurationConstants.CONFIG_CONNECTION_POOL_SIZE, String.valueOf(5)));
                if (parseInt < 1) {
                    LOGGER.warn("Connection Pool Size specified is less than 1. Using default value instead.");
                    parseInt = 5;
                }
                configureSSL(properties);
                this.connectionManager = new ConnectionPoolManager(parseInt);
                this.connState = State.READY;
                this.stateLock.unlock();
            } catch (Throwable th) {
                this.connState = State.DEAD;
                if (th instanceof Error) {
                    throw ((Error) th);
                }
                if (!(th instanceof RuntimeException)) {
                    throw new FlumeException("Error while configuring RpcClient. ", th);
                }
                throw ((RuntimeException) th);
            }
        } catch (Throwable th2) {
            this.stateLock.unlock();
            throw th2;
        }
    }

    protected TTransport getTransport(TSocket tSocket) throws Exception {
        return new TFastFramedTransport(tSocket);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static SSLContext createSSLContext(String str, String str2, String str3) throws FlumeException {
        try {
            SSLContext sSLContext = SSLContext.getInstance("TLS");
            TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            KeyStore keyStore = null;
            if (str != null && str3 != null) {
                keyStore = KeyStore.getInstance(str3);
                keyStore.load(new FileInputStream(str), str2 != null ? str2.toCharArray() : null);
                trustManagerFactory.init(keyStore);
            }
            trustManagerFactory.init(keyStore);
            sSLContext.init(null, trustManagerFactory.getTrustManagers(), null);
            return sSLContext;
        } catch (Exception e) {
            throw new FlumeException("Error creating the transport", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static TSocket createSSLSocket(SSLSocketFactory sSLSocketFactory, String str, int i, int i2, Set<String> set, Set<String> set2, Set<String> set3, Set<String> set4) throws FlumeException {
        try {
            SSLSocket sSLSocket = (SSLSocket) sSLSocketFactory.createSocket(str, i);
            sSLSocket.setSoTimeout(i2);
            ArrayList arrayList = new ArrayList();
            for (String str2 : sSLSocket.getEnabledProtocols()) {
                if ((set2.isEmpty() || set2.contains(str2)) && !set.contains(str2)) {
                    arrayList.add(str2);
                }
            }
            sSLSocket.setEnabledProtocols((String[]) arrayList.toArray(new String[0]));
            ArrayList arrayList2 = new ArrayList();
            for (String str3 : sSLSocket.getEnabledCipherSuites()) {
                if ((set4.isEmpty() || set4.contains(str3)) && !set3.contains(str3)) {
                    arrayList2.add(str3);
                }
            }
            sSLSocket.setEnabledCipherSuites((String[]) arrayList2.toArray(new String[0]));
            return new TSocket(sSLSocket);
        } catch (Exception e) {
            throw new FlumeException("Could not connect to " + str + " on port " + i, e);
        }
    }
}
