diff --git a/QuantileFlow/ddsketch/core.py b/QuantileFlow/ddsketch/core.py index d512790..0e756ec 100644 --- a/QuantileFlow/ddsketch/core.py +++ b/QuantileFlow/ddsketch/core.py @@ -1,4 +1,7 @@ -"""Core DDSketch implementation.""" +"""Core DDSketch implementation. + +Optimized for high throughput with efficient bucket indexing and quantile queries. +""" from typing import Literal, Union from .mapping.logarithmic import LogarithmicMapping @@ -8,6 +11,7 @@ from .storage.contiguous import ContiguousStorage from .storage.sparse import SparseStorage + class DDSketch: """ DDSketch implementation for quantile approximation with relative-error guarantees. @@ -21,6 +25,9 @@ class DDSketch: by Charles Masson, Jee E. Rim and Homin K. Lee """ + __slots__ = ('relative_accuracy', 'cont_neg', 'mapping', 'positive_store', + 'negative_store', 'count', 'zero_count', '_min', '_max', '_sum') + def __init__( self, relative_accuracy: float, @@ -54,7 +61,6 @@ def __init__( self.relative_accuracy = relative_accuracy self.cont_neg = cont_neg - # Initialize mapping scheme if mapping_type == 'logarithmic': self.mapping = LogarithmicMapping(relative_accuracy) @@ -71,31 +77,53 @@ def __init__( self.positive_store = SparseStorage(strategy=bucket_strategy) self.negative_store = SparseStorage(strategy=bucket_strategy) if cont_neg else None - self.count = 0 - self.zero_count = 0 + self.count = 0.0 + self.zero_count = 0.0 + + # Summary stats + self._min = float('+inf') + self._max = float('-inf') + self._sum = 0.0 - def insert(self, value: Union[int, float]) -> None: + def insert(self, value: Union[int, float], weight: float = 1.0) -> None: """ Insert a value into the sketch. Args: value: The value to insert. + weight: The weight of the value (default 1.0). Raises: ValueError: If value is negative and cont_neg is False. 
""" + # Cache method lookups for hot path optimization if value > 0: - bucket_idx = self.mapping.compute_bucket_index(value) - self.positive_store.add(bucket_idx) + # Most common case: positive values + # Inline the hot path with cached local references + compute_idx = self.mapping.compute_bucket_index + self.positive_store.add(compute_idx(value), weight) elif value < 0: if self.cont_neg: - bucket_idx = self.mapping.compute_bucket_index(-value) - self.negative_store.add(bucket_idx) + compute_idx = self.mapping.compute_bucket_index + self.negative_store.add(compute_idx(-value), weight) else: raise ValueError("Negative values not supported when cont_neg is False") else: - self.zero_count += 1 - self.count += 1 + self.zero_count += weight + + # Track summary stats - combined update + self.count += weight + self._sum += value * weight + # Update min/max - use local to avoid repeated attribute access + if value < self._min: + self._min = value + if value > self._max: + self._max = value + + # Alias for API compatibility + def add(self, value: Union[int, float], weight: float = 1.0) -> None: + """Alias for insert().""" + self.insert(value, weight) def delete(self, value: Union[int, float]) -> None: """ @@ -125,6 +153,7 @@ def delete(self, value: Union[int, float]) -> None: if deleted: self.count -= 1 + self._sum -= value def quantile(self, q: float) -> float: """ @@ -146,32 +175,52 @@ def quantile(self, q: float) -> float: rank = q * (self.count - 1) - if self.cont_neg: - neg_count = self.negative_store.total_count + if self.cont_neg and self.negative_store is not None: + neg_count = self.negative_store.count if rank < neg_count: - # Handle negative values - curr_count = 0 - if self.negative_store.min_index is not None: - for idx in range(self.negative_store.max_index, self.negative_store.min_index - 1, -1): - bucket_count = self.negative_store.get_count(idx) - curr_count += bucket_count - if curr_count > rank: - return -self.mapping.compute_value_from_index(idx) + # Handle negative values - use reversed rank + reversed_rank = neg_count - rank - 1 + key = self.negative_store.key_at_rank(reversed_rank, lower=False) + return -self.mapping.compute_value_from_index(key) rank -= neg_count if rank < self.zero_count: - return 0 + return 0.0 rank -= self.zero_count - curr_count = 0 - if self.positive_store.min_index is not None: - for idx in range(self.positive_store.min_index, self.positive_store.max_index + 1): - bucket_count = self.positive_store.get_count(idx) - curr_count += bucket_count - if curr_count > rank: - return self.mapping.compute_value_from_index(idx) - - return float('inf') + # Use key_at_rank for consistency with storage implementation + key = self.positive_store.key_at_rank(rank) + return self.mapping.compute_value_from_index(key) + + # Alias for API compatibility + def get_quantile_value(self, quantile: float) -> float: + """Alias for quantile().""" + try: + return self.quantile(quantile) + except ValueError: + return None + + @property + def avg(self) -> float: + """Return the exact average of values added to the sketch.""" + if self.count == 0: + return 0.0 + return self._sum / self.count + + @property + def sum(self) -> float: + """Return the exact sum of values added to the sketch.""" + return self._sum + + @property + def min(self) -> float: + """Return the minimum value added to the sketch.""" + return self._min + + @property + def max(self) -> float: + """Return the maximum value added to the sketch.""" + return self._max def merge(self, other: 'DDSketch') -> None: """ @@ 
-185,12 +234,20 @@ def merge(self, other: 'DDSketch') -> None: """ if self.relative_accuracy != other.relative_accuracy: raise ValueError("Cannot merge sketches with different relative accuracies") + + if other.count == 0: + return self.positive_store.merge(other.positive_store) - if self.cont_neg and other.cont_neg: + if self.cont_neg and other.cont_neg and other.negative_store is not None: self.negative_store.merge(other.negative_store) - elif other.cont_neg and sum(other.negative_store.counts.values()) > 0: + elif other.cont_neg and other.negative_store is not None and other.negative_store.count > 0: raise ValueError("Cannot merge sketch containing negative values when cont_neg is False") self.zero_count += other.zero_count - self.count += other.count \ No newline at end of file + self.count += other.count + self._sum += other._sum + if other._min < self._min: + self._min = other._min + if other._max > self._max: + self._max = other._max \ No newline at end of file diff --git a/QuantileFlow/ddsketch/mapping/cubic_interpolation.py b/QuantileFlow/ddsketch/mapping/cubic_interpolation.py index c8db29a..53f46df 100644 --- a/QuantileFlow/ddsketch/mapping/cubic_interpolation.py +++ b/QuantileFlow/ddsketch/mapping/cubic_interpolation.py @@ -1,10 +1,5 @@ """ -This file contains a Python implementation of the cubic interpolation mapping algorithm -described in Datadog's Java DDSketch implementation (https://github.com/DataDog/sketches-java). - -Original work Copyright 2021 Datadog, Inc. -Licensed under Apache License 2.0 (http://www.apache.org/licenses/LICENSE-2.0) - +Cubic interpolation mapping scheme for DDSketch. This implementation approximates the memory-optimal logarithmic mapping by: 1. Extracting the floor value of log2 from binary representation @@ -34,8 +29,7 @@ def __init__(self, relative_accuracy: float): # Multiplier m = 7/(10*log(2)) ≈ 1.01 # This gives us the minimum multiplier that maintains relative accuracy guarantee - # Divide by C as per Datadog's implementation - self.m = 1/ (self.C * math.log(2)) + self.m = 1 / (self.C * math.log(2)) def _extract_exponent_and_significand(self, value: float) -> tuple[int, float]: """ @@ -55,7 +49,7 @@ def _cubic_interpolation(self, s: float) -> float: Compute the cubic interpolation P(s) = As³ + Bs² + Cs where s is the normalized significand in [0, 1). """ - # Use Datadog's order of operations for better numerical stability + # Use Horner's method for better numerical stability return s * (self.C + s * (self.B + s * self.A)) def compute_bucket_index(self, value: float) -> int: diff --git a/QuantileFlow/ddsketch/mapping/logarithmic.py b/QuantileFlow/ddsketch/mapping/logarithmic.py index a495921..e22f9c2 100644 --- a/QuantileFlow/ddsketch/mapping/logarithmic.py +++ b/QuantileFlow/ddsketch/mapping/logarithmic.py @@ -3,19 +3,40 @@ import math from .base import MappingScheme + class LogarithmicMapping(MappingScheme): + """ + A memory-optimal KeyMapping that uses logarithmic mapping. + + Given a targeted relative accuracy, it requires the least number of keys + to cover a given range of values. 
+ """ __slots__ = ('relative_accuracy', 'gamma', 'multiplier') def __init__(self, relative_accuracy: float): self.relative_accuracy = relative_accuracy self.gamma = (1 + relative_accuracy) / (1 - relative_accuracy) self.multiplier = 1 / math.log(self.gamma) + + def key(self, value: float) -> int: + """Alias for compute_bucket_index for API compatibility.""" + return self.compute_bucket_index(value) + + def value(self, key: int) -> float: + """Alias for compute_value_from_index for API compatibility.""" + return self.compute_value_from_index(key) def compute_bucket_index(self, value: float) -> int: - # ceil(log_gamma(value) = ceil(log(value) / log(gamma)) + """Compute the bucket index for a given value. + + ceil(log_gamma(value)) = ceil(log(value) / log(gamma)) + """ return math.ceil(math.log(value) * self.multiplier) def compute_value_from_index(self, index: int) -> float: - # Return geometric mean of bucket boundaries - # This ensures the relative error is bounded by relative_accuracy + """Compute the representative value for a given bucket index. + + Returns the geometric mean of bucket boundaries to ensure + the relative error is bounded by relative_accuracy. + """ return math.pow(self.gamma, index) * (2.0 / (1.0 + self.gamma)) \ No newline at end of file diff --git a/QuantileFlow/ddsketch/storage/contiguous.py b/QuantileFlow/ddsketch/storage/contiguous.py index 7ed33dd..e9027d9 100644 --- a/QuantileFlow/ddsketch/storage/contiguous.py +++ b/QuantileFlow/ddsketch/storage/contiguous.py @@ -1,121 +1,234 @@ -"""Contiguous array storage implementation for DDSketch using circular buffer.""" +"""Contiguous array storage implementation for DDSketch using offset-based indexing. -import numpy as np +Optimized for high throughput by using Python lists instead of numpy arrays +and chunk-based dynamic growth pattern. +""" + +import math import warnings -from .base import Storage, BucketManagementStrategy +from .base import Storage + + +# Chunk size for dynamic growth +CHUNK_SIZE = 128 + class ContiguousStorage(Storage): """ - Contiguous array storage for DDSketch using a circular buffer. + Contiguous storage for DDSketch using offset-based direct indexing. + + Uses a simple offset scheme where: + array_index = bucket_index - offset - Uses a bucket mapping scheme where: - bucket_array_index = (bucket_index - min_bucket_index + arr_index_of_min_bucket) % num_buckets + This implementation is optimized for high throughput by: + - Using Python lists instead of numpy arrays for faster scalar operations + - Growing dynamically in chunks rather than pre-allocating + - Minimizing attribute access in the hot path Implements collapsing strategy where: - - If inserting below min: collapse if range too large, otherwise adjust min + - If inserting below min: shift array or collapse if range too large - If inserting above max: collapse lowest buckets to make room """ - __slots__ = ('total_count', 'counts', 'min_index', 'max_index', - 'num_buckets', 'arr_index_of_min_bucket', 'collapse_count', - 'max_buckets', 'bucket_mask', 'strategy') + __slots__ = ('count', 'bins', 'min_key', 'max_key', + 'offset', 'collapse_count', 'bin_limit', + 'chunk_size', 'is_collapsed', + '_cumulative_sums', '_cumulative_valid') - def __init__(self, max_buckets: int = 2048): + def __init__(self, bin_limit: int = 2048, chunk_size: int = CHUNK_SIZE, max_buckets: int = None): """ Initialize contiguous storage. Args: - max_buckets: Maximum number of buckets (default 2048). + bin_limit: Maximum number of bins (default 2048). 
+ chunk_size: Size of chunks for dynamic growth (default 128). + max_buckets: Alias for bin_limit for API compatibility. """ - if max_buckets <= 0: - raise ValueError("max_buckets must be positive for ContiguousStorage") - super().__init__(max_buckets, BucketManagementStrategy.FIXED) - self.total_count = 0 - self.counts = np.zeros(max_buckets, dtype=np.int64) - self.max_buckets = max_buckets - self.min_index = None # Minimum bucket index seen - self.max_index = None # Maximum bucket index seen - self.num_buckets = 0 # Number of non-zero buckets - self.arr_index_of_min_bucket = 0 # Array index where min bucket is stored - self.collapse_count = 0 # Number of times buckets have been collapsed - - def add(self, bucket_index: int, count: int = 1): + # Support max_buckets as alias for bin_limit + if max_buckets is not None: + bin_limit = max_buckets + + if bin_limit <= 0: + raise ValueError("bin_limit must be positive for ContiguousStorage") + + # Don't call super().__init__ to avoid overhead - inline what we need + self.count = 0.0 # Use float for weighted values + self.bins = [] # Start empty, grow dynamically + self.bin_limit = bin_limit + self.chunk_size = chunk_size + self.min_key = None # Will use special infinity values + self.max_key = None + self.offset = 0 + self.collapse_count = 0 + self.is_collapsed = False + # Lazy cumulative sums for O(log n) quantile queries + self._cumulative_sums = [] + self._cumulative_valid = False + + @property + def total_count(self): + """Alias for count to maintain API compatibility.""" + return self.count + + @total_count.setter + def total_count(self, value): + self.count = value + + @property + def min_index(self): + """Alias for min_key to maintain API compatibility.""" + return self.min_key + + @property + def max_index(self): + """Alias for max_key to maintain API compatibility.""" + return self.max_key + + @property + def max_buckets(self): + """Alias for bin_limit to maintain API compatibility.""" + return self.bin_limit + + @property + def num_buckets(self): + """Lazily compute the number of non-zero buckets.""" + if not self.bins: + return 0 + return sum(1 for b in self.bins if b > 0) + + @property + def counts(self): + """Return bins for API compatibility (used by some code paths).""" + return self.bins + + def length(self): + """Return the number of bins.""" + return len(self.bins) + + def _get_new_length(self, new_min_key, new_max_key): + """Calculate new length needed, respecting bin_limit.""" + desired_length = new_max_key - new_min_key + 1 + chunk_length = self.chunk_size * int(math.ceil(desired_length / self.chunk_size)) + return min(chunk_length, self.bin_limit) + + def add(self, key, weight=1.0): """ - Add count to bucket_index using new collapsing strategy. + Add weight to the bin at key. Args: - bucket_index: The bucket index to add to. - count: The count to add (default 1). + key: The bucket index to add to. + weight: The weight to add (default 1.0). """ - - if self.min_index is None: + idx = self._get_index(key) + self.bins[idx] += weight + self.count += weight + self._cumulative_valid = False + + def _get_index(self, key): + """Calculate the bin index for the key, extending the range if necessary. + + Optimized for the common case where key is within the existing range. 
+ """ + # Fast path: key is within existing range (most common case) + min_key = self.min_key + if min_key is not None and min_key <= key <= self.max_key: + return key - self.offset + + # Slow path: need to extend range or handle edge cases + if min_key is None: # First insertion - self.min_index = bucket_index - self.max_index = bucket_index - self.counts[0] = count - self.num_buckets = 1 - self.arr_index_of_min_bucket = 0 + self._extend_range(key) + elif key < min_key: + if self.is_collapsed: + return 0 + self._extend_range(key) + if self.is_collapsed: + return 0 + else: # key > self.max_key + self._extend_range(key) + + return key - self.offset + + def _extend_range(self, key, second_key=None): + """Grow the bins as necessary and adjust.""" + if second_key is None: + second_key = key + + if self.min_key is None: + new_min_key = min(key, second_key) + new_max_key = max(key, second_key) else: - if bucket_index < self.min_index: - new_range = self.max_index - bucket_index + 1 - # Handle insertion below current minimum - if new_range > self.max_buckets: - # Range too large, collapse into min bucket - pos = (self.arr_index_of_min_bucket) % self.max_buckets - self.counts[pos] += count - self.collapse_count += 1 - else: - # Update min and place value - shift = self.min_index - bucket_index - self.min_index = bucket_index - self.arr_index_of_min_bucket = self.arr_index_of_min_bucket - shift - pos = (bucket_index - self.min_index + self.arr_index_of_min_bucket) % self.max_buckets - self.counts[pos] = count - self.num_buckets += 1 - - elif bucket_index > self.max_index: - new_range = bucket_index - self.min_index + 1 - if new_range > self.max_buckets: - # Handle insertion above current maximum - buckets_to_collapse = bucket_index - self.max_index - # Collapse lowest buckets - collapse_sum = 0 - for i in range(buckets_to_collapse): - if i >= self.max_index - self.min_index + 1: - warnings.warn("Collapsing all buckets in the sketch. 
" - "Range is too large to be contained by the buckets allocated, " - "and you should increase max_buckets.", UserWarning) - break - pos = i + self.arr_index_of_min_bucket - collapse_sum += self.counts[pos] - self.counts[pos] = 0 - - # Add collapsed values to new min bucket - new_min = self.min_index + buckets_to_collapse - new_min_pos = (buckets_to_collapse + self.arr_index_of_min_bucket) % self.max_buckets - self.counts[new_min_pos] += collapse_sum - - # Update tracking variables - self.min_index = new_min - self.arr_index_of_min_bucket = new_min_pos - self.collapse_count += buckets_to_collapse - - # Place new value - self.max_index = bucket_index - pos = (bucket_index - self.min_index + self.arr_index_of_min_bucket) % self.max_buckets - was_zero = self.counts[pos] == 0 - self.counts[pos] += count - if was_zero: - self.num_buckets += 1 + new_min_key = min(key, second_key, self.min_key) + new_max_key = max(key, second_key, self.max_key) + + if self.length() == 0: + # Initialize bins + new_length = self._get_new_length(new_min_key, new_max_key) + self.bins = [0.0] * new_length + self.offset = new_min_key + self._adjust(new_min_key, new_max_key) + elif new_min_key >= self.min_key and new_max_key < self.offset + self.length(): + # No need to change the range; just update min/max keys + self.min_key = new_min_key + self.max_key = new_max_key + else: + # Grow the bins + new_length = self._get_new_length(new_min_key, new_max_key) + if new_length > self.length(): + self.bins.extend([0.0] * (new_length - self.length())) + self._adjust(new_min_key, new_max_key) + + def _adjust(self, new_min_key, new_max_key): + """ + Adjust the bins, the offset, the min_key, and max_key. + Collapse to the left if necessary (lowest bins collapsed). + """ + if new_max_key - new_min_key + 1 > self.length(): + # The range of keys is too wide, the lowest bins need to be collapsed + new_min_key = new_max_key - self.length() + 1 + + if self.min_key is None or new_min_key >= self.max_key: + # Put everything in the first bin + self.offset = new_min_key + self.min_key = new_min_key + self.bins[:] = [0.0] * self.length() + self.bins[0] = self.count else: - # Normal insertion within current range - pos = (bucket_index - self.min_index + self.arr_index_of_min_bucket) % self.max_buckets - was_zero = self.counts[pos] == 0 - self.counts[pos] += count - if was_zero: - self.num_buckets += 1 - - self.total_count += count + shift = self.offset - new_min_key + if shift < 0: + collapse_start_index = self.min_key - self.offset + collapse_end_index = new_min_key - self.offset + collapsed_count = sum(self.bins[collapse_start_index:collapse_end_index]) + self.bins[collapse_start_index:collapse_end_index] = [0.0] * (new_min_key - self.min_key) + self.bins[collapse_end_index] += collapsed_count + self.min_key = new_min_key + self._shift_bins(shift) + else: + self.min_key = new_min_key + self._shift_bins(shift) + + self.max_key = new_max_key + self.is_collapsed = True + self.collapse_count += 1 + else: + self._center_bins(new_min_key, new_max_key) + self.min_key = new_min_key + self.max_key = new_max_key + + def _shift_bins(self, shift): + """Shift the bins; this changes the offset.""" + if shift > 0: + self.bins = self.bins[:-shift] if shift < len(self.bins) else [] + self.bins[:0] = [0.0] * shift + elif shift < 0: + self.bins = self.bins[abs(shift):] + self.bins.extend([0.0] * abs(shift)) + self.offset -= shift + + def _center_bins(self, new_min_key, new_max_key): + """Center the bins; this changes the offset.""" + middle_key = new_min_key 
+ (new_max_key - new_min_key + 1) // 2 + self._shift_bins(self.offset + self.length() // 2 - middle_key) def remove(self, bucket_index: int, count: int = 1) -> bool: """ @@ -128,43 +241,43 @@ def remove(self, bucket_index: int, count: int = 1) -> bool: Returns: bool: True if any value was actually removed, False otherwise. """ - if count <= 0 or self.min_index is None: + if count <= 0 or self.min_key is None: return False + + if self.min_key <= bucket_index <= self.max_key: + pos = bucket_index - self.offset + if pos < 0 or pos >= len(self.bins): + return False - if self.min_index <= bucket_index <= self.max_index: - pos = (bucket_index - self.min_index + self.arr_index_of_min_bucket) % self.max_buckets - old_count = self.counts[pos] - + old_count = self.bins[pos] if old_count == 0: return False - - self.counts[pos] = max(0, old_count - count) - self.total_count = max(0, self.total_count - count) - if old_count > 0 and self.counts[pos] == 0: - self.num_buckets -= 1 - if self.num_buckets == 0: - self.min_index = None - self.max_index = None - elif bucket_index == self.min_index: - # Find new minimum index - for i in range(self.max_index - self.min_index + 1): - pos = (self.arr_index_of_min_bucket + i) % self.max_buckets - if self.counts[pos] > 0: - self.min_index += i - self.arr_index_of_min_bucket = pos + self.bins[pos] = max(0, old_count - count) + self.count = max(0, self.count - count) + self._cumulative_valid = False + + # Update min/max keys if we emptied a boundary bucket + if old_count > 0 and self.bins[pos] == 0: + if bucket_index == self.min_key: + # Find new minimum + for i in range(len(self.bins)): + if self.bins[i] > 0: + self.min_key = self.offset + i break - elif bucket_index == self.max_index: - # Find new maximum index - for i in range(self.max_index - self.min_index + 1): - pos = (self.arr_index_of_min_bucket + (self.max_index - self.min_index - i)) % self.max_buckets - if self.counts[pos] > 0: - self.max_index -= i + else: + self.min_key = None + self.max_key = None + elif bucket_index == self.max_key: + # Find new maximum + for i in range(len(self.bins) - 1, -1, -1): + if self.bins[i] > 0: + self.max_key = self.offset + i break return True else: warnings.warn("Removing count from non-existent bucket. " - "Bucket index is out of range.", UserWarning) + "Bucket index is out of range.", UserWarning) return False def get_count(self, bucket_index: int) -> int: @@ -177,10 +290,76 @@ def get_count(self, bucket_index: int) -> int: Returns: The count at the specified bucket index. """ - if self.min_index is None or bucket_index < self.min_index or bucket_index > self.max_index: + if self.min_key is None or bucket_index < self.min_key or bucket_index > self.max_key: return 0 - pos = (bucket_index - self.min_index + self.arr_index_of_min_bucket) % self.max_buckets - return int(self.counts[pos]) + pos = bucket_index - self.offset + if pos < 0 or pos >= len(self.bins): + return 0 + return int(self.bins[pos]) + + def _rebuild_cumulative_sums(self): + """Rebuild cumulative sums array for O(log n) rank queries.""" + bins = self.bins + n = len(bins) + if n == 0: + self._cumulative_sums = [] + else: + # Build cumulative sums + cumsum = [0.0] * n + running = 0.0 + for i in range(n): + running += bins[i] + cumsum[i] = running + self._cumulative_sums = cumsum + self._cumulative_valid = True + + def key_at_rank(self, rank, lower=True): + """ + Return the key for the value at given rank. + + Uses lazy cumulative sums and binary search for O(log n) performance. 
+ + Args: + rank: The rank to find. + lower: If True, return key where running_count > rank. + If False, return key where running_count >= rank + 1. + + Returns: + The key at the specified rank. + """ + if not self._cumulative_valid: + self._rebuild_cumulative_sums() + + cumsum = self._cumulative_sums + n = len(cumsum) + if n == 0: + return self.max_key if self.max_key is not None else 0 + + # Use binary search for O(log n) lookup + # Binary search to find first index where condition is true + lo, hi = 0, n + if lower: + # Find first index where cumsum[i] > rank + while lo < hi: + mid = (lo + hi) >> 1 + if cumsum[mid] > rank: + hi = mid + else: + lo = mid + 1 + else: + # Find first index where cumsum[i] >= rank + 1 + target = rank + 1 + while lo < hi: + mid = (lo + hi) >> 1 + if cumsum[mid] >= target: + hi = mid + else: + lo = mid + 1 + + if lo < n: + return lo + self.offset + + return self.max_key if self.max_key is not None else 0 def merge(self, other: 'ContiguousStorage'): """ @@ -189,13 +368,33 @@ def merge(self, other: 'ContiguousStorage'): Args: other: Another ContiguousStorage instance to merge with this one. """ - if other.min_index is None: + if other.count == 0: return - - # Add each non-zero bucket - for i in range(other.max_index - other.min_index + 1): - pos = (other.arr_index_of_min_bucket + i) % other.max_buckets - if other.counts[pos] > 0: - bucket_index = other.min_index + i - self.add(bucket_index, int(other.counts[pos])) - \ No newline at end of file + + if self.count == 0: + self.copy(other) + return + + if other.min_key < self.min_key or other.max_key > self.max_key: + self._extend_range(other.min_key, other.max_key) + + for key in range(other.min_key, other.max_key + 1): + other_idx = key - other.offset + if 0 <= other_idx < len(other.bins): + self_idx = key - self.offset + if 0 <= self_idx < len(self.bins): + self.bins[self_idx] += other.bins[other_idx] + + self.count += other.count + self._cumulative_valid = False + + def copy(self, store: 'ContiguousStorage'): + """Copy another storage into this one.""" + self.bins = store.bins[:] + self.count = store.count + self.min_key = store.min_key + self.max_key = store.max_key + self.offset = store.offset + self.is_collapsed = store.is_collapsed + self.collapse_count = store.collapse_count + self._cumulative_valid = False diff --git a/QuantileFlow/ddsketch/storage/sparse.py b/QuantileFlow/ddsketch/storage/sparse.py index 98616b2..5c5e83f 100644 --- a/QuantileFlow/ddsketch/storage/sparse.py +++ b/QuantileFlow/ddsketch/storage/sparse.py @@ -1,6 +1,6 @@ """Sparse storage implementation for DDSketch using dictionary.""" -from typing import Dict +from typing import Dict, List from .base import Storage, BucketManagementStrategy class SparseStorage(Storage): @@ -26,6 +26,25 @@ def __init__(self, max_buckets: int = 2048, self.counts: Dict[int, int] = {} self.min_index = None # Minimum bucket index seen self.max_index = None # Maximum bucket index seen + # Cached sorted keys and cumulative sums for O(log n) quantile queries + self._sorted_keys: List[int] = [] + self._cumulative_sums: List[float] = [] + self._cache_valid: bool = False + + @property + def count(self): + """Alias for total_count for API compatibility.""" + return self.total_count + + @property + def min_key(self): + """Alias for min_index for API compatibility.""" + return self.min_index + + @property + def max_key(self): + """Alias for max_index for API compatibility.""" + return self.max_index def add(self, bucket_index: int, count: int = 1): """ @@ -40,6 +59,7 @@ 
def add(self, bucket_index: int, count: int = 1): self.counts[bucket_index] = self.counts.get(bucket_index, 0) + count self.total_count += count + self._cache_valid = False # Update min and max indices if self.min_index is None or bucket_index < self.min_index: @@ -70,6 +90,7 @@ def remove(self, bucket_index: int, count: int = 1) -> bool: self.counts[bucket_index] = max(0, self.counts[bucket_index] - count) self.total_count = max(0, self.total_count - count) + self._cache_valid = False if self.counts[bucket_index] == 0: del self.counts[bucket_index] @@ -123,4 +144,71 @@ def collapse_smallest_buckets(self): # Merge buckets self.counts[i1] += self.counts[i0] - del self.counts[i0] \ No newline at end of file + del self.counts[i0] + self._cache_valid = False + + def _rebuild_cache(self): + """Rebuild sorted keys and cumulative sums for O(log n) rank queries.""" + if not self.counts: + self._sorted_keys = [] + self._cumulative_sums = [] + else: + self._sorted_keys = sorted(self.counts.keys()) + # Build cumulative sums + cumsum = [] + running = 0.0 + for key in self._sorted_keys: + running += self.counts[key] + cumsum.append(running) + self._cumulative_sums = cumsum + self._cache_valid = True + + def key_at_rank(self, rank, lower=True): + """ + Return the key for the value at given rank. + + Uses cached sorted keys and binary search for O(log n) performance. + + Args: + rank: The rank to find. + lower: If True, return key where running_count > rank. + If False, return key where running_count >= rank + 1. + + Returns: + The key at the specified rank. + """ + if not self.counts: + return 0 + + if not self._cache_valid: + self._rebuild_cache() + + cumsum = self._cumulative_sums + n = len(cumsum) + if n == 0: + return self.max_index if self.max_index is not None else 0 + + # Use binary search for O(log n) lookup + lo, hi = 0, n + if lower: + # Find first index where cumsum[i] > rank + while lo < hi: + mid = (lo + hi) >> 1 + if cumsum[mid] > rank: + hi = mid + else: + lo = mid + 1 + else: + # Find first index where cumsum[i] >= rank + 1 + target = rank + 1 + while lo < hi: + mid = (lo + hi) >> 1 + if cumsum[mid] >= target: + hi = mid + else: + lo = mid + 1 + + if lo < len(self._sorted_keys): + return self._sorted_keys[lo] + + return self.max_index if self.max_index is not None else 0 \ No newline at end of file diff --git a/benchmarks/optimized_offset_inline.json b/benchmarks/optimized_offset_inline.json new file mode 100644 index 0000000..5cf9476 --- /dev/null +++ b/benchmarks/optimized_offset_inline.json @@ -0,0 +1,99 @@ +{ + "timestamp": "2026-01-19T18:07:06.343610", + "name": "optimized_offset_inline", + "metadata": { + "num_values": 10000000, + "num_trials": 3, + "timestamp": "2026-01-19T18:07:06.343600" + }, + "stats": [ + { + "function": "profile_ddsketch.py:26(run_sketch_operations)", + "ncalls": 1, + "tottime": 2.106967754333333, + "cumtime": 12.268609562, + "percall_tot": 2.106967754333333, + "percall_cum": 12.268609562 + }, + { + "function": "core.py:97(insert)", + "ncalls": 10000000, + "tottime": 8.299598266666667, + "cumtime": 10.160096177, + "percall_tot": 8.299598266666668e-07, + "percall_cum": 1.0160096177000002e-06 + }, + { + "function": "~:0()", + "ncalls": 10000000, + "tottime": 0.9981760419999999, + "cumtime": 0.9981760419999999, + "percall_tot": 9.98176042e-08, + "percall_cum": 9.98176042e-08 + }, + { + "function": "~:0()", + "ncalls": 10000000, + "tottime": 0.8620358106666668, + "cumtime": 0.8620358106666668, + "percall_tot": 8.620358106666668e-08, + "percall_cum": 
8.620358106666668e-08 + }, + { + "function": "core.py:168(quantile)", + "ncalls": 4, + "tottime": 0.0007727616666666667, + "cumtime": 0.0015440526666666667, + "percall_tot": 0.00019319041666666668, + "percall_cum": 0.0003860131666666667 + }, + { + "function": "contiguous.py:216(get_count)", + "ncalls": 3344, + "tottime": 0.0007546813333333334, + "cumtime": 0.0007546813333333334, + "percall_tot": 2.2605956897592843e-07, + "percall_cum": 2.2605956897592843e-07 + }, + { + "function": "contiguous.py:64(_add_with_expansion)", + "ncalls": 21, + "tottime": 0.0002860576666666667, + "cumtime": 0.0002860576666666667, + "percall_tot": 1.317209009661836e-05, + "percall_cum": 1.317209009661836e-05 + }, + { + "function": "~:0()", + "ncalls": 1, + "tottime": 3.222e-05, + "cumtime": 3.222e-05, + "percall_tot": 3.222e-05, + "percall_cum": 3.222e-05 + }, + { + "function": "logarithmic.py:18(compute_value_from_index)", + "ncalls": 4, + "tottime": 1.3633333333333336e-05, + "cumtime": 1.6609666666666668e-05, + "percall_tot": 3.408333333333334e-06, + "percall_cum": 4.152416666666667e-06 + }, + { + "function": "~:0()", + "ncalls": 4, + "tottime": 2.9763333333333336e-06, + "cumtime": 2.9763333333333336e-06, + "percall_tot": 7.440833333333334e-07, + "percall_cum": 7.440833333333334e-07 + }, + { + "function": "~:0()", + "ncalls": 1, + "tottime": 1.5780000000000002e-06, + "cumtime": 1.5780000000000002e-06, + "percall_tot": 1.5780000000000002e-06, + "percall_cum": 1.5780000000000002e-06 + } + ] +} \ No newline at end of file diff --git a/metrics-streaming-service/README.md b/metrics-streaming-service/README.md index eb0d188..cb558cd 100644 --- a/metrics-streaming-service/README.md +++ b/metrics-streaming-service/README.md @@ -74,7 +74,76 @@ python ./src/main.py --csv ./data/HDFS_v1/preprocessed/Event_traces.csv --sketch | `momentsketch` | QuantileFlow MomentSketch | Moment-based quantile estimation | | `hdrhistogram` | QuantileFlow HDRHistogram | High Dynamic Range Histogram | -### 5. Monitor & Observe +### 5. Line-by-Line Profiling + +For performance optimization, you can enable **line-by-line profiling** of the sketch operations during the actual Kafka benchmark. This provides detailed per-line runtime analysis on the real HDFS dataset. 
+ +**Prerequisites:** +```bash +pip install line_profiler +``` + +**Basic Usage:** +```bash +# Enable line profiling with default settings +python ./src/main.py --csv ./data/HDFS_v1/preprocessed/Event_traces.csv --line-profile + +# Profile different sketch implementations +python ./src/main.py --csv ./data/HDFS_v1/preprocessed/Event_traces.csv --sketch quantileflow --line-profile + +python ./src/main.py --csv ./data/HDFS_v1/preprocessed/Event_traces.csv --sketch momentsketch --line-profile + +# Specify custom output file +python ./src/main.py --csv ./data/HDFS_v1/preprocessed/Event_traces.csv --line-profile --profile-output results/ddsketch_profile.txt +``` + +**What Gets Profiled:** + +The profiler tracks execution time for every line in the following critical functions: + +| Sketch Type | Profiled Functions | +|-------------|-------------------| +| `quantileflow` | `DDSketch.insert`, `DDSketch.quantile`, `ContiguousStorage.add`, `LogarithmicMapping.compute_bucket_index` | +| `momentsketch` | `MomentSketch.insert`, `MomentSketch.quantile` | +| `hdrhistogram` | `HDRHistogram.insert`, `HDRHistogram.quantile` | + +**Output:** + +The profiling results are saved to a file (default: `line_profile_kafka.txt`) and include: +- Total runtime for each line of code +- Number of times each line was executed (hits) +- Percentage of time spent in each line +- Time per execution + +**Example Output:** +``` +Line # Hits Time Per Hit % Time Line Contents +============================================================== + 45 575061 12345678 21.5 15.2 bucket_idx = int(math.floor(self.gamma * math.log(value) + self.offset)) + 46 575061 8234567 14.3 10.1 self.positive_counts.add(bucket_idx) +``` + +**Profiling Workflow:** + +1. **Baseline profiling** - Profile with random data: + ```bash + cd /home/taira/Code/QuantileFlow + python profile_ddsketch.py --line-profile --num-values 1000000 --num-trials 3 + ``` + +2. **Real-world profiling** - Profile with HDFS dataset via Kafka: + ```bash + cd /home/taira/Code/QuantileFlow/metrics-streaming-service + python ./src/main.py --csv ./data/HDFS_v1/preprocessed/Event_traces.csv --line-profile + ``` + +3. **Compare results** - Analyze differences between synthetic and real-world performance + +4. **Optimize** - Focus on lines with highest `% Time` and `Time` values + +5. **Verify** - Re-run profiling after optimizations to measure improvements + +### 6. Monitor & Observe ```bash # Terminal 2: Monitor consumer group @@ -86,7 +155,7 @@ watch -n 1 kafka-run-class kafka.tools.GetOffsetShell \ --broker-list localhost:9092 --topic latency-metrics ``` -### 6. Stopping Kafka +### 7. 
Stopping Kafka ```bash # Stop Kafka server diff --git a/metrics-streaming-service/docker-compose.yml b/metrics-streaming-service/docker-compose.yml new file mode 100644 index 0000000..14b96c1 --- /dev/null +++ b/metrics-streaming-service/docker-compose.yml @@ -0,0 +1,31 @@ +services: + kafka: + image: apache/kafka:latest + container_name: kafka-quantileflow + ports: + - "9092:9092" + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_NUM_PARTITIONS: 3 + CLUSTER_ID: 'QuantileFlow-Kafka-Cluster' + volumes: + - kafka-data:/var/lib/kafka/data + healthcheck: + test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server localhost:9092"] + interval: 10s + timeout: 10s + retries: 5 + +volumes: + kafka-data: + driver: local diff --git a/metrics-streaming-service/line_profile_kafka.txt b/metrics-streaming-service/line_profile_kafka.txt new file mode 100644 index 0000000..2daff17 --- /dev/null +++ b/metrics-streaming-service/line_profile_kafka.txt @@ -0,0 +1,121 @@ + +======================================================================================================================== +Line-by-Line Profiling Results - Kafka Metrics Streaming Benchmark +======================================================================================================================== +Sketch Algorithm: QuantileFlow DDSketch +Messages Processed: 872,161 +Total Runtime: 2091.55 seconds +Timestamp: 2026-01-21T22:31:52.067860 +======================================================================================================================== + +Timer unit: 1e-09 s + +Total time: 15.3584 s +File: /home/taira/Code/QuantileFlow/QuantileFlow/ddsketch/core.py +Function: DDSketch.insert at line 88 + +Line # Hits Time Per Hit % Time Line Contents +============================================================== + 88 def insert(self, value: Union[int, float], weight: float = 1.0) -> None: + 89 """ + 90 Insert a value into the sketch. + 91 + 92 Args: + 93 value: The value to insert. + 94 weight: The weight of the value (default 1.0). + 95 + 96 Raises: + 97 ValueError: If value is negative and cont_neg is False. 
+ 98 """ + 99 872161 816360452.0 936.0 5.3 if value > 0: + 100 867922 1.17e+10 13455.3 76.0 self.positive_store.add(self.mapping.compute_bucket_index(value), weight) + 101 4239 3014973.0 711.2 0.0 elif value < 0: + 102 if self.cont_neg: + 103 self.negative_store.add(self.mapping.compute_bucket_index(-value), weight) + 104 else: + 105 raise ValueError("Negative values not supported when cont_neg is False") + 106 else: + 107 4239 3109202.0 733.5 0.0 self.zero_count += weight + 108 + 109 # Track summary stats + 110 872161 348912595.0 400.1 2.3 self.count += weight + 111 872161 1080794776.0 1239.2 7.0 self._sum += value * weight + 112 872161 489626681.0 561.4 3.2 if value < self._min: + 113 9 2982.0 331.3 0.0 self._min = value + 114 872161 938384938.0 1075.9 6.1 if value > self._max: + 115 9 6437.0 715.2 0.0 self._max = value + +Total time: 1404.78 s +File: /home/taira/Code/QuantileFlow/QuantileFlow/ddsketch/core.py +Function: DDSketch.quantile at line 152 + +Line # Hits Time Per Hit % Time Line Contents +============================================================== + 152 def quantile(self, q: float) -> float: + 153 """ + 154 Compute the approximate quantile. + 155 + 156 Args: + 157 q: The desired quantile (between 0 and 1). + 158 + 159 Returns: + 160 The approximate value at the specified quantile. + 161 + 162 Raises: + 163 ValueError: If q is not between 0 and 1 or if the sketch is empty. + 164 """ + 165 2616483 2277023243.0 870.3 0.2 if not 0 <= q <= 1: + 166 raise ValueError("Quantile must be between 0 and 1") + 167 2616483 1371491141.0 524.2 0.1 if self.count == 0: + 168 raise ValueError("Cannot compute quantile of empty sketch") + 169 + 170 2616483 1518291210.0 580.3 0.1 rank = q * (self.count - 1) + 171 + 172 2616483 1273997484.0 486.9 0.1 if self.cont_neg and self.negative_store is not None: + 173 2616483 1054785649.0 403.1 0.1 neg_count = self.negative_store.count + 174 2616483 1143741956.0 437.1 0.1 if rank < neg_count: + 175 # Handle negative values - use reversed rank + 176 reversed_rank = neg_count - rank - 1 + 177 key = self.negative_store.key_at_rank(reversed_rank, lower=False) + 178 return -self.mapping.compute_value_from_index(key) + 179 2616483 1138537472.0 435.1 0.1 rank -= neg_count + 180 + 181 2616483 1081722072.0 413.4 0.1 if rank < self.zero_count: + 182 return 0.0 + 183 2616483 923188783.0 352.8 0.1 rank -= self.zero_count + 184 + 185 # Use key_at_rank for consistency with storage implementation + 186 2616483 1.38e+12 528555.5 98.4 key = self.positive_store.key_at_rank(rank) + 187 2616483 1e+10 3836.4 0.7 return self.mapping.compute_value_from_index(key) + +Total time: 2.48174 s +File: /home/taira/Code/QuantileFlow/QuantileFlow/ddsketch/mapping/logarithmic.py +Function: LogarithmicMapping.compute_bucket_index at line 29 + +Line # Hits Time Per Hit % Time Line Contents +============================================================== + 29 def compute_bucket_index(self, value: float) -> int: + 30 """Compute the bucket index for a given value. + 31 + 32 ceil(log_gamma(value)) = ceil(log(value) / log(gamma)) + 33 """ + 34 867922 2481735173.0 2859.4 100.0 return math.ceil(math.log(value) * self.multiplier) + +Total time: 4.76142 s +File: /home/taira/Code/QuantileFlow/QuantileFlow/ddsketch/storage/contiguous.py +Function: ContiguousStorage.add at line 110 + +Line # Hits Time Per Hit % Time Line Contents +============================================================== + 110 def add(self, key, weight=1.0): + 111 """ + 112 Add weight to the bin at key. 
+ 113 + 114 Args: + 115 key: The bucket index to add to. + 116 weight: The weight to add (default 1.0). + 117 """ + 118 867922 3367877436.0 3880.4 70.7 idx = self._get_index(key) + 119 867922 659017479.0 759.3 13.8 self.bins[idx] += weight + 120 867922 734526582.0 846.3 15.4 self.count += weight + diff --git a/metrics-streaming-service/profile_kafka_benchmark.sh b/metrics-streaming-service/profile_kafka_benchmark.sh new file mode 100755 index 0000000..37f96df --- /dev/null +++ b/metrics-streaming-service/profile_kafka_benchmark.sh @@ -0,0 +1,118 @@ +#!/bin/bash + +# Profile Kafka Benchmark Script +# This script helps you run line-by-line profiling on the Kafka metrics streaming service + +set -e + +# Colors for output +GREEN='\033[0;32m' +BLUE='\033[0;34m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +# Default values +CSV_FILE="./data/HDFS_v1/preprocessed/Event_traces.csv" +SKETCH_TYPE="quantileflow" +OUTPUT_DIR="./profiling_results" + +# Create output directory +mkdir -p "$OUTPUT_DIR" + +echo -e "${BLUE}╔════════════════════════════════════════════════════════════╗${NC}" +echo -e "${BLUE}║ Kafka Metrics Streaming - Line Profiling Benchmark ║${NC}" +echo -e "${BLUE}╚════════════════════════════════════════════════════════════╝${NC}" +echo "" + +# Parse command line arguments +while [[ $# -gt 0 ]]; do + case $1 in + --csv) + CSV_FILE="$2" + shift 2 + ;; + --sketch) + SKETCH_TYPE="$2" + shift 2 + ;; + --help) + echo "Usage: $0 [OPTIONS]" + echo "" + echo "Options:" + echo " --csv FILE CSV file path (default: ./data/HDFS_v1/preprocessed/Event_traces.csv)" + echo " --sketch TYPE Sketch type: quantileflow, datadog, momentsketch, hdrhistogram (default: quantileflow)" + echo " --help Show this help message" + echo "" + echo "Examples:" + echo " $0" + echo " $0 --sketch momentsketch" + echo " $0 --csv ./data/custom.csv --sketch quantileflow" + exit 0 + ;; + *) + echo "Unknown option: $1" + echo "Use --help for usage information" + exit 1 + ;; + esac +done + +# Validate CSV file exists +if [ ! -f "$CSV_FILE" ]; then + echo -e "${YELLOW}⚠ Warning: CSV file not found: $CSV_FILE${NC}" + echo "Please ensure the data file exists before running." + exit 1 +fi + +# Check if line_profiler is installed +if ! python -c "import line_profiler" 2>/dev/null; then + echo -e "${YELLOW}⚠ Warning: line_profiler is not installed${NC}" + echo "Installing line_profiler..." + pip install line_profiler +fi + +# Generate timestamp for output file +TIMESTAMP=$(date +"%Y%m%d_%H%M%S") +OUTPUT_FILE="${OUTPUT_DIR}/${SKETCH_TYPE}_profile_${TIMESTAMP}.txt" + +echo -e "${GREEN}Configuration:${NC}" +echo " • CSV File: $CSV_FILE" +echo " • Sketch Type: $SKETCH_TYPE" +echo " • Output File: $OUTPUT_FILE" +echo "" + +echo -e "${BLUE}Starting Kafka profiling benchmark...${NC}" +echo "" + +# Check if Kafka is running +if ! lsof -i :9092 > /dev/null 2>&1; then + echo -e "${YELLOW}⚠ Warning: Kafka doesn't appear to be running on port 9092${NC}" + echo "Please start Kafka first. See README.md for instructions." + echo "" + read -p "Do you want to continue anyway? (y/N) " -n 1 -r + echo + if [[ ! 
$REPLY =~ ^[Yy]$ ]]; then + exit 1 + fi +fi + +# Run the profiling +python ./src/main.py \ + --csv "$CSV_FILE" \ + --sketch "$SKETCH_TYPE" \ + --line-profile \ + --profile-output "$OUTPUT_FILE" + +echo "" +echo -e "${GREEN}✓ Profiling complete!${NC}" +echo "" +echo -e "${BLUE}Results saved to: ${OUTPUT_FILE}${NC}" +echo "" +echo "To view the results:" +echo " cat $OUTPUT_FILE" +echo " less $OUTPUT_FILE" +echo "" +echo "Tips for analyzing results:" +echo " • Look for lines with high '% Time' - these are bottlenecks" +echo " • Compare 'Time' vs 'Per Hit' to find frequently-called slow operations" +echo " • Focus optimization on functions with high cumulative time" diff --git a/metrics-streaming-service/src/main.py b/metrics-streaming-service/src/main.py index b8f23fd..a9977fa 100644 --- a/metrics-streaming-service/src/main.py +++ b/metrics-streaming-service/src/main.py @@ -11,21 +11,52 @@ def run_producer(csv_file): producer = LogProducer() producer.stream_logs(csv_file) -def run_consumer(sketch_type): +def run_consumer(sketch_type, enable_line_profile=False, profile_output='line_profile_kafka.txt'): from metrics_streamer.consumer import LatencyMonitor - consumer = LatencyMonitor(sketch_type=sketch_type) + consumer = LatencyMonitor( + sketch_type=sketch_type, + enable_line_profile=enable_line_profile, + profile_output=profile_output + ) consumer.process_metrics() def main(): - parser = argparse.ArgumentParser() + parser = argparse.ArgumentParser( + description='Metrics Streaming Service with Sketch Algorithms', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Basic usage with QuantileFlow DDSketch + python main.py --csv data/HDFS_v1/HDFS_v1.csv + + # Compare different sketch algorithms + python main.py --csv data/HDFS_v1/HDFS_v1.csv --sketch momentsketch + + # Enable line-by-line profiling for performance analysis + python main.py --csv data/HDFS_v1/HDFS_v1.csv --line-profile + + # Profile with custom output file + python main.py --csv data/HDFS_v1/HDFS_v1.csv --line-profile --profile-output results/profile.txt + + # Profile different sketch implementation + python main.py --csv data/HDFS_v1/HDFS_v1.csv --sketch momentsketch --line-profile --profile-output results/momentsketch_profile.txt + """ + ) parser.add_argument('--csv', required=True, help='Input CSV file path') parser.add_argument('--sketch', choices=['quantileflow', 'datadog', 'momentsketch', 'hdrhistogram'], default='quantileflow', help='Sketch algorithm to use (default: quantileflow)') + parser.add_argument('--line-profile', action='store_true', + help='Enable line-by-line profiling of sketch operations (requires line_profiler)') + parser.add_argument('--profile-output', type=str, default='line_profile_kafka.txt', + help='Output file for line profiling results (default: line_profile_kafka.txt)') args = parser.parse_args() try: # Create processes with target functions - consumer_process = Process(target=run_consumer, args=(args.sketch,)) + consumer_process = Process( + target=run_consumer, + args=(args.sketch, args.line_profile, args.profile_output) + ) producer_process = Process(target=run_producer, args=(args.csv,)) # Start processes diff --git a/metrics-streaming-service/src/metrics_streamer/consumer.py b/metrics-streaming-service/src/metrics_streamer/consumer.py index 245cfba..c3f1a64 100644 --- a/metrics-streaming-service/src/metrics_streamer/consumer.py +++ b/metrics-streaming-service/src/metrics_streamer/consumer.py @@ -5,14 +5,26 @@ import sys import time from datetime import datetime 
+from pathlib import Path +from io import StringIO from .config import KAFKA_CONFIG +# Add local QuantileFlow folder to path +sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent)) + # Import sketch algorithms from QuantileFlow from QuantileFlow.ddsketch import DDSketch as QuantileFlowDDSketch from QuantileFlow.hdrhistogram import HDRHistogram as QuantileFlowHDRHistogram from ddsketch import DDSketch as DatadogDDSketch from QuantileFlow.momentsketch import MomentSketch as QuantileFlowMomentSketch +# Optional line profiler import +try: + from line_profiler import LineProfiler + LINE_PROFILER_AVAILABLE = True +except ImportError: + LINE_PROFILER_AVAILABLE = False + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -20,7 +32,7 @@ def deserialize_message(msg): return json.loads(msg.decode('utf-8')) class LatencyMonitor: - def __init__(self, sketch_type='quantileflow', window_size=100, refresh_interval=0.0001, dd_accuracy=0.01, moment_count=10): + def __init__(self, sketch_type='quantileflow', window_size=100, refresh_interval=0.0001, dd_accuracy=0.01, moment_count=10, enable_line_profile=False, profile_output='line_profile_kafka.txt'): self.window_size = window_size self.running = True self.start_time = time.time() @@ -28,6 +40,9 @@ def __init__(self, sketch_type='quantileflow', window_size=100, refresh_interval self.refresh_interval = refresh_interval self.msg_count = 0 self.sketch_type = sketch_type + self.enable_line_profile = enable_line_profile + self.profile_output = profile_output + self.line_profiler = None # Initialize the selected sketch for quantile computation if sketch_type == 'quantileflow': @@ -46,6 +61,51 @@ def __init__(self, sketch_type='quantileflow', window_size=100, refresh_interval else: raise ValueError(f"Unknown sketch type: {sketch_type}") + # Setup line profiler if enabled + if self.enable_line_profile: + if not LINE_PROFILER_AVAILABLE: + logger.error("line_profiler is not installed! 
Install with: pip install line_profiler") + self.enable_line_profile = False + else: + self._setup_line_profiler() + + def _setup_line_profiler(self): + """Setup line profiler for the sketch methods.""" + logger.info(f"Setting up line profiler for {self.sketch_name}...") + self.line_profiler = LineProfiler() + + # Add sketch methods to profile based on sketch type + if self.sketch_type == 'quantileflow': + # Import modules for profiling + from QuantileFlow.ddsketch.core import DDSketch + from QuantileFlow.ddsketch.storage.contiguous import ContiguousStorage + from QuantileFlow.ddsketch.mapping.logarithmic import LogarithmicMapping + + self.line_profiler.add_function(DDSketch.insert) + self.line_profiler.add_function(DDSketch.quantile) + self.line_profiler.add_function(ContiguousStorage.add) + self.line_profiler.add_function(LogarithmicMapping.compute_bucket_index) + logger.info("Profiling: DDSketch.insert, DDSketch.quantile, ContiguousStorage.add, LogarithmicMapping.compute_bucket_index") + + elif self.sketch_type == 'momentsketch': + from QuantileFlow.momentsketch.core import MomentSketch + + self.line_profiler.add_function(MomentSketch.insert) + self.line_profiler.add_function(MomentSketch.quantile) + logger.info("Profiling: MomentSketch.insert, MomentSketch.quantile") + + elif self.sketch_type == 'hdrhistogram': + from QuantileFlow.hdrhistogram.core import HDRHistogram + + self.line_profiler.add_function(HDRHistogram.insert) + self.line_profiler.add_function(HDRHistogram.quantile) + logger.info("Profiling: HDRHistogram.insert, HDRHistogram.quantile") + + # Note: datadog DDSketch profiling would require profiling their library code + # which is not as useful for optimizing QuantileFlow + + logger.info("Line profiler setup complete. Profiling will be saved on shutdown.") + def _create_consumer(self): return KafkaConsumer( KAFKA_CONFIG['topic'], @@ -64,6 +124,11 @@ def process_metrics(self): signal.signal(signal.SIGTERM, self.shutdown) signal.signal(signal.SIGINT, self.shutdown) + # Enable line profiler if configured + if self.enable_line_profile and self.line_profiler: + logger.info("Enabling line profiler...") + self.line_profiler.enable() + while self.running: records = consumer.poll(timeout_ms=3000) for tp, messages in records.items(): @@ -72,6 +137,12 @@ def process_metrics(self): except Exception as e: logger.error(f"Consumer error: {e}") finally: + # Disable and save profiler results + if self.enable_line_profile and self.line_profiler: + logger.info("Disabling line profiler and saving results...") + self.line_profiler.disable() + self._save_profile_results() + consumer.close() def calculate_stats(self): @@ -169,6 +240,48 @@ def _print_metrics(self, data, stats): sys.stdout.write(f"\n{'='*80}\n") sys.stdout.flush() + def _save_profile_results(self): + """Save line profiler results to file.""" + if not self.line_profiler: + return + + try: + # Get profiling results + string_buffer = StringIO() + self.line_profiler.print_stats(stream=string_buffer) + profile_output = string_buffer.getvalue() + + # Add header with metadata + header = f""" +{'='*120} +Line-by-Line Profiling Results - Kafka Metrics Streaming Benchmark +{'='*120} +Sketch Algorithm: {self.sketch_name} +Messages Processed: {self.msg_count:,} +Total Runtime: {time.time() - self.start_time:.2f} seconds +Timestamp: {datetime.now().isoformat()} +{'='*120} + +""" + + # Write to file + output_path = Path(self.profile_output) + output_path.write_text(header + profile_output) + + logger.info(f"✓ Line profile saved to: 
{output_path.absolute()}") + + # Also print summary to console + print("\n" + "="*120) + print(f"Line Profiling Summary - {self.sketch_name}") + print("="*120) + print(f"Messages processed: {self.msg_count:,}") + print(f"Total runtime: {time.time() - self.start_time:.2f} seconds") + print(f"Full results saved to: {output_path.absolute()}") + print("="*120 + "\n") + + except Exception as e: + logger.error(f"Error saving profile results: {e}") + def shutdown(self, signum, frame): logger.info("Shutting down consumer...") self.running = False diff --git a/metrics-streaming-service/src/metrics_streamer/producer.py b/metrics-streaming-service/src/metrics_streamer/producer.py index 157fea4..d05d98a 100644 --- a/metrics-streaming-service/src/metrics_streamer/producer.py +++ b/metrics-streaming-service/src/metrics_streamer/producer.py @@ -2,8 +2,13 @@ import pandas as pd import json import logging +import sys +from pathlib import Path from .config import KAFKA_CONFIG +# Add local QuantileFlow folder to path +sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent)) + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__)
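
Usage sketch (not part of the patch): the snippet below exercises the public API as it reads after this diff — the weighted insert(), quantile() and its get_quantile_value() alias, the new count/sum/avg/min/max accessors, merge(), and the LogarithmicMapping bucket math. Import paths follow the ones already used in consumer.py above; constructor defaults other than relative_accuracy are not visible in the diff, so only that keyword is passed, and the accuracy checks use loose margins rather than exact expected values.

from QuantileFlow.ddsketch import DDSketch
from QuantileFlow.ddsketch.mapping.logarithmic import LogarithmicMapping

# 1% relative accuracy; only relative_accuracy is passed because the other
# constructor defaults are not shown in this diff.
sketch = DDSketch(relative_accuracy=0.01)

for v in range(1, 100_001):
    sketch.insert(float(v))

# quantile() interpolates rank = q * (count - 1) and returns the bucket's
# representative value; a loose 2% margin absorbs both the per-bucket error
# and the rank interpolation at the median.
p50 = sketch.quantile(0.5)
assert abs(p50 - 50_000) <= 0.02 * 50_000

# Exact summary statistics tracked by the patch.
print(sketch.count, sketch.min, sketch.max, sketch.sum, sketch.avg)

# Weighted insert and merge, both introduced/updated in this diff.
other = DDSketch(relative_accuracy=0.01)
other.insert(250.0, weight=3.0)
sketch.merge(other)
print(sketch.count)                      # 100003.0 (weights are summed)
print(sketch.get_quantile_value(0.99))   # alias for quantile(); returns None on bad input

# Per-bucket relative-error bound of the logarithmic mapping:
# gamma = (1 + a) / (1 - a), key = ceil(log(v) / log(gamma)),
# representative = gamma^key * 2 / (1 + gamma), so |rep - v| / v <= a.
m = LogarithmicMapping(0.01)
value = 1234.5
rep = m.compute_value_from_index(m.compute_bucket_index(value))
assert abs(rep - value) / value <= 0.01 + 1e-9

Note that count and zero_count are floats after this patch so that fractional weights accumulate exactly the same way integer counts did before.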