001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.util.Base64;
022
023import org.apache.yetus.audience.InterfaceAudience;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026import org.apache.hadoop.conf.Configurable;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.hadoop.mapreduce.Partitioner;
031
032/**
033 * A partitioner that takes start and end keys and uses bigdecimal to figure
034 * which reduce a key belongs to.  Pass the start and end
035 * keys in the Configuration using <code>hbase.simpletotalorder.start</code>
036 * and <code>hbase.simpletotalorder.end</code>.  The end key needs to be
037 * exclusive; i.e. one larger than the biggest key in your key space.
038 * You may be surprised at how this class partitions the space; it may not
039 * align with preconceptions; e.g. a start key of zero and an end key of 100
040 * divided in ten will not make regions whose range is 0-10, 10-20, and so on.
041 * Make your own partitioner if you need the region spacing to come out a
042 * particular way.
043 * @param <VALUE>
044 * @see #START
045 * @see #END
046 */
047@InterfaceAudience.Public
048public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
049implements Configurable {
050  private final static Logger LOG = LoggerFactory.getLogger(SimpleTotalOrderPartitioner.class);
051
052  @Deprecated
053  public static final String START = "hbase.simpletotalorder.start";
054  @Deprecated
055  public static final String END = "hbase.simpletotalorder.end";
056
057  static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
058  static final String END_BASE64 = "hbase.simpletotalorder.end.base64";
059
060  private Configuration c;
061  private byte [] startkey;
062  private byte [] endkey;
063  private byte [][] splits;
064  private int lastReduces = -1;
065
066  public static void setStartKey(Configuration conf, byte[] startKey) {
067    conf.set(START_BASE64, Bytes.toString(Base64.getEncoder().encode(startKey)));
068  }
069
070  public static void setEndKey(Configuration conf, byte[] endKey) {
071    conf.set(END_BASE64, Bytes.toString(Base64.getEncoder().encode(endKey)));
072  }
073
074  @SuppressWarnings("deprecation")
075  static byte[] getStartKey(Configuration conf) {
076    return getKeyFromConf(conf, START_BASE64, START);
077  }
078
079  @SuppressWarnings("deprecation")
080  static byte[] getEndKey(Configuration conf) {
081    return getKeyFromConf(conf, END_BASE64, END);
082  }
083
084  private static byte[] getKeyFromConf(Configuration conf,
085      String base64Key, String deprecatedKey) {
086    String encoded = conf.get(base64Key);
087    if (encoded != null) {
088      return Base64.getDecoder().decode(encoded);
089    }
090    String oldStyleVal = conf.get(deprecatedKey);
091    if (oldStyleVal == null) {
092      return null;
093    }
094    LOG.warn("Using deprecated configuration " + deprecatedKey +
095        " - please use static accessor methods instead.");
096    return Bytes.toBytesBinary(oldStyleVal);
097  }
098
099  @Override
100  public int getPartition(final ImmutableBytesWritable key, final VALUE value,
101      final int reduces) {
102    if (reduces == 1) return 0;
103    if (this.lastReduces != reduces) {
104      this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
105      for (int i = 0; i < splits.length; i++) {
106        LOG.info(Bytes.toStringBinary(splits[i]));
107      }
108      this.lastReduces = reduces;
109    }
110    int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(),
111      key.getLength());
112    // Below code is from hfile index search.
113    if (pos < 0) {
114      pos++;
115      pos *= -1;
116      if (pos == 0) {
117        // falls before the beginning of the file.
118        throw new RuntimeException("Key outside start/stop range: " +
119          key.toString());
120      }
121      pos--;
122    }
123    return pos;
124  }
125
126  @Override
127  public Configuration getConf() {
128    return this.c;
129  }
130
131  @Override
132  public void setConf(Configuration conf) {
133    this.c = conf;
134    this.startkey = getStartKey(conf);
135    this.endkey = getEndKey(conf);
136    if (startkey == null || endkey == null) {
137      throw new RuntimeException(this.getClass() + " not configured");
138    }
139    LOG.info("startkey=" + Bytes.toStringBinary(startkey) +
140        ", endkey=" + Bytes.toStringBinary(endkey));
141    // Reset last reduces count on change of Start / End key
142    this.lastReduces = -1;
143  }
144}