Minor cleanup of collector and processor #76
Conversation
Pull Request Overview
Minor refactoring and cleanup to improve structure, avoid duplicate sends, and make Clickhouse writes thread-safe.
- Refactored `TxProcessor` to send to receivers only after inclusion checks, added graceful shutdown flushing
- Replaced standalone `Start` with a `Collector` struct that modularizes API and metrics server startup
- Enhanced `Clickhouse` client with a mutex-protected batch, new save methods, and a `FlushCurrentBatches` API
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| collector/tx_processor.go | Reordered transaction flow, merged clickhouse handling, added Shutdown, renamed methods/fields |
| collector/collector.go | Introduced Collector type, modularized StartAPIServer/StartMetricsServer, added Shutdown |
| collector/clickhouse.go | Added batchLock for thread safety, refactored batch send logic, implemented FlushCurrentBatches |
| cmd/collect/main.go | Updated collector initialization in main, added default "all" value for allowed sources flag |
| api/server.go | Downgraded subscriber removal log from Info to Debug |
| api/handler.go | Noted potential DoS risk in SSE handler |
Comments suppressed due to low confidence (2)
cmd/collect/main.go:174

- [nitpick] Variable name `collector` shadows the imported package `collector`. Consider renaming the local instance (e.g., `svc` or `col`) to avoid shadowing.

```go
collector := collector.New(collector.CollectorOpts{
```
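For illustration, one such rename (the surrounding options are elided here):

```go
// Hypothetical rename: `col` keeps the `collector` package name usable.
col := collector.New(collector.CollectorOpts{ /* ... */ })
```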
collector/clickhouse.go:92

- The new concurrent batching and flush logic should be covered by unit tests to verify correct behavior under parallel calls and on `FlushCurrentBatches`.

```go
func (ch *Clickhouse) AddTransaction(tx common.TxIn) error {
```
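A minimal test sketch along those lines, assuming `testing`, `sync`, and the repo's `common` package are imported; `newTestClickhouse` is a hypothetical setup helper, and return values of `FlushCurrentBatches` (if any) are elided:

```go
func TestConcurrentBatching(t *testing.T) {
	ch := newTestClickhouse(t) // hypothetical helper: construct a test instance here

	// Hammer AddTransaction from many goroutines to exercise batchLock.
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := ch.AddTransaction(common.TxIn{}); err != nil {
				t.Error(err)
			}
		}()
	}
	wg.Wait()

	// After parallel adds, flushing must not race with producers or drop entries.
	ch.FlushCurrentBatches()
}
```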
```go
ch.saveTransactionBatch(ch.currentTxBatch)
ch.saveSourcelogs(ch.currentSourcelogBatch)
ch.batchLock.Unlock()
```
Copilot AI (Jul 4, 2025)
[nitpick] Holding batchLock while synchronously calling saveTransactionBatch and saveSourcelogs may block other producers if retries take long. Consider releasing the lock before dispatching or performing the sends in separate goroutines.
Suggested change:

```diff
-ch.saveTransactionBatch(ch.currentTxBatch)
-ch.saveSourcelogs(ch.currentSourcelogBatch)
-ch.batchLock.Unlock()
+txBatch := ch.currentTxBatch
+sourcelogBatch := ch.currentSourcelogBatch
+ch.batchLock.Unlock()
+go func() {
+	ch.saveTransactionBatch(txBatch)
+}()
+go func() {
+	ch.saveSourcelogs(sourcelogBatch)
+}()
```
```go
receiversAllowedSources: opts.ReceiversAllowedSources,
clickhouseDSN:           opts.ClickhouseDSN,
// ...
func (p *TxProcessor) Shutdown() {
	p.log.Info("Shutting down TxProcessor ...")
```
Copilot AI (Jul 4, 2025)
Shutdown flushes Clickhouse batches but does not stop the transaction receiver loop, which ranges over p.txC and will never exit. Consider closing txC or adding a shutdown signal to stop that goroutine.
Suggested change:

```diff
 p.log.Info("Shutting down TxProcessor ...")
+close(p.done) // signal shutdown
+close(p.txC)  // close the transaction channel
```
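A sketch of the receiver loop that would pair with this, assuming `p.done` is a `chan struct{}` added for shutdown and `processTx` stands in for the existing per-transaction handling (the original loop simply ranges over `p.txC`):

```go
for {
	select {
	case tx, ok := <-p.txC:
		if !ok {
			return // txC was closed by Shutdown; stop the receiver loop
		}
		p.processTx(tx) // hypothetical per-transaction handler
	case <-p.done:
		return // shutdown signaled
	}
}
```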
```go
// Note/TODO: a client with a slow connection may cause blocking other clients and cause DoS on all receivers
fmt.Fprintf(w, "data: %s\n\n", tx)
w.(http.Flusher).Flush() //nolint:forcetypeassert
```
Copilot AI (Jul 4, 2025)
[nitpick] Relying on a single channel and direct writes risks DoS if one subscriber is slow. Consider per-subscriber buffering or non-blocking writes with timeouts to isolate slow clients.
Suggested change:

```diff
-// Note/TODO: a client with a slow connection may cause blocking other clients and cause DoS on all receivers
-fmt.Fprintf(w, "data: %s\n\n", tx)
-w.(http.Flusher).Flush() //nolint:forcetypeassert
+// Perform non-blocking write with timeout to prevent slow clients from causing DoS
+err := writeWithTimeout(w, tx)
+if err != nil {
+	s.log.Warnf("Failed to write to subscriber %s: %v", subscriber.uid, err)
+	s.removeSubscriber(&subscriber)
+	return
+}
```
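Note that `writeWithTimeout` is not defined in this PR. The per-subscriber buffering variant can also be sketched as a buffered channel with a non-blocking send in the broadcaster (`sub` and `txC` are assumed names; `removeSubscriber` and `uid` follow the suggestion above):

```go
// Each subscriber owns a buffered channel and writes/flushes its own SSE
// frames in its handler goroutine; the broadcaster never blocks on one client.
select {
case sub.txC <- tx:
	// delivered to this subscriber's buffer
default:
	// buffer full: drop the slow subscriber instead of stalling all the others
	s.log.Warnf("subscriber %s too slow, removing", sub.uid)
	s.removeSubscriber(sub)
}
```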
📝 Summary