package org.apache.flume.source.http;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.List;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
import org.apache.flume.source.SslContextAwareAbstractSource;
import org.apache.flume.tools.FlumeBeanConfigurator;
import org.apache.flume.tools.HTTPServerConstraintUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.base.Preconditions;
import org.spark_project.guava.base.Throwables;
import org.spark_project.jetty.http.HttpVersion;
import org.spark_project.jetty.jmx.MBeanContainer;
import org.spark_project.jetty.server.ConnectionFactory;
import org.spark_project.jetty.server.HttpConfiguration;
import org.spark_project.jetty.server.HttpConnectionFactory;
import org.spark_project.jetty.server.SecureRequestCustomizer;
import org.spark_project.jetty.server.Server;
import org.spark_project.jetty.server.ServerConnector;
import org.spark_project.jetty.server.SslConnectionFactory;
import org.spark_project.jetty.servlet.ServletContextHandler;
import org.spark_project.jetty.servlet.ServletHolder;
import org.spark_project.jetty.util.ssl.SslContextFactory;
import org.spark_project.jetty.util.thread.QueuedThreadPool;

/* loaded from: input_file:org/apache/flume/source/http/HTTPSource.class */
public class HTTPSource extends SslContextAwareAbstractSource implements EventDrivenSource, Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(HTTPSource.class);
    private volatile Integer port;
    private volatile Server srv;
    private volatile String host;
    private HTTPSourceHandler handler;
    private SourceCounter sourceCounter;
    private Context sourceContext;

    /* loaded from: input_file:org/apache/flume/source/http/HTTPSource$FlumeHTTPServlet.class */
    private class FlumeHTTPServlet extends HttpServlet {
        private static final long serialVersionUID = 4891924863218790344L;

        private FlumeHTTPServlet() {
        }

        public void doPost(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException {
            Collections.emptyList();
            try {
                List<Event> events = HTTPSource.this.handler.getEvents(httpServletRequest);
                HTTPSource.this.sourceCounter.incrementAppendBatchReceivedCount();
                HTTPSource.this.sourceCounter.addToEventReceivedCount(events.size());
                try {
                    HTTPSource.this.getChannelProcessor().processEventBatch(events);
                    httpServletResponse.setCharacterEncoding(httpServletRequest.getCharacterEncoding());
                    httpServletResponse.setStatus(200);
                    httpServletResponse.flushBuffer();
                    HTTPSource.this.sourceCounter.incrementAppendBatchAcceptedCount();
                    HTTPSource.this.sourceCounter.addToEventAcceptedCount(events.size());
                } catch (ChannelException e) {
                    HTTPSource.LOG.warn("Error appending event to channel. Channel might be full. Consider increasing the channel capacity or make sure the sinks perform faster.", e);
                    HTTPSource.this.sourceCounter.incrementChannelWriteFail();
                    httpServletResponse.sendError(503, "Error appending event to channel. Channel might be full." + e.getMessage());
                } catch (Exception e2) {
                    HTTPSource.LOG.warn("Unexpected error appending event to channel. ", e2);
                    HTTPSource.this.sourceCounter.incrementGenericProcessingFail();
                    httpServletResponse.sendError(SpoolDirectorySourceConfigurationConstants.DEFAULT_POLL_DELAY, "Unexpected error while appending event to channel. " + e2.getMessage());
                }
            } catch (HTTPBadRequestException e3) {
                HTTPSource.LOG.warn("Received bad request from client. ", e3);
                HTTPSource.this.sourceCounter.incrementEventReadFail();
                httpServletResponse.sendError(400, "Bad request from client. " + e3.getMessage());
            } catch (Exception e4) {
                HTTPSource.LOG.warn("Deserializer threw unexpected exception. ", e4);
                HTTPSource.this.sourceCounter.incrementEventReadFail();
                httpServletResponse.sendError(SpoolDirectorySourceConfigurationConstants.DEFAULT_POLL_DELAY, "Deserializer threw unexpected exception. " + e4.getMessage());
            }
        }

        public void doGet(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException {
            doPost(httpServletRequest, httpServletResponse);
        }
    }

    @Override // org.apache.flume.conf.Configurable
    public void configure(Context context) {
        configureSsl(context);
        this.sourceContext = context;
        try {
            this.port = context.getInteger("port");
            this.host = context.getString("bind", HTTPSourceConfigurationConstants.DEFAULT_BIND);
            Preconditions.checkState((this.host == null || this.host.isEmpty()) ? false : true, "HTTPSource hostname specified is empty");
            Preconditions.checkNotNull(this.port, "HTTPSource requires a port number to be specified");
            this.handler = (HTTPSourceHandler) Class.forName(context.getString(HTTPSourceConfigurationConstants.CONFIG_HANDLER, HTTPSourceConfigurationConstants.DEFAULT_HANDLER).trim()).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            this.handler.configure(new Context(context.getSubProperties(HTTPSourceConfigurationConstants.CONFIG_HANDLER_PREFIX)));
        } catch (ClassCastException e) {
            LOG.error("Deserializer is not an instance of HTTPSourceHandler.Deserializer must implement HTTPSourceHandler.");
            Throwables.propagate(e);
        } catch (ClassNotFoundException e2) {
            LOG.error("Error while configuring HTTPSource. Exception follows.", e2);
            Throwables.propagate(e2);
        } catch (Exception e3) {
            LOG.error("Error configuring HTTPSource!", e3);
            Throwables.propagate(e3);
        }
        if (this.sourceCounter == null) {
            this.sourceCounter = new SourceCounter(getName());
        }
    }

    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void start() {
        Preconditions.checkState(this.srv == null, "Running HTTP Server found in source: " + getName() + " before I started one.Will not attempt to start.");
        QueuedThreadPool queuedThreadPool = new QueuedThreadPool();
        if (this.sourceContext.getSubProperties("QueuedThreadPool.").size() > 0) {
            FlumeBeanConfigurator.setConfigurationFields(queuedThreadPool, this.sourceContext);
        }
        this.srv = new Server(queuedThreadPool);
        MBeanContainer mBeanContainer = new MBeanContainer(ManagementFactory.getPlatformMBeanServer());
        this.srv.addEventListener(mBeanContainer);
        this.srv.addBean(mBeanContainer);
        HttpConfiguration httpConfiguration = new HttpConfiguration();
        httpConfiguration.addCustomizer(new SecureRequestCustomizer());
        FlumeBeanConfigurator.setConfigurationFields(httpConfiguration, this.sourceContext);
        ServerConnector serverConnector = (ServerConnector) getSslContextSupplier().get().map(sSLContext -> {
            SslContextFactory sslContextFactory = new SslContextFactory();
            sslContextFactory.setSslContext(sSLContext);
            sslContextFactory.setExcludeProtocols((String[]) getExcludeProtocols().toArray(new String[0]));
            sslContextFactory.setIncludeProtocols((String[]) getIncludeProtocols().toArray(new String[0]));
            sslContextFactory.setExcludeCipherSuites((String[]) getExcludeCipherSuites().toArray(new String[0]));
            sslContextFactory.setIncludeCipherSuites((String[]) getIncludeCipherSuites().toArray(new String[0]));
            FlumeBeanConfigurator.setConfigurationFields(sslContextFactory, this.sourceContext);
            httpConfiguration.setSecurePort(this.port.intValue());
            httpConfiguration.setSecureScheme("https");
            return new ServerConnector(this.srv, new ConnectionFactory[]{new SslConnectionFactory(sslContextFactory, HttpVersion.HTTP_1_1.asString()), new HttpConnectionFactory(httpConfiguration)});
        }).orElse(new ServerConnector(this.srv, new ConnectionFactory[]{new HttpConnectionFactory(httpConfiguration)}));
        serverConnector.setPort(this.port.intValue());
        serverConnector.setHost(this.host);
        serverConnector.setReuseAddress(true);
        FlumeBeanConfigurator.setConfigurationFields(serverConnector, this.sourceContext);
        this.srv.addConnector(serverConnector);
        try {
            ServletContextHandler servletContextHandler = new ServletContextHandler(1);
            servletContextHandler.setContextPath("/");
            this.srv.setHandler(servletContextHandler);
            servletContextHandler.addServlet(new ServletHolder(new FlumeHTTPServlet()), "/");
            servletContextHandler.setSecurityHandler(HTTPServerConstraintUtil.enforceConstraints());
            this.srv.start();
        } catch (Exception e) {
            LOG.error("Error while starting HTTPSource. Exception follows.", e);
            Throwables.propagate(e);
        }
        Preconditions.checkArgument(this.srv.isRunning());
        this.sourceCounter.start();
        super.start();
    }

    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void stop() {
        try {
            this.srv.stop();
            this.srv.join();
            this.srv = null;
        } catch (Exception e) {
            LOG.error("Error while stopping HTTPSource. Exception follows.", e);
        }
        this.sourceCounter.stop();
        LOG.info("Http source {} stopped. Metrics: {}", getName(), this.sourceCounter);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flume.source.SslContextAwareAbstractSource
    public void configureSsl(Context context) {
        handleDeprecatedParameter(context, RpcClientConfigurationConstants.CONFIG_SSL, HTTPSourceConfigurationConstants.SSL_ENABLED);
        handleDeprecatedParameter(context, RpcClientConfigurationConstants.CONFIG_EXCLUDE_PROTOCOLS, HTTPSourceConfigurationConstants.EXCLUDE_PROTOCOLS);
        handleDeprecatedParameter(context, "keystore-password", HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD);
        super.configureSsl(context);
    }

    private void handleDeprecatedParameter(Context context, String str, String str2) {
        if (context.containsKey(str) || !context.containsKey(str2)) {
            return;
        }
        context.put(str, context.getString(str2));
    }
}
