/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.util;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * LossyCounting utility, bounded data structure that maintains approximate high frequency
 * elements in data stream.
 *
 * Bucket size is 1 / error rate.  (Error rate is 0.02 by default)
 * Lemma: if an element does not appear in the set, then its frequency is less than e * N
 *       (N is the total element count until now.)
 * Based on paper:
 * http://www.vldb.org/conf/2002/S10P03.pdf
 *
 * The element counter and the current term are plain fields that are updated without any
 * synchronization in this class.
 */
@InterfaceAudience.Public
public class LossyCounting {
  private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);

  /** Number of additions per bucket: ceil(1 / errorRate). */
  private final long bucketSize;
  /** Error rate supplied at construction time, within [0, 1]. */
  private final double errorRate;
  /** Occurrence count per element currently being tracked. */
  private final Map<String, Integer> data;
  /** Label that prefixes the trace message emitted by each sweep. */
  private final String name;
  /** ceil(totalDataCount / bucketSize); 0 until the first element is added. */
  private long currentTerm;
  /** Total number of {@link #addByOne(String)} calls so far. */
  private long totalDataCount;

  /**
   * Creates a counter with an explicit error rate.
   *
   * @param errorRate error rate within [0, 1]; the bucket size is ceil(1 / errorRate), so a
   *                  rate of 0 yields a bucket size of {@code Long.MAX_VALUE}
   * @param name label used when logging sweeps
   * @throws IllegalArgumentException if errorRate is NaN or outside [0, 1]
   */
  public LossyCounting(double errorRate, String name) {
    // Negated form on purpose: NaN fails every comparison, so it is rejected here instead of
    // producing a bucket size of 0 and an ArithmeticException on the first addByOne call.
    if (!(errorRate >= 0.0 && errorRate <= 1.0)) {
      throw new IllegalArgumentException(
        " Lossy Counting error rate should be within range [0,1]");
    }
    this.errorRate = errorRate;
    this.name = name;
    this.bucketSize = (long) Math.ceil(1 / errorRate);
    this.totalDataCount = 0;
    this.data = new ConcurrentHashMap<>();
    calculateCurrentTerm();
  }

  /**
   * Creates a counter whose error rate is read from a freshly created HBase configuration
   * under the key {@code HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE}, falling back to 0.02.
   *
   * @param name label used when logging sweeps
   */
  public LossyCounting(String name) {
    this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),
      name);
  }

  /**
   * Records one occurrence of the given element. Every time the total number of recorded
   * occurrences reaches a multiple of the bucket size, low frequency elements are swept.
   *
   * @param key element to count
   * @return names of the elements swept by this call; empty when no sweep took place
   */
  public Set<String> addByOne(String key) {
    // Single map operation instead of a separate read followed by a write.
    data.merge(key, 1, Integer::sum);
    totalDataCount++;
    calculateCurrentTerm();
    if (totalDataCount % bucketSize == 0) {
      return sweep();
    }
    return new HashSet<>();
  }

  /**
   * sweep low frequency data
   * @return Names of elements got swept
   */
  private Set<String> sweep() {
    Set<String> dataToBeSwept = new HashSet<>();
    for (Map.Entry<String, Integer> entry : data.entrySet()) {
      if (entry.getValue() + errorRate < currentTerm) {
        dataToBeSwept.add(entry.getKey());
      }
    }
    // Removal happens after the scan so the map is not modified while it is being iterated.
    for (String key : dataToBeSwept) {
      data.remove(key);
    }
    // Parameterized logging: the message is only rendered when trace is enabled.
    LOG.trace("{} swept {} elements.", name, dataToBeSwept.size());
    return dataToBeSwept;
  }

  /**
   * Calculate and set current term
   */
  private void calculateCurrentTerm() {
    // Cast to long, matching the field type; an int cast would saturate at Integer.MAX_VALUE.
    this.currentTerm = (long) Math.ceil((double) totalDataCount / bucketSize);
  }

  /**
   * @return the bucket size, ceil(1 / errorRate)
   */
  public long getBucketSize() {
    return bucketSize;
  }

  /**
   * @return the number of distinct elements currently tracked
   */
  public long getDataSize() {
    return data.size();
  }

  /**
   * @param key element to look up
   * @return true if the element is currently tracked, i.e. it has been added and not swept
   */
  public boolean contains(String key) {
    return data.containsKey(key);
  }

  /**
   * @return the current term, ceil(total additions / bucket size)
   */
  public long getCurrentTerm() {
    return currentTerm;
  }
}