Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions fly.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,11 @@ cpus = 1
[[services]]
internal_port = 8080
protocol = "tcp"
min_machines_running = 2
min_machines_running = 1
auto_start_machines = false
processes = ['app']

[[services.ports]]
handlers = ["tls"]
port = 443
tls_options = { "alpn" = ["h2"] }

[services.concurrency]
type = "connections"
hard_limit = 1000
soft_limit = 500
20 changes: 16 additions & 4 deletions syncer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ func (s *PersistentSyncerServer) ListenChanges(request *proto.ListenChangesReque
pubkey := context.Value(middleware.USER_PUBKEY_CONTEXT_KEY).(string)
subscription := s.eventsManager.subscribe(pubkey)
defer s.eventsManager.unsubscribe(pubkey, subscription.id)

if err := stream.Send(&proto.Notification{}); err != nil {
return err
}
for {
select {
case notification, ok := <-subscription.eventsChan:
Expand Down Expand Up @@ -159,6 +163,7 @@ type subscription struct {
}

type eventsManager struct {
sync.Mutex
globalIDs int64
streams map[string][]*subscription
msgChan chan interface{}
Expand All @@ -176,13 +181,15 @@ func newEventsManager() *eventsManager {
func (c *eventsManager) start(quitChan chan struct{}) {
go func() {
for {
log.Printf("eventsManager select started\n")
select {
case msg := <-c.msgChan:
if s, ok := msg.(*subscription); ok {
c.streams[s.pubkey] = append(c.streams[s.pubkey], s)
s.eventsChan <- &proto.Notification{}
log.Printf("eventsManager: new subscription for user %s: id - %d\n", s.pubkey, s.id)
}
if s, ok := msg.(*unsubscribe); ok {
log.Printf("eventsManager: unsubscribing user %s: id - %d\n", s.pubkey, s.id)
var newSubs []*subscription
for _, sub := range c.streams[s.pubkey] {
if sub.id != s.id {
Expand All @@ -197,14 +204,16 @@ func (c *eventsManager) start(quitChan chan struct{}) {
}
}
if s, ok := msg.(*notifyChange); ok {
log.Printf("eventsManager: notifying change for user %v\n", s.pubkey)
for _, sub := range c.streams[s.pubkey] {
sub.eventsChan <- &proto.Notification{ClientId: s.clientId}
}
}

case <-quitChan:
log.Printf("eventsManager: quitChan received\n")
return
}
log.Printf("eventsManager select finished. number of subscriptions = %v\n", len(c.streams))
}
}()
}
Expand All @@ -214,11 +223,14 @@ func (c *eventsManager) notifyChange(pubkey string, clientId *string) {
}

func (c *eventsManager) subscribe(pubkey string) *subscription {
eventsChan := make(chan *proto.Notification)
eventsChan := make(chan *proto.Notification, 10)
c.Lock()
c.globalIDs += 1
s := &subscription{pubkey: pubkey, eventsChan: eventsChan, id: c.globalIDs}
c.Unlock()

c.msgChan <- s
log.Printf("New connection for user %s: id - %d\n", pubkey, c.globalIDs)
log.Printf("New connection for user %s: id - %d\n", pubkey, s.id)
return s
}

Expand Down