From 2174afc82c9c12babbba3e0d9c6d0639f70adc9d Mon Sep 17 00:00:00 2001 From: MahnoKropotkinvich Date: Wed, 25 Dec 2024 18:53:10 +0800 Subject: [PATCH 1/2] tried simplified protocol, testing --- conn.go | 10 ++++------ ds.go | 57 +++++++++++++++++++++++++-------------------------------- fsm.go | 4 ++-- 3 files changed, 31 insertions(+), 40 deletions(-) diff --git a/conn.go b/conn.go index aa6a35a..be716a3 100644 --- a/conn.go +++ b/conn.go @@ -33,7 +33,7 @@ func bridge() { clientSignalChan.TX <- msg } } -func handle(clientOriginal net.Conn) { +func handleConn(clientOriginal net.Conn) { client := timeoutConn{clientOriginal} defer clientOriginal.Close() queryChan := make(QueryChan) @@ -66,18 +66,16 @@ func handle(clientOriginal net.Conn) { wg.Wait() GetLogger().Infof("Connection from %s closed", clientOriginal.RemoteAddr()) } + CntChan <- INCREASE + defer func() { CntChan <- DECREASE }() queryChan <- STOPPED st, _ := <-queryChan switch st { case RUNNING: - CntChan <- INCREASE - defer func() { CntChan <- DECREASE }() proceed() case STOPPED, WAITING: // client.Write([]byte("Server not ready!")) GetLogger().Infof("Connection queued, server currently at %s state", stateToStr[state]) - CntChan <- INCREASE - defer func() { CntChan <- DECREASE }() curChan := make(chan MCState) defer close(curChan) clientSignalChan.Add(curChan) @@ -106,6 +104,6 @@ func Listen() { } //conn.SetReadDeadline(time.Now().Add(time.Duration(config.ConnectTimeout) * time.Second)) GetLogger().Infof("New connection from %s", conn.RemoteAddr()) - go handle(conn) + go handleConn(conn) } } diff --git a/ds.go b/ds.go index ea4b423..6e98b90 100644 --- a/ds.go +++ b/ds.go @@ -79,26 +79,22 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti modify: make(chan modify_t[T]), } mode := m - reloadTX := make(chan struct{}) - reloadRX := make(chan struct{}) + reload := make(chan struct{}) used := make(map[int]bool) unused := make(map[int]bool) - delta := -1 + // delta := -1 list := make([]chan T, 0) selectCases := make([]reflect.SelectCase, 1) + viewCnt := 0 + addChan := make(chan int, 1) go func() { for { modification, _ := <-ret.modify op := modification.op id := modification.id ch := modification.ch - reloadTX <- struct{}{} - if mode == 2 { - reloadTX <- struct{}{} - } - <-reloadRX // ok it's safe to do operation - if mode == 2 { - <-reloadRX + for range viewCnt { + reload <- struct{}{} } switch op { case ADD: @@ -108,7 +104,7 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti Send: reflect.Value{}, } if len(unused) == 0 { - delta = len(list) + addChan <- len(list) used[len(list)] = true list = append(list, ch) selectCases = append(selectCases, nelem) @@ -118,7 +114,7 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti which = x break } - delta = which + addChan <- which used[which] = true delete(unused, which) list[which] = ch @@ -135,21 +131,17 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti } //deleted <- struct{}{} // <-reloadTX //extra communication } - reloadTX <- struct{}{} - if mode == 2 { - reloadTX <- struct{}{} //I'm finished + for range viewCnt { + reload <- struct{}{} } - <-reloadRX - if mode == 2 { - <-reloadRX //ok I'll cleanup - } - delta = -1 + } }() go func() { + viewCnt++ selectCases[0] = reflect.SelectCase{ Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(reloadTX), + Chan: reflect.ValueOf(reload), } prevId := -1 for { @@ -164,10 +156,10 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti // // delete(used, id) ret.modify <- modify_t[T]{DELETE, id - 1, nil} - <-reloadTX - reloadRX <- struct{}{} //I'm currently not dealing with other chan - <-reloadTX //waiting for you finished - reloadRX <- struct{}{} //I've read all deltas, you can release them + <-reload + // reloadRX <- struct{}{} //I'm currently not dealing with other chan + <-reload //waiting for you finished + // reloadRX <- struct{}{} //I've read all deltas, you can release them continue } if mode == 1 && id != 0 { @@ -179,9 +171,9 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti prevId = id } } else if id == 0 { - reloadRX <- struct{}{} //I'm currently not dealing with other chan - <-reloadTX //waiting for you finished - reloadRX <- struct{}{} //I've read all deltas, you can release them + // reloadRX <- struct{}{} //I'm currently not dealing with other chan + <-reload //waiting for you finished + // reloadRX <- struct{}{} //I've read all deltas, you can release them } } }() @@ -190,15 +182,16 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti msgList := make([]T, 0) for { select { - case <-reloadTX: - reloadRX <- struct{}{} //I'm currently not dealing with other chan - <-reloadTX //waiting for you finished + case <-reload: + // reloadRX <- struct{}{} //I'm currently not dealing with other chan + <-reload //waiting for you finished + delta, _ := <-addChan if delta != -1 { for _, x := range msgList { list[delta] <- x } } - reloadRX <- struct{}{} //I've read all deltas, you can release them + // reloadRX <- struct{}{} //I've read all deltas, you can release them case msg, _ := <-ret.TX: for id, ch := range list { if used[id] { diff --git a/fsm.go b/fsm.go index 2e71ac5..a70c3c1 100644 --- a/fsm.go +++ b/fsm.go @@ -137,7 +137,7 @@ func handleState() { //goroutine only, handles both write and read handleWaitingToRunning() //case RUNNING: default: - panic("handleState():written wrong!" + stateToStr[state]) + //panic("handleState():written wrong!" + stateToStr[state]) } case EMPTY: switch state { @@ -145,7 +145,7 @@ func handleState() { //goroutine only, handles both write and read handleRunningToWaiting() //case BOOTING: default: - panic("handleState():written wrong!" + stateToStr[state]) + //panic("handleState():written wrong!" + stateToStr[state]) } } case <-multiChan.RX: From bc58c9df5bcce7b9b65b302040dfb9b7c4732b76 Mon Sep 17 00:00:00 2001 From: MahnoKropotkinvich Date: Thu, 26 Dec 2024 11:46:31 +0800 Subject: [PATCH 2/2] fix some bugs, there is yet another bug --- ds.go | 50 +++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 45 insertions(+), 5 deletions(-) diff --git a/ds.go b/ds.go index 6e98b90..53e3b99 100644 --- a/ds.go +++ b/ds.go @@ -58,6 +58,42 @@ const ( DELETE ) +type Queue[T any] struct { + items []T +} + +func NewQueue[T any]() *Queue[T] { + return &Queue[T]{ + items: make([]T, 0), + } +} + +func (s *Queue[T]) Push(item T) { + s.items = append(s.items, item) +} +func (s *Queue[T]) Pop() T { + l := len(s.items) + if l == 0 { + panic("Stack: pop") + } + ret := s.items[0] + s.items = s.items[1:] + return ret +} +func (s Queue[T]) Peek() T { + l := len(s.items) + if l == 0 { + panic("Stack: peek") + } + return s.items[l] +} +func (s Queue[T]) IsEmpty() bool { + return len(s.items) == 0 +} +func (s Queue[T]) Length() int { + return len(s.items) +} + type modify_t[T constraints.Ordered] struct { op op id int @@ -86,7 +122,7 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti list := make([]chan T, 0) selectCases := make([]reflect.SelectCase, 1) viewCnt := 0 - addChan := make(chan int, 1) + addQue := NewQueue[int]() go func() { for { modification, _ := <-ret.modify @@ -104,7 +140,7 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti Send: reflect.Value{}, } if len(unused) == 0 { - addChan <- len(list) + addQue.Push(len(list)) used[len(list)] = true list = append(list, ch) selectCases = append(selectCases, nelem) @@ -114,7 +150,7 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti which = x break } - addChan <- which + addQue.Push(which) used[which] = true delete(unused, which) list[which] = ch @@ -179,14 +215,18 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti }() if mode == 2 { go func() { + viewCnt++ msgList := make([]T, 0) for { select { case <-reload: // reloadRX <- struct{}{} //I'm currently not dealing with other chan <-reload //waiting for you finished - delta, _ := <-addChan - if delta != -1 { + if addQue.Length() == 0 { + continue + } + for !addQue.IsEmpty() { + delta := addQue.Pop() for _, x := range msgList { list[delta] <- x }