From 42d2933f99744875f051576874cff5b32e9d1864 Mon Sep 17 00:00:00 2001 From: MahnoKropotkinvich Date: Tue, 24 Dec 2024 17:57:01 +0800 Subject: [PATCH] fix bugs for not able to recycle --- chan.go | 1 + conn.go | 8 ++++-- ds.go | 82 ++++++++++++++++++++++++++++++++++------------------- fsm.go | 2 +- typeconv.go | 3 -- 5 files changed, 61 insertions(+), 35 deletions(-) diff --git a/chan.go b/chan.go index ef76f9f..d7a85cf 100644 --- a/chan.go +++ b/chan.go @@ -12,3 +12,4 @@ var CriticalSignalChan = make(chan MCState) var DaemonChanRX = make(chan struct{}) var DaemonChanTX = make(chan struct{}) +var NullChan = make(chan struct{}) diff --git a/conn.go b/conn.go index 393dcfd..6debf8b 100644 --- a/conn.go +++ b/conn.go @@ -25,7 +25,7 @@ type signalChan chan MCState var signalChanChan = make(chan signalChan) // 单条管道->多条管道派发器 goroutine only -var clientSignalChan = NewDynamicMultiChan[MCState](false) +var clientSignalChan = NewDynamicMultiChan[MCState](false, 2) func bridge() { for { @@ -37,6 +37,7 @@ func handle(clientOriginal net.Conn) { client := timeoutConn{clientOriginal} defer clientOriginal.Close() queryChan := make(QueryChan) + defer close(queryChan) QueryChanChan <- queryChan proceed := func() { server, err := net.Dial("tcp", "127.0.0.1:25565") @@ -65,7 +66,9 @@ func handle(clientOriginal net.Conn) { wg.Wait() GetLogger().Infof("Connection from %s closed", clientOriginal.RemoteAddr()) } - switch state { + queryChan <- STOPPED + st, _ := <-queryChan + switch st { case RUNNING: CntChan <- INCREASE defer func() { CntChan <- DECREASE }() @@ -76,6 +79,7 @@ func handle(clientOriginal net.Conn) { CntChan <- INCREASE defer func() { CntChan <- DECREASE }() curChan := make(chan MCState) + defer close(curChan) clientSignalChan.Add(curChan) state, _ := <-curChan if state == RUNNING { diff --git a/ds.go b/ds.go index 932df9b..37ed552 100644 --- a/ds.go +++ b/ds.go @@ -76,7 +76,7 @@ type DynamicMultiChan[T constraints.Ordered] struct { modify chan modify_t[T] } -func NewDynamicMultiChan[T constraints.Ordered](reply bool) *DynamicMultiChan[T] { +func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMultiChan[T] { ret := &DynamicMultiChan[T]{ TX: make(chan T), RX: make(chan T), @@ -84,6 +84,7 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool) *DynamicMultiChan[T] reply: reply, modify: make(chan modify_t[T]), } + mode := m reloadTX := make(chan struct{}) reloadRX := make(chan struct{}) used := make(map[int]bool) @@ -91,7 +92,6 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool) *DynamicMultiChan[T] delta := -1 list := make([]chan T, 0) selectCases := make([]reflect.SelectCase, 1) - go func() { for { modification, _ := <-ret.modify @@ -99,14 +99,19 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool) *DynamicMultiChan[T] id := modification.id ch := modification.ch reloadTX <- struct{}{} - reloadTX <- struct{}{} - <-reloadRX + if mode == 2 { + reloadTX <- struct{}{} + } <-reloadRX // ok it's safe to do operation + if mode == 2 { + <-reloadRX + } switch op { case ADD: nelem := reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch), + Send: reflect.Value{}, } if len(unused) == 0 { delta = len(list) @@ -123,16 +128,27 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool) *DynamicMultiChan[T] used[which] = true delete(unused, which) list[which] = ch - selectCases[which] = nelem + selectCases[which+1] = nelem } case DELETE: delete(used, id) unused[id] = true + list[id] = nil + selectCases[id+1] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(NullChan), + Send: reflect.Value{}, + } + //deleted <- struct{}{} // <-reloadTX //extra communication } reloadTX <- struct{}{} - reloadTX <- struct{}{} //I'm finished + if mode == 2 { + reloadTX <- struct{}{} //I'm finished + } <-reloadRX - <-reloadRX //ok I'll cleanup + if mode == 2 { + <-reloadRX //ok I'll cleanup + } delta = -1 } }() @@ -153,45 +169,53 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool) *DynamicMultiChan[T] // ret.used.Push(id) // // delete(used, id) - ret.modify <- modify_t[T]{DELETE, id, nil} + ret.modify <- modify_t[T]{DELETE, id - 1, nil} + <-reloadTX + reloadRX <- struct{}{} //I'm currently not dealing with other chan + <-reloadTX //waiting for you finished + reloadRX <- struct{}{} //I've read all deltas, you can release them continue } - if id != 0 { + if mode == 1 && id != 0 { ret.RX <- To[T](recv) // possible for below to send to Chan? - if reply { + if ret.reply { msg, _ := <-ret.TX selectCases[id].Dir = reflect.SelectSend selectCases[id].Send = reflect.ValueOf(msg) prevId = id } - } else { + } else if id == 0 { reloadRX <- struct{}{} //I'm currently not dealing with other chan <-reloadTX //waiting for you finished reloadRX <- struct{}{} //I've read all deltas, you can release them } } }() - go func() { - msgList := make([]T, 0) - for { - select { - case <-reloadTX: - reloadRX <- struct{}{} //I'm currently not dealing with other chan - <-reloadTX //waiting for you finished - if delta != -1 { - for _, x := range msgList { - list[delta] <- x + if mode == 2 { + go func() { + msgList := make([]T, 0) + for { + select { + case <-reloadTX: + reloadRX <- struct{}{} //I'm currently not dealing with other chan + <-reloadTX //waiting for you finished + if delta != -1 { + for _, x := range msgList { + list[delta] <- x + } } + reloadRX <- struct{}{} //I've read all deltas, you can release them + case msg, _ := <-ret.TX: + for id, ch := range list { + if used[id] { + ch <- msg + } + } + msgList = append(msgList, msg) } - reloadRX <- struct{}{} //I've read all deltas, you can release them - case msg, _ := <-ret.TX: - for _, ch := range list { - ch <- msg - } - msgList = append(msgList, msg) } - } - }() + }() + } return ret } func (self DynamicMultiChan[T]) IsReply() bool { diff --git a/fsm.go b/fsm.go index d6dbbf6..827a77c 100644 --- a/fsm.go +++ b/fsm.go @@ -182,7 +182,7 @@ func handleState() { //goroutine only, handles both write and read // } // go queryThread() logger.Debug("handleState(): ready") - multiChan := NewDynamicMultiChan[MCState](true) + multiChan := NewDynamicMultiChan[MCState](true, 1) for { select { case nchan, _ := <-QueryChanChan: diff --git a/typeconv.go b/typeconv.go index 1e508e3..cc85f3d 100644 --- a/typeconv.go +++ b/typeconv.go @@ -7,9 +7,6 @@ import ( ) func To[T constraints.Ordered](x reflect.Value) T { - if x.Type() == reflect.TypeOf(*new(T)) { - panic("To(): type does not match") - } ret, ok := x.Interface().(T) if !ok { panic("To(): doesn't work")