/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;

import com.codahale.metrics.MetricRegistry;

/**
 * Utility for {@link TableMapper} and {@link TableReducer}
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@InterfaceAudience.Public
public class TableMapReduceUtil {
  private static final Logger LOG = LoggerFactory.getLogger(TableMapReduceUtil.class);

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }
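
  /*
   * Usage sketch for the mapper-side setup: a minimal read job wired through the
   * overloads above. The table name "mytable", the column family "cf" and MyMapper
   * (a TableMapper<ImmutableBytesWritable, Result>) are placeholders, not classes or
   * tables provided by HBase.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   Job job = Job.getInstance(conf, "scan-mytable");
   *   job.setJarByClass(MyMapper.class);
   *   Scan scan = new Scan();
   *   scan.addFamily(Bytes.toBytes("cf"));
   *   scan.setCaching(500);        // larger caching is typical for MR scans
   *   scan.setCacheBlocks(false);  // avoid polluting the region server block cache
   *   TableMapReduceUtil.initTableMapperJob("mytable", scan, MyMapper.class,
   *       ImmutableBytesWritable.class, Result.class, job);
   *   job.setNumReduceTasks(0);    // map-only job
   *   job.waitForCompletion(true);
   */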

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(TableName table,
      Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass,
      Job job) throws IOException {
    initTableMapperJob(table.getNameAsString(),
        scan,
        mapper,
        outputKeyClass,
        outputValueClass,
        job,
        true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table Binary representation of the table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
  throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
        addDependencyJars, true, inputFormatClass);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @param initCredentials whether to initialize hbase auth credentials for the job
   * @param inputFormatClass the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, boolean initCredentials,
      Class<? extends InputFormat> inputFormatClass)
  throws IOException {
    job.setInputFormatClass(inputFormatClass);
    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    if (Put.class.equals(outputValueClass)) {
      job.setCombinerClass(PutCombiner.class);
    }
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    conf.set(TableInputFormat.INPUT_TABLE, table);
    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        CellSerialization.class.getName());
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    if (initCredentials) {
      initCredentials(job);
    }
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table Binary representation of the table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @param inputFormatClass The class of the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
  throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, inputFormatClass);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table Binary representation of the table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars)
  throws IOException {
    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }

  /**
   * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
   * direct memory will likely cause the map tasks to OOM when opening the region. This
   * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
   * wants to override this behavior in their job.
   */
  public static void resetCacheConfig(Configuration conf) {
    conf.setFloat(
        HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
    conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f);
    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
  }

  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans
   * per snapshot. It bypasses hbase servers and reads directly from snapshot files.
   *
   * @param snapshotScans map of snapshot name to scans on that snapshot.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user
   *          should have write permissions to this directory, and this should not be a
   *          subdirectory of rootdir. After the job is finished, restore directory can be deleted.
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
      Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
      Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);

    job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));

    if (addDependencyJars) {
      addDependencyJars(job);
      addDependencyJarsForClasses(job.getConfiguration(), MetricRegistry.class);
    }

    resetCacheConfig(job.getConfiguration());
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase servers and reads
   * directly from snapshot files.
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying all necessary HBase
   *          configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *          the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
   *          have write permissions to this directory, and this should not be a subdirectory of
   *          rootdir. After the job is finished, restore directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Path tmpRestoreDir)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job,
        addDependencyJars, false, TableSnapshotInputFormat.class);
    resetCacheConfig(job.getConfiguration());
  }
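
  /*
   * Usage sketch for the snapshot-based setup above. The snapshot name "mysnapshot",
   * the restore directory and MyMapper are placeholders; the restore directory must be
   * writable by the current user and must not live under the HBase root directory.
   *
   *   Job job = Job.getInstance(HBaseConfiguration.create(), "scan-snapshot");
   *   Scan scan = new Scan();
   *   TableMapReduceUtil.initTableSnapshotMapperJob("mysnapshot", scan, MyMapper.class,
   *       ImmutableBytesWritable.class, Result.class, job, true,
   *       new Path("/tmp/snapshot-restore"));
   */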

  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase servers
   * and reads directly from snapshot files.
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
   *          have write permissions to this directory, and this should not be a subdirectory of
   *          rootdir. After the job is finished, restore directory can be deleted.
   * @param splitAlgo algorithm to split
   * @param numSplitsPerRegion how many input splits to generate per region
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Path tmpRestoreDir,
      RegionSplitter.SplitAlgorithm splitAlgo,
      int numSplitsPerRegion)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir, splitAlgo,
        numSplitsPerRegion);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
    resetCacheConfig(job.getConfiguration());
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying
   *          all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
        true);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying
   *          all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the
   *          configured job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
        addDependencyJars, true);
  }
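
  /*
   * Usage sketch for the multi-scan overloads above: each Scan is expected to carry the
   * table it targets in its SCAN_ATTRIBUTES_TABLE_NAME attribute so MultiTableInputFormat
   * knows where to read. The table names and MyMapper are placeholders.
   *
   *   List<Scan> scans = new ArrayList<>();
   *   for (String tableName : new String[] { "table1", "table2" }) {
   *     Scan scan = new Scan();
   *     scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
   *     scans.add(scan);
   *   }
   *   TableMapReduceUtil.initTableMapperJob(scans, MyMapper.class,
   *       ImmutableBytesWritable.class, Result.class, job);
   */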

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying
   *          all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the
   *          configured job classes via the distributed cache (tmpjars).
   * @param initCredentials whether to initialize hbase auth credentials for the job
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars,
      boolean initCredentials) throws IOException {
    job.setInputFormatClass(MultiTableInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    List<String> scanStrings = new ArrayList<>();

    for (Scan scan : scans) {
      scanStrings.add(convertScanToString(scan));
    }
    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
        scanStrings.toArray(new String[scanStrings.size()]));

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    if (initCredentials) {
      initCredentials(job);
    }
  }

  public static void initCredentials(Job job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.getConfiguration().set("mapreduce.job.credentials.binary",
            System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        // init credentials for remote cluster
        String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
        User user = userProvider.getCurrent();
        if (quorumAddress != null) {
          Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
              quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
          Connection peerConn = ConnectionFactory.createConnection(peerConf);
          try {
            TokenUtil.addTokenForJob(peerConn, user, job);
          } finally {
            peerConn.close();
          }
        }

        Connection conn = ConnectionFactory.createConnection(job.getConfiguration());
        try {
          TokenUtil.addTokenForJob(conn, user, job);
        } finally {
          conn.close();
        }
      } catch (InterruptedException ie) {
        LOG.info("Interrupted obtaining user authentication token");
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user
   * and add it to the credentials for the given map reduce job.
   *
   * The quorumAddress is the key to the ZK ensemble, which contains:
   * hbase.zookeeper.quorum, hbase.zookeeper.client.port and
   * zookeeper.znode.parent
   *
   * @param job The job that requires the permission.
   * @param quorumAddress string that contains the 3 required configurations
   * @throws IOException When the authentication token cannot be obtained.
   * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead.
   */
  @Deprecated
  public static void initCredentialsForCluster(Job job, String quorumAddress)
      throws IOException {
    Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
        quorumAddress);
    initCredentialsForCluster(job, peerConf);
  }

  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user
   * and add it to the credentials for the given map reduce job.
   *
   * @param job The job that requires the permission.
   * @param conf The configuration to use in connecting to the peer cluster
   * @throws IOException When the authentication token cannot be obtained.
   */
  public static void initCredentialsForCluster(Job job, Configuration conf)
      throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        Connection peerConn = ConnectionFactory.createConnection(conf);
        try {
          TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
        } finally {
          peerConn.close();
        }
      } catch (InterruptedException e) {
        LOG.info("Interrupted obtaining user authentication token");
        // restore the interrupt status rather than clearing it
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Writes the given scan into a Base64 encoded string.
   *
   * @param scan The scan to write out.
   * @return The scan saved in a Base64 encoded string.
   * @throws IOException When writing the scan fails.
   */
  public static String convertScanToString(Scan scan) throws IOException {
    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    return Bytes.toString(Base64.getEncoder().encode(proto.toByteArray()));
  }

  /**
   * Converts the given Base64 string back into a Scan instance.
   *
   * @param base64 The scan details.
   * @return The newly created Scan instance.
   * @throws IOException When reading the scan instance fails.
   */
  public static Scan convertStringToScan(String base64) throws IOException {
    byte [] decoded = Base64.getDecoder().decode(base64);
    return ProtobufUtil.toScan(ClientProtos.Scan.parseFrom(decoded));
  }
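
  /*
   * The two helpers above are how a Scan travels through the job configuration (for
   * example, TableInputFormat.SCAN holds the Base64 form). A round trip looks like this;
   * the family name "cf" is a placeholder:
   *
   *   Scan scan = new Scan();
   *   scan.addFamily(Bytes.toBytes("cf"));
   *   String serialized = TableMapReduceUtil.convertScanToString(scan);
   *   Scan restored = TableMapReduceUtil.convertStringToScan(serialized);
   */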

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table The output table.
   * @param reducer The reducer class to use.
   * @param job The current job to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job)
  throws IOException {
    initTableReducerJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table The output table.
   * @param reducer The reducer class to use.
   * @param job The current job to adjust.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, null, null, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table The output table.
   * @param reducer The reducer class to use.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for
   * output to the cluster that is designated in <code>hbase-site.xml</code>.
   * Set this String to the zookeeper ensemble of an alternate remote cluster
   * when you would have the reduce write a cluster that is other than the
   * default; e.g. copying tables between clusters, the source would be
   * designated by <code>hbase-site.xml</code> and this param would have the
   * ensemble address of the remote cluster. The format to pass is particular.
   * Pass <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;</code>
   * such as <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass redefined hbase.regionserver.class
   * @param serverImpl redefined hbase.regionserver.impl
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner, String quorumAddress, String serverClass,
      String serverImpl) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
        serverClass, serverImpl, true);
  }
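
  /*
   * Usage sketch for the reducer-side setup above, writing Put mutations back to a table.
   * "target_table", the example ZooKeeper ensemble and MyReducer (a TableReducer) are
   * placeholders.
   *
   *   TableMapReduceUtil.initTableReducerJob("target_table", MyReducer.class, job);
   *
   *   // or, to write to a remote cluster instead of the one in hbase-site.xml:
   *   TableMapReduceUtil.initTableReducerJob("target_table", MyReducer.class, job,
   *       null, "zk1,zk2,zk3:2181:/hbase", null, null);
   */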

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table The output table.
   * @param reducer The reducer class to use.
   * @param job The current job to adjust. Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for
   * output to the cluster that is designated in <code>hbase-site.xml</code>.
   * Set this String to the zookeeper ensemble of an alternate remote cluster
   * when you would have the reduce write a cluster that is other than the
   * default; e.g. copying tables between clusters, the source would be
   * designated by <code>hbase-site.xml</code> and this param would have the
   * ensemble address of the remote cluster. The format to pass is particular.
   * Pass <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;</code>
   * such as <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass redefined hbase.regionserver.class
   * @param serverImpl redefined hbase.regionserver.impl
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner, String quorumAddress, String serverClass,
      String serverImpl, boolean addDependencyJars) throws IOException {

    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    job.setOutputFormatClass(TableOutputFormat.class);
    if (reducer != null) job.setReducerClass(reducer);
    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
    if (quorumAddress != null) {
      // Calling this will validate the format
      ZKConfig.validateClusterKey(quorumAddress);
      conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
    }
    if (serverClass != null && serverImpl != null) {
      conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
      conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
    }
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    initCredentials(job);
  }

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table The table to get the region count for.
   * @param job The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, Job job)
  throws IOException {
    int regions =
        MetaTableAccessor.getRegionCount(job.getConfiguration(), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions)
      job.setNumReduceTasks(regions);
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table The table to get the region count for.
   * @param job The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, Job job)
  throws IOException {
    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(job.getConfiguration(),
        TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job to adjust.
   * @param batchSize The number of rows to return in batch with each scanner
   * iteration.
   */
  public static void setScannerCaching(Job job, int batchSize) {
    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
  }
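
  /*
   * The tuning helpers above are typically applied right after the job has been
   * initialized. A brief sketch; the table name is a placeholder:
   *
   *   TableMapReduceUtil.setScannerCaching(job, 500);          // rows fetched per scanner RPC
   *   TableMapReduceUtil.limitNumReduceTasks("mytable", job);  // cap reducers at region count
   */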

  /**
   * Add HBase and its dependencies (only) to the job configuration.
   * <p>
   * This is intended as a low-level API, facilitating code reuse between this
   * class and its mapred counterpart. It is also of use to external tools that
   * need to build a MapReduce job that interacts with HBase but want
   * fine-grained control over the jars shipped to the cluster.
   * </p>
   * @param conf The Configuration object to extend with dependencies.
   * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
   * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
   */
  public static void addHBaseDependencyJars(Configuration conf) throws IOException {
    addDependencyJarsForClasses(conf,
        // explicitly pull a class from each module
        org.apache.hadoop.hbase.HConstants.class,                      // hbase-common
        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol
        org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.class, // hbase-protocol-shaded
        org.apache.hadoop.hbase.client.Put.class,                      // hbase-client
        org.apache.hadoop.hbase.ipc.RpcServer.class,                   // hbase-server
        org.apache.hadoop.hbase.CompatibilityFactory.class,            // hbase-hadoop-compat
        org.apache.hadoop.hbase.mapreduce.JobUtil.class,               // hbase-hadoop2-compat
        org.apache.hadoop.hbase.mapreduce.TableMapper.class,           // hbase-server
        org.apache.hadoop.hbase.metrics.impl.FastLongHistogram.class,  // hbase-metrics
        org.apache.hadoop.hbase.metrics.Snapshot.class,                // hbase-metrics-api
        org.apache.zookeeper.ZooKeeper.class,
        org.apache.hbase.thirdparty.io.netty.channel.Channel.class,
        com.google.protobuf.Message.class,
        org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations.class,
        org.apache.hbase.thirdparty.com.google.common.collect.Lists.class,
        org.apache.htrace.core.Tracer.class,
        com.codahale.metrics.MetricRegistry.class,
        org.apache.commons.lang3.ArrayUtils.class,
        com.fasterxml.jackson.databind.ObjectMapper.class,
        com.fasterxml.jackson.core.Versioned.class,
        com.fasterxml.jackson.annotation.JsonView.class,
        org.apache.hadoop.hbase.zookeeper.ZKWatcher.class);
  }

  /**
   * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}.
   * Also exposed to shell scripts via `bin/hbase mapredcp`.
   */
  public static String buildDependencyClasspath(Configuration conf) {
    if (conf == null) {
      throw new IllegalArgumentException("Must provide a configuration object.");
    }
    Set<String> paths = new HashSet<>(conf.getStringCollection("tmpjars"));
    if (paths.isEmpty()) {
      throw new IllegalArgumentException("Configuration contains no tmpjars.");
    }
    StringBuilder sb = new StringBuilder();
    for (String s : paths) {
      // entries can take the form 'file:/path/to/file.jar'.
      int idx = s.indexOf(":");
      if (idx != -1) s = s.substring(idx + 1);
      if (sb.length() > 0) sb.append(File.pathSeparator);
      sb.append(s);
    }
    return sb.toString();
  }
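
  /*
   * Sketch of how an external tool might combine the two methods above to compute a
   * client-side classpath, in the spirit of what `bin/hbase mapredcp` exposes:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   TableMapReduceUtil.addHBaseDependencyJars(conf);
   *   String classpath = TableMapReduceUtil.buildDependencyClasspath(conf);
   */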

  /**
   * Add the HBase dependency jars as well as jars for any of the configured
   * job classes to the job configuration, so that JobClient will ship them
   * to the cluster and add them to the DistributedCache.
   */
  public static void addDependencyJars(Job job) throws IOException {
    addHBaseDependencyJars(job.getConfiguration());
    try {
      addDependencyJarsForClasses(job.getConfiguration(),
          // when making changes here, consider also mapred.TableMapReduceUtil
          // pull job classes
          job.getMapOutputKeyClass(),
          job.getMapOutputValueClass(),
          job.getInputFormatClass(),
          job.getOutputKeyClass(),
          job.getOutputValueClass(),
          job.getOutputFormatClass(),
          job.getPartitionerClass(),
          job.getCombinerClass());
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }

  /**
   * Add the jars containing the given classes to the job's configuration
   * such that JobClient will ship them to the cluster and add them to
   * the DistributedCache.
   * @deprecated rely on {@link #addDependencyJars(Job)} instead.
   */
  @Deprecated
  public static void addDependencyJars(Configuration conf,
      Class<?>... classes) throws IOException {
    LOG.warn("The addDependencyJars(Configuration, Class<?>...) method has been deprecated since it"
        + " is easy to use incorrectly. Most users should rely on addDependencyJars(Job) "
        + "instead. See HBASE-8386 for more details.");
    addDependencyJarsForClasses(conf, classes);
  }

  /**
   * Add the jars containing the given classes to the job's configuration
   * such that JobClient will ship them to the cluster and add them to
   * the DistributedCache.
   *
   * N.B. that this method at most adds one jar per class given. If there is more than one
   * jar available containing a class with the same name as a given class, we don't define
   * which of those jars might be chosen.
   *
   * @param conf The Hadoop Configuration to modify
   * @param classes will add just those dependencies needed to find the given classes
   * @throws IOException if an underlying library call fails.
   */
  @InterfaceAudience.Private
  public static void addDependencyJarsForClasses(Configuration conf,
      Class<?>... classes) throws IOException {

    FileSystem localFs = FileSystem.getLocal(conf);
    Set<String> jars = new HashSet<>();
    // Add jars that are already in the tmpjars variable
    jars.addAll(conf.getStringCollection("tmpjars"));

    // add jars as we find them to a map of contents jar name so that we can avoid
    // creating new jars for classes that have already been packaged.
    Map<String, String> packagedClasses = new HashMap<>();

    // Add jars containing the specified classes
    for (Class<?> clazz : classes) {
      if (clazz == null) continue;

      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
      if (path == null) {
        LOG.warn("Could not find jar for class " + clazz +
            " in order to ship it to the cluster.");
        continue;
      }
      if (!localFs.exists(path)) {
        LOG.warn("Could not validate jar file " + path + " for class "
            + clazz);
        continue;
      }
      jars.add(path.toString());
    }
    if (jars.isEmpty()) return;

    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
  }
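
  /*
   * addDependencyJars(Job) above ships HBase plus the job's own key/value/format classes.
   * Anything else a task needs at runtime (for example a custom filter referenced by the
   * Scan) can be added explicitly; MyCustomFilter is a placeholder. Note that
   * addDependencyJarsForClasses is audience-private, so external code should prefer
   * addDependencyJars(Job) where possible.
   *
   *   TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
   *       MyCustomFilter.class);
   */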

  /**
   * Finds the Jar for a class or creates it if it doesn't exist. If the class is in
   * a directory in the classpath, it creates a Jar on the fly with the
   * contents of the directory and returns the path to that Jar. If a Jar is
   * created, it is created in the system temporary directory. Otherwise,
   * returns an existing jar that contains a class of the same name. Maintains
   * a mapping from jar contents to the tmp jar created.
   * @param my_class the class to find.
   * @param fs the FileSystem with which to qualify the returned path.
   * @param packagedClasses a map of class name to path.
   * @return a jar file that contains the class.
   * @throws IOException
   */
  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
      Map<String, String> packagedClasses)
  throws IOException {
    // attempt to locate an existing jar for the class.
    String jar = findContainingJar(my_class, packagedClasses);
    if (null == jar || jar.isEmpty()) {
      jar = getJar(my_class);
      updateMap(jar, packagedClasses);
    }

    if (null == jar || jar.isEmpty()) {
      return null;
    }

    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
    return new Path(jar).makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  /**
   * Add entries to <code>packagedClasses</code> corresponding to class files
   * contained in <code>jar</code>.
   * @param jar The jar whose content to list.
   * @param packagedClasses map[class -> jar]
   */
  private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException {
    if (null == jar || jar.isEmpty()) {
      return;
    }
    ZipFile zip = null;
    try {
      zip = new ZipFile(jar);
      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
        ZipEntry entry = iter.nextElement();
        if (entry.getName().endsWith("class")) {
          packagedClasses.put(entry.getName(), jar);
        }
      }
    } finally {
      if (null != zip) zip.close();
    }
  }

  /**
   * Find a jar that contains a class of the same name, if any. It will return
   * a jar file, even if that is not the first thing on the class path that
   * has a class with the same name. Looks first on the classpath and then in
   * the <code>packagedClasses</code> map.
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   * @throws IOException
   */
  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
      throws IOException {
    ClassLoader loader = my_class.getClassLoader();

    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";

    if (loader != null) {
      // first search the classpath
      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
        URL url = itr.nextElement();
        if ("jar".equals(url.getProtocol())) {
          String toReturn = url.getPath();
          if (toReturn.startsWith("file:")) {
            toReturn = toReturn.substring("file:".length());
          }
          // URLDecoder is a misnamed class, since it actually decodes
          // x-www-form-urlencoded MIME type rather than actual
          // URL encoding (which the file path has). Therefore it would
          // decode +s to ' 's which is incorrect (spaces are actually
          // either unencoded or encoded as "%20"). Replace +s first, so
          // that they are kept sacred during the decoding process.
          toReturn = toReturn.replaceAll("\\+", "%2B");
          toReturn = URLDecoder.decode(toReturn, "UTF-8");
          return toReturn.replaceAll("!.*$", "");
        }
      }
    }

    // now look in any jars we've packaged using JarFinder. Returns null when
    // no jar is found.
    return packagedClasses.get(class_file);
  }

  /**
   * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job
   * configuration contexts (HBASE-8140) and also for testing on MRv2.
   * check if we have HADOOP-9426.
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  private static String getJar(Class<?> my_class) {
    String ret = null;
    try {
      ret = JarFinder.getJar(my_class);
    } catch (Exception e) {
      // toss all other exceptions, related to reflection failure
      throw new RuntimeException("getJar invocation failed.", e);
    }

    return ret;
  }
}