Skip to content
This repository has been archived by the owner on Jul 7, 2020. It is now read-only.

Recordinality implementation #101

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.mdown
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ pages 693–703, London, UK, 2002. Springer-Verlag.
Algorithmic Engineering of a State of The Art Cardinality Estimation
Algorithm. Proceedings of the EDBT 2013 Conference, ACM, Genoa, Italy

* Ahmed Helmi, Jerémie Lumbroso, Conrado Martínez and Alfredo Viola. Data Streams as Random Permutations:
the Distinct Element Problem. 22nd International Meeting on Probabilistic, Combinatorial, and
Asymptotic Methods in the Analysis of Algorithms (AofA'12), 2012, Montreal, Canada.

#### Top-K

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package com.clearspring.analytics.stream.cardinality;


/**
* Java implementation of Recordinality (R) algorithm from this paper:
* <p/>
* http://www.dmtcs.org/pdfpapers/dmAQ0124.pdf
* <p/>
* Recordinality counts the number of records (more
* generally, k-records) in the sequence
* <p/>
* It depends in the underlying permutation of the first
* occurrences of distinct values, very different from the other
* estimators
* <p/>
* The Recordinality estimator =>
* Z = k * (1 + 1/k)^(rk - k + 1) -1
* <p/>
* E_n[Z] = n (It's an unbiased estimator of n)
* <p/>
* The accuracy of Recordinality in terms of SE, asymptotically, satisfacted:
* <p/>
* SE_n[Z] = sqrt( (n/ke)^(1/k) - 1 )
* <p/>
* You can find more information in these slides:
* <p/>
* https://www.cs.upc.edu/~conrado/research/talks/aofa2012.pdf
* <p/>
* <p>
* Users have different motivations to use different types of hashing functions.
* Rather than try to keep up with all available hash functions and to remove
* the concern of causing future binary incompatibilities this class allows clients
* to offer the value in hashed int or long form. This way clients are free
* to change their hash function on their own time line. We recommend using Google's
* Guava Murmur3_128 implementation as it provides good performance and speed when
* high precision is required. In our tests the 32bit MurmurHash function included
* in this project is faster and produces better results than the 32 bit murmur3
* implementation google provides.
* </p>
*/


import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.PriorityQueue;
import java.util.HashSet;
import com.clearspring.analytics.hash.MurmurHash;



public class Recordinality implements ICardinality, Serializable {
private final int sampleSize;
private final PriorityQueue<Long> sampleSet = new PriorityQueue<>();
private final HashSet<Long> elements = new HashSet<>();
private long rk;


/**
* Initializes a new Recordinality instance with a configurable 'k'-size.
*/
public Recordinality(int sampleSize) {
this.sampleSize = sampleSize;
this.rk = 0;
}

/**
* Process the offered hash.
* You can find a pseudocode in the description links
*/
public boolean offerHashed(long hashedLong) {
// if the element is not in the hashmap...
if (!elements.contains(hashedLong)) {
//if we don't have k-values this is a k-max
if (sampleSize > sampleSet.size()) {
elements.add(hashedLong);
sampleSet.add(hashedLong);
rk+=1;
return true;
// if we have k values but this is a k-max insert it and remove the minimum
} else if (sampleSet.peek() < hashedLong) {
elements.remove(sampleSet.peek());
elements.add(hashedLong);
sampleSet.poll();
sampleSet.add(hashedLong);
rk+=1;
return true;
}
}
return false;
}


@Override
public boolean offerHashed(int hashedInt) {
throw new UnsupportedOperationException();
}

@Override
public boolean offer(Object o) {
long x = MurmurHash.hash64(o);
return offerHashed(x);
}

/**
* Return a estimation of distinct values
* You can find a pseudocode in the description links
*/
@Override
public long cardinality() {
if (sampleSet.size() < sampleSize) return sampleSet.size();
else {
long pow = rk - sampleSize + 1;
double estimate = (sampleSize * (Math.pow(1 + (1.0 / sampleSize), pow))) - 1;
return (long) estimate;
}
}


/**
* Return a estimated Standar Error from the estimated cardinality
*/
public double estimatedStandarError(){
if (sampleSet.size() < sampleSize) return 0;
else {
long estimateCardinality = cardinality();
double pow = 1.0/sampleSize;
return Math.sqrt( Math.pow(
estimateCardinality/(sampleSize*Math.E), pow)
- 1);
}
}

@Override
public int sizeof() {
return sampleSet.size() * 8;
}

@Override
public byte[] getBytes() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutput dos = new DataOutputStream(baos);
writeBytes(dos);

return baos.toByteArray();
}

private void writeBytes(DataOutput serializedByteStream) throws IOException {
serializedByteStream.writeInt(sampleSize);
serializedByteStream.writeInt(elements.size() * 8);

for (Long e : elements) {
serializedByteStream.writeLong(e);
}
}

@Override
public ICardinality merge(ICardinality... estimators) throws CardinalityMergeException {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.clearspring.analytics.stream.cardinality;


/**
* Copyright (C) 2011 Clearspring Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


import java.io.IOException;
import com.clearspring.analytics.TestUtils;
import org.junit.Ignore;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class TestRecordinality {

@Test
public void testComputeCount() {
Recordinality recordinality = new Recordinality(16);
recordinality.offer(0);
recordinality.offer(1);
recordinality.offer(2);
recordinality.offer(3);
recordinality.offer(16);
recordinality.offer(17);
recordinality.offer(18);
recordinality.offer(19);
recordinality.offer(19);
assertEquals(8, recordinality.cardinality());
}

@Test
public void testSerialization() throws IOException, ClassNotFoundException {
Recordinality r = new Recordinality(8);
r.offer("a");
r.offer("b");
r.offer("c");
r.offer("d");
r.offer("e");

Recordinality r2 = (Recordinality) TestUtils.deserialize(TestUtils.serialize(r));
assertEquals(r.cardinality(), r2.cardinality());
}

/**
* should not fail with probability 1/100
*/
@Test
@Ignore
public void testHighCardinality() {
int counter = 0;
for (int j = 0; j < 3; ++j) {
long start = System.currentTimeMillis();
Recordinality recordinality = new Recordinality(10);
int size = 10000000;
for (int i = 0; i < size; i++) {
recordinality.offer(TestICardinality.streamElement(i));
}
System.out.println("time: " + (System.currentTimeMillis() - start));
/**
* the algorithm RECORDINALITY is expected to provide estimates
* within σ, 2σ, 3σ of the exact count in respectively at least
* 68%, 95% and 99% of all cases.
*/
long estimate = recordinality.cardinality();
double estimatedError = recordinality.estimatedStandarError();
long permittedError = (long) (3*size*estimatedError);
long err = Math.abs(estimate - size);

if (err > permittedError) ++counter;

}
System.out.println("If counter (> 1) rerun the test. \nIf you have already done it, something is broken");
System.out.println("Counter: " + counter);
assertTrue(counter < 2);
}
}