A high-performance publish/subscribe messaging server built with Rust, featuring real-time WebSocket communication and a web-based monitoring interface.
- High Performance: Built with Axum and Tokio for async I/O
- In-Memory Database: SQLite in-memory mode by default (configurable)
- Automatic Data Purging: Prevents memory growth with size and time-based limits
- Real-time Monitoring: WebSocket-powered dashboards
- Embedded Assets: Single binary with all web assets included
- Docker Support: Multi-stage build for minimal image size
- Health Checks: Built-in health endpoint
```bash
# Start the server
docker-compose up -d

# View logs
docker-compose logs -f

# Stop the server
docker-compose down
```

The server will be available at http://localhost:5000.
```bash
# Build the image
docker build -t pubsub-server .

# Run the container
docker run -d -p 5000:5000 --name pubsub-server pubsub-server

# View logs
docker logs -f pubsub-server

# Stop and remove
docker stop pubsub-server && docker rm pubsub-server
```

```bash
# Build release
cargo build --release

# Run the server
./target/release/pubsub_server
```

```bash
# Show available commands
make help

# Build and run in release mode
make dev

# Build and run in debug mode
make run

# Stop the server
make stop

# Kill any process using port 5000 (fixes "Address already in use" error)
make kill-port

# Clean restart
make fresh
make dev

# Run Python Socket.IO demo client
make demo
```

Environment variables:

- `DATABASE_FILE`: Database file path (default: `:memory:`)
- `RUST_LOG`: Logging level (default: `info`)
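The two settings above both fall back to documented defaults when unset. A quick sketch of that resolution logic (an illustration only, not the server's actual startup code):

```python
def resolve_config(env):
    """Mimic the server's documented defaults for its two settings."""
    return {
        "database_file": env.get("DATABASE_FILE", ":memory:"),  # in-memory SQLite by default
        "log_level": env.get("RUST_LOG", "info"),               # info-level logging by default
    }

# With nothing set, the documented defaults apply.
print(resolve_config({}))
# Overrides win when present.
print(resolve_config({"DATABASE_FILE": "pubsub.db"}))
```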
To use a persistent database instead of in-memory:
With Docker Compose:

Edit `docker-compose.yml` and uncomment the volume and `DATABASE_FILE` sections:

```yaml
environment:
  DATABASE_FILE: /data/pubsub.db
volumes:
  - ./data:/data
```

With Docker:

```bash
docker run -d -p 5000:5000 \
  -v $(pwd)/data:/data \
  -e DATABASE_FILE=/data/pubsub.db \
  pubsub-server
```

With Cargo:

```bash
DATABASE_FILE=pubsub.db ./target/release/pubsub_server
```

The server automatically purges old data to prevent unbounded memory growth:
- Max Messages: 10,000
- Max Consumptions: 10,000
- Max Age: 24 hours
- Purge Interval: Every 30 minutes
These limits are combined with OR logic: data is deleted if it exceeds either the count limit or the age limit.
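The OR semantics can be illustrated with a small sketch (a Python/SQLite illustration of the idea, not the server's actual purge code; the table and column names here are hypothetical, and the limits are shrunk so the effect is visible):

```python
import sqlite3
import time

MAX_MESSAGES = 3        # small limit for the demo (the server uses 10_000)
MAX_AGE_SECONDS = 60    # stand-in for the server's 24-hour limit

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE messages (id INTEGER PRIMARY KEY, created_at REAL)")

now = time.time()
# Five rows; the first two are already older than the age limit.
for age in (120, 90, 30, 20, 10):
    conn.execute("INSERT INTO messages (created_at) VALUES (?)", (now - age,))

# Purge with OR logic: delete rows that are too old OR beyond the newest N.
conn.execute(
    """
    DELETE FROM messages
    WHERE created_at < ?
       OR id NOT IN (SELECT id FROM messages ORDER BY id DESC LIMIT ?)
    """,
    (now - MAX_AGE_SECONDS, MAX_MESSAGES),
)

remaining = conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
print(remaining)  # 3: the two over-age rows are gone; the count limit keeps the newest 3
```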
Configuration constants are in `src/broker.rs`:
```rust
const MAX_MESSAGES: i64 = 10_000;
const MAX_CONSUMPTIONS: i64 = 10_000;
const MAX_AGE_HOURS: f64 = 24.0;
const PURGE_INTERVAL_MINUTES: u64 = 30;
```

- `POST /publish` - Publish a message to a topic
- `GET /clients` - List connected clients
- `GET /messages` - Get recent messages (cached, 2s TTL)
- `GET /consumptions` - Get consumption history (cached, 2s TTL)
- `GET /graph/state` - Get graph state for visualization (cached, 2s TTL)
- `GET /health` - Health check endpoint
- `GET /ws` - WebSocket endpoint for real-time subscriptions
- Socket.IO endpoint at root (`/`) for easy client integration
- Supports Python, JavaScript, and other Socket.IO clients
- `http://localhost:5000/control-panel.html` - Main control panel
- `http://localhost:5000/activity-map.html` - Activity visualization
- `http://localhost:5000/circular-graph.html` - Circular graph view
```bash
curl -X POST http://localhost:5000/publish \
  -H "Content-Type: application/json" \
  -d '{
    "topic": "events",
    "message_id": "msg-001",
    "message": {"text": "Hello World"},
    "producer": "my-producer"
  }'
```

```python
import socketio
import json

sio = socketio.Client()

@sio.event
def connect():
    print("Connected!")
    # Subscribe to topics
    sio.emit('subscribe', {
        'consumer': 'my-consumer',
        'topics': ['events']
    })

@sio.event
def message(data):
    # Parse message
    if isinstance(data, str):
        msg = json.loads(data)
    else:
        msg = data
    print(f"Received: {msg}")
    # Send consumption acknowledgment (required for control panel tracking)
    sio.emit('consumed', {
        'consumer': 'my-consumer',
        'topic': msg['topic'],
        'message_id': msg['message_id'],
        'message': msg['message']
    })

sio.connect('http://localhost:5000')
sio.wait()
```

Install dependencies:
```bash
pip3 install --break-system-packages python-socketio
```

Run the included demo:

```bash
make demo
```

Check the health endpoint:

```bash
curl http://localhost:5000/health
```

- Multi-stage Docker build: Separates build and runtime for minimal image size
- Batch writes: Database operations batched every 20ms or 500 commands
- Query caching: 2-second TTL cache for expensive queries
- Non-blocking purge: Background task for data cleanup
- Single binary: All assets embedded using `rust-embed`
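The 2-second query cache mentioned above can be sketched roughly like this (a simplified, single-threaded Python illustration of the TTL-caching idea, not the server's actual `cache.rs` implementation):

```python
import time

class TtlCache:
    """Cache a computed value for a fixed TTL; recompute only after it expires."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock          # injectable clock so tests can be deterministic
        self._value = None
        self._expires_at = 0.0

    def get(self, compute):
        now = self.clock()
        if now >= self._expires_at:
            self._value = compute()           # cache miss: run the expensive query
            self._expires_at = now + self.ttl
        return self._value                    # cache hit: reuse until the TTL lapses

# Fake clock so the example is deterministic.
t = [0.0]
cache = TtlCache(2.0, clock=lambda: t[0])

calls = []
query = lambda: calls.append(1) or len(calls)  # counts how often the "query" runs

print(cache.get(query))  # 1 -> computed
print(cache.get(query))  # 1 -> served from cache, query not re-run
t[0] = 2.5               # advance past the 2-second TTL
print(cache.get(query))  # 2 -> recomputed
```

Per-query instances of a structure like this keep `/messages`, `/consumptions`, and `/graph/state` cheap under polling dashboards while staying at most 2 seconds stale.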
├── Cargo.toml
├── Cargo.lock
├── Dockerfile
├── docker-compose.yml
├── .dockerignore
├── src/
│ ├── main.rs # Entry point and routing
│ ├── broker.rs # Core pub/sub logic
│ ├── handlers.rs # HTTP handlers
│ ├── websocket.rs # WebSocket handling
│ ├── database.rs # Database initialization
│ ├── models.rs # Data structures
│ ├── cache.rs # Query cache
│ ├── app_state.rs # Shared state
│ └── embedded.rs # Asset embedding
├── migrations/
│ └── 001_add_message_id_and_producer.sql
├── static/ # CSS/JS files
└── *.html # Web interfaces
- `axum`: Web framework
- `tokio`: Async runtime
- `sqlx`: Database access
- `socketio`: WebSocket support
- `rust-embed`: Asset embedding
- `serde`: Serialization
MIT