/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.util.Base64;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * A partitioner that takes start and end keys and uses bigdecimal to figure
 * which reduce a key belongs to. The split points are computed with
 * {@link Bytes#split(byte[], byte[], int)} the first time a partition is requested
 * for a given number of reduces, and are cached until that number changes or the
 * partitioner is reconfigured.
 *
 * <p>Pass the start and end keys in the Configuration using
 * {@link #setStartKey(Configuration, byte[])} and
 * {@link #setEndKey(Configuration, byte[])}. The legacy
 * <code>hbase.simpletotalorder.start</code> and <code>hbase.simpletotalorder.end</code>
 * properties are still read as a fallback, but are deprecated.
 *
 * <p>The end key needs to be exclusive; i.e. one larger than the biggest key in your
 * key space. A key that sorts before the start key, or at/after the end key, is
 * rejected with a {@link RuntimeException}.
 *
 * <p>You may be surprised at how this class partitions the space; it may not
 * align with preconceptions; e.g. a start key of zero and an end key of 100
 * divided in ten will not make regions whose range is 0-10, 10-20, and so on.
 * Make your own partitioner if you need the region spacing to come out a
 * particular way.
 *
 * @param <VALUE> type of the value paired with each key; the value is not inspected
 *          when choosing a partition
 * @see #START
 * @see #END
 */
@InterfaceAudience.Public
public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
implements Configurable {
  private static final Logger LOG = LoggerFactory.getLogger(SimpleTotalOrderPartitioner.class);

  /**
   * Legacy configuration property holding the start key; its value is parsed with
   * {@link Bytes#toBytesBinary(String)}.
   * @deprecated Use {@link #setStartKey(Configuration, byte[])} instead.
   */
  @Deprecated
  public static final String START = "hbase.simpletotalorder.start";

  /**
   * Legacy configuration property holding the end key; its value is parsed with
   * {@link Bytes#toBytesBinary(String)}.
   * @deprecated Use {@link #setEndKey(Configuration, byte[])} instead.
   */
  @Deprecated
  public static final String END = "hbase.simpletotalorder.end";

  /** Configuration property holding the Base64-encoded start key. */
  static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
  /** Configuration property holding the Base64-encoded end key. */
  static final String END_BASE64 = "hbase.simpletotalorder.end.base64";

  private Configuration c;
  private byte [] startkey;
  private byte [] endkey;
  /** Split points cached for {@link #lastReduces} reduces. */
  private byte [][] splits;
  /** Number of reduces {@link #splits} was computed for; -1 forces a recompute. */
  private int lastReduces = -1;

  /**
   * Stores the start key in the configuration, Base64-encoded, under
   * {@link #START_BASE64}.
   * @param conf configuration to write to
   * @param startKey first key of the key space
   */
  public static void setStartKey(Configuration conf, byte[] startKey) {
    conf.set(START_BASE64, Base64.getEncoder().encodeToString(startKey));
  }

  /**
   * Stores the end key in the configuration, Base64-encoded, under
   * {@link #END_BASE64}.
   * @param conf configuration to write to
   * @param endKey exclusive end of the key space
   */
  public static void setEndKey(Configuration conf, byte[] endKey) {
    conf.set(END_BASE64, Base64.getEncoder().encodeToString(endKey));
  }

  /**
   * Reads the start key, preferring {@link #START_BASE64} over the deprecated
   * {@link #START} property.
   * @param conf configuration to read from
   * @return the start key, or {@code null} if neither property is set
   */
  @SuppressWarnings("deprecation")
  static byte[] getStartKey(Configuration conf) {
    return getKeyFromConf(conf, START_BASE64, START);
  }

  /**
   * Reads the end key, preferring {@link #END_BASE64} over the deprecated
   * {@link #END} property.
   * @param conf configuration to read from
   * @return the end key, or {@code null} if neither property is set
   */
  @SuppressWarnings("deprecation")
  static byte[] getEndKey(Configuration conf) {
    return getKeyFromConf(conf, END_BASE64, END);
  }

  /**
   * Looks up a key in the configuration. The Base64 property wins when present;
   * otherwise the deprecated property is parsed with
   * {@link Bytes#toBytesBinary(String)} and a warning is logged.
   * @param conf configuration to read from
   * @param base64Key name of the property holding the Base64-encoded key
   * @param deprecatedKey name of the legacy property to fall back to
   * @return the decoded key, or {@code null} if neither property is set
   */
  private static byte[] getKeyFromConf(Configuration conf,
      String base64Key, String deprecatedKey) {
    String encoded = conf.get(base64Key);
    if (encoded != null) {
      return Base64.getDecoder().decode(encoded);
    }
    String oldStyleVal = conf.get(deprecatedKey);
    if (oldStyleVal == null) {
      return null;
    }
    LOG.warn("Using deprecated configuration {} - please use static accessor methods instead.",
      deprecatedKey);
    return Bytes.toBytesBinary(oldStyleVal);
  }

  /**
   * Returns the partition for {@code key}, found by searching the cached split points.
   * @param key key to partition
   * @param value value paired with the key; ignored
   * @param reduces total number of partitions
   * @return a partition index in {@code [0, reduces)}
   * @throws RuntimeException if the key lies outside the configured start/end range
   */
  @Override
  public int getPartition(final ImmutableBytesWritable key, final VALUE value,
      final int reduces) {
    if (reduces == 1) {
      return 0;
    }
    if (this.lastReduces != reduces) {
      // Recompute the split points only when the number of reduces changes.
      this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
      for (byte[] split : this.splits) {
        LOG.info(Bytes.toStringBinary(split));
      }
      this.lastReduces = reduces;
    }
    int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(),
      key.getLength());
    // Below code is from hfile index search.
    // A negative result means no exact match; the arithmetic below maps it to the
    // index of the split preceding the key (presumably binarySearch encodes the
    // insertion point as -(insertionPoint + 1) - confirm against Bytes).
    if (pos < 0) {
      pos++;
      pos *= -1;
      if (pos == 0) {
        // falls before the beginning of the file.
        throw new RuntimeException("Key outside start/stop range: " +
          key.toString());
      }
      pos--;
    }
    if (pos >= reduces) {
      // A key at or past the (exclusive) end key would otherwise yield an index that
      // is not a valid partition; reject it the same way as a key before the start.
      throw new RuntimeException("Key outside start/stop range: " +
        key.toString());
    }
    return pos;
  }

  /**
   * @return the configuration last passed to {@link #setConf(Configuration)}
   */
  @Override
  public Configuration getConf() {
    return this.c;
  }

  /**
   * Reads the start and end keys from {@code conf} and discards any cached split points.
   * @param conf configuration carrying the start and end keys
   * @throws RuntimeException if either key is missing from the configuration
   */
  @Override
  public void setConf(Configuration conf) {
    this.c = conf;
    this.startkey = getStartKey(conf);
    this.endkey = getEndKey(conf);
    if (startkey == null || endkey == null) {
      throw new RuntimeException(this.getClass() + " not configured");
    }
    LOG.info("startkey={}, endkey={}", Bytes.toStringBinary(startkey),
      Bytes.toStringBinary(endkey));
    // Reset last reduces count on change of Start / End key
    this.lastReduces = -1;
  }
}