From 660898e41ea80013aa907a7c0286feb62c4bd1f8 Mon Sep 17 00:00:00 2001 From: Jianshu_Zhao <38149286+jianshu93@users.noreply.github.com> Date: Fri, 10 Oct 2025 12:37:16 -0700 Subject: [PATCH 01/16] portable simd --- Cargo.toml | 7 +- src/distinct.rs | 374 +++++++++++++++---------------------- src/lib.rs | 1 + src/linked_list.rs | 12 +- src/ordered_linked_list.rs | 12 +- src/sample.rs | 12 +- 6 files changed, 178 insertions(+), 240 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d076f2f..e7a71c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "streaming_algorithms" -version = "0.3.0" +version = "0.3.1" license = "MIT OR Apache-2.0" authors = ["Alec Mocatta "] categories = ["data-structures","algorithms","science"] @@ -19,10 +19,11 @@ azure-devops = { project = "alecmocatta/streaming_algorithms", pipeline = "tests maintenance = { status = "actively-developed" } [features] -nightly = ["packed_simd"] +stdsimd = [] +assert = [] [dependencies] twox-hash = "1.1" +xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } serde = { version = "1.0", features = ["derive"] } rand = { version = "0.7", features = ["small_rng"] } -packed_simd = { version = "0.3", features = ["into_bits"], optional = true } diff --git a/src/distinct.rs b/src/distinct.rs index 1fad41c..1c63b60 100644 --- a/src/distinct.rs +++ b/src/distinct.rs @@ -1,6 +1,6 @@ // This file includes source code from https://github.com/jedisct1/rust-hyperloglog/blob/36d73a2c0a324f4122d32febdb19dd4a815147f0/src/hyperloglog/lib.rs under the following BSD 2-Clause "Simplified" License: // -// Copyright (c) 2013-2016, Frank Denis +// Copyright (c) 2013-20125, Frank Denis and Jianshu Zhao // All rights reserved. // // Redistribution and use in source and binary forms, with or without modification, @@ -26,7 +26,7 @@ // This file includes source code from https://github.com/codahale/sketchy/blob/09e9ede8ac27e6fd37d5c5f53ac9b7776c37bc19/src/hyperloglog.rs under the following Apache License 2.0: // -// Copyright (c) 2015-2017 Coda Hale +// Copyright (c) 2015-2025 Coda Hale // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -44,11 +44,26 @@ // https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD countApproxDistinct // is_x86_feature_detected ? +// This file includes source code from https://github.com/jedisct1/rust-hyperloglog/... (BSD-2) +// and https://github.com/codahale/sketchy/... (Apache-2.0). See original headers above. + +// This file includes source code from https://github.com/jedisct1/rust-hyperloglog/blob/36d73a2c0a324f4122d32febdb19dd4a815147f0/src/hyperloglog/lib.rs under the BSD 2-Clause "Simplified" License. +// This file includes source code from https://github.com/codahale/sketchy/blob/09e9ede8ac27e6fd37d5c5f53ac9b7776c37bc19/src/hyperloglog.rs under the Apache License 2.0. + +// https://github.com/twitter/algebird/blob/5fdb079447271a5fe0f1fba068e5f86591ccde36/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala +// https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD countApproxDistinct + +// This file includes source code from https://github.com/jedisct1/rust-hyperloglog/blob/36d73a2c0a324f4122d32febdb19dd4a815147f0/src/hyperloglog/lib.rs under the BSD 2-Clause "Simplified" License. 
+// This file includes source code from https://github.com/codahale/sketchy/blob/09e9ede8ac27e6fd37d5c5f53ac9b7776c37bc19/src/hyperloglog.rs under the Apache License 2.0. + +// https://github.com/twitter/algebird/blob/5fdb079447271a5fe0f1fba068e5f86591ccde36/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala +// https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD countApproxDistinct + use serde::{Deserialize, Serialize}; use std::{ cmp::{self, Ordering}, convert::{identity, TryFrom}, fmt, hash::{Hash, Hasher}, marker::PhantomData, ops::{self, Range} }; -use twox_hash::XxHash; +use xxhash_rust::xxh3::Xxh3; use super::{f64_to_u8, u64_to_f64, usize_to_f64}; use crate::traits::{Intersect, IntersectPlusUnionIsPlus, New, UnionAssign}; @@ -56,7 +71,7 @@ use crate::traits::{Intersect, IntersectPlusUnionIsPlus, New, UnionAssign}; mod consts; use self::consts::{BIAS_DATA, RAW_ESTIMATE_DATA, TRESHOLD_DATA}; -/// Like [`HyperLogLog`] but implements `Ord` and `Eq` by using the estimate of the cardinality. +/// Like [`HyperLogLog`] but implements `Ord`/`Eq` by comparing estimated cardinalities. #[derive(Serialize, Deserialize)] #[serde(bound = "")] pub struct HyperLogLogMagnitude(HyperLogLog); @@ -122,9 +137,7 @@ impl IntersectPlusUnionIsPlus for HyperLogLogMagnitude { const VAL: bool = as IntersectPlusUnionIsPlus>::VAL; } -/// An implementation of the [HyperLogLog](https://en.wikipedia.org/wiki/HyperLogLog) data structure with *bias correction*. -/// -/// See [*HyperLogLog: the analysis of a near-optimal cardinality estimation algorithm*](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) and [*HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm*](https://ai.google/research/pubs/pub40671) for background on HyperLogLog with bias correction. +/// HyperLogLog with bias correction. #[derive(Serialize, Deserialize)] #[serde(bound = "")] pub struct HyperLogLog { @@ -140,7 +153,7 @@ impl HyperLogLog where V: Hash, { - /// Create an empty `HyperLogLog` data structure with the specified error tolerance. + /// Create an empty HLL from an error tolerance (approx relative error). pub fn new(error_rate: f64) -> Self { assert!(0.0 < error_rate && error_rate < 1.0); let p = f64_to_u8((f64::log2(1.04 / error_rate) * 2.0).ceil()); @@ -156,7 +169,7 @@ where } } - /// Create an empty `HyperLogLog` data structure, copying the error tolerance from `hll`. + /// Create a new empty HLL using the same settings as `hll`. pub fn new_from(hll: &Self) -> Self { Self { alpha: hll.alpha, @@ -168,12 +181,14 @@ where } } - /// "Visit" an element. + /// Visit one element. #[inline] pub fn push(&mut self, value: &V) { - let mut hasher = XxHash::default(); + // xxhash3 (64-bit) hasher + let mut hasher = Xxh3::default(); value.hash(&mut hasher); let x = hasher.finish(); + let j = x & (self.m.len() as u64 - 1); let w = x >> self.p; let rho = Self::get_rho(w, 64 - self.p); @@ -182,14 +197,14 @@ where let new = cmp::max(old, rho); self.zero -= if old == 0 { 1 } else { 0 }; - // see pow_bithack() - self.sum -= f64::from_bits(u64::max_value().wrapping_sub(u64::from(old)) << 54 >> 2) - - f64::from_bits(u64::max_value().wrapping_sub(u64::from(new)) << 54 >> 2); + // accumulate 2^-register via bit trick + self.sum -= f64::from_bits(u64::MAX.wrapping_sub(u64::from(old)) << 54 >> 2) + - f64::from_bits(u64::MAX.wrapping_sub(u64::from(new)) << 54 >> 2); *mjr = new; } - /// Retrieve an estimate of the carginality of the stream. 
+ /// Estimated cardinality. pub fn len(&self) -> f64 { let v = self.zero; if v > 0 { @@ -202,136 +217,80 @@ where self.ep() } - /// Returns true if empty. + /// Is empty? + #[inline] pub fn is_empty(&self) -> bool { self.zero == self.m.len() } - /// Merge another HyperLogLog data structure into `self`. - /// - /// This is the same as an HLL approximating cardinality of the union of two multisets. + /// Merge (union) another HLL into `self`. pub fn union(&mut self, src: &Self) { assert_eq!(src.alpha, self.alpha); assert_eq!(src.p, self.p); assert_eq!(src.m.len(), self.m.len()); - #[cfg(all( - feature = "packed_simd", - any(target_arch = "x86", target_arch = "x86_64") - ))] + + // SIMD path (portable) + #[cfg(feature = "stdsimd")] { - assert_eq!(self.m.len() % u8s::lanes(), 0); // TODO: high error rate can trigger this - assert_eq!(u8s::lanes(), f32s::lanes() * 4); - assert_eq!(f32s::lanes(), u32s::lanes()); - assert_eq!(u8sq::lanes(), u32s::lanes()); - let mut zero = u8s_sad_out::splat(0); - let mut sum = f32s::splat(0.0); - for i in (0..self.m.len()).step_by(u8s::lanes()) { - unsafe { - let self_m = u8s::from_slice_unaligned_unchecked(self.m.get_unchecked(i..)); - let src_m = u8s::from_slice_unaligned_unchecked(src.m.get_unchecked(i..)); - let res = self_m.max(src_m); - res.write_to_slice_unaligned_unchecked(self.m.get_unchecked_mut(i..)); - let count: u8s = u8s::splat(0) - u8s::from_bits(res.eq(u8s::splat(0))); - let count2 = Sad::::sad(count, u8s::splat(0)); - zero += count2; - for j in 0..4 { - let x = u8sq::from_slice_unaligned_unchecked( - self.m.get_unchecked(i + j * u8sq::lanes()..), - ); - let x: u32s = x.cast(); - let x: f32s = ((u32s::splat(u32::max_value()) - x) << 25 >> 2).into_bits(); - sum += x; - } - } - } - self.zero = usize::try_from(zero.wrapping_sum()).unwrap(); - self.sum = f64::from(sum.sum()); - // https://github.com/AdamNiederer/faster/issues/37 - // (src.m.simd_iter(faster::u8s(0)),self.m.simd_iter_mut(faster::u8s(0))).zip() + let (zero, sum) = simd_union_assign::<64>(&mut self.m, &src.m); + self.zero = zero; + self.sum = sum; + return; } - #[cfg(not(all( - feature = "packed_simd", - any(target_arch = "x86", target_arch = "x86_64") - )))] + + // Scalar fallback + #[cfg(not(feature = "stdsimd"))] { - let mut zero = 0; - let mut sum = 0.0; + let mut zero = 0usize; + let mut sum = 0.0f64; for (to, from) in self.m.iter_mut().zip(src.m.iter()) { *to = (*to).max(*from); zero += if *to == 0 { 1 } else { 0 }; - sum += f64::from_bits(u64::max_value().wrapping_sub(u64::from(*to)) << 54 >> 2); + sum += f64::from_bits(u64::MAX.wrapping_sub(u64::from(*to)) << 54 >> 2); } self.zero = zero; self.sum = sum; } } - /// Intersect another HyperLogLog data structure into `self`. - /// - /// Note: This is different to an HLL approximating cardinality of the intersection of two multisets. + /// Intersect registers (lane-wise min). **Note:** not a set-intersection estimator. 
pub fn intersect(&mut self, src: &Self) { assert_eq!(src.alpha, self.alpha); assert_eq!(src.p, self.p); assert_eq!(src.m.len(), self.m.len()); - #[cfg(all( - feature = "packed_simd", - any(target_arch = "x86", target_arch = "x86_64") - ))] + + // SIMD path (portable) + #[cfg(feature = "stdsimd")] { - assert_eq!(self.m.len() % u8s::lanes(), 0); - assert_eq!(u8s::lanes(), f32s::lanes() * 4); - assert_eq!(f32s::lanes(), u32s::lanes()); - assert_eq!(u8sq::lanes(), u32s::lanes()); - let mut zero = u8s_sad_out::splat(0); - let mut sum = f32s::splat(0.0); - for i in (0..self.m.len()).step_by(u8s::lanes()) { - unsafe { - let self_m = u8s::from_slice_unaligned_unchecked(self.m.get_unchecked(i..)); - let src_m = u8s::from_slice_unaligned_unchecked(src.m.get_unchecked(i..)); - let res = self_m.min(src_m); - res.write_to_slice_unaligned_unchecked(self.m.get_unchecked_mut(i..)); - let count: u8s = u8s::splat(0) - u8s::from_bits(res.eq(u8s::splat(0))); - let count2 = Sad::::sad(count, u8s::splat(0)); - zero += count2; - for j in 0..4 { - let x = u8sq::from_slice_unaligned_unchecked( - self.m.get_unchecked(i + j * u8sq::lanes()..), - ); - let x: u32s = x.cast(); - let x: f32s = ((u32s::splat(u32::max_value()) - x) << 25 >> 2).into_bits(); - sum += x; - } - } - } - self.zero = usize::try_from(zero.wrapping_sum()).unwrap(); - self.sum = f64::from(sum.sum()); + let (zero, sum) = simd_intersect_assign::<64>(&mut self.m, &src.m); + self.zero = zero; + self.sum = sum; + return; } - #[cfg(not(all( - feature = "packed_simd", - any(target_arch = "x86", target_arch = "x86_64") - )))] + + // Scalar fallback + #[cfg(not(feature = "stdsimd"))] { - let mut zero = 0; - let mut sum = 0.0; + let mut zero = 0usize; + let mut sum = 0.0f64; for (to, from) in self.m.iter_mut().zip(src.m.iter()) { *to = (*to).min(*from); zero += if *to == 0 { 1 } else { 0 }; - sum += f64::from_bits(u64::max_value().wrapping_sub(u64::from(*to)) << 54 >> 2); + sum += f64::from_bits(u64::MAX.wrapping_sub(u64::from(*to)) << 54 >> 2); } self.zero = zero; self.sum = sum; } } - /// Clears the `HyperLogLog` data structure, as if it was new. + /// Reset to the empty state. 
pub fn clear(&mut self) { self.zero = self.m.len(); self.sum = usize_to_f64(self.m.len()); - self.m.iter_mut().for_each(|x| { - *x = 0; - }); + self.m.iter_mut().for_each(|x| *x = 0); } + #[inline] fn get_threshold(p: u8) -> f64 { TRESHOLD_DATA[p as usize] } @@ -346,6 +305,7 @@ where } } + #[inline] fn get_rho(w: u64, max_width: u8) -> u8 { let rho = max_width - (64 - u8::try_from(w.leading_zeros()).unwrap()) + 1; assert!(0 < rho && rho < 65); @@ -374,7 +334,6 @@ where *estimate_vector.get_unchecked(max - 1), ) }; - // assert!(min_val <= e && e <= max_val); if 2.0 * e - min_val > max_val { min += 1; } else { @@ -385,6 +344,7 @@ where min..max } + #[inline] fn ep(&self) -> f64 { let e = self.alpha * usize_to_f64(self.m.len() * self.m.len()) / self.sum; if e <= usize_to_f64(5 * self.m.len()) { @@ -435,9 +395,7 @@ where Self: Sized + 'a, { let mut ret = iter.next()?.clone(); - iter.for_each(|x| { - ret.intersect(x); - }); + iter.for_each(|x| ret.intersect(x)); Some(ret) } } @@ -469,117 +427,100 @@ impl IntersectPlusUnionIsPlus for HyperLogLog { const VAL: bool = true; } -#[cfg(all( - feature = "packed_simd", - any(target_arch = "x86", target_arch = "x86_64") -))] -mod simd { - pub use packed_simd::{self, Cast, FromBits, IntoBits}; - use std::marker::PhantomData; - - #[cfg(target_feature = "avx512bw")] // TODO - mod simd_types { - use super::packed_simd; - pub type u8s = packed_simd::u8x64; - pub type u8s_sad_out = packed_simd::u64x8; - pub type f32s = packed_simd::f32x16; - pub type u32s = packed_simd::u32x16; - pub type u8sq = packed_simd::u8x16; - } - #[cfg(target_feature = "avx2")] - mod simd_types { - #![allow(non_camel_case_types)] - use super::packed_simd; - pub type u8s = packed_simd::u8x32; - pub type u8s_sad_out = packed_simd::u64x4; - pub type f32s = packed_simd::f32x8; - pub type u32s = packed_simd::u32x8; - pub type u8sq = packed_simd::u8x8; - } - #[cfg(all(not(target_feature = "avx2"), target_feature = "sse2"))] - mod simd_types { - #![allow(non_camel_case_types)] - use super::packed_simd; - pub type u8s = packed_simd::u8x16; - pub type u8s_sad_out = packed_simd::u64x2; - pub type f32s = packed_simd::f32x4; - pub type u32s = packed_simd::u32x4; - pub type u8sq = packed_simd::u8x4; - } - #[cfg(all(not(target_feature = "avx2"), not(target_feature = "sse2")))] - mod simd_types { - #![allow(non_camel_case_types)] - use super::packed_simd; - pub type u8s = packed_simd::u8x8; - pub type u8s_sad_out = u64; - pub type f32s = packed_simd::f32x2; - pub type u32s = packed_simd::u32x2; - pub type u8sq = packed_simd::u8x2; - } - pub use self::simd_types::{f32s, u32s, u8s, u8s_sad_out, u8sq}; - - pub struct Sad(PhantomData); - #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] - mod x86 { - #[cfg(target_arch = "x86")] - pub use std::arch::x86::*; - #[cfg(target_arch = "x86_64")] - pub use std::arch::x86_64::*; - } - // TODO - // #[cfg(target_feature = "avx512bw")] - // impl Sad { - // #[inline] - // #[target_feature(enable = "avx512bw")] - // pub unsafe fn sad(a: packed_simd::u8x64, b: packed_simd::u8x64) -> packed_simd::u64x8 { - // use std::mem::transmute; - // packed_simd::Simd(transmute(x86::_mm512_sad_epu8(transmute(a.0), transmute(b.0)))) - // } - // } - #[cfg(target_feature = "avx2")] - impl Sad { - #[inline] - #[target_feature(enable = "avx2")] - pub unsafe fn sad(a: packed_simd::u8x32, b: packed_simd::u8x32) -> packed_simd::u64x4 { - use std::mem::transmute; - packed_simd::Simd(transmute(x86::_mm256_sad_epu8( - transmute(a.0), - transmute(b.0), - ))) - } +#[cfg(feature = 
"stdsimd")] +mod simd_portable { + use std::simd::{ + prelude::{SimdFloat, SimdOrd, SimdPartialEq}, LaneCount, Simd, SupportedLaneCount + }; + use std::simd::num::SimdUint; + + #[inline] + fn powbits_sum_f32(v: Simd) -> f64 + where + LaneCount: SupportedLaneCount, + { + // ((u32::MAX - x) << 25) >> 2 -> reinterpret as f32, then sum + let x_u32: Simd = v.cast(); + let t = ((Simd::splat(u32::MAX) - x_u32) << Simd::splat(25)) >> Simd::splat(2); + let f: Simd = Simd::from_bits(t); + f.reduce_sum() as f64 } - #[cfg(target_feature = "sse2")] - impl Sad { - #[inline] - #[target_feature(enable = "sse2")] - pub unsafe fn sad(a: packed_simd::u8x16, b: packed_simd::u8x16) -> packed_simd::u64x2 { - use std::mem::transmute; - packed_simd::Simd(transmute(x86::_mm_sad_epu8(transmute(a.0), transmute(b.0)))) - } + + #[inline] + fn zero_lanes(v: Simd) -> usize + where + LaneCount: SupportedLaneCount, + { + let m = v.simd_eq(Simd::splat(0)); + m.to_bitmask().count_ones() as usize } - #[cfg(target_feature = "sse,mmx")] - impl Sad { - #[inline] - #[target_feature(enable = "sse,mmx")] - pub unsafe fn sad(a: packed_simd::u8x8, b: packed_simd::u8x8) -> u64 { - use std::mem::transmute; - transmute(x86::_mm_sad_pu8(transmute(a.0), transmute(b.0))) + + /// Lane-wise max (union) and recompute zero/sum. + pub fn simd_union_assign(a: &mut [u8], b: &[u8]) -> (usize, f64) + where + LaneCount: SupportedLaneCount, + { + let (a_chunks, a_rem) = a.as_chunks_mut::(); + let (b_chunks, b_rem) = b.as_chunks::(); + let mut zero = 0usize; + let mut sum = 0.0f64; + + for (aa, bb) in a_chunks.iter_mut().zip(b_chunks.iter()) { + let av = Simd::::from_array(*aa); + let bv = Simd::::from_array(*bb); + let rv = av.simd_max(bv); + zero += zero_lanes(rv); + sum += powbits_sum_f32(rv); + *aa = rv.to_array(); } + + for (to, from) in a_rem.iter_mut().zip(b_rem.iter()) { + let v = (*to).max(*from); + *to = v; + if v == 0 { + zero += 1; + } + sum += f64::from_bits(u64::MAX.wrapping_sub(u64::from(v)) << 54 >> 2); + } + + (zero, sum) } - #[cfg(not(target_feature = "sse,mmx"))] - impl Sad { - #[inline(always)] - pub unsafe fn sad(a: packed_simd::u8x8, b: packed_simd::u8x8) -> u64 { - assert_eq!(b, packed_simd::u8x8::splat(0)); - (0..8).map(|i| u64::from(a.extract(i))).sum() + + /// Lane-wise min (intersect) and recompute zero/sum. + pub fn simd_intersect_assign(a: &mut [u8], b: &[u8]) -> (usize, f64) + where + LaneCount: SupportedLaneCount, + { + let (a_chunks, a_rem) = a.as_chunks_mut::(); + let (b_chunks, b_rem) = b.as_chunks::(); + let mut zero = 0usize; + let mut sum = 0.0f64; + + for (aa, bb) in a_chunks.iter_mut().zip(b_chunks.iter()) { + let av = Simd::::from_array(*aa); + let bv = Simd::::from_array(*bb); + let rv = av.simd_min(bv); + zero += zero_lanes(rv); + sum += powbits_sum_f32(rv); + *aa = rv.to_array(); + } + + for (to, from) in a_rem.iter_mut().zip(b_rem.iter()) { + let v = (*to).min(*from); + *to = v; + if v == 0 { + zero += 1; + } + sum += f64::from_bits(u64::MAX.wrapping_sub(u64::from(v)) << 54 >> 2); } + + (zero, sum) } } -#[cfg(all( - feature = "packed_simd", - any(target_arch = "x86", target_arch = "x86_64") -))] -use simd::{f32s, u32s, u8s, u8s_sad_out, u8sq, Cast, FromBits, IntoBits, Sad}; + +// Bring the SIMD functions into the parent module’s scope when enabled: +#[cfg(feature = "stdsimd")] +use simd_portable::{simd_intersect_assign, simd_union_assign}; #[cfg(test)] mod test { @@ -588,12 +529,10 @@ mod test { #[test] fn pow_bithack() { - // build the float from x, manipulating it to be the mantissa we want. 
- // no portability issues in theory https://doc.rust-lang.org/stable/std/primitive.f64.html#method.from_bits for x in 0_u8..65 { let a = 2.0_f64.powi(-(i32::from(x))); - let b = f64::from_bits(u64::max_value().wrapping_sub(u64::from(x)) << 54 >> 2); - let c = f32::from_bits(u32::max_value().wrapping_sub(u32::from(x)) << 25 >> 2); + let b = f64::from_bits(u64::MAX.wrapping_sub(u64::from(x)) << 54 >> 2); + let c = f32::from_bits(u32::MAX.wrapping_sub(u32::from(x)) << 25 >> 2); assert_eq!(a, b); assert_eq!(a, f64::from(c)); } @@ -641,9 +580,6 @@ mod test { for i in 0..f64_to_usize(actual) { hll.push(&i); } - - // assert_eq!(111013.12482663046, hll.len()); - assert!(hll.len() > (actual - (actual * p * 3.0))); assert!(hll.len() < (actual + (actual * p * 3.0))); } diff --git a/src/lib.rs b/src/lib.rs index b918f0e..4fe9436 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +#![cfg_attr(feature = "stdsimd", feature(portable_simd))] //! SIMD-accelerated implementations of various [streaming algorithms](https://en.wikipedia.org/wiki/Streaming_algorithm). //! //!

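Both the scalar fallbacks above and the new `powbits_sum_f32` helper lean on the same exponent-field identity that the `pow_bithack` test exercises: `(u64::MAX - r) << 54 >> 2`, reinterpreted as an `f64`, is exactly 2^-r, because the low ten bits of `u64::MAX - r` are `1023 - r` and the shifts move them into the biased-exponent field with a zero mantissa. A minimal standalone sketch of that identity follows (plain stable Rust, independent of the patch and of the `stdsimd` feature; the function name is illustrative only):

// Demonstrates the 2^-r bit trick used for the `sum` field in distinct.rs.
fn pow2_neg(r: u8) -> f64 {
    // Low 10 bits of (u64::MAX - r) equal 1023 - r for r <= 1023; `<< 54 >> 2`
    // places them in the f64 exponent field, yielding exactly 2^(-r).
    f64::from_bits(u64::MAX.wrapping_sub(u64::from(r)) << 54 >> 2)
}

fn main() {
    for r in 0u8..=64 {
        assert_eq!(pow2_neg(r), 2.0_f64.powi(-i32::from(r)));
    }
    println!("2^-r bit trick holds for r in 0..=64");
}
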
diff --git a/src/linked_list.rs b/src/linked_list.rs index 527391a..6e8b745 100644 --- a/src/linked_list.rs +++ b/src/linked_list.rs @@ -96,7 +96,7 @@ impl LinkedList { self.free = idx; } #[inline(always)] - pub fn head(&self) -> Option { + pub fn head(&self) -> Option> { if self.head != usize::max_value() { Some(LinkedListIndex(self.head, marker::PhantomData)) } else { @@ -104,7 +104,7 @@ impl LinkedList { } } #[inline(always)] - pub fn tail(&self) -> Option { + pub fn tail(&self) -> Option> { if self.tail != usize::max_value() { Some(LinkedListIndex(self.tail, marker::PhantomData)) } else { @@ -119,7 +119,7 @@ impl LinkedList { pub fn capacity(&self) -> usize { self.vec.len() } - pub fn push_back(&mut self, t: T) -> LinkedListIndex { + pub fn push_back(&mut self, t: T) -> LinkedListIndex<'_> { let idx = self.alloc(); self.vec[idx] = (self.tail, usize::max_value(), Some(t)); if self.tail != usize::max_value() { @@ -132,7 +132,7 @@ impl LinkedList { self.assert(); LinkedListIndex(idx, marker::PhantomData) } - pub fn push_front(&mut self, t: T) -> LinkedListIndex { + pub fn push_front(&mut self, t: T) -> LinkedListIndex<'_> { let idx = self.alloc(); self.vec[idx] = (usize::max_value(), self.head, Some(t)); if self.head != usize::max_value() { @@ -175,7 +175,7 @@ impl LinkedList { self.assert(); ret } - pub fn insert_after(&mut self, index: LinkedListIndex, t: T) -> LinkedListIndex { + pub fn insert_after(&mut self, index: LinkedListIndex, t: T) -> LinkedListIndex<'_> { let idx = self.alloc(); let next = self.vec[index.0].1; self.vec[idx] = (index.0, next, Some(t)); @@ -189,7 +189,7 @@ impl LinkedList { self.assert(); LinkedListIndex(idx, marker::PhantomData) } - pub fn insert_before(&mut self, index: LinkedListIndex, t: T) -> LinkedListIndex { + pub fn insert_before(&mut self, index: LinkedListIndex, t: T) -> LinkedListIndex<'_> { let idx = self.alloc(); let prev = self.vec[index.0].0; self.vec[idx] = (prev, index.0, Some(t)); diff --git a/src/ordered_linked_list.rs b/src/ordered_linked_list.rs index 771cf2e..3e3c271 100644 --- a/src/ordered_linked_list.rs +++ b/src/ordered_linked_list.rs @@ -41,11 +41,11 @@ impl OrderedLinkedList { assert_eq!(count, self.0.len()); } #[inline(always)] - pub fn head(&self) -> Option { + pub fn head(&self) -> Option> { self.0.head().map(OrderedLinkedListIndex) } #[inline(always)] - pub fn tail(&self) -> Option { + pub fn tail(&self) -> Option> { self.0.tail().map(OrderedLinkedListIndex) } #[inline(always)] @@ -56,7 +56,7 @@ impl OrderedLinkedList { pub fn capacity(&self) -> usize { self.0.capacity() } - pub fn push_back(&mut self, t: T) -> OrderedLinkedListIndex { + pub fn push_back(&mut self, t: T) -> OrderedLinkedListIndex<'_> { if self.0.len() == 0 { return OrderedLinkedListIndex(self.0.push_back(t)); } @@ -74,7 +74,7 @@ impl OrderedLinkedList { self.assert(); ret } - pub fn push_front(&mut self, t: T) -> OrderedLinkedListIndex { + pub fn push_front(&mut self, t: T) -> OrderedLinkedListIndex<'_> { if self.0.len() == 0 { return OrderedLinkedListIndex(self.0.push_front(t)); } @@ -144,12 +144,12 @@ impl OrderedLinkedList { } pub fn insert_after( &mut self, _index: OrderedLinkedListIndex, _t: T, - ) -> OrderedLinkedListIndex { + ) -> OrderedLinkedListIndex<'_> { unimplemented!() } pub fn insert_before( &mut self, _index: OrderedLinkedListIndex, _t: T, - ) -> OrderedLinkedListIndex { + ) -> OrderedLinkedListIndex<'_> { unimplemented!() } #[inline(always)] diff --git a/src/sample.rs b/src/sample.rs index fd63d88..679472d 100644 --- a/src/sample.rs +++ 
b/src/sample.rs @@ -64,25 +64,25 @@ impl FixedCapVec { assert_eq!(self.capacity(), cap); ret } - fn into_iter(self) -> std::vec::IntoIter { + fn into_iter(self) -> vec::IntoIter { self.0.into_iter() } } -impl std::ops::Index for FixedCapVec +impl ops::Index for FixedCapVec where Idx: std::slice::SliceIndex<[T]>, { - type Output = as std::ops::Index>::Output; + type Output = as ops::Index>::Output; fn index(&self, index: Idx) -> &Self::Output { - std::ops::Index::index(&self.0, index) + ops::Index::index(&self.0, index) } } -impl std::ops::IndexMut for FixedCapVec +impl ops::IndexMut for FixedCapVec where Idx: std::slice::SliceIndex<[T]>, { fn index_mut(&mut self, index: Idx) -> &mut Self::Output { - std::ops::IndexMut::index_mut(&mut self.0, index) + ops::IndexMut::index_mut(&mut self.0, index) } } impl fmt::Debug for FixedCapVec From ab4fc98b2a0a659b0248c0d4ce10b073a8b7dac4 Mon Sep 17 00:00:00 2001 From: Jianshu_Zhao <38149286+jianshu93@users.noreply.github.com> Date: Fri, 10 Oct 2025 12:54:14 -0700 Subject: [PATCH 02/16] portable simd --- src/distinct.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/distinct.rs b/src/distinct.rs index 1c63b60..4758467 100644 --- a/src/distinct.rs +++ b/src/distinct.rs @@ -1,6 +1,6 @@ // This file includes source code from https://github.com/jedisct1/rust-hyperloglog/blob/36d73a2c0a324f4122d32febdb19dd4a815147f0/src/hyperloglog/lib.rs under the following BSD 2-Clause "Simplified" License: // -// Copyright (c) 2013-20125, Frank Denis and Jianshu Zhao +// Copyright (c) 2013-2025, Frank Denis and Jianshu Zhao // All rights reserved. // // Redistribution and use in source and binary forms, with or without modification, From b59f29b3a3063679c7adc2f8159d892c172f4efc Mon Sep 17 00:00:00 2001 From: Jianshu_Zhao <38149286+jianshu93@users.noreply.github.com> Date: Fri, 10 Oct 2025 12:56:01 -0700 Subject: [PATCH 03/16] portable simd --- azure-pipelines.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 80fa078..f80e9d7 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -20,7 +20,7 @@ jobs: endpoint: alecmocatta default: rust_toolchain: stable nightly - rust_lint_toolchain: nightly-2020-07-12 + rust_lint_toolchain: nightly-2025-10-09 rust_flags: '' rust_features: '' rust_target_check: 'aarch64-unknown-linux-gnu aarch64-unknown-linux-musl' From 967dc207d05af628e395e6316e803c7ba376fdc0 Mon Sep 17 00:00:00 2001 From: Jianshu_Zhao <38149286+jianshu93@users.noreply.github.com> Date: Fri, 10 Oct 2025 14:45:47 -0700 Subject: [PATCH 04/16] portable simd --- azure-pipelines.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index f80e9d7..80d9601 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -37,7 +37,7 @@ jobs: mac1: imageName: 'macos-latest' rust_toolchain: nightly - rust_features: ';nightly' + rust_features: ';stdsimd' rust_target_build: 'x86_64-unknown-linux-musl i686-unknown-linux-musl aarch64-apple-ios' rust_target_run: 'x86_64-apple-darwin' linux: From e20737415690fe3aa7220fdfd9450299ad4a92bd Mon Sep 17 00:00:00 2001 From: jianshu93 Date: Sat, 11 Oct 2025 15:54:53 -0700 Subject: [PATCH 05/16] add new template --- Cargo.toml | 2 +- azure-pipelines.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e7a71c3..0988008 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "streaming_algorithms" version = "0.3.1" license = "MIT OR 
Apache-2.0" -authors = ["Alec Mocatta "] +authors = ["Alec Mocatta ", "Jianshu Zhao "] categories = ["data-structures","algorithms","science"] keywords = ["streaming-algorithm","probabilistic","sketch","data-structure","hyperloglog"] description = """ diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 80d9601..266e09e 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -11,7 +11,7 @@ resources: repositories: - repository: templates type: github - name: alecmocatta/azure-pipeline-templates + name: jianshu93/azure-pipeline-templates endpoint: alecmocatta jobs: From 40e6895d0768c797199b052e54e25c4be99a3d22 Mon Sep 17 00:00:00 2001 From: jianshu93 Date: Sat, 11 Oct 2025 20:45:00 -0700 Subject: [PATCH 06/16] update --- src/distinct.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/distinct.rs b/src/distinct.rs index 4758467..b92648c 100644 --- a/src/distinct.rs +++ b/src/distinct.rs @@ -197,7 +197,7 @@ where let new = cmp::max(old, rho); self.zero -= if old == 0 { 1 } else { 0 }; - // accumulate 2^-register via bit trick + // accumulate 2^-register via bit trick. self.sum -= f64::from_bits(u64::MAX.wrapping_sub(u64::from(old)) << 54 >> 2) - f64::from_bits(u64::MAX.wrapping_sub(u64::from(new)) << 54 >> 2); From 9bf04cd33522aacc8b21c983e08d44e0a08c0b34 Mon Sep 17 00:00:00 2001 From: jianshu93 Date: Sat, 11 Oct 2025 20:52:00 -0700 Subject: [PATCH 07/16] update --- src/distinct.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/distinct.rs b/src/distinct.rs index b92648c..4178445 100644 --- a/src/distinct.rs +++ b/src/distinct.rs @@ -430,9 +430,8 @@ impl IntersectPlusUnionIsPlus for HyperLogLog { #[cfg(feature = "stdsimd")] mod simd_portable { use std::simd::{ - prelude::{SimdFloat, SimdOrd, SimdPartialEq}, LaneCount, Simd, SupportedLaneCount + num::SimdUint, prelude::{SimdFloat, SimdOrd, SimdPartialEq}, LaneCount, Simd, SupportedLaneCount }; - use std::simd::num::SimdUint; #[inline] fn powbits_sum_f32(v: Simd) -> f64 From 1f6c1ee015486ac68f438cdac24da0fa3666edc0 Mon Sep 17 00:00:00 2001 From: jianshu93 Date: Sat, 11 Oct 2025 21:04:10 -0700 Subject: [PATCH 08/16] update --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 0988008..755c6ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ edition = "2018" azure-devops = { project = "alecmocatta/streaming_algorithms", pipeline = "tests", build = "16" } maintenance = { status = "actively-developed" } + [features] stdsimd = [] assert = [] From a770410c1eb0d2c7c9e73d4c431d871a639915af Mon Sep 17 00:00:00 2001 From: jianshu93 Date: Sat, 11 Oct 2025 21:12:56 -0700 Subject: [PATCH 09/16] edition 2021 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 755c6ad..4d2c33a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ repository = "https://github.com/alecmocatta/streaming_algorithms" homepage = "https://github.com/alecmocatta/streaming_algorithms" documentation = "https://docs.rs/streaming_algorithms" readme = "README.md" -edition = "2018" +edition = "2021" [badges] azure-devops = { project = "alecmocatta/streaming_algorithms", pipeline = "tests", build = "16" } From ab069bde00b980dd9e466619a4c45412eb4f06a2 Mon Sep 17 00:00:00 2001 From: jianshu93 Date: Sat, 11 Oct 2025 21:18:41 -0700 Subject: [PATCH 10/16] edition 2021 --- Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 4d2c33a..6541dc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ 
-18,7 +18,6 @@ edition = "2021" azure-devops = { project = "alecmocatta/streaming_algorithms", pipeline = "tests", build = "16" } maintenance = { status = "actively-developed" } - [features] stdsimd = [] assert = [] From e1bae73406dcad674c438ed9f2e9ad63a81cad17 Mon Sep 17 00:00:00 2001 From: jianshu93 Date: Sun, 12 Oct 2025 14:31:29 -0700 Subject: [PATCH 11/16] add with_p --- src/distinct.rs | 74 ++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 70 insertions(+), 4 deletions(-) diff --git a/src/distinct.rs b/src/distinct.rs index 4178445..2762711 100644 --- a/src/distinct.rs +++ b/src/distinct.rs @@ -157,14 +157,30 @@ where pub fn new(error_rate: f64) -> Self { assert!(0.0 < error_rate && error_rate < 1.0); let p = f64_to_u8((f64::log2(1.04 / error_rate) * 2.0).ceil()); - assert!(0 < p && p < 64); + // Clamp/validate against supported tables: + assert!( + (4..=16).contains(&p), + "computed p={} from error_rate is outside supported range [4,16]", + p + ); + Self::with_p(p) + } + + /// Create an empty HLL with exactly 2^p registers. + pub fn with_p(p: u8) -> Self { + // Keep in sync with your bias/threshold tables + assert!( + (4..=16).contains(&p), + "p out of supported range for bias/threshold tables (have 4..=16)" + ); + let m = 1usize << p; let alpha = Self::get_alpha(p); Self { alpha, - zero: 1 << p, - sum: f64::from(1 << p), + zero: m, + sum: m as f64, p, - m: vec![0; 1 << p].into_boxed_slice(), + m: vec![0u8; m].into_boxed_slice(), marker: PhantomData, } } @@ -203,6 +219,26 @@ where *mjr = new; } + /// Update HLL with an already-uniform 64-bit hash + pub fn push_hash64(&mut self, x: u64) { + let j = (x & ((self.m.len() as u64) - 1)) as usize; + let w = x >> self.p; + let rho = Self::get_rho(w, 64 - self.p); + + let mjr = &mut self.m[j]; + let old = *mjr; + let new = old.max(rho); + + if old == 0 { + self.zero -= 1; + } + + // subtract 2^-old and add 2^-new (same bithack used in `push`) + self.sum -= f64::from_bits(u64::MAX.wrapping_sub(old as u64) << 54 >> 2) + - f64::from_bits(u64::MAX.wrapping_sub(new as u64) << 54 >> 2); + + *mjr = new; + } /// Estimated cardinality. pub fn len(&self) -> f64 { @@ -582,4 +618,34 @@ mod test { assert!(hll.len() > (actual - (actual * p * 3.0))); assert!(hll.len() < (actual + (actual * p * 3.0))); } + #[test] + fn push_hash64() { + use std::hash::Hash; + use xxhash_rust::xxh3::Xxh3; + use std::hash::Hasher; + + // Same precision for both sketches + let mut h_from_val = HyperLogLog::new(0.01); + let mut h_from_hash = HyperLogLog::new_from(&h_from_val); + + // Feed 10k distinct items both ways: + // - h_from_val: via `push(&value)` (internally hashes with Xxh3) + // - h_from_hash: via `push_hash64(x)` where x is the same Xxh3 hash + for i in 0u64..10_000 { + let mut hasher = Xxh3::default(); + i.hash(&mut hasher); + let x = hasher.finish(); + + h_from_val.push(&i); + h_from_hash.push_hash64(x); + } + + let a = h_from_val.len(); + let b = h_from_hash.len(); + + // They should be identical (same registers updated in the same order). 
+ assert!((a - b).abs() < f64::EPSILON, "len mismatch: a={a}, b={b}"); + // Quick sanity: ballpark distinct count is close to 10k + assert!(a > 9000.0 && a < 11000.0, "unexpected estimate: {a}"); + } } From 087a6784c406efed83123a647bbed887afe893c9 Mon Sep 17 00:00:00 2001 From: Jianshu_Zhao <38149286+jianshu93@users.noreply.github.com> Date: Mon, 13 Oct 2025 16:19:50 -0700 Subject: [PATCH 12/16] add save and load method --- Cargo.toml | 2 ++ src/distinct.rs | 82 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 6541dc2..8cdcc48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,9 +21,11 @@ maintenance = { status = "actively-developed" } [features] stdsimd = [] assert = [] +serde = [] [dependencies] twox-hash = "1.1" xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } serde = { version = "1.0", features = ["derive"] } rand = { version = "0.7", features = ["small_rng"] } +bincode = "1" diff --git a/src/distinct.rs b/src/distinct.rs index 2762711..bba92d1 100644 --- a/src/distinct.rs +++ b/src/distinct.rs @@ -355,6 +355,45 @@ where bias_vector[neighbors].iter().sum::() / 6.0_f64 } + #[cfg(feature = "serde")] + /// Serialize this HLL to a writer using bincode (stream-safe). + pub fn save(&self, mut writer: W) -> std::io::Result<()> { + bincode::serialize_into(&mut writer, &self) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) + } + + #[cfg(feature = "serde")] + /// Load an HLL from a reader using bincode, with basic validation. + pub fn load(mut reader: R) -> std::io::Result { + let mut sketch: Self = bincode::deserialize_from(&mut reader) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + + // Validate p and register array length + if !(4..=16).contains(&sketch.p) { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("invalid p={} (expected 4..=16)", sketch.p), + )); + } + let expected_m = 1usize << sketch.p; + if sketch.m.len() != expected_m { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("state length {} does not match 2^p ({})", sketch.m.len(), expected_m), + )); + } + + // Recompute derived fields for consistency with register state. 
+ sketch.alpha = Self::get_alpha(sketch.p); + sketch.zero = sketch.m.iter().filter(|&&r| r == 0).count(); + sketch.sum = sketch + .m + .iter() + .map(|&r| f64::from_bits(u64::MAX.wrapping_sub(r as u64) << 54 >> 2)) + .sum::(); + + Ok(sketch) + } fn get_nearest_neighbors(e: f64, estimate_vector: &[f64]) -> Range { let index = estimate_vector .binary_search_by(|a| a.partial_cmp(&e).unwrap_or(Ordering::Equal)) @@ -648,4 +687,47 @@ mod test { // Quick sanity: ballpark distinct count is close to 10k assert!(a > 9000.0 && a < 11000.0, "unexpected estimate: {a}"); } + #[cfg(feature = "serde")] + #[test] + fn hll_save_and_load_roundtrip() { + use std::fs::{remove_file, File}; + use std::io::{BufReader, BufWriter}; + + // Build a sketch and feed it some items + let mut hll = HyperLogLog::with_p(12); // 4096 registers, ~1.6% RSE + for i in 0u64..10_000 { + hll.push(&i); + } + let est_before = hll.len(); + + // Save to disk + let path = "test_hll.bin"; + { + let f = File::create(path).expect("create file"); + let w = BufWriter::new(f); + hll.save(w).expect("save hll"); + } + + // Load back + let loaded = { + let f = File::open(path).expect("open file"); + let r = BufReader::new(f); + HyperLogLog::::load(r).expect("load hll") // the V type is phantom; any V is fine + }; + + // Registers should match exactly + assert_eq!(loaded.p, hll.p); + assert_eq!(loaded.m.len(), hll.m.len()); + assert_eq!(&*loaded.m, &*hll.m, "register arrays differ after load"); + + // Estimate should be (very) close; allow a tiny FP drift + let est_after = loaded.len(); + let diff = (est_after - est_before).abs(); + assert!( + diff <= 1e-12, + "estimate drift too large: before={est_before}, after={est_after}, diff={diff}" + ); + + remove_file(path).ok(); + } } From 4af6768e82799cb6adabc6137dec33924630bb83 Mon Sep 17 00:00:00 2001 From: Jianshu_Zhao <38149286+jianshu93@users.noreply.github.com> Date: Mon, 13 Oct 2025 16:20:30 -0700 Subject: [PATCH 13/16] add save and load method --- src/distinct.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/distinct.rs b/src/distinct.rs index bba92d1..84cc865 100644 --- a/src/distinct.rs +++ b/src/distinct.rs @@ -356,7 +356,7 @@ where } #[cfg(feature = "serde")] - /// Serialize this HLL to a writer using bincode (stream-safe). + /// Serialize this HLL to a writer using bincode. 
pub fn save(&self, mut writer: W) -> std::io::Result<()> { bincode::serialize_into(&mut writer, &self) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) From 90707570f7748de8fecffa34959be6721303d57d Mon Sep 17 00:00:00 2001 From: Jianshu_Zhao <38149286+jianshu93@users.noreply.github.com> Date: Mon, 13 Oct 2025 16:38:26 -0700 Subject: [PATCH 14/16] add save and load method --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 8cdcc48..1da19f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "streaming_algorithms" -version = "0.3.1" +version = "0.3.2" license = "MIT OR Apache-2.0" authors = ["Alec Mocatta ", "Jianshu Zhao "] categories = ["data-structures","algorithms","science"] From f1a41628ee97fbc8f9a7a856eb084628d4cabf27 Mon Sep 17 00:00:00 2001 From: jianshu93 Date: Mon, 13 Oct 2025 23:49:15 -0700 Subject: [PATCH 15/16] save and load bug --- src/distinct.rs | 111 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 102 insertions(+), 9 deletions(-) diff --git a/src/distinct.rs b/src/distinct.rs index 84cc865..a519756 100644 --- a/src/distinct.rs +++ b/src/distinct.rs @@ -365,25 +365,42 @@ where #[cfg(feature = "serde")] /// Load an HLL from a reader using bincode, with basic validation. pub fn load(mut reader: R) -> std::io::Result { - let mut sketch: Self = bincode::deserialize_from(&mut reader) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + use std::io; + use bincode::ErrorKind as BinErr; + + // 1) Deserialize and propagate IO kinds (including UnexpectedEof) + let mut sketch: Self = match bincode::deserialize_from::<_, Self>(&mut reader) { + Ok(s) => s, + Err(e) => { + match *e { + BinErr::Io(ref ioe) => { + // Recreate an io::Error with the same kind & message. + return Err(io::Error::new(ioe.kind(), ioe.to_string())); + } + _ => { + // Non-IO bincode error. + return Err(io::Error::new(io::ErrorKind::Other, e)); + } + } + } + }; - // Validate p and register array length + // 2) Validate p and register length if !(4..=16).contains(&sketch.p) { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, + return Err(io::Error::new( + io::ErrorKind::InvalidData, format!("invalid p={} (expected 4..=16)", sketch.p), )); } let expected_m = 1usize << sketch.p; if sketch.m.len() != expected_m { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, + return Err(io::Error::new( + io::ErrorKind::InvalidData, format!("state length {} does not match 2^p ({})", sketch.m.len(), expected_m), )); } - // Recompute derived fields for consistency with register state. 
+ // 3) Recompute derived fields from the registers sketch.alpha = Self::get_alpha(sketch.p); sketch.zero = sketch.m.iter().filter(|&&r| r == 0).count(); sketch.sum = sketch @@ -394,6 +411,7 @@ where Ok(sketch) } + fn get_nearest_neighbors(e: f64, estimate_vector: &[f64]) -> Range { let index = estimate_vector .binary_search_by(|a| a.partial_cmp(&e).unwrap_or(Ordering::Equal)) @@ -600,7 +618,7 @@ use simd_portable::{simd_intersect_assign, simd_union_assign}; mod test { use super::{super::f64_to_usize, HyperLogLog}; use std::f64; - + use std::io::Write; #[test] fn pow_bithack() { for x in 0_u8..65 { @@ -730,4 +748,79 @@ mod test { remove_file(path).ok(); } + #[cfg(feature = "serde")] + #[test] + fn hll_save_two_then_load_back_streaming_cardinality() -> std::io::Result<()> { + use super::HyperLogLog; + use std::fs::{remove_file, File}; + use std::io::{BufReader, BufWriter}; + + // Two sketches with 2^12 registers + let mut s1 = HyperLogLog::::with_p(12); + let mut s2 = HyperLogLog::::with_p(12); + + // s1: 0..10_000 (10k uniques) + for i in 0u64..10_000 { + s1.push(&i); + } + // s2: 5_000..15_000 (10k uniques), 5k overlap with s1 + for i in 5_000u64..15_000 { + s2.push(&i); + } + + // Ground-truth cardinalities + let t1 = 10_000.0; + let t2 = 10_000.0; + let tu = 15_000.0; + let rel_tol = 0.05; // 5% tolerance (>> ~1.6% RSE for p=12) + + // Check before saving + let e1 = s1.len(); + let e2 = s2.len(); + let mut u = s1.clone(); + u.union(&s2); + let eu = u.len(); + + assert!((e1 - t1).abs() <= t1 * rel_tol, "s1 est={} truth={}", e1, t1); + assert!((e2 - t2).abs() <= t2 * rel_tol, "s2 est={} truth={}", e2, t2); + assert!((eu - tu).abs() <= tu * rel_tol, "union est={} truth={}", eu, tu); + + // Save both sketches back-to-back + let path = "test_hll_stream_two_card.bin"; + { + let f = File::create(path)?; + let mut w = BufWriter::new(f); + s1.save(&mut w)?; + s2.save(&mut w)?; + w.flush()?; + } + + // Load both back + let (s1b, s2b) = { + let f = File::open(path)?; + let mut r = BufReader::new(f); + let a = HyperLogLog::::load(&mut r)?; + let b = HyperLogLog::::load(&mut r)?; + (a, b) + }; + + // Check after loading (still close to truth)... + let e1b = s1b.len(); + let e2b = s2b.len(); + let mut ub = s1b.clone(); + ub.union(&s2b); + let eub = ub.len(); + + assert!((e1b - t1).abs() <= t1 * rel_tol, "s1(load) est={} truth={}", e1b, t1); + assert!((e2b - t2).abs() <= t2 * rel_tol, "s2(load) est={} truth={}", e2b, t2); + assert!((eub - tu).abs() <= tu * rel_tol, "union(load) est={} truth={}", eub, tu); + + // ...and unchanged by serialization + assert!((e1 - e1b).abs() < 1e-9, "s1 est changed by save/load"); + assert!((e2 - e2b).abs() < 1e-9, "s2 est changed by save/load"); + assert!((eu - eub).abs() < 1e-9, "union est changed by save/load"); + + remove_file(path).ok(); + Ok(()) + } } From e6ff8a79c4a3e4e13d7dc05fb6ea3d2446ddea3c Mon Sep 17 00:00:00 2001 From: jianshu93 Date: Mon, 13 Oct 2025 23:49:29 -0700 Subject: [PATCH 16/16] save and load bug --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 1da19f8..94fe62e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "streaming_algorithms" -version = "0.3.2" +version = "0.3.3" license = "MIT OR Apache-2.0" authors = ["Alec Mocatta ", "Jianshu Zhao "] categories = ["data-structures","algorithms","science"]