tried simplified protocol, testing

This commit is contained in:
MahnoKropotkinvich 2024-12-25 18:53:10 +08:00
parent d7c0e9a26b
commit 2174afc82c
3 changed files with 31 additions and 40 deletions

10
conn.go
View File

@ -33,7 +33,7 @@ func bridge() {
clientSignalChan.TX <- msg
}
}
func handle(clientOriginal net.Conn) {
func handleConn(clientOriginal net.Conn) {
client := timeoutConn{clientOriginal}
defer clientOriginal.Close()
queryChan := make(QueryChan)
@ -66,18 +66,16 @@ func handle(clientOriginal net.Conn) {
wg.Wait()
GetLogger().Infof("Connection from %s closed", clientOriginal.RemoteAddr())
}
CntChan <- INCREASE
defer func() { CntChan <- DECREASE }()
queryChan <- STOPPED
st, _ := <-queryChan
switch st {
case RUNNING:
CntChan <- INCREASE
defer func() { CntChan <- DECREASE }()
proceed()
case STOPPED, WAITING:
// client.Write([]byte("Server not ready!"))
GetLogger().Infof("Connection queued, server currently at %s state", stateToStr[state])
CntChan <- INCREASE
defer func() { CntChan <- DECREASE }()
curChan := make(chan MCState)
defer close(curChan)
clientSignalChan.Add(curChan)
@ -106,6 +104,6 @@ func Listen() {
}
//conn.SetReadDeadline(time.Now().Add(time.Duration(config.ConnectTimeout) * time.Second))
GetLogger().Infof("New connection from %s", conn.RemoteAddr())
go handle(conn)
go handleConn(conn)
}
}

57
ds.go
View File

@ -79,26 +79,22 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
modify: make(chan modify_t[T]),
}
mode := m
reloadTX := make(chan struct{})
reloadRX := make(chan struct{})
reload := make(chan struct{})
used := make(map[int]bool)
unused := make(map[int]bool)
delta := -1
// delta := -1
list := make([]chan T, 0)
selectCases := make([]reflect.SelectCase, 1)
viewCnt := 0
addChan := make(chan int, 1)
go func() {
for {
modification, _ := <-ret.modify
op := modification.op
id := modification.id
ch := modification.ch
reloadTX <- struct{}{}
if mode == 2 {
reloadTX <- struct{}{}
}
<-reloadRX // ok it's safe to do operation
if mode == 2 {
<-reloadRX
for range viewCnt {
reload <- struct{}{}
}
switch op {
case ADD:
@ -108,7 +104,7 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
Send: reflect.Value{},
}
if len(unused) == 0 {
delta = len(list)
addChan <- len(list)
used[len(list)] = true
list = append(list, ch)
selectCases = append(selectCases, nelem)
@ -118,7 +114,7 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
which = x
break
}
delta = which
addChan <- which
used[which] = true
delete(unused, which)
list[which] = ch
@ -135,21 +131,17 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
}
//deleted <- struct{}{} // <-reloadTX //extra communication
}
reloadTX <- struct{}{}
if mode == 2 {
reloadTX <- struct{}{} //I'm finished
for range viewCnt {
reload <- struct{}{}
}
<-reloadRX
if mode == 2 {
<-reloadRX //ok I'll cleanup
}
delta = -1
}
}()
go func() {
viewCnt++
selectCases[0] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(reloadTX),
Chan: reflect.ValueOf(reload),
}
prevId := -1
for {
@ -164,10 +156,10 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
//
// delete(used, id)
ret.modify <- modify_t[T]{DELETE, id - 1, nil}
<-reloadTX
reloadRX <- struct{}{} //I'm currently not dealing with other chan
<-reloadTX //waiting for you finished
reloadRX <- struct{}{} //I've read all deltas, you can release them
<-reload
// reloadRX <- struct{}{} //I'm currently not dealing with other chan
<-reload //waiting for you finished
// reloadRX <- struct{}{} //I've read all deltas, you can release them
continue
}
if mode == 1 && id != 0 {
@ -179,9 +171,9 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
prevId = id
}
} else if id == 0 {
reloadRX <- struct{}{} //I'm currently not dealing with other chan
<-reloadTX //waiting for you finished
reloadRX <- struct{}{} //I've read all deltas, you can release them
// reloadRX <- struct{}{} //I'm currently not dealing with other chan
<-reload //waiting for you finished
// reloadRX <- struct{}{} //I've read all deltas, you can release them
}
}
}()
@ -190,15 +182,16 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
msgList := make([]T, 0)
for {
select {
case <-reloadTX:
reloadRX <- struct{}{} //I'm currently not dealing with other chan
<-reloadTX //waiting for you finished
case <-reload:
// reloadRX <- struct{}{} //I'm currently not dealing with other chan
<-reload //waiting for you finished
delta, _ := <-addChan
if delta != -1 {
for _, x := range msgList {
list[delta] <- x
}
}
reloadRX <- struct{}{} //I've read all deltas, you can release them
// reloadRX <- struct{}{} //I've read all deltas, you can release them
case msg, _ := <-ret.TX:
for id, ch := range list {
if used[id] {

4
fsm.go
View File

@ -137,7 +137,7 @@ func handleState() { //goroutine only, handles both write and read
handleWaitingToRunning()
//case RUNNING:
default:
panic("handleState():written wrong!" + stateToStr[state])
//panic("handleState():written wrong!" + stateToStr[state])
}
case EMPTY:
switch state {
@ -145,7 +145,7 @@ func handleState() { //goroutine only, handles both write and read
handleRunningToWaiting()
//case BOOTING:
default:
panic("handleState():written wrong!" + stateToStr[state])
//panic("handleState():written wrong!" + stateToStr[state])
}
}
case <-multiChan.RX: