
Go #3

27 commits
ba55aa0
feat: add go versions of the micro services
slayerjain Jan 9, 2026
59095cd
fix: go services
officialasishkumar Jan 9, 2026
cc64e12
fix: docker-compose file
officialasishkumar Jan 9, 2026
8b1e84f
fix: docker-compose file
officialasishkumar Jan 9, 2026
eddf4f1
fix: keploy yaml
officialasishkumar Jan 9, 2026
f33b76e
fix: add script files
officialasishkumar Jan 9, 2026
25f1dab
fix: add freezetime build in order service
officialasishkumar Jan 9, 2026
f34f8c2
Fix: update dockerfile for order service for dedup
officialasishkumar Jan 9, 2026
391eba5
fix: add global and test set level noise
officialasishkumar Jan 9, 2026
e89041d
fix: add noise endpoints
officialasishkumar Jan 9, 2026
d914549
fix: k8s setup
officialasishkumar Jan 10, 2026
4ae340e
fix: update docker compose for coverage
officialasishkumar Jan 10, 2026
42f493a
fix: time freeze setup
officialasishkumar Jan 10, 2026
dabe281
feat: add guide.md
officialasishkumar Jan 12, 2026
aac791b
feat: add guide.md
officialasishkumar Jan 12, 2026
1701fc1
feat: add guide.md
officialasishkumar Jan 12, 2026
5bdf417
feat: add guide.md
officialasishkumar Jan 12, 2026
9a63e27
feat: add guide.md
officialasishkumar Jan 12, 2026
d755dc2
feat: add keploy installation in guide
officialasishkumar Jan 12, 2026
9bc5f05
feat: add Kafka integration
Yogeshjindal Feb 9, 2026
6b8bc71
feat: Add Kafka safe wrappers and improve order service configuration
Yogeshjindal Feb 19, 2026
d7dd7a9
Add noise config to ignore dynamic order IDs in Keploy tests
Yogeshjindal Feb 26, 2026
80c8b69
push all
Yogeshjindal Feb 28, 2026
436c019
Remove keploy-enterprise binary and add to gitignore
Yogeshjindal Feb 28, 2026
87033ea
proper initialisation and skipping as per mode
Yogeshjindal Feb 28, 2026
07a9968
using mock matching
Yogeshjindal Feb 28, 2026
d54e1ac
feat: enhance database initialization and connection logic; add run s…
slayerjain Mar 10, 2026
40 changes: 40 additions & 0 deletions .gitignore
@@ -4,6 +4,7 @@ __pycache__/
.pytest_cache/
.mypy_cache/
.coverage
coverage/

# Envs
.env
@@ -12,3 +13,42 @@ venv/

# OS
.DS_Store

# IDE
.vscode/
.idea/

# Logs
*.log
keploy-logs.txt

# Compiled binaries (specific paths to avoid ignoring directories)
go-services/order_service/order_service
go-services/order_service/order_service_bin
go-services/order_service/order_service_test
go-services/product_service/product_service
go-services/product_service/product_service_bin
go-services/product_service/product_service_test
go-services/user_service/user_service
go-services/user_service/user_service_bin
go-services/user_service/user_service_test

# Backup files
*.bak

# Analysis/Summary files
FINAL_FIX_SUMMARY.md
KAFKA_MOCK_FIX_USAGE.md

# Keploy test sets (keep keploy.yml configs only)
**/keploy/test-set-*/
**/keploy/dedup/
**/keploy/freezeTime/

# Docker compose variants
docker-compose-keploy.yml

# Go workspace (if not needed)
go.work
go.work.sum
go-services/keploy-enterprise
1 change: 1 addition & 0 deletions .kiro/specs/kafka-compression-support-fix/.config.kiro
@@ -0,0 +1 @@
{"specId": "f6c0f35e-3655-4d5d-94b0-35d785a67b12", "workflowType": "requirements-first", "specType": "bugfix"}
35 changes: 35 additions & 0 deletions .kiro/specs/kafka-compression-support-fix/bugfix.md
@@ -0,0 +1,35 @@
# Bugfix Requirements Document

## Introduction

The `decodeRecordBatches` function in `pkg/core/proxy/integrations/kafka/wire/bodies.go` currently only implements decompression for gzip-compressed Kafka record batches (compression type 1). However, the code recognizes and labels 5 compression types: none (0), gzip (1), snappy (2), lz4 (3), and zstd (4). When Kafka producers send record batches compressed with snappy, lz4, or zstd, the decoder fails to decompress them, resulting in corrupted or unreadable record data. This bug affects the proxy's ability to correctly decode and process Kafka messages using these compression algorithms.

## Bug Analysis

### Current Behavior (Defect)

1.1 WHEN a record batch with compression type 2 (snappy) is received THEN the system fails to decompress the records data and attempts to decode compressed bytes as raw records

1.2 WHEN a record batch with compression type 3 (lz4) is received THEN the system fails to decompress the records data and attempts to decode compressed bytes as raw records

1.3 WHEN a record batch with compression type 4 (zstd) is received THEN the system fails to decompress the records data and attempts to decode compressed bytes as raw records

### Expected Behavior (Correct)

2.1 WHEN a record batch with compression type 2 (snappy) is received THEN the system SHALL decompress the records data using snappy decompression before decoding records

2.2 WHEN a record batch with compression type 3 (lz4) is received THEN the system SHALL decompress the records data using lz4 decompression before decoding records

2.3 WHEN a record batch with compression type 4 (zstd) is received THEN the system SHALL decompress the records data using zstd decompression before decoding records

2.4 WHEN decompression fails for any supported compression type THEN the system SHALL handle the error gracefully without crashing

### Unchanged Behavior (Regression Prevention)

3.1 WHEN a record batch with compression type 0 (none) is received THEN the system SHALL CONTINUE TO decode records without decompression

3.2 WHEN a record batch with compression type 1 (gzip) is received THEN the system SHALL CONTINUE TO decompress using gzip and decode records correctly

3.3 WHEN gzip decompression fails THEN the system SHALL CONTINUE TO handle the error gracefully as it currently does

3.4 WHEN record batches are successfully decoded THEN the system SHALL CONTINUE TO populate all RecordBatch fields correctly (BaseOffset, BatchLength, Compression, TimestampType, FirstTimestamp, MaxTimestamp, ProducerID, ProducerEpoch, BaseSequence, Records)
198 changes: 198 additions & 0 deletions .kiro/specs/kafka-compression-support-fix/design.md
@@ -0,0 +1,198 @@
# Kafka Compression Support Bugfix Design

## Overview

The `decodeRecordBatches` function currently only implements decompression for gzip-compressed Kafka record batches. While the code correctly identifies all five compression types (none, gzip, snappy, lz4, zstd), it only decompresses gzip. This causes record batches compressed with snappy, lz4, or zstd to be decoded as raw compressed bytes, resulting in corrupted data. The fix will add decompression support for the three missing compression algorithms using minimal code changes to preserve existing functionality.

## Glossary

- **Bug_Condition (C)**: The condition that triggers the bug - when record batches use snappy (2), lz4 (3), or zstd (4) compression
- **Property (P)**: The desired behavior when compressed batches are received - records should be decompressed before decoding
- **Preservation**: Existing decompression behavior for gzip and no-compression handling that must remain unchanged
- **decodeRecordBatches**: The function in `pkg/core/proxy/integrations/kafka/wire/bodies.go` that parses Kafka record batch wire format
- **compression**: An int value (0-4) extracted from the attributes field indicating the compression algorithm used
- **recordsData**: The byte slice containing either compressed or uncompressed record data that needs decoding

## Bug Details

### Fault Condition

The bug manifests when Kafka producers send record batches compressed with snappy, lz4, or zstd algorithms. The `decodeRecordBatches` function correctly identifies the compression type from the attributes field but only implements decompression for gzip (type 1), leaving the other three compression types unhandled.

**Formal Specification:**
```
FUNCTION isBugCondition(input)
INPUT: input of type RecordBatchBytes
OUTPUT: boolean

RETURN input.compressionType IN [2, 3, 4]
AND input.recordsData.isCompressed == true
AND decompression is not performed
END FUNCTION
```

### Examples

- **Snappy compression**: A producer sends a batch with compression=2. The decoder sets `batch.Compression = "snappy"` but passes compressed bytes directly to `decodeRecords()`, which fails to parse them correctly.
- **LZ4 compression**: A producer sends a batch with compression=3. The decoder sets `batch.Compression = "lz4"` but the compressed recordsData is never decompressed, resulting in garbage record data.
- **Zstd compression**: A producer sends a batch with compression=4. The decoder sets `batch.Compression = "zstd"` but attempts to decode compressed bytes as raw records, producing invalid output.
- **Gzip compression (working)**: A producer sends a batch with compression=1. The decoder correctly decompresses using gzip before calling `decodeRecords()`.
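In each example above, the compression value comes from the batch's attributes field: in the Kafka wire format, the low three bits of the int16 attributes select the codec. A minimal sketch of that mapping (identifier names here are illustrative, not the proxy's actual ones):

```go
package main

// compressionType extracts the codec id from a record batch's attributes
// field: in the Kafka wire format, the low three bits of the int16
// attributes value select the compression codec.
func compressionType(attributes int16) int {
	return int(attributes & 0x07)
}

// compressionNames mirrors the labels the decoder assigns to each codec id.
var compressionNames = map[int]string{
	0: "none",
	1: "gzip",
	2: "snappy",
	3: "lz4",
	4: "zstd",
}
```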

## Expected Behavior

### Preservation Requirements

**Unchanged Behaviors:**
- Uncompressed batches (compression=0) must continue to decode without any decompression step
- Gzip-compressed batches (compression=1) must continue to decompress correctly using the existing gzip logic
- Error handling for gzip decompression failures must remain unchanged
- All RecordBatch field parsing (BaseOffset, BatchLength, timestamps, producer info, etc.) must remain unchanged

**Scope:**
All inputs that do NOT involve snappy, lz4, or zstd compression should be completely unaffected by this fix. This includes:
- Uncompressed record batches (compression=0)
- Gzip-compressed record batches (compression=1)
- Batch header parsing logic
- Record decoding logic after decompression

## Hypothesized Root Cause

Based on the bug description and code analysis, the root cause is clear:

1. **Incomplete Implementation**: The function was implemented with only gzip decompression support, likely because:
- Gzip is the most common compression type in older Kafka deployments
- The other compression libraries weren't imported or integrated initially
- The feature was partially implemented and never completed

2. **Missing Library Integration**: The code needs to import and use three additional decompression libraries:
- `github.com/golang/snappy` for snappy decompression
- `github.com/pierrec/lz4/v4` for lz4 decompression
- `github.com/klauspost/compress/zstd` for zstd decompression

3. **Switch Statement Gap**: The switch statement on lines 2447-2458 identifies compression types, but the decompression logic (lines 2517-2527) only handles gzip (compression == 1).

## Correctness Properties

Property 1: Fault Condition - Snappy/LZ4/Zstd Decompression

_For any_ record batch where the compression type is 2 (snappy), 3 (lz4), or 4 (zstd), the fixed decodeRecordBatches function SHALL decompress the recordsData using the appropriate decompression algorithm before passing it to decodeRecords, resulting in correctly parsed record data.

**Validates: Requirements 2.1, 2.2, 2.3, 2.4**

Property 2: Preservation - Existing Compression Handling

_For any_ record batch where the compression type is 0 (none) or 1 (gzip), the fixed decodeRecordBatches function SHALL produce exactly the same behavior as the original function, preserving the existing decompression logic and error handling.

**Validates: Requirements 3.1, 3.2, 3.3, 3.4**

## Fix Implementation

### Changes Required

Assuming our root cause analysis is correct:

**File**: `pkg/core/proxy/integrations/kafka/wire/bodies.go`

**Function**: `decodeRecordBatches`

**Specific Changes**:
1. **Add Import Statements**: Add three new imports at the top of the file:
- `"github.com/golang/snappy"`
- `"github.com/pierrec/lz4/v4"`
- `"github.com/klauspost/compress/zstd"`

2. **Extend Decompression Logic**: Modify the decompression section (currently lines 2517-2527) to handle all compression types:
- Add case for compression == 2 (snappy): Use `snappy.Decode()` to decompress
- Add case for compression == 3 (lz4): Use `lz4.NewReader()` and `io.ReadAll()` to decompress
- Add case for compression == 4 (zstd): Use `zstd.NewReader()` and `io.ReadAll()` to decompress
- Keep existing gzip logic unchanged

3. **Error Handling**: Follow the same error handling pattern as gzip:
- If decompression fails, silently continue with compressed data (matches current gzip behavior)
- This preserves the existing graceful degradation approach

4. **Minimal Code Changes**: Use a switch statement or if-else chain to keep changes localized to the decompression section only

## Testing Strategy

### Validation Approach

The testing strategy follows a two-phase approach: first, surface counterexamples that demonstrate the bug on unfixed code, then verify the fix works correctly and preserves existing behavior.

### Exploratory Fault Condition Checking

**Goal**: Surface counterexamples that demonstrate the bug BEFORE implementing the fix. Confirm that snappy, lz4, and zstd compressed batches fail to decompress correctly.

**Test Plan**: Create test record batches with each compression type, compress sample record data using each algorithm, and attempt to decode them with the UNFIXED code. Observe that snappy/lz4/zstd batches produce incorrect record data while gzip works correctly.

**Test Cases**:
1. **Snappy Decompression Test**: Create a batch with compression=2 and snappy-compressed records (will fail on unfixed code)
2. **LZ4 Decompression Test**: Create a batch with compression=3 and lz4-compressed records (will fail on unfixed code)
3. **Zstd Decompression Test**: Create a batch with compression=4 and zstd-compressed records (will fail on unfixed code)
4. **Gzip Decompression Test**: Create a batch with compression=1 and gzip-compressed records (should pass on unfixed code)

**Expected Counterexamples**:
- Snappy/lz4/zstd batches will produce empty or corrupted record arrays
- The `decodeRecords()` function will fail to parse compressed bytes as valid record structures
- Possible symptoms: zero records decoded, panic from invalid varint encoding, or garbage data

### Fix Checking

**Goal**: Verify that for all inputs where the bug condition holds, the fixed function produces the expected behavior.

**Pseudocode:**
```
FOR ALL recordBatch WHERE isBugCondition(recordBatch) DO
result := decodeRecordBatches_fixed(recordBatch)
ASSERT result.Records is correctly decoded
ASSERT result.Records.length > 0
ASSERT result.Records[0].Value is valid decompressed data
END FOR
```

### Preservation Checking

**Goal**: Verify that for all inputs where the bug condition does NOT hold, the fixed function produces the same result as the original function.

**Pseudocode:**
```
FOR ALL recordBatch WHERE NOT isBugCondition(recordBatch) DO
ASSERT decodeRecordBatches_original(recordBatch) = decodeRecordBatches_fixed(recordBatch)
END FOR
```

**Testing Approach**: Property-based testing is recommended for preservation checking because:
- It generates many test cases automatically across the input domain
- It catches edge cases that manual unit tests might miss
- It provides strong guarantees that behavior is unchanged for all non-buggy inputs
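A self-contained illustration of this property-based approach, using Go's stdlib `testing/quick` on a gzip round-trip property (names are illustrative; the real preservation tests would exercise `decodeRecordBatches` itself):

```go
package main

import (
	"bytes"
	"compress/gzip"
	"io"
	"testing/quick"
)

// gzipRoundTrip is a preservation property for the already-working path:
// compressing any payload with gzip and decompressing it again must return
// the original bytes. testing/quick generates arbitrary payloads.
func gzipRoundTrip(payload []byte) bool {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(payload); err != nil {
		return false
	}
	w.Close()
	r, err := gzip.NewReader(&buf)
	if err != nil {
		return false
	}
	defer r.Close()
	out, err := io.ReadAll(r)
	return err == nil && bytes.Equal(out, payload)
}

// checkPreservation runs the property over many generated payloads and
// returns the first counterexample found, if any.
func checkPreservation() error {
	return quick.Check(gzipRoundTrip, nil)
}
```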

**Test Plan**: Observe behavior on UNFIXED code first for uncompressed and gzip-compressed batches, then write property-based tests capturing that behavior.

**Test Cases**:
1. **Uncompressed Preservation**: Observe that compression=0 batches decode correctly on unfixed code, then verify this continues after fix
2. **Gzip Preservation**: Observe that compression=1 batches decompress and decode correctly on unfixed code, then verify this continues after fix
3. **Batch Header Preservation**: Observe that all batch header fields are parsed correctly on unfixed code, then verify this continues after fix
4. **Error Handling Preservation**: Observe that gzip decompression errors are handled gracefully on unfixed code, then verify this continues after fix

### Unit Tests

- Test snappy decompression with valid compressed data
- Test lz4 decompression with valid compressed data
- Test zstd decompression with valid compressed data
- Test that uncompressed batches continue to work
- Test that gzip batches continue to work
- Test error handling for invalid compressed data

### Property-Based Tests

- Generate random record batches with each compression type and verify correct decompression
- Generate random uncompressed and gzip batches and verify preservation of existing behavior
- Test that all batch header fields are preserved across compression types

### Integration Tests

- Test full Kafka message flow with snappy-compressed batches
- Test full Kafka message flow with lz4-compressed batches
- Test full Kafka message flow with zstd-compressed batches
- Test mixed batches with different compression types in the same response
- Test that existing gzip and uncompressed flows continue to work end-to-end
74 changes: 74 additions & 0 deletions .kiro/specs/kafka-compression-support-fix/tasks.md
@@ -0,0 +1,74 @@
# Implementation Plan

- [x] 1. Write bug condition exploration test
- **Property 1: Fault Condition** - Unsupported Compression Types Fail to Decompress
- **CRITICAL**: This test MUST FAIL on unfixed code - failure confirms the bug exists
- **DO NOT attempt to fix the test or the code when it fails**
- **NOTE**: This test encodes the expected behavior - it will validate the fix when it passes after implementation
- **GOAL**: Surface counterexamples that demonstrate the bug exists
- **Scoped PBT Approach**: Scope the property to concrete failing cases - record batches with compression types 2 (snappy), 3 (lz4), and 4 (zstd)
- Test that decodeRecordBatches correctly decompresses and decodes records for compression types 2, 3, and 4
- Generate test record batches with snappy, lz4, and zstd compression containing known record data
- Assert that decoded records match the original uncompressed data (from Expected Behavior in design)
- Run test on UNFIXED code
- **EXPECTED OUTCOME**: Test FAILS (this is correct - it proves the bug exists)
- Document counterexamples found (e.g., "snappy-compressed batch returns corrupted records", "lz4-compressed batch fails to decode")
- Mark task complete when test is written, run, and failure is documented
- _Requirements: 1.1, 1.2, 1.3, 2.1, 2.2, 2.3, 2.4_

- [ ] 2. Write preservation property tests (BEFORE implementing fix)
- **Property 2: Preservation** - Existing Compression Behavior Unchanged
- **IMPORTANT**: Follow observation-first methodology
- Observe behavior on UNFIXED code for non-buggy inputs (compression types 0 and 1)
- Test compression type 0 (none): records decode correctly without decompression
- Test compression type 1 (gzip): records decompress with gzip and decode correctly
- Test gzip decompression error handling: malformed gzip data is handled gracefully
- Test RecordBatch field population: all fields (BaseOffset, BatchLength, Compression, TimestampType, FirstTimestamp, MaxTimestamp, ProducerID, ProducerEpoch, BaseSequence, Records) are populated correctly
- Write property-based tests capturing observed behavior patterns from Preservation Requirements
- Property-based testing generates many test cases for stronger guarantees
- Run tests on UNFIXED code
- **EXPECTED OUTCOME**: Tests PASS (this confirms baseline behavior to preserve)
- Mark task complete when tests are written, run, and passing on unfixed code
- _Requirements: 3.1, 3.2, 3.3, 3.4_

- [ ] 3. Fix for unsupported Kafka compression types (snappy, lz4, zstd)

- [ ] 3.1 Add compression library imports
- Import "github.com/golang/snappy" for snappy decompression
- Import "github.com/pierrec/lz4/v4" for lz4 decompression
- Import "github.com/klauspost/compress/zstd" for zstd decompression
- _Bug_Condition: isBugCondition(batch) where batch.compression ∈ {2, 3, 4}_
- _Expected_Behavior: For compression type 2, decompress using snappy; for type 3, decompress using lz4; for type 4, decompress using zstd; handle decompression errors gracefully_
- _Preservation: Compression types 0 and 1 continue to work as before; gzip error handling unchanged; RecordBatch fields populated correctly_
- _Requirements: 2.1, 2.2, 2.3, 2.4, 3.1, 3.2, 3.3, 3.4_

- [ ] 3.2 Extend decompression logic in decodeRecordBatches
- Add case for compression == 2 (snappy): use snappy.Decode to decompress recordsData
- Add case for compression == 3 (lz4): use lz4.NewReader to decompress recordsData
- Add case for compression == 4 (zstd): use zstd.NewReader to decompress recordsData
- Handle decompression errors gracefully for all new compression types (similar to gzip)
- Ensure decompressed data replaces recordsData before calling decodeRecords
- _Bug_Condition: isBugCondition(batch) where batch.compression ∈ {2, 3, 4}_
- _Expected_Behavior: For compression type 2, decompress using snappy; for type 3, decompress using lz4; for type 4, decompress using zstd; handle decompression errors gracefully_
- _Preservation: Compression types 0 and 1 continue to work as before; gzip error handling unchanged; RecordBatch fields populated correctly_
- _Requirements: 2.1, 2.2, 2.3, 2.4, 3.1, 3.2, 3.3, 3.4_

- [ ] 3.3 Verify bug condition exploration test now passes
- **Property 1: Expected Behavior** - Unsupported Compression Types Now Decompress Correctly
- **IMPORTANT**: Re-run the SAME test from task 1 - do NOT write a new test
- The test from task 1 encodes the expected behavior
- When this test passes, it confirms the expected behavior is satisfied
- Run bug condition exploration test from step 1
- **EXPECTED OUTCOME**: Test PASSES (confirms bug is fixed)
- _Requirements: 2.1, 2.2, 2.3, 2.4_

- [ ] 3.4 Verify preservation tests still pass
- **Property 2: Preservation** - Existing Compression Behavior Unchanged
- **IMPORTANT**: Re-run the SAME tests from task 2 - do NOT write new tests
- Run preservation property tests from step 2
- **EXPECTED OUTCOME**: Tests PASS (confirms no regressions)
- Confirm all tests still pass after fix (no regressions)
- _Requirements: 3.1, 3.2, 3.3, 3.4_

- [ ] 4. Checkpoint - Ensure all tests pass
Ensure all tests pass; ask the user if questions arise.
1 change: 1 addition & 0 deletions .kiro/specs/kafka-pipeline/.config.kiro
@@ -0,0 +1 @@
{"specId": "bfb19c0e-1133-41d5-b485-3c5205b34851", "workflowType": "requirements-first", "specType": "feature"}