diff --git a/conn.go b/conn.go index aa6a35a..be716a3 100644 --- a/conn.go +++ b/conn.go @@ -33,7 +33,7 @@ func bridge() { clientSignalChan.TX <- msg } } -func handle(clientOriginal net.Conn) { +func handleConn(clientOriginal net.Conn) { client := timeoutConn{clientOriginal} defer clientOriginal.Close() queryChan := make(QueryChan) @@ -66,18 +66,16 @@ func handle(clientOriginal net.Conn) { wg.Wait() GetLogger().Infof("Connection from %s closed", clientOriginal.RemoteAddr()) } + CntChan <- INCREASE + defer func() { CntChan <- DECREASE }() queryChan <- STOPPED st, _ := <-queryChan switch st { case RUNNING: - CntChan <- INCREASE - defer func() { CntChan <- DECREASE }() proceed() case STOPPED, WAITING: // client.Write([]byte("Server not ready!")) GetLogger().Infof("Connection queued, server currently at %s state", stateToStr[state]) - CntChan <- INCREASE - defer func() { CntChan <- DECREASE }() curChan := make(chan MCState) defer close(curChan) clientSignalChan.Add(curChan) @@ -106,6 +104,6 @@ func Listen() { } //conn.SetReadDeadline(time.Now().Add(time.Duration(config.ConnectTimeout) * time.Second)) GetLogger().Infof("New connection from %s", conn.RemoteAddr()) - go handle(conn) + go handleConn(conn) } } diff --git a/ds.go b/ds.go index ea4b423..6e98b90 100644 --- a/ds.go +++ b/ds.go @@ -79,26 +79,22 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti modify: make(chan modify_t[T]), } mode := m - reloadTX := make(chan struct{}) - reloadRX := make(chan struct{}) + reload := make(chan struct{}) used := make(map[int]bool) unused := make(map[int]bool) - delta := -1 + // delta := -1 list := make([]chan T, 0) selectCases := make([]reflect.SelectCase, 1) + viewCnt := 0 + addChan := make(chan int, 1) go func() { for { modification, _ := <-ret.modify op := modification.op id := modification.id ch := modification.ch - reloadTX <- struct{}{} - if mode == 2 { - reloadTX <- struct{}{} - } - <-reloadRX // ok it's safe to do operation - if mode == 2 { - <-reloadRX + for range viewCnt { + reload <- struct{}{} } switch op { case ADD: @@ -108,7 +104,7 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti Send: reflect.Value{}, } if len(unused) == 0 { - delta = len(list) + addChan <- len(list) used[len(list)] = true list = append(list, ch) selectCases = append(selectCases, nelem) @@ -118,7 +114,7 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti which = x break } - delta = which + addChan <- which used[which] = true delete(unused, which) list[which] = ch @@ -135,21 +131,17 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti } //deleted <- struct{}{} // <-reloadTX //extra communication } - reloadTX <- struct{}{} - if mode == 2 { - reloadTX <- struct{}{} //I'm finished + for range viewCnt { + reload <- struct{}{} } - <-reloadRX - if mode == 2 { - <-reloadRX //ok I'll cleanup - } - delta = -1 + } }() go func() { + viewCnt++ selectCases[0] = reflect.SelectCase{ Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(reloadTX), + Chan: reflect.ValueOf(reload), } prevId := -1 for { @@ -164,10 +156,10 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti // // delete(used, id) ret.modify <- modify_t[T]{DELETE, id - 1, nil} - <-reloadTX - reloadRX <- struct{}{} //I'm currently not dealing with other chan - <-reloadTX //waiting for you finished - reloadRX <- struct{}{} //I've read all deltas, you can release them + <-reload + // reloadRX <- struct{}{} //I'm currently not dealing with other chan + <-reload //waiting for you finished + // reloadRX <- struct{}{} //I've read all deltas, you can release them continue } if mode == 1 && id != 0 { @@ -179,9 +171,9 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti prevId = id } } else if id == 0 { - reloadRX <- struct{}{} //I'm currently not dealing with other chan - <-reloadTX //waiting for you finished - reloadRX <- struct{}{} //I've read all deltas, you can release them + // reloadRX <- struct{}{} //I'm currently not dealing with other chan + <-reload //waiting for you finished + // reloadRX <- struct{}{} //I've read all deltas, you can release them } } }() @@ -190,15 +182,16 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti msgList := make([]T, 0) for { select { - case <-reloadTX: - reloadRX <- struct{}{} //I'm currently not dealing with other chan - <-reloadTX //waiting for you finished + case <-reload: + // reloadRX <- struct{}{} //I'm currently not dealing with other chan + <-reload //waiting for you finished + delta, _ := <-addChan if delta != -1 { for _, x := range msgList { list[delta] <- x } } - reloadRX <- struct{}{} //I've read all deltas, you can release them + // reloadRX <- struct{}{} //I've read all deltas, you can release them case msg, _ := <-ret.TX: for id, ch := range list { if used[id] { diff --git a/fsm.go b/fsm.go index 2e71ac5..a70c3c1 100644 --- a/fsm.go +++ b/fsm.go @@ -137,7 +137,7 @@ func handleState() { //goroutine only, handles both write and read handleWaitingToRunning() //case RUNNING: default: - panic("handleState():written wrong!" + stateToStr[state]) + //panic("handleState():written wrong!" + stateToStr[state]) } case EMPTY: switch state { @@ -145,7 +145,7 @@ func handleState() { //goroutine only, handles both write and read handleRunningToWaiting() //case BOOTING: default: - panic("handleState():written wrong!" + stateToStr[state]) + //panic("handleState():written wrong!" + stateToStr[state]) } } case <-multiChan.RX: