package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
import org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter;
import org.apache.hadoop.shaded.org.codehaus.jettison.json.JSONArray;
import org.apache.hadoop.shaded.org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.class */
/**
 * Periodic service that probes one Namenode and hands the resulting
 * {@link NamenodeStatusReport} to an {@link ActiveNamenodeResolver}.
 *
 * <p>Each invocation contacts the Namenode service RPC address for its
 * namespace information, then (if that succeeded) asks for the safemode state,
 * reads counters over JMX from the web address and, when a Namenode
 * identifier is configured, queries the HA service state.
 */
public class NamenodeHeartbeatService extends PeriodicService {
    private static final Logger LOG = LoggerFactory.getLogger(NamenodeHeartbeatService.class);

    /** Message prefix that identifies a Namenode answering that HA is off. */
    private static final String HA_NOT_ENABLED_PREFIX = "HA for namenode is not enabled";

    /** Timeout, in milliseconds, passed when creating the HA service proxy. */
    private static final int HA_PROXY_TIMEOUT_MS = 30000;

    /** JMX query pattern and the two bean names read from its result. */
    private static final String JMX_QUERY = "Hadoop:service=NameNode,name=FSNamesystem*";
    private static final String JMX_BEAN_FSNAMESYSTEM_STATE = "Hadoop:service=NameNode,name=FSNamesystemState";
    private static final String JMX_BEAN_FSNAMESYSTEM = "Hadoop:service=NameNode,name=FSNamesystem";

    /** Configuration captured in {@link #serviceInit(Configuration)}. */
    private Configuration conf;
    /** Receiver of the status reports produced by this service. */
    private final ActiveNamenodeResolver resolver;
    /** Nameservice identifier of the monitored Namenode; may be null. */
    private final String nameserviceId;
    /** Namenode identifier of the monitored Namenode; may be null or empty. */
    private final String namenodeId;
    /** HA target for the Namenode; null when no Namenode identifier is set or HA turned out to be disabled. */
    private NNHAServiceTarget localTarget;
    private String rpcAddress;
    private String serviceAddress;
    private String lifelineAddress;
    private String webAddress;

    /**
     * Creates a heartbeat service for one Namenode.
     *
     * @param resolver resolver that receives the status reports
     * @param nsId nameservice identifier, or null
     * @param nnId Namenode identifier, or null
     */
    public NamenodeHeartbeatService(ActiveNamenodeResolver resolver, String nsId, String nnId) {
        super(NamenodeHeartbeatService.class.getSimpleName() + (nsId == null ? "" : " " + nsId) + (nnId == null ? "" : " " + nnId));
        this.resolver = resolver;
        this.nameserviceId = nsId;
        this.namenodeId = nnId;
    }

    /**
     * Resolves the RPC, service RPC, lifeline and web addresses of the
     * Namenode from the configuration, sets the heartbeat interval and then
     * delegates to {@code super.serviceInit}.
     *
     * @param configuration configuration to read the addresses and interval from
     * @throws Exception propagated from the address lookups or the superclass
     */
    protected void serviceInit(Configuration configuration) throws Exception {
        this.conf = configuration;

        // Label used in the log lines below: "<ns>" or "<ns>-<nn>".
        String nnLabel = this.nameserviceId;
        if (this.namenodeId == null || this.namenodeId.isEmpty()) {
            this.localTarget = null;
        } else {
            this.localTarget = new NNHAServiceTarget(this.conf, this.nameserviceId, this.namenodeId);
            nnLabel = nnLabel + "-" + this.namenodeId;
        }

        this.rpcAddress = getRpcAddress(this.conf, this.nameserviceId, this.namenodeId);
        LOG.info("{} RPC address: {}", nnLabel, this.rpcAddress);

        this.serviceAddress = DFSUtil.getNamenodeServiceAddr(this.conf, this.nameserviceId, this.namenodeId);
        if (this.serviceAddress == null) {
            // No dedicated service RPC endpoint: fall back to the client RPC address.
            LOG.error("Cannot locate RPC service address for NN {}, using RPC address {}", nnLabel, this.rpcAddress);
            this.serviceAddress = this.rpcAddress;
        }
        LOG.info("{} Service RPC address: {}", nnLabel, this.serviceAddress);

        this.lifelineAddress = DFSUtil.getNamenodeLifelineAddr(this.conf, this.nameserviceId, this.namenodeId);
        if (this.lifelineAddress == null) {
            // No lifeline endpoint configured: reuse the service address.
            this.lifelineAddress = this.serviceAddress;
        }
        LOG.info("{} Lifeline RPC address: {}", nnLabel, this.lifelineAddress);

        this.webAddress = DFSUtil.getNamenodeWebAddr(this.conf, this.nameserviceId, this.namenodeId);
        LOG.info("{} Web address: {}", nnLabel, this.webAddress);

        setIntervalMs(this.conf.getLong(DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS, DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT));
        super.serviceInit(configuration);
    }

    /** Runs one heartbeat: probes the Namenode and registers the result. */
    @Override // org.apache.hadoop.hdfs.server.federation.router.PeriodicService
    public void periodicInvoke() {
        updateState();
    }

    /**
     * Looks up the RPC address of a Namenode.
     *
     * <p>With neither identifier set, the unsuffixed RPC address key is
     * returned. Otherwise the key suffixed with the identifiers is tried
     * first, then the addresses registered for the nameservice: the entry for
     * {@code nnId} when given, or the only entry when exactly one exists.
     *
     * @param conf configuration to read from
     * @param nsId nameservice identifier, or null
     * @param nnId Namenode identifier, or null
     * @return the address as {@code host:port}, or null when none is found
     */
    private static String getRpcAddress(Configuration conf, String nsId, String nnId) {
        if (nsId == null && nnId == null) {
            return conf.get(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
        }

        String configured = conf.get(DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nsId, nnId));
        if (configured != null) {
            return configured;
        }

        Map<String, InetSocketAddress> nsAddresses = DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
        InetSocketAddress sockAddr = null;
        if (nnId != null) {
            sockAddr = nsAddresses.get(nnId);
        } else if (nsAddresses.size() == 1) {
            sockAddr = nsAddresses.values().iterator().next();
        }
        if (sockAddr == null) {
            return null;
        }

        // An unresolved socket address has no InetAddress (getAddress() is
        // null), so use the host string it was created with instead.
        String host = sockAddr.isUnresolved() ? sockAddr.getHostString() : sockAddr.getAddress().getHostName();
        return host + ":" + sockAddr.getPort();
    }

    /**
     * Fetches the current status report and registers it with the resolver.
     *
     * <p>A report without valid registration is still registered (after
     * logging an error). A report with valid registration but no valid HA
     * state is only registered when no HA target is configured; otherwise the
     * method returns without registering.
     */
    private void updateState() {
        NamenodeStatusReport report = getNamenodeStatusReport();
        if (!report.registrationValid()) {
            LOG.error("Namenode is not operational: {}", getNamenodeDesc());
        } else if (report.haStateValid()) {
            LOG.debug("Received service state: {} from HA namenode: {}", report.getState(), getNamenodeDesc());
        } else if (this.localTarget != null) {
            // HA state was expected but could not be fetched: skip this round.
            return;
        } else {
            LOG.debug("Reporting non-HA namenode as operational: {}", getNamenodeDesc());
        }

        try {
            if (!this.resolver.registerNamenode(report)) {
                LOG.warn("Cannot register namenode {}", report);
            }
        } catch (IOException e) {
            // Kept at info level without a stack trace, but no longer drops the cause.
            LOG.info("Cannot register namenode in the State Store: {}", e.getMessage());
        } catch (Exception e) {
            LOG.error("Unhandled exception updating NN registration for {}", getNamenodeDesc(), e);
        }
    }

