Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 91 additions & 34 deletions QuantileFlow/ddsketch/core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
"""Core DDSketch implementation."""
"""Core DDSketch implementation.

Optimized for high throughput with efficient bucket indexing and quantile queries.
"""

from typing import Literal, Union
from .mapping.logarithmic import LogarithmicMapping
Expand All @@ -8,6 +11,7 @@
from .storage.contiguous import ContiguousStorage
from .storage.sparse import SparseStorage


class DDSketch:
"""
DDSketch implementation for quantile approximation with relative-error guarantees.
Expand All @@ -21,6 +25,9 @@ class DDSketch:
by Charles Masson, Jee E. Rim and Homin K. Lee
"""

__slots__ = ('relative_accuracy', 'cont_neg', 'mapping', 'positive_store',
'negative_store', 'count', 'zero_count', '_min', '_max', '_sum')

def __init__(
self,
relative_accuracy: float,
Expand Down Expand Up @@ -54,7 +61,6 @@ def __init__(
self.relative_accuracy = relative_accuracy
self.cont_neg = cont_neg


# Initialize mapping scheme
if mapping_type == 'logarithmic':
self.mapping = LogarithmicMapping(relative_accuracy)
Expand All @@ -71,31 +77,53 @@ def __init__(
self.positive_store = SparseStorage(strategy=bucket_strategy)
self.negative_store = SparseStorage(strategy=bucket_strategy) if cont_neg else None

self.count = 0
self.zero_count = 0
self.count = 0.0
self.zero_count = 0.0

# Summary stats
self._min = float('+inf')
self._max = float('-inf')
self._sum = 0.0

def insert(self, value: Union[int, float]) -> None:
def insert(self, value: Union[int, float], weight: float = 1.0) -> None:
"""
Insert a value into the sketch.

Args:
value: The value to insert.
weight: The weight of the value (default 1.0).

Raises:
ValueError: If value is negative and cont_neg is False.
"""
# Cache method lookups for hot path optimization
if value > 0:
bucket_idx = self.mapping.compute_bucket_index(value)
self.positive_store.add(bucket_idx)
# Most common case: positive values
# Inline the hot path with cached local references
compute_idx = self.mapping.compute_bucket_index
self.positive_store.add(compute_idx(value), weight)
elif value < 0:
if self.cont_neg:
bucket_idx = self.mapping.compute_bucket_index(-value)
self.negative_store.add(bucket_idx)
compute_idx = self.mapping.compute_bucket_index
self.negative_store.add(compute_idx(-value), weight)
else:
raise ValueError("Negative values not supported when cont_neg is False")
else:
self.zero_count += 1
self.count += 1
self.zero_count += weight

# Track summary stats - combined update
self.count += weight
self._sum += value * weight
# Update min/max - use local to avoid repeated attribute access
if value < self._min:
self._min = value
if value > self._max:
self._max = value

# Alias for API compatibility
def add(self, value: Union[int, float], weight: float = 1.0) -> None:
"""Alias for insert()."""
self.insert(value, weight)

def delete(self, value: Union[int, float]) -> None:
"""
Expand Down Expand Up @@ -125,6 +153,7 @@ def delete(self, value: Union[int, float]) -> None:

if deleted:
self.count -= 1
self._sum -= value

def quantile(self, q: float) -> float:
"""
Expand All @@ -146,32 +175,52 @@ def quantile(self, q: float) -> float:

rank = q * (self.count - 1)

if self.cont_neg:
neg_count = self.negative_store.total_count
if self.cont_neg and self.negative_store is not None:
neg_count = self.negative_store.count
if rank < neg_count:
# Handle negative values
curr_count = 0
if self.negative_store.min_index is not None:
for idx in range(self.negative_store.max_index, self.negative_store.min_index - 1, -1):
bucket_count = self.negative_store.get_count(idx)
curr_count += bucket_count
if curr_count > rank:
return -self.mapping.compute_value_from_index(idx)
# Handle negative values - use reversed rank
reversed_rank = neg_count - rank - 1
key = self.negative_store.key_at_rank(reversed_rank, lower=False)
return -self.mapping.compute_value_from_index(key)
rank -= neg_count

if rank < self.zero_count:
return 0
return 0.0
rank -= self.zero_count

curr_count = 0
if self.positive_store.min_index is not None:
for idx in range(self.positive_store.min_index, self.positive_store.max_index + 1):
bucket_count = self.positive_store.get_count(idx)
curr_count += bucket_count
if curr_count > rank:
return self.mapping.compute_value_from_index(idx)

return float('inf')
# Use key_at_rank for consistency with storage implementation
key = self.positive_store.key_at_rank(rank)
return self.mapping.compute_value_from_index(key)

# Alias for API compatibility
def get_quantile_value(self, quantile: float) -> float:
"""Alias for quantile()."""
try:
return self.quantile(quantile)
except ValueError:
return None

@property
def avg(self) -> float:
"""Return the exact average of values added to the sketch."""
if self.count == 0:
return 0.0
return self._sum / self.count

@property
def sum(self) -> float:
"""Return the exact sum of values added to the sketch."""
return self._sum

@property
def min(self) -> float:
"""Return the minimum value added to the sketch."""
return self._min

@property
def max(self) -> float:
"""Return the maximum value added to the sketch."""
return self._max

def merge(self, other: 'DDSketch') -> None:
"""
Expand All @@ -185,12 +234,20 @@ def merge(self, other: 'DDSketch') -> None:
"""
if self.relative_accuracy != other.relative_accuracy:
raise ValueError("Cannot merge sketches with different relative accuracies")

if other.count == 0:
return

self.positive_store.merge(other.positive_store)
if self.cont_neg and other.cont_neg:
if self.cont_neg and other.cont_neg and other.negative_store is not None:
self.negative_store.merge(other.negative_store)
elif other.cont_neg and sum(other.negative_store.counts.values()) > 0:
elif other.cont_neg and other.negative_store is not None and other.negative_store.count > 0:
raise ValueError("Cannot merge sketch containing negative values when cont_neg is False")

self.zero_count += other.zero_count
self.count += other.count
self.count += other.count
self._sum += other._sum
if other._min < self._min:
self._min = other._min
if other._max > self._max:
self._max = other._max
12 changes: 3 additions & 9 deletions QuantileFlow/ddsketch/mapping/cubic_interpolation.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
"""
This file contains a Python implementation of the cubic interpolation mapping algorithm
described in Datadog's Java DDSketch implementation (https://github.com/DataDog/sketches-java).

Original work Copyright 2021 Datadog, Inc.
Licensed under Apache License 2.0 (http://www.apache.org/licenses/LICENSE-2.0)

Cubic interpolation mapping scheme for DDSketch.

This implementation approximates the memory-optimal logarithmic mapping by:
1. Extracting the floor value of log2 from binary representation
Expand Down Expand Up @@ -34,8 +29,7 @@ def __init__(self, relative_accuracy: float):

# Multiplier m = 7/(10*log(2)) ≈ 1.01
# This gives us the minimum multiplier that maintains relative accuracy guarantee
# Divide by C as per Datadog's implementation
self.m = 1/ (self.C * math.log(2))
self.m = 1 / (self.C * math.log(2))

def _extract_exponent_and_significand(self, value: float) -> tuple[int, float]:
"""
Expand All @@ -55,7 +49,7 @@ def _cubic_interpolation(self, s: float) -> float:
Compute the cubic interpolation P(s) = As³ + Bs² + Cs
where s is the normalized significand in [0, 1).
"""
# Use Datadog's order of operations for better numerical stability
# Use Horner's method for better numerical stability
return s * (self.C + s * (self.B + s * self.A))

def compute_bucket_index(self, value: float) -> int:
Expand Down
27 changes: 24 additions & 3 deletions QuantileFlow/ddsketch/mapping/logarithmic.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,40 @@
import math
from .base import MappingScheme


class LogarithmicMapping(MappingScheme):
"""
A memory-optimal KeyMapping that uses logarithmic mapping.

Given a targeted relative accuracy, it requires the least number of keys
to cover a given range of values.
"""
__slots__ = ('relative_accuracy', 'gamma', 'multiplier')

def __init__(self, relative_accuracy: float):
self.relative_accuracy = relative_accuracy
self.gamma = (1 + relative_accuracy) / (1 - relative_accuracy)
self.multiplier = 1 / math.log(self.gamma)

def key(self, value: float) -> int:
"""Alias for compute_bucket_index for API compatibility."""
return self.compute_bucket_index(value)

def value(self, key: int) -> float:
"""Alias for compute_value_from_index for API compatibility."""
return self.compute_value_from_index(key)

def compute_bucket_index(self, value: float) -> int:
# ceil(log_gamma(value) = ceil(log(value) / log(gamma))
"""Compute the bucket index for a given value.

ceil(log_gamma(value)) = ceil(log(value) / log(gamma))
"""
return math.ceil(math.log(value) * self.multiplier)

def compute_value_from_index(self, index: int) -> float:
# Return geometric mean of bucket boundaries
# This ensures the relative error is bounded by relative_accuracy
"""Compute the representative value for a given bucket index.

Returns the geometric mean of bucket boundaries to ensure
the relative error is bounded by relative_accuracy.
"""
return math.pow(self.gamma, index) * (2.0 / (1.0 + self.gamma))
Loading