001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import edu.umd.cs.findbugs.annotations.Nullable;
025import java.io.File;
026import java.io.IOException;
027import java.io.OutputStream;
028import java.lang.reflect.Field;
029import java.lang.reflect.Modifier;
030import java.net.BindException;
031import java.net.DatagramSocket;
032import java.net.InetAddress;
033import java.net.ServerSocket;
034import java.net.Socket;
035import java.net.UnknownHostException;
036import java.nio.charset.StandardCharsets;
037import java.security.MessageDigest;
038import java.util.ArrayList;
039import java.util.Arrays;
040import java.util.Collection;
041import java.util.Collections;
042import java.util.EnumSet;
043import java.util.HashSet;
044import java.util.Iterator;
045import java.util.List;
046import java.util.Map;
047import java.util.NavigableSet;
048import java.util.Properties;
049import java.util.Random;
050import java.util.Set;
051import java.util.TreeSet;
052import java.util.concurrent.TimeUnit;
053import java.util.concurrent.atomic.AtomicReference;
054import java.util.stream.Collectors;
055import org.apache.commons.io.FileUtils;
056import org.apache.commons.lang3.RandomStringUtils;
057import org.apache.commons.logging.impl.Jdk14Logger;
058import org.apache.commons.logging.impl.Log4JLogger;
059import org.apache.hadoop.conf.Configuration;
060import org.apache.hadoop.fs.FileSystem;
061import org.apache.hadoop.fs.Path;
062import org.apache.hadoop.hbase.ClusterMetrics.Option;
063import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
064import org.apache.hadoop.hbase.Waiter.Predicate;
065import org.apache.hadoop.hbase.client.Admin;
066import org.apache.hadoop.hbase.client.BufferedMutator;
067import org.apache.hadoop.hbase.client.ClusterConnection;
068import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
069import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
070import org.apache.hadoop.hbase.client.Connection;
071import org.apache.hadoop.hbase.client.ConnectionFactory;
072import org.apache.hadoop.hbase.client.Consistency;
073import org.apache.hadoop.hbase.client.Delete;
074import org.apache.hadoop.hbase.client.Durability;
075import org.apache.hadoop.hbase.client.Get;
076import org.apache.hadoop.hbase.client.HBaseAdmin;
077import org.apache.hadoop.hbase.client.Hbck;
078import org.apache.hadoop.hbase.client.ImmutableHRegionInfo;
079import org.apache.hadoop.hbase.client.ImmutableHTableDescriptor;
080import org.apache.hadoop.hbase.client.Put;
081import org.apache.hadoop.hbase.client.RegionInfo;
082import org.apache.hadoop.hbase.client.RegionInfoBuilder;
083import org.apache.hadoop.hbase.client.RegionLocator;
084import org.apache.hadoop.hbase.client.Result;
085import org.apache.hadoop.hbase.client.ResultScanner;
086import org.apache.hadoop.hbase.client.Scan;
087import org.apache.hadoop.hbase.client.Table;
088import org.apache.hadoop.hbase.client.TableDescriptor;
089import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
090import org.apache.hadoop.hbase.client.TableState;
091import org.apache.hadoop.hbase.fs.HFileSystem;
092import org.apache.hadoop.hbase.io.compress.Compression;
093import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
094import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
095import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
096import org.apache.hadoop.hbase.io.hfile.HFile;
097import org.apache.hadoop.hbase.ipc.RpcServerInterface;
098import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
099import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
100import org.apache.hadoop.hbase.master.HMaster;
101import org.apache.hadoop.hbase.master.RegionState;
102import org.apache.hadoop.hbase.master.ServerManager;
103import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
104import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
105import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
106import org.apache.hadoop.hbase.master.assignment.RegionStates;
107import org.apache.hadoop.hbase.regionserver.BloomType;
108import org.apache.hadoop.hbase.regionserver.ChunkCreator;
109import org.apache.hadoop.hbase.regionserver.HRegion;
110import org.apache.hadoop.hbase.regionserver.HRegionServer;
111import org.apache.hadoop.hbase.regionserver.HStore;
112import org.apache.hadoop.hbase.regionserver.InternalScanner;
113import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
114import org.apache.hadoop.hbase.regionserver.Region;
115import org.apache.hadoop.hbase.regionserver.RegionScanner;
116import org.apache.hadoop.hbase.regionserver.RegionServerServices;
117import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
118import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
119import org.apache.hadoop.hbase.security.User;
120import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
121import org.apache.hadoop.hbase.trace.TraceUtil;
122import org.apache.hadoop.hbase.util.Bytes;
123import org.apache.hadoop.hbase.util.CommonFSUtils;
124import org.apache.hadoop.hbase.util.FSTableDescriptors;
125import org.apache.hadoop.hbase.util.FSUtils;
126import org.apache.hadoop.hbase.util.JVMClusterUtil;
127import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
128import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
129import org.apache.hadoop.hbase.util.Pair;
130import org.apache.hadoop.hbase.util.RegionSplitter;
131import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
132import org.apache.hadoop.hbase.util.RetryCounter;
133import org.apache.hadoop.hbase.util.Threads;
134import org.apache.hadoop.hbase.wal.WAL;
135import org.apache.hadoop.hbase.wal.WALFactory;
136import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
137import org.apache.hadoop.hbase.zookeeper.ZKConfig;
138import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
139import org.apache.hadoop.hdfs.DFSClient;
140import org.apache.hadoop.hdfs.DistributedFileSystem;
141import org.apache.hadoop.hdfs.MiniDFSCluster;
142import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
143import org.apache.hadoop.mapred.JobConf;
144import org.apache.hadoop.mapred.MiniMRCluster;
145import org.apache.hadoop.mapred.TaskLog;
146import org.apache.hadoop.minikdc.MiniKdc;
147import org.apache.log4j.LogManager;
148import org.apache.yetus.audience.InterfaceAudience;
149import org.apache.zookeeper.WatchedEvent;
150import org.apache.zookeeper.ZooKeeper;
151import org.apache.zookeeper.ZooKeeper.States;
152import org.slf4j.Logger;
153import org.slf4j.LoggerFactory;
154import org.slf4j.impl.Log4jLoggerAdapter;
155
156import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
157
158/**
159 * Facility for testing HBase. Replacement for
160 * old HBaseTestCase and HBaseClusterTestCase functionality.
161 * Create an instance and keep it around testing HBase.  This class is
162 * meant to be your one-stop shop for anything you might need testing.  Manages
163 * one cluster at a time only. Managed cluster can be an in-process
164 * {@link MiniHBaseCluster}, or a deployed cluster of type {@code DistributedHBaseCluster}.
165 * Not all methods work with the real cluster.
166 * Depends on log4j being on classpath and
167 * hbase-site.xml for logging and test-run configuration.  It does not set
168 * logging levels.
169 * In the configuration properties, default values for master-info-port and
170 * region-server-port are overridden such that a random port will be assigned (thus
171 * avoiding port contention if another local HBase instance is already running).
172 * <p>To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
173 * setting it to true.
174 * For triggering pre commit
175 */
176@InterfaceAudience.Public
177@SuppressWarnings("deprecation")
178public class HBaseTestingUtility extends HBaseZKTestingUtility {
179
180  /**
181   * System property key to get test directory value. Name is as it is because mini dfs has
182   * hard-codings to put test data here. It should NOT be used directly in HBase, as it's a property
183   * used in mini dfs.
184   * @deprecated can be used only with mini dfs
185   */
186  @Deprecated
187  private static final String TEST_DIRECTORY_KEY = "test.build.data";
188
189  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
190  /**
191   * The default number of regions per regionserver when creating a pre-split
192   * table.
193   */
194  public static final int DEFAULT_REGIONS_PER_SERVER = 3;
195
196
197  public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table";
198  public static final boolean PRESPLIT_TEST_TABLE = true;
199
200  private MiniDFSCluster dfsCluster = null;
201
202  private volatile HBaseCluster hbaseCluster = null;
203  private MiniMRCluster mrCluster = null;
204
205  /** If there is a mini cluster running for this testing utility instance. */
206  private volatile boolean miniClusterRunning;
207
208  private String hadoopLogDir;
209
210  /** Directory on test filesystem where we put the data for this instance of
211    * HBaseTestingUtility*/
212  private Path dataTestDirOnTestFS = null;
213
214  /**
215   * Shared cluster connection.
216   */
217  private volatile Connection connection;
218
219  /** Filesystem URI used for map-reduce mini-cluster setup */
220  private static String FS_URI;
221
222  /** This is for unit tests parameterized with a single boolean. */
223  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
224
225  /**
226   * Checks to see if a specific port is available.
227   *
228   * @param port the port number to check for availability
229   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
230   */
231  public static boolean available(int port) {
232    ServerSocket ss = null;
233    DatagramSocket ds = null;
234    try {
235      ss = new ServerSocket(port);
236      ss.setReuseAddress(true);
237      ds = new DatagramSocket(port);
238      ds.setReuseAddress(true);
239      return true;
240    } catch (IOException e) {
241      // Do nothing
242    } finally {
243      if (ds != null) {
244        ds.close();
245      }
246
247      if (ss != null) {
248        try {
249          ss.close();
250        } catch (IOException e) {
251          /* should not be thrown */
252        }
253      }
254    }
255
256    return false;
257  }
258
259  /**
260   * Create all combinations of Bloom filters and compression algorithms for
261   * testing.
262   */
263  private static List<Object[]> bloomAndCompressionCombinations() {
264    List<Object[]> configurations = new ArrayList<>();
265    for (Compression.Algorithm comprAlgo :
266         HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) {
267      for (BloomType bloomType : BloomType.values()) {
268        configurations.add(new Object[] { comprAlgo, bloomType });
269      }
270    }
271    return Collections.unmodifiableList(configurations);
272  }
273
274  /**
275   * Create combination of memstoreTS and tags
276   */
277  private static List<Object[]> memStoreTSAndTagsCombination() {
278    List<Object[]> configurations = new ArrayList<>();
279    configurations.add(new Object[] { false, false });
280    configurations.add(new Object[] { false, true });
281    configurations.add(new Object[] { true, false });
282    configurations.add(new Object[] { true, true });
283    return Collections.unmodifiableList(configurations);
284  }
285
286  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
287    List<Object[]> configurations = new ArrayList<>();
288    configurations.add(new Object[] { false, false, true });
289    configurations.add(new Object[] { false, false, false });
290    configurations.add(new Object[] { false, true, true });
291    configurations.add(new Object[] { false, true, false });
292    configurations.add(new Object[] { true, false, true });
293    configurations.add(new Object[] { true, false, false });
294    configurations.add(new Object[] { true, true, true });
295    configurations.add(new Object[] { true, true, false });
296    return Collections.unmodifiableList(configurations);
297  }
298
299  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
300      bloomAndCompressionCombinations();
301
302
303  /**
304   * <p>Create an HBaseTestingUtility using a default configuration.
305   *
306   * <p>Initially, all tmp files are written to a local test data directory.
307   * Once {@link #startMiniDFSCluster} is called, either directly or via
308   * {@link #startMiniCluster()}, tmp data will be written to the DFS directory instead.
309   *
310   * <p>Previously, there was a distinction between the type of utility returned by
311   * {@link #createLocalHTU()} and this constructor; this is no longer the case. All
312   * HBaseTestingUtility objects will behave as local until a DFS cluster is started,
313   * at which point they will switch to using mini DFS for storage.
314   */
315  public HBaseTestingUtility() {
316    this(HBaseConfiguration.create());
317  }
318
319  /**
320   * <p>Create an HBaseTestingUtility using a given configuration.
321   *
322   * <p>Initially, all tmp files are written to a local test data directory.
323   * Once {@link #startMiniDFSCluster} is called, either directly or via
324   * {@link #startMiniCluster()}, tmp data will be written to the DFS directory instead.
325   *
326   * <p>Previously, there was a distinction between the type of utility returned by
327   * {@link #createLocalHTU()} and this constructor; this is no longer the case. All
328   * HBaseTestingUtility objects will behave as local until a DFS cluster is started,
329   * at which point they will switch to using mini DFS for storage.
330   *
331   * @param conf The configuration to use for further operations
332   */
333  public HBaseTestingUtility(@Nullable Configuration conf) {
334    super(conf);
335
336    // a hbase checksum verification failure will cause unit tests to fail
337    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
338
339    // Save this for when setting default file:// breaks things
340    if (this.conf.get("fs.defaultFS") != null) {
341      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
342    }
343    if (this.conf.get(HConstants.HBASE_DIR) != null) {
344      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
345    }
346    // Every cluster is a local cluster until we start DFS
347    // Note that conf could be null, but this.conf will not be
348    String dataTestDir = getDataTestDir().toString();
349    this.conf.set("fs.defaultFS","file:///");
350    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
351    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
352    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE,false);
353    // If the value for random ports isn't set set it to true, thus making
354    // tests opt-out for random port assignment
355    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
356        this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
357  }
358
359  /**
360   * @deprecated use {@link HBaseTestingUtility#HBaseTestingUtility()} instead
361   * @return a normal HBaseTestingUtility
362   */
363  @Deprecated
364  public static HBaseTestingUtility createLocalHTU() {
365    return new HBaseTestingUtility();
366  }
367
368  /**
369   * @deprecated use {@link HBaseTestingUtility#HBaseTestingUtility(Configuration)} instead
370   * @return a normal HBaseTestingUtility
371   */
372  @Deprecated
373  public static HBaseTestingUtility createLocalHTU(Configuration c) {
374    return new HBaseTestingUtility(c);
375  }
376
377  /**
378   * Close both the region {@code r} and it's underlying WAL. For use in tests.
379   */
380  public static void closeRegionAndWAL(final Region r) throws IOException {
381    closeRegionAndWAL((HRegion)r);
382  }
383
384  /**
385   * Close both the HRegion {@code r} and it's underlying WAL. For use in tests.
386   */
387  public static void closeRegionAndWAL(final HRegion r) throws IOException {
388    if (r == null) return;
389    r.close();
390    if (r.getWAL() == null) return;
391    r.getWAL().close();
392  }
393
394  /**
395   * Returns this classes's instance of {@link Configuration}.  Be careful how
396   * you use the returned Configuration since {@link Connection} instances
397   * can be shared.  The Map of Connections is keyed by the Configuration.  If
398   * say, a Connection was being used against a cluster that had been shutdown,
399   * see {@link #shutdownMiniCluster()}, then the Connection will no longer
400   * be wholesome.  Rather than use the return direct, its usually best to
401   * make a copy and use that.  Do
402   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
403   * @return Instance of Configuration.
404   */
405  @Override
406  public Configuration getConfiguration() {
407    return super.getConfiguration();
408  }
409
410  public void setHBaseCluster(HBaseCluster hbaseCluster) {
411    this.hbaseCluster = hbaseCluster;
412  }
413
414  /**
415   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}.
416   * Give it a random name so can have many concurrent tests running if
417   * we need to.  It needs to amend the {@link #TEST_DIRECTORY_KEY}
418   * System property, as it's what minidfscluster bases
419   * it data dir on.  Moding a System property is not the way to do concurrent
420   * instances -- another instance could grab the temporary
421   * value unintentionally -- but not anything can do about it at moment;
422   * single instance only is how the minidfscluster works.
423   *
424   * We also create the underlying directory for
425   *  hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values
426   *  in the conf, and as a system property for hadoop.tmp.dir
427   *
428   * @return The calculated data test build directory, if newly-created.
429   */
430  @Override
431  protected Path setupDataTestDir() {
432    Path testPath = super.setupDataTestDir();
433    if (null == testPath) {
434      return null;
435    }
436
437    createSubDirAndSystemProperty(
438      "hadoop.log.dir",
439      testPath, "hadoop-log-dir");
440
441    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
442    //  we want our own value to ensure uniqueness on the same machine
443    createSubDirAndSystemProperty(
444      "hadoop.tmp.dir",
445      testPath, "hadoop-tmp-dir");
446
447    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
448    createSubDir(
449      "mapreduce.cluster.local.dir",
450      testPath, "mapred-local-dir");
451
452    return testPath;
453  }
454
455  private void createSubDirAndSystemProperty(
456    String propertyName, Path parent, String subDirName){
457
458    String sysValue = System.getProperty(propertyName);
459
460    if (sysValue != null) {
461      // There is already a value set. So we do nothing but hope
462      //  that there will be no conflicts
463      LOG.info("System.getProperty(\""+propertyName+"\") already set to: "+
464        sysValue + " so I do NOT create it in " + parent);
465      String confValue = conf.get(propertyName);
466      if (confValue != null && !confValue.endsWith(sysValue)){
467       LOG.warn(
468         propertyName + " property value differs in configuration and system: "+
469         "Configuration="+confValue+" while System="+sysValue+
470         " Erasing configuration value by system value."
471       );
472      }
473      conf.set(propertyName, sysValue);
474    } else {
475      // Ok, it's not set, so we create it as a subdirectory
476      createSubDir(propertyName, parent, subDirName);
477      System.setProperty(propertyName, conf.get(propertyName));
478    }
479  }
480
481  /**
482   * @return Where to write test data on the test filesystem; Returns working directory
483   * for the test filesystem by default
484   * @see #setupDataTestDirOnTestFS()
485   * @see #getTestFileSystem()
486   */
487  private Path getBaseTestDirOnTestFS() throws IOException {
488    FileSystem fs = getTestFileSystem();
489    return new Path(fs.getWorkingDirectory(), "test-data");
490  }
491
492  /**
493   * @return META table descriptor
494   * @deprecated since 2.0 version and will be removed in 3.0 version.
495   *             use {@link #getMetaTableDescriptorBuilder()}
496   */
497  @Deprecated
498  public HTableDescriptor getMetaTableDescriptor() {
499    return new ImmutableHTableDescriptor(getMetaTableDescriptorBuilder().build());
500  }
501
502  /**
503   * @return META table descriptor
504   */
505  public TableDescriptorBuilder getMetaTableDescriptorBuilder() {
506    try {
507      return FSTableDescriptors.createMetaTableDescriptorBuilder(conf);
508    } catch (IOException e) {
509      throw new RuntimeException("Unable to create META table descriptor", e);
510    }
511  }
512
513  /**
514   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}
515   * to write temporary test data. Call this method after setting up the mini dfs cluster
516   * if the test relies on it.
517   * @return a unique path in the test filesystem
518   */
519  public Path getDataTestDirOnTestFS() throws IOException {
520    if (dataTestDirOnTestFS == null) {
521      setupDataTestDirOnTestFS();
522    }
523
524    return dataTestDirOnTestFS;
525  }
526
527  /**
528   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()}
529   * to write temporary test data. Call this method after setting up the mini dfs cluster
530   * if the test relies on it.
531   * @return a unique path in the test filesystem
532   * @param subdirName name of the subdir to create under the base test dir
533   */
534  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
535    return new Path(getDataTestDirOnTestFS(), subdirName);
536  }
537
538  /**
539   * Sets up a path in test filesystem to be used by tests.
540   * Creates a new directory if not already setup.
541   */
542  private void setupDataTestDirOnTestFS() throws IOException {
543    if (dataTestDirOnTestFS != null) {
544      LOG.warn("Data test on test fs dir already setup in "
545          + dataTestDirOnTestFS.toString());
546      return;
547    }
548    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
549  }
550
551  /**
552   * Sets up a new path in test filesystem to be used by tests.
553   */
554  private Path getNewDataTestDirOnTestFS() throws IOException {
555    //The file system can be either local, mini dfs, or if the configuration
556    //is supplied externally, it can be an external cluster FS. If it is a local
557    //file system, the tests should use getBaseTestDir, otherwise, we can use
558    //the working directory, and create a unique sub dir there
559    FileSystem fs = getTestFileSystem();
560    Path newDataTestDir;
561    String randomStr = getRandomUUID().toString();
562    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
563      newDataTestDir = new Path(getDataTestDir(), randomStr);
564      File dataTestDir = new File(newDataTestDir.toString());
565      if (deleteOnExit()) dataTestDir.deleteOnExit();
566    } else {
567      Path base = getBaseTestDirOnTestFS();
568      newDataTestDir = new Path(base, randomStr);
569      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
570    }
571    return newDataTestDir;
572  }
573
574  /**
575   * Cleans the test data directory on the test filesystem.
576   * @return True if we removed the test dirs
577   * @throws IOException
578   */
579  public boolean cleanupDataTestDirOnTestFS() throws IOException {
580    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
581    if (ret)
582      dataTestDirOnTestFS = null;
583    return ret;
584  }
585
586  /**
587   * Cleans a subdirectory under the test data directory on the test filesystem.
588   * @return True if we removed child
589   * @throws IOException
590   */
591  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
592    Path cpath = getDataTestDirOnTestFS(subdirName);
593    return getTestFileSystem().delete(cpath, true);
594  }
595
596  /**
597   * Start a minidfscluster.
598   * @param servers How many DNs to start.
599   * @throws Exception
600   * @see #shutdownMiniDFSCluster()
601   * @return The mini dfs cluster created.
602   */
603  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
604    return startMiniDFSCluster(servers, null);
605  }
606
607  /**
608   * Start a minidfscluster.
609   * This is useful if you want to run datanode on distinct hosts for things
610   * like HDFS block location verification.
611   * If you start MiniDFSCluster without host names, all instances of the
612   * datanodes will have the same host name.
613   * @param hosts hostnames DNs to run on.
614   * @throws Exception
615   * @see #shutdownMiniDFSCluster()
616   * @return The mini dfs cluster created.
617   */
618  public MiniDFSCluster startMiniDFSCluster(final String hosts[])
619  throws Exception {
620    if ( hosts != null && hosts.length != 0) {
621      return startMiniDFSCluster(hosts.length, hosts);
622    } else {
623      return startMiniDFSCluster(1, null);
624    }
625  }
626
627  /**
628   * Start a minidfscluster.
629   * Can only create one.
630   * @param servers How many DNs to start.
631   * @param hosts hostnames DNs to run on.
632   * @throws Exception
633   * @see #shutdownMiniDFSCluster()
634   * @return The mini dfs cluster created.
635   */
636  public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[])
637  throws Exception {
638    return startMiniDFSCluster(servers, null, hosts);
639  }
640
641  private void setFs() throws IOException {
642    if(this.dfsCluster == null){
643      LOG.info("Skipping setting fs because dfsCluster is null");
644      return;
645    }
646    FileSystem fs = this.dfsCluster.getFileSystem();
647    FSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
648
649    // re-enable this check with dfs
650    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
651  }
652
653  public MiniDFSCluster startMiniDFSCluster(int servers, final  String racks[], String hosts[])
654      throws Exception {
655    createDirsAndSetProperties();
656    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
657
658    // Error level to skip some warnings specific to the minicluster. See HBASE-4709
659    org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.util.MBeans.class).
660        setLevel(org.apache.log4j.Level.ERROR);
661    org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class).
662        setLevel(org.apache.log4j.Level.ERROR);
663
664    TraceUtil.initTracer(conf);
665
666    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
667        true, null, racks, hosts, null);
668
669    // Set this just-started cluster as our filesystem.
670    setFs();
671
672    // Wait for the cluster to be totally up
673    this.dfsCluster.waitClusterUp();
674
675    //reset the test directory for test file system
676    dataTestDirOnTestFS = null;
677    String dataTestDir = getDataTestDir().toString();
678    conf.set(HConstants.HBASE_DIR, dataTestDir);
679    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
680
681    return this.dfsCluster;
682  }
683
684  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
685    createDirsAndSetProperties();
686    dfsCluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null,
687        null, null, null);
688    return dfsCluster;
689  }
690
691  /** This is used before starting HDFS and map-reduce mini-clusters */
692  private void createDirsAndSetProperties() throws IOException {
693    setupClusterTestDir();
694    conf.set(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
695    System.setProperty(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
696    createDirAndSetProperty("cache_data", "test.cache.data");
697    createDirAndSetProperty("hadoop_tmp", "hadoop.tmp.dir");
698    hadoopLogDir = createDirAndSetProperty("hadoop_logs", "hadoop.log.dir");
699    createDirAndSetProperty("mapred_local", "mapreduce.cluster.local.dir");
700    createDirAndSetProperty("mapred_temp", "mapreduce.cluster.temp.dir");
701    enableShortCircuit();
702
703    Path root = getDataTestDirOnTestFS("hadoop");
704    conf.set(MapreduceTestingShim.getMROutputDirProp(),
705      new Path(root, "mapred-output-dir").toString());
706    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
707    conf.set("mapreduce.jobtracker.staging.root.dir",
708      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
709    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
710    conf.set("yarn.app.mapreduce.am.staging-dir",
711      new Path(root, "mapreduce-am-staging-root-dir").toString());
712  }
713
714  /**
715   *  Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating
716   *  new column families. Default to false.
717   */
718  public boolean isNewVersionBehaviorEnabled(){
719    final String propName = "hbase.tests.new.version.behavior";
720    String v = System.getProperty(propName);
721    if (v != null){
722      return Boolean.parseBoolean(v);
723    }
724    return false;
725  }
726
727  /**
728   *  Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property.
729   *  This allows to specify this parameter on the command line.
730   *   If not set, default is true.
731   */
732  public boolean isReadShortCircuitOn(){
733    final String propName = "hbase.tests.use.shortcircuit.reads";
734    String readOnProp = System.getProperty(propName);
735    if (readOnProp != null){
736      return  Boolean.parseBoolean(readOnProp);
737    } else {
738      return conf.getBoolean(propName, false);
739    }
740  }
741
742  /** Enable the short circuit read, unless configured differently.
743   * Set both HBase and HDFS settings, including skipping the hdfs checksum checks.
744   */
745  private void enableShortCircuit() {
746    if (isReadShortCircuitOn()) {
747      String curUser = System.getProperty("user.name");
748      LOG.info("read short circuit is ON for user " + curUser);
749      // read short circuit, for hdfs
750      conf.set("dfs.block.local-path-access.user", curUser);
751      // read short circuit, for hbase
752      conf.setBoolean("dfs.client.read.shortcircuit", true);
753      // Skip checking checksum, for the hdfs client and the datanode
754      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
755    } else {
756      LOG.info("read short circuit is OFF");
757    }
758  }
759
760  private String createDirAndSetProperty(final String relPath, String property) {
761    String path = getDataTestDir(relPath).toString();
762    System.setProperty(property, path);
763    conf.set(property, path);
764    new File(path).mkdirs();
765    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
766    return path;
767  }
768
769  /**
770   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)}
771   * or does nothing.
772   * @throws IOException
773   */
774  public void shutdownMiniDFSCluster() throws IOException {
775    if (this.dfsCluster != null) {
776      // The below throws an exception per dn, AsynchronousCloseException.
777      this.dfsCluster.shutdown();
778      dfsCluster = null;
779      dataTestDirOnTestFS = null;
780      FSUtils.setFsDefault(this.conf, new Path("file:///"));
781    }
782  }
783
784
785  /**
786   * Start up a minicluster of hbase, dfs, and zookeeper.
787   * @throws Exception
788   * @return Mini hbase cluster instance created.
789   * @see #shutdownMiniDFSCluster()
790   */
791  public MiniHBaseCluster startMiniCluster() throws Exception {
792    return startMiniCluster(1, 1);
793  }
794
795  /**
796   * Start up a minicluster of hbase, dfs, and zookeeper where WAL's walDir is created separately.
797   * @throws Exception
798   * @return Mini hbase cluster instance created.
799   * @see #shutdownMiniDFSCluster()
800   */
801  public MiniHBaseCluster startMiniCluster(boolean withWALDir) throws Exception {
802    return startMiniCluster(1, 1, 1, null, null, null, false, withWALDir);
803  }
804
805  /**
806   * Start up a minicluster of hbase, dfs, and zookeeper.
807   * Set the <code>create</code> flag to create root or data directory path or not
808   * (will overwrite if dir already exists)
809   * @throws Exception
810   * @return Mini hbase cluster instance created.
811   * @see #shutdownMiniDFSCluster()
812   */
813  public MiniHBaseCluster startMiniCluster(final int numSlaves, boolean create)
814  throws Exception {
815    return startMiniCluster(1, numSlaves, create);
816  }
817
818  /**
819   * Start up a minicluster of hbase, optionally dfs, and zookeeper.
820   * Modifies Configuration.  Homes the cluster data directory under a random
821   * subdirectory in a directory under System property test.build.data.
822   * Directory is cleaned up on exit.
823   * @param numSlaves Number of slaves to start up.  We'll start this many
824   * datanodes and regionservers.  If numSlaves is > 1, then make sure
825   * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise
826   * bind errors.
827   * @throws Exception
828   * @see #shutdownMiniCluster()
829   * @return Mini hbase cluster instance created.
830   */
831  public MiniHBaseCluster startMiniCluster(final int numSlaves)
832  throws Exception {
833    return startMiniCluster(1, numSlaves, false);
834  }
835
836  public MiniHBaseCluster startMiniCluster(final int numSlaves, boolean create, boolean withWALDir)
837          throws Exception {
838    return startMiniCluster(1, numSlaves, numSlaves, null, null, null, create, withWALDir);
839  }
840
841  /**
842   * Start minicluster. Whether to create a new root or data dir path even if such a path
843   * has been created earlier is decided based on flag <code>create</code>
844   * @throws Exception
845   * @see #shutdownMiniCluster()
846   * @return Mini hbase cluster instance created.
847   */
848  public MiniHBaseCluster startMiniCluster(final int numMasters,
849      final int numSlaves, boolean create)
850    throws Exception {
851      return startMiniCluster(numMasters, numSlaves, null, create);
852  }
853
854  /**
855   * start minicluster
856   * @throws Exception
857   * @see #shutdownMiniCluster()
858   * @return Mini hbase cluster instance created.
859   */
860  public MiniHBaseCluster startMiniCluster(final int numMasters,
861    final int numSlaves)
862  throws Exception {
863    return startMiniCluster(numMasters, numSlaves, null, false);
864  }
865
866  public MiniHBaseCluster startMiniCluster(final int numMasters,
867      final int numSlaves, final String[] dataNodeHosts, boolean create)
868      throws Exception {
869    return startMiniCluster(numMasters, numSlaves, numSlaves, dataNodeHosts,
870        null, null, create, false);
871  }
872
873  /**
874   * Start up a minicluster of hbase, optionally dfs, and zookeeper.
875   * Modifies Configuration.  Homes the cluster data directory under a random
876   * subdirectory in a directory under System property test.build.data.
877   * Directory is cleaned up on exit.
878   * @param numMasters Number of masters to start up.  We'll start this many
879   * hbase masters.  If numMasters > 1, you can find the active/primary master
880   * with {@link MiniHBaseCluster#getMaster()}.
881   * @param numSlaves Number of slaves to start up.  We'll start this many
882   * regionservers. If dataNodeHosts == null, this also indicates the number of
883   * datanodes to start. If dataNodeHosts != null, the number of datanodes is
884   * based on dataNodeHosts.length.
885   * If numSlaves is > 1, then make sure
886   * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise
887   * bind errors.
888   * @param dataNodeHosts hostnames DNs to run on.
889   * This is useful if you want to run datanode on distinct hosts for things
890   * like HDFS block location verification.
891   * If you start MiniDFSCluster without host names,
892   * all instances of the datanodes will have the same host name.
893   * @throws Exception
894   * @see #shutdownMiniCluster()
895   * @return Mini hbase cluster instance created.
896   */
897  public MiniHBaseCluster startMiniCluster(final int numMasters,
898      final int numSlaves, final String[] dataNodeHosts) throws Exception {
899    return startMiniCluster(numMasters, numSlaves, numSlaves, dataNodeHosts,
900        null, null);
901  }
902
903  /**
904   * Same as {@link #startMiniCluster(int, int)}, but with custom number of datanodes.
905   * @param numDataNodes Number of data nodes.
906   */
907  public MiniHBaseCluster startMiniCluster(final int numMasters,
908      final int numSlaves, final int numDataNodes) throws Exception {
909    return startMiniCluster(numMasters, numSlaves, numDataNodes, null, null, null);
910  }
911
912  /**
913   * Start up a minicluster of hbase, optionally dfs, and zookeeper.
914   * Modifies Configuration.  Homes the cluster data directory under a random
915   * subdirectory in a directory under System property test.build.data.
916   * Directory is cleaned up on exit.
917   * @param numMasters Number of masters to start up.  We'll start this many
918   * hbase masters.  If numMasters > 1, you can find the active/primary master
919   * with {@link MiniHBaseCluster#getMaster()}.
920   * @param numSlaves Number of slaves to start up.  We'll start this many
921   * regionservers. If dataNodeHosts == null, this also indicates the number of
922   * datanodes to start. If dataNodeHosts != null, the number of datanodes is
923   * based on dataNodeHosts.length.
924   * If numSlaves is > 1, then make sure
925   * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise
926   * bind errors.
927   * @param dataNodeHosts hostnames DNs to run on.
928   * This is useful if you want to run datanode on distinct hosts for things
929   * like HDFS block location verification.
930   * If you start MiniDFSCluster without host names,
931   * all instances of the datanodes will have the same host name.
932   * @param masterClass The class to use as HMaster, or null for default
933   * @param regionserverClass The class to use as HRegionServer, or null for
934   * default
935   * @throws Exception
936   * @see #shutdownMiniCluster()
937   * @return Mini hbase cluster instance created.
938   */
939  public MiniHBaseCluster startMiniCluster(final int numMasters,
940      final int numSlaves, final String[] dataNodeHosts, Class<? extends HMaster> masterClass,
941      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
942          throws Exception {
943    return startMiniCluster(
944        numMasters, numSlaves, numSlaves, dataNodeHosts, masterClass, regionserverClass);
945  }
946
947  public MiniHBaseCluster startMiniCluster(final int numMasters,
948      final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
949      Class<? extends HMaster> masterClass,
950      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
951    throws Exception {
952    return startMiniCluster(numMasters, numSlaves, numDataNodes, dataNodeHosts,
953        masterClass, regionserverClass, false, false);
954  }
955
956  /**
957   * Same as {@link #startMiniCluster(int, int, String[], Class, Class)}, but with custom
958   * number of datanodes.
959   * @param numDataNodes Number of data nodes.
960   * @param create Set this flag to create a new
961   * root or data directory path or not (will overwrite if exists already).
962   */
963  public MiniHBaseCluster startMiniCluster(final int numMasters,
964    final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
965    Class<? extends HMaster> masterClass,
966    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
967    boolean create, boolean withWALDir)
968  throws Exception {
969    if (dataNodeHosts != null && dataNodeHosts.length != 0) {
970      numDataNodes = dataNodeHosts.length;
971    }
972
973    LOG.info("Starting up minicluster with " + numMasters + " master(s) and " +
974        numSlaves + " regionserver(s) and " + numDataNodes + " datanode(s)");
975
976    // If we already put up a cluster, fail.
977    if (miniClusterRunning) {
978      throw new IllegalStateException("A mini-cluster is already running");
979    }
980    miniClusterRunning = true;
981
982    setupClusterTestDir();
983    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());
984
985    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
986    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
987    if(this.dfsCluster == null) {
988      LOG.info("STARTING DFS");
989      dfsCluster = startMiniDFSCluster(numDataNodes, dataNodeHosts);
990    } else LOG.info("NOT STARTING DFS");
991
992    // Start up a zk cluster.
993    if (getZkCluster() == null) {
994      startMiniZKCluster();
995    }
996
997    // Start the MiniHBaseCluster
998    return startMiniHBaseCluster(numMasters, numSlaves, null, masterClass,
999      regionserverClass, create, withWALDir);
1000  }
1001
1002  public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves)
1003      throws IOException, InterruptedException {
1004     return startMiniHBaseCluster(numMasters, numSlaves, null);
1005  }
1006
1007  public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves,
1008      List<Integer> rsPorts) throws IOException, InterruptedException {
1009    return startMiniHBaseCluster(numMasters, numSlaves, rsPorts, null, null, false, false);
1010  }
1011
1012  /**
1013   * Starts up mini hbase cluster.  Usually used after call to
1014   * {@link #startMiniCluster(int, int)} when doing stepped startup of clusters.
1015   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
1016   * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster
1017   *                restart where for sure the regionservers come up on same address+port (but
1018   *                just with different startcode); by default mini hbase clusters choose new
1019   *                arbitrary ports on each cluster start.
1020   * @param create Whether to create a
1021   * root or data directory path or not; will overwrite if exists already.
1022   * @return Reference to the hbase mini hbase cluster.
1023   * @throws IOException
1024   * @throws InterruptedException
1025   * @see #startMiniCluster()
1026   */
1027  public MiniHBaseCluster startMiniHBaseCluster(final int numMasters,
1028        final int numSlaves, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
1029        Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
1030        boolean create, boolean withWALDir)
1031  throws IOException, InterruptedException {
1032    // Now do the mini hbase cluster.  Set the hbase.rootdir in config.
1033    createRootDir(create);
1034    if (withWALDir) {
1035      createWALRootDir();
1036    }
1037    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
1038    // for tests that do not read hbase-defaults.xml
1039    setHBaseFsTmpDir();
1040
1041    // These settings will make the server waits until this exact number of
1042    // regions servers are connected.
1043    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
1044      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, numSlaves);
1045    }
1046    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
1047      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, numSlaves);
1048    }
1049
1050    Configuration c = new Configuration(this.conf);
1051    TraceUtil.initTracer(c);
1052    this.hbaseCluster =
1053        new MiniHBaseCluster(c, numMasters, numSlaves, rsPorts, masterClass, regionserverClass);
1054    // Don't leave here till we've done a successful scan of the hbase:meta
1055    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
1056    ResultScanner s = t.getScanner(new Scan());
1057    while (s.next() != null) {
1058      continue;
1059    }
1060    s.close();
1061    t.close();
1062
1063    getAdmin(); // create immediately the hbaseAdmin
1064    LOG.info("Minicluster is up; activeMaster=" + this.getHBaseCluster().getMaster());
1065
1066    return (MiniHBaseCluster)this.hbaseCluster;
1067  }
1068
1069  /**
1070   * Starts the hbase cluster up again after shutting it down previously in a
1071   * test.  Use this if you want to keep dfs/zk up and just stop/start hbase.
1072   * @param servers number of region servers
1073   */
1074  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
1075    if (hbaseAdmin != null) {
1076      hbaseAdmin.close();
1077      hbaseAdmin = null;
1078    }
1079    if (this.connection != null) {
1080      this.connection.close();
1081      this.connection = null;
1082    }
1083    this.hbaseCluster = new MiniHBaseCluster(this.conf, servers);
1084    // Don't leave here till we've done a successful scan of the hbase:meta
1085    Connection conn = ConnectionFactory.createConnection(this.conf);
1086    Table t = conn.getTable(TableName.META_TABLE_NAME);
1087    ResultScanner s = t.getScanner(new Scan());
1088    while (s.next() != null) {
1089      // do nothing
1090    }
1091    LOG.info("HBase has been restarted");
1092    s.close();
1093    t.close();
1094    conn.close();
1095  }
1096
1097  /**
1098   * @return Current mini hbase cluster. Only has something in it after a call
1099   * to {@link #startMiniCluster()}.
1100   * @see #startMiniCluster()
1101   */
1102  public MiniHBaseCluster getMiniHBaseCluster() {
1103    if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) {
1104      return (MiniHBaseCluster)this.hbaseCluster;
1105    }
1106    throw new RuntimeException(hbaseCluster + " not an instance of " +
1107                               MiniHBaseCluster.class.getName());
1108  }
1109
1110  /**
1111   * Stops mini hbase, zk, and hdfs clusters.
1112   * @throws IOException
1113   * @see #startMiniCluster(int)
1114   */
1115  public void shutdownMiniCluster() throws Exception {
1116    LOG.info("Shutting down minicluster");
1117    shutdownMiniHBaseCluster();
1118    shutdownMiniDFSCluster();
1119    shutdownMiniZKCluster();
1120
1121    cleanupTestDir();
1122    miniClusterRunning = false;
1123    LOG.info("Minicluster is down");
1124  }
1125
1126  /**
1127   * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running.
1128   * @throws java.io.IOException in case command is unsuccessful
1129   */
1130  public void shutdownMiniHBaseCluster() throws IOException {
1131    cleanup();
1132    if (this.hbaseCluster != null) {
1133      this.hbaseCluster.shutdown();
1134      // Wait till hbase is down before going on to shutdown zk.
1135      this.hbaseCluster.waitUntilShutDown();
1136      this.hbaseCluster = null;
1137    }
1138    if (zooKeeperWatcher != null) {
1139      zooKeeperWatcher.close();
1140      zooKeeperWatcher = null;
1141    }
1142  }
1143
1144  /**
1145   * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running.
1146   * @throws java.io.IOException throws in case command is unsuccessful
1147   */
1148  public void killMiniHBaseCluster() throws IOException {
1149    cleanup();
1150    if (this.hbaseCluster != null) {
1151      getMiniHBaseCluster().killAll();
1152      this.hbaseCluster = null;
1153    }
1154    if (zooKeeperWatcher != null) {
1155      zooKeeperWatcher.close();
1156      zooKeeperWatcher = null;
1157    }
1158  }
1159
1160  // close hbase admin, close current connection and reset MIN MAX configs for RS.
1161  private void cleanup() throws IOException {
1162    if (hbaseAdmin != null) {
1163      hbaseAdmin.close();
1164      hbaseAdmin = null;
1165    }
1166    if (this.connection != null) {
1167      this.connection.close();
1168      this.connection = null;
1169    }
1170    // unset the configuration for MIN and MAX RS to start
1171    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
1172    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
1173  }
1174
1175  /**
1176   * Returns the path to the default root dir the minicluster uses. If <code>create</code>
1177   * is true, a new root directory path is fetched irrespective of whether it has been fetched
1178   * before or not. If false, previous path is used.
1179   * Note: this does not cause the root dir to be created.
1180   * @return Fully qualified path for the default hbase root dir
1181   * @throws IOException
1182   */
1183  public Path getDefaultRootDirPath(boolean create) throws IOException {
1184    if (!create) {
1185      return getDataTestDirOnTestFS();
1186    } else {
1187      return getNewDataTestDirOnTestFS();
1188    }
1189  }
1190
1191  /**
1192   * Same as {{@link HBaseTestingUtility#getDefaultRootDirPath(boolean create)}
1193   * except that <code>create</code> flag is false.
1194   * Note: this does not cause the root dir to be created.
1195   * @return Fully qualified path for the default hbase root dir
1196   * @throws IOException
1197   */
1198  public Path getDefaultRootDirPath() throws IOException {
1199    return getDefaultRootDirPath(false);
1200  }
1201
1202  /**
1203   * Creates an hbase rootdir in user home directory.  Also creates hbase
1204   * version file.  Normally you won't make use of this method.  Root hbasedir
1205   * is created for you as part of mini cluster startup.  You'd only use this
1206   * method if you were doing manual operation.
1207   * @param create This flag decides whether to get a new
1208   * root or data directory path or not, if it has been fetched already.
1209   * Note : Directory will be made irrespective of whether path has been fetched or not.
1210   * If directory already exists, it will be overwritten
1211   * @return Fully qualified path to hbase root dir
1212   * @throws IOException
1213   */
1214  public Path createRootDir(boolean create) throws IOException {
1215    FileSystem fs = FileSystem.get(this.conf);
1216    Path hbaseRootdir = getDefaultRootDirPath(create);
1217    FSUtils.setRootDir(this.conf, hbaseRootdir);
1218    fs.mkdirs(hbaseRootdir);
1219    FSUtils.setVersion(fs, hbaseRootdir);
1220    return hbaseRootdir;
1221  }
1222
1223  /**
1224   * Same as {@link HBaseTestingUtility#createRootDir(boolean create)}
1225   * except that <code>create</code> flag is false.
1226   * @return Fully qualified path to hbase root dir
1227   * @throws IOException
1228   */
1229  public Path createRootDir() throws IOException {
1230    return createRootDir(false);
1231  }
1232
1233  /**
1234   * Creates a hbase walDir in the user's home directory.
1235   * Normally you won't make use of this method. Root hbaseWALDir
1236   * is created for you as part of mini cluster startup. You'd only use this
1237   * method if you were doing manual operation.
1238   *
1239   * @return Fully qualified path to hbase root dir
1240   * @throws IOException
1241  */
1242  public Path createWALRootDir() throws IOException {
1243    FileSystem fs = FileSystem.get(this.conf);
1244    Path walDir = getNewDataTestDirOnTestFS();
1245    FSUtils.setWALRootDir(this.conf, walDir);
1246    fs.mkdirs(walDir);
1247    return walDir;
1248  }
1249
1250  private void setHBaseFsTmpDir() throws IOException {
1251    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
1252    if (hbaseFsTmpDirInString == null) {
1253      this.conf.set("hbase.fs.tmp.dir",  getDataTestDirOnTestFS("hbase-staging").toString());
1254      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
1255    } else {
1256      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
1257    }
1258  }
1259
1260  /**
1261   * Flushes all caches in the mini hbase cluster
1262   * @throws IOException
1263   */
1264  public void flush() throws IOException {
1265    getMiniHBaseCluster().flushcache();
1266  }
1267
1268  /**
1269   * Flushes all caches in the mini hbase cluster
1270   * @throws IOException
1271   */
1272  public void flush(TableName tableName) throws IOException {
1273    getMiniHBaseCluster().flushcache(tableName);
1274  }
1275
1276  /**
1277   * Compact all regions in the mini hbase cluster
1278   * @throws IOException
1279   */
1280  public void compact(boolean major) throws IOException {
1281    getMiniHBaseCluster().compact(major);
1282  }
1283
1284  /**
1285   * Compact all of a table's reagion in the mini hbase cluster
1286   * @throws IOException
1287   */
1288  public void compact(TableName tableName, boolean major) throws IOException {
1289    getMiniHBaseCluster().compact(tableName, major);
1290  }
1291
1292  /**
1293   * Create a table.
1294   * @param tableName
1295   * @param family
1296   * @return A Table instance for the created table.
1297   * @throws IOException
1298   */
1299  public Table createTable(TableName tableName, String family)
1300  throws IOException{
1301    return createTable(tableName, new String[]{family});
1302  }
1303
1304  /**
1305   * Create a table.
1306   * @param tableName
1307   * @param families
1308   * @return A Table instance for the created table.
1309   * @throws IOException
1310   */
1311  public Table createTable(TableName tableName, String[] families)
1312  throws IOException {
1313    List<byte[]> fams = new ArrayList<>(families.length);
1314    for (String family : families) {
1315      fams.add(Bytes.toBytes(family));
1316    }
1317    return createTable(tableName, fams.toArray(new byte[0][]));
1318  }
1319
1320  /**
1321   * Create a table.
1322   * @param tableName
1323   * @param family
1324   * @return A Table instance for the created table.
1325   * @throws IOException
1326   */
1327  public Table createTable(TableName tableName, byte[] family)
1328  throws IOException{
1329    return createTable(tableName, new byte[][]{family});
1330  }
1331
1332  /**
1333   * Create a table with multiple regions.
1334   * @param tableName
1335   * @param family
1336   * @param numRegions
1337   * @return A Table instance for the created table.
1338   * @throws IOException
1339   */
1340  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1341      throws IOException {
1342    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1343    byte[] startKey = Bytes.toBytes("aaaaa");
1344    byte[] endKey = Bytes.toBytes("zzzzz");
1345    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1346
1347    return createTable(tableName, new byte[][] { family }, splitKeys);
1348  }
1349
1350  /**
1351   * Create a table.
1352   * @param tableName
1353   * @param families
1354   * @return A Table instance for the created table.
1355   * @throws IOException
1356   */
1357  public Table createTable(TableName tableName, byte[][] families)
1358  throws IOException {
1359    return createTable(tableName, families, (byte[][]) null);
1360  }
1361
1362  /**
1363   * Create a table with multiple regions.
1364   * @param tableName
1365   * @param families
1366   * @return A Table instance for the created table.
1367   * @throws IOException
1368   */
1369  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1370    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1371  }
1372
1373  /**
1374   * Create a table.
1375   * @param tableName
1376   * @param families
1377   * @param splitKeys
1378   * @return A Table instance for the created table.
1379   * @throws IOException
1380   */
1381  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1382      throws IOException {
1383    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
1384  }
1385
1386  /**
1387   * Create a table.
1388   * @param tableName the table name
1389   * @param families the families
1390   * @param splitKeys the splitkeys
1391   * @param replicaCount the region replica count
1392   * @return A Table instance for the created table.
1393   * @throws IOException throws IOException
1394   */
1395  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1396      int replicaCount) throws IOException {
1397    return createTable(tableName, families, splitKeys, replicaCount,
1398      new Configuration(getConfiguration()));
1399  }
1400
1401  public Table createTable(TableName tableName, byte[][] families,
1402      int numVersions, byte[] startKey, byte[] endKey, int numRegions)
1403  throws IOException{
1404    HTableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1405
1406    getAdmin().createTable(desc, startKey, endKey, numRegions);
1407    // HBaseAdmin only waits for regions to appear in hbase:meta we
1408    // should wait until they are assigned
1409    waitUntilAllRegionsAssigned(tableName);
1410    return getConnection().getTable(tableName);
1411  }
1412
1413  /**
1414   * Create a table.
1415   * @param htd
1416   * @param families
1417   * @param c Configuration to use
1418   * @return A Table instance for the created table.
1419   * @throws IOException
1420   */
1421  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1422  throws IOException {
1423    return createTable(htd, families, null, c);
1424  }
1425
1426  /**
1427   * Create a table.
1428   * @param htd table descriptor
1429   * @param families array of column families
1430   * @param splitKeys array of split keys
1431   * @param c Configuration to use
1432   * @return A Table instance for the created table.
1433   * @throws IOException if getAdmin or createTable fails
1434   */
1435  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1436      Configuration c) throws IOException {
1437    // Disable blooms (they are on by default as of 0.95) but we disable them here because
1438    // tests have hard coded counts of what to expect in block cache, etc., and blooms being
1439    // on is interfering.
1440    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1441  }
1442
1443  /**
1444   * Create a table.
1445   * @param htd table descriptor
1446   * @param families array of column families
1447   * @param splitKeys array of split keys
1448   * @param type Bloom type
1449   * @param blockSize block size
1450   * @param c Configuration to use
1451   * @return A Table instance for the created table.
1452   * @throws IOException if getAdmin or createTable fails
1453   */
1454
1455  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1456      BloomType type, int blockSize, Configuration c) throws IOException {
1457    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1458    for (byte[] family : families) {
1459      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1460        .setBloomFilterType(type)
1461        .setBlocksize(blockSize);
1462      if (isNewVersionBehaviorEnabled()) {
1463          cfdb.setNewVersionBehavior(true);
1464      }
1465      builder.setColumnFamily(cfdb.build());
1466    }
1467    TableDescriptor td = builder.build();
1468    getAdmin().createTable(td, splitKeys);
1469    // HBaseAdmin only waits for regions to appear in hbase:meta
1470    // we should wait until they are assigned
1471    waitUntilAllRegionsAssigned(td.getTableName());
1472    return getConnection().getTable(td.getTableName());
1473  }
1474
1475  /**
1476   * Create a table.
1477   * @param htd table descriptor
1478   * @param splitRows array of split keys
1479   * @return A Table instance for the created table.
1480   * @throws IOException
1481   */
1482  public Table createTable(TableDescriptor htd, byte[][] splitRows)
1483      throws IOException {
1484    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1485    if (isNewVersionBehaviorEnabled()) {
1486      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1487         builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family)
1488           .setNewVersionBehavior(true).build());
1489      }
1490    }
1491    getAdmin().createTable(builder.build(), splitRows);
1492    // HBaseAdmin only waits for regions to appear in hbase:meta
1493    // we should wait until they are assigned
1494    waitUntilAllRegionsAssigned(htd.getTableName());
1495    return getConnection().getTable(htd.getTableName());
1496  }
1497
1498  /**
1499   * Create a table.
1500   * @param tableName the table name
1501   * @param families the families
1502   * @param splitKeys the split keys
1503   * @param replicaCount the replica count
1504   * @param c Configuration to use
1505   * @return A Table instance for the created table.
1506   * @throws IOException
1507   */
1508  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1509      int replicaCount, final Configuration c) throws IOException {
1510    HTableDescriptor htd = new HTableDescriptor(tableName);
1511    htd.setRegionReplication(replicaCount);
1512    return createTable(htd, families, splitKeys, c);
1513  }
1514
1515  /**
1516   * Create a table.
1517   * @param tableName
1518   * @param family
1519   * @param numVersions
1520   * @return A Table instance for the created table.
1521   * @throws IOException
1522   */
1523  public Table createTable(TableName tableName, byte[] family, int numVersions)
1524  throws IOException {
1525    return createTable(tableName, new byte[][]{family}, numVersions);
1526  }
1527
1528  /**
1529   * Create a table.
1530   * @param tableName
1531   * @param families
1532   * @param numVersions
1533   * @return A Table instance for the created table.
1534   * @throws IOException
1535   */
1536  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1537      throws IOException {
1538    return createTable(tableName, families, numVersions, (byte[][]) null);
1539  }
1540
1541  /**
1542   * Create a table.
1543   * @param tableName
1544   * @param families
1545   * @param numVersions
1546   * @param splitKeys
1547   * @return A Table instance for the created table.
1548   * @throws IOException
1549   */
1550  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1551      byte[][] splitKeys) throws IOException {
1552    HTableDescriptor desc = new HTableDescriptor(tableName);
1553    for (byte[] family : families) {
1554      HColumnDescriptor hcd = new HColumnDescriptor(family).setMaxVersions(numVersions);
1555      if (isNewVersionBehaviorEnabled()) {
1556        hcd.setNewVersionBehavior(true);
1557      }
1558      desc.addFamily(hcd);
1559    }
1560    getAdmin().createTable(desc, splitKeys);
1561    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1562    // assigned
1563    waitUntilAllRegionsAssigned(tableName);
1564    return getConnection().getTable(tableName);
1565  }
1566
1567  /**
1568   * Create a table with multiple regions.
1569   * @param tableName
1570   * @param families
1571   * @param numVersions
1572   * @return A Table instance for the created table.
1573   * @throws IOException
1574   */
1575  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1576      throws IOException {
1577    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1578  }
1579
1580  /**
1581   * Create a table.
1582   * @param tableName
1583   * @param families
1584   * @param numVersions
1585   * @param blockSize
1586   * @return A Table instance for the created table.
1587   * @throws IOException
1588   */
1589  public Table createTable(TableName tableName, byte[][] families,
1590    int numVersions, int blockSize) throws IOException {
1591    HTableDescriptor desc = new HTableDescriptor(tableName);
1592    for (byte[] family : families) {
1593      HColumnDescriptor hcd = new HColumnDescriptor(family)
1594          .setMaxVersions(numVersions)
1595          .setBlocksize(blockSize);
1596      if (isNewVersionBehaviorEnabled()) {
1597        hcd.setNewVersionBehavior(true);
1598      }
1599      desc.addFamily(hcd);
1600    }
1601    getAdmin().createTable(desc);
1602    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1603    // assigned
1604    waitUntilAllRegionsAssigned(tableName);
1605    return getConnection().getTable(tableName);
1606  }
1607
1608  public Table createTable(TableName tableName, byte[][] families,
1609      int numVersions, int blockSize, String cpName) throws IOException {
1610      HTableDescriptor desc = new HTableDescriptor(tableName);
1611      for (byte[] family : families) {
1612        HColumnDescriptor hcd = new HColumnDescriptor(family)
1613            .setMaxVersions(numVersions)
1614            .setBlocksize(blockSize);
1615        if (isNewVersionBehaviorEnabled()) {
1616          hcd.setNewVersionBehavior(true);
1617        }
1618        desc.addFamily(hcd);
1619      }
1620      if(cpName != null) {
1621        desc.addCoprocessor(cpName);
1622      }
1623      getAdmin().createTable(desc);
1624      // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1625      // assigned
1626      waitUntilAllRegionsAssigned(tableName);
1627      return getConnection().getTable(tableName);
1628    }
1629
1630  /**
1631   * Create a table.
1632   * @param tableName
1633   * @param families
1634   * @param numVersions
1635   * @return A Table instance for the created table.
1636   * @throws IOException
1637   */
1638  public Table createTable(TableName tableName, byte[][] families,
1639      int[] numVersions)
1640  throws IOException {
1641    HTableDescriptor desc = new HTableDescriptor(tableName);
1642    int i = 0;
1643    for (byte[] family : families) {
1644      HColumnDescriptor hcd = new HColumnDescriptor(family)
1645          .setMaxVersions(numVersions[i]);
1646      if (isNewVersionBehaviorEnabled()) {
1647        hcd.setNewVersionBehavior(true);
1648      }
1649      desc.addFamily(hcd);
1650      i++;
1651    }
1652    getAdmin().createTable(desc);
1653    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1654    // assigned
1655    waitUntilAllRegionsAssigned(tableName);
1656    return getConnection().getTable(tableName);
1657  }
1658
1659  /**
1660   * Create a table.
1661   * @param tableName
1662   * @param family
1663   * @param splitRows
1664   * @return A Table instance for the created table.
1665   * @throws IOException
1666   */
1667  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1668      throws IOException {
1669    HTableDescriptor desc = new HTableDescriptor(tableName);
1670    HColumnDescriptor hcd = new HColumnDescriptor(family);
1671    if (isNewVersionBehaviorEnabled()) {
1672      hcd.setNewVersionBehavior(true);
1673    }
1674    desc.addFamily(hcd);
1675    getAdmin().createTable(desc, splitRows);
1676    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
1677    // assigned
1678    waitUntilAllRegionsAssigned(tableName);
1679    return getConnection().getTable(tableName);
1680  }
1681
1682  /**
1683   * Create a table with multiple regions.
1684   * @param tableName
1685   * @param family
1686   * @return A Table instance for the created table.
1687   * @throws IOException
1688   */
1689  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1690    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1691  }
1692
1693  /**
1694   * Modify a table, synchronous. Waiting logic similar to that of {@code admin.rb#alter_status}.
1695   */
1696  @SuppressWarnings("serial")
1697  public static void modifyTableSync(Admin admin, TableDescriptor desc)
1698      throws IOException, InterruptedException {
1699    admin.modifyTable(desc);
1700    Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
1701      setFirst(0);
1702      setSecond(0);
1703    }};
1704    int i = 0;
1705    do {
1706      status = admin.getAlterStatus(desc.getTableName());
1707      if (status.getSecond() != 0) {
1708        LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond()
1709          + " regions updated.");
1710        Thread.sleep(1 * 1000L);
1711      } else {
1712        LOG.debug("All regions updated.");
1713        break;
1714      }
1715    } while (status.getFirst() != 0 && i++ < 500);
1716    if (status.getFirst() != 0) {
1717      throw new IOException("Failed to update all regions even after 500 seconds.");
1718    }
1719  }
1720
1721  /**
1722   * Set the number of Region replicas.
1723   */
1724  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1725      throws IOException, InterruptedException {
1726    admin.disableTable(table);
1727    HTableDescriptor desc = new HTableDescriptor(admin.getTableDescriptor(table));
1728    desc.setRegionReplication(replicaCount);
1729    admin.modifyTable(desc.getTableName(), desc);
1730    admin.enableTable(table);
1731  }
1732
1733  /**
1734   * Drop an existing table
1735   * @param tableName existing table
1736   */
1737  public void deleteTable(TableName tableName) throws IOException {
1738    try {
1739      getAdmin().disableTable(tableName);
1740    } catch (TableNotEnabledException e) {
1741      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1742    }
1743    getAdmin().deleteTable(tableName);
1744  }
1745
1746  /**
1747   * Drop an existing table
1748   * @param tableName existing table
1749   */
1750  public void deleteTableIfAny(TableName tableName) throws IOException {
1751    try {
1752      deleteTable(tableName);
1753    } catch (TableNotFoundException e) {
1754      // ignore
1755    }
1756  }
1757
1758  // ==========================================================================
1759  // Canned table and table descriptor creation
1760  // TODO replace HBaseTestCase
1761
1762  public final static byte [] fam1 = Bytes.toBytes("colfamily11");
1763  public final static byte [] fam2 = Bytes.toBytes("colfamily21");
1764  public final static byte [] fam3 = Bytes.toBytes("colfamily31");
1765  public static final byte[][] COLUMNS = {fam1, fam2, fam3};
1766  private static final int MAXVERSIONS = 3;
1767
1768  public static final char FIRST_CHAR = 'a';
1769  public static final char LAST_CHAR = 'z';
1770  public static final byte [] START_KEY_BYTES = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
1771  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1772
1773  @Deprecated
1774  public HTableDescriptor createTableDescriptor(final String name,
1775      final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1776    return this.createTableDescriptor(TableName.valueOf(name), minVersions, versions, ttl,
1777        keepDeleted);
1778  }
1779
1780  /**
1781   * Create a table of name <code>name</code>.
1782   * @param name Name to give table.
1783   * @return Column descriptor.
1784   */
1785  @Deprecated
1786  public HTableDescriptor createTableDescriptor(final String name) {
1787    return createTableDescriptor(TableName.valueOf(name),  HColumnDescriptor.DEFAULT_MIN_VERSIONS,
1788        MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
1789  }
1790
1791  public HTableDescriptor createTableDescriptor(final TableName name,
1792      final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1793    HTableDescriptor htd = new HTableDescriptor(name);
1794    for (byte[] cfName : new byte[][]{ fam1, fam2, fam3 }) {
1795      HColumnDescriptor hcd = new HColumnDescriptor(cfName)
1796          .setMinVersions(minVersions)
1797          .setMaxVersions(versions)
1798          .setKeepDeletedCells(keepDeleted)
1799          .setBlockCacheEnabled(false)
1800          .setTimeToLive(ttl);
1801      if (isNewVersionBehaviorEnabled()) {
1802          hcd.setNewVersionBehavior(true);
1803      }
1804      htd.addFamily(hcd);
1805    }
1806    return htd;
1807  }
1808
1809  /**
1810   * Create a table of name <code>name</code>.
1811   * @param name Name to give table.
1812   * @return Column descriptor.
1813   */
1814  public HTableDescriptor createTableDescriptor(final TableName name) {
1815    return createTableDescriptor(name,  HColumnDescriptor.DEFAULT_MIN_VERSIONS,
1816        MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
1817  }
1818
1819  public HTableDescriptor createTableDescriptor(final TableName tableName,
1820      byte[] family) {
1821    return createTableDescriptor(tableName, new byte[][] {family}, 1);
1822  }
1823
1824  public HTableDescriptor createTableDescriptor(final TableName tableName,
1825      byte[][] families, int maxVersions) {
1826    HTableDescriptor desc = new HTableDescriptor(tableName);
1827    for (byte[] family : families) {
1828      HColumnDescriptor hcd = new HColumnDescriptor(family)
1829          .setMaxVersions(maxVersions);
1830      if (isNewVersionBehaviorEnabled()) {
1831          hcd.setNewVersionBehavior(true);
1832      }
1833      desc.addFamily(hcd);
1834    }
1835    return desc;
1836  }
1837
1838  /**
1839   * Create an HRegion that writes to the local tmp dirs
1840   * @param desc a table descriptor indicating which table the region belongs to
1841   * @param startKey the start boundary of the region
1842   * @param endKey the end boundary of the region
1843   * @return a region that writes to local dir for testing
1844   * @throws IOException
1845   */
1846  public HRegion createLocalHRegion(TableDescriptor desc, byte [] startKey,
1847      byte [] endKey)
1848  throws IOException {
1849    HRegionInfo hri = new HRegionInfo(desc.getTableName(), startKey, endKey);
1850    return createLocalHRegion(hri, desc);
1851  }
1852
1853  /**
1854   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1855   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when you're finished with it.
1856   */
1857  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1858    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1859  }
1860
1861  /**
1862   * Create an HRegion that writes to the local tmp dirs with specified wal
1863   * @param info regioninfo
1864   * @param desc table descriptor
1865   * @param wal wal for this region.
1866   * @return created hregion
1867   * @throws IOException
1868   */
1869  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc, WAL wal)
1870      throws IOException {
1871    return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc, wal);
1872  }
1873
1874  /**
1875   * @param tableName
1876   * @param startKey
1877   * @param stopKey
1878   * @param callingMethod
1879   * @param conf
1880   * @param isReadOnly
1881   * @param families
1882   * @throws IOException
1883   * @return A region on which you must call
1884             {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
1885   * @deprecated use
1886   * {@link #createLocalHRegion(TableName, byte[], byte[], boolean, Durability, WAL, byte[]...)}
1887   */
1888  @Deprecated
1889  public HRegion createLocalHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
1890      String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
1891      WAL wal, byte[]... families) throws IOException {
1892    return this
1893        .createLocalHRegion(TableName.valueOf(tableName), startKey, stopKey, isReadOnly, durability,
1894            wal, families);
1895  }
1896
1897  /**
1898   * @param tableName
1899   * @param startKey
1900   * @param stopKey
1901   * @param isReadOnly
1902   * @param families
1903   * @return A region on which you must call
1904   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
1905   * @throws IOException
1906   */
1907  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1908      boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
1909    return createLocalHRegionWithInMemoryFlags(tableName,startKey, stopKey, isReadOnly,
1910        durability, wal, null, families);
1911  }
1912
1913  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
1914      byte[] stopKey,
1915      boolean isReadOnly, Durability durability, WAL wal, boolean[] compactedMemStore,
1916      byte[]... families)
1917      throws IOException {
1918    HTableDescriptor htd = new HTableDescriptor(tableName);
1919    htd.setReadOnly(isReadOnly);
1920    int i=0;
1921    for (byte[] family : families) {
1922      HColumnDescriptor hcd = new HColumnDescriptor(family);
1923      if(compactedMemStore != null && i < compactedMemStore.length) {
1924        hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
1925      } else {
1926        hcd.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
1927
1928      }
1929      i++;
1930      // Set default to be three versions.
1931      hcd.setMaxVersions(Integer.MAX_VALUE);
1932      htd.addFamily(hcd);
1933    }
1934    htd.setDurability(durability);
1935    HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false);
1936    return createLocalHRegion(info, htd, wal);
1937  }
1938
1939  //
1940  // ==========================================================================
1941
1942  /**
1943   * Provide an existing table name to truncate.
1944   * Scans the table and issues a delete for each row read.
1945   * @param tableName existing table
1946   * @return HTable to that new table
1947   * @throws IOException
1948   */
1949  public Table deleteTableData(TableName tableName) throws IOException {
1950    Table table = getConnection().getTable(tableName);
1951    Scan scan = new Scan();
1952    ResultScanner resScan = table.getScanner(scan);
1953    for(Result res : resScan) {
1954      Delete del = new Delete(res.getRow());
1955      table.delete(del);
1956    }
1957    resScan = table.getScanner(scan);
1958    resScan.close();
1959    return table;
1960  }
1961
1962  /**
1963   * Truncate a table using the admin command.
1964   * Effectively disables, deletes, and recreates the table.
1965   * @param tableName table which must exist.
1966   * @param preserveRegions keep the existing split points
1967   * @return HTable for the new table
1968   */
1969  public Table truncateTable(final TableName tableName, final boolean preserveRegions) throws
1970      IOException {
1971    Admin admin = getAdmin();
1972    if (!admin.isTableDisabled(tableName)) {
1973      admin.disableTable(tableName);
1974    }
1975    admin.truncateTable(tableName, preserveRegions);
1976    return getConnection().getTable(tableName);
1977  }
1978
1979  /**
1980   * Truncate a table using the admin command.
1981   * Effectively disables, deletes, and recreates the table.
1982   * For previous behavior of issuing row deletes, see
1983   * deleteTableData.
1984   * Expressly does not preserve regions of existing table.
1985   * @param tableName table which must exist.
1986   * @return HTable for the new table
1987   */
1988  public Table truncateTable(final TableName tableName) throws IOException {
1989    return truncateTable(tableName, false);
1990  }
1991
1992  /**
1993   * Load table with rows from 'aaa' to 'zzz'.
1994   * @param t Table
1995   * @param f Family
1996   * @return Count of rows loaded.
1997   * @throws IOException
1998   */
1999  public int loadTable(final Table t, final byte[] f) throws IOException {
2000    return loadTable(t, new byte[][] {f});
2001  }
2002
2003  /**
2004   * Load table with rows from 'aaa' to 'zzz'.
2005   * @param t Table
2006   * @param f Family
2007   * @return Count of rows loaded.
2008   * @throws IOException
2009   */
2010  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
2011    return loadTable(t, new byte[][] {f}, null, writeToWAL);
2012  }
2013
2014  /**
2015   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2016   * @param t Table
2017   * @param f Array of Families to load
2018   * @return Count of rows loaded.
2019   * @throws IOException
2020   */
2021  public int loadTable(final Table t, final byte[][] f) throws IOException {
2022    return loadTable(t, f, null);
2023  }
2024
2025  /**
2026   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2027   * @param t Table
2028   * @param f Array of Families to load
2029   * @param value the values of the cells. If null is passed, the row key is used as value
2030   * @return Count of rows loaded.
2031   * @throws IOException
2032   */
2033  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
2034    return loadTable(t, f, value, true);
2035  }
2036
2037  /**
2038   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2039   * @param t Table
2040   * @param f Array of Families to load
2041   * @param value the values of the cells. If null is passed, the row key is used as value
2042   * @return Count of rows loaded.
2043   * @throws IOException
2044   */
2045  public int loadTable(final Table t, final byte[][] f, byte[] value,
2046      boolean writeToWAL) throws IOException {
2047    List<Put> puts = new ArrayList<>();
2048    for (byte[] row : HBaseTestingUtility.ROWS) {
2049      Put put = new Put(row);
2050      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
2051      for (int i = 0; i < f.length; i++) {
2052        byte[] value1 = value != null ? value : row;
2053        put.addColumn(f[i], f[i], value1);
2054      }
2055      puts.add(put);
2056    }
2057    t.put(puts);
2058    return puts.size();
2059  }
2060
2061  /** A tracker for tracking and validating table rows
2062   * generated with {@link HBaseTestingUtility#loadTable(Table, byte[])}
2063   */
2064  public static class SeenRowTracker {
2065    int dim = 'z' - 'a' + 1;
2066    int[][][] seenRows = new int[dim][dim][dim]; //count of how many times the row is seen
2067    byte[] startRow;
2068    byte[] stopRow;
2069
2070    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
2071      this.startRow = startRow;
2072      this.stopRow = stopRow;
2073    }
2074
2075    void reset() {
2076      for (byte[] row : ROWS) {
2077        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
2078      }
2079    }
2080
2081    int i(byte b) {
2082      return b - 'a';
2083    }
2084
2085    public void addRow(byte[] row) {
2086      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
2087    }
2088
2089    /** Validate that all the rows between startRow and stopRow are seen exactly once, and
2090     * all other rows none
2091     */
2092    public void validate() {
2093      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2094        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2095          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2096            int count = seenRows[i(b1)][i(b2)][i(b3)];
2097            int expectedCount = 0;
2098            if (Bytes.compareTo(new byte[] {b1,b2,b3}, startRow) >= 0
2099                && Bytes.compareTo(new byte[] {b1,b2,b3}, stopRow) < 0) {
2100              expectedCount = 1;
2101            }
2102            if (count != expectedCount) {
2103              String row = new String(new byte[] {b1,b2,b3}, StandardCharsets.UTF_8);
2104              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " " +
2105                  "instead of " + expectedCount);
2106            }
2107          }
2108        }
2109      }
2110    }
2111  }
2112
2113  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
2114    return loadRegion(r, f, false);
2115  }
2116
2117  public int loadRegion(final Region r, final byte[] f) throws IOException {
2118    return loadRegion((HRegion)r, f);
2119  }
2120
2121  /**
2122   * Load region with rows from 'aaa' to 'zzz'.
2123   * @param r Region
2124   * @param f Family
2125   * @param flush flush the cache if true
2126   * @return Count of rows loaded.
2127   * @throws IOException
2128   */
2129  public int loadRegion(final HRegion r, final byte[] f, final boolean flush)
2130  throws IOException {
2131    byte[] k = new byte[3];
2132    int rowCount = 0;
2133    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2134      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2135        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2136          k[0] = b1;
2137          k[1] = b2;
2138          k[2] = b3;
2139          Put put = new Put(k);
2140          put.setDurability(Durability.SKIP_WAL);
2141          put.addColumn(f, null, k);
2142          if (r.getWAL() == null) {
2143            put.setDurability(Durability.SKIP_WAL);
2144          }
2145          int preRowCount = rowCount;
2146          int pause = 10;
2147          int maxPause = 1000;
2148          while (rowCount == preRowCount) {
2149            try {
2150              r.put(put);
2151              rowCount++;
2152            } catch (RegionTooBusyException e) {
2153              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
2154              Threads.sleep(pause);
2155            }
2156          }
2157        }
2158      }
2159      if (flush) {
2160        r.flush(true);
2161      }
2162    }
2163    return rowCount;
2164  }
2165
2166  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2167      throws IOException {
2168    for (int i = startRow; i < endRow; i++) {
2169      byte[] data = Bytes.toBytes(String.valueOf(i));
2170      Put put = new Put(data);
2171      put.addColumn(f, null, data);
2172      t.put(put);
2173    }
2174  }
2175
2176  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
2177      throws IOException {
2178    Random r = new Random();
2179    byte[] row = new byte[rowSize];
2180    for (int i = 0; i < totalRows; i++) {
2181      r.nextBytes(row);
2182      Put put = new Put(row);
2183      put.addColumn(f, new byte[]{0}, new byte[]{0});
2184      t.put(put);
2185    }
2186  }
2187
2188  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2189      int replicaId)
2190      throws IOException {
2191    for (int i = startRow; i < endRow; i++) {
2192      String failMsg = "Failed verification of row :" + i;
2193      byte[] data = Bytes.toBytes(String.valueOf(i));
2194      Get get = new Get(data);
2195      get.setReplicaId(replicaId);
2196      get.setConsistency(Consistency.TIMELINE);
2197      Result result = table.get(get);
2198      assertTrue(failMsg, result.containsColumn(f, null));
2199      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2200      Cell cell = result.getColumnLatestCell(f, null);
2201      assertTrue(failMsg,
2202        Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2203          cell.getValueLength()));
2204    }
2205  }
2206
2207  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2208      throws IOException {
2209    verifyNumericRows((HRegion)region, f, startRow, endRow);
2210  }
2211
2212  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2213      throws IOException {
2214    verifyNumericRows(region, f, startRow, endRow, true);
2215  }
2216
2217  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2218      final boolean present) throws IOException {
2219    verifyNumericRows((HRegion)region, f, startRow, endRow, present);
2220  }
2221
2222  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2223      final boolean present) throws IOException {
2224    for (int i = startRow; i < endRow; i++) {
2225      String failMsg = "Failed verification of row :" + i;
2226      byte[] data = Bytes.toBytes(String.valueOf(i));
2227      Result result = region.get(new Get(data));
2228
2229      boolean hasResult = result != null && !result.isEmpty();
2230      assertEquals(failMsg + result, present, hasResult);
2231      if (!present) continue;
2232
2233      assertTrue(failMsg, result.containsColumn(f, null));
2234      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2235      Cell cell = result.getColumnLatestCell(f, null);
2236      assertTrue(failMsg,
2237        Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2238          cell.getValueLength()));
2239    }
2240  }
2241
2242  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2243      throws IOException {
2244    for (int i = startRow; i < endRow; i++) {
2245      byte[] data = Bytes.toBytes(String.valueOf(i));
2246      Delete delete = new Delete(data);
2247      delete.addFamily(f);
2248      t.delete(delete);
2249    }
2250  }
2251
2252  /**
2253   * Return the number of rows in the given table.
2254   */
2255  public int countRows(final Table table) throws IOException {
2256    return countRows(table, new Scan());
2257  }
2258
2259  public int countRows(final Table table, final Scan scan) throws IOException {
2260    try (ResultScanner results = table.getScanner(scan)) {
2261      int count = 0;
2262      while (results.next() != null) {
2263        count++;
2264      }
2265      return count;
2266    }
2267  }
2268
2269  public int countRows(final Table table, final byte[]... families) throws IOException {
2270    Scan scan = new Scan();
2271    for (byte[] family: families) {
2272      scan.addFamily(family);
2273    }
2274    return countRows(table, scan);
2275  }
2276
2277  /**
2278   * Return the number of rows in the given table.
2279   */
2280  public int countRows(final TableName tableName) throws IOException {
2281    Table table = getConnection().getTable(tableName);
2282    try {
2283      return countRows(table);
2284    } finally {
2285      table.close();
2286    }
2287  }
2288
2289  public int countRows(final Region region) throws IOException {
2290    return countRows(region, new Scan());
2291  }
2292
2293  public int countRows(final Region region, final Scan scan) throws IOException {
2294    InternalScanner scanner = region.getScanner(scan);
2295    try {
2296      return countRows(scanner);
2297    } finally {
2298      scanner.close();
2299    }
2300  }
2301
2302  public int countRows(final InternalScanner scanner) throws IOException {
2303    int scannedCount = 0;
2304    List<Cell> results = new ArrayList<>();
2305    boolean hasMore = true;
2306    while (hasMore) {
2307      hasMore = scanner.next(results);
2308      scannedCount += results.size();
2309      results.clear();
2310    }
2311    return scannedCount;
2312  }
2313
2314  /**
2315   * Return an md5 digest of the entire contents of a table.
2316   */
2317  public String checksumRows(final Table table) throws Exception {
2318
2319    Scan scan = new Scan();
2320    ResultScanner results = table.getScanner(scan);
2321    MessageDigest digest = MessageDigest.getInstance("MD5");
2322    for (Result res : results) {
2323      digest.update(res.getRow());
2324    }
2325    results.close();
2326    return digest.toString();
2327  }
2328
2329  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2330  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2331  static {
2332    int i = 0;
2333    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2334      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2335        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2336          ROWS[i][0] = b1;
2337          ROWS[i][1] = b2;
2338          ROWS[i][2] = b3;
2339          i++;
2340        }
2341      }
2342    }
2343  }
2344
2345  public static final byte[][] KEYS = {
2346    HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2347    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
2348    Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
2349    Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
2350    Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2351    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
2352    Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
2353    Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
2354    Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
2355  };
2356
2357  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = {
2358      Bytes.toBytes("bbb"),
2359      Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
2360      Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
2361      Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
2362      Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2363      Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
2364      Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
2365      Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
2366      Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz")
2367  };
2368
2369  /**
2370   * Create rows in hbase:meta for regions of the specified table with the specified
2371   * start keys.  The first startKey should be a 0 length byte array if you
2372   * want to form a proper range of regions.
2373   * @param conf
2374   * @param htd
2375   * @param startKeys
2376   * @return list of region info for regions added to meta
2377   * @throws IOException
2378   * @deprecated since 2.0 version and will be removed in 3.0 version.
2379   *             use {@link #createMultiRegionsInMeta(Configuration, TableDescriptor, byte[][])}
2380   */
2381  @Deprecated
2382  public List<HRegionInfo> createMultiRegionsInMeta(final Configuration conf,
2383      final HTableDescriptor htd, byte [][] startKeys) throws IOException {
2384    return createMultiRegionsInMeta(conf, (TableDescriptor) htd, startKeys)
2385        .stream().map(ImmutableHRegionInfo::new).collect(Collectors.toList());
2386  }
2387  /**
2388   * Create rows in hbase:meta for regions of the specified table with the specified
2389   * start keys.  The first startKey should be a 0 length byte array if you
2390   * want to form a proper range of regions.
2391   * @param conf
2392   * @param htd
2393   * @param startKeys
2394   * @return list of region info for regions added to meta
2395   * @throws IOException
2396   */
2397  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2398      final TableDescriptor htd, byte [][] startKeys)
2399  throws IOException {
2400    Table meta = getConnection().getTable(TableName.META_TABLE_NAME);
2401    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2402    List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2403    MetaTableAccessor
2404        .updateTableState(getConnection(), htd.getTableName(), TableState.State.ENABLED);
2405    // add custom ones
2406    for (int i = 0; i < startKeys.length; i++) {
2407      int j = (i + 1) % startKeys.length;
2408      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName())
2409          .setStartKey(startKeys[i])
2410          .setEndKey(startKeys[j])
2411          .build();
2412      MetaTableAccessor.addRegionToMeta(getConnection(), hri);
2413      newRegions.add(hri);
2414    }
2415
2416    meta.close();
2417    return newRegions;
2418  }
2419
2420  /**
2421   * Create an unmanaged WAL. Be sure to close it when you're through.
2422   */
2423  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2424      throws IOException {
2425    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
2426    // unless I pass along via the conf.
2427    Configuration confForWAL = new Configuration(conf);
2428    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2429    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2430  }
2431
2432  /**
2433   * Create a region with it's own WAL. Be sure to call
2434   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2435   */
2436  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2437      final Configuration conf, final TableDescriptor htd) throws IOException {
2438    return createRegionAndWAL(info, rootDir, conf, htd, true);
2439  }
2440
2441  /**
2442   * Create a region with it's own WAL. Be sure to call
2443   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
2444   */
2445  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2446      final Configuration conf, final TableDescriptor htd, boolean initialize)
2447      throws IOException {
2448    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
2449    WAL wal = createWal(conf, rootDir, info);
2450    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2451  }
2452
2453  /**
2454   * Returns all rows from the hbase:meta table.
2455   *
2456   * @throws IOException When reading the rows fails.
2457   */
2458  public List<byte[]> getMetaTableRows() throws IOException {
2459    // TODO: Redo using MetaTableAccessor class
2460    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2461    List<byte[]> rows = new ArrayList<>();
2462    ResultScanner s = t.getScanner(new Scan());
2463    for (Result result : s) {
2464      LOG.info("getMetaTableRows: row -> " +
2465        Bytes.toStringBinary(result.getRow()));
2466      rows.add(result.getRow());
2467    }
2468    s.close();
2469    t.close();
2470    return rows;
2471  }
2472
2473  /**
2474   * Returns all rows from the hbase:meta table for a given user table
2475   *
2476   * @throws IOException When reading the rows fails.
2477   */
2478  public List<byte[]> getMetaTableRows(TableName tableName) throws IOException {
2479    // TODO: Redo using MetaTableAccessor.
2480    Table t = getConnection().getTable(TableName.META_TABLE_NAME);
2481    List<byte[]> rows = new ArrayList<>();
2482    ResultScanner s = t.getScanner(new Scan());
2483    for (Result result : s) {
2484      RegionInfo info = MetaTableAccessor.getRegionInfo(result);
2485      if (info == null) {
2486        LOG.error("No region info for row " + Bytes.toString(result.getRow()));
2487        // TODO figure out what to do for this new hosed case.
2488        continue;
2489      }
2490
2491      if (info.getTable().equals(tableName)) {
2492        LOG.info("getMetaTableRows: row -> " +
2493            Bytes.toStringBinary(result.getRow()) + info);
2494        rows.add(result.getRow());
2495      }
2496    }
2497    s.close();
2498    t.close();
2499    return rows;
2500  }
2501
2502  /**
2503   * Returns all regions of the specified table
2504   *
2505   * @param tableName the table name
2506   * @return all regions of the specified table
2507   * @throws IOException when getting the regions fails.
2508   */
2509  private List<RegionInfo> getRegions(TableName tableName) throws IOException {
2510    try (Admin admin = getConnection().getAdmin()) {
2511      return admin.getRegions(tableName);
2512    }
2513  }
2514
2515  /*
2516   * Find any other region server which is different from the one identified by parameter
2517   * @param rs
2518   * @return another region server
2519   */
2520  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2521    for (JVMClusterUtil.RegionServerThread rst :
2522      getMiniHBaseCluster().getRegionServerThreads()) {
2523      if (!(rst.getRegionServer() == rs)) {
2524        return rst.getRegionServer();
2525      }
2526    }
2527    return null;
2528  }
2529
2530  /**
2531   * Tool to get the reference to the region server object that holds the
2532   * region of the specified user table.
2533   * @param tableName user table to lookup in hbase:meta
2534   * @return region server that holds it, null if the row doesn't exist
2535   * @throws IOException
2536   * @throws InterruptedException
2537   */
2538  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2539      throws IOException, InterruptedException {
2540    List<RegionInfo> regions = getRegions(tableName);
2541    if (regions == null || regions.isEmpty()) {
2542      return null;
2543    }
2544    LOG.debug("Found " + regions.size() + " regions for table " +
2545        tableName);
2546
2547    byte[] firstRegionName = regions.stream()
2548        .filter(r -> !r.isOffline())
2549        .map(RegionInfo::getRegionName)
2550        .findFirst()
2551        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2552
2553    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2554    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2555      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2556    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2557      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2558    RetryCounter retrier = new RetryCounter(numRetries+1, (int)pause, TimeUnit.MICROSECONDS);
2559    while(retrier.shouldRetry()) {
2560      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2561      if (index != -1) {
2562        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2563      }
2564      // Came back -1.  Region may not be online yet.  Sleep a while.
2565      retrier.sleepUntilNextRetry();
2566    }
2567    return null;
2568  }
2569
2570  /**
2571   * Starts a <code>MiniMRCluster</code> with a default number of
2572   * <code>TaskTracker</code>'s.
2573   *
2574   * @throws IOException When starting the cluster fails.
2575   */
2576  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2577    // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
2578    conf.setIfUnset(
2579        "yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2580        "99.0");
2581    startMiniMapReduceCluster(2);
2582    return mrCluster;
2583  }
2584
2585  /**
2586   * Tasktracker has a bug where changing the hadoop.log.dir system property
2587   * will not change its internal static LOG_DIR variable.
2588   */
2589  private void forceChangeTaskLogDir() {
2590    Field logDirField;
2591    try {
2592      logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2593      logDirField.setAccessible(true);
2594
2595      Field modifiersField = Field.class.getDeclaredField("modifiers");
2596      modifiersField.setAccessible(true);
2597      modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2598
2599      logDirField.set(null, new File(hadoopLogDir, "userlogs"));
2600    } catch (SecurityException e) {
2601      throw new RuntimeException(e);
2602    } catch (NoSuchFieldException e) {
2603      // TODO Auto-generated catch block
2604      throw new RuntimeException(e);
2605    } catch (IllegalArgumentException e) {
2606      throw new RuntimeException(e);
2607    } catch (IllegalAccessException e) {
2608      throw new RuntimeException(e);
2609    }
2610  }
2611
2612  /**
2613   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2614   * filesystem.
2615   * @param servers  The number of <code>TaskTracker</code>'s to start.
2616   * @throws IOException When starting the cluster fails.
2617   */
2618  private void startMiniMapReduceCluster(final int servers) throws IOException {
2619    if (mrCluster != null) {
2620      throw new IllegalStateException("MiniMRCluster is already running");
2621    }
2622    LOG.info("Starting mini mapreduce cluster...");
2623    setupClusterTestDir();
2624    createDirsAndSetProperties();
2625
2626    forceChangeTaskLogDir();
2627
2628    //// hadoop2 specific settings
2629    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2630    // we up the VM usable so that processes don't get killed.
2631    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2632
2633    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2634    // this avoids the problem by disabling speculative task execution in tests.
2635    conf.setBoolean("mapreduce.map.speculative", false);
2636    conf.setBoolean("mapreduce.reduce.speculative", false);
2637    ////
2638
2639    // Allow the user to override FS URI for this map-reduce cluster to use.
2640    mrCluster = new MiniMRCluster(servers,
2641      FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 1,
2642      null, null, new JobConf(this.conf));
2643    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2644    if (jobConf == null) {
2645      jobConf = mrCluster.createJobConf();
2646    }
2647
2648    jobConf.set("mapreduce.cluster.local.dir",
2649      conf.get("mapreduce.cluster.local.dir")); //Hadoop MiniMR overwrites this while it should not
2650    LOG.info("Mini mapreduce cluster started");
2651
2652    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2653    // Our HBase MR jobs need several of these settings in order to properly run.  So we copy the
2654    // necessary config properties here.  YARN-129 required adding a few properties.
2655    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2656    // this for mrv2 support; mr1 ignores this
2657    conf.set("mapreduce.framework.name", "yarn");
2658    conf.setBoolean("yarn.is.minicluster", true);
2659    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2660    if (rmAddress != null) {
2661      conf.set("yarn.resourcemanager.address", rmAddress);
2662    }
2663    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2664    if (historyAddress != null) {
2665      conf.set("mapreduce.jobhistory.address", historyAddress);
2666    }
2667    String schedulerAddress =
2668      jobConf.get("yarn.resourcemanager.scheduler.address");
2669    if (schedulerAddress != null) {
2670      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2671    }
2672    String mrJobHistoryWebappAddress =
2673      jobConf.get("mapreduce.jobhistory.webapp.address");
2674    if (mrJobHistoryWebappAddress != null) {
2675      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2676    }
2677    String yarnRMWebappAddress =
2678      jobConf.get("yarn.resourcemanager.webapp.address");
2679    if (yarnRMWebappAddress != null) {
2680      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2681    }
2682  }
2683
2684  /**
2685   * Stops the previously started <code>MiniMRCluster</code>.
2686   */
2687  public void shutdownMiniMapReduceCluster() {
2688    if (mrCluster != null) {
2689      LOG.info("Stopping mini mapreduce cluster...");
2690      mrCluster.shutdown();
2691      mrCluster = null;
2692      LOG.info("Mini mapreduce cluster stopped");
2693    }
2694    // Restore configuration to point to local jobtracker
2695    conf.set("mapreduce.jobtracker.address", "local");
2696  }
2697
2698  /**
2699   * Create a stubbed out RegionServerService, mainly for getting FS.
2700   */
2701  public RegionServerServices createMockRegionServerService() throws IOException {
2702    return createMockRegionServerService((ServerName)null);
2703  }
2704
2705  /**
2706   * Create a stubbed out RegionServerService, mainly for getting FS.
2707   * This version is used by TestTokenAuthentication
2708   */
2709  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) throws
2710      IOException {
2711    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2712    rss.setFileSystem(getTestFileSystem());
2713    rss.setRpcServer(rpc);
2714    return rss;
2715  }
2716
2717  /**
2718   * Create a stubbed out RegionServerService, mainly for getting FS.
2719   * This version is used by TestOpenRegionHandler
2720   */
2721  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2722    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2723    rss.setFileSystem(getTestFileSystem());
2724    return rss;
2725  }
2726
2727  /**
2728   * Switches the logger for the given class to DEBUG level.
2729   *
2730   * @param clazz  The class for which to switch to debug logging.
2731   */
2732  public void enableDebug(Class<?> clazz) {
2733    Logger l = LoggerFactory.getLogger(clazz);
2734    if (l instanceof Log4JLogger) {
2735      ((Log4JLogger) l).getLogger().setLevel(org.apache.log4j.Level.DEBUG);
2736    } else if (l instanceof Log4jLoggerAdapter) {
2737      LogManager.getLogger(clazz).setLevel(org.apache.log4j.Level.DEBUG);
2738    } else if (l instanceof Jdk14Logger) {
2739      ((Jdk14Logger) l).getLogger().setLevel(java.util.logging.Level.ALL);
2740    }
2741  }
2742
2743  /**
2744   * Expire the Master's session
2745   * @throws Exception
2746   */
2747  public void expireMasterSession() throws Exception {
2748    HMaster master = getMiniHBaseCluster().getMaster();
2749    expireSession(master.getZooKeeper(), false);
2750  }
2751
2752  /**
2753   * Expire a region server's session
2754   * @param index which RS
2755   */
2756  public void expireRegionServerSession(int index) throws Exception {
2757    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2758    expireSession(rs.getZooKeeper(), false);
2759    decrementMinRegionServerCount();
2760  }
2761
2762  private void decrementMinRegionServerCount() {
2763    // decrement the count for this.conf, for newly spwaned master
2764    // this.hbaseCluster shares this configuration too
2765    decrementMinRegionServerCount(getConfiguration());
2766
2767    // each master thread keeps a copy of configuration
2768    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2769      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2770    }
2771  }
2772
2773  private void decrementMinRegionServerCount(Configuration conf) {
2774    int currentCount = conf.getInt(
2775        ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2776    if (currentCount != -1) {
2777      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
2778          Math.max(currentCount - 1, 1));
2779    }
2780  }
2781
2782  public void expireSession(ZKWatcher nodeZK) throws Exception {
2783   expireSession(nodeZK, false);
2784  }
2785
2786  /**
2787   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2788   * http://hbase.apache.org/book.html#trouble.zookeeper
2789   * There are issues when doing this:
2790   * [1] http://www.mail-archive.com/[email protected]/msg01942.html
2791   * [2] https://issues.apache.org/jira/browse/ZOOKEEPER-1105
2792   *
2793   * @param nodeZK - the ZK watcher to expire
2794   * @param checkStatus - true to check if we can create a Table with the
2795   *                    current configuration.
2796   */
2797  public void expireSession(ZKWatcher nodeZK, boolean checkStatus)
2798    throws Exception {
2799    Configuration c = new Configuration(this.conf);
2800    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2801    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2802    byte[] password = zk.getSessionPasswd();
2803    long sessionID = zk.getSessionId();
2804
2805    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2806    //  so we create a first watcher to be sure that the
2807    //  event was sent. We expect that if our watcher receives the event
2808    //  other watchers on the same machine will get is as well.
2809    // When we ask to close the connection, ZK does not close it before
2810    //  we receive all the events, so don't have to capture the event, just
2811    //  closing the connection should be enough.
2812    ZooKeeper monitor = new ZooKeeper(quorumServers,
2813      1000, new org.apache.zookeeper.Watcher(){
2814      @Override
2815      public void process(WatchedEvent watchedEvent) {
2816        LOG.info("Monitor ZKW received event="+watchedEvent);
2817      }
2818    } , sessionID, password);
2819
2820    // Making it expire
2821    ZooKeeper newZK = new ZooKeeper(quorumServers,
2822        1000, EmptyWatcher.instance, sessionID, password);
2823
2824    //ensure that we have connection to the server before closing down, otherwise
2825    //the close session event will be eaten out before we start CONNECTING state
2826    long start = System.currentTimeMillis();
2827    while (newZK.getState() != States.CONNECTED
2828         && System.currentTimeMillis() - start < 1000) {
2829       Thread.sleep(1);
2830    }
2831    newZK.close();
2832    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2833
2834    // Now closing & waiting to be sure that the clients get it.
2835    monitor.close();
2836
2837    if (checkStatus) {
2838      getConnection().getTable(TableName.META_TABLE_NAME).close();
2839    }
2840  }
2841
2842  /**
2843   * Get the Mini HBase cluster.
2844   *
2845   * @return hbase cluster
2846   * @see #getHBaseClusterInterface()
2847   */
2848  public MiniHBaseCluster getHBaseCluster() {
2849    return getMiniHBaseCluster();
2850  }
2851
2852  /**
2853   * Returns the HBaseCluster instance.
2854   * <p>Returned object can be any of the subclasses of HBaseCluster, and the
2855   * tests referring this should not assume that the cluster is a mini cluster or a
2856   * distributed one. If the test only works on a mini cluster, then specific
2857   * method {@link #getMiniHBaseCluster()} can be used instead w/o the
2858   * need to type-cast.
2859   */
2860  public HBaseCluster getHBaseClusterInterface() {
2861    //implementation note: we should rename this method as #getHBaseCluster(),
2862    //but this would require refactoring 90+ calls.
2863    return hbaseCluster;
2864  }
2865
2866  /**
2867   * Get a Connection to the cluster.
2868   * Not thread-safe (This class needs a lot of work to make it thread-safe).
2869   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2870   * @throws IOException
2871   */
2872  public Connection getConnection() throws IOException {
2873    if (this.connection == null) {
2874      this.connection = ConnectionFactory.createConnection(this.conf);
2875    }
2876    return this.connection;
2877  }
2878
2879  /**
2880   * Returns a Admin instance.
2881   * This instance is shared between HBaseTestingUtility instance users. Closing it has no effect,
2882   * it will be closed automatically when the cluster shutdowns
2883   *
2884   * @return HBaseAdmin instance which is guaranteed to support only {@link Admin} interface.
2885   *   Functions in HBaseAdmin not provided by {@link Admin} interface can be changed/deleted
2886   *   anytime.
2887   * @deprecated Since 2.0. Will be removed in 3.0. Use {@link #getAdmin()} instead.
2888   */
2889  @Deprecated
2890  public synchronized HBaseAdmin getHBaseAdmin()
2891  throws IOException {
2892    if (hbaseAdmin == null){
2893      this.hbaseAdmin = (HBaseAdmin) getConnection().getAdmin();
2894    }
2895    return hbaseAdmin;
2896  }
2897
2898  /**
2899   * Returns an Admin instance which is shared between HBaseTestingUtility instance users.
2900   * Closing it has no effect, it will be closed automatically when the cluster shutdowns
2901   */
2902  public synchronized Admin getAdmin() throws IOException {
2903    if (hbaseAdmin == null){
2904      this.hbaseAdmin = (HBaseAdmin) getConnection().getAdmin();
2905    }
2906    return hbaseAdmin;
2907  }
2908
2909  private HBaseAdmin hbaseAdmin = null;
2910
2911  /**
2912   * Returns an {@link Hbck} instance. Needs be closed when done.
2913   */
2914  public Hbck getHbck() throws IOException {
2915    return ((ClusterConnection) getConnection()).getHbck();
2916  }
2917
2918  /**
2919   * Unassign the named region.
2920   *
2921   * @param regionName  The region to unassign.
2922   */
2923  public void unassignRegion(String regionName) throws IOException {
2924    unassignRegion(Bytes.toBytes(regionName));
2925  }
2926
2927  /**
2928   * Unassign the named region.
2929   *
2930   * @param regionName  The region to unassign.
2931   */
2932  public void unassignRegion(byte[] regionName) throws IOException {
2933    getAdmin().unassign(regionName, true);
2934  }
2935
2936  /**
2937   * Closes the region containing the given row.
2938   *
2939   * @param row  The row to find the containing region.
2940   * @param table  The table to find the region.
2941   */
2942  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2943    unassignRegionByRow(Bytes.toBytes(row), table);
2944  }
2945
2946  /**
2947   * Closes the region containing the given row.
2948   *
2949   * @param row  The row to find the containing region.
2950   * @param table  The table to find the region.
2951   * @throws IOException
2952   */
2953  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
2954    HRegionLocation hrl = table.getRegionLocation(row);
2955    unassignRegion(hrl.getRegionInfo().getRegionName());
2956  }
2957
2958  /*
2959   * Retrieves a splittable region randomly from tableName
2960   *
2961   * @param tableName name of table
2962   * @param maxAttempts maximum number of attempts, unlimited for value of -1
2963   * @return the HRegion chosen, null if none was found within limit of maxAttempts
2964   */
2965  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2966    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2967    int regCount = regions.size();
2968    Set<Integer> attempted = new HashSet<>();
2969    int idx;
2970    int attempts = 0;
2971    do {
2972      regions = getHBaseCluster().getRegions(tableName);
2973      if (regCount != regions.size()) {
2974        // if there was region movement, clear attempted Set
2975        attempted.clear();
2976      }
2977      regCount = regions.size();
2978      // There are chances that before we get the region for the table from an RS the region may
2979      // be going for CLOSE.  This may be because online schema change is enabled
2980      if (regCount > 0) {
2981        idx = random.nextInt(regCount);
2982        // if we have just tried this region, there is no need to try again
2983        if (attempted.contains(idx))
2984          continue;
2985        try {
2986          regions.get(idx).checkSplit();
2987          return regions.get(idx);
2988        } catch (Exception ex) {
2989          LOG.warn("Caught exception", ex);
2990          attempted.add(idx);
2991        }
2992      }
2993      attempts++;
2994    } while (maxAttempts == -1 || attempts < maxAttempts);
2995    return null;
2996  }
2997
2998  public MiniDFSCluster getDFSCluster() {
2999    return dfsCluster;
3000  }
3001
3002  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
3003    setDFSCluster(cluster, true);
3004  }
3005
3006  /**
3007   * Set the MiniDFSCluster
3008   * @param cluster cluster to use
3009   * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before
3010   * it is set.
3011   * @throws IllegalStateException if the passed cluster is up when it is required to be down
3012   * @throws IOException if the FileSystem could not be set from the passed dfs cluster
3013   */
3014  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
3015      throws IllegalStateException, IOException {
3016    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
3017      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
3018    }
3019    this.dfsCluster = cluster;
3020    this.setFs();
3021  }
3022
3023  public FileSystem getTestFileSystem() throws IOException {
3024    return HFileSystem.get(conf);
3025  }
3026
3027  /**
3028   * Wait until all regions in a table have been assigned.  Waits default timeout before giving up
3029   * (30 seconds).
3030   * @param table Table to wait on.
3031   * @throws InterruptedException
3032   * @throws IOException
3033   */
3034  public void waitTableAvailable(TableName table)
3035      throws InterruptedException, IOException {
3036    waitTableAvailable(table.getName(), 30000);
3037  }
3038
3039  public void waitTableAvailable(TableName table, long timeoutMillis)
3040      throws InterruptedException, IOException {
3041    waitFor(timeoutMillis, predicateTableAvailable(table));
3042  }
3043
3044  /**
3045   * Wait until all regions in a table have been assigned
3046   * @param table Table to wait on.
3047   * @param timeoutMillis Timeout.
3048   * @throws InterruptedException
3049   * @throws IOException
3050   */
3051  public void waitTableAvailable(byte[] table, long timeoutMillis)
3052  throws InterruptedException, IOException {
3053    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
3054  }
3055
3056  public String explainTableAvailability(TableName tableName) throws IOException {
3057    String msg = explainTableState(tableName, TableState.State.ENABLED) + ", ";
3058    if (getHBaseCluster().getMaster().isAlive()) {
3059      Map<RegionInfo, ServerName> assignments =
3060          getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
3061              .getRegionAssignments();
3062      final List<Pair<RegionInfo, ServerName>> metaLocations =
3063          MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
3064      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
3065        RegionInfo hri = metaLocation.getFirst();
3066        ServerName sn = metaLocation.getSecond();
3067        if (!assignments.containsKey(hri)) {
3068          msg += ", region " + hri
3069              + " not assigned, but found in meta, it expected to be on " + sn;
3070
3071        } else if (sn == null) {
3072          msg += ",  region " + hri
3073              + " assigned,  but has no server in meta";
3074        } else if (!sn.equals(assignments.get(hri))) {
3075          msg += ",  region " + hri
3076              + " assigned,  but has different servers in meta and AM ( " +
3077              sn + " <> " + assignments.get(hri);
3078        }
3079      }
3080    }
3081    return msg;
3082  }
3083
3084  public String explainTableState(final TableName table, TableState.State state)
3085      throws IOException {
3086    TableState tableState = MetaTableAccessor.getTableState(connection, table);
3087    if (tableState == null) {
3088      return "TableState in META: No table state in META for table " + table
3089          + " last state in meta (including deleted is " + findLastTableState(table) + ")";
3090    } else if (!tableState.inStates(state)) {
3091      return "TableState in META: Not " + state + " state, but " + tableState;
3092    } else {
3093      return "TableState in META: OK";
3094    }
3095  }
3096
3097  @Nullable
3098  public TableState findLastTableState(final TableName table) throws IOException {
3099    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
3100    MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
3101      @Override
3102      public boolean visit(Result r) throws IOException {
3103        if (!Arrays.equals(r.getRow(), table.getName()))
3104          return false;
3105        TableState state = MetaTableAccessor.getTableState(r);
3106        if (state != null)
3107          lastTableState.set(state);
3108        return true;
3109      }
3110    };
3111    MetaTableAccessor
3112        .scanMeta(connection, null, null,
3113            MetaTableAccessor.QueryType.TABLE,
3114            Integer.MAX_VALUE, visitor);
3115    return lastTableState.get();
3116  }
3117
3118  /**
3119   * Waits for a table to be 'enabled'.  Enabled means that table is set as 'enabled' and the
3120   * regions have been all assigned.  Will timeout after default period (30 seconds)
3121   * Tolerates nonexistent table.
3122   * @param table Table to wait on.
3123   * @param table
3124   * @throws InterruptedException
3125   * @throws IOException
3126   */
3127  public void waitTableEnabled(TableName table)
3128      throws InterruptedException, IOException {
3129    waitTableEnabled(table, 30000);
3130  }
3131
3132  /**
3133   * Waits for a table to be 'enabled'.  Enabled means that table is set as 'enabled' and the
3134   * regions have been all assigned.
3135   * @see #waitTableEnabled(TableName, long)
3136   * @param table Table to wait on.
3137   * @param timeoutMillis Time to wait on it being marked enabled.
3138   * @throws InterruptedException
3139   * @throws IOException
3140   */
3141  public void waitTableEnabled(byte[] table, long timeoutMillis)
3142  throws InterruptedException, IOException {
3143    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
3144  }
3145
3146  public void waitTableEnabled(TableName table, long timeoutMillis)
3147  throws IOException {
3148    waitFor(timeoutMillis, predicateTableEnabled(table));
3149  }
3150
3151  /**
3152   * Waits for a table to be 'disabled'.  Disabled means that table is set as 'disabled'
3153   * Will timeout after default period (30 seconds)
3154   * @param table Table to wait on.
3155   * @throws InterruptedException
3156   * @throws IOException
3157   */
3158  public void waitTableDisabled(byte[] table)
3159          throws InterruptedException, IOException {
3160    waitTableDisabled(table, 30000);
3161  }
3162
3163  public void waitTableDisabled(TableName table, long millisTimeout)
3164          throws InterruptedException, IOException {
3165    waitFor(millisTimeout, predicateTableDisabled(table));
3166  }
3167
3168  /**
3169   * Waits for a table to be 'disabled'.  Disabled means that table is set as 'disabled'
3170   * @param table Table to wait on.
3171   * @param timeoutMillis Time to wait on it being marked disabled.
3172   * @throws InterruptedException
3173   * @throws IOException
3174   */
3175  public void waitTableDisabled(byte[] table, long timeoutMillis)
3176          throws InterruptedException, IOException {
3177    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
3178  }
3179
3180  /**
3181   * Make sure that at least the specified number of region servers
3182   * are running
3183   * @param num minimum number of region servers that should be running
3184   * @return true if we started some servers
3185   * @throws IOException
3186   */
3187  public boolean ensureSomeRegionServersAvailable(final int num)
3188      throws IOException {
3189    boolean startedServer = false;
3190    MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
3191    for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i<num; ++i) {
3192      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
3193      startedServer = true;
3194    }
3195
3196    return startedServer;
3197  }
3198
3199
3200  /**
3201   * Make sure that at least the specified number of region servers
3202   * are running. We don't count the ones that are currently stopping or are
3203   * stopped.
3204   * @param num minimum number of region servers that should be running
3205   * @return true if we started some servers
3206   * @throws IOException
3207   */
3208  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num)
3209    throws IOException {
3210    boolean startedServer = ensureSomeRegionServersAvailable(num);
3211
3212    int nonStoppedServers = 0;
3213    for (JVMClusterUtil.RegionServerThread rst :
3214      getMiniHBaseCluster().getRegionServerThreads()) {
3215
3216      HRegionServer hrs = rst.getRegionServer();
3217      if (hrs.isStopping() || hrs.isStopped()) {
3218        LOG.info("A region server is stopped or stopping:"+hrs);
3219      } else {
3220        nonStoppedServers++;
3221      }
3222    }
3223    for (int i=nonStoppedServers; i<num; ++i) {
3224      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
3225      startedServer = true;
3226    }
3227    return startedServer;
3228  }
3229
3230
3231  /**
3232   * This method clones the passed <code>c</code> configuration setting a new
3233   * user into the clone.  Use it getting new instances of FileSystem.  Only
3234   * works for DistributedFileSystem w/o Kerberos.
3235   * @param c Initial configuration
3236   * @param differentiatingSuffix Suffix to differentiate this user from others.
3237   * @return A new configuration instance with a different user set into it.
3238   * @throws IOException
3239   */
3240  public static User getDifferentUser(final Configuration c,
3241    final String differentiatingSuffix)
3242  throws IOException {
3243    FileSystem currentfs = FileSystem.get(c);
3244    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
3245      return User.getCurrent();
3246    }
3247    // Else distributed filesystem.  Make a new instance per daemon.  Below
3248    // code is taken from the AppendTestUtil over in hdfs.
3249    String username = User.getCurrent().getName() +
3250      differentiatingSuffix;
3251    User user = User.createUserForTesting(c, username,
3252        new String[]{"supergroup"});
3253    return user;
3254  }
3255
3256  public static NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
3257      throws IOException {
3258    NavigableSet<String> online = new TreeSet<>();
3259    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
3260      try {
3261        for (RegionInfo region :
3262            ProtobufUtil.getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
3263          online.add(region.getRegionNameAsString());
3264        }
3265      } catch (RegionServerStoppedException e) {
3266        // That's fine.
3267      }
3268    }
3269    for (MasterThread mt : cluster.getLiveMasterThreads()) {
3270      try {
3271        for (RegionInfo region :
3272            ProtobufUtil.getOnlineRegions(mt.getMaster().getRSRpcServices())) {
3273          online.add(region.getRegionNameAsString());
3274        }
3275      } catch (RegionServerStoppedException e) {
3276        // That's fine.
3277      } catch (ServerNotRunningYetException e) {
3278        // That's fine.
3279      }
3280    }
3281    return online;
3282  }
3283
3284  /**
3285   * Set maxRecoveryErrorCount in DFSClient.  In 0.20 pre-append its hard-coded to 5 and
3286   * makes tests linger.  Here is the exception you'll see:
3287   * <pre>
3288   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
3289   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
3290   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
3291   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
3292   * </pre>
3293   * @param stream A DFSClient.DFSOutputStream.
3294   * @param max
3295   * @throws NoSuchFieldException
3296   * @throws SecurityException
3297   * @throws IllegalAccessException
3298   * @throws IllegalArgumentException
3299   */
3300  public static void setMaxRecoveryErrorCount(final OutputStream stream,
3301      final int max) {
3302    try {
3303      Class<?> [] clazzes = DFSClient.class.getDeclaredClasses();
3304      for (Class<?> clazz: clazzes) {
3305        String className = clazz.getSimpleName();
3306        if (className.equals("DFSOutputStream")) {
3307          if (clazz.isInstance(stream)) {
3308            Field maxRecoveryErrorCountField =
3309              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
3310            maxRecoveryErrorCountField.setAccessible(true);
3311            maxRecoveryErrorCountField.setInt(stream, max);
3312            break;
3313          }
3314        }
3315      }
3316    } catch (Exception e) {
3317      LOG.info("Could not set max recovery field", e);
3318    }
3319  }
3320
3321  /**
3322   * Uses directly the assignment manager to assign the region. and waits until the specified region
3323   * has completed assignment.
3324   * @return true if the region is assigned false otherwise.
3325   */
3326  public boolean assignRegion(final RegionInfo regionInfo)
3327      throws IOException, InterruptedException {
3328    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
3329    am.assign(regionInfo);
3330    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
3331  }
3332
3333  /**
3334   * Move region to destination server and wait till region is completely moved and online
3335   *
3336   * @param destRegion region to move
3337   * @param destServer destination server of the region
3338   * @throws InterruptedException
3339   * @throws IOException
3340   */
3341  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3342      throws InterruptedException, IOException {
3343    HMaster master = getMiniHBaseCluster().getMaster();
3344    // TODO: Here we start the move. The move can take a while.
3345    getAdmin().move(destRegion.getEncodedNameAsBytes(),
3346        Bytes.toBytes(destServer.getServerName()));
3347    while (true) {
3348      ServerName serverName = master.getAssignmentManager().getRegionStates()
3349          .getRegionServerOfRegion(destRegion);
3350      if (serverName != null && serverName.equals(destServer)) {
3351        assertRegionOnServer(destRegion, serverName, 2000);
3352        break;
3353      }
3354      Thread.sleep(10);
3355    }
3356  }
3357
3358  /**
3359   * Wait until all regions for a table in hbase:meta have a non-empty
3360   * info:server, up to a configuable timeout value (default is 60 seconds)
3361   * This means all regions have been deployed,
3362   * master has been informed and updated hbase:meta with the regions deployed
3363   * server.
3364   * @param tableName the table name
3365   * @throws IOException
3366   */
3367  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3368    waitUntilAllRegionsAssigned(tableName,
3369      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3370  }
3371
3372  /**
3373   * Waith until all system table's regions get assigned
3374   * @throws IOException
3375   */
3376  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3377    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3378    waitUntilAllRegionsAssigned(TableName.NAMESPACE_TABLE_NAME);
3379  }
3380
3381  /**
3382   * Wait until all regions for a table in hbase:meta have a non-empty
3383   * info:server, or until timeout.  This means all regions have been deployed,
3384   * master has been informed and updated hbase:meta with the regions deployed
3385   * server.
3386   * @param tableName the table name
3387   * @param timeout timeout, in milliseconds
3388   * @throws IOException
3389   */
3390  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3391      throws IOException {
3392    if (!TableName.isMetaTableName(tableName)) {
3393      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3394        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = " +
3395            timeout + "ms");
3396        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3397          @Override
3398          public String explainFailure() throws IOException {
3399            return explainTableAvailability(tableName);
3400          }
3401
3402          @Override
3403          public boolean evaluate() throws IOException {
3404            Scan scan = new Scan();
3405            scan.addFamily(HConstants.CATALOG_FAMILY);
3406            boolean tableFound = false;
3407            try (ResultScanner s = meta.getScanner(scan)) {
3408              for (Result r; (r = s.next()) != null;) {
3409                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3410                HRegionInfo info = HRegionInfo.parseFromOrNull(b);
3411                if (info != null && info.getTable().equals(tableName)) {
3412                  // Get server hosting this region from catalog family. Return false if no server
3413                  // hosting this region, or if the server hosting this region was recently killed
3414                  // (for fault tolerance testing).
3415                  tableFound = true;
3416                  byte[] server =
3417                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3418                  if (server == null) {
3419                    return false;
3420                  } else {
3421                    byte[] startCode =
3422                        r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3423                    ServerName serverName =
3424                        ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + "," +
3425                            Bytes.toLong(startCode));
3426                    if (!getHBaseClusterInterface().isDistributedCluster() &&
3427                        getHBaseCluster().isKilledRS(serverName)) {
3428                      return false;
3429                    }
3430                  }
3431                  if (RegionStateStore.getRegionState(r,
3432                    info.getReplicaId()) != RegionState.State.OPEN) {
3433                    return false;
3434                  }
3435                }
3436              }
3437            }
3438            if (!tableFound) {
3439              LOG.warn("Didn't find the entries for table " + tableName + " in meta, already deleted?");
3440            }
3441            return tableFound;
3442          }
3443        });
3444      }
3445    }
3446    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3447    // check from the master state if we are using a mini cluster
3448    if (!getHBaseClusterInterface().isDistributedCluster()) {
3449      // So, all regions are in the meta table but make sure master knows of the assignments before
3450      // returning -- sometimes this can lag.
3451      HMaster master = getHBaseCluster().getMaster();
3452      final RegionStates states = master.getAssignmentManager().getRegionStates();
3453      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3454        @Override
3455        public String explainFailure() throws IOException {
3456          return explainTableAvailability(tableName);
3457        }
3458
3459        @Override
3460        public boolean evaluate() throws IOException {
3461          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3462          return hris != null && !hris.isEmpty();
3463        }
3464      });
3465    }
3466    LOG.info("All regions for table " + tableName + " assigned.");
3467  }
3468
3469  /**
3470   * Do a small get/scan against one store. This is required because store
3471   * has no actual methods of querying itself, and relies on StoreScanner.
3472   */
3473  public static List<Cell> getFromStoreFile(HStore store,
3474                                                Get get) throws IOException {
3475    Scan scan = new Scan(get);
3476    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3477        scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3478        // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3479        // readpoint 0.
3480        0);
3481
3482    List<Cell> result = new ArrayList<>();
3483    scanner.next(result);
3484    if (!result.isEmpty()) {
3485      // verify that we are on the row we want:
3486      Cell kv = result.get(0);
3487      if (!CellUtil.matchingRows(kv, get.getRow())) {
3488        result.clear();
3489      }
3490    }
3491    scanner.close();
3492    return result;
3493  }
3494
3495  /**
3496   * Create region split keys between startkey and endKey
3497   *
3498   * @param startKey
3499   * @param endKey
3500   * @param numRegions the number of regions to be created. it has to be greater than 3.
3501   * @return
3502   */
3503  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions){
3504    assertTrue(numRegions>3);
3505    byte [][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3506    byte [][] result = new byte[tmpSplitKeys.length+1][];
3507    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3508    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3509    return result;
3510  }
3511
3512  /**
3513   * Do a small get/scan against one store. This is required because store
3514   * has no actual methods of querying itself, and relies on StoreScanner.
3515   */
3516  public static List<Cell> getFromStoreFile(HStore store,
3517                                                byte [] row,
3518                                                NavigableSet<byte[]> columns
3519                                                ) throws IOException {
3520    Get get = new Get(row);
3521    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3522    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3523
3524    return getFromStoreFile(store,get);
3525  }
3526
3527  public static void assertKVListsEqual(String additionalMsg,
3528      final List<? extends Cell> expected,
3529      final List<? extends Cell> actual) {
3530    final int eLen = expected.size();
3531    final int aLen = actual.size();
3532    final int minLen = Math.min(eLen, aLen);
3533
3534    int i;
3535    for (i = 0; i < minLen
3536        && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0;
3537        ++i) {}
3538
3539    if (additionalMsg == null) {
3540      additionalMsg = "";
3541    }
3542    if (!additionalMsg.isEmpty()) {
3543      additionalMsg = ". " + additionalMsg;
3544    }
3545
3546    if (eLen != aLen || i != minLen) {
3547      throw new AssertionError(
3548          "Expected and actual KV arrays differ at position " + i + ": " +
3549          safeGetAsStr(expected, i) + " (length " + eLen +") vs. " +
3550          safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
3551    }
3552  }
3553
3554  public static <T> String safeGetAsStr(List<T> lst, int i) {
3555    if (0 <= i && i < lst.size()) {
3556      return lst.get(i).toString();
3557    } else {
3558      return "<out_of_range>";
3559    }
3560  }
3561
3562  public String getClusterKey() {
3563    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
3564        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":"
3565        + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
3566            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3567  }
3568
3569  /** Creates a random table with the given parameters */
3570  public Table createRandomTable(TableName tableName,
3571      final Collection<String> families,
3572      final int maxVersions,
3573      final int numColsPerRow,
3574      final int numFlushes,
3575      final int numRegions,
3576      final int numRowsPerFlush)
3577      throws IOException, InterruptedException {
3578
3579    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions +
3580        " regions, " + numFlushes + " storefiles per region, " +
3581        numRowsPerFlush + " rows per flush, maxVersions=" +  maxVersions +
3582        "\n");
3583
3584    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3585    final int numCF = families.size();
3586    final byte[][] cfBytes = new byte[numCF][];
3587    {
3588      int cfIndex = 0;
3589      for (String cf : families) {
3590        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3591      }
3592    }
3593
3594    final int actualStartKey = 0;
3595    final int actualEndKey = Integer.MAX_VALUE;
3596    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3597    final int splitStartKey = actualStartKey + keysPerRegion;
3598    final int splitEndKey = actualEndKey - keysPerRegion;
3599    final String keyFormat = "%08x";
3600    final Table table = createTable(tableName, cfBytes,
3601        maxVersions,
3602        Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3603        Bytes.toBytes(String.format(keyFormat, splitEndKey)),
3604        numRegions);
3605
3606    if (hbaseCluster != null) {
3607      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3608    }
3609
3610    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3611
3612    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3613      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3614        final byte[] row = Bytes.toBytes(String.format(keyFormat,
3615            actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3616
3617        Put put = new Put(row);
3618        Delete del = new Delete(row);
3619        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3620          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3621          final long ts = rand.nextInt();
3622          final byte[] qual = Bytes.toBytes("col" + iCol);
3623          if (rand.nextBoolean()) {
3624            final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
3625                "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
3626                ts + "_random_" + rand.nextLong());
3627            put.addColumn(cf, qual, ts, value);
3628          } else if (rand.nextDouble() < 0.8) {
3629            del.addColumn(cf, qual, ts);
3630          } else {
3631            del.addColumns(cf, qual, ts);
3632          }
3633        }
3634
3635        if (!put.isEmpty()) {
3636          mutator.mutate(put);
3637        }
3638
3639        if (!del.isEmpty()) {
3640          mutator.mutate(del);
3641        }
3642      }
3643      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3644      mutator.flush();
3645      if (hbaseCluster != null) {
3646        getMiniHBaseCluster().flushcache(table.getName());
3647      }
3648    }
3649    mutator.close();
3650
3651    return table;
3652  }
3653
3654  private static Random random = new Random();
3655
3656  private static final PortAllocator portAllocator = new PortAllocator(random);
3657
3658  public static int randomFreePort() {
3659    return portAllocator.randomFreePort();
3660  }
3661
3662  static class PortAllocator {
3663    private static final int MIN_RANDOM_PORT = 0xc000;
3664    private static final int MAX_RANDOM_PORT = 0xfffe;
3665
3666    /** A set of ports that have been claimed using {@link #randomFreePort()}. */
3667    private final Set<Integer> takenRandomPorts = new HashSet<>();
3668
3669    private final Random random;
3670    private final AvailablePortChecker portChecker;
3671
3672    public PortAllocator(Random random) {
3673      this.random = random;
3674      this.portChecker = new AvailablePortChecker() {
3675        @Override
3676        public boolean available(int port) {
3677          try {
3678            ServerSocket sock = new ServerSocket(port);
3679            sock.close();
3680            return true;
3681          } catch (IOException ex) {
3682            return false;
3683          }
3684        }
3685      };
3686    }
3687
3688    public PortAllocator(Random random, AvailablePortChecker portChecker) {
3689      this.random = random;
3690      this.portChecker = portChecker;
3691    }
3692
3693    /**
3694     * Returns a random free port and marks that port as taken. Not thread-safe. Expected to be
3695     * called from single-threaded test setup code/
3696     */
3697    public int randomFreePort() {
3698      int port = 0;
3699      do {
3700        port = randomPort();
3701        if (takenRandomPorts.contains(port)) {
3702          port = 0;
3703          continue;
3704        }
3705        takenRandomPorts.add(port);
3706
3707        if (!portChecker.available(port)) {
3708          port = 0;
3709        }
3710      } while (port == 0);
3711      return port;
3712    }
3713
3714    /**
3715     * Returns a random port. These ports cannot be registered with IANA and are
3716     * intended for dynamic allocation (see http://bit.ly/dynports).
3717     */
3718    private int randomPort() {
3719      return MIN_RANDOM_PORT
3720          + random.nextInt(MAX_RANDOM_PORT - MIN_RANDOM_PORT);
3721    }
3722
3723    interface AvailablePortChecker {
3724      boolean available(int port);
3725    }
3726  }
3727
3728  public static String randomMultiCastAddress() {
3729    return "226.1.1." + random.nextInt(254);
3730  }
3731
3732  public static void waitForHostPort(String host, int port)
3733      throws IOException {
3734    final int maxTimeMs = 10000;
3735    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3736    IOException savedException = null;
3737    LOG.info("Waiting for server at " + host + ":" + port);
3738    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3739      try {
3740        Socket sock = new Socket(InetAddress.getByName(host), port);
3741        sock.close();
3742        savedException = null;
3743        LOG.info("Server at " + host + ":" + port + " is available");
3744        break;
3745      } catch (UnknownHostException e) {
3746        throw new IOException("Failed to look up " + host, e);
3747      } catch (IOException e) {
3748        savedException = e;
3749      }
3750      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3751    }
3752
3753    if (savedException != null) {
3754      throw savedException;
3755    }
3756  }
3757
3758  /**
3759   * Creates a pre-split table for load testing. If the table already exists,
3760   * logs a warning and continues.
3761   * @return the number of regions the table was split into
3762   */
3763  public static int createPreSplitLoadTestTable(Configuration conf,
3764      TableName tableName, byte[] columnFamily, Algorithm compression,
3765      DataBlockEncoding dataBlockEncoding) throws IOException {
3766    return createPreSplitLoadTestTable(conf, tableName,
3767      columnFamily, compression, dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1,
3768      Durability.USE_DEFAULT);
3769  }
3770  /**
3771   * Creates a pre-split table for load testing. If the table already exists,
3772   * logs a warning and continues.
3773   * @return the number of regions the table was split into
3774   */
3775  public static int createPreSplitLoadTestTable(Configuration conf,
3776      TableName tableName, byte[] columnFamily, Algorithm compression,
3777      DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
3778      Durability durability)
3779          throws IOException {
3780    HTableDescriptor desc = new HTableDescriptor(tableName);
3781    desc.setDurability(durability);
3782    desc.setRegionReplication(regionReplication);
3783    HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
3784    hcd.setDataBlockEncoding(dataBlockEncoding);
3785    hcd.setCompressionType(compression);
3786    return createPreSplitLoadTestTable(conf, desc, hcd, numRegionsPerServer);
3787  }
3788
3789  /**
3790   * Creates a pre-split table for load testing. If the table already exists,
3791   * logs a warning and continues.
3792   * @return the number of regions the table was split into
3793   */
3794  public static int createPreSplitLoadTestTable(Configuration conf,
3795      TableName tableName, byte[][] columnFamilies, Algorithm compression,
3796      DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
3797      Durability durability)
3798          throws IOException {
3799    HTableDescriptor desc = new HTableDescriptor(tableName);
3800    desc.setDurability(durability);
3801    desc.setRegionReplication(regionReplication);
3802    HColumnDescriptor[] hcds = new HColumnDescriptor[columnFamilies.length];
3803    for (int i = 0; i < columnFamilies.length; i++) {
3804      HColumnDescriptor hcd = new HColumnDescriptor(columnFamilies[i]);
3805      hcd.setDataBlockEncoding(dataBlockEncoding);
3806      hcd.setCompressionType(compression);
3807      hcds[i] = hcd;
3808    }
3809    return createPreSplitLoadTestTable(conf, desc, hcds, numRegionsPerServer);
3810  }
3811
3812  /**
3813   * Creates a pre-split table for load testing. If the table already exists,
3814   * logs a warning and continues.
3815   * @return the number of regions the table was split into
3816   */
3817  public static int createPreSplitLoadTestTable(Configuration conf,
3818      TableDescriptor desc, ColumnFamilyDescriptor hcd) throws IOException {
3819    return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
3820  }
3821
3822  /**
3823   * Creates a pre-split table for load testing. If the table already exists,
3824   * logs a warning and continues.
3825   * @return the number of regions the table was split into
3826   */
3827  public static int createPreSplitLoadTestTable(Configuration conf,
3828      TableDescriptor desc, ColumnFamilyDescriptor hcd, int numRegionsPerServer) throws IOException {
3829    return createPreSplitLoadTestTable(conf, desc, new ColumnFamilyDescriptor[] {hcd},
3830        numRegionsPerServer);
3831  }
3832
3833  /**
3834   * Creates a pre-split table for load testing. If the table already exists,
3835   * logs a warning and continues.
3836   * @return the number of regions the table was split into
3837   */
3838  public static int createPreSplitLoadTestTable(Configuration conf,
3839      TableDescriptor desc, ColumnFamilyDescriptor[] hcds,
3840      int numRegionsPerServer) throws IOException {
3841    return createPreSplitLoadTestTable(conf, desc, hcds,
3842      new RegionSplitter.HexStringSplit(), numRegionsPerServer);
3843  }
3844
3845  /**
3846   * Creates a pre-split table for load testing. If the table already exists,
3847   * logs a warning and continues.
3848   * @return the number of regions the table was split into
3849   */
3850  public static int createPreSplitLoadTestTable(Configuration conf,
3851      TableDescriptor td, ColumnFamilyDescriptor[] cds,
3852      SplitAlgorithm splitter, int numRegionsPerServer) throws IOException {
3853    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
3854    for (ColumnFamilyDescriptor cd : cds) {
3855      if (!td.hasColumnFamily(cd.getName())) {
3856        builder.setColumnFamily(cd);
3857      }
3858    }
3859    td = builder.build();
3860    int totalNumberOfRegions = 0;
3861    Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
3862    Admin admin = unmanagedConnection.getAdmin();
3863
3864    try {
3865      // create a table a pre-splits regions.
3866      // The number of splits is set as:
3867      //    region servers * regions per region server).
3868      int numberOfServers =
3869          admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics()
3870              .size();
3871      if (numberOfServers == 0) {
3872        throw new IllegalStateException("No live regionservers");
3873      }
3874
3875      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3876      LOG.info("Number of live regionservers: " + numberOfServers + ", " +
3877          "pre-splitting table into " + totalNumberOfRegions + " regions " +
3878          "(regions per server: " + numRegionsPerServer + ")");
3879
3880      byte[][] splits = splitter.split(
3881          totalNumberOfRegions);
3882
3883      admin.createTable(td, splits);
3884    } catch (MasterNotRunningException e) {
3885      LOG.error("Master not running", e);
3886      throw new IOException(e);
3887    } catch (TableExistsException e) {
3888      LOG.warn("Table " + td.getTableName() +
3889          " already exists, continuing");
3890    } finally {
3891      admin.close();
3892      unmanagedConnection.close();
3893    }
3894    return totalNumberOfRegions;
3895  }
3896
3897  public static int getMetaRSPort(Connection connection) throws IOException {
3898    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3899      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3900    }
3901  }
3902
3903  /**
3904   *  Due to async racing issue, a region may not be in
3905   *  the online region list of a region server yet, after
3906   *  the assignment znode is deleted and the new assignment
3907   *  is recorded in master.
3908   */
3909  public void assertRegionOnServer(
3910      final RegionInfo hri, final ServerName server,
3911      final long timeout) throws IOException, InterruptedException {
3912    long timeoutTime = System.currentTimeMillis() + timeout;
3913    while (true) {
3914      List<RegionInfo> regions = getAdmin().getRegions(server);
3915      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3916      long now = System.currentTimeMillis();
3917      if (now > timeoutTime) break;
3918      Thread.sleep(10);
3919    }
3920    fail("Could not find region " + hri.getRegionNameAsString()
3921      + " on server " + server);
3922  }
3923
3924  /**
3925   * Check to make sure the region is open on the specified
3926   * region server, but not on any other one.
3927   */
3928  public void assertRegionOnlyOnServer(
3929      final RegionInfo hri, final ServerName server,
3930      final long timeout) throws IOException, InterruptedException {
3931    long timeoutTime = System.currentTimeMillis() + timeout;
3932    while (true) {
3933      List<RegionInfo> regions = getAdmin().getRegions(server);
3934      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3935        List<JVMClusterUtil.RegionServerThread> rsThreads =
3936          getHBaseCluster().getLiveRegionServerThreads();
3937        for (JVMClusterUtil.RegionServerThread rsThread: rsThreads) {
3938          HRegionServer rs = rsThread.getRegionServer();
3939          if (server.equals(rs.getServerName())) {
3940            continue;
3941          }
3942          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3943          for (HRegion r: hrs) {
3944            assertTrue("Region should not be double assigned",
3945              r.getRegionInfo().getRegionId() != hri.getRegionId());
3946          }
3947        }
3948        return; // good, we are happy
3949      }
3950      long now = System.currentTimeMillis();
3951      if (now > timeoutTime) break;
3952      Thread.sleep(10);
3953    }
3954    fail("Could not find region " + hri.getRegionNameAsString()
3955      + " on server " + server);
3956  }
3957
3958  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd)
3959      throws IOException {
3960    TableDescriptor td
3961        = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
3962            .setColumnFamily(cd)
3963            .build();
3964    HRegionInfo info =
3965        new HRegionInfo(TableName.valueOf(tableName), null, null, false);
3966    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
3967  }
3968
3969  public void setFileSystemURI(String fsURI) {
3970    FS_URI = fsURI;
3971  }
3972
3973  /**
3974   * Returns a {@link Predicate} for checking that there are no regions in transition in master
3975   */
3976  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3977    return new ExplainingPredicate<IOException>() {
3978      @Override
3979      public String explainFailure() throws IOException {
3980        final RegionStates regionStates = getMiniHBaseCluster().getMaster()
3981            .getAssignmentManager().getRegionStates();
3982        return "found in transition: " + regionStates.getRegionsInTransition().toString();
3983      }
3984
3985      @Override
3986      public boolean evaluate() throws IOException {
3987        HMaster master = getMiniHBaseCluster().getMaster();
3988        if (master == null) return false;
3989        AssignmentManager am = master.getAssignmentManager();
3990        if (am == null) return false;
3991        return !am.hasRegionsInTransition();
3992      }
3993    };
3994  }
3995
3996  /**
3997   * Returns a {@link Predicate} for checking that table is enabled
3998   */
3999  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
4000    return new ExplainingPredicate<IOException>() {
4001      @Override
4002      public String explainFailure() throws IOException {
4003        return explainTableState(tableName, TableState.State.ENABLED);
4004      }
4005
4006      @Override
4007      public boolean evaluate() throws IOException {
4008        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
4009      }
4010    };
4011  }
4012
4013  /**
4014   * Returns a {@link Predicate} for checking that table is enabled
4015   */
4016  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
4017    return new ExplainingPredicate<IOException>() {
4018      @Override
4019      public String explainFailure() throws IOException {
4020        return explainTableState(tableName, TableState.State.DISABLED);
4021      }
4022
4023      @Override
4024      public boolean evaluate() throws IOException {
4025        return getAdmin().isTableDisabled(tableName);
4026      }
4027    };
4028  }
4029
4030  /**
4031   * Returns a {@link Predicate} for checking that table is enabled
4032   */
4033  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
4034    return new ExplainingPredicate<IOException>() {
4035      @Override
4036      public String explainFailure() throws IOException {
4037        return explainTableAvailability(tableName);
4038      }
4039
4040      @Override
4041      public boolean evaluate() throws IOException {
4042        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
4043        if (tableAvailable) {
4044          try (Table table = getConnection().getTable(tableName)) {
4045            TableDescriptor htd = table.getDescriptor();
4046            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
4047                .getAllRegionLocations()) {
4048              Scan scan = new Scan().withStartRow(loc.getRegionInfo().getStartKey())
4049                  .withStopRow(loc.getRegionInfo().getEndKey()).setOneRowLimit()
4050                  .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
4051              for (byte[] family : htd.getColumnFamilyNames()) {
4052                scan.addFamily(family);
4053              }
4054              try (ResultScanner scanner = table.getScanner(scan)) {
4055                scanner.next();
4056              }
4057            }
4058          }
4059        }
4060        return tableAvailable;
4061      }
4062    };
4063  }
4064
4065  /**
4066   * Wait until no regions in transition.
4067   * @param timeout How long to wait.
4068   * @throws IOException
4069   */
4070  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
4071    waitFor(timeout, predicateNoRegionsInTransition());
4072  }
4073
4074  /**
4075   * Wait until no regions in transition. (time limit 15min)
4076   * @throws IOException
4077   */
4078  public void waitUntilNoRegionsInTransition() throws IOException {
4079    waitUntilNoRegionsInTransition(15 * 60000);
4080  }
4081
4082  /**
4083   * Wait until labels is ready in VisibilityLabelsCache.
4084   * @param timeoutMillis
4085   * @param labels
4086   */
4087  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
4088    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
4089    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
4090
4091      @Override
4092      public boolean evaluate() {
4093        for (String label : labels) {
4094          if (labelsCache.getLabelOrdinal(label) == 0) {
4095            return false;
4096          }
4097        }
4098        return true;
4099      }
4100
4101      @Override
4102      public String explainFailure() {
4103        for (String label : labels) {
4104          if (labelsCache.getLabelOrdinal(label) == 0) {
4105            return label + " is not available yet";
4106          }
4107        }
4108        return "";
4109      }
4110    });
4111  }
4112
4113  /**
4114   * Create a set of column descriptors with the combination of compression,
4115   * encoding, bloom codecs available.
4116   * @return the list of column descriptors
4117   */
4118  public static List<HColumnDescriptor> generateColumnDescriptors() {
4119    return generateColumnDescriptors("");
4120  }
4121
4122  /**
4123   * Create a set of column descriptors with the combination of compression,
4124   * encoding, bloom codecs available.
4125   * @param prefix family names prefix
4126   * @return the list of column descriptors
4127   */
4128  public static List<HColumnDescriptor> generateColumnDescriptors(final String prefix) {
4129    List<HColumnDescriptor> htds = new ArrayList<>();
4130    long familyId = 0;
4131    for (Compression.Algorithm compressionType: getSupportedCompressionAlgorithms()) {
4132      for (DataBlockEncoding encodingType: DataBlockEncoding.values()) {
4133        for (BloomType bloomType: BloomType.values()) {
4134          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
4135          HColumnDescriptor htd = new HColumnDescriptor(name);
4136          htd.setCompressionType(compressionType);
4137          htd.setDataBlockEncoding(encodingType);
4138          htd.setBloomFilterType(bloomType);
4139          htds.add(htd);
4140          familyId++;
4141        }
4142      }
4143    }
4144    return htds;
4145  }
4146
4147  /**
4148   * Get supported compression algorithms.
4149   * @return supported compression algorithms.
4150   */
4151  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
4152    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
4153    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
4154    for (String algoName : allAlgos) {
4155      try {
4156        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
4157        algo.getCompressor();
4158        supportedAlgos.add(algo);
4159      } catch (Throwable t) {
4160        // this algo is not available
4161      }
4162    }
4163    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
4164  }
4165
4166  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
4167    Scan scan = new Scan(row);
4168    scan.setSmall(true);
4169    scan.setCaching(1);
4170    scan.setReversed(true);
4171    scan.addFamily(family);
4172    try (RegionScanner scanner = r.getScanner(scan)) {
4173      List<Cell> cells = new ArrayList<>(1);
4174      scanner.next(cells);
4175      if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) {
4176        return null;
4177      }
4178      return Result.create(cells);
4179    }
4180  }
4181
4182  private boolean isTargetTable(final byte[] inRow, Cell c) {
4183    String inputRowString = Bytes.toString(inRow);
4184    int i = inputRowString.indexOf(HConstants.DELIMITER);
4185    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
4186    int o = outputRowString.indexOf(HConstants.DELIMITER);
4187    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
4188  }
4189
4190  /**
4191   * Sets up {@link MiniKdc} for testing security.
4192   * Uses {@link HBaseKerberosUtils} to set the given keytab file as
4193   * {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}.
4194   */
4195  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
4196    Properties conf = MiniKdc.createConf();
4197    conf.put(MiniKdc.DEBUG, true);
4198    MiniKdc kdc = null;
4199    File dir = null;
4200    // There is time lag between selecting a port and trying to bind with it. It's possible that
4201    // another service captures the port in between which'll result in BindException.
4202    boolean bindException;
4203    int numTries = 0;
4204    do {
4205      try {
4206        bindException = false;
4207        dir = new File(getDataTestDir("kdc").toUri().getPath());
4208        kdc = new MiniKdc(conf, dir);
4209        kdc.start();
4210      } catch (BindException e) {
4211        FileUtils.deleteDirectory(dir);  // clean directory
4212        numTries++;
4213        if (numTries == 3) {
4214          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
4215          throw e;
4216        }
4217        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
4218        bindException = true;
4219      }
4220    } while (bindException);
4221    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
4222    return kdc;
4223  }
4224
4225  public int getNumHFiles(final TableName tableName, final byte[] family) {
4226    int numHFiles = 0;
4227    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
4228      numHFiles+= getNumHFilesForRS(regionServerThread.getRegionServer(), tableName,
4229                                    family);
4230    }
4231    return numHFiles;
4232  }
4233
4234  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
4235                               final byte[] family) {
4236    int numHFiles = 0;
4237    for (Region region : rs.getRegions(tableName)) {
4238      numHFiles += region.getStore(family).getStorefilesCount();
4239    }
4240    return numHFiles;
4241  }
4242
4243  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
4244    assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode());
4245    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
4246    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
4247    assertEquals(ltdFamilies.size(), rtdFamilies.size());
4248    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(), it2 =
4249         rtdFamilies.iterator(); it.hasNext();) {
4250      assertEquals(0,
4251          ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
4252    }
4253  }
4254}