001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.util;
021
022import java.util.HashSet;
023import java.util.Map;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026
027import org.apache.hadoop.hbase.HBaseConfiguration;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033
034
035/**
036 * LossyCounting utility, bounded data structure that maintains approximate high frequency
037 * elements in data stream.
038 *
039 * Bucket size is 1 / error rate.  (Error rate is 0.02 by default)
040 * Lemma If element does not appear in set, then is frequency is less than e * N
041 *       (N is total element counts until now.)
042 * Based on paper:
043 * http://www.vldb.org/conf/2002/S10P03.pdf
044 */
045
046@InterfaceAudience.Public
047public class LossyCounting {
048  private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
049  private long bucketSize;
050  private long currentTerm;
051  private double errorRate;
052  private Map<String, Integer> data;
053  private long totalDataCount;
054  private String name;
055
056  public LossyCounting(double errorRate, String name) {
057    this.errorRate = errorRate;
058    this.name = name;
059    if (errorRate < 0.0 || errorRate > 1.0) {
060      throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
061    }
062    this.bucketSize = (long) Math.ceil(1 / errorRate);
063    this.currentTerm = 1;
064    this.totalDataCount = 0;
065    this.data = new ConcurrentHashMap<>();
066    calculateCurrentTerm();
067  }
068
069  public LossyCounting(String name) {
070    this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),
071        name);
072  }
073
074  public Set<String> addByOne(String key) {
075    data.put(key, data.getOrDefault(key, 0) + 1);
076    totalDataCount++;
077    calculateCurrentTerm();
078    Set<String> dataToBeSwept = new HashSet<>();
079    if(totalDataCount % bucketSize == 0) {
080      dataToBeSwept = sweep();
081    }
082    return dataToBeSwept;
083  }
084
085  /**
086   * sweep low frequency data
087   * @return Names of elements got swept
088   */
089  private Set<String> sweep() {
090    Set<String> dataToBeSwept = new HashSet<>();
091    for(Map.Entry<String, Integer> entry : data.entrySet()) {
092      if(entry.getValue() + errorRate < currentTerm) {
093        dataToBeSwept.add(entry.getKey());
094      }
095    }
096    for(String key : dataToBeSwept) {
097      data.remove(key);
098    }
099    LOG.trace(String.format("%s swept %d elements.", name, dataToBeSwept.size()));
100    return dataToBeSwept;
101  }
102
103  /**
104   * Calculate and set current term
105   */
106  private void calculateCurrentTerm() {
107    this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / bucketSize);
108  }
109
110  public long getBucketSize(){
111    return bucketSize;
112  }
113
114  public long getDataSize() {
115    return data.size();
116  }
117
118  public boolean contains(String key) {
119    return data.containsKey(key);
120  }
121
122  public long getCurrentTerm() {
123    return currentTerm;
124  }
125}
126