Skip to content

Conversation

@metachris
Copy link
Contributor

@metachris metachris commented Jul 3, 2025

📝 Summary

  • Making the code a little better structured and easier to understand and extend
  • Moved sending to tx receivers to after the sanity checks, which avoids sending duplicates and trash. The checks mostly need less than 100us (0.1ms), and at most up to 0.3ms (not a noticable delay).
  • Save transactions to Clickhouse after the inclusion check (instead of before), to avoid storing already included transactions

@metachris metachris requested review from Copilot, dvush and ilyaluk July 3, 2025 07:44

This comment was marked as outdated.

@metachris metachris requested a review from Copilot July 4, 2025 08:38

This comment was marked as outdated.

@metachris metachris force-pushed the cleanup branch 2 times, most recently from b179375 to f9bbbb4 Compare July 4, 2025 13:07
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Minor refactoring and cleanup to improve structure, avoid duplicate sends, and make Clickhouse writes thread-safe.

  • Refactored TxProcessor to send to receivers only after inclusion checks, added graceful shutdown flushing
  • Replaced standalone Start with a Collector struct that modularizes API and metrics server startup
  • Enhanced Clickhouse client with a mutex-protected batch, new save methods, and a FlushCurrentBatches API

Reviewed Changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
collector/tx_processor.go Reordered transaction flow, merged clickhouse handling, added Shutdown, renamed methods/fields
collector/collector.go Introduced Collector type, modularized StartAPIServer/StartMetricsServer, added Shutdown
collector/clickhouse.go Added batchLock for thread safety, refactored batch send logic, implemented FlushCurrentBatches
cmd/collect/main.go Updated collector initialization in main, added default "all" value for allowed sources flag
api/server.go Downgraded subscriber removal log from Info to Debug
api/handler.go Noted potential DoS risk in SSE handler
Comments suppressed due to low confidence (2)

cmd/collect/main.go:174

  • [nitpick] Variable name 'collector' shadows the imported package 'collector'. Consider renaming the local instance (e.g., svc or col) to avoid shadowing.
	collector := collector.New(collector.CollectorOpts{

collector/clickhouse.go:92

  • The new concurrent batching and flush logic should be covered by unit tests to verify correct behavior under parallel calls and on FlushCurrentBatches.
func (ch *Clickhouse) AddTransaction(tx common.TxIn) error {

Comment on lines +239 to +241
ch.saveTransactionBatch(ch.currentTxBatch)
ch.saveSourcelogs(ch.currentSourcelogBatch)
ch.batchLock.Unlock()
Copy link

Copilot AI Jul 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Holding batchLock while synchronously calling saveTransactionBatch and saveSourcelogs may block other producers if retries take long. Consider releasing the lock before dispatching or performing the sends in separate goroutines.

Suggested change
ch.saveTransactionBatch(ch.currentTxBatch)
ch.saveSourcelogs(ch.currentSourcelogBatch)
ch.batchLock.Unlock()
txBatch := ch.currentTxBatch
sourcelogBatch := ch.currentSourcelogBatch
ch.batchLock.Unlock()
go func() {
ch.saveTransactionBatch(txBatch)
}()
go func() {
ch.saveSourcelogs(sourcelogBatch)
}()

Copilot uses AI. Check for mistakes.
receiversAllowedSources: opts.ReceiversAllowedSources,
clickhouseDSN: opts.ClickhouseDSN,
func (p *TxProcessor) Shutdown() {
p.log.Info("Shutting down TxProcessor ...")
Copy link

Copilot AI Jul 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shutdown flushes Clickhouse batches but does not stop the transaction receiver loop, which ranges over p.txC and will never exit. Consider closing txC or adding a shutdown signal to stop that goroutine.

Suggested change
p.log.Info("Shutting down TxProcessor ...")
p.log.Info("Shutting down TxProcessor ...")
close(p.done) // signal shutdown
close(p.txC) // close the transaction channel

Copilot uses AI. Check for mistakes.
Comment on lines +44 to 46
// Note/TODO: a client with a slow connection may cause blocking other clients and cause DoS on all receivers
fmt.Fprintf(w, "data: %s\n\n", tx)
w.(http.Flusher).Flush() //nolint:forcetypeassert
Copy link

Copilot AI Jul 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Relying on a single channel and direct writes risks DoS if one subscriber is slow. Consider per-subscriber buffering or non-blocking writes with timeouts to isolate slow clients.

Suggested change
// Note/TODO: a client with a slow connection may cause blocking other clients and cause DoS on all receivers
fmt.Fprintf(w, "data: %s\n\n", tx)
w.(http.Flusher).Flush() //nolint:forcetypeassert
// Perform non-blocking write with timeout to prevent slow clients from causing DoS
err := writeWithTimeout(w, tx)
if err != nil {
s.log.Warnf("Failed to write to subscriber %s: %v", subscriber.uid, err)
s.removeSubscriber(&subscriber)
return
}

Copilot uses AI. Check for mistakes.
@metachris metachris merged commit 4c38400 into main Jul 6, 2025
2 checks passed
@metachris metachris deleted the cleanup branch July 6, 2025 07:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants