tried simplified protocol, testing
This commit is contained in:
parent
426adf8d3a
commit
cbce5f7c95
10
conn.go
10
conn.go
@ -33,7 +33,7 @@ func bridge() {
|
||||
clientSignalChan.TX <- msg
|
||||
}
|
||||
}
|
||||
func handle(clientOriginal net.Conn) {
|
||||
func handleConn(clientOriginal net.Conn) {
|
||||
client := timeoutConn{clientOriginal}
|
||||
defer clientOriginal.Close()
|
||||
queryChan := make(QueryChan)
|
||||
@ -66,18 +66,16 @@ func handle(clientOriginal net.Conn) {
|
||||
wg.Wait()
|
||||
GetLogger().Infof("Connection from %s closed", clientOriginal.RemoteAddr())
|
||||
}
|
||||
CntChan <- INCREASE
|
||||
defer func() { CntChan <- DECREASE }()
|
||||
queryChan <- STOPPED
|
||||
st, _ := <-queryChan
|
||||
switch st {
|
||||
case RUNNING:
|
||||
CntChan <- INCREASE
|
||||
defer func() { CntChan <- DECREASE }()
|
||||
proceed()
|
||||
case STOPPED, WAITING:
|
||||
// client.Write([]byte("Server not ready!"))
|
||||
GetLogger().Infof("Connection queued, server currently at %s state", stateToStr[state])
|
||||
CntChan <- INCREASE
|
||||
defer func() { CntChan <- DECREASE }()
|
||||
curChan := make(chan MCState)
|
||||
defer close(curChan)
|
||||
clientSignalChan.Add(curChan)
|
||||
@ -106,6 +104,6 @@ func Listen() {
|
||||
}
|
||||
//conn.SetReadDeadline(time.Now().Add(time.Duration(config.ConnectTimeout) * time.Second))
|
||||
GetLogger().Infof("New connection from %s", conn.RemoteAddr())
|
||||
go handle(conn)
|
||||
go handleConn(conn)
|
||||
}
|
||||
}
|
||||
|
||||
57
ds.go
57
ds.go
@ -79,26 +79,22 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
|
||||
modify: make(chan modify_t[T]),
|
||||
}
|
||||
mode := m
|
||||
reloadTX := make(chan struct{})
|
||||
reloadRX := make(chan struct{})
|
||||
reload := make(chan struct{})
|
||||
used := make(map[int]bool)
|
||||
unused := make(map[int]bool)
|
||||
delta := -1
|
||||
// delta := -1
|
||||
list := make([]chan T, 0)
|
||||
selectCases := make([]reflect.SelectCase, 1)
|
||||
viewCnt := 0
|
||||
addChan := make(chan int, 1)
|
||||
go func() {
|
||||
for {
|
||||
modification, _ := <-ret.modify
|
||||
op := modification.op
|
||||
id := modification.id
|
||||
ch := modification.ch
|
||||
reloadTX <- struct{}{}
|
||||
if mode == 2 {
|
||||
reloadTX <- struct{}{}
|
||||
}
|
||||
<-reloadRX // ok it's safe to do operation
|
||||
if mode == 2 {
|
||||
<-reloadRX
|
||||
for range viewCnt {
|
||||
reload <- struct{}{}
|
||||
}
|
||||
switch op {
|
||||
case ADD:
|
||||
@ -108,7 +104,7 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
|
||||
Send: reflect.Value{},
|
||||
}
|
||||
if len(unused) == 0 {
|
||||
delta = len(list)
|
||||
addChan <- len(list)
|
||||
used[len(list)] = true
|
||||
list = append(list, ch)
|
||||
selectCases = append(selectCases, nelem)
|
||||
@ -118,7 +114,7 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
|
||||
which = x
|
||||
break
|
||||
}
|
||||
delta = which
|
||||
addChan <- which
|
||||
used[which] = true
|
||||
delete(unused, which)
|
||||
list[which] = ch
|
||||
@ -135,21 +131,17 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
|
||||
}
|
||||
//deleted <- struct{}{} // <-reloadTX //extra communication
|
||||
}
|
||||
reloadTX <- struct{}{}
|
||||
if mode == 2 {
|
||||
reloadTX <- struct{}{} //I'm finished
|
||||
for range viewCnt {
|
||||
reload <- struct{}{}
|
||||
}
|
||||
<-reloadRX
|
||||
if mode == 2 {
|
||||
<-reloadRX //ok I'll cleanup
|
||||
}
|
||||
delta = -1
|
||||
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
viewCnt++
|
||||
selectCases[0] = reflect.SelectCase{
|
||||
Dir: reflect.SelectRecv,
|
||||
Chan: reflect.ValueOf(reloadTX),
|
||||
Chan: reflect.ValueOf(reload),
|
||||
}
|
||||
prevId := -1
|
||||
for {
|
||||
@ -164,10 +156,10 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
|
||||
//
|
||||
// delete(used, id)
|
||||
ret.modify <- modify_t[T]{DELETE, id - 1, nil}
|
||||
<-reloadTX
|
||||
reloadRX <- struct{}{} //I'm currently not dealing with other chan
|
||||
<-reloadTX //waiting for you finished
|
||||
reloadRX <- struct{}{} //I've read all deltas, you can release them
|
||||
<-reload
|
||||
// reloadRX <- struct{}{} //I'm currently not dealing with other chan
|
||||
<-reload //waiting for you finished
|
||||
// reloadRX <- struct{}{} //I've read all deltas, you can release them
|
||||
continue
|
||||
}
|
||||
if mode == 1 && id != 0 {
|
||||
@ -179,9 +171,9 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
|
||||
prevId = id
|
||||
}
|
||||
} else if id == 0 {
|
||||
reloadRX <- struct{}{} //I'm currently not dealing with other chan
|
||||
<-reloadTX //waiting for you finished
|
||||
reloadRX <- struct{}{} //I've read all deltas, you can release them
|
||||
// reloadRX <- struct{}{} //I'm currently not dealing with other chan
|
||||
<-reload //waiting for you finished
|
||||
// reloadRX <- struct{}{} //I've read all deltas, you can release them
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -190,15 +182,16 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
|
||||
msgList := make([]T, 0)
|
||||
for {
|
||||
select {
|
||||
case <-reloadTX:
|
||||
reloadRX <- struct{}{} //I'm currently not dealing with other chan
|
||||
<-reloadTX //waiting for you finished
|
||||
case <-reload:
|
||||
// reloadRX <- struct{}{} //I'm currently not dealing with other chan
|
||||
<-reload //waiting for you finished
|
||||
delta, _ := <-addChan
|
||||
if delta != -1 {
|
||||
for _, x := range msgList {
|
||||
list[delta] <- x
|
||||
}
|
||||
}
|
||||
reloadRX <- struct{}{} //I've read all deltas, you can release them
|
||||
// reloadRX <- struct{}{} //I've read all deltas, you can release them
|
||||
case msg, _ := <-ret.TX:
|
||||
for id, ch := range list {
|
||||
if used[id] {
|
||||
|
||||
4
fsm.go
4
fsm.go
@ -137,7 +137,7 @@ func handleState() { //goroutine only, handles both write and read
|
||||
handleWaitingToRunning()
|
||||
//case RUNNING:
|
||||
default:
|
||||
panic("handleState():written wrong!" + stateToStr[state])
|
||||
//panic("handleState():written wrong!" + stateToStr[state])
|
||||
}
|
||||
case EMPTY:
|
||||
switch state {
|
||||
@ -145,7 +145,7 @@ func handleState() { //goroutine only, handles both write and read
|
||||
handleRunningToWaiting()
|
||||
//case BOOTING:
|
||||
default:
|
||||
panic("handleState():written wrong!" + stateToStr[state])
|
||||
//panic("handleState():written wrong!" + stateToStr[state])
|
||||
}
|
||||
}
|
||||
case <-multiChan.RX:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user