    /**
     * Builds a status report by probing the Namenode.
     *
     * <p>The namespace information is requested first over the service RPC
     * address. If the report's registration is not valid after that, it is
     * returned immediately. Otherwise the safemode state, the JMX counters and
     * (when an HA target is set) the HA service state are added; failures in
     * those steps are logged and do not prevent the report from being
     * returned.
     *
     * @return the report; never null
     */
    protected NamenodeStatusReport getNamenodeStatusReport() {
        NamenodeStatusReport report = new NamenodeStatusReport(this.nameserviceId, this.namenodeId, this.rpcAddress, this.serviceAddress, this.lifelineAddress, this.webAddress);
        try {
            LOG.debug("Probing NN at service address: {}", this.serviceAddress);
            URI serviceUri = new URI("hdfs://" + this.serviceAddress);

            // Namespace information is what makes the registration valid.
            NamenodeProtocol namenode = (NamenodeProtocol) NameNodeProxies.createProxy(this.conf, serviceUri, NamenodeProtocol.class).getProxy();
            if (namenode != null) {
                NamespaceInfo info = namenode.versionRequest();
                if (info != null) {
                    report.setNamespaceInfo(info);
                }
            }
            if (!report.registrationValid()) {
                return report;
            }

            // Safemode state: failure is logged and otherwise ignored.
            try {
                ClientProtocol client = (ClientProtocol) NameNodeProxies.createProxy(this.conf, serviceUri, ClientProtocol.class).getProxy();
                if (client != null) {
                    report.setSafeMode(client.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET, false));
                }
            } catch (Exception e) {
                LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), e);
            }

            updateJMXParameters(this.webAddress, report);

            if (this.localTarget != null) {
                try {
                    report.setHAServiceState(this.localTarget.getProxy(this.conf, HA_PROXY_TIMEOUT_MS).getServiceStatus().getState());
                } catch (Throwable e) {
                    // getMessage() may be null, so guard before matching on it.
                    String message = e.getMessage();
                    if (message != null && message.startsWith(HA_NOT_ENABLED_PREFIX)) {
                        LOG.error("HA for {} is not enabled", getNamenodeDesc());
                        // Stop asking for HA state on later heartbeats.
                        this.localTarget = null;
                    } else {
                        LOG.error("Cannot fetch HA status for {}: {}", getNamenodeDesc(), message, e);
                    }
                }
            }
        } catch (IOException e) {
            LOG.error("Cannot communicate with {}: {}", getNamenodeDesc(), e.getMessage());
        } catch (Throwable e) {
            LOG.error("Unexpected exception while communicating with {}: {}", getNamenodeDesc(), e.getMessage(), e);
        }
        return report;
    }

    /**
     * Describes the monitored Namenode for log messages.
     *
     * @return {@code <nameservice>:<serviceAddress>}, or
     *         {@code <nameservice>-<namenode>:<serviceAddress>} when a
     *         non-empty Namenode identifier is set
     */
    public String getNamenodeDesc() {
        if (this.namenodeId == null || this.namenodeId.isEmpty()) {
            return this.nameserviceId + ":" + this.serviceAddress;
        }
        return this.nameserviceId + "-" + this.namenodeId + ":" + this.serviceAddress;
    }

    /**
     * Reads the FSNamesystem JMX beans from the given address and copies the
     * datanode counts and namesystem counters into the report. Any failure is
     * logged and the report is left as it was at that point.
     *
     * @param address address passed to {@code FederationUtil.getJmx}
     * @param report report to update
     */
    private void updateJMXParameters(String address, NamenodeStatusReport report) {
        try {
            JSONArray beans = FederationUtil.getJmx(JMX_QUERY, address);
            if (beans == null) {
                return;
            }
            for (int i = 0; i < beans.length(); i++) {
                JSONObject bean = beans.getJSONObject(i);
                // NOTE(review): this key constant comes from the offline image
                // viewer and is presumably the JMX "name" attribute - confirm
                // and consider replacing it with a local constant.
                String beanName = bean.getString(PBImageXmlWriter.SECTION_NAME);
                if (beanName.equals(JMX_BEAN_FSNAMESYSTEM_STATE)) {
                    report.setDatanodeInfo(
                            bean.getInt("NumLiveDataNodes"),
                            bean.getInt("NumDeadDataNodes"),
                            bean.getInt("NumDecommissioningDataNodes"),
                            bean.getInt("NumDecomLiveDataNodes"),
                            bean.getInt("NumDecomDeadDataNodes"));
                } else if (beanName.equals(JMX_BEAN_FSNAMESYSTEM)) {
                    report.setNamesystemInfo(
                            bean.getLong("CapacityRemaining"),
                            bean.getLong("CapacityTotal"),
                            bean.getLong("FilesTotal"),
                            bean.getLong("BlocksTotal"),
                            bean.getLong("MissingBlocks"),
                            bean.getLong("PendingReplicationBlocks"),
                            bean.getLong("UnderReplicatedBlocks"),
                            bean.getLong("PendingDeletionBlocks"));
                }
            }
        } catch (Exception e) {
            LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e);
        }
    }
}
