001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.zookeeper; 020 021import java.io.BufferedReader; 022import java.io.File; 023import java.io.IOException; 024import java.io.InputStreamReader; 025import java.io.InterruptedIOException; 026import java.io.OutputStream; 027import java.io.Reader; 028import java.net.BindException; 029import java.net.InetSocketAddress; 030import java.net.Socket; 031import java.util.ArrayList; 032import java.util.List; 033import java.util.Random; 034 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.apache.zookeeper.server.NIOServerCnxnFactory; 039import org.apache.zookeeper.server.ZooKeeperServer; 040import org.apache.zookeeper.server.persistence.FileTxnLog; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 045 046/** 047 * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead 048 * of redoing it, we should contribute updates to their code which let us more 049 * easily access testing helper objects. 050 */ 051@InterfaceAudience.Public 052public class MiniZooKeeperCluster { 053 private static final Logger LOG = LoggerFactory.getLogger(MiniZooKeeperCluster.class); 054 055 private static final int TICK_TIME = 2000; 056 private static final int DEFAULT_CONNECTION_TIMEOUT = 30000; 057 private int connectionTimeout; 058 059 private boolean started; 060 061 /** The default port. If zero, we use a random port. */ 062 private int defaultClientPort = 0; 063 064 private List<NIOServerCnxnFactory> standaloneServerFactoryList; 065 private List<ZooKeeperServer> zooKeeperServers; 066 private List<Integer> clientPortList; 067 068 private int activeZKServerIndex; 069 private int tickTime = 0; 070 071 private Configuration configuration; 072 073 public MiniZooKeeperCluster() { 074 this(new Configuration()); 075 } 076 077 public MiniZooKeeperCluster(Configuration configuration) { 078 this.started = false; 079 this.configuration = configuration; 080 activeZKServerIndex = -1; 081 zooKeeperServers = new ArrayList<>(); 082 clientPortList = new ArrayList<>(); 083 standaloneServerFactoryList = new ArrayList<>(); 084 connectionTimeout = configuration.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster", 085 DEFAULT_CONNECTION_TIMEOUT); 086 } 087 088 /** 089 * Add a client port to the list. 090 * 091 * @param clientPort the specified port 092 */ 093 public void addClientPort(int clientPort) { 094 clientPortList.add(clientPort); 095 } 096 097 /** 098 * Get the list of client ports. 099 * @return clientPortList the client port list 100 */ 101 @VisibleForTesting 102 public List<Integer> getClientPortList() { 103 return clientPortList; 104 } 105 106 /** 107 * Check whether the client port in a specific position of the client port list is valid. 108 * 109 * @param index the specified position 110 */ 111 private boolean hasValidClientPortInList(int index) { 112 return (clientPortList.size() > index && clientPortList.get(index) > 0); 113 } 114 115 public void setDefaultClientPort(int clientPort) { 116 if (clientPort <= 0) { 117 throw new IllegalArgumentException("Invalid default ZK client port: " 118 + clientPort); 119 } 120 this.defaultClientPort = clientPort; 121 } 122 123 /** 124 * Selects a ZK client port. 125 * 126 * @param seedPort the seed port to start with; -1 means first time. 127 * @Returns a valid and unused client port 128 */ 129 private int selectClientPort(int seedPort) { 130 int i; 131 int returnClientPort = seedPort + 1; 132 if (returnClientPort == 0) { 133 // If the new port is invalid, find one - starting with the default client port. 134 // If the default client port is not specified, starting with a random port. 135 // The random port is selected from the range between 49152 to 65535. These ports cannot be 136 // registered with IANA and are intended for dynamic allocation (see http://bit.ly/dynports). 137 if (defaultClientPort > 0) { 138 returnClientPort = defaultClientPort; 139 } else { 140 returnClientPort = 0xc000 + new Random().nextInt(0x3f00); 141 } 142 } 143 // Make sure that the port is unused. 144 while (true) { 145 for (i = 0; i < clientPortList.size(); i++) { 146 if (returnClientPort == clientPortList.get(i)) { 147 // Already used. Update the port and retry. 148 returnClientPort++; 149 break; 150 } 151 } 152 if (i == clientPortList.size()) { 153 break; // found a unused port, exit 154 } 155 } 156 return returnClientPort; 157 } 158 159 public void setTickTime(int tickTime) { 160 this.tickTime = tickTime; 161 } 162 163 public int getBackupZooKeeperServerNum() { 164 return zooKeeperServers.size()-1; 165 } 166 167 public int getZooKeeperServerNum() { 168 return zooKeeperServers.size(); 169 } 170 171 // / XXX: From o.a.zk.t.ClientBase 172 private static void setupTestEnv() { 173 // during the tests we run with 100K prealloc in the logs. 174 // on windows systems prealloc of 64M was seen to take ~15seconds 175 // resulting in test failure (client timeout on first session). 176 // set env and directly in order to handle static init/gc issues 177 System.setProperty("zookeeper.preAllocSize", "100"); 178 FileTxnLog.setPreallocSize(100 * 1024); 179 // allow all 4 letter words 180 System.setProperty("zookeeper.4lw.commands.whitelist","*"); 181 } 182 183 public int startup(File baseDir) throws IOException, InterruptedException { 184 int numZooKeeperServers = clientPortList.size(); 185 if (numZooKeeperServers == 0) { 186 numZooKeeperServers = 1; // need at least 1 ZK server for testing 187 } 188 return startup(baseDir, numZooKeeperServers); 189 } 190 191 /** 192 * @param baseDir the base directory to use 193 * @param numZooKeeperServers the number of ZooKeeper servers 194 * @return ClientPort server bound to, -1 if there was a binding problem and we couldn't pick 195 * another port. 196 * @throws IOException if an operation fails during the startup 197 * @throws InterruptedException if the startup fails 198 */ 199 public int startup(File baseDir, int numZooKeeperServers) throws IOException, 200 InterruptedException { 201 if (numZooKeeperServers <= 0) { 202 return -1; 203 } 204 205 setupTestEnv(); 206 shutdown(); 207 208 int tentativePort = -1; // the seed port 209 int currentClientPort; 210 211 // running all the ZK servers 212 for (int i = 0; i < numZooKeeperServers; i++) { 213 File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile(); 214 createDir(dir); 215 int tickTimeToUse; 216 if (this.tickTime > 0) { 217 tickTimeToUse = this.tickTime; 218 } else { 219 tickTimeToUse = TICK_TIME; 220 } 221 222 // Set up client port - if we have already had a list of valid ports, use it. 223 if (hasValidClientPortInList(i)) { 224 currentClientPort = clientPortList.get(i); 225 } else { 226 tentativePort = selectClientPort(tentativePort); // update the seed 227 currentClientPort = tentativePort; 228 } 229 230 ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse); 231 // Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper 232 server.setMinSessionTimeout(configuration.getInt( 233 "hbase.zookeeper.property.minSessionTimeout", -1)); 234 server.setMaxSessionTimeout(configuration.getInt( 235 "hbase.zookeeper.property.maxSessionTimeout", -1)); 236 NIOServerCnxnFactory standaloneServerFactory; 237 while (true) { 238 try { 239 standaloneServerFactory = new NIOServerCnxnFactory(); 240 standaloneServerFactory.configure( 241 new InetSocketAddress(currentClientPort), 242 configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, 243 HConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS)); 244 } catch (BindException e) { 245 LOG.debug("Failed binding ZK Server to client port: " + 246 currentClientPort, e); 247 // We're told to use some port but it's occupied, fail 248 if (hasValidClientPortInList(i)) { 249 return -1; 250 } 251 // This port is already in use, try to use another. 252 tentativePort = selectClientPort(tentativePort); 253 currentClientPort = tentativePort; 254 continue; 255 } 256 break; 257 } 258 259 // Start up this ZK server 260 standaloneServerFactory.startup(server); 261 // Runs a 'stat' against the servers. 262 if (!waitForServerUp(currentClientPort, connectionTimeout)) { 263 throw new IOException("Waiting for startup of standalone server"); 264 } 265 266 // We have selected a port as a client port. Update clientPortList if necessary. 267 if (clientPortList.size() <= i) { // it is not in the list, add the port 268 clientPortList.add(currentClientPort); 269 } 270 else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port 271 clientPortList.remove(i); 272 clientPortList.add(i, currentClientPort); 273 } 274 275 standaloneServerFactoryList.add(standaloneServerFactory); 276 zooKeeperServers.add(server); 277 } 278 279 // set the first one to be active ZK; Others are backups 280 activeZKServerIndex = 0; 281 started = true; 282 int clientPort = clientPortList.get(activeZKServerIndex); 283 LOG.info("Started MiniZooKeeperCluster and ran successful 'stat' " + 284 "on client port=" + clientPort); 285 return clientPort; 286 } 287 288 private void createDir(File dir) throws IOException { 289 try { 290 if (!dir.exists()) { 291 dir.mkdirs(); 292 } 293 } catch (SecurityException e) { 294 throw new IOException("creating dir: " + dir, e); 295 } 296 } 297 298 /** 299 * @throws IOException if waiting for the shutdown of a server fails 300 */ 301 public void shutdown() throws IOException { 302 // shut down all the zk servers 303 for (int i = 0; i < standaloneServerFactoryList.size(); i++) { 304 NIOServerCnxnFactory standaloneServerFactory = 305 standaloneServerFactoryList.get(i); 306 int clientPort = clientPortList.get(i); 307 308 standaloneServerFactory.shutdown(); 309 if (!waitForServerDown(clientPort, connectionTimeout)) { 310 throw new IOException("Waiting for shutdown of standalone server"); 311 } 312 } 313 standaloneServerFactoryList.clear(); 314 315 for (ZooKeeperServer zkServer: zooKeeperServers) { 316 //explicitly close ZKDatabase since ZookeeperServer does not close them 317 zkServer.getZKDatabase().close(); 318 } 319 zooKeeperServers.clear(); 320 321 // clear everything 322 if (started) { 323 started = false; 324 activeZKServerIndex = 0; 325 clientPortList.clear(); 326 LOG.info("Shutdown MiniZK cluster with all ZK servers"); 327 } 328 } 329 330 /** 331 * @return clientPort return clientPort if there is another ZK backup can run 332 * when killing the current active; return -1, if there is no backups. 333 * @throws IOException if waiting for the shutdown of a server fails 334 */ 335 public int killCurrentActiveZooKeeperServer() throws IOException, InterruptedException { 336 if (!started || activeZKServerIndex < 0) { 337 return -1; 338 } 339 340 // Shutdown the current active one 341 NIOServerCnxnFactory standaloneServerFactory = 342 standaloneServerFactoryList.get(activeZKServerIndex); 343 int clientPort = clientPortList.get(activeZKServerIndex); 344 345 standaloneServerFactory.shutdown(); 346 if (!waitForServerDown(clientPort, connectionTimeout)) { 347 throw new IOException("Waiting for shutdown of standalone server"); 348 } 349 350 zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close(); 351 352 // remove the current active zk server 353 standaloneServerFactoryList.remove(activeZKServerIndex); 354 clientPortList.remove(activeZKServerIndex); 355 zooKeeperServers.remove(activeZKServerIndex); 356 LOG.info("Kill the current active ZK servers in the cluster " + 357 "on client port: " + clientPort); 358 359 if (standaloneServerFactoryList.isEmpty()) { 360 // there is no backup servers; 361 return -1; 362 } 363 clientPort = clientPortList.get(activeZKServerIndex); 364 LOG.info("Activate a backup zk server in the cluster " + 365 "on client port: " + clientPort); 366 // return the next back zk server's port 367 return clientPort; 368 } 369 370 /** 371 * Kill one back up ZK servers. 372 * 373 * @throws IOException if waiting for the shutdown of a server fails 374 */ 375 public void killOneBackupZooKeeperServer() throws IOException, InterruptedException { 376 if (!started || activeZKServerIndex < 0 || standaloneServerFactoryList.size() <= 1) { 377 return ; 378 } 379 380 int backupZKServerIndex = activeZKServerIndex+1; 381 // Shutdown the current active one 382 NIOServerCnxnFactory standaloneServerFactory = 383 standaloneServerFactoryList.get(backupZKServerIndex); 384 int clientPort = clientPortList.get(backupZKServerIndex); 385 386 standaloneServerFactory.shutdown(); 387 if (!waitForServerDown(clientPort, connectionTimeout)) { 388 throw new IOException("Waiting for shutdown of standalone server"); 389 } 390 391 zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close(); 392 393 // remove this backup zk server 394 standaloneServerFactoryList.remove(backupZKServerIndex); 395 clientPortList.remove(backupZKServerIndex); 396 zooKeeperServers.remove(backupZKServerIndex); 397 LOG.info("Kill one backup ZK servers in the cluster " + 398 "on client port: " + clientPort); 399 } 400 401 // XXX: From o.a.zk.t.ClientBase 402 private static boolean waitForServerDown(int port, long timeout) throws IOException { 403 long start = System.currentTimeMillis(); 404 while (true) { 405 try { 406 Socket sock = new Socket("localhost", port); 407 try { 408 OutputStream outstream = sock.getOutputStream(); 409 outstream.write("stat".getBytes()); 410 outstream.flush(); 411 } finally { 412 sock.close(); 413 } 414 } catch (IOException e) { 415 return true; 416 } 417 418 if (System.currentTimeMillis() > start + timeout) { 419 break; 420 } 421 try { 422 Thread.sleep(250); 423 } catch (InterruptedException e) { 424 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 425 } 426 } 427 return false; 428 } 429 430 // XXX: From o.a.zk.t.ClientBase 431 private static boolean waitForServerUp(int port, long timeout) throws IOException { 432 long start = System.currentTimeMillis(); 433 while (true) { 434 try { 435 Socket sock = new Socket("localhost", port); 436 BufferedReader reader = null; 437 try { 438 OutputStream outstream = sock.getOutputStream(); 439 outstream.write("stat".getBytes()); 440 outstream.flush(); 441 442 Reader isr = new InputStreamReader(sock.getInputStream()); 443 reader = new BufferedReader(isr); 444 String line = reader.readLine(); 445 if (line != null && line.startsWith("Zookeeper version:")) { 446 return true; 447 } 448 } finally { 449 sock.close(); 450 if (reader != null) { 451 reader.close(); 452 } 453 } 454 } catch (IOException e) { 455 // ignore as this is expected 456 LOG.info("server localhost:" + port + " not up " + e); 457 } 458 459 if (System.currentTimeMillis() > start + timeout) { 460 break; 461 } 462 try { 463 Thread.sleep(250); 464 } catch (InterruptedException e) { 465 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 466 } 467 } 468 return false; 469 } 470 471 public int getClientPort() { 472 return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() ? -1 473 : clientPortList.get(activeZKServerIndex); 474 } 475 476 List<ZooKeeperServer> getZooKeeperServers() { 477 return zooKeeperServers; 478 } 479}