001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.zookeeper;
020
021import java.io.BufferedReader;
022import java.io.File;
023import java.io.IOException;
024import java.io.InputStreamReader;
025import java.io.InterruptedIOException;
026import java.io.OutputStream;
027import java.io.Reader;
028import java.net.BindException;
029import java.net.InetSocketAddress;
030import java.net.Socket;
031import java.util.ArrayList;
032import java.util.List;
033import java.util.Random;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.apache.zookeeper.server.NIOServerCnxnFactory;
039import org.apache.zookeeper.server.ZooKeeperServer;
040import org.apache.zookeeper.server.persistence.FileTxnLog;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
045
046/**
047 * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead
048 * of redoing it, we should contribute updates to their code which let us more
049 * easily access testing helper objects.
050 */
051@InterfaceAudience.Public
052public class MiniZooKeeperCluster {
053  private static final Logger LOG = LoggerFactory.getLogger(MiniZooKeeperCluster.class);
054
055  private static final int TICK_TIME = 2000;
056  private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
057  private int connectionTimeout;
058
059  private boolean started;
060
061  /** The default port. If zero, we use a random port. */
062  private int defaultClientPort = 0;
063
064  private List<NIOServerCnxnFactory> standaloneServerFactoryList;
065  private List<ZooKeeperServer> zooKeeperServers;
066  private List<Integer> clientPortList;
067
068  private int activeZKServerIndex;
069  private int tickTime = 0;
070
071  private Configuration configuration;
072
073  public MiniZooKeeperCluster() {
074    this(new Configuration());
075  }
076
077  public MiniZooKeeperCluster(Configuration configuration) {
078    this.started = false;
079    this.configuration = configuration;
080    activeZKServerIndex = -1;
081    zooKeeperServers = new ArrayList<>();
082    clientPortList = new ArrayList<>();
083    standaloneServerFactoryList = new ArrayList<>();
084    connectionTimeout = configuration.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster",
085      DEFAULT_CONNECTION_TIMEOUT);
086  }
087
088  /**
089   * Add a client port to the list.
090   *
091   * @param clientPort the specified port
092   */
093  public void addClientPort(int clientPort) {
094    clientPortList.add(clientPort);
095  }
096
097  /**
098   * Get the list of client ports.
099   * @return clientPortList the client port list
100   */
101  @VisibleForTesting
102  public List<Integer> getClientPortList() {
103    return clientPortList;
104  }
105
106  /**
107   * Check whether the client port in a specific position of the client port list is valid.
108   *
109   * @param index the specified position
110   */
111  private boolean hasValidClientPortInList(int index) {
112    return (clientPortList.size() > index && clientPortList.get(index) > 0);
113  }
114
115  public void setDefaultClientPort(int clientPort) {
116    if (clientPort <= 0) {
117      throw new IllegalArgumentException("Invalid default ZK client port: "
118          + clientPort);
119    }
120    this.defaultClientPort = clientPort;
121  }
122
123  /**
124   * Selects a ZK client port.
125   *
126   * @param seedPort the seed port to start with; -1 means first time.
127   * @Returns a valid and unused client port
128   */
129  private int selectClientPort(int seedPort) {
130    int i;
131    int returnClientPort = seedPort + 1;
132    if (returnClientPort == 0) {
133      // If the new port is invalid, find one - starting with the default client port.
134      // If the default client port is not specified, starting with a random port.
135      // The random port is selected from the range between 49152 to 65535. These ports cannot be
136      // registered with IANA and are intended for dynamic allocation (see http://bit.ly/dynports).
137      if (defaultClientPort > 0) {
138        returnClientPort = defaultClientPort;
139      } else {
140        returnClientPort = 0xc000 + new Random().nextInt(0x3f00);
141      }
142    }
143    // Make sure that the port is unused.
144    while (true) {
145      for (i = 0; i < clientPortList.size(); i++) {
146        if (returnClientPort == clientPortList.get(i)) {
147          // Already used. Update the port and retry.
148          returnClientPort++;
149          break;
150        }
151      }
152      if (i == clientPortList.size()) {
153        break; // found a unused port, exit
154      }
155    }
156    return returnClientPort;
157  }
158
159  public void setTickTime(int tickTime) {
160    this.tickTime = tickTime;
161  }
162
163  public int getBackupZooKeeperServerNum() {
164    return zooKeeperServers.size()-1;
165  }
166
167  public int getZooKeeperServerNum() {
168    return zooKeeperServers.size();
169  }
170
171  // / XXX: From o.a.zk.t.ClientBase
172  private static void setupTestEnv() {
173    // during the tests we run with 100K prealloc in the logs.
174    // on windows systems prealloc of 64M was seen to take ~15seconds
175    // resulting in test failure (client timeout on first session).
176    // set env and directly in order to handle static init/gc issues
177    System.setProperty("zookeeper.preAllocSize", "100");
178    FileTxnLog.setPreallocSize(100 * 1024);
179    // allow all 4 letter words
180    System.setProperty("zookeeper.4lw.commands.whitelist","*");
181  }
182
183  public int startup(File baseDir) throws IOException, InterruptedException {
184    int numZooKeeperServers = clientPortList.size();
185    if (numZooKeeperServers == 0) {
186      numZooKeeperServers = 1; // need at least 1 ZK server for testing
187    }
188    return startup(baseDir, numZooKeeperServers);
189  }
190
191  /**
192   * @param baseDir the base directory to use
193   * @param numZooKeeperServers the number of ZooKeeper servers
194   * @return ClientPort server bound to, -1 if there was a binding problem and we couldn't pick
195   *         another port.
196   * @throws IOException if an operation fails during the startup
197   * @throws InterruptedException if the startup fails
198   */
199  public int startup(File baseDir, int numZooKeeperServers) throws IOException,
200          InterruptedException {
201    if (numZooKeeperServers <= 0) {
202      return -1;
203    }
204
205    setupTestEnv();
206    shutdown();
207
208    int tentativePort = -1; // the seed port
209    int currentClientPort;
210
211    // running all the ZK servers
212    for (int i = 0; i < numZooKeeperServers; i++) {
213      File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile();
214      createDir(dir);
215      int tickTimeToUse;
216      if (this.tickTime > 0) {
217        tickTimeToUse = this.tickTime;
218      } else {
219        tickTimeToUse = TICK_TIME;
220      }
221
222      // Set up client port - if we have already had a list of valid ports, use it.
223      if (hasValidClientPortInList(i)) {
224        currentClientPort = clientPortList.get(i);
225      } else {
226        tentativePort = selectClientPort(tentativePort); // update the seed
227        currentClientPort = tentativePort;
228      }
229
230      ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
231      // Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper
232      server.setMinSessionTimeout(configuration.getInt(
233              "hbase.zookeeper.property.minSessionTimeout", -1));
234      server.setMaxSessionTimeout(configuration.getInt(
235              "hbase.zookeeper.property.maxSessionTimeout", -1));
236      NIOServerCnxnFactory standaloneServerFactory;
237      while (true) {
238        try {
239          standaloneServerFactory = new NIOServerCnxnFactory();
240          standaloneServerFactory.configure(
241            new InetSocketAddress(currentClientPort),
242            configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
243                    HConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS));
244        } catch (BindException e) {
245          LOG.debug("Failed binding ZK Server to client port: " +
246              currentClientPort, e);
247          // We're told to use some port but it's occupied, fail
248          if (hasValidClientPortInList(i)) {
249            return -1;
250          }
251          // This port is already in use, try to use another.
252          tentativePort = selectClientPort(tentativePort);
253          currentClientPort = tentativePort;
254          continue;
255        }
256        break;
257      }
258
259      // Start up this ZK server
260      standaloneServerFactory.startup(server);
261      // Runs a 'stat' against the servers.
262      if (!waitForServerUp(currentClientPort, connectionTimeout)) {
263        throw new IOException("Waiting for startup of standalone server");
264      }
265
266      // We have selected a port as a client port.  Update clientPortList if necessary.
267      if (clientPortList.size() <= i) { // it is not in the list, add the port
268        clientPortList.add(currentClientPort);
269      }
270      else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port
271        clientPortList.remove(i);
272        clientPortList.add(i, currentClientPort);
273      }
274
275      standaloneServerFactoryList.add(standaloneServerFactory);
276      zooKeeperServers.add(server);
277    }
278
279    // set the first one to be active ZK; Others are backups
280    activeZKServerIndex = 0;
281    started = true;
282    int clientPort = clientPortList.get(activeZKServerIndex);
283    LOG.info("Started MiniZooKeeperCluster and ran successful 'stat' " +
284        "on client port=" + clientPort);
285    return clientPort;
286  }
287
288  private void createDir(File dir) throws IOException {
289    try {
290      if (!dir.exists()) {
291        dir.mkdirs();
292      }
293    } catch (SecurityException e) {
294      throw new IOException("creating dir: " + dir, e);
295    }
296  }
297
298  /**
299   * @throws IOException if waiting for the shutdown of a server fails
300   */
301  public void shutdown() throws IOException {
302    // shut down all the zk servers
303    for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
304      NIOServerCnxnFactory standaloneServerFactory =
305        standaloneServerFactoryList.get(i);
306      int clientPort = clientPortList.get(i);
307
308      standaloneServerFactory.shutdown();
309      if (!waitForServerDown(clientPort, connectionTimeout)) {
310        throw new IOException("Waiting for shutdown of standalone server");
311      }
312    }
313    standaloneServerFactoryList.clear();
314
315    for (ZooKeeperServer zkServer: zooKeeperServers) {
316      //explicitly close ZKDatabase since ZookeeperServer does not close them
317      zkServer.getZKDatabase().close();
318    }
319    zooKeeperServers.clear();
320
321    // clear everything
322    if (started) {
323      started = false;
324      activeZKServerIndex = 0;
325      clientPortList.clear();
326      LOG.info("Shutdown MiniZK cluster with all ZK servers");
327    }
328  }
329
330  /**
331   * @return clientPort return clientPort if there is another ZK backup can run
332   *         when killing the current active; return -1, if there is no backups.
333   * @throws IOException if waiting for the shutdown of a server fails
334   */
335  public int killCurrentActiveZooKeeperServer() throws IOException, InterruptedException {
336    if (!started || activeZKServerIndex < 0) {
337      return -1;
338    }
339
340    // Shutdown the current active one
341    NIOServerCnxnFactory standaloneServerFactory =
342      standaloneServerFactoryList.get(activeZKServerIndex);
343    int clientPort = clientPortList.get(activeZKServerIndex);
344
345    standaloneServerFactory.shutdown();
346    if (!waitForServerDown(clientPort, connectionTimeout)) {
347      throw new IOException("Waiting for shutdown of standalone server");
348    }
349
350    zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
351
352    // remove the current active zk server
353    standaloneServerFactoryList.remove(activeZKServerIndex);
354    clientPortList.remove(activeZKServerIndex);
355    zooKeeperServers.remove(activeZKServerIndex);
356    LOG.info("Kill the current active ZK servers in the cluster " +
357        "on client port: " + clientPort);
358
359    if (standaloneServerFactoryList.isEmpty()) {
360      // there is no backup servers;
361      return -1;
362    }
363    clientPort = clientPortList.get(activeZKServerIndex);
364    LOG.info("Activate a backup zk server in the cluster " +
365        "on client port: " + clientPort);
366    // return the next back zk server's port
367    return clientPort;
368  }
369
370  /**
371   * Kill one back up ZK servers.
372   *
373   * @throws IOException if waiting for the shutdown of a server fails
374   */
375  public void killOneBackupZooKeeperServer() throws IOException, InterruptedException {
376    if (!started || activeZKServerIndex < 0 || standaloneServerFactoryList.size() <= 1) {
377      return ;
378    }
379
380    int backupZKServerIndex = activeZKServerIndex+1;
381    // Shutdown the current active one
382    NIOServerCnxnFactory standaloneServerFactory =
383      standaloneServerFactoryList.get(backupZKServerIndex);
384    int clientPort = clientPortList.get(backupZKServerIndex);
385
386    standaloneServerFactory.shutdown();
387    if (!waitForServerDown(clientPort, connectionTimeout)) {
388      throw new IOException("Waiting for shutdown of standalone server");
389    }
390
391    zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
392
393    // remove this backup zk server
394    standaloneServerFactoryList.remove(backupZKServerIndex);
395    clientPortList.remove(backupZKServerIndex);
396    zooKeeperServers.remove(backupZKServerIndex);
397    LOG.info("Kill one backup ZK servers in the cluster " +
398        "on client port: " + clientPort);
399  }
400
401  // XXX: From o.a.zk.t.ClientBase
402  private static boolean waitForServerDown(int port, long timeout) throws IOException {
403    long start = System.currentTimeMillis();
404    while (true) {
405      try {
406        Socket sock = new Socket("localhost", port);
407        try {
408          OutputStream outstream = sock.getOutputStream();
409          outstream.write("stat".getBytes());
410          outstream.flush();
411        } finally {
412          sock.close();
413        }
414      } catch (IOException e) {
415        return true;
416      }
417
418      if (System.currentTimeMillis() > start + timeout) {
419        break;
420      }
421      try {
422        Thread.sleep(250);
423      } catch (InterruptedException e) {
424        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
425      }
426    }
427    return false;
428  }
429
430  // XXX: From o.a.zk.t.ClientBase
431  private static boolean waitForServerUp(int port, long timeout) throws IOException {
432    long start = System.currentTimeMillis();
433    while (true) {
434      try {
435        Socket sock = new Socket("localhost", port);
436        BufferedReader reader = null;
437        try {
438          OutputStream outstream = sock.getOutputStream();
439          outstream.write("stat".getBytes());
440          outstream.flush();
441
442          Reader isr = new InputStreamReader(sock.getInputStream());
443          reader = new BufferedReader(isr);
444          String line = reader.readLine();
445          if (line != null && line.startsWith("Zookeeper version:")) {
446            return true;
447          }
448        } finally {
449          sock.close();
450          if (reader != null) {
451            reader.close();
452          }
453        }
454      } catch (IOException e) {
455        // ignore as this is expected
456        LOG.info("server localhost:" + port + " not up " + e);
457      }
458
459      if (System.currentTimeMillis() > start + timeout) {
460        break;
461      }
462      try {
463        Thread.sleep(250);
464      } catch (InterruptedException e) {
465        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
466      }
467    }
468    return false;
469  }
470
471  public int getClientPort() {
472    return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() ? -1
473        : clientPortList.get(activeZKServerIndex);
474  }
475
476  List<ZooKeeperServer> getZooKeeperServers() {
477    return zooKeeperServers;
478  }
479}