From 0a51fd7ef588088ff42c801987968d54a05ca979 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Wed, 12 Mar 2025 12:19:39 +0200 Subject: [PATCH 1/2] set number of machines to 1 and remove hard limit --- fly.toml | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/fly.toml b/fly.toml index 83777a4..7630180 100644 --- a/fly.toml +++ b/fly.toml @@ -21,15 +21,11 @@ cpus = 1 [[services]] internal_port = 8080 protocol = "tcp" -min_machines_running = 2 +min_machines_running = 1 +auto_start_machines = false processes = ['app'] [[services.ports]] handlers = ["tls"] port = 443 tls_options = { "alpn" = ["h2"] } - -[services.concurrency] -type = "connections" -hard_limit = 1000 -soft_limit = 500 From 57660c2a8933b111ef7d428ec2dc48f432d70304 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Wed, 12 Mar 2025 12:19:59 +0200 Subject: [PATCH 2/2] don't send notification from event loop --- syncer_server.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/syncer_server.go b/syncer_server.go index 8871e18..68bf982 100644 --- a/syncer_server.go +++ b/syncer_server.go @@ -123,6 +123,10 @@ func (s *PersistentSyncerServer) ListenChanges(request *proto.ListenChangesReque pubkey := context.Value(middleware.USER_PUBKEY_CONTEXT_KEY).(string) subscription := s.eventsManager.subscribe(pubkey) defer s.eventsManager.unsubscribe(pubkey, subscription.id) + + if err := stream.Send(&proto.Notification{}); err != nil { + return err + } for { select { case notification, ok := <-subscription.eventsChan: @@ -159,6 +163,7 @@ type subscription struct { } type eventsManager struct { + sync.Mutex globalIDs int64 streams map[string][]*subscription msgChan chan interface{} @@ -176,13 +181,15 @@ func newEventsManager() *eventsManager { func (c *eventsManager) start(quitChan chan struct{}) { go func() { for { + log.Printf("eventsManager select started\n") select { case msg := <-c.msgChan: if s, ok := msg.(*subscription); ok { c.streams[s.pubkey] = append(c.streams[s.pubkey], s) - s.eventsChan <- &proto.Notification{} + log.Printf("eventsManager: new subscription for user %s: id - %d\n", s.pubkey, s.id) } if s, ok := msg.(*unsubscribe); ok { + log.Printf("eventsManager: unsubscribing user %s: id - %d\n", s.pubkey, s.id) var newSubs []*subscription for _, sub := range c.streams[s.pubkey] { if sub.id != s.id { @@ -197,14 +204,16 @@ func (c *eventsManager) start(quitChan chan struct{}) { } } if s, ok := msg.(*notifyChange); ok { + log.Printf("eventsManager: notifying change for user %v\n", s.pubkey) for _, sub := range c.streams[s.pubkey] { sub.eventsChan <- &proto.Notification{ClientId: s.clientId} } } - case <-quitChan: + log.Printf("eventsManager: quitChan received\n") return } + log.Printf("eventsManager select finished. number of subscriptions = %v\n", len(c.streams)) } }() } @@ -214,11 +223,14 @@ func (c *eventsManager) notifyChange(pubkey string, clientId *string) { } func (c *eventsManager) subscribe(pubkey string) *subscription { - eventsChan := make(chan *proto.Notification) + eventsChan := make(chan *proto.Notification, 10) + c.Lock() c.globalIDs += 1 s := &subscription{pubkey: pubkey, eventsChan: eventsChan, id: c.globalIDs} + c.Unlock() + c.msgChan <- s - log.Printf("New connection for user %s: id - %d\n", pubkey, c.globalIDs) + log.Printf("New connection for user %s: id - %d\n", pubkey, s.id) return s }