/**
* Estimate the number of distinct elements in a data stream
* (F0 estimation problem) based on the algorithm published in the
* Proceedings of 30th Annual European Symposium on Algorithms (ESA 2022)
* https://doi.org/10.48550/arXiv.2301.10191
*/
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class F0Estimator {
    /**
     * Load factor of the hash table backing the sample set. A low value keeps
     * the table sparse, trading memory for fewer collisions.
     */
    private static final float LOAD_FACTOR = 0.5f;

    /**
     * Estimates the number of distinct elements in the passed stream.
     *
     * <p>A sample set of fewer than {@code storageSize} elements is maintained.
     * Each incoming element is (re-)inserted with the current probability
     * {@code p} and otherwise removed. Whenever the sample reaches
     * {@code storageSize} elements, each sampled element is dropped with
     * probability 1/2 and {@code p} is halved. The estimate is the final sample
     * size divided by {@code p}. If the stream contains fewer than
     * {@code storageSize} distinct elements, {@code p} stays 1 and the result
     * is exact.
     *
     * <p>The stream is consumed sequentially on the calling thread, even if it
     * is a parallel stream. It is not closed by this method.
     *
     * @param <T> element type; elements are compared via {@code equals}/{@code hashCode}
     * @param stream the stream of elements to examine
     * @param storageSize upper bound on the number of elements kept in memory;
     *        must be at least 1 (larger values give better estimates)
     * @return the estimated number of distinct elements in {@code stream}
     * @throws IllegalArgumentException if {@code storageSize} is less than 1
     * @throws IllegalStateException if the sample is still full after a halving
     *         round (every element survived the coin flips); this is only
     *         likely for very small values of {@code storageSize}
     */
    public static <T> long estimateF0(Stream<T> stream, int storageSize) {
        if (storageSize < 1) {
            throw new IllegalArgumentException(
                    "storageSize must be at least 1, got " + storageSize);
        }

        // Size the table so that storageSize elements fit below the resize
        // threshold (capacity * load factor); the cast saturates on overflow.
        int initialCapacity = (int) Math.ceil(storageSize / (double) LOAD_FACTOR);
        Set<T> sample = new HashSet<>(initialCapacity, LOAD_FACTOR);
        Random random = new Random();

        // Probability with which an incoming element is put into the sample.
        double p = 1.0;

        // Pull the elements through the stream's iterator instead of forEach:
        // the sample set and the random generator are not thread-safe, so the
        // update step must never run concurrently (as it would with forEach on
        // a parallel stream).
        Iterable<T> elements = stream::iterator;
        for (T element : elements) {
            if (random.nextDouble() < p) {
                sample.add(element);
            } else {
                sample.remove(element);
            }

            if (sample.size() >= storageSize) {
                // Randomly keep each element in the sample with probability 1/2
                sample.removeIf(e -> random.nextDouble() < 0.5);
                p /= 2;
                if (sample.size() >= storageSize) {
                    throw new IllegalStateException("Threshold exceeded after sampling");
                }
            }
        }

        return (long) (sample.size() / p);
    }

    /**
     * Demonstrates the estimator on 1e9 random integers drawn from
     * 65536 possible values, using a sample of at most 1000 elements.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Create a Random instance
        Random random = new Random();
        // Create a stream of 1e9 random integers restricted to the
        // 65536 values 0..65535 by masking the low 16 bits
        Stream<Integer> stream = IntStream
                .generate(random::nextInt).limit(1_000_000_000)
                .map(i -> i & 0xffff)
                .boxed();
        final int STORAGE_SIZE = 1000;
        long uniqueCount = estimateF0(stream, STORAGE_SIZE);
        System.out.println("Estimated number of unique elements: " + uniqueCount);
    }
}