001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.io.File;
022import java.io.IOException;
023import java.net.URL;
024import java.net.URLDecoder;
025import java.util.ArrayList;
026import java.util.Base64;
027import java.util.Collection;
028import java.util.Enumeration;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034import java.util.zip.ZipEntry;
035import java.util.zip.ZipFile;
036
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.HBaseConfiguration;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.MetaTableAccessor;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047import org.apache.hadoop.hbase.client.Connection;
048import org.apache.hadoop.hbase.client.ConnectionFactory;
049import org.apache.hadoop.hbase.client.Put;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
052import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
054import org.apache.hadoop.hbase.security.User;
055import org.apache.hadoop.hbase.security.UserProvider;
056import org.apache.hadoop.hbase.security.token.TokenUtil;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.RegionSplitter;
059import org.apache.hadoop.hbase.zookeeper.ZKConfig;
060import org.apache.hadoop.io.Writable;
061import org.apache.hadoop.mapreduce.InputFormat;
062import org.apache.hadoop.mapreduce.Job;
063import org.apache.hadoop.util.StringUtils;
064
065import com.codahale.metrics.MetricRegistry;
066
067/**
068 * Utility for {@link TableMapper} and {@link TableReducer}
069 */
070@SuppressWarnings({ "rawtypes", "unchecked" })
071@InterfaceAudience.Public
072public class TableMapReduceUtil {
073  private static final Logger LOG = LoggerFactory.getLogger(TableMapReduceUtil.class);
074
075  /**
076   * Use this before submitting a TableMap job. It will appropriately set up
077   * the job.
078   *
079   * @param table  The table name to read from.
080   * @param scan  The scan instance with the columns, time range etc.
081   * @param mapper  The mapper class to use.
082   * @param outputKeyClass  The class of the output key.
083   * @param outputValueClass  The class of the output value.
084   * @param job  The current job to adjust.  Make sure the passed job is
085   * carrying all necessary HBase configuration.
086   * @throws IOException When setting up the details fails.
087   */
088  public static void initTableMapperJob(String table, Scan scan,
089      Class<? extends TableMapper> mapper,
090      Class<?> outputKeyClass,
091      Class<?> outputValueClass, Job job)
092  throws IOException {
093    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
094        job, true);
095  }
096
097
098  /**
099   * Use this before submitting a TableMap job. It will appropriately set up
100   * the job.
101   *
102   * @param table  The table name to read from.
103   * @param scan  The scan instance with the columns, time range etc.
104   * @param mapper  The mapper class to use.
105   * @param outputKeyClass  The class of the output key.
106   * @param outputValueClass  The class of the output value.
107   * @param job  The current job to adjust.  Make sure the passed job is
108   * carrying all necessary HBase configuration.
109   * @throws IOException When setting up the details fails.
110   */
111  public static void initTableMapperJob(TableName table,
112      Scan scan,
113      Class<? extends TableMapper> mapper,
114      Class<?> outputKeyClass,
115      Class<?> outputValueClass,
116      Job job) throws IOException {
117    initTableMapperJob(table.getNameAsString(),
118        scan,
119        mapper,
120        outputKeyClass,
121        outputValueClass,
122        job,
123        true);
124  }
125
126  /**
127   * Use this before submitting a TableMap job. It will appropriately set up
128   * the job.
129   *
130   * @param table Binary representation of the table name to read from.
131   * @param scan  The scan instance with the columns, time range etc.
132   * @param mapper  The mapper class to use.
133   * @param outputKeyClass  The class of the output key.
134   * @param outputValueClass  The class of the output value.
135   * @param job  The current job to adjust.  Make sure the passed job is
136   * carrying all necessary HBase configuration.
137   * @throws IOException When setting up the details fails.
138   */
139   public static void initTableMapperJob(byte[] table, Scan scan,
140      Class<? extends TableMapper> mapper,
141      Class<?> outputKeyClass,
142      Class<?> outputValueClass, Job job)
143  throws IOException {
144      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass,
145              job, true);
146  }
147
148   /**
149    * Use this before submitting a TableMap job. It will appropriately set up
150    * the job.
151    *
152    * @param table  The table name to read from.
153    * @param scan  The scan instance with the columns, time range etc.
154    * @param mapper  The mapper class to use.
155    * @param outputKeyClass  The class of the output key.
156    * @param outputValueClass  The class of the output value.
157    * @param job  The current job to adjust.  Make sure the passed job is
158    * carrying all necessary HBase configuration.
159    * @param addDependencyJars upload HBase jars and jars for any of the configured
160    *           job classes via the distributed cache (tmpjars).
161    * @throws IOException When setting up the details fails.
162    */
163   public static void initTableMapperJob(String table, Scan scan,
164       Class<? extends TableMapper> mapper,
165       Class<?> outputKeyClass,
166       Class<?> outputValueClass, Job job,
167       boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
168   throws IOException {
169     initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
170         addDependencyJars, true, inputFormatClass);
171   }
172
173
174  /**
175   * Use this before submitting a TableMap job. It will appropriately set up
176   * the job.
177   *
178   * @param table  The table name to read from.
179   * @param scan  The scan instance with the columns, time range etc.
180   * @param mapper  The mapper class to use.
181   * @param outputKeyClass  The class of the output key.
182   * @param outputValueClass  The class of the output value.
183   * @param job  The current job to adjust.  Make sure the passed job is
184   * carrying all necessary HBase configuration.
185   * @param addDependencyJars upload HBase jars and jars for any of the configured
186   *           job classes via the distributed cache (tmpjars).
187   * @param initCredentials whether to initialize hbase auth credentials for the job
188   * @param inputFormatClass the input format
189   * @throws IOException When setting up the details fails.
190   */
191  public static void initTableMapperJob(String table, Scan scan,
192      Class<? extends TableMapper> mapper,
193      Class<?> outputKeyClass,
194      Class<?> outputValueClass, Job job,
195      boolean addDependencyJars, boolean initCredentials,
196      Class<? extends InputFormat> inputFormatClass)
197  throws IOException {
198    job.setInputFormatClass(inputFormatClass);
199    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
200    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
201    job.setMapperClass(mapper);
202    if (Put.class.equals(outputValueClass)) {
203      job.setCombinerClass(PutCombiner.class);
204    }
205    Configuration conf = job.getConfiguration();
206    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
207    conf.set(TableInputFormat.INPUT_TABLE, table);
208    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
209    conf.setStrings("io.serializations", conf.get("io.serializations"),
210        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
211        CellSerialization.class.getName());
212    if (addDependencyJars) {
213      addDependencyJars(job);
214    }
215    if (initCredentials) {
216      initCredentials(job);
217    }
218  }
219
220  /**
221   * Use this before submitting a TableMap job. It will appropriately set up
222   * the job.
223   *
224   * @param table Binary representation of the table name to read from.
225   * @param scan  The scan instance with the columns, time range etc.
226   * @param mapper  The mapper class to use.
227   * @param outputKeyClass  The class of the output key.
228   * @param outputValueClass  The class of the output value.
229   * @param job  The current job to adjust.  Make sure the passed job is
230   * carrying all necessary HBase configuration.
231   * @param addDependencyJars upload HBase jars and jars for any of the configured
232   *           job classes via the distributed cache (tmpjars).
233   * @param inputFormatClass The class of the input format
234   * @throws IOException When setting up the details fails.
235   */
236  public static void initTableMapperJob(byte[] table, Scan scan,
237      Class<? extends TableMapper> mapper,
238      Class<?> outputKeyClass,
239      Class<?> outputValueClass, Job job,
240      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
241  throws IOException {
242      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
243              outputValueClass, job, addDependencyJars, inputFormatClass);
244  }
245
246  /**
247   * Use this before submitting a TableMap job. It will appropriately set up
248   * the job.
249   *
250   * @param table Binary representation of the table name to read from.
251   * @param scan  The scan instance with the columns, time range etc.
252   * @param mapper  The mapper class to use.
253   * @param outputKeyClass  The class of the output key.
254   * @param outputValueClass  The class of the output value.
255   * @param job  The current job to adjust.  Make sure the passed job is
256   * carrying all necessary HBase configuration.
257   * @param addDependencyJars upload HBase jars and jars for any of the configured
258   *           job classes via the distributed cache (tmpjars).
259   * @throws IOException When setting up the details fails.
260   */
261  public static void initTableMapperJob(byte[] table, Scan scan,
262      Class<? extends TableMapper> mapper,
263      Class<?> outputKeyClass,
264      Class<?> outputValueClass, Job job,
265      boolean addDependencyJars)
266  throws IOException {
267      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
268              outputValueClass, job, addDependencyJars, TableInputFormat.class);
269  }
270
271  /**
272   * Use this before submitting a TableMap job. It will appropriately set up
273   * the job.
274   *
275   * @param table The table name to read from.
276   * @param scan  The scan instance with the columns, time range etc.
277   * @param mapper  The mapper class to use.
278   * @param outputKeyClass  The class of the output key.
279   * @param outputValueClass  The class of the output value.
280   * @param job  The current job to adjust.  Make sure the passed job is
281   * carrying all necessary HBase configuration.
282   * @param addDependencyJars upload HBase jars and jars for any of the configured
283   *           job classes via the distributed cache (tmpjars).
284   * @throws IOException When setting up the details fails.
285   */
286  public static void initTableMapperJob(String table, Scan scan,
287      Class<? extends TableMapper> mapper,
288      Class<?> outputKeyClass,
289      Class<?> outputValueClass, Job job,
290      boolean addDependencyJars)
291  throws IOException {
292      initTableMapperJob(table, scan, mapper, outputKeyClass,
293              outputValueClass, job, addDependencyJars, TableInputFormat.class);
294  }
295
296  /**
297   * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
298   * direct memory will likely cause the map tasks to OOM when opening the region. This
299   * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
300   * wants to override this behavior in their job.
301   */
302  public static void resetCacheConfig(Configuration conf) {
303    conf.setFloat(
304      HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
305    conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f);
306    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
307  }
308
309  /**
310   * Sets up the job for reading from one or more table snapshots, with one or more scans
311   * per snapshot.
312   * It bypasses hbase servers and read directly from snapshot files.
313   *
314   * @param snapshotScans     map of snapshot name to scans on that snapshot.
315   * @param mapper            The mapper class to use.
316   * @param outputKeyClass    The class of the output key.
317   * @param outputValueClass  The class of the output value.
318   * @param job               The current job to adjust.  Make sure the passed job is
319   *                          carrying all necessary HBase configuration.
320   * @param addDependencyJars upload HBase jars and jars for any of the configured
321   *                          job classes via the distributed cache (tmpjars).
322   */
323  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
324      Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
325      Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
326    MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);
327
328    job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
329    if (outputValueClass != null) {
330      job.setMapOutputValueClass(outputValueClass);
331    }
332    if (outputKeyClass != null) {
333      job.setMapOutputKeyClass(outputKeyClass);
334    }
335    job.setMapperClass(mapper);
336    Configuration conf = job.getConfiguration();
337    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
338
339    if (addDependencyJars) {
340      addDependencyJars(job);
341      addDependencyJarsForClasses(job.getConfiguration(), MetricRegistry.class);
342    }
343
344    resetCacheConfig(job.getConfiguration());
345  }
346
347  /**
348   * Sets up the job for reading from a table snapshot. It bypasses hbase servers and read directly
349   * from snapshot files.
350   * @param snapshotName The name of the snapshot (of a table) to read from.
351   * @param scan The scan instance with the columns, time range etc.
352   * @param mapper The mapper class to use.
353   * @param outputKeyClass The class of the output key.
354   * @param outputValueClass The class of the output value.
355   * @param job The current job to adjust. Make sure the passed job is carrying all necessary HBase
356   *          configuration.
357   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
358   *          the distributed cache (tmpjars).
359   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
360   *          have write permissions to this directory, and this should not be a subdirectory of
361   *          rootdir. After the job is finished, restore directory can be deleted.
362   * @throws IOException When setting up the details fails.
363   * @see TableSnapshotInputFormat
364   */
365  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
366      Class<? extends TableMapper> mapper,
367      Class<?> outputKeyClass,
368      Class<?> outputValueClass, Job job,
369      boolean addDependencyJars, Path tmpRestoreDir)
370      throws IOException {
371    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
372    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job,
373      addDependencyJars, false, TableSnapshotInputFormat.class);
374    resetCacheConfig(job.getConfiguration());
375  }
376
377  /**
378   * Sets up the job for reading from a table snapshot. It bypasses hbase servers
379   * and read directly from snapshot files.
380   *
381   * @param snapshotName The name of the snapshot (of a table) to read from.
382   * @param scan  The scan instance with the columns, time range etc.
383   * @param mapper  The mapper class to use.
384   * @param outputKeyClass  The class of the output key.
385   * @param outputValueClass  The class of the output value.
386   * @param job  The current job to adjust.  Make sure the passed job is
387   * carrying all necessary HBase configuration.
388   * @param addDependencyJars upload HBase jars and jars for any of the configured
389   *           job classes via the distributed cache (tmpjars).
390   *
391   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
392   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
393   * After the job is finished, restore directory can be deleted.
394   * @param splitAlgo algorithm to split
395   * @param numSplitsPerRegion how many input splits to generate per one region
396   * @throws IOException When setting up the details fails.
397   * @see TableSnapshotInputFormat
398   */
399  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
400                                                Class<? extends TableMapper> mapper,
401                                                Class<?> outputKeyClass,
402                                                Class<?> outputValueClass, Job job,
403                                                boolean addDependencyJars, Path tmpRestoreDir,
404                                                RegionSplitter.SplitAlgorithm splitAlgo,
405                                                int numSplitsPerRegion)
406          throws IOException {
407    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir, splitAlgo,
408            numSplitsPerRegion);
409    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
410            outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
411    resetCacheConfig(job.getConfiguration());
412  }
413
414  /**
415   * Use this before submitting a Multi TableMap job. It will appropriately set
416   * up the job.
417   *
418   * @param scans The list of {@link Scan} objects to read from.
419   * @param mapper The mapper class to use.
420   * @param outputKeyClass The class of the output key.
421   * @param outputValueClass The class of the output value.
422   * @param job The current job to adjust. Make sure the passed job is carrying
423   *          all necessary HBase configuration.
424   * @throws IOException When setting up the details fails.
425   */
426  public static void initTableMapperJob(List<Scan> scans,
427      Class<? extends TableMapper> mapper,
428      Class<?> outputKeyClass,
429      Class<?> outputValueClass, Job job) throws IOException {
430    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
431        true);
432  }
433
434  /**
435   * Use this before submitting a Multi TableMap job. It will appropriately set
436   * up the job.
437   *
438   * @param scans The list of {@link Scan} objects to read from.
439   * @param mapper The mapper class to use.
440   * @param outputKeyClass The class of the output key.
441   * @param outputValueClass The class of the output value.
442   * @param job The current job to adjust. Make sure the passed job is carrying
443   *          all necessary HBase configuration.
444   * @param addDependencyJars upload HBase jars and jars for any of the
445   *          configured job classes via the distributed cache (tmpjars).
446   * @throws IOException When setting up the details fails.
447   */
448  public static void initTableMapperJob(List<Scan> scans,
449      Class<? extends TableMapper> mapper,
450      Class<?> outputKeyClass,
451      Class<?> outputValueClass, Job job,
452      boolean addDependencyJars) throws IOException {
453    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
454      addDependencyJars, true);
455  }
456
457  /**
458   * Use this before submitting a Multi TableMap job. It will appropriately set
459   * up the job.
460   *
461   * @param scans The list of {@link Scan} objects to read from.
462   * @param mapper The mapper class to use.
463   * @param outputKeyClass The class of the output key.
464   * @param outputValueClass The class of the output value.
465   * @param job The current job to adjust. Make sure the passed job is carrying
466   *          all necessary HBase configuration.
467   * @param addDependencyJars upload HBase jars and jars for any of the
468   *          configured job classes via the distributed cache (tmpjars).
469   * @param initCredentials whether to initialize hbase auth credentials for the job
470   * @throws IOException When setting up the details fails.
471   */
472  public static void initTableMapperJob(List<Scan> scans,
473      Class<? extends TableMapper> mapper,
474      Class<?> outputKeyClass,
475      Class<?> outputValueClass, Job job,
476      boolean addDependencyJars,
477      boolean initCredentials) throws IOException {
478    job.setInputFormatClass(MultiTableInputFormat.class);
479    if (outputValueClass != null) {
480      job.setMapOutputValueClass(outputValueClass);
481    }
482    if (outputKeyClass != null) {
483      job.setMapOutputKeyClass(outputKeyClass);
484    }
485    job.setMapperClass(mapper);
486    Configuration conf = job.getConfiguration();
487    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
488    List<String> scanStrings = new ArrayList<>();
489
490    for (Scan scan : scans) {
491      scanStrings.add(convertScanToString(scan));
492    }
493    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
494      scanStrings.toArray(new String[scanStrings.size()]));
495
496    if (addDependencyJars) {
497      addDependencyJars(job);
498    }
499
500    if (initCredentials) {
501      initCredentials(job);
502    }
503  }
504
505  public static void initCredentials(Job job) throws IOException {
506    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
507    if (userProvider.isHadoopSecurityEnabled()) {
508      // propagate delegation related props from launcher job to MR job
509      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
510        job.getConfiguration().set("mapreduce.job.credentials.binary",
511                                   System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
512      }
513    }
514
515    if (userProvider.isHBaseSecurityEnabled()) {
516      try {
517        // init credentials for remote cluster
518        String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
519        User user = userProvider.getCurrent();
520        if (quorumAddress != null) {
521          Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
522              quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
523          Connection peerConn = ConnectionFactory.createConnection(peerConf);
524          try {
525            TokenUtil.addTokenForJob(peerConn, user, job);
526          } finally {
527            peerConn.close();
528          }
529        }
530
531        Connection conn = ConnectionFactory.createConnection(job.getConfiguration());
532        try {
533          TokenUtil.addTokenForJob(conn, user, job);
534        } finally {
535          conn.close();
536        }
537      } catch (InterruptedException ie) {
538        LOG.info("Interrupted obtaining user authentication token");
539        Thread.currentThread().interrupt();
540      }
541    }
542  }
543
544  /**
545   * Obtain an authentication token, for the specified cluster, on behalf of the current user
546   * and add it to the credentials for the given map reduce job.
547   *
548   * The quorumAddress is the key to the ZK ensemble, which contains:
549   * hbase.zookeeper.quorum, hbase.zookeeper.client.port and
550   * zookeeper.znode.parent
551   *
552   * @param job The job that requires the permission.
553   * @param quorumAddress string that contains the 3 required configuratins
554   * @throws IOException When the authentication token cannot be obtained.
555   * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead.
556   */
557  @Deprecated
558  public static void initCredentialsForCluster(Job job, String quorumAddress)
559      throws IOException {
560    Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
561        quorumAddress);
562    initCredentialsForCluster(job, peerConf);
563  }
564
565  /**
566   * Obtain an authentication token, for the specified cluster, on behalf of the current user
567   * and add it to the credentials for the given map reduce job.
568   *
569   * @param job The job that requires the permission.
570   * @param conf The configuration to use in connecting to the peer cluster
571   * @throws IOException When the authentication token cannot be obtained.
572   */
573  public static void initCredentialsForCluster(Job job, Configuration conf)
574      throws IOException {
575    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
576    if (userProvider.isHBaseSecurityEnabled()) {
577      try {
578        Connection peerConn = ConnectionFactory.createConnection(conf);
579        try {
580          TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
581        } finally {
582          peerConn.close();
583        }
584      } catch (InterruptedException e) {
585        LOG.info("Interrupted obtaining user authentication token");
586        Thread.interrupted();
587      }
588    }
589  }
590
591  /**
592   * Writes the given scan into a Base64 encoded string.
593   *
594   * @param scan  The scan to write out.
595   * @return The scan saved in a Base64 encoded string.
596   * @throws IOException When writing the scan fails.
597   */
598  public static String convertScanToString(Scan scan) throws IOException {
599    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
600    return Bytes.toString(Base64.getEncoder().encode(proto.toByteArray()));
601  }
602
603  /**
604   * Converts the given Base64 string back into a Scan instance.
605   *
606   * @param base64  The scan details.
607   * @return The newly created Scan instance.
608   * @throws IOException When reading the scan instance fails.
609   */
610  public static Scan convertStringToScan(String base64) throws IOException {
611    byte [] decoded = Base64.getDecoder().decode(base64);
612    return ProtobufUtil.toScan(ClientProtos.Scan.parseFrom(decoded));
613  }
614
615  /**
616   * Use this before submitting a TableReduce job. It will
617   * appropriately set up the JobConf.
618   *
619   * @param table  The output table.
620   * @param reducer  The reducer class to use.
621   * @param job  The current job to adjust.
622   * @throws IOException When determining the region count fails.
623   */
624  public static void initTableReducerJob(String table,
625    Class<? extends TableReducer> reducer, Job job)
626  throws IOException {
627    initTableReducerJob(table, reducer, job, null);
628  }
629
630  /**
631   * Use this before submitting a TableReduce job. It will
632   * appropriately set up the JobConf.
633   *
634   * @param table  The output table.
635   * @param reducer  The reducer class to use.
636   * @param job  The current job to adjust.
637   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
638   * default partitioner.
639   * @throws IOException When determining the region count fails.
640   */
641  public static void initTableReducerJob(String table,
642    Class<? extends TableReducer> reducer, Job job,
643    Class partitioner) throws IOException {
644    initTableReducerJob(table, reducer, job, partitioner, null, null, null);
645  }
646
647  /**
648   * Use this before submitting a TableReduce job. It will
649   * appropriately set up the JobConf.
650   *
651   * @param table  The output table.
652   * @param reducer  The reducer class to use.
653   * @param job  The current job to adjust.  Make sure the passed job is
654   * carrying all necessary HBase configuration.
655   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
656   * default partitioner.
657   * @param quorumAddress Distant cluster to write to; default is null for
658   * output to the cluster that is designated in <code>hbase-site.xml</code>.
659   * Set this String to the zookeeper ensemble of an alternate remote cluster
660   * when you would have the reduce write a cluster that is other than the
661   * default; e.g. copying tables between clusters, the source would be
662   * designated by <code>hbase-site.xml</code> and this param would have the
663   * ensemble address of the remote cluster.  The format to pass is particular.
664   * Pass <code> &lt;hbase.zookeeper.quorum&gt;:&lt;
665   *             hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;
666   * </code> such as <code>server,server2,server3:2181:/hbase</code>.
667   * @param serverClass redefined hbase.regionserver.class
668   * @param serverImpl redefined hbase.regionserver.impl
669   * @throws IOException When determining the region count fails.
670   */
671  public static void initTableReducerJob(String table,
672    Class<? extends TableReducer> reducer, Job job,
673    Class partitioner, String quorumAddress, String serverClass,
674    String serverImpl) throws IOException {
675    initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
676        serverClass, serverImpl, true);
677  }
678
679  /**
680   * Use this before submitting a TableReduce job. It will
681   * appropriately set up the JobConf.
682   *
683   * @param table  The output table.
684   * @param reducer  The reducer class to use.
685   * @param job  The current job to adjust.  Make sure the passed job is
686   * carrying all necessary HBase configuration.
687   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
688   * default partitioner.
689   * @param quorumAddress Distant cluster to write to; default is null for
690   * output to the cluster that is designated in <code>hbase-site.xml</code>.
691   * Set this String to the zookeeper ensemble of an alternate remote cluster
692   * when you would have the reduce write a cluster that is other than the
693   * default; e.g. copying tables between clusters, the source would be
694   * designated by <code>hbase-site.xml</code> and this param would have the
695   * ensemble address of the remote cluster.  The format to pass is particular.
696   * Pass <code> &lt;hbase.zookeeper.quorum&gt;:&lt;
697   *             hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;
698   * </code> such as <code>server,server2,server3:2181:/hbase</code>.
699   * @param serverClass redefined hbase.regionserver.class
700   * @param serverImpl redefined hbase.regionserver.impl
701   * @param addDependencyJars upload HBase jars and jars for any of the configured
702   *           job classes via the distributed cache (tmpjars).
703   * @throws IOException When determining the region count fails.
704   */
705  public static void initTableReducerJob(String table,
706    Class<? extends TableReducer> reducer, Job job,
707    Class partitioner, String quorumAddress, String serverClass,
708    String serverImpl, boolean addDependencyJars) throws IOException {
709
710    Configuration conf = job.getConfiguration();
711    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
712    job.setOutputFormatClass(TableOutputFormat.class);
713    if (reducer != null) job.setReducerClass(reducer);
714    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
715    conf.setStrings("io.serializations", conf.get("io.serializations"),
716        MutationSerialization.class.getName(), ResultSerialization.class.getName());
717    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
718    if (quorumAddress != null) {
719      // Calling this will validate the format
720      ZKConfig.validateClusterKey(quorumAddress);
721      conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
722    }
723    if (serverClass != null && serverImpl != null) {
724      conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
725      conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
726    }
727    job.setOutputKeyClass(ImmutableBytesWritable.class);
728    job.setOutputValueClass(Writable.class);
729    if (partitioner == HRegionPartitioner.class) {
730      job.setPartitionerClass(HRegionPartitioner.class);
731      int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table));
732      if (job.getNumReduceTasks() > regions) {
733        job.setNumReduceTasks(regions);
734      }
735    } else if (partitioner != null) {
736      job.setPartitionerClass(partitioner);
737    }
738
739    if (addDependencyJars) {
740      addDependencyJars(job);
741    }
742
743    initCredentials(job);
744  }
745
746  /**
747   * Ensures that the given number of reduce tasks for the given job
748   * configuration does not exceed the number of regions for the given table.
749   *
750   * @param table  The table to get the region count for.
751   * @param job  The current job to adjust.
752   * @throws IOException When retrieving the table details fails.
753   */
754  public static void limitNumReduceTasks(String table, Job job)
755  throws IOException {
756    int regions =
757      MetaTableAccessor.getRegionCount(job.getConfiguration(), TableName.valueOf(table));
758    if (job.getNumReduceTasks() > regions)
759      job.setNumReduceTasks(regions);
760  }
761
762  /**
763   * Sets the number of reduce tasks for the given job configuration to the
764   * number of regions the given table has.
765   *
766   * @param table  The table to get the region count for.
767   * @param job  The current job to adjust.
768   * @throws IOException When retrieving the table details fails.
769   */
770  public static void setNumReduceTasks(String table, Job job)
771  throws IOException {
772    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(job.getConfiguration(),
773       TableName.valueOf(table)));
774  }
775
776  /**
777   * Sets the number of rows to return and cache with each scanner iteration.
778   * Higher caching values will enable faster mapreduce jobs at the expense of
779   * requiring more heap to contain the cached rows.
780   *
781   * @param job The current job to adjust.
782   * @param batchSize The number of rows to return in batch with each scanner
783   * iteration.
784   */
785  public static void setScannerCaching(Job job, int batchSize) {
786    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
787  }
788
789  /**
790   * Add HBase and its dependencies (only) to the job configuration.
791   * <p>
792   * This is intended as a low-level API, facilitating code reuse between this
793   * class and its mapred counterpart. It also of use to external tools that
794   * need to build a MapReduce job that interacts with HBase but want
795   * fine-grained control over the jars shipped to the cluster.
796   * </p>
797   * @param conf The Configuration object to extend with dependencies.
798   * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
799   * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
800   */
801  public static void addHBaseDependencyJars(Configuration conf) throws IOException {
802    addDependencyJarsForClasses(conf,
803      // explicitly pull a class from each module
804      org.apache.hadoop.hbase.HConstants.class,                      // hbase-common
805      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol
806      org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.class, // hbase-protocol-shaded
807      org.apache.hadoop.hbase.client.Put.class,                      // hbase-client
808      org.apache.hadoop.hbase.ipc.RpcServer.class,                   // hbase-server
809      org.apache.hadoop.hbase.CompatibilityFactory.class,            // hbase-hadoop-compat
810      org.apache.hadoop.hbase.mapreduce.JobUtil.class,               // hbase-hadoop2-compat
811      org.apache.hadoop.hbase.mapreduce.TableMapper.class,           // hbase-server
812      org.apache.hadoop.hbase.metrics.impl.FastLongHistogram.class,  // hbase-metrics
813      org.apache.hadoop.hbase.metrics.Snapshot.class,                // hbase-metrics-api
814      org.apache.zookeeper.ZooKeeper.class,
815      org.apache.hbase.thirdparty.io.netty.channel.Channel.class,
816      com.google.protobuf.Message.class,
817      org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations.class,
818      org.apache.hbase.thirdparty.com.google.common.collect.Lists.class,
819      org.apache.htrace.core.Tracer.class,
820      com.codahale.metrics.MetricRegistry.class,
821      org.apache.commons.lang3.ArrayUtils.class,
822      com.fasterxml.jackson.databind.ObjectMapper.class,
823      com.fasterxml.jackson.core.Versioned.class,
824      com.fasterxml.jackson.annotation.JsonView.class,
825      org.apache.hadoop.hbase.zookeeper.ZKWatcher.class);
826  }
827
828  /**
829   * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}.
830   * Also exposed to shell scripts via `bin/hbase mapredcp`.
831   */
832  public static String buildDependencyClasspath(Configuration conf) {
833    if (conf == null) {
834      throw new IllegalArgumentException("Must provide a configuration object.");
835    }
836    Set<String> paths = new HashSet<>(conf.getStringCollection("tmpjars"));
837    if (paths.isEmpty()) {
838      throw new IllegalArgumentException("Configuration contains no tmpjars.");
839    }
840    StringBuilder sb = new StringBuilder();
841    for (String s : paths) {
842      // entries can take the form 'file:/path/to/file.jar'.
843      int idx = s.indexOf(":");
844      if (idx != -1) s = s.substring(idx + 1);
845      if (sb.length() > 0) sb.append(File.pathSeparator);
846      sb.append(s);
847    }
848    return sb.toString();
849  }
850
851  /**
852   * Add the HBase dependency jars as well as jars for any of the configured
853   * job classes to the job configuration, so that JobClient will ship them
854   * to the cluster and add them to the DistributedCache.
855   */
856  public static void addDependencyJars(Job job) throws IOException {
857    addHBaseDependencyJars(job.getConfiguration());
858    try {
859      addDependencyJarsForClasses(job.getConfiguration(),
860          // when making changes here, consider also mapred.TableMapReduceUtil
861          // pull job classes
862          job.getMapOutputKeyClass(),
863          job.getMapOutputValueClass(),
864          job.getInputFormatClass(),
865          job.getOutputKeyClass(),
866          job.getOutputValueClass(),
867          job.getOutputFormatClass(),
868          job.getPartitionerClass(),
869          job.getCombinerClass());
870    } catch (ClassNotFoundException e) {
871      throw new IOException(e);
872    }
873  }
874
875  /**
876   * Add the jars containing the given classes to the job's configuration
877   * such that JobClient will ship them to the cluster and add them to
878   * the DistributedCache.
879   * @deprecated rely on {@link #addDependencyJars(Job)} instead.
880   */
881  @Deprecated
882  public static void addDependencyJars(Configuration conf,
883      Class<?>... classes) throws IOException {
884    LOG.warn("The addDependencyJars(Configuration, Class<?>...) method has been deprecated since it"
885             + " is easy to use incorrectly. Most users should rely on addDependencyJars(Job) " +
886             "instead. See HBASE-8386 for more details.");
887    addDependencyJarsForClasses(conf, classes);
888  }
889
890  /**
891   * Add the jars containing the given classes to the job's configuration
892   * such that JobClient will ship them to the cluster and add them to
893   * the DistributedCache.
894   *
895   * N.B. that this method at most adds one jar per class given. If there is more than one
896   * jar available containing a class with the same name as a given class, we don't define
897   * which of those jars might be chosen.
898   *
899   * @param conf The Hadoop Configuration to modify
900   * @param classes will add just those dependencies needed to find the given classes
901   * @throws IOException if an underlying library call fails.
902   */
903  @InterfaceAudience.Private
904  public static void addDependencyJarsForClasses(Configuration conf,
905      Class<?>... classes) throws IOException {
906
907    FileSystem localFs = FileSystem.getLocal(conf);
908    Set<String> jars = new HashSet<>();
909    // Add jars that are already in the tmpjars variable
910    jars.addAll(conf.getStringCollection("tmpjars"));
911
912    // add jars as we find them to a map of contents jar name so that we can avoid
913    // creating new jars for classes that have already been packaged.
914    Map<String, String> packagedClasses = new HashMap<>();
915
916    // Add jars containing the specified classes
917    for (Class<?> clazz : classes) {
918      if (clazz == null) continue;
919
920      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
921      if (path == null) {
922        LOG.warn("Could not find jar for class " + clazz +
923                 " in order to ship it to the cluster.");
924        continue;
925      }
926      if (!localFs.exists(path)) {
927        LOG.warn("Could not validate jar file " + path + " for class "
928                 + clazz);
929        continue;
930      }
931      jars.add(path.toString());
932    }
933    if (jars.isEmpty()) return;
934
935    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
936  }
937
938  /**
939   * Finds the Jar for a class or creates it if it doesn't exist. If the class is in
940   * a directory in the classpath, it creates a Jar on the fly with the
941   * contents of the directory and returns the path to that Jar. If a Jar is
942   * created, it is created in the system temporary directory. Otherwise,
943   * returns an existing jar that contains a class of the same name. Maintains
944   * a mapping from jar contents to the tmp jar created.
945   * @param my_class the class to find.
946   * @param fs the FileSystem with which to qualify the returned path.
947   * @param packagedClasses a map of class name to path.
948   * @return a jar file that contains the class.
949   * @throws IOException
950   */
951  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
952      Map<String, String> packagedClasses)
953  throws IOException {
954    // attempt to locate an existing jar for the class.
955    String jar = findContainingJar(my_class, packagedClasses);
956    if (null == jar || jar.isEmpty()) {
957      jar = getJar(my_class);
958      updateMap(jar, packagedClasses);
959    }
960
961    if (null == jar || jar.isEmpty()) {
962      return null;
963    }
964
965    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
966    return new Path(jar).makeQualified(fs.getUri(), fs.getWorkingDirectory());
967  }
968
969  /**
970   * Add entries to <code>packagedClasses</code> corresponding to class files
971   * contained in <code>jar</code>.
972   * @param jar The jar who's content to list.
973   * @param packagedClasses map[class -> jar]
974   */
975  private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException {
976    if (null == jar || jar.isEmpty()) {
977      return;
978    }
979    ZipFile zip = null;
980    try {
981      zip = new ZipFile(jar);
982      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
983        ZipEntry entry = iter.nextElement();
984        if (entry.getName().endsWith("class")) {
985          packagedClasses.put(entry.getName(), jar);
986        }
987      }
988    } finally {
989      if (null != zip) zip.close();
990    }
991  }
992
993  /**
994   * Find a jar that contains a class of the same name, if any. It will return
995   * a jar file, even if that is not the first thing on the class path that
996   * has a class with the same name. Looks first on the classpath and then in
997   * the <code>packagedClasses</code> map.
998   * @param my_class the class to find.
999   * @return a jar file that contains the class, or null.
1000   * @throws IOException
1001   */
1002  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
1003      throws IOException {
1004    ClassLoader loader = my_class.getClassLoader();
1005
1006    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
1007
1008    if (loader != null) {
1009      // first search the classpath
1010      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
1011        URL url = itr.nextElement();
1012        if ("jar".equals(url.getProtocol())) {
1013          String toReturn = url.getPath();
1014          if (toReturn.startsWith("file:")) {
1015            toReturn = toReturn.substring("file:".length());
1016          }
1017          // URLDecoder is a misnamed class, since it actually decodes
1018          // x-www-form-urlencoded MIME type rather than actual
1019          // URL encoding (which the file path has). Therefore it would
1020          // decode +s to ' 's which is incorrect (spaces are actually
1021          // either unencoded or encoded as "%20"). Replace +s first, so
1022          // that they are kept sacred during the decoding process.
1023          toReturn = toReturn.replaceAll("\\+", "%2B");
1024          toReturn = URLDecoder.decode(toReturn, "UTF-8");
1025          return toReturn.replaceAll("!.*$", "");
1026        }
1027      }
1028    }
1029
1030    // now look in any jars we've packaged using JarFinder. Returns null when
1031    // no jar is found.
1032    return packagedClasses.get(class_file);
1033  }
1034
1035  /**
1036   * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job
1037   * configuration contexts (HBASE-8140) and also for testing on MRv2.
1038   * check if we have HADOOP-9426.
1039   * @param my_class the class to find.
1040   * @return a jar file that contains the class, or null.
1041   */
1042  private static String getJar(Class<?> my_class) {
1043    String ret = null;
1044    try {
1045      ret = JarFinder.getJar(my_class);
1046    } catch (Exception e) {
1047      // toss all other exceptions, related to reflection failure
1048      throw new RuntimeException("getJar invocation failed.", e);
1049    }
1050
1051    return ret;
1052  }
1053}