fix bugs for not able to recycle

This commit is contained in:
MahnoKropotkinvich 2024-12-24 17:57:01 +08:00
parent d10e497d61
commit 42d2933f99
5 changed files with 61 additions and 35 deletions

View File

@ -12,3 +12,4 @@ var CriticalSignalChan = make(chan MCState)
var DaemonChanRX = make(chan struct{}) var DaemonChanRX = make(chan struct{})
var DaemonChanTX = make(chan struct{}) var DaemonChanTX = make(chan struct{})
var NullChan = make(chan struct{})

View File

@ -25,7 +25,7 @@ type signalChan chan MCState
var signalChanChan = make(chan signalChan) var signalChanChan = make(chan signalChan)
// 单条管道->多条管道派发器 goroutine only // 单条管道->多条管道派发器 goroutine only
var clientSignalChan = NewDynamicMultiChan[MCState](false) var clientSignalChan = NewDynamicMultiChan[MCState](false, 2)
func bridge() { func bridge() {
for { for {
@ -37,6 +37,7 @@ func handle(clientOriginal net.Conn) {
client := timeoutConn{clientOriginal} client := timeoutConn{clientOriginal}
defer clientOriginal.Close() defer clientOriginal.Close()
queryChan := make(QueryChan) queryChan := make(QueryChan)
defer close(queryChan)
QueryChanChan <- queryChan QueryChanChan <- queryChan
proceed := func() { proceed := func() {
server, err := net.Dial("tcp", "127.0.0.1:25565") server, err := net.Dial("tcp", "127.0.0.1:25565")
@ -65,7 +66,9 @@ func handle(clientOriginal net.Conn) {
wg.Wait() wg.Wait()
GetLogger().Infof("Connection from %s closed", clientOriginal.RemoteAddr()) GetLogger().Infof("Connection from %s closed", clientOriginal.RemoteAddr())
} }
switch state { queryChan <- STOPPED
st, _ := <-queryChan
switch st {
case RUNNING: case RUNNING:
CntChan <- INCREASE CntChan <- INCREASE
defer func() { CntChan <- DECREASE }() defer func() { CntChan <- DECREASE }()
@ -76,6 +79,7 @@ func handle(clientOriginal net.Conn) {
CntChan <- INCREASE CntChan <- INCREASE
defer func() { CntChan <- DECREASE }() defer func() { CntChan <- DECREASE }()
curChan := make(chan MCState) curChan := make(chan MCState)
defer close(curChan)
clientSignalChan.Add(curChan) clientSignalChan.Add(curChan)
state, _ := <-curChan state, _ := <-curChan
if state == RUNNING { if state == RUNNING {

82
ds.go
View File

@ -76,7 +76,7 @@ type DynamicMultiChan[T constraints.Ordered] struct {
modify chan modify_t[T] modify chan modify_t[T]
} }
func NewDynamicMultiChan[T constraints.Ordered](reply bool) *DynamicMultiChan[T] { func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMultiChan[T] {
ret := &DynamicMultiChan[T]{ ret := &DynamicMultiChan[T]{
TX: make(chan T), TX: make(chan T),
RX: make(chan T), RX: make(chan T),
@ -84,6 +84,7 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool) *DynamicMultiChan[T]
reply: reply, reply: reply,
modify: make(chan modify_t[T]), modify: make(chan modify_t[T]),
} }
mode := m
reloadTX := make(chan struct{}) reloadTX := make(chan struct{})
reloadRX := make(chan struct{}) reloadRX := make(chan struct{})
used := make(map[int]bool) used := make(map[int]bool)
@ -91,7 +92,6 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool) *DynamicMultiChan[T]
delta := -1 delta := -1
list := make([]chan T, 0) list := make([]chan T, 0)
selectCases := make([]reflect.SelectCase, 1) selectCases := make([]reflect.SelectCase, 1)
go func() { go func() {
for { for {
modification, _ := <-ret.modify modification, _ := <-ret.modify
@ -99,14 +99,19 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool) *DynamicMultiChan[T]
id := modification.id id := modification.id
ch := modification.ch ch := modification.ch
reloadTX <- struct{}{} reloadTX <- struct{}{}
reloadTX <- struct{}{} if mode == 2 {
<-reloadRX reloadTX <- struct{}{}
}
<-reloadRX // ok it's safe to do operation <-reloadRX // ok it's safe to do operation
if mode == 2 {
<-reloadRX
}
switch op { switch op {
case ADD: case ADD:
nelem := reflect.SelectCase{ nelem := reflect.SelectCase{
Dir: reflect.SelectRecv, Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch), Chan: reflect.ValueOf(ch),
Send: reflect.Value{},
} }
if len(unused) == 0 { if len(unused) == 0 {
delta = len(list) delta = len(list)
@ -123,16 +128,27 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool) *DynamicMultiChan[T]
used[which] = true used[which] = true
delete(unused, which) delete(unused, which)
list[which] = ch list[which] = ch
selectCases[which] = nelem selectCases[which+1] = nelem
} }
case DELETE: case DELETE:
delete(used, id) delete(used, id)
unused[id] = true unused[id] = true
list[id] = nil
selectCases[id+1] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(NullChan),
Send: reflect.Value{},
}
//deleted <- struct{}{} // <-reloadTX //extra communication
} }
reloadTX <- struct{}{} reloadTX <- struct{}{}
reloadTX <- struct{}{} //I'm finished if mode == 2 {
reloadTX <- struct{}{} //I'm finished
}
<-reloadRX <-reloadRX
<-reloadRX //ok I'll cleanup if mode == 2 {
<-reloadRX //ok I'll cleanup
}
delta = -1 delta = -1
} }
}() }()
@ -153,45 +169,53 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool) *DynamicMultiChan[T]
// ret.used.Push(id) // ret.used.Push(id)
// //
// delete(used, id) // delete(used, id)
ret.modify <- modify_t[T]{DELETE, id, nil} ret.modify <- modify_t[T]{DELETE, id - 1, nil}
<-reloadTX
reloadRX <- struct{}{} //I'm currently not dealing with other chan
<-reloadTX //waiting for you finished
reloadRX <- struct{}{} //I've read all deltas, you can release them
continue continue
} }
if id != 0 { if mode == 1 && id != 0 {
ret.RX <- To[T](recv) // possible for below to send to Chan? ret.RX <- To[T](recv) // possible for below to send to Chan?
if reply { if ret.reply {
msg, _ := <-ret.TX msg, _ := <-ret.TX
selectCases[id].Dir = reflect.SelectSend selectCases[id].Dir = reflect.SelectSend
selectCases[id].Send = reflect.ValueOf(msg) selectCases[id].Send = reflect.ValueOf(msg)
prevId = id prevId = id
} }
} else { } else if id == 0 {
reloadRX <- struct{}{} //I'm currently not dealing with other chan reloadRX <- struct{}{} //I'm currently not dealing with other chan
<-reloadTX //waiting for you finished <-reloadTX //waiting for you finished
reloadRX <- struct{}{} //I've read all deltas, you can release them reloadRX <- struct{}{} //I've read all deltas, you can release them
} }
} }
}() }()
go func() { if mode == 2 {
msgList := make([]T, 0) go func() {
for { msgList := make([]T, 0)
select { for {
case <-reloadTX: select {
reloadRX <- struct{}{} //I'm currently not dealing with other chan case <-reloadTX:
<-reloadTX //waiting for you finished reloadRX <- struct{}{} //I'm currently not dealing with other chan
if delta != -1 { <-reloadTX //waiting for you finished
for _, x := range msgList { if delta != -1 {
list[delta] <- x for _, x := range msgList {
list[delta] <- x
}
} }
reloadRX <- struct{}{} //I've read all deltas, you can release them
case msg, _ := <-ret.TX:
for id, ch := range list {
if used[id] {
ch <- msg
}
}
msgList = append(msgList, msg)
} }
reloadRX <- struct{}{} //I've read all deltas, you can release them
case msg, _ := <-ret.TX:
for _, ch := range list {
ch <- msg
}
msgList = append(msgList, msg)
} }
} }()
}() }
return ret return ret
} }
func (self DynamicMultiChan[T]) IsReply() bool { func (self DynamicMultiChan[T]) IsReply() bool {

2
fsm.go
View File

@ -182,7 +182,7 @@ func handleState() { //goroutine only, handles both write and read
// } // }
// go queryThread() // go queryThread()
logger.Debug("handleState(): ready") logger.Debug("handleState(): ready")
multiChan := NewDynamicMultiChan[MCState](true) multiChan := NewDynamicMultiChan[MCState](true, 1)
for { for {
select { select {
case nchan, _ := <-QueryChanChan: case nchan, _ := <-QueryChanChan:

View File

@ -7,9 +7,6 @@ import (
) )
func To[T constraints.Ordered](x reflect.Value) T { func To[T constraints.Ordered](x reflect.Value) T {
if x.Type() == reflect.TypeOf(*new(T)) {
panic("To(): type does not match")
}
ret, ok := x.Interface().(T) ret, ok := x.Interface().(T)
if !ok { if !ok {
panic("To(): doesn't work") panic("To(): doesn't work